# Source code for desitarget.train.train_test_RF.util.funcs

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from functools import reduce
import operator
import collections
import numpy as np

# ------------------------------------------------------------------------------
# ***Function that builds a formatted string to display a duration from
# a quantity given in seconds***


def Time2StrFunc(tm):
    """Format a duration given in seconds as a human-readable string.

    Parameters
    ----------
    tm : :class:`float` or :class:`int`
        Duration in seconds.

    Returns
    -------
    :class:`str`
        The non-zero components of the duration, from the largest unit to
        the smallest, joined by ``' - '`` (e.g.
        ``'1 h - 1 min - 1 sec - 500 ms'``). When no component is
        non-zero (e.g. ``tm == 0``) the string ``'0 sec'`` is returned.

    Notes
    -----
    - Hours are not folded into days: 90000 seconds is reported as ``25 h``.
    - The millisecond part is truncated, not rounded, so a non-zero
      sub-millisecond remainder is reported as ``0 ms``.
    """
    # Fractional part of a second, then whole seconds / minutes / hours.
    frac_sec = divmod(tm, 1.)[1]
    total_min, sec = divmod(tm, 60.)
    hours, minutes = divmod(total_min, 60.)

    # Build the components directly in display order (largest unit first).
    parts = []
    if int(hours) > 0:
        parts.append("{:d} h".format(int(hours)))
    if int(minutes) > 0:
        parts.append("{:d} min".format(int(minutes)))
    if int(sec) > 0:
        parts.append("{:d} sec".format(int(sec)))
    if frac_sec > 0.:
        parts.append("{:d} ms".format(int(frac_sec * 1000.)))

    # Without this guard an all-zero duration left nothing to join and the
    # previous reduce() call raised TypeError on the empty list.
    if not parts:
        return "0 sec"

    return ' - '.join(parts)


def shift_photo_north(gflux=None, rflux=None, zflux=None):
    """Convert fluxes in the northern (BASS/MzLS) to the southern (DECaLS) system.

    Parameters
    ----------
    gflux, rflux, zflux : :class:`array_like` or `float`
        The flux in nano-maggies of g, r, z bands. All three must be
        provided and must have the same shape.

    Returns
    -------
    The equivalent fluxes shifted to the southern system, as a tuple
    ``(gshift, rshift, zshift)``. Arrays are returned for array-like
    input and scalars when all three inputs are scalars.

    Raises
    ------
    TypeError
        If any of the three fluxes is left as ``None``.

    Notes
    -----
    - see also https://desi.lbl.gov/DocDB/cgi-bin/private/RetrieveFile?docid=3390;filename=Raichoor_DESI_05Dec2017.pdf;version=1
    """
    if gflux is None or rflux is None or zflux is None:
        raise TypeError("gflux, rflux and zflux must all be provided")

    # Scalars and plain sequences cannot be fancy-indexed below, so promote
    # everything to (at least) 1-D arrays and remember whether to unwrap.
    scalar_input = all(np.ndim(flux) == 0 for flux in (gflux, rflux, zflux))
    gflux, rflux, zflux = (np.atleast_1d(flux) for flux in (gflux, rflux, zflux))

    # ADM only use the g-band color shift when r and g are non-zero
    gshift = gflux * 10**(-0.4*0.013)
    w = np.where((gflux != 0) & (rflux != 0))
    # The complex exponent keeps the power defined for negative flux
    # ratios; only the real part is retained.
    gshift[w] = (gflux[w] * 10**(-0.4*0.013) * (gflux[w]/rflux[w])**complex(-0.059)).real

    # ADM only use the r-band color shift when r and z are non-zero
    # ADM and only use the z-band color shift when r and z are non-zero
    w = np.where((rflux != 0) & (zflux != 0))
    rshift = rflux * 10**(-0.4*0.007)
    zshift = zflux * 10**(+0.4*0.022)

    rshift[w] = (rflux[w] * 10**(-0.4*0.007) * (rflux[w]/zflux[w])**complex(-0.027)).real
    zshift[w] = (zflux[w] * 10**(+0.4*0.022) * (rflux[w]/zflux[w])**complex(+0.019)).real

    if scalar_input:
        return gshift[0], rshift[0], zshift[0]

    return gshift, rshift, zshift
