# timing_model.py
# Defines the basic timing model interface classes
import functools
from .parameter import strParameter
from ..phase import Phase
from astropy import log
import astropy.time as time
import numpy as np
import pint.utils as utils
import astropy.units as u
import copy
import inspect
# parameters or lines in parfiles to ignore (for now?), or at
# least not to complain about
ignore_params = ['START', 'FINISH', 'SOLARN0', 'EPHEM', 'CLK', 'UNITS',
'TIMEEPH', 'T2CMETHOD', 'CORRECT_TROPOSPHERE', 'DILATEFREQ',
'NTOA', 'CLOCK', 'TRES', 'TZRMJD', 'TZRFRQ', 'TZRSITE',
'NITS', 'IBOOT','BINARY']
ignore_prefix = ['DMXF1_','DMXF2_','DMXEP_'] # DMXEP_ for now.
[docs]class Cache(object):
"""Temporarily cache timing model internal computation results.
The Cache class defines two decorators, use_cache and cache_result.
"""
# The name of the cache attribute
the_cache = "cache"
@classmethod
[docs] def cache_result(cls, function):
"""Caching decorator for functions.
This can be applied as a decorator to any timing model method
for which it might be useful to store the value, once computed
for a given TOA. Note that the cache must be manually enabled
and cleared when appropriate, so this functionality should be
used with care.
"""
the_func = function.__name__
@functools.wraps(function)
def get_cached_result(*args, **kwargs):
log.debug("Checking for cached value of %s" % the_func)
# What to do about checking for a change of arguments?
# args[0] should be a "self"
if hasattr(args[0], cls.the_cache):
cache = getattr(args[0], cls.the_cache)
if isinstance(cache, cls):
if hasattr(cache, the_func):
# Return the cached value
log.debug(" ... using cached result")
return getattr(cache, the_func)
else:
# Evaluate the function and cache the results
log.debug(" ... computing new result")
result = function(*args, **kwargs)
setattr(cache, the_func, result)
return result
# Couldn't access the cache, just return the result
# without caching it.
log.debug(" ... no cache found")
return function(*args, **kwargs)
return get_cached_result
@classmethod
[docs] def use_cache(cls, function):
"""Caching decorator for functions.
This can be applied as a decorator to a function that should
internally use caching of function return values. The cache
will be deleted when the function exits. If the top-level function
calls other functions that have caching enabled they will share
the cache, and it will only be deleted when the top-level function
exits.
"""
@functools.wraps(function)
def use_cached_results(*args, **kwargs):
# args[0] should be a "self"
# Test whether a cache attribute is present
if hasattr(args[0], cls.the_cache):
cache = getattr(args[0], cls.the_cache)
# Test whether caching is already enabled
if isinstance(cache, cls):
# Yes, just execute the function
return function(*args, **kwargs)
else:
# Init the cache, excute the function, then delete cache
setattr(args[0], cls.the_cache, cls())
result = function(*args, **kwargs)
setattr(args[0], cls.the_cache, None)
return result
else:
# no "self.cache" attrib is found. Could raise an error, or
# just execute the function normally.
return function(*args, **kwargs)
return use_cached_results
[docs]class TimingModel(object):
"""
Base-level object provides an interface for implementing pulsar timing
models. It contains several over all wrapper methods.
Notes
-----
PINT models pulsar pulse time of arrival at observer from its emission process and
propagation to observer. Emission generally modeled as pulse 'Phase' and propagation.
'time delay'. In pulsar timing different astrophysics phenomenons are separated to
time model components for handling a specific emission or propagation effect.
All timing model component classes should subclass this timing model base class.
Each timing model component generally requires the following parts:
Timing Parameters
Delay/Phase functions which implements the time delay and phase.
Derivatives of delay and phase respect to parameter for fitting toas.
Each timing parameters are stored as TimingModel attribute in the type of `pint.model.parameter`
delay or phase and its derivatives are implemented as TimingModel Methods.
Attributes
----------
params : list
A list of all the parameter names.
prefix_params : list
A list of prefixed parameter names.
delay_funcs : dict
All the delay functions implemented in timing model. The delays do not
need barycentric toas are placed under the 'L1' keys as a list of methods,
the ones needs barycentric toas are under the 'L2' delay. This will be improved
in the future. One a delay method is defined in model component, it should
get registered in this dictionary.
phase_funcs : list
All the phase functions implemented in timing model. Once a phase method is defined
in model component, it should get registered in this list.
delay_derivs : list
All the delay derivatives respect to timing parameters.
Once a delay derivative method is defined in model component, it should get registered in this list.
phase_derivs : list
All the phase derivatives respect to timing parameters.
Once a phase derivative method is defined in model component, it should get registered in this list.
phase_derivs_wrt_delay : list
All the phase derivatives respect to delay.
"""
def __init__(self):
self.params = [] # List of model parameter names
self.prefix_params = [] # List of model parameter names
self.delay_funcs = {'L1':[],'L2':[]} # List of delay component functions
# L1 is the first level of delays. L1 delay does not need barycentric toas
# After L1 delay, the toas have been corrected to solar system barycenter.
# L2 is the second level of delays. L2 delay need barycentric toas
self.phase_funcs = [] # List of phase component functions
self.cache = None
self.add_param(strParameter(name="PSR",
description="Source name",
aliases=["PSRJ", "PSRB"]))
self.model_type = None
self.delay_derivs = {}
self.phase_derivs = {}
self.phase_derivs_wrt_delay = []
[docs] def setup(self):
"""This is a abstract class for setting up timing model class. It is designed for
reading .par file and check parameters.
"""
pass
[docs] def add_param(self, param,binary_param = False):
"""Add a parameter to the timing model. If it is a prefixe parameter,
it will add prefix information to the prefix information attributes.
"""
setattr(self, param.name, param)
self.params += [param.name,]
if binary_param is True:
self.binary_params +=[param.name,]
[docs] def remove_param(self, param):
delattr(self, param)
self.params.remove(param)
if param in self.binary_params:
self.binary_params.remove(param)
[docs] def set_special_params(self, spcl_params):
als = []
for p in spcl_params:
als += getattr(self, p).aliases
spcl_params += als
self.model_special_params = spcl_params
[docs] def param_help(self):
"""Print help lines for all available parameters in model.
"""
s = "Available parameters for %s\n" % self.__class__
for par in self.params:
s += "%s\n" % getattr(self, par).help_line()
return s
[docs] def get_params_of_type(self, param_type):
""" Get all the parameters in timing model for one specific type
"""
result = []
for p in self.params:
par = getattr(self, p)
par_type = type(par).__name__
par_prefix = par_type[:-9]
if param_type.upper() == par_type.upper() or \
param_type.upper() == par_prefix.upper():
result.append(par.name)
return result
#@Cache.use_cache
[docs] def get_prefix_mapping(self,prefix):
"""Get the index mapping for the prefix parameters.
Parameter
----------
prefix : str
Name of prefix.
Return
----------
A dictionary with prefix pararameter real index as key and parameter
name as value.
"""
parnames = [x for x in self.params if x.startswith(prefix)]
mapping = dict()
for parname in parnames:
par = getattr(self, parname)
if par.is_prefix == True and par.prefix == prefix:
mapping[par.index] = parname
return mapping
[docs] def match_param_aliases(self, alias):
p_aliases = {}
# if alias is a parameter name, return itself
if alias in self.params:
return alias
# get all the aliases
for p in self.params:
par = getattr(self, p)
if par.aliases !=[]:
p_aliases[p] = par.aliases
# match alias
for pa, pav in zip(p_aliases.keys(), p_aliases.values()):
if alias in pav:
return pa
# if not found any thing.
return ''
#@Cache.use_cache
[docs] def phase(self, toas):
"""Return the model-predicted pulse phase for the given TOAs."""
# First compute the delays to "pulsar time"
delay = self.delay(toas)
phase = Phase(np.zeros(len(toas)), np.zeros(len(toas)))
# Then compute the relevant pulse phases
for pf in self.phase_funcs:
phase += Phase(pf(toas, delay))
return phase
#@Cache.use_cache
[docs] def delay(self, toas):
"""Total delay for the TOAs.
Return the total delay which will be subtracted from the given
TOA to get time of emission at the pulsar.
"""
delay = np.zeros(len(toas))
for dlevel in self.delay_funcs.keys():
for df in self.delay_funcs[dlevel]:
delay += df(toas)
return delay
#@Cache.use_cache
[docs] def get_barycentric_toas(self,toas):
toasObs = toas['tdbld']
delay = np.zeros(len(toas))
for df in self.delay_funcs['L1']:
delay += df(toas)
toasBary = toasObs*u.day - delay*u.second
return toasBary
def _make_delay_derivative_funcs(self, param, function, name_tplt):
"""This function is a method to help make the delay derivatives wrt timing
model parameters use the parameter specific name and only have the toas
table as input.
Parameter
----------
param: str
Name of parameter
function: method
The method to compute the delay derivatives. It is generally in the
formate of function(toas, parameter)
name_tplt: str
The name template of the new function. The parameter name will be
added in the end. For example: 'd_delay_d_' is a name template.
Return
---------
A delay derivative function wrt to input parameter name with the input
name template and parameter name.
"""
def deriv_func(toas):
return function(toas, param)
deriv_func.__name__ = name_tplt + param
deriv_func.__doc__ = "Delay derivative wrt " + param + " \n"
deriv_func.__doc__ += "Parameter\n----------\ntoas: TOA table\n "
deriv_func.__doc__ += "TOA point where the derivative is evaluated at.\n"
deriv_func.__doc__ += "Return\n---------\n Delay derivatives wrt " + param
deriv_func.__doc__ += " at toa."
setattr(self, deriv_func.__name__, deriv_func)
def _make_phase_derivative_funcs(self, param, function, name_tplt):
"""This function is a method to help make the phase derivatives wrt timing
model parameters use the parameter specific name and only have the toas
table as input.
Parameter
----------
param: str
Name of parameter
function: method
The method to compute the phase derivatives. It is generally in the
formate of 'function(toas, parameter, delay)'
name_tplt: str
The name template of the new function. The parameter name will be
added in the end. For example: 'd_phase_d_' is a name template.
Return
---------
A phase derivative function wrt to input parameter name with the input
name template and parameter name.
"""
def deriv_func(toas, delay):
return function(toas, param, delay)
deriv_func.__name__ = name_tplt + param
deriv_func.__doc__ = "Phase derivative wrt " + param + " \n"
deriv_func.__doc__ += "Parameter\n----------\ntoas: TOA table\n "
deriv_func.__doc__ += "TOA point where the derivative is evaluated at.\n"
deriv_func.__doc__ += "delay: numpy array\n Time delay for phase calculation.\n"
deriv_func.__doc__ += "Return\n---------\n Phase derivatives wrt " + param
deriv_func.__doc__ += " at toa."
setattr(self, deriv_func.__name__, deriv_func)
[docs] def register_deriv_funcs(self, func, deriv_type, param=''):
"""
This is a function to register the derivative function in to the
deriv_func dictionaries.
Parameter
---------
func: method
The method calculates the derivative
deriv_type: str ['delay', 'phase', 'd_phase_d_delay']
Flag for different type of derivatives. It only accepts the three
above.
param: str, if for d_phase_d_delay it is optional
Name of parameter the derivative respect to
"""
if deriv_type == 'd_phase_d_delay':
self.phase_derivs_wrt_delay += [func,]
elif deriv_type == 'delay':
pn = self.match_param_aliases(param)
if pn == '':
raise ValueError("Parameter '%s' in not in the model." % param)
if pn not in self.delay_derivs.keys():
self.delay_derivs[pn] = [func,]
else:
self.delay_derivs[pn] += [func,]
elif deriv_type == 'phase':
pn = self.match_param_aliases(param)
if pn == '':
raise ValueError("Parameter '%s' in not in the model." % param)
if pn not in self.phase_derivs.keys():
self.phase_derivs[pn] = [func,]
else:
self.phase_derivs[pn] += [func,]
[docs] def d_phase_d_tpulsar(self, toas):
"""Return the derivative of phase wrt time at the pulsar.
NOT implemented yet.
"""
pass
[docs] def d_phase_d_toa(self, toas, sample_step=None):
"""Return the derivative of phase wrt TOA
Parameter
---------
toas : PINT TOAs class
The toas when the derivative of phase will be evaluated at.
sample_step : float optional
Finite difference steps. If not specified, it will take 1/10 of the
spin period.
"""
copy_toas = copy.deepcopy(toas)
if sample_step is None:
pulse_period = 1.0 / self.F0.value
sample_step = pulse_period * 1000
sample_dt = [-sample_step, 2 * sample_step]
sample_phase = []
for dt in sample_dt:
dt_array = ([dt] * copy_toas.ntoas) * u.s
deltaT = time.TimeDelta(dt_array)
copy_toas.adjust_TOAs(deltaT)
phase = self.phase(copy_toas.table)
sample_phase.append(phase)
# Use finite difference method.
# phase'(t) = (phase(t+h)-phase(t-h))/2+ 1/6*F2*h^2 + ..
# The error should be near 1/6*F2*h^2
dp = (sample_phase[1] - sample_phase[0])
d_phase_d_toa = dp.int / (2*sample_step) + dp.frac / (2*sample_step)
del copy_toas
return d_phase_d_toa
#@Cache.use_cache
[docs] def d_phase_d_param(self, toas, delay, param):
""" Return the derivative of phase with respect to the parameter.
"""
result = 0.0
par = getattr(self, param)
# TODO need to do correct chain rule stuff wrt delay derivs, etc
# Is it safe to assume that any param affecting delay only affects
# phase indirectly (and vice-versa)??
result = np.longdouble(np.zeros(len(toas))) * u.Unit('')/par.units
param_phase_derivs = []
if param in self.phase_derivs.keys():
for df in self.phase_derivs[param]:
if df.func_name.endswith(param):
result += df(toas, delay).to(result.unit,
equivalencies=u.dimensionless_angles())
else: # Then this is a general derivative function.
result += df(toas, param, delay).to(result.unit,
equivalencies=u.dimensionless_angles())
else: # Apply chain rule for the parameters in the delay.
d_delay_d_p = self.d_delay_d_param(toas, param)
for dpddf in self.phase_derivs_wrt_delay:
result += (dpddf(toas, delay) * d_delay_d_p).to(result.unit,
equivalencies=u.dimensionless_angles())
return result
#@Cache.use_cache
[docs] def d_phase_d_param_num(self, toas, param, step=1e-2):
""" Return the derivative of phase with respect to the parameter.
"""
# TODO : We need to know the range of parameter.
par = getattr(self, param)
ori_value = par.value
unit = par.units
if ori_value == 0:
h = 1.0 * step
else:
h = ori_value * step
parv = [par.value-h, par.value+h]
phaseI = np.zeros((len(toas),2))
phaseF = np.zeros((len(toas),2))
for ii, val in enumerate(parv):
par.value = val
ph = self.phase(toas)
phaseI[:,ii] = ph.int
phaseF[:,ii] = ph.frac
resI = (- phaseI[:,0] + phaseI[:,1])
resF = (- phaseF[:,0] + phaseF[:,1])
result = (resI + resF)/(2.0 * h)
# shift value back to the original value
par.value = ori_value
return result * u.Unit("")/unit
[docs] def d_delay_d_param_num(self, toas, param, step=1e-2):
""" Return the derivative of phase with respect to the parameter.
"""
# TODO : We need to know the range of parameter.
par = getattr(self, param)
ori_value = par.value
if ori_value is None:
# A parameter did not get to use in the model
log.warn("Parameter '%s' is not used by timing model." % param)
return np.zeros(len(toas)) * (u.second/par.units)
unit = par.units
if ori_value == 0:
h = 1.0 * step
else:
h = ori_value * step
parv = [par.value-h, par.value+h]
delay = np.zeros((len(toas),2))
for ii, val in enumerate(parv):
par.value = val
try:
delay[:,ii] = self.delay(toas)
except:
par.value = ori_value
raise
d_delay = (-delay[:,0] + delay[:,1])/2.0/h
par.value = ori_value
return d_delay * (u.second/unit)
[docs] def d_delay_d_param(self, toas, param):
"""
Return the derivative of delay with respect to the parameter.
"""
par = getattr(self, param)
result = np.longdouble(np.zeros(len(toas)) * u.s/par.units)
if param not in self.delay_derivs.keys():
raise AttributeError("Derivative function for '%s' is not provided"
" or not registred. "%param)
for df in self.delay_derivs[param]:
# The derivative function is for a specific parameter.
if df.func_name.endswith(param):
result += df(toas).to(result.unit, equivalencies=u.dimensionless_angles())
else: # Then this is a general derivative function.
result += df(toas, param).to(result.unit, equivalencies=u.dimensionless_angles())
return result
#@Cache.use_cache
[docs] def designmatrix(self, toas, scale_by_F0=True, incfrozen=False, incoffset=True):
"""
Return the design matrix: the matrix with columns of d_phase_d_param/F0
or d_toa_d_param
"""
params = ['Offset',] if incoffset else []
params += [par for par in self.params if incfrozen or
not getattr(self, par).frozen]
F0 = self.F0.quantity # 1/sec
ntoas = len(toas)
nparams = len(params)
delay = self.delay(toas)
units = []
# Apply all delays ?
#tt = toas['tdbld']
#for df in self.delay_funcs:
# tt -= df(toas)
M = np.zeros((ntoas, nparams))
for ii, param in enumerate(params):
if param == 'Offset':
M[:,ii] = 1.0
units.append(u.s/u.s)
else:
# NOTE Here we have negative sign here. Since in pulsar timing
# the residuals are calculated as (Phase - int(Phase)), which is different
# from the conventional defination of least square definetion (Data - model)
# We decide to add minus sign here in the design matrix, so the fitter
# keeps the conventional way.
q = - self.d_phase_d_param(toas, delay,param)
M[:,ii] = q
units.append(u.Unit("")/ getattr(self, param).units)
if scale_by_F0:
mask = []
for ii, un in enumerate(units):
if params[ii] == 'Offset':
continue
units[ii] = un * u.second
mask.append(ii)
M[:, mask] /= F0.value
return M, params, units, scale_by_F0
def __str__(self):
result = ""
for par in self.params:
result += str(getattr(self, par)) + "\n"
return result
[docs] def as_parfile(self):
"""Returns a parfile representation of the entire model as a string."""
result = ""
for par in self.params:
result += getattr(self, par).as_parfile_line()
# Always include UNITS in par file. For now, PINT only supports TDB
result += "UNITS TDB\n"
if hasattr(self,'binary_model_name'):
result += "BINARY {0}\n".format(self.binary_model_name)
return result
[docs] def read_parfile(self, filename):
"""Read values from the specified parfile into the model parameters."""
checked_param = []
repeat_param = {}
pfile = open(filename, 'r')
for l in [pl.strip() for pl in pfile.readlines()]:
# Skip blank lines
if not l:
continue
# Skip commented lines
if l.startswith('#') or l[:2]=="C ":
continue
k = l.split()
name = k[0].upper()
if name in checked_param:
if name in repeat_param.keys():
repeat_param[name] += 1
else:
repeat_param[name] = 2
k[0] = k[0] + str(repeat_param[name])
l = ' '.join(k)
parsed = False
for par in self.params:
if getattr(self, par).from_parfile_line(l):
parsed = True
if not parsed:
try:
prefix,f,v = utils.split_prefixed_name(l.split()[0])
if prefix not in ignore_prefix:
log.warn("Unrecognized parfile line '%s'" % l)
except:
if l.split()[0] not in ignore_params:
log.warn("Unrecognized parfile line '%s'" % l)
checked_param.append(name)
# The "setup" functions contain tests for required parameters or
# combinations of parameters, etc, that can only be done
# after the entire parfile is read
self.setup()
[docs] def is_in_parfile(self,para_dict):
""" Check if this subclass inclulded in parfile.
Parameters
------------
para_dict : dictionary
A dictionary contain all the parameters with values in string
from one parfile
Return
------------
True : bool
The subclass is inculded in the parfile.
False : bool
The subclass is not inculded in the parfile.
"""
pNames_inpar = para_dict.keys()
pNames_inModel = self.params
prefix_inModel = self.prefix_params
# Remove the common parameter PSR
try:
del pNames_inModel[pNames_inModel.index('PSR')]
except:
pass
# For solar system shapiro delay component
if hasattr(self,'PLANET_SHAPIRO'):
if "NO_SS_SHAPIRO" in pNames_inpar:
return False
else:
return True
# For Binary model component
try:
if getattr(self,'binary_model_name') == para_dict['BINARY'][0]:
return True
else:
return False
except:
pass
# Compare the componets parameter names with par file parameters
compr = list(set(pNames_inpar).intersection(pNames_inModel))
if compr==[]:
# Check aliases
for p in pNames_inModel:
al = getattr(self,p).aliases
# No aliase in parameters
if al == []:
continue
# Find alise check if match any of parameter name in parfile
if list(set(pNames_inpar).intersection(al)):
return True
else:
continue
# TODO Check prefix parameter
return False
return True
[docs]def generate_timing_model(name, components, attributes={}):
"""Build a timing model from components.
Returns a timing model class generated from the specifiied
sub-components. The return value is a class type, not an instance,
so needs to be called to generate a usable instance. For example:
MyModel = generate_timing_model("MyModel",(Astrometry,Spindown),{})
my_model = MyModel()
my_model.read_parfile("J1234+1234.par")
"""
# Test that all the components are derived from
# TimingModel
try:
numComp = len(components)
except:
components = (components,)
for c in components:
try:
if not issubclass(c,TimingModel):
raise(TypeError("Class "+c.__name__+
" is not a subclass of TimingModel"))
except:
raise(TypeError("generate_timing_model() Arg 2"+
"has to be a tuple of classes"))
return type(name, components, attributes)
[docs]class TimingModelError(Exception):
"""Generic base class for timing model errors."""
pass
[docs]class MissingParameter(TimingModelError):
"""A required model parameter was not included.
Attributes:
module = name of the model class that raised the error
param = name of the missing parameter
msg = additional message
"""
def __init__(self, module, param, msg=None):
super(MissingParameter, self).__init__()
self.module = module
self.param = param
self.msg = msg
def __str__(self):
result = self.module + "." + self.param
if self.msg is not None:
result += "\n " + self.msg
return result