# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desitarget.uratmatch
====================
Useful `URAT`_ matching and manipulation routines.
.. _`URAT`: http://cdsarc.u-strasbg.fr/viz-bin/cat/I/329
"""
import os
import numpy as np
import fitsio
import requests
import pickle
from importlib import resources
from time import time
from astropy.io import ascii
from glob import glob
import healpy as hp
from desitarget.internal import sharedmem
from desimodel.footprint import radec2pix
from desitarget.geomask import add_hp_neighbors, radec_match_to
from desitarget import io
# ADM set up the DESI default logger
from desiutil.log import get_logger
log = get_logger()
# ADM start the clock
start = time()
# ADM columns contained in our version of the URAT fits files.
uratdatamodel = np.array([], dtype=[
('URAT_ID', '>i8'), ('RA', '>f8'), ('DEC', '>f8'),
('APASS_G_MAG', '>f4'), ('APASS_G_MAG_ERROR', '>f4'),
('APASS_R_MAG', '>f4'), ('APASS_R_MAG_ERROR', '>f4'),
('APASS_I_MAG', '>f4'), ('APASS_I_MAG_ERROR', '>f4'),
('PMRA', '>f4'), ('PMDEC', '>f4'), ('PM_ERROR', '>f4')
])
[docs]
def get_urat_dir():
"""Convenience function to grab the URAT environment variable.
Returns
-------
:class:`str`
The directory stored in the $URAT_DIR environment variable.
"""
# ADM check that the $URAT_DIR environment variable is set.
uratdir = os.environ.get('URAT_DIR')
if uratdir is None:
msg = "Set $URAT_DIR environment variable!"
log.critical(msg)
raise ValueError(msg)
return uratdir
[docs]
def _get_urat_nside():
"""Grab the HEALPixel nside to be used throughout this module.
Returns
-------
:class:`int`
The HEALPixel nside number for URAT file creation and retrieval.
"""
nside = 32
return nside
[docs]
def scrape_urat(url="http://cdsarc.u-strasbg.fr/ftp/I/329/URAT1/v12/",
nfiletest=None):
"""Retrieve the binary versions of the URAT files.
Parameters
----------
url : :class:`str`
The web directory that hosts the archived binary URAT files.
nfiletest : :class:`int`, optional, defaults to ``None``
If an integer is sent, only retrieve this number of files, for testing.
Returns
-------
Nothing
But the archived URAT files are written to $URAT_DIR/binary.
Notes
-----
- The environment variable $URAT_DIR must be set.
- Runs in about 50 minutes for 575 URAT files.
"""
# ADM check that the URAT_DIR is set and retrieve it.
uratdir = get_urat_dir()
# ADM construct the directory to which to write files.
bindir = os.path.join(uratdir, 'binary')
# ADM the directory better be empty for the wget!
if os.path.exists(bindir):
if len(os.listdir(bindir)) > 0:
msg = "{} should be empty to wget URAT binary files!".format(bindir)
log.critical(msg)
raise ValueError(msg)
# ADM make the directory, if needed.
else:
log.info('Making URAT directory for storing binary files')
os.makedirs(bindir)
index = requests.get(url)
# ADM retrieve any file name that starts with z.
# ADM the [1::2] pulls back just the odd lines from the split list.
garbled = index.text.split("z")[1::2]
filelist = ["z{}".format(g[:3]) for g in garbled]
# ADM if nfiletest was passed, just work with that number of files.
test = nfiletest is not None
if test:
filelist = filelist[:nfiletest]
nfiles = len(filelist)
# ADM loop through the filelist.
start = time()
for nfile, fileinfo in enumerate(filelist):
# ADM make the wget command to retrieve the file and issue it.
cmd = 'wget -q {} -P {}'.format(os.path.join(url, fileinfo), bindir)
os.system(cmd)
if nfile % 25 == 0 or test:
elapsed = time() - start
rate = nfile / elapsed
log.info(
'{}/{} files; {:.1f} files/sec; {:.1f} total mins elapsed'
.format(nfile+1, nfiles, rate, elapsed/60.)
)
log.info('Done...t={:.1f}s'.format(time()-start))
return
[docs]
def urat_binary_to_csv():
"""Convert files in $URAT_DIR/binary to files in $URAT_DIR/csv.
Returns
-------
Nothing
But the archived URAT binary files in $URAT_DIR/binary are
converted to CSV files in the $URAT_DIR/csv.
Notes
-----
- The environment variable $URAT_DIR must be set.
- Relies on the executable urat/fortran/v1dump, which is only
tested at NERSC and might need compiled by the user.
- Runs in about 40 minutes for 575 files.
"""
# ADM check that the URAT_DIR is set.
uratdir = get_urat_dir()
# ADM a quick check that the csv directory is empty before writing.
csvdir = os.path.join(uratdir, 'csv')
if os.path.exists(csvdir):
if len(os.listdir(csvdir)) > 0:
msg = "{} should be empty to make URAT files!".format(csvdir)
log.critical(msg)
raise ValueError(msg)
# ADM make the directory, if needed.
else:
log.info('Making URAT directory for storing CSV files')
os.makedirs(csvdir)
log.info('Begin converting URAT files to CSV...t={:.1f}s'
.format(time()-start))
# ADM check the v1dump executable has been compiled.
readme = resources.files('desitarget').joinpath('urat/fortran/README')
cmd = resources.files('desitarget').joinpath('urat/fortran/v1dump')
if not (os.path.exists(cmd) and os.access(cmd, os.X_OK)):
msg = "{} must have been compiled (see {})".format(cmd, readme)
log.critical(msg)
raise ValueError(msg)
# ADM execute v1dump.
os.system(cmd)
log.info('Done...t={:.1f}s'.format(time()-start))
return
[docs]
def urat_csv_to_fits(numproc=5):
"""Convert files in $URAT_DIR/csv to files in $URAT_DIR/fits.
Parameters
----------
numproc : :class:`int`, optional, defaults to 5
The number of parallel processes to use.
Returns
-------
Nothing
But the archived URAT CSV files in $URAT_DIR/csv are converted
to FITS files in the directory $URAT_DIR/fits. Also, a look-up
table is written to $URAT_DIR/fits/hpx-to-files.pickle for which
each index is an nside=_get_urat_nside(), nested scheme HEALPixel
and each entry is a list of the FITS files that touch that HEAPixel.
Notes
-----
- The environment variable $URAT_DIR must be set.
- if numproc==1, use the serial code instead of the parallel code.
- Runs in about 10 minutes with numproc=25 for 575 files.
"""
# ADM the resolution at which the URAT HEALPix files should be stored.
nside = _get_urat_nside()
# ADM check that the URAT_DIR is set.
uratdir = get_urat_dir()
log.info("running on {} processors".format(numproc))
# ADM construct the directories for reading/writing files.
csvdir = os.path.join(uratdir, 'csv')
fitsdir = os.path.join(uratdir, 'fits')
# ADM make sure the output directory is empty.
if os.path.exists(fitsdir):
if len(os.listdir(fitsdir)) > 0:
msg = "{} should be empty to make URAT FITS files!".format(fitsdir)
log.critical(msg)
raise ValueError(msg)
# ADM make the output directory, if needed.
else:
log.info('Making URAT directory for storing FITS files')
os.makedirs(fitsdir)
# ADM construct the list of input files.
infiles = sorted(glob("{}/*csv*".format(csvdir)))
nfiles = len(infiles)
# ADM the critical function to run on every file.
def _write_urat_fits(infile):
"""read an input name for a csv file and write it to FITS"""
outbase = os.path.basename(infile)
outfilename = "{}.fits".format(outbase.split(".")[0])
outfile = os.path.join(fitsdir, outfilename)
# ADM astropy understands without specifying format='csv'.
fitstable = ascii.read(infile)
# ADM map the ascii-read csv to typical DESI quantities.
nobjs = len(fitstable)
done = np.zeros(nobjs, dtype=uratdatamodel.dtype)
# ADM have to do this one-by-one, given the format.
done["RA"] = fitstable['col1']/1000./3600.
done["DEC"] = fitstable['col2']/1000./3600. - 90.
done["PMRA"] = fitstable['col16']/10.
done["PMDEC"] = fitstable['col17']/10.
done["PM_ERROR"] = fitstable['col18']/10.
done["APASS_G_MAG"] = fitstable['col36']/1000.
done["APASS_R_MAG"] = fitstable['col37']/1000.
done["APASS_I_MAG"] = fitstable['col38']/1000.
done["APASS_G_MAG_ERROR"] = fitstable['col41']/1000.
done["APASS_R_MAG_ERROR"] = fitstable['col42']/1000.
done["APASS_I_MAG_ERROR"] = fitstable['col43']/1000.
done["URAT_ID"] = fitstable['col46']
fitsio.write(outfile, done, extname='URATFITS')
# ADM return the HEALPixels that this file touches.
pix = set(radec2pix(nside, done["RA"], done["DEC"]))
return [pix, os.path.basename(outfile)]
# ADM this is just to count processed files in _update_status.
nfile = np.zeros((), dtype='i8')
t0 = time()
def _update_status(result):
"""wrapper function for the critical reduction operation,
that occurs on the main parallel process"""
if nfile % 25 == 0 and nfile > 0:
rate = nfile / (time() - t0)
elapsed = time() - t0
log.info(
'{}/{} files; {:.1f} files/sec; {:.1f} total mins elapsed'
.format(nfile, nfiles, rate, elapsed/60.)
)
nfile[...] += 1 # this is an in-place modification
return result
# - Parallel process input files...
if numproc > 1:
pool = sharedmem.MapReduce(np=numproc)
with pool:
pixinfile = pool.map(_write_urat_fits, infiles, reduce=_update_status)
# ADM ...or run in serial.
else:
pixinfile = list()
for file in infiles:
pixinfile.append(_update_status(_write_urat_fits(file)))
# ADM create a list for which each index is a HEALPixel and each
# ADM entry is a list of files that touch that HEALPixel.
npix = hp.nside2npix(nside)
pixlist = [[] for i in range(npix)]
for pixels, file in pixinfile:
for pix in pixels:
pixlist[pix].append(file)
# ADM write out the HEALPixel->files look-up table.
outfilename = os.path.join(fitsdir, "hpx-to-files.pickle")
outfile = open(outfilename, "wb")
pickle.dump(pixlist, outfile)
outfile.close()
log.info('Done...t={:.1f}s'.format(time()-t0))
return
[docs]
def urat_fits_to_healpix(numproc=5):
"""Convert files in $URAT_DIR/fits to files in $URAT_DIR/healpix.
Parameters
----------
numproc : :class:`int`, optional, defaults to 5
The number of parallel processes to use.
Returns
-------
Nothing
But the archived URAT FITS files in $URAT_DIR/fits are
rearranged by HEALPixel in the directory $URAT_DIR/healpix.
The HEALPixel sense is nested with nside=_get_urat_nside(), and
each file in $URAT_DIR/healpix is called healpix-xxxxx.fits,
where xxxxx corresponds to the HEALPixel number.
Notes
-----
- The environment variable $URAT_DIR must be set.
- if numproc==1, use the serial code instead of the parallel code.
- Runs in about 10 minutes with numproc=25.
"""
# ADM the resolution at which the URAT HEALPix files should be stored.
nside = _get_urat_nside()
# ADM check that the URAT_DIR is set.
uratdir = get_urat_dir()
# ADM construct the directories for reading/writing files.
fitsdir = os.path.join(uratdir, 'fits')
hpxdir = os.path.join(uratdir, 'healpix')
# ADM make sure the output directory is empty.
if os.path.exists(hpxdir):
if len(os.listdir(hpxdir)) > 0:
msg = "{} should be empty to make URAT HEALPix files!".format(hpxdir)
log.critical(msg)
raise ValueError(msg)
# ADM make the output directory, if needed.
else:
log.info('Making URAT directory for storing HEALPix files')
os.makedirs(hpxdir)
# ADM read the pixel -> file look-up table.
infilename = os.path.join(fitsdir, "hpx-to-files.pickle")
infile = open(infilename, "rb")
pixlist = pickle.load(infile)
npixels = len(pixlist)
# ADM include the pixel number explicitly in the look-up table.
pixlist = list(zip(np.arange(npixels), pixlist))
# ADM the critical function to run on every file.
def _write_hpx_fits(pixlist):
"""from files that touch a pixel, write out objects in each pixel"""
pixnum, files = pixlist
# ADM only proceed if some files touch a pixel.
if len(files) > 0:
# ADM track if it's our first time through the files loop.
first = True
# ADM Read in files that touch a pixel.
for file in files:
filename = os.path.join(fitsdir, file)
objs = fitsio.read(filename)
# ADM only retain objects in the correct pixel.
pix = radec2pix(nside, objs["RA"], objs["DEC"])
if first:
done = objs[pix == pixnum]
first = False
else:
done = np.hstack([done, objs[pix == pixnum]])
# ADM construct the name of the output file.
outfilename = io.hpx_filename(pixnum)
outfile = os.path.join(hpxdir, outfilename)
# ADM write out the file.
hdr = fitsio.FITSHDR()
hdr['HPXNSIDE'] = nside
hdr['HPXNEST'] = True
fitsio.write(outfile, done, extname='URATHPX', header=hdr)
return
# ADM this is just to count processed files in _update_status.
npix = np.zeros((), dtype='i8')
t0 = time()
def _update_status(result):
"""wrapper function for the critical reduction operation,
that occurs on the main parallel process"""
if npix % 500 == 0 and npix > 0:
rate = npix / (time() - t0)
elapsed = time() - t0
log.info(
'{}/{} files; {:.1f} files/sec; {:.1f} total mins elapsed'
.format(npix, npixels, rate, elapsed/60.)
)
npix[...] += 1 # this is an in-place modification
return result
# - Parallel process input files...
if numproc > 1:
pool = sharedmem.MapReduce(np=numproc)
with pool:
_ = pool.map(_write_hpx_fits, pixlist, reduce=_update_status)
# ADM ...or run in serial.
else:
for pix in pixlist:
_update_status(_write_hpx_fits(pix))
log.info('Done...t={:.1f}s'.format(time()-t0))
return
[docs]
def make_urat_files(numproc=5, download=False):
"""Make the HEALPix-split URAT files in one fell swoop.
Parameters
----------
numproc : :class:`int`, optional, defaults to 5
The number of parallel processes to use.
download : :class:`bool`, optional, defaults to ``False``
If ``True`` then wget the URAT binary files from Vizier.
Returns
-------
Nothing
But produces:
- URAT DR1 binary files in $URAT_DIR/binary (if download=True).
- URAT CSV files with all URAT columns in $URAT_DIR/csv.
- FITS files with columns from `uratdatamodel` in $URAT_DIR/fits.
- FITS files reorganized by HEALPixel in $URAT_DIR/healpix.
The HEALPixel sense is nested with nside=_get_urat_nside(), and
each file in $URAT_DIR/healpix is called healpix-xxxxx.fits,
where xxxxx corresponds to the HEALPixel number.
Notes
-----
- The environment variable $URAT_DIR must be set.
- if numproc==1, use the serial, instead of the parallel, code.
- Runs in about 2 hours with numproc=25 if download is ``True``.
- Runs in about 1 hour with numproc=25 if download is ``False``.
"""
t0 = time()
log.info('Begin making URAT files...t={:.1f}s'.format(time()-t0))
# ADM check that the URAT_DIR is set.
uratdir = get_urat_dir()
# ADM a quick check that the fits and healpix directories are empty
# ADM before embarking on the slower parts of the code.
csvdir = os.path.join(uratdir, 'csv')
fitsdir = os.path.join(uratdir, 'fits')
hpxdir = os.path.join(uratdir, 'healpix')
for direc in [csvdir, fitsdir, hpxdir]:
if os.path.exists(direc):
if len(os.listdir(direc)) > 0:
msg = "{} should be empty to make URAT files!".format(direc)
log.critical(msg)
raise ValueError(msg)
if download:
scrape_urat()
log.info('Retrieved URAT files from Vizier...t={:.1f}s'
.format(time()-t0))
urat_binary_to_csv()
log.info('Converted binary files to CSV...t={:.1f}s'.format(time()-t0))
urat_csv_to_fits(numproc=numproc)
log.info('Converted CSV files to FITS...t={:.1f}s'.format(time()-t0))
urat_fits_to_healpix(numproc=numproc)
log.info('Rearranged FITS files by HEALPixel...t={:.1f}s'.format(time()-t0))
return
[docs]
def find_urat_files(objs, neighbors=True, radec=False):
"""Find full paths to URAT healpix files for objects by RA/Dec.
Parameters
----------
objs : :class:`~numpy.ndarray`
Array of objects. Must contain the columns "RA" and "DEC".
neighbors : :class:`bool`, optional, defaults to ``True``
Also return all pixels that touch the files of interest
to prevent edge effects (e.g. if a URAT source is 1 arcsec
away from a primary source and so in an adjacent pixel).
radec : :class:`bool`, optional, defaults to ``False``
If ``True`` then the passed `objs` is an [RA, Dec] list
instead of a rec array that contains "RA" and "DEC".
Returns
-------
:class:`list`
A list of all URAT files to read to account for objects at
the passed locations.
Notes
-----
- The environment variable $URAT_DIR must be set.
"""
# ADM the resolution at which the URAT HEALPix files are stored.
nside = _get_urat_nside()
# ADM check that the URAT_DIR is set and retrieve it.
uratdir = get_urat_dir()
hpxdir = os.path.join(uratdir, 'healpix')
# ADM remember to pass "strict", as URAT doesn't cover the whole sky.
return io.find_star_files(objs, hpxdir, nside, strict=True,
neighbors=neighbors, radec=radec)
[docs]
def match_to_urat(objs, matchrad=1., radec=False):
"""Match objects to URAT healpix files and return URAT information.
Parameters
----------
objs : :class:`~numpy.ndarray`
Must contain at least "RA" and "DEC".
matchrad : :class:`float`, optional, defaults to 1 arcsec
The radius at which to match in arcseconds.
radec : :class:`bool`, optional, defaults to ``False``
If ``True`` then the passed `objs` is an [RA, Dec] list instead of
a rec array.
Returns
-------
:class:`~numpy.ndarray`
The matching URAT information for each object. The returned
format is as for desitarget.uratmatch.uratdatamodel with
an extra column "URAT_SEP" which is the matching distance
in ARCSECONDS.
Notes
-----
- For objects that do NOT have a match in URAT, the "URAT_ID"
and "URAT_SEP" columns are -1, and other columns are zero.
- Retrieves the CLOSEST match to URAT for each passed object.
- Because this reads in HEALPixel split files, it's (far) faster
for objects that are clumped rather than widely distributed.
"""
# ADM parse whether a structure or coordinate list was passed.
if radec:
ra, dec = objs
else:
ra, dec = objs["RA"], objs["DEC"]
# ADM set up an array of URAT information for the output.
nobjs = len(ra)
done = np.zeros(nobjs, dtype=uratdatamodel.dtype)
# ADM objects without matches should have URAT_ID, URAT_SEP of -1.
done["URAT_ID"] = -1
urat_sep = np.zeros(nobjs) - 1
# ADM determine which URAT files need to be scraped.
uratfiles = find_urat_files([ra, dec], radec=True)
nfiles = len(uratfiles)
# ADM catch the case of no matches to URAT.
if nfiles > 0:
# ADM loop through the URAT files and find matches.
for ifn, fn in enumerate(uratfiles):
if ifn % 500 == 0 and ifn > 0:
log.info('{}/{} files; {:.1f} total mins elapsed'
.format(ifn, nfiles, (time()-start)/60.))
urat = fitsio.read(fn)
idurat, idobjs, dist = radec_match_to(
[urat["RA"], urat["DEC"]], [ra, dec],
sep=matchrad, radec=True, return_sep=True)
# ADM update matches whenever we have a CLOSER match.
ii = (urat_sep[idobjs] == -1) | (urat_sep[idobjs] > dist)
done[idobjs[ii]] = urat[idurat[ii]]
urat_sep[idobjs[ii]] = dist[ii]
# ADM add the separation distances to the output array.
dt = uratdatamodel.dtype.descr + [("URAT_SEP", ">f4")]
output = np.zeros(nobjs, dtype=dt)
for col in uratdatamodel.dtype.names:
output[col] = done[col]
output["URAT_SEP"] = urat_sep
return output