def _Flux2MagFunc(OBJ_Data_flux, OBJ_Data_mw_transmission): OBJ_flux = OBJ_Data_flux / OBJ_Data_mw_transmission limitInf = 1.e-04 OBJ_flux = OBJ_flux.clip(limitInf) OBJ_mag = np.where(OBJ_flux > limitInf, 22.5 - 2.5 * np.log10(OBJ_flux), 0.) OBJ_mag[np.isnan(OBJ_mag)] = 0. OBJ_mag[np.isinf(OBJ_mag)] = 0. return OBJ_mag def Flux2MagFunc(dataArray): gflux = dataArray.FLUX_G[:]/dataArray.MW_TRANSMISSION_G[:] rflux = dataArray.FLUX_R[:]/dataArray.MW_TRANSMISSION_R[:] zflux = dataArray.FLUX_Z[:]/dataArray.MW_TRANSMISSION_Z[:] W1flux = dataArray.FLUX_W1[:]/dataArray.MW_TRANSMISSION_W1[:] W2flux = dataArray.FLUX_W2[:]/dataArray.MW_TRANSMISSION_W2[:] limitInf = 1.e-04 gflux = gflux.clip(limitInf) rflux = rflux.clip(limitInf) zflux = zflux.clip(limitInf) W1flux = W1flux.clip(limitInf) W2flux = W2flux.clip(limitInf) g = np.where(gflux > limitInf, 22.5-2.5*np.log10(gflux), 0.) r = np.where(rflux > limitInf, 22.5-2.5*np.log10(rflux), 0.) z = np.where(zflux > limitInf, 22.5-2.5*np.log10(zflux), 0.) W1 = np.where(W1flux > limitInf, 22.5-2.5*np.log10(W1flux), 0.) W2 = np.where(W2flux > limitInf, 22.5-2.5*np.log10(W2flux), 0.) g[np.isnan(g)] = 0. g[np.isinf(g)] = 0. r[np.isnan(r)] = 0. r[np.isinf(r)] = 0. z[np.isnan(z)] = 0. z[np.isinf(z)] = 0. W1[np.isnan(W1)] = 0. W1[np.isinf(W1)] = 0. W2[np.isnan(W2)] = 0. W2[np.isinf(W2)] = 0. return g, r, z, W1, W2 def ColorsFunc(nbEntries, nfeatures, g, r, z, W1, W2): colors = np.zeros((nbEntries, nfeatures)) colors[:, 0] = g-r colors[:, 1] = r-z colors[:, 2] = g-z colors[:, 3] = g-W1 colors[:, 4] = r-W1 colors[:, 5] = z-W1 colors[:, 6] = g-W2 colors[:, 7] = r-W2 colors[:, 8] = z-W2 colors[:, 9] = W1-W2 colors[:, 10] = r return colors def GetColorsFunc(data, color_names): colors = np.zeros((len(data), len(color_names))) for i, col_name in enumerate(color_names): colors[:, i] = data[col_name] return colors def AreaFunc(dec1, dec2, alpha1, alpha2): res = (alpha2 - alpha1) * (np.sin(dec2) - np.sin(dec1)) * (180. 
/ np.pi)**2 return res def RA_DEC_AreaFunc(OBJ_RA, OBJ_DEC, binVect_RA, binVect_DEC, N_OBJ_th=2): RA_DEC_meshgrid = np.meshgrid(binVect_RA, binVect_DEC) OBJ_dNdRAdDEC = np.histogram2d(x=OBJ_DEC, y=OBJ_RA, bins=[binVect_DEC, binVect_RA])[0] med_OBJ_dNdRAdDEC = np.median(OBJ_dNdRAdDEC) skyArea_meshgrid = AreaFunc(RA_DEC_meshgrid[1][0:-1, 0:-1] * np.pi / 180., RA_DEC_meshgrid[1][1:, 0: -1] * np.pi / 180., RA_DEC_meshgrid[0][0:-1, 0: -1] * np.pi / 180., RA_DEC_meshgrid[0][0:-1, 1:] * np.pi / 180.) area_OK = (OBJ_dNdRAdDEC) >= N_OBJ_th res_area = np.sum(skyArea_meshgrid[area_OK]) return res_area, med_OBJ_dNdRAdDEC def getFromDict(dataDict, mapList): return reduce(operator.getitem, mapList, dataDict) def setInDict(dataDict, mapList, value): getFromDict(dataDict, mapList[:-1])[mapList[-1]] = value def speGetFromDict(dataDict, tag): mapList = tag.split(':') return getFromDict(dataDict, mapList) def speSetInDict(dataDict, tag, value): mapList = tag.split(':') getFromDict(dataDict, mapList[:-1])[mapList[-1]] = value def RecHyParamDictExplFunc(hyParamDict): # 'RecursiveHyParamDictExplorationFunc' = 'Tree2FlatHyParamDictConversionFunc' new_dict = collections.OrderedDict() for key, value in hyParamDict.items(): if (type(value) == dict) or (type(value) == collections.OrderedDict): _dict = collections.OrderedDict([ (':'.join([key, _key]), _value) for _key, _value in RecHyParamDictExplFunc(value).items()]) new_dict.update(_dict) else: new_dict[key] = value return new_dict def Flat2TreeHyParamConvFunc(coords, hyParamDictTemplate, hyParamSpaceTags, hyParamSpaceItems): # 'Flat2TreeHyParamDictConversionFunc' for it, indEl in enumerate(coords): tag = hyParamSpaceTags[it] value = hyParamSpaceItems[it][indEl] speSetInDict(hyParamDictTemplate, tag, value) return hyParamDictTemplate def proba_rmag_func(OBJ_rmag, proba_thold=0.5, rmag_thold=[20.], slope=[0.05]): n_segments = len(rmag_thold) if n_segments > 1: test = np.array(rmag_thold) test = np.sort(test) == test if not(np.all(test)): 
assert(False), "ATTENTION : rmag_thold non ordonné croissant" proba_thold_rmag = np.ones(OBJ_rmag.size) * proba_thold tmp = np.copy(proba_thold) for num_seg in range(n_segments): test_rmag_thold = OBJ_rmag > rmag_thold[num_seg] if num_seg < (n_segments - 1): test = OBJ_rmag > rmag_thold[num_seg + 1] test_rmag_thold &= ~test tmp -= (rmag_thold[num_seg + 1] - rmag_thold[num_seg]) * slope[num_seg] proba_thold_rmag[test] = tmp proba_thold_rmag[test_rmag_thold] -= ( (OBJ_rmag[test_rmag_thold] - rmag_thold[num_seg]) * slope[num_seg]) return proba_thold_rmag