""" Utility functions for virus reductions
.. moduleauthor:: Maximilian Fabricius <mxhf@mpe.mpg.de>, gregz
This code relies on original software from:
Copyright (c) 2011-2016, Astropy Developers
Copyright (c) 2012, Free Software Foundation
"""
import numpy as np
import sys
import os
import logging
from collections import OrderedDict
_logger = logging.getLogger()
[docs]def createDir(directory):
""" Creates a directory.
Does not raise an excpetion if the directory already exists.
Args:
directory (string): Name for directory to create.
"""
try:
if not os.path.exists(directory):
os.makedirs(directory)
except OSError:
_logger.error('Creating directory. ' + directory)
[docs]def read_radec(filename):
""" Reads radec.dat file and returns ra,dec,pa.
Args:
filename (str): Filename, typically radec.dat or radec2.dat.
Returns:
float,float,float: 3 element list with RA, DEC and PA
"""
with open(filename, "r") as f:
ll = f.readlines()
ra, dec, pa = ll[0].split()
return float(ra), float(dec), float(pa)
[docs]def write_radec(ra, dec, pa, filename):
""" Creates radec.dat-type file and returns ra,dec,pa.
Args:
ra (float): Right ascension
dec (float): declination
pa (float): position angle
filename (str): Filename, typically radec.dat or radec2.dat.
"""
with open(filename, "w") as f:
s = "{:.6f} {:.6f} {:.6f}\n".format(ra, dec, pa)
f.write(s)
[docs]def read_all_mch(all_mch):
""" Reads all.mch and returns dither information.
Args:
all_mch (str): Filename, typically all.mch.
Returns:
(OrdereDict): Dictionary of float tuples, with dither offsets,
e.g. {1 : (0.,0.), 2 : (1.27,-0.73), 3 : (1.27,0.73)}
"""
dither_offsets = OrderedDict()
with open(all_mch, "r") as f:
ll = f.readlines()
for i, l in enumerate(ll):
tt = l.split()
dx, dy = float(tt[2]), float(tt[3])
dither_offsets[i+1] = (dx, dy)
return dither_offsets
[docs]def rm(ff):
""" Takes a list of files names and deletes them.
Does not raise an Exception if a specific file was not in place.
Args:
ff (list): List of file names to delete.
"""
for f in ff:
try:
os.remove(f)
except Exception:
pass
[docs]def biweight_bin(xv, x, y):
'''
Compute the biweight location with a moving window of size "order"
'''
diff_array = np.hstack([np.diff(xv), np.diff(xv)[-1]]) / 2.
mxcol = 0
sel_list = []
for i, v in enumerate(xv):
sel_list.append(np.where((x > (xv[i]-diff_array[i]))
* (x < (xv[i]+diff_array[i])))[0])
mxcol = np.max([mxcol, len(sel_list[-1])])
A = np.ones((len(xv), mxcol))*-999.
for i, v in enumerate(xv):
A[i, :len(sel_list[i])] = y[sel_list[i]]
C = np.ma.array(A, mask=(A == -999.).astype(np.int))
return biweight_location(C, axis=(1,))
[docs]def biweight_filter2d(a, Order, Ignore_central=(3, 3), c=6.0,
M=None, func=None):
'''
Compute the biweight location with a moving window of size "order"
'''
if not isinstance(Order, tuple):
print("The order should be an tuple")
sys.exit(1)
if not isinstance(Ignore_central, tuple):
print("The ignore_central value should be an tuple")
sys.exit(1)
for order in Order:
if order % 2 == 0:
order += 1
for ignore_central in Ignore_central:
if ignore_central % 2 == 0:
ignore_central += 1
if ((Order[0]-3 <= Ignore_central[0] and Order[0] > 1)
or (Order[1]-3 <= Ignore_central[1] and Order[1] > 1)):
print("The max order-3 should be larger than max ignore_central.")
sys.exit(1)
if func is None:
func = biweight_location
a = np.array(a, copy=False)
if a.ndim != 2:
print("Input array/list should be 2-dimensional")
sys.exit()
yc = np.arange(Ignore_central[0]/2+1, Order[0]/2+1)
xc = np.arange(Ignore_central[1]/2+1, Order[1]/2+1)
ly = np.max([len(yc)*2, 1])
lx = np.max([len(xc)*2, 1])
size = lx * ly
A = np.ones(a.shape + (size,))*-999.
k = 0
if Order[0] > 1:
if Order[1] > 1:
for i in yc:
for j in xc:
A[:-i, :-j, k] = a[i:, j:]
k += 1
for i in yc:
for j in xc:
A[i:, j:, k] = a[:-i, :-j]
k += 1
for i in yc:
for j in xc:
A[:-i, j:, k] = a[i:, :-j]
k += 1
for i in yc:
for j in xc:
A[i:, :-j, k] = a[:-i, j:]
k += 1
else:
for i in yc:
A[:-i, :, k] = a[i:, :]
k += 1
for i in yc:
A[i:, :, k] = a[:-i, :]
k += 1
else:
for j in xc:
A[:, :-j, k] = a[:, j:]
k += 1
for j in xc:
A[:, j:, k] = a[:, :-j]
k += 1
C = np.ma.array(A, mask=(A == -999.).astype(np.int))
return func(C, axis=(2,))
[docs]def biweight_filter(a, order, ignore_central=3, c=6.0, M=None, func=None):
if order % 2 == 0:
order += 1
if ignore_central % 2 == 0:
ignore_central += 1
if func is None:
func = biweight_location
if ignore_central+3 >= order:
print('The order, %i, should be +3 higher than ignore_central, %i.'
% (order, ignore_central))
sys.exit(1)
a = np.array(a, copy=False)
if ignore_central > 0:
yc = np.arange(ignore_central/2+1, order/2+1)
else:
yc = np.arange(0, order/2+1)
ly = np.max([len(yc)*2, 1])
A = np.ones(a.shape + (ly,))*-999.
k = 0
if order > 1:
for i in yc:
A[:-i, k] = a[i:]
k += 1
for i in yc:
A[i:, k] = a[:-i]
k += 1
C = np.ma.array(A, mask=(A == -999.).astype(np.int))
return func(C, axis=(1,))
# def biweight_filter(a, order, ignore_central=3, c=6.0, M=None, func=None):
# '''
# Compute the biweight location with a moving window of size "order"
#
# '''
# if not isinstance(order, int):
# print("The order should be an integer")
# sys.exit(1)
# if not isinstance(ignore_central, int):
# print("The ignore_central value should be an integer")
# sys.exit(1)
# if order%2==0:
# order+=1
# if ignore_central%2==0:
# ignore_central+=1
# if order-3 <= ignore_central:
# print("The order-3 should be larger than ignore_central.")
# sys.exit(1)
# if func is None:
# func = biweight_location
# a = np.array(a, copy=False)
# if a.ndim != 1:
# print("Input array/list should be 1-dimensional")
# sys.exit()
# ignore = [order/2]
# for i in xrange(ignore_central/2):
# ignore.append(order/2 - i - 1)
# ignore.append(order/2 + i + 1)
# half_order = order / 2
# order_array = np.delete(np.arange(order), ignore)
# A = np.zeros((len(a)-order+1, len(order_array)))
# k=0
# for i in order_array:
# if (order-i-1) == 0:
# A[:,k] = a[i:]
# else:
# A[:,k] = a[i:-(order-i-1)]
# k+=1
#
# Ab = func(A, axis=(1,))
# A1 = np.zeros((half_order,))
# A2 = np.zeros((half_order,))
# for i in xrange(half_order):
# ignore_l = [i]
# ignore_h = [half_order]
# for j in xrange(ignore_central/2):
# if (i - j - 1) >=0:
# ignore_l.append(i - j - 1)
# ignore_l.append(i + j + 1)
# if i > j:
# ignore_h.append(half_order+j+1)
# ignore_h.append(half_order-j-1)
# A1[i] = func(np.delete(a[:(half_order+i+1)], ignore_l))
# A2[-(i+1)] = func(np.delete(a[-(half_order+i+1):], ignore_h))
# return np.hstack([A1,Ab,A2])
[docs]def biweight_location(a, c=6.0, M=None, axis=None, eps=1e-8):
"""
Copyright (c) 2011-2016, Astropy Developers
Compute the biweight location for an array.
Returns the biweight location for the array elements.
The biweight is a robust statistic for determining the central
location of a distribution.
The biweight location is given by the following equation
.. math::
C_{bl}= M+\\frac{\Sigma_{\|u_i\|<1} (x_i-M)(1-u_i^2)^2}
{\Sigma_{\|u_i\|<1} (1-u_i^2)^2}
where M is the sample mean or if run iterative the initial guess,
and u_i is given by
.. math::
u_{i} = \\frac{(x_i-M)}{cMAD}
where MAD is the median absolute deviation.
For more details, see Beers, Flynn, and Gebhardt, 1990, AJ, 100, 32B
Parameters
----------
a : array-like
Input array or object that can be converted to an array.
c : float, optional
Tuning constant for the biweight estimator. Default value is 6.0.
M : float, optional
Initial guess for the biweight location.
axis : tuple, optional
tuple of the integer axis values ot calculate over. Should be sorted.
Returns
-------
biweight_location : float
Returns the biweight location for the array elements.
Examples
--------
This will generate random variates from a Gaussian distribution and return
the biweight location of the distribution::
>>> from utils import biweight_location
>>> from numpy.random import randn
>>> randvar = randn(10000)
>>> cbl = biweight_location(randvar)
See Also
--------
median_absolute_deviation, biweight_midvariance
Note
--------
Copy of the astropy function with the "axis" argument added appropriately.
"""
if M is None:
if isinstance(a, np.ma.MaskedArray):
func = np.ma.median
else:
a = np.array(a, copy=False)
func = np.median
M = func(a, axis=axis)
else:
a = np.array(a, copy=False)
N = M*1.
# set up the difference
if axis is not None:
for i in axis:
N = np.expand_dims(N, axis=i)
d = a - N
# set up the weighting
if axis is not None:
MAD = median_absolute_deviation(a, axis=axis)
for i in axis:
MAD = np.expand_dims(MAD, axis=i)
else:
MAD = median_absolute_deviation(a)
u = np.where(MAD < eps, 0., d / c / MAD)
# now remove the outlier points
if isinstance(a, np.ma.MaskedArray):
mask = (np.abs(u) < 1).astype(np.int) * (1-a.mask.astype(np.int))
else:
mask = (np.abs(u) < 1).astype(np.int)
u = (1 - u ** 2) ** 2
return M + (d * u * mask).sum(axis=axis) / (u * mask).sum(axis=axis)
[docs]def biweight_midvariance(a, c=15.0, M=None, axis=None, eps=1e-8, niter=1):
"""
Copyright (c) 2011-2016, Astropy Developers
Compute the biweight midvariance for an array.
Returns the biweight midvariance for the array elements.
The biweight midvariance is a robust statistic for determining
the midvariance (i.e. the standard deviation) of a distribution.
The biweight location is given by the following equation
.. math::
C_{bl}= (n')^{1/2} \\frac{[\Sigma_{|u_i|<1} (x_i-M)^2(1-u_i^2)^4]^{0.5}}
{|\Sigma_{|u_i|<1} (1-u_i^2)(1-5u_i^2)|}
where :math:`u_i` is given by
.. math::
u_{i} = \\frac{(x_i-M)}{cMAD}
where MAD is the median absolute deviation.
:math:`n'` is the number of data for which :math:`|u_i| < 1` holds,
while the summations are over all i up to n:
.. math::
n' = \Sigma_{|u_i|<1}^n 1
This is slightly different than given in the reference below, but
results in a value closer to the true midvariance.
The midvariance parameter c is typically 9.0.
For more details, see Beers, Flynn, and Gebhardt, 1990, AJ, 100, 32B
Parameters
----------
a : array-like
Input array or object that can be converted to an array.
c : float
Tuning constant for the biweight estimator. Default value is 9.0.
M : float, optional
Initial guess for the biweight location.
axis : tuple, optional
tuple of the integer axis values ot calculate over. Should be sorted.
Returns
-------
biweight_midvariance : float
Returns the biweight midvariance for the array elements.
Examples
--------
This will generate random variates from a Gaussian distribution and return
the biweight midvariance of the distribution::
>>> from utils import biweight_midvariance
>>> from numpy.random import randn
>>> randvar = randn(10000)
>>> scl = biweight_midvariance(randvar)
See Also
--------
median_absolute_deviation, biweight_location
Note
--------
Copy of the astropy function with the "axis" argument added appropriately.
"""
for k in np.arange(niter):
if isinstance(a, np.ma.MaskedArray):
func = np.ma.median
else:
a = np.array(a, copy=False)
func = np.median
if M is None or k > 0:
M = func(a, axis=axis)
N = M*1.
# set up the difference
if axis is not None:
for i in axis:
N = np.expand_dims(N, axis=i)
# set up the difference
d = np.asarray(a - N)
# set up the weighting
if axis is not None:
MAD = median_absolute_deviation(a, axis=axis)
for i in axis:
MAD = np.expand_dims(MAD, axis=i)
else:
MAD = median_absolute_deviation(a)
# set up the weighting
u = np.where(MAD < eps, 0., d / c / MAD)
# now remove the outlier points
if isinstance(a, np.ma.MaskedArray):
mask = (np.abs(u) < 1).astype(np.int) * (1-a.mask.astype(np.int))
a.mask = 1 - mask
else:
mask = (np.abs(u) < 1).astype(np.int)
u = u ** 2
n = mask.sum(axis=axis)
return n ** 0.5 * (mask * d**2 * (1 - u) ** 4).sum(axis=axis) ** 0.5\
/ np.abs((mask * (1 - u) * (1 - 5 * u)).sum(axis=axis))
[docs]def is_outlier(points, thresh=3.5):
"""
Copyright (c) 2012, Free Software Foundation
Returns a boolean array with True if points are outliers and False
otherwise.
Parameters:
-----------
points : An numobservations by numdimensions array of observations
thresh : The modified z-score to use as a threshold. Observations with
a modified z-score (based on the median absolute deviation) greater
than this value will be classified as outliers.
Returns:
--------
mask : A numobservations-length boolean array.
References:
----------
Boris Iglewicz and David Hoaglin (1993), "Volume 16: How to Detect and
Handle Outliers", The ASQC Basic References in Quality Control:
Statistical Techniques, Edward F. Mykytka, Ph.D., Editor.
"""
if len(points.shape) == 1:
points = points[:, None]
median = np.median(points, axis=0)
diff = np.sum((points - median)**2, axis=-1)
diff = np.sqrt(diff)
med_abs_deviation = np.median(diff)
modified_z_score = 0.6745 * diff / med_abs_deviation
return modified_z_score > thresh
[docs]def matrixCheby2D_7(x, y):
if isinstance(x, (tuple, list)):
x = np.asarray(x)
if isinstance(y, (tuple, list)):
y = np.asarray(y)
T2x = 2. * x**2 - 1.
T3x = 4. * x**3 - 3. * x
T4x = 8. * x**4 - 8. * x**2 + 1.
T5x = 16. * x**5 - 20. * x**3 + 5. * x
T6x = 32. * x**6 - 48. * x**4 + 18. * x**2 - 1.
T7x = 64. * x**7 - 112. * x**5 + 56. * x**3 - 7. * x
T2y = 2. * y**2 - 1.
T3y = 4. * y**3 - 3. * y
T4y = 8. * y**4 - 8. * y**2 + 1.
T5y = 16. * y**5 - 20. * y**3 + 5. * y
T6y = 32. * y**6 - 48. * y**4 + 18. * y**2 - 1
T7y = 64. * y**7 - 112. * y**5 + 56. * y**3 - 7 * y
return np.vstack((T7x, T6x, T5x, T4x, T3x, T2x, x, T7y, T6y, T5y,
T4y, T3y, T2y, y, T6x*y, x*T6y, T5x*T2y, T2x*T5y,
T4x*T3y, T3x*T4y, T5x*y, x*T5y, T4x*T2y, T2x*T4y,
T3x*T3y, T4x*y, x*T4y, T3x*T2y, T2x*T3y, T3x*y,
x*T3y, T2x*T2y, T2x*y, x*T2y, x*y,
np.ones(x.shape))).swapaxes(0, 1)
# ---------------------------------------------------------------------
# Logging utils
# ---------------------------------------------------------------------
[docs]def setup_logging(logger, logfile, loglevel):
# Setup the logging
fmt = '%(asctime)s %(levelname)-8s %(threadName)12s %(funcName)15s(): ' \
'%(message)s'
formatter = logging.Formatter(fmt, datefmt='%m-%d %H:%M:%S')
logger.setLevel(loglevel)
cHndlr = logging.StreamHandler()
cHndlr.setLevel(loglevel)
cHndlr.setFormatter(formatter)
logger.addHandler(cHndlr)
fHndlr = logging.FileHandler(logfile, mode='w')
fHndlr.setLevel(loglevel)
fHndlr.setFormatter(formatter)
logger.addHandler(fHndlr)
# ---------------------------------------------------------------------
# Config utils
# ---------------------------------------------------------------------
[docs]def mangle_config_pathname(path):
return path.replace('$config', configdir())
[docs]def write_conf_file(fname):
from vdrp.astrometry import getDefaults as ast_getDefaults
from vdrp.photometry import getDefaults as tp_getDefaults
from vdrp.star_extraction import getDefaults as st_getDefaults
from vdrp.calc_fluxlim import getDefaults as fl_getDefaults
from vdrp.fit_radec import getDefaults as fr_getDefaults
# Now read the packaged config file
with open(configdir() + '/vdrp.config', 'r') as _in:
cfgtxt = _in.read()
defaults = {}
ast_defaults = ast_getDefaults()
st_defaults = st_getDefaults()
tp_defaults = tp_getDefaults()
fl_defaults = fl_getDefaults()
fr_defaults = fr_getDefaults()
for k in ast_defaults:
defaults['ast_'+k] = ast_defaults[k]
if 'ast_'+k not in cfgtxt:
print('Astrometry parameter %s is missing in config file!' % k)
for k in st_defaults:
defaults['stext_'+k] = st_defaults[k]
if 'stext_'+k not in cfgtxt:
print('Star Extraction parameter %s is missing in config file!'
% k)
for k in tp_defaults:
defaults['tp_'+k] = tp_defaults[k]
if 'tp_'+k not in cfgtxt:
print('Throughput parameter %s is missing in config file!' % k)
for k in fl_defaults:
defaults['fl_'+k] = fl_defaults[k]
if 'fl_'+k not in cfgtxt:
print('Fluxlim parameter %s is missing in config file!' % k)
for k in fr_defaults:
defaults['fr_'+k] = fr_defaults[k]
if 'fl_'+k not in cfgtxt:
print('FitRADEC parameter %s is missing in config file!' % k)
with open(fname, 'w') as _out:
_out.write(cfgtxt.format(**defaults))
[docs]def print_conffile():
from argparse import ArgumentParser as AP
parser = AP(description='Write a config file with default values')
parser.add_argument('filename', type=str,
help='Name of the file to write')
args = parser.parse_args()
write_conf_file(args.filename)
# ---------------------------------------------------------------------
# Package utils
# ---------------------------------------------------------------------
[docs]def bindir():
return os.path.dirname(os.path.realpath(__file__)) + '/bin'
[docs]def print_bindir():
print(bindir())
[docs]def configdir():
return os.path.dirname(os.path.realpath(__file__)) + '/config'
[docs]def print_configdir():
print(configdir())