Source code for desitarget.skyutilities.legacypipe.util

# Much of this code is from version dr6.rc1.2 of legacypipe:
# https://github.com/legacysurvey/legacypipe
# The files from which this code was derived
# (legacypipe/survey.py) and (legacypipe/utils.py)
# are licensed under the BSD-3 license as of
# version dr6.rc1.2 of legacypipe (git hash e8a188a).
"""
=======================================
desitarget.skyutilities.legacypipe.util
=======================================

Module so desitarget sky fiber code doesn't need explicit legacypipe dependencies
"""
import numpy as np
import os
from desitarget.skyutilities.astrometry.fits import fits_table

[docs]def get_rgb(imgs, bands, mnmx=None, arcsinh=None): '''Given a list of images in the given bands, returns a scaled RGB image. Parameters ---------- imgs : :class:`list` List of numpy arrays, all the same size, in nanomaggies. bands : :class:`list` List of strings, *e.g.*, ``['g','r','z']``. mnmx : :func:`tuple` (min,max), values that will become black/white *after* scaling. Default is (-3,10). arcsinh : :class:`float`, optional Use nonlinear scaling as in SDSS. Returns ------- array-like (H,W,3) numpy array with values between 0 and 1. ''' bands = ''.join(bands) scales = dict(g = (2, 0.0066), r = (1, 0.01), z = (0, 0.025), ) h,w = imgs[0].shape rgb = np.zeros((h,w,3), np.float32) for im,band in zip(imgs, bands): if not band in scales: print('Warning: band', band, 'not used in creating RGB image') continue plane,scale = scales.get(band, (0,1.)) # print('RGB: band', band, 'in plane', plane, 'scaled by', scale) rgb[:,:,plane] = (im / scale).astype(np.float32) if mnmx is None: mn,mx = -3, 10 else: mn,mx = mnmx if arcsinh is not None: def nlmap(x): return np.arcsinh(x * arcsinh) / np.sqrt(arcsinh) rgb = nlmap(rgb) mn = nlmap(mn) mx = nlmap(mx) rgb = (rgb - mn) / (mx - mn) return rgb
def _ring_unique(wcs, W, H, i, unique, ra1,ra2,dec1,dec2): lo, hix, hiy = i, W-i-1, H-i-1 # one slice per side; we double-count the last pix of each side. sidex = slice(lo,hix+1) sidey = slice(lo,hiy+1) top = (lo, sidex) bot = (hiy, sidex) left = (sidey, lo) right = (sidey, hix) xx = np.arange(W) yy = np.arange(H) nu,ntot = 0,0 for slc in [top, bot, left, right]: #print('xx,yy', xx[slc], yy[slc]) (yslc,xslc) = slc #ADM the one change to the legacypipe "borrowed" code, as in #ADM desitarget we use astropy in place of astrometry.net #ADM start at pixel 0,0 in the top-left (the numpy standard) rr, dd = wcs.all_pix2world(xx[xslc], yy[yslc], 0) # rr,dd = wcs.pixelxy2radec(xx[xslc]+1, yy[yslc]+1) U = (rr >= ra1 ) * (rr < ra2 ) * (dd >= dec1) * (dd < dec2) #print('Pixel', i, ':', np.sum(U), 'of', len(U), 'pixels are unique') unique[slc] = U nu += np.sum(U) ntot += len(U) #if allin: # print('Scanned to pixel', i) # break return nu,ntot def find_unique_pixels(wcs, W, H, unique, ra1,ra2,dec1,dec2): if unique is None: unique = np.ones((H,W), bool) # scan the outer annulus of pixels, and shrink in until all pixels # are unique. step = 10 for i in range(0, W//2, step): nu,ntot = _ring_unique(wcs, W, H, i, unique, ra1,ra2,dec1,dec2) #print('Pixel', i, ': nu/ntot', nu, ntot) if nu > 0: i -= step break unique[:i,:] = False unique[H-1-i:,:] = False unique[:,:i] = False unique[:,W-1-i:] = False for j in range(max(i+1, 0), W//2): nu,ntot = _ring_unique(wcs, W, H, j, unique, ra1,ra2,dec1,dec2) #print('Pixel', j, ': nu/ntot', nu, ntot) if nu == ntot: break return unique
[docs]class LegacySurveyData(object): ''' A class describing the contents of a LEGACY_SURVEY_DIR directory -- tables of CCDs and of bricks, and calibration data. Methods for dealing with the CCDs and bricks tables. This class is also responsible for creating LegacySurveyImage objects (eg, DecamImage objects), which then allow data to be read from disk. ''' def __init__(self, survey_dir=None): '''Create a LegacySurveyData object using data from the given *survey_dir* directory, or from the $LEGACY_SURVEY_DIR environment variable. Parameters ---------- survey_dir : string Where to look for files including calibration files, tables of CCDs and bricks, image data, etc. ''' from desiutil.log import get_logger from collections import OrderedDict if survey_dir is None: log.info('survey_dir not passed...using pwd as survey_dir, but this is likely to fail') survey_dir = os.getcwd() self.survey_dir = survey_dir self.cache_dir = None self.output_dir = '.' self.output_file_hashes = OrderedDict() self.ccds = None self.bricks = None # Create and cache a kd-tree for bricks_touching_radec_box ? self.cache_tree = False self.bricktree = None ### HACK! Hard-coded brick edge size, in degrees! self.bricksize = 0.25 self.allbands = 'ugrizY' self.version = None # Filename prefix for coadd files self.file_prefix = 'legacysurvey' def __str__(self): return ('%s: dir %s, out %s' % (type(self).__name__, self.survey_dir, self.output_dir))
[docs] def find_file(self, filetype, brick=None, brickpre=None, band='%(band)s', output=False): ''' Returns the filename of a Legacy Survey file. *filetype* : string, type of file to find, including: "tractor" -- Tractor catalogs "depth" -- PSF depth maps "galdepth" -- Canonical galaxy depth maps "nexp" -- number-of-exposure maps *brick* : string, brick name such as "0001p000" *output*: True if we are about to write this file; will use self.outdir as the base directory rather than self.survey_dir. Returns: path to the specified file (whether or not it exists). ''' from glob import glob if brick is None: brick = '%(brick)s' brickpre = '%(brick).3s' else: brickpre = brick[:3] if output: basedir = self.output_dir else: basedir = self.survey_dir # ADM if you can't find the file, it's one # ADM directory up. Assuming this is dr8+. basedir = basedir.rstrip('/') check = os.path.basename(os.path.dirname(basedir)) if check[:2] == 'dr' and int(check[-1]) >= 8: truebasedir = os.path.dirname(basedir) else: truebasedir = basedir if brick is not None: codir = os.path.join(basedir, 'coadd', brickpre, brick) # Swap in files in the self.cache_dir, if they exist. def swap(fn): if output: return fn return self.check_cache(fn) def swaplist(fns): if output or self.cache_dir is None: return fns return [self.check_cache(fn) for fn in fns] sname = self.file_prefix if filetype == 'bricks': fn = 'survey-bricks.fits.gz' return swap(os.path.join(truebasedir, fn)) elif filetype == 'ccds': if self.version in ['dr1','dr2']: return swaplist([os.path.join(basedir, 'decals-ccds.fits.gz')]) else: return swaplist(sorted( glob(os.path.join(truebasedir, 'survey-ccds-*.fits.gz')))) elif filetype == 'ccd-kds': return swaplist(sorted( glob(os.path.join(truebasedir, 'survey-ccds-*.kd.fits')))) elif filetype == 'tycho2': return swap(os.path.join(basedir, 'tycho2.fits.gz')) elif filetype == 'annotated-ccds': if self.version == 'dr2': return swaplist(sorted( glob(os.path.join(basedir, 'decals-ccds-annotated.fits')))) return swaplist(sorted( glob(os.path.join(truebasedir, 'ccds-annotated-*.fits.gz')))) elif filetype == 'tractor': return swap(os.path.join(basedir, 'tractor', brickpre, 'tractor-%s.fits' % brick)) elif filetype == 'tractor-intermediate': return swap(os.path.join(basedir, 'tractor-i', brickpre, 'tractor-%s.fits' % brick)) elif filetype == 'galaxy-sims': return swap(os.path.join(basedir, 'tractor', brickpre, 'galaxy-sims-%s.fits' % brick)) elif filetype in ['ccds-table', 'depth-table']: ty = filetype.split('-')[0] return swap( os.path.join(codir, '%s-%s-%s.fits' % (sname, brick, ty))) elif filetype in ['image-jpeg', 'model-jpeg', 'resid-jpeg', 'imageblob-jpeg', 'simscoadd-jpeg','imagecoadd-jpeg']: ty = filetype.split('-')[0] return swap( os.path.join(codir, '%s-%s-%s.jpg' % (sname, brick, ty))) elif filetype in ['invvar', 'chi2', 'image', 'model', 'depth', 'galdepth', 'nexp']: return swap(os.path.join(codir, '%s-%s-%s-%s.fits.fz' % (sname, brick, filetype,band))) elif filetype in ['blobmap']: return swap(os.path.join(basedir, 'metrics', brickpre, 'blobs-%s.fits.gz' % (brick))) elif filetype in ['skies']: return swap(os.path.join(basedir, 'metrics', brickpre, 'skies-%s.fits.gz' % (brick))) elif filetype in ['all-models']: return swap(os.path.join(basedir, 'metrics', brickpre, 'all-models-%s.fits' % (brick))) elif filetype == 'checksums': return swap(os.path.join(basedir, 'tractor', brickpre, 'brick-%s.sha256sum' % brick)) print('Unknown filetype "%s"' % filetype) assert(False)
def check_cache(self, fn): if self.cache_dir is None: return fn cfn = fn.replace(self.survey_dir, self.cache_dir) if os.path.exists(cfn): return cfn return fn
[docs] def get_bricks(self): ''' Returns a table of bricks. The caller owns the table. For read-only purposes, see *get_bricks_readonly()*, which uses a cached version. ''' return fits_table(self.find_file('bricks'))
[docs] def get_bricks_readonly(self): ''' Returns a read-only (shared) copy of the table of bricks. ''' if self.bricks is None: self.bricks = self.get_bricks() # Assert that bricks are the sizes we think they are. # ... except for the two poles, which are half-sized assert(np.all(np.abs((self.bricks.dec2 - self.bricks.dec1)[1:-1] - self.bricksize) < 1e-3)) return self.bricks
[docs] def get_brick_by_name(self, brickname): ''' Returns a brick (as one row in a table) by name (string). ''' B = self.get_bricks_readonly() I, = np.nonzero(np.array([n == brickname for n in B.brickname])) if len(I) == 0: return None return B[I[0]]