Source code for desitarget.streams.utilities

"""
desitarget.streams.utilities
============================

Utilities for the DESI MWS Stellar Stream programs.

Borrows heavily from Sergey Koposov's `astrolibpy routines`_.

.. _`astrolibpy routines`: https://github.com/segasai/astrolibpy/blob/master/my_utils
"""
import yaml
import os
import numpy as np
import astropy.coordinates as acoo
import astropy.units as auni
from importlib import resources
from scipy.interpolate import UnivariateSpline
from scipy.optimize import curve_fit
from time import time
import desitarget.streams.gaia_dr3_parallax_zero_point.zpt as gaia_zpt
from numpy.lib import recfunctions as rfn

# ADM set up the DESI default logger.
from desiutil.log import get_logger
log = get_logger()


[docs] def ivars_to_errors(objs, colnames=[]): """ Convert inverse variances to errors without dividing by zero. Parameters ---------- objs : :class:`~numpy.ndarray` Array that contains the columns to be converted from inverse variances to errors. colnames : :class:`list` The names of the columns to convert. Returns ------- :class:`~numpy.ndarray` The input `objs`, modified so that any columns in `colnames` are converted from inverse variances to errors and occurrences of "IVAR" in column names are converted to "ERROR". Notes ----- - Column names are assumed to be upper case for the conversion of IVAR->ERROR in the column names. - No copy is made to save memory. So `objs` will be modified in place and calls like a = ivars_to_errors(b, colnames=["X"]) will alter values in b as well as returning a. """ for colname in colnames: # ADM guard against dividing by zero. error = np.zeros_like(objs[colname]) + np.nan ii = objs[colname] != 0 error[ii] = 1./np.sqrt(objs[ii][colname]) objs[colname] = error newcolname = colname.replace("IVAR", "ERROR") # ADM rename any IVAR columns. objs = rfn.rename_fields(objs, {colname: newcolname}) return objs
[docs] def errors_to_ivars(objs, colnames=[]): """ Convert errors to ivars, correctly dealing with NaNs. Parameters ---------- objs : :class:`~numpy.ndarray` Array that contains the columns to be converted from errors to inverse variances. colnames : :class:`list` The names of the columns to convert. Returns ------- :class:`~numpy.ndarray` The input `objs`, modified so that any columns in `colnames` are converted from errors to inverse variances and occurrences of "ERROR" in column names are converted to "IVAR". Notes ----- - Column names are assumed to be upper case for the conversion of ERROR->IVAR in the column names. - No copy is made to save memory. So `objs` will be modified in place and calls like a = errors_to_ivars(b, colnames=["X"]) will alter values in b as well as returning a. """ for colname in colnames: # ADM remove any NaNs. ivar = np.zeros_like(objs[colname]) ii = ~np.isnan(objs[colname]) ivar[ii] = 1./objs[ii][colname]**2 objs[colname] = ivar newcolname = colname.replace("ERROR", "IVAR") # ADM rename any IVAR columns. objs = rfn.rename_fields(objs, {colname: newcolname}) return objs
[docs] def cosd(x): """Return cos(x) for an angle x in degrees. """ return np.cos(np.deg2rad(x))
[docs] def sind(x): """Return sin(x) for an angle x in degrees. """ return np.sin(np.deg2rad(x))
[docs] def betw(x, x1, x2): """Whether x lies in the range x1 <= x < x2. Parameters ---------- x : :class:`~numpy.ndarray` or `int` or `float` Value(s) that need checked against x1, x2. x1 : :class:`~numpy.ndarray` or `int` or `float` Lower range to check against (inclusive). x2 : :class:`~numpy.ndarray` or `int` or `float` Upper range to check against (exclusive). Returns ------- :class:`array_like` or `boolean` ``True`` for values of `x` that lie in the range x1 <= x < x2. If any input is an array then the output will be a Boolean array. Notes ----- - Very permissive. Arrays can be checked against other arrays, scalars against scalars and arrays against arrays. For example, if all the inputs are arrays the calculation will be element-wise. If `x1` and `x2` are floats and `x` is array-like then each element of `x` will be checked against the range. If `x` and `x2` are floats and `x1` is an array, all possible x1->x2 ranges will be checked. """ return (x >= x1) & (x < x2)
[docs] def torect(ra, dec): """Convert equatorial coordinates to Cartesian coordinates. Parameters ---------- ra : :class:`~numpy.ndarray` or `float` Right Ascension in DEGREES. dec : :class:`~numpy.ndarray` or `float` Declination in DEGREES. Returns ------- :class:`tuple` A tuple of the x, y, z converted values. If `ra`, `dec` are passed as arrays this will be a tuple of x, y, z, arrays. """ x = cosd(ra) * cosd(dec) y = sind(ra) * cosd(dec) z = sind(dec) return x, y, z
[docs] def fromrect(x, y, z): """Convert equatorial coordinates to Cartesian coordinates. Parameters ---------- x, y, z : :class:`~numpy.ndarray` or `float` Cartesian coordinates. Returns ------- :class:`tuple` A tuple of the RA, Dec converted values in DEGREES. If `x`, `y`, `z` are passed as arrays this will be a tuple of RA, Dec arrays. """ ra = np.arctan2(y, x) * 57.295779513082323 dec = 57.295779513082323 * np.arctan2(z, np.sqrt(x**2 + y**2)) return ra, dec
[docs] def rotation_matrix(rapol, decpol, ra0): """Return the rotation matrix corresponding to the pole of rapol, decpol and with the zero of the new latitude corresponding to ra=ra0. The resulting matrix needs to be np.dot'ed with a vector to forward transform that vector. Parameters ---------- rapol, decpol : :class:`float` Pole of the new coordinate system in DEGREES. ra0 : :class:`float` Zero latitude of the new coordinate system in DEGREES. Returns ------- :class:`~numpy.ndarray` 3x3 Rotation matrix. """ tmppol = np.array(torect(rapol, decpol)) # pole axis tmpvec1 = np.array(torect(ra0, 0)) # x axis tmpvec1 = np.array(tmpvec1) tmpvec1[2] = (-tmppol[0] * tmpvec1[0] - tmppol[1] * tmpvec1[1]) / tmppol[2] tmpvec1 /= np.sqrt((tmpvec1**2).sum()) tmpvec2 = np.cross(tmppol, tmpvec1) # y axis M = np.array([tmpvec1, tmpvec2, tmppol]) return M
[docs] def sphere_rotate(ra, dec, rapol, decpol, ra0, revert=False): """Rotate ra, dec to a new spherical coordinate system. Parameters ---------- ra : :class:`~numpy.ndarray` or `float` Right Ascension in DEGREES. dec : :class:`~numpy.ndarray` or `float` Declination in DEGREES. rapol, decpol : :class:`float` Pole of the new coordinate system in DEGREES. ra0 : :class:`float` Zero latitude of the new coordinate system in DEGREES. revert : :class:`bool`, optional, defaults to ``False`` Reverse the rotation. Returns ------- :class:`tuple` A tuple of the the new RA, Dec values in DEGREES. If `ra`, `dec` are passed as arrays this will be a tuple of RA, Dec arrays. """ x, y, z = torect(ra, dec) M = rotation_matrix(rapol, decpol, ra0) if not revert: Axx, Axy, Axz = M[0] Ayx, Ayy, Ayz = M[1] Azx, Azy, Azz = M[2] else: Axx, Ayx, Azx = M[0] Axy, Ayy, Azy = M[1] Axz, Ayz, Azz = M[2] xnew = x * Axx + y * Axy + z * Axz ynew = x * Ayx + y * Ayy + z * Ayz znew = x * Azx + y * Azy + z * Azz del x, y, z tmp = fromrect(xnew, ynew, znew) return (tmp[0], tmp[1])
[docs] def rotate_pm(ra, dec, pmra, pmdec, rapol, decpol, ra0, revert=False): """ Rotate proper motions to a new spherical coordinate system. Parameters ---------- ra, dec : :class:`~numpy.ndarray` or `float` Right Ascension, Declination in DEGREES. pmra, pmdec : :class:`~numpy.ndarray` or `float` Proper motion in Right Ascension, Declination in mas/yr. pmdec : :class:`~numpy.ndarray` or `float` Proper motion in Declination in mas/yr. rapol, decpol : :class:`float` Pole of the new coordinate system in DEGREES. ra0 : :class:`float` Zero latitude of the new coordinate system in DEGREES. revert : :class:`bool`, optional, defaults to ``False`` Reverse the rotation. Returns ------- :class:`tuple` A tuple of the the new pmra, pmdec values in DEGREES. If `ra`, `dec`, etc. are passed as arrays this will be a tuple of arrays. """ ra, dec, pmra, pmdec = [np.atleast_1d(_) for _ in [ra, dec, pmra, pmdec]] M = rotation_matrix(rapol, decpol, ra0) if revert: M = M.T # unit vectors e_mura = np.array([-sind(ra), cosd(ra), ra * 0]) e_mudec = np.array( [-sind(dec) * cosd(ra), -sind(dec) * sind(ra), cosd(dec)]) # velocity vector in arbitrary units V = pmra * e_mura + pmdec * e_mudec del e_mura, e_mudec # apply rotation to velocity V1 = M @ V del V X = np.array([cosd(ra) * cosd(dec), sind(ra) * cosd(dec), sind(dec)]) # apply rotation to position X1 = M @ X del X # rotated coordinates in radians lon = np.arctan2(X1[1, :], X1[0, :]) lat = np.arctan2(X1[2, :], np.sqrt(X1[0, :]**2 + X1[1, :]**2)) del X1 # unit vectors in rotated coordinates e_mura = np.array([-np.sin(lon), np.cos(lon), lon * 0]) e_mudec = np.array( [-np.sin(lat) * np.cos(lon), -np.sin(lat) * np.sin(lon), np.cos(lat)]) del lon, lat return np.sum(e_mura * V1, axis=0), np.sum(e_mudec * V1, axis=0)
[docs] def correct_pm(ra, dec, pmra, pmdec, dist): """Correct proper motions for the Sun's motion. Parameters ---------- ra, dec : :class:`~numpy.ndarray` or `float` Right Ascension, Declination in DEGREES. pmra, pmdec : :class:`~numpy.ndarray` or `float` Proper motion in Right Ascension, Declination in mas/yr. `pmra` includes the cosine term. dist : :class:`float` Distance in kpc. Returns ------- :class:`tuple` A tuple of the the new (pmra, pmdec) values in DEGREES. If `ra`, `dec`, etc. are passed as arrays this will be a tuple of arrays. If the input arrays are length zero, the input pmra, pmdec are returned unaltered """ if len(ra) == 0: return pmra, pmdec # ADM Galactic reference frame. Use astropy v4.0 defaults. GCPARAMS = acoo.galactocentric_frame_defaults.get_from_registry( "v4.0")['parameters'] # ADM some standard units. kms = auni.km / auni.s masyr = auni.mas / auni.year C = acoo.ICRS(ra=ra * auni.deg, dec=dec * auni.deg, radial_velocity=0 * kms, distance=dist * auni.kpc, pm_ra_cosdec=pmra * masyr, pm_dec=pmdec * masyr) frame = acoo.Galactocentric(**GCPARAMS) Cg = C.transform_to(frame) Cg1 = acoo.Galactocentric(x=Cg.x, y=Cg.y, z=Cg.z, v_x=Cg.v_x * 0, v_y=Cg.v_y * 0, v_z=Cg.v_z * 0, **GCPARAMS) del Cg del C del frame C1 = Cg1.transform_to(acoo.ICRS()) del Cg1 # CMR modified to free up some memory, otherwise fails pmracorrfac = C1.pm_ra_cosdec.to_value(masyr) pmdeccorrfac = C1.pm_dec.to_value(masyr) del C1 # return ((C.pm_ra_cosdec - C1.pm_ra_cosdec).to_value(masyr), # (C.pm_dec - C1.pm_dec).to_value(masyr)) return pmra - pmracorrfac, pmdec-pmdeccorrfac
[docs] def get_targmwext_parameters(targmwext_name): """Look up information for a given stream or dwarf Parameters ---------- targmwext_name : :class:`str` Name of a stream that appears in the ../data/streams.yaml file or ../data/dwarf.yaml Possibilities include 'GD1'. Returns ------- :class:`~dict` A dictionary of stream parameters for the passed `targmwext_name`. Includes isochrones and positional information. Notes ----- - Parameters for each stream are in the ../data/streams.yaml file. - Parameters for each dwarf are in the ../data/dwarfs.yaml file. """ # ADM guard against stream being passed as lower-case. targmwext_name = targmwext_name.upper() # ADM open and load the stream parameter yaml file. fn = resources.files('desitarget').joinpath('data/streams.yaml') with open(fn) as f: stream = yaml.safe_load(f) streamlist = list(stream.keys()) fn2 = resources.files('desitarget').joinpath('data/dwarfs.yaml') with open(fn2) as f: dwarf = yaml.safe_load(f) dwarflist = list(dwarf.keys()) if targmwext_name in streamlist: return stream[targmwext_name] elif targmwext_name in dwarflist: return dwarf[targmwext_name]
[docs] def get_CMD_interpolator(stream_name): """Isochrones via interpolating over points in color-magnitude space. Parameters ---------- stream_name : :class:`str` Name of a stream that appears in the ../data/streams.yaml file. Possibilities include 'GD1'. Returns ------- :class:`~scipy.interpolate.UnivariateSpline` A scipy interpolated UnivariateSpline. """ # ADM get information for the stream of interest. stream = get_targmwext_parameters(stream_name) # ADM retrieve the color and magnitude offsets. coloff = stream["COLOFF"] magoff = stream["MAGOFF"] # ADM the isochrones to interpolate over. iso_dartmouth_g = np.array(stream["ISO_G"]) iso_dartmouth_r = np.array(stream["ISO_R"]) # ADM UnivariateSpline is from scipy.interpolate. CMD_II = UnivariateSpline(iso_dartmouth_r[::-1] + magoff, (iso_dartmouth_g - iso_dartmouth_r - coloff)[::-1], s=0) return CMD_II
[docs] def pm12_sel_func(pm1track, pm2track, pmfi1, pmfi2, pm_err, pad=2, mult=2.5): """Select stream members in stream coordinates, using proper motion, padded by some error. This works with PM_RA,PM_Dec, too Parameters ---------- pm1track : :class:`~numpy.ndarray` or :class:`float` Allowed proper motions of stream targets, RA-sense. pm2track : :class:`~numpy.ndarray` or :class:`float` Allowed proper motions of stream targets, Dec-sense. pmfi1 : :class:`~numpy.ndarray` or :class:`float` Proper motion in stream coordinates of possible targets, derived from RA. pmfi2 : :class:`~numpy.ndarray` or :class:`float` Proper motion in stream coordinates of possible targets, derived from Dec. pm_err : :class:`~numpy.ndarray` or :class:`float` Proper motion error in stream coordinates of possible targets, combined across `pmfi1` and `pmfi2` errors. pad: : :class:`float` or :class:`int`, optional Extra offset with which to pad `mult` times proper_motion_error. mult : :class:`float` or :class:`int`, optional Multiple of the proper motion error to use for padding. Returns ------- array-like An array with values that are ``True`` for stream members. """ return np.sqrt((pmfi2 - pm2track)**2 + (pmfi1 - pm1track)**2) < pad + mult * pm_err
[docs] def pm12_distdep_sel_func(pm1track, pm2track, pmfi1, pmfi2, pm_err, dist, velpad, mult=2.5): """Select stream members in stream coordinate, using a proper motion range defined by a velocity width, padded by some multiple of the error. Parameters ---------- pm1track : :class:`~numpy.ndarray` or `float` Allowed proper motions of stream targets, RA-sense. pm2track : :class:`~numpy.ndarray` or `float` Allowed proper motions of stream targets, Dec-sense. pmfi1 : :class:`~numpy.ndarray` or `float` Proper motion in stream coordinates of possible targets, derived from RA. pmfi2 : :class:`~numpy.ndarray` or `float` Proper motion in stream coordinates of possible targets, derived from Dec. pm_err : :class:`~numpy.ndarray` or `float` Proper motion error in stream coordinates of possible targets, combined across `pmfi1` and `pmfi2` errors. pad: : :class:`float` or `int` Width of PM selection in km/s, with `mult` * proper_motion_error mult : :class:`float` or `int`, defaults to 2.5 Multiple of the proper motion error to use for padding. Returns ------- :class:`array_like` or `boolean` ``True`` for stream members. """ dpmtot = np.sqrt((pmfi2 - pm2track)**2 + (pmfi1 - pm1track)**2) dpmlim = velpad/(4.74*dist) return dpmtot < (dpmlim + mult * pm_err)
[docs] def pm0_sel_func(pmra0, pmdec0, D, pad=2, mult=2.5): """Select dwarf members using proper motion, using a PM range around the systemtic PM of the dwarf padded by some error. Parameters ---------- pmra0, pmdec0 : :class:`~numpy.ndarray` or `float` Systemic proper motions of dwarf galaxy. D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `PMRA`, `PMDEC`, `PMRA_ERROR`, and `PMDEC_ERROR`. pad: : :class:`float` or `int`, defaults to 2 Extra offset with which to pad `mult` * proper_motion_error. mult : :class:`float` or `int`, defaults to 2.5 Multiple of the proper motion error to use for padding. Returns ------- :class:`array_like` or `boolean` ``True`` for dwarf members. """ # combine proper motion errors in RA and Dec pm_err = np.sqrt(0.5 * (D['PMRA_ERROR']**2 + D['PMDEC_ERROR']**2)) return np.sqrt((D['PMRA'] - pmra0)**2 + (D['PMDEC'] - pmdec0)**2) < pad + mult * pm_err
[docs] def apply_plx_zpt(D): """Apply zero-point correction to Gaia parallax Parameters ---------- D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `RA`, `ASTROMETRIC_PARAMS_SOLVED`, `PHOT_G_MEAN_MAG`, `NU_EFF_USED_IN_ASTRONOMY`, `PSEUDOCOLOUR`, `ECL_LAT`, `PARALLAX` `PARALLAX_ERROR`. `PARALLAX_IVAR` will be used instead of `PARALLAX_ERROR` if `PARALLAX_ERROR` is not present. Returns ------- :class:`array_like` or `float` Zero-point corrected parallax. """ # ADM load the Gaia zero points. gaia_zpt.load_tables() subset = np.isin(D['ASTROMETRIC_PARAMS_SOLVED'], [31, 95]) plx_zpt_tmp = gaia_zpt.get_zpt(D['PHOT_G_MEAN_MAG'][subset], D['NU_EFF_USED_IN_ASTROMETRY'][subset], D['PSEUDOCOLOUR'][subset], D['ECL_LAT'][subset], D['ASTROMETRIC_PARAMS_SOLVED'][subset], _warnings=False) plx_zpt = np.zeros(len(D['RA'])) plx_zpt_tmp[~np.isfinite(plx_zpt_tmp)] = 0 plx_zpt[subset] = plx_zpt_tmp plx = D['PARALLAX'] - plx_zpt return plx
[docs] def get_plx_error(D): """Gets parallax error from inverse variance if necessary. Parameters ---------- D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains either the column `PARALLAX_ERROR` or the column `PARALLAX_IVAR`. Returns ------- :class:`array_like` or `float` Parallax error. """ if 'PARALLAX_ERROR' in D.dtype.names: parallax_error = D['PARALLAX_ERROR'] elif 'PARALLAX_IVAR' in D.dtype.names: parallax_error = np.zeros_like(D["PARALLAX_IVAR"]) + 1e8 ii = D['PARALLAX_IVAR'] != 0 parallax_error[ii] = 1./np.sqrt(D[ii]['PARALLAX_IVAR']) else: msg = "Either PARALLAX_ERROR or PARALLAX_IVAR must be passed!" log.error(msg) return parallax_error
[docs] def plx_sel_func(dist, D, mult, plx_sys=0.05): """Select stream members using parallax, padded by some error. Parameters ---------- dist : :class:`~numpy.ndarray` or `float` Distance of possible stream members. D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `RA`, `ASTROMETRIC_PARAMS_SOLVED`, `PHOT_G_MEAN_MAG`, `NU_EFF_USED_IN_ASTRONOMY`, `PSEUDOCOLOUR`, `ECL_LAT`, `PARALLAX` `PARALLAX_ERROR`. `PARALLAX_IVAR` will be used instead of `PARALLAX_ERROR` if `PARALLAX_ERROR` is not present. mult : :class:`float` or `int` Multiple of the parallax error to use for padding. plx_sys : :class:`float` Extra offset with which to pad `mult` * parallax_error. Returns ------- :class:`array_like` or `boolean` ``True`` for stream members. """ # get parallaxes and apply zero point correction where appropriate plx = apply_plx_zpt(D) # get errors from IVARS or ERRORS parallax_error = get_plx_error(D) dplx = 1 / dist - plx return np.abs(dplx) < plx_sys + mult * parallax_error
[docs] def simple_plx_sel(dist, D, multfac, plxlim, plx_sys=0.05): """Select stream members using a parallax upper limit, padded by some error. Parameters ---------- dist : :class:`~numpy.ndarray` or `float` Distance of possible stream members. D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `RA`, `ASTROMETRIC_PARAMS_SOLVED`, `PHOT_G_MEAN_MAG`, `NU_EFF_USED_IN_ASTRONOMY`, `PSEUDOCOLOUR`, `ECL_LAT`, `PARALLAX` `PARALLAX_ERROR`. `PARALLAX_IVAR` will be used instead of `PARALLAX_ERROR` if `PARALLAX_ERROR` is not present. mult : :class:`float` or `int` Multiple of the parallax error to use for padding. plx_sys : :class:`float` Extra offset with which to pad `mult` * parallax_error. plxlim : :class:`float` select possible stream members with plx < plx_lim, plus pad Returns ------- :class:`array_like` or `boolean` ``True`` for stream members. """ # CMR first block of code to fix zpt and ivars is identical to plx_sel plx = apply_plx_zpt(D) parallax_error = get_plx_error(D) # CMR select plx < the upper limit give by plxlim psel = plx < (multfac*parallax_error + plx_sys + plxlim) return psel
[docs] def dwarf_plx_sel_func(dist, D, plx_sys=0.05, mult=2.5, keep_all_neg=False, min_plx_plxerr=-5): """Select dwarf members using parallax, padded by some error. NOTE: This is a slight alteration on the GD1 parallax selection, as it will select anything with a negative parallax regardless of the distance to the galaxy Parameters ---------- dist : :class:`~numpy.ndarray` or `float` Distance of possible stream members. D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `RA`, `ASTROMETRIC_PARAMS_SOLVED`, `PHOT_G_MEAN_MAG`, `NU_EFF_USED_IN_ASTRONOMY`, `PSEUDOCOLOUR`, `ECL_LAT`, `PARALLAX` `PARALLAX_ERROR`. `PARALLAX_IVAR` will be used instead of `PARALLAX_ERROR` if `PARALLAX_ERROR` is not present. plx_sys : :class:`float` Extra offset with which to pad `mult` * parallax_error. mult : :class:`float` or `int` Multiple of the parallax error to use for padding. keep_all_neg : :class:`bool` If true, keep all targets with negative parallaxes. min_plx_plxerr : :class:`float` Minimum allowable parallax/parallax error, defaults to -5. Useful if keep_all_neg = True. Returns ------- :class:`array_like` or `boolean` ``True`` for stream members. """ # Apply zero-point correction to parallax plx = apply_plx_zpt(D) # Get parallax error parallax_error = get_plx_error(D) # Apply systemic error padding dplx = plx - 1 / dist if keep_all_neg: sel = (dplx < plx_sys + mult * parallax_error) \ & (plx / parallax_error > min_plx_plxerr) else: sel = (np.abs(dplx) < plx_sys + mult * parallax_error) \ & (plx / parallax_error > min_plx_plxerr) return sel
[docs] def stream_distance(fi1, stream_name, stream): """The distance to members of a stellar stream. Parameters ---------- fi1 : :class:`~numpy.ndarray` or `float` Phi1 stream coordinate of possible targets, derived from RA. stream_name : :class:`str` Name of a stream, e.g. "GD1". Returns ------- :class:`array_like` or `float` The distance to the passed members of the stream. Notes ----- - Output type is the same as that of the passed `fi1`. """ if stream_name.upper() == "GD1": # ADM The distance to GD1 (similar to Koposov et al. 2010 paper). # dm = 18.82 + ((fi1 + 48) / 57)**2 - 4.45 # return 10**(dm / 5. - 2) DISTSP = UnivariateSpline(stream['DIST_PHI1T'], stream['DISTT'], s=0) return DISTSP(fi1) if stream_name.upper() == "ORPHAN": DISTSP = UnivariateSpline(stream['DIST_PHI1T'], stream['DISTT']) return DISTSP(fi1) if stream_name.upper() == "PAL5": DISTSP = UnivariateSpline(stream['DIST_PHI1T'], stream['DISTT']) return DISTSP(fi1) if stream_name.upper() == "C19": DISTSP = UnivariateSpline(stream['DIST_PHI1T'], stream['DISTT']) return DISTSP(fi1) else: msg = f"stream name {stream_name} not recognized" log.error(msg)
[docs] def oldpop_bhb_sel(gmag, rmag, distance): """Select stream members with the CMD properties of BHBs belonging to the stream. Parameters ---------- distance : :class:`~numpy.ndarray` or `float` Distance of possible stream members, in kpc. gmag : :class:`~numpy.ndarray` or `float` extinction-corrected g magnitude of candidates rmag: :class:`~numpy.ndarray` or `float` extinction-corrected r magnitude of candidates Returns ------- :class:`array_like` or `boolean` ``True`` for stream members. Notes ----- Selects BHB for an old metal-poor population. This uses an M92 horizontal branch distance (kpc) to the stream at the phi1 location of each star (i.e., distance to each star if it were in the stream. """ # convert target distance to abs mag dm = 5 * np.log10(distance * 1e3) - 5 absr = rmag - dm # define the horizontal branch in absolute mag # based in the M92 horizontal branch hb_r = np.array([2.72079, 1.21161, 0.78141, 0.48101, 0.42081, 0.36061, 0.30041, 0.24021]) hb_g = np.array([2.297328, 0.877968, 0.547568, 0.446768, 0.486368, 0.525968, 0.565568, 0.605168]) # Now make CMD cut for BHB grw_bhb = 0.5 # BHB width in gr rw_bhb = 0.5 # BHB width in r grmin_bhb = -0.45 # min g-r of BHB grmax_bhb = 0.4 # max g-r of BHB # first select in the correct gmr range gmr_range_bhb = (gmag - rmag < grmax_bhb) & (gmag - rmag > grmin_bhb) # interpolate gmr vs r using the HB gr_bhb = np.interp(absr, hb_r[::-1], hb_g[::-1] - hb_r[::-1], left=np.nan, right=np.nan) # interpolate r vs. gmr using the HB absr_bhb = np.interp(gmag - rmag, hb_g - hb_r, hb_r, left=np.nan, right=np.nan) rr_bhb = absr_bhb + dm # data_gmr - interp_gmr del_color_cmd_bhb = gmag - rmag - gr_bhb # data_r - interp_r del_r_cmd_bhb = rmag - rr_bhb # select if abs(data_gmr - interp_gmr_) < grw_bhb or # abs(data_r - interp_r) < rw_bhb cmdsel_bhb = (gmr_range_bhb) & ((abs(del_color_cmd_bhb) < grw_bhb) | (abs(del_r_cmd_bhb) < rw_bhb)) return cmdsel_bhb
[docs] def cmd_sel_func( dwarf_name, D, rgb_color_tol=0.2, rgb_mag_tol=0.0, rgb_grmin=-0.5, rgb_grmax=1.5, rgb_rmin=16, rgb_rmax=23, hb_color_tol=0.1, hb_mag_tol=0.5, hb_grmin=-0.5, hb_grmax=0.5, hb_rmin=16, hb_rmax=23 ): """Select dwarf galaxy members using CMD cuts. Parameters ---------- dwarf_name : :class:`str` Name of a dwarf galaxy that appears in the ../data/dwarfs.yaml file. Possibilities include 'DRACO_1'. D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `FLUX_G`, `FLUX_R`, `FLUX_Z`, `FLUX_IVAR_G`, `FLUX_IVAR_R`, `FLUX_IVAR_Z`, and `EBV`. rgb_color_tol : :class:`float` or `int` Width of RGB cmd selection, defaults to 0.2. rgb_mag_tol : :class:`float` or `int` Height of RGB cmd selection, defaults to 0.0. WARNING: Non-zero values lead to unexpected behavior at the tip of the RGB rgb_grmin : :class:`float` or `int` Minimum extinction-corrected g-r for RGB selection, defaults to -0.5. rgb_grmax : :class:`float` or `int` Maximum extinction-corrected g-r for RGB selection, defaults to 1.5. rgb_rmin : :class:`float` or `int` Bright limit for RGB selection, defaults to 16. rgb_rmax : :class:`float` or `int` Faint limit for RGB selection, defaults to 23. hb_color_tol : :class:`float` or `int` Width of HB cmd selection, defaults to 0.1. hb_mag_tol : :class:`float` or `int` Height of HB cmd selection, defaults to 0.5. hb_grmin : :class:`float` or `int` Minimum extinction-corrected g-r for HB selection, defaults to -0.5. hb_grmax : :class:`float` or `int` Maximum extinction-corrected g-r for HB selection, defaults to 0.5. hb_rmin : :class:`float` or `int` Bright limit for HB selection, defaults to 16. hb_rmax : :class:`float` or `int` Faint limit for HB selection, defaults to 23. show_magerr_plot : :class:`bool` Show log10(rmag error) vs. rmag relationship Returns ------- :class:`array_like` or `boolean` ``True`` for dwarf members. """ # look up the defining parameters of the dwarf. dwarf = get_targmwext_parameters(dwarf_name) # distance to the dwarf dist = dwarf["DIST"] # distance modulus of the dwarf dm = 5 * np.log10(dist * 1e3) - 5 # the RGB isochrone of the dwarf iso_rgb_g = np.array(dwarf["ISO_RGB_G"]) iso_rgb_r = np.array(dwarf["ISO_RGB_R"]) # the HB isochrone of the dwarf iso_hb_g = np.array(dwarf["ISO_HB_G"]) iso_hb_r = np.array(dwarf["ISO_HB_R"]) # retrieve the color and magnitude offsets. coloff = dwarf["COLOFF"] magoff = dwarf["MAGOFF"] # retrieve coefficients for the rmag - rmagerr linear fit rmag_rmagerr_coeffs = np.array(dwarf["RMAG_RMAGERR_COEFFS"]) def log10_error_func(x, a, b): return a * x + b # apply isochrone offsets iso_rgb_gr = iso_rgb_g - iso_rgb_r iso_hb_gr = iso_hb_g - iso_hb_r iso_rgb_r += magoff iso_hb_r += magoff iso_rgb_gr -= coloff iso_hb_gr -= coloff # dust correction. ext_coeff = dict(g=3.237, r=2.176, z=1.217) eg, er, ez = [ext_coeff[_] * D['EBV'] for _ in 'grz'] g, r, z = [22.5 - 2.5 * np.log10(D['FLUX_' + _]) for _ in 'GRZ'] gerr, rerr, zerr = [2.5 / np.log(10) * (np.sqrt(1./D['FLUX_IVAR_'+_]) / D['FLUX_' + _]) for _ in 'GRZ'] g0 = g - eg r0 = r - er z0 = z - ez # magnitude cut along RGB mag_sel_rgb = betw(r0, np.min(iso_rgb_r + dm) - 0.5, rgb_rmax) & betw(g0 - r0, rgb_grmin, rgb_grmax) # color cut along RGB rgb_color_tol = np.sqrt(rgb_color_tol**2 + (3*10**log10_error_func(iso_rgb_r + dm, *rmag_rmagerr_coeffs))**2) grmax1 = np.interp(r0, iso_rgb_r[::-1] + dm, iso_rgb_gr[::-1] + rgb_color_tol[::-1], left=None, right=None) grmax2 = np.interp(r0, iso_rgb_r[::-1] + dm + rgb_mag_tol, iso_rgb_gr[::-1] + rgb_color_tol[::-1], left=None, right=None) grmax3 = np.interp(r0, iso_rgb_r[::-1] + dm - rgb_mag_tol, iso_rgb_gr[::-1] + rgb_color_tol[::-1], left=None, right=None) rgb_grmax_cmd = np.max(np.array([grmax1, grmax2, grmax3]), axis=0) grmin1 = np.interp(r0, iso_rgb_r[::-1] + dm, iso_rgb_gr[::-1] - rgb_color_tol[::-1], left=None, right=None) grmin2 = np.interp(r0, iso_rgb_r[::-1] + dm - rgb_mag_tol, iso_rgb_gr[::-1] - rgb_color_tol[::-1], left=None, right=None) grmin3 = np.interp(r0, iso_rgb_r[::-1] + dm + rgb_mag_tol, iso_rgb_gr[::-1] - rgb_color_tol[::-1], left=None, right=None) rgb_grmin_cmd = np.min(np.array([grmin1, grmin2, grmin3]), axis=0) color_sel_rgb = betw(g0 - r0, rgb_grmin_cmd, rgb_grmax_cmd) # final RGB selection rgb_sel = mag_sel_rgb & color_sel_rgb # magnitude cut along HB mag_sel_hb = betw(r0, hb_rmin, hb_rmax) & betw(g0 - r0, hb_grmin, hb_grmax) # color cut along HB gr_hb = np.interp(r0, iso_hb_r[::-1] + dm, iso_hb_gr[::-1], left=np.nan, right=np.nan) rr_hb = np.interp(g0 - r0, iso_hb_gr, iso_hb_r + dm, left=np.nan, right=np.nan) color_sel_hb = mag_sel_hb & ((abs((g0 - r0) - gr_hb) < hb_color_tol) | (abs(r0 - rr_hb) < hb_mag_tol)) # final HB selection hb_sel = mag_sel_hb & color_sel_hb return rgb_sel | hb_sel
[docs] def spatial_sel_func(ra0, dec0, maxd, D): """Selects dwarf members within a given radius. Currently redundant with the pre-selection used in generating the input catalog. Parameters ---------- ra0, dec0 : :class:`float` Central coordinates of dwarf galaxy in DEGREES. D : :class:`~numpy.ndarray` Numpy structured array of Gaia information that contains at least the columns `RA` and `DEC`. maxd : :class:`~numpy.ndarray` or `float` Maximum distance of possible targets to dwarf. Returns ------- :class:`array_like` or `boolean` ``True`` for dwarf members. """ # coordinates of the dwarf. cdwarf = acoo.SkyCoord(ra0 * auni.degree, dec0 * auni.degree) cobjs = acoo.SkyCoord(D['RA'] * auni.degree, D['DEC'] * auni.degree) # separation between the objects of interest and the dwarf. sep = cobjs.separation(cdwarf) # lies in the the dwarf return sep.value < maxd
[docs] def sort_targmwext_by_rank(in_targmwextlist): """Return a list of all the stream and dwarf names in in_targmwextlist ordered by their priority as given in TARGMWEXT_RANK. Smaller numbers are higher priority. Parameters ---------- in_targmwextlist : :class:string` List of dwarf and stream objects to be sorted Returns ------- :class:`string` Sorted list of dwarf and stream objects """ fn = resources.files('desitarget').joinpath('data/streams.yaml') with open(fn) as f: streaminfo = yaml.safe_load(f) all_stream_names = list(streaminfo.keys()) fn2 = resources.files('desitarget').joinpath('data/dwarfs.yaml') with open(fn2) as f: dwarfinfo = yaml.safe_load(f) all_dwarf_names = list(dwarfinfo.keys()) in_ranklist = [] # CMR in_prilist is 1-to-1 matched to names in in_targmwextlist. for targmwext in in_targmwextlist: # guard against stream being passed as lower-case. targmwext = targmwext.upper() if targmwext in all_stream_names: sinfo = streaminfo[targmwext] rankval = sinfo['TARGMWEXT_RANK'] elif targmwext in all_dwarf_names: dinfo = dwarfinfo[targmwext] rankval = dinfo['TARGMWEXT_RANK'] else: log.info(f"{targmwext} not in the list of known streams or dwarfs") in_ranklist.append(rankval) in_ranklist = np.array(in_ranklist) sortrankidx = np.argsort(in_ranklist) sort_targmwextlist = [] out_ranklist = in_ranklist[sortrankidx] out_targmwextlist = [] for i in range(len(sortrankidx)): nextname = in_targmwextlist[sortrankidx[i]] nextname = nextname.upper() out_targmwextlist.append(nextname) return out_targmwextlist
[docs] def targmwext_resolve(targmwext_name, mws_target, ibright_pm1, ibright_pm2, ibright_pm3, ipm_only, ifaint_cmd, ifiller): """Resolve ambiguity with target subclass bits in the mwstarget mask using TARGMWEXT_RANK. Parameters ---------- targmwext_name : :class:`~numpy.ndarray` or `float` CMR FIX THIS IS A STRING Name of the stream or dwarf galaxy being selected mws_target : :class:`~numpy.ndarray` or `float` Numpy 1d array of target bits as set by PREVIOUS selections ibright_pm1 : :class:`array_like` or `boolean` Numpy 1d array, ``True`` for objects that pass the bright_pm1 selection criteria ibright_pm2 : :class:`array_like` or `boolean` Numpy 1d array, ``True`` for objects that pass the bright_pm2 selection criteria ibright_pm3 : :class:`array_like` or `boolean` Numpy 1d array, ``True`` for objects that pass the bright_pm3 selection criteria ipm_only : :class:`array_like` or `boolean` Numpy 1d array, ``True`` for objects that pass pm_only. Use for dwarfs but not streams ifaint_cmd : :class:`array_like` or `boolean` Numpy 1d array, ``True`` for objects that pass faint_cmd. Use for streams but not dwarfs ifiller : :class:`array_like` or `boolean` Numpy 1d array, ``True`` for objects that pass the filler selection criteria. Returns ------- :class:`array_like` ``True`` if the object is a "BRIGHT_PM1" target and has priorty for duplicates :class:`array_like` ``True`` if the object is a "BRIGHT_PM2" target and has priorty for duplicates :class:`array_like` ``True`` if the object is a "BRIGHT_PM3" target and has priorty for duplicates :class:`array_like` ``True`` if the object is a "PM_ONLY" target and has priorty for duplicates :class:`array_like` ``True`` if the object is a "FAINT_CMD" target and has priorty for duplicates :class:`array_like` ``True`` if the object is a "FILLER" target and has priorty for duplicates Notes ----- - See ../data/targetmask.yaml for the definition of the target bits and targmwext_priority. - Smaller numbers are higher priority. - Streams and dwarfs are selected in rank order and targets selected for lower ranking streams that are also selected in higher ranking dwarfs are only selected if their target subclass outranks the target subclass they were selected as for the higher ranking object. All dwarfs outrank streams. GD1 is the highest ranking stream, Orphan the second. Ranking of subtarget classes: bright_pm1, bright_pm2, bright_pm3, pm_only, faint_cmd, filler We make assumptions, which avoid the brute-force implementation of these priorities: * brightpm[123] never overlap in magnitude, so an object can't be, e.g., pm1 and pm2 in different stream/dwarfs * pm_only can only overlap with brightpm[12] * faint_cmd and filler can only overlap with each other and bright_pm3 The bright_pm1, bright_pm2 and bright_pm3 and pm_only outrank faint_cmd and filler. The only overlaps possible and their relative rankings are: 1) bright_pm3 outranks faint_cmd and filler. bright_pm1 and bright_pm2 are too bright to overlap with either of the faint selections. 2) faint_cmd outranks filler. 3) pm_only can only overlap bright_pm1 and bright_pm2 (and it is only used for dwarfs) Note that the magnitude ranges of bright_pm[123] are always the same so that, e.g., bright_pm1 and bright_pm2 can neve be set for the same object. See <insert pointer to Nathan's document>. """ from desitarget.targetmask import mws_mask # target bits set selecting the current stream or dwarf. any_set_here = ibright_pm1 | ibright_pm2 | ibright_pm3 | ipm_only | ifaint_cmd | ifiller # target bits set when selecting streams and dwarfs that rank higher than this one. any_set_prev = (mws_target & mws_mask['MWS_EXT']) != 0 # look for objects selected in this stream that were also previously targeted. # for a higher ranking stream or dwarf. iialldups = any_set_here & any_set_prev # CMR this is boolean, not bitwise. ndups = np.sum(iialldups != 0) log.info(f"Ndups {ndups}") if ndups > 0: log.info(f"Found {ndups} duplicates when selecting {targmwext_name}") # get the yaml file info for all streams and dwarfs so we can get TARGMWEXT_RANK fn = resources.files('desitarget').joinpath('data/streams.yaml') with open(fn) as f: streaminfo = yaml.safe_load(f) all_stream_names = list(streaminfo.keys()) fn2 = resources.files('desitarget').joinpath('data/dwarfs.yaml') with open(fn2) as f: dwarfinfo = yaml.safe_load(f) all_dwarf_names = list(dwarfinfo.keys()) # get the yaml file info for our current stream or dwarf if targmwext_name in all_stream_names: thisinfo = streaminfo[targmwext_name] elif targmwext_name in all_dwarf_names: thisinfo = dwarfinfo[targmwext_name] thisrank = thisinfo['TARGMWEXT_RANK'] # CMR go through all higher ranking streams and dwarfs and check for dups, # find and resolve by subtarget_class. # We should not have duplicates between dwarfs, but it is easy and useful to check allchecknames = np.concatenate((all_dwarf_names, all_stream_names)) for icheckname in allchecknames: if icheckname in all_dwarf_names: iinfo = dwarfinfo[icheckname] elif icheckname in all_stream_names: iinfo = streaminfo[icheckname] icheckrank = iinfo['TARGMWEXT_RANK'] icheckbit_name = f"MWS_{icheckname}" # now figure out which bright_pm bits are set target_bit_set = iinfo["TARGET_BIT_SET"] if target_bit_set == "STREAM": brightpm1bit = mws_mask.MWS_STREAM_PM1 brightpm2bit = mws_mask.MWS_STREAM_PM2 brightpm3bit = mws_mask.MWS_STREAM_PM3 elif target_bit_set == "DSPH": brightpm1bit = mws_mask.MWS_DSPH_PM1 brightpm2bit = mws_mask.MWS_DSPH_PM2 brightpm3bit = mws_mask.MWS_DSPH_PM3 elif target_bit_set == "UFD": brightpm1bit = mws_mask.MWS_UFD_PM1 brightpm2bit = mws_mask.MWS_UFD_PM2 brightpm3bit = mws_mask.MWS_UFD_PM3 else: log.info(f"did not find brightpm bits for {icheckname}") if icheckrank >= thisrank: # we should not have new dups with a lower ranked object continue # as lower-ranked objects get selected later if icheckname == targmwext_name: # no duplciates with the object we are currently targeting continue # if input stream or dwarf is lower ranking than the object we are checking # we need to remove target subclasses that are lower ranked iicheckdups = iialldups & (mws_target & mws_mask[icheckbit_name] != 0) # dups for the specifc stream or dwarf we are checking if np.sum(iicheckdups) > 0: log.info(f"Duplicate targets found in {icheckname} when selecting targets for {targmwext_name}.") # Now resolve. See assumption list in the comments about possible duplicate target combinations. # bright_pm1 and bright_pm2 for the higher ranking object outranks pm_only for the lower rank obj # so can't set pm_only for this object iduppmonlybrightpm1 = iicheckdups & (mws_target & brightpm1bit != 0) & (ipm_only != 0) if np.sum(iduppmonlybrightpm1) > 0: ipmonly[iduppmonlybrightpm1 != 0] = 0 iduppmonlybrightpm2 = iicheckdups & (mws_target & brightpm2bit != 0) & (ipm_only != 0) if np.sum(iduppmonlybrightpm2) > 0: ipmonly[iduppmonlybrightpm2 != 0] = 0 # our lower ranked object can't set faint_cmd or filler # if the higher ranked one has set bright_pm3 idupbrightpm3faintcmd = iicheckdups & (mws_target & brightpm3bit != 0) & (ifaint_cmd != 0) if np.sum(idupbrightpm3faintcmd) > 0: ifaint_cmd[idupbrightpm3faintcmd != 0] = 0 idupbrightpm3filler = iicheckdups & (mws_target & brightpm3bit != 0) & (ifiller != 0) if np.sum(idupbrightpm3filler) > 0: ifiller[idupbrightpm3filler != 0] = 0 # our lower ranked objectd cannot set filler if the higher ranked one set faint_cmd idupfaintcmdfiller = iicheckdups & (mws_target & mws_mask['MWS_FAINT_CMD'] != 0) & (ifiller != 0) if np.sum(idupfaintcmdfiller) > 0: ifiller[idupfaintcmdfiller != 0] = 0 return ibright_pm1, ibright_pm2, ibright_pm3, ipm_only, ifaint_cmd, ifiller