#!/usr/bin/env python
#
# Copyright (C) 2015, the ximpol team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU GengReral Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import numpy
from astropy.io import fits
from ximpol.utils.logging_ import logger, abort
from ximpol.irf.base import OGIP_HEADER_SPECS
from ximpol.core.fitsio import xBinTableHDUBase
from ximpol.core.spline import xInterpolatedUnivariateSplineLinear
from ximpol.core.rand import xUnivariateAuxGenerator
from ximpol.core.spline import optimize_grid_linear
[docs]def mdp99(mu_effective, num_signal, num_bkg=0.):
"""Return the MDP at the 99% confidence level.
Note that the function returns numpy.nan if the number of signal events is
zero.
Arguments
---------
mu_effective : float or array
The effective (i.e., weighted over the count spectrum) modulation
factor.
num_signal : float or array
The number of signal events.
num_bkg : float or array
The number of background events.
"""
if num_signal == 0:
return numpy.nan
return 4.292/mu_effective*numpy.sqrt(num_signal + num_bkg)/num_signal
[docs]class XMDPRecord:
"""Small utility class to keep track
"""
def __init__(self, mdp, min_energy, max_energy, mu_effective, num_signal,
num_bkg=0):
"""Constructor.
"""
self.mdp = mdp
self.min_energy = min_energy
self.max_energy = max_energy
self.mu_effective = mu_effective
self.num_signal = num_signal
self.num_bkg = num_bkg
def __str__(self):
"""String formatting.
"""
return '%.2f--%.2f keV: %d/%d counts, effective mu %.3f, MDP %.2f%%' %\
(self.min_energy, self.max_energy, self.num_signal, self.num_bkg,
self.mu_effective, 100*self.mdp)
[docs]class xMDPTable:
"""Small utility class to store a set MDP values evaluated in energy bins.
"""
def __init__(self, observation_time):
"""Constructor.
"""
self.observation_time = observation_time
self.rows = []
[docs] def add_row(self, mdp, min_energy, max_energy, mu_effective, num_signal,
num_bkg=0):
"""Add a row to the MDP table.
"""
row = XMDPRecord(mdp, min_energy, max_energy, mu_effective, num_signal,
num_bkg)
self.rows.append(row)
[docs] def mdp_values(self):
"""Return the MDP values in the various energy bins.
"""
return [row.mdp for row in self.rows]
def __str__(self):
"""String formatting.
"""
text = 'MDP table (%.1f s observation time)\n' % self.observation_time
for row in self.rows:
text += '%s\n' % row
return text
[docs]class xBinTableHDUMODFRESP(xBinTableHDUBase):
"""Binary table for the MODFRESP extension of a mrf file.
"""
NAME = 'MODFRESP'
HEADER_KEYWORDS = [
('HDUCLAS1', 'RESPONSE', 'dataset relates to spectral response'),
('HDUCLAS2', 'MODFRESP', 'dataset contains spectral response')
] + OGIP_HEADER_SPECS
DATA_SPECS = [
('ENERG_LO', 'E', 'keV'),
('ENERG_HI', 'E', 'keV'),
('MODFRESP', 'E')
]
[docs]class xAzimuthalResponseGenerator(xUnivariateAuxGenerator):
"""Random number generator for the azimuthal response of the polarimeter.
Here is the basic underlying math. Typically the response of a polarimeter
to monochromatic, linearly polarized incident radiation is of the form:
.. math:: N(\\phi) = A + B \\cos^2(\\phi - \\phi_0).
This can be conveniently rewritten in term of the overall normalization
(i.e., the total number of events) and the visibility of the modulation,
defined as
.. math::
\\xi = \\frac{N_\\text{max} - N_\\text{min}}
{N_\\text{max} + N_\\text{min}} = \\frac{B}{2A + B}
(the visibility can also be characterized as the fraction of modulated
events in a given distribution, as can be readily demonstrated, and it is
the quantity that the detector is effectively measuring).
The angular response now becomes
.. math::
N(\\phi) = \\frac{N_0}{\\pi} \\left[
\\frac{(1 - \\xi)}{2} + \\xi \\cos^2(\\phi - \\phi_0)
\\right].
For completeness, the visibility, the modulation factor and the polarization
degree for a monocromatic source are related to each other through:
.. math::
\\xi(E, t) = P(E, t) \\times \\mu(E)
(i.e., at any given energy the modulation factor is the visibility of the
modulation of the detector response for 100% linearly polarized incident
radiation).
In terms of throwing random numbers, the phase is a trivial constant that
can be added after the fact (modulo 2pi), so effectively the
relevant probability density function is
.. math::
\\text{pdf}(\\phi) = \\frac{1}{\\pi} \\left[
\\frac{(1 - \\xi)}{2} + \\xi \\cos^2(\\phi) \\right],
.. image:: ../figures/test_azimuthal_resp_pdf.png
The corresponding cumulative distribution function is
.. math::
\\text{cdf}(\\phi) = \\frac{1}{2\\pi} \\left(
\\phi + \\frac{\\xi}{2}\\sin{(2\\phi)} \\right),
and it is unfortunate that it cannot be inverted, otherwise we would
have no need to interpolate for generating random numbers according to
this distribution.
.. image:: ../figures/test_azimuthal_resp_cdf.png
From the prospecive of the code, this generator is a standard
`xUnivariateAuxGenerator` where the azimuthal angle is our
random variable and the visbility is our auxiliary variable. For any given
value of the visibility, a vertical slice is providing the corresponding
one-dimensional pdf.
.. image:: ../figures/test_azimuthal_resp_generator.png
The class also provide facilities to fit a histogram to recover the
underlying modulation visibility and phase.
Example
-------
>>> import numpy
>>> from ximpol.utils.matplotlib_ import pyplot as plt
>>> from ximpol.irf.mrf import xAzimuthalResponseGenerator
>>>
>>> generator = xAzimuthalResponseGenerator()
>>> visibility = numpy.full(1000000, 0.5)
>>> phase = numpy.radians(45.)
>>> phi = generator.rvs_phi(visibility, phase)
>>> hist = plt.hist(phi, bins=numpy.linspace(0, 2*numpy.pi, 100),
histtype='step', label='Random angles')
>>> fit_results = generator.fit_histogram(hist)
>>> fit_results.plot()
>>> plt.show()
.. image:: ../figures/test_azimuthal_resp_rvs.png
"""
def __init__(self):
"""Constructor.
"""
_visibility = numpy.linspace(0., 1., 100)
_phi = numpy.linspace(0., 2*numpy.pi, 100)
fmt = dict(auxname='$\\xi$', rvname='$\\phi$', rvunits='rad')
xUnivariateAuxGenerator.__init__(self, _visibility, _phi, self.pdf,
**fmt)
@classmethod
[docs] def pdf(cls, phi, visibility):
"""Evaluate the underlying one-dimensional pdf for a given value of the
visibility, and assuming that the phase of the modulation is zero.
Arguments
---------
phi : float or array
The (independent) azimuthal angle variable, in radians.
visibility : float or array
The visibility of the modulation, in [0--1].
"""
return (0.5*(1. - visibility) +\
visibility*numpy.power(numpy.cos(phi), 2.0))/numpy.pi
@classmethod
[docs] def cdf(cls, phi, visibility):
"""Evaluate the underlying one-dimensional cdf for a given value of the
visibility, and assuming that the phase of the modulation is zero.
Arguments
---------
phi : float or array
The (independent) azimuthal angle variable, in radians.
visibility : float or array
The visibility of the modulation, in [0--1].
Warning
-------
We could overload the build_vpppf method for the class using this,
since we have an analytic expression. (This function is effectively
not used at the moment.)
"""
return (phi + 0.5*visibility*numpy.sin(2.*phi))/(2*numpy.pi)
[docs] def rvs_phi(self, visibility, phase):
"""Generate random variates for any visibility and phase values.
This is essentially calling the underlying xUnivariateAuxGenerator.rvs()
method passing the visibility array as an argument and adding the phase
array (modulo 2pi) to the output.
Arguments
---------
visibility : array
An array of visibility values. (The function returns an equal-length
array of phi values.)
phase : float or array
The phase of the modulation. (This can either be a vector or an
array of the same length as `visibility`.)
"""
return numpy.mod(self.rvs(visibility) + phase, 2*numpy.pi)
@classmethod
[docs] def fit_function(cls, phi, visibility, phase, normalization):
"""Convenience function (with the phase back in) to allow histogram
fitting.
"""
return normalization*cls.pdf((phi - phase), visibility)
@classmethod
[docs] def fit_histogram(cls, histogram, fit_normalization=False):
"""Fit an azimuthal histogram.
"""
from scipy.optimize import curve_fit
_y, binning, patches = histogram
_x = 0.5*(binning[1:] + binning[:-1])
norm = _y.sum()*(binning[1] - binning[0])
if not fit_normalization:
p0 = (0.5, 0.5)
# Wrap the fit function, keeping the normalization frozen.
def f(phi, visibility, phase):
return cls.fit_function(phi, visibility, phase, norm)
else:
p0 = (0.5, 0.5, norm)
f = cls.fit_function
popt, pcov = curve_fit(f, _x, _y, p0, numpy.sqrt(_y))
if not fit_normalization:
# Add back the normalization to the parameter vector and covariance
# matrix.
popt = numpy.concatenate((popt, numpy.array([norm])))
pcov = numpy.vstack((pcov, numpy.array([[0., 0.]])))
pcov = numpy.hstack((pcov, numpy.array([[0., 0., 0.]]).transpose()))
_xs = numpy.linspace(0, 2*numpy.pi, 250)
_ys = cls.fit_function(_xs, *popt)/(binning[1] - binning[0])
spline = xInterpolatedUnivariateSplineLinear(_xs, _ys)
mask = _y > 0.
obs = _y[mask]
exp = []
for _min, _max in zip(binning[:-1], binning[1:]):
exp.append(spline.integral(_min, _max))
exp = numpy.array(exp)[mask]
chisquare = ((exp - obs)**2/exp).sum()
# Horrible hack.
if popt[0] < 0.:
popt[0] = -popt[0]
popt[1] += 0.5*numpy.pi
if popt[1] < 0.:
popt[1] += numpy.pi
return xModulationFitResults(popt, pcov, chisquare, len(mask))
[docs]class xModulationFitResults:
"""Small convenience class encapsulating the result of a fit to an
azimuthal angle distribution.
This includes facilities for plotting and annotating the best-fit
model (e.g., overlaying it onto the underlying fitted histogram).
"""
def __init__(self, popt, pcov, chisquare, ndof):
"""Constructor.
"""
self.popt = popt
self.pcov = pcov
self.chisquare = chisquare
self.ndof = ndof
self.visibility, self.phase, self.normalization = popt
self.visibility_error, self.phase_error,\
self.normalization_error = numpy.sqrt(pcov.diagonal())
self.polarization_degree = None
self.polarization_degree_error = None
[docs] def set_polarization(self, modulation_factor):
"""
"""
self.polarization_degree = self.visibility/modulation_factor
self.polarization_degree_error = self.visibility_error/modulation_factor
[docs] def plot(self, show=False, stat=True, text_size=15, simple_stat=False,
**options):
"""Plot the fit results.
"""
from ximpol.utils.matplotlib_ import pyplot as plt
_x = numpy.linspace(0., 2*numpy.pi, 100)
_y = xAzimuthalResponseGenerator.fit_function(_x, *self.popt)
plt.plot(_x, _y, **options)
if stat:
posh = 0.02
posv = 0.25
delv = 0.07
for i, text in enumerate(self.latex().split(',')):
if simple_stat and (i is 0 or i is 2):
posv -= delv
continue
text = text.strip()
plt.text(posh, posv, text, transform=plt.gca().transAxes,
fontsize=text_size)
posv -= delv
if show:
plt.show()
[docs] def latex(self):
"""LaTeX formatting.
"""
text = r'$\xi = %.3f \pm %.3f$, $\phi = (%.2f \pm %.2f)^\circ$,'\
r'$\chi^2/{\rm ndf} = %.1f/%d$' %\
(self.visibility, self.visibility_error,
numpy.degrees(self.phase), numpy.degrees(self.phase_error),
self.chisquare, self.ndof)
if self.polarization_degree is not None:
text += r', $P = %.3f \pm %.3f$' %\
(self.polarization_degree, self.polarization_degree_error)
return text
def __str__(self):
"""String formatting.
"""
text = 'Visibility = %.3f +/- %.3f, phase = %.2f +/- %.2f deg' %\
(self.visibility, self.visibility_error,
numpy.degrees(self.phase), numpy.degrees(self.phase_error))
if self.polarization_degree is not None:
text += ', polarization degree = %.3f +/- %.3f' %\
(self.polarization_degree, self.polarization_degree_error)
return text
[docs]class xStokesAccumulator:
"""Small utility class implementing the event-by-event analysis dexcribed
in https://arxiv.org/abs/1409.6214
"""
def __init__(self):
"""Constructor.
"""
self.I = 0.
self.Q = 0.
self.U = 0.
[docs] def reset(self):
"""Reset the Stokes parameters.
"""
self.I = 0.
self.Q = 0.
self.U = 0.
[docs] def fill(self, phi):
"""Fill the accumulator with one or more values of measured azimuthal
directions.
Arguments
---------
phi : float or array
The (independent) azimuthal angle variable, in radians.
"""
if isinstance(phi, numpy.ndarray):
self.I += len(phi)
self.Q += (numpy.cos(2*phi)).sum()
self.U += (numpy.sin(2*phi)).sum()
else:
self.I += 1
self.Q += numpy.cos(2*phi)
self.U += numpy.sin(2*phi)
[docs] def u(self):
"""Return the normalized U parameter (11a).
"""
if self.I == 0.:
return 0.
return self.U/self.I
[docs] def q(self):
"""Return the normalized Q parameter (11b).
"""
if self.I == 0.:
return 0.
return self.Q/self.I
[docs] def visibility(self):
"""Return the current accumulated visibility and associated uncertainty. Recall that the visibility is the polarization fraction times the modulation function. The polarization function comes from eq 21 and here we set the modulation function to 1.
"""
if self.I < 2:
return (None, None)
_v = 2*numpy.sqrt(self.q()**2 + self.u()**2)
_dv = numpy.sqrt((2 - _v**2)/(self.I - 1))
return _v, _dv
[docs] def phase(self):
"""Return the current accumulated phase and associated undertainty. Based on eq 22.
"""
if self.I < 2:
return (None, None)
_p = 0.5*numpy.arctan2(self.u(), self.q())
_v = 2*numpy.sqrt(self.q()**2 + self.u()**2)
_dp = 1./(_v*numpy.sqrt(2*(self.I - 1)))
return _p, _dp
[docs] def polarization_frac(self,mu):
"""Return the polarization fraction given the visibility divided by effective mu.
"""
eff_mu = mu
#pol_frac = (2/mu)*numpy.sqrt(self.q()**2 + self.u()**2)
#denominator = (self.I - 1)*eff_mu**2
#dpol_frac = numpy.sqrt((2-pol_frac**2*eff_mu**2)/denominator)
v,dv = self.visibility()
pol_frac = v/eff_mu
dpol_frac = dv/eff_mu
return pol_frac, dpol_frac
[docs]class xModulationFactor(xInterpolatedUnivariateSplineLinear):
"""Class describing the modulation factor.
The effective area is essentially a linear spline, with built-in facilities
for evaluation and plotting.
Arguments
---------
mrf_file_path : str
The path to the .mrf FITS file containing the modulation response table.
To zero-th order, an `xModulationFactor` instance is an object capable of
evaluating itself in a point or in a grid of points, and capable of
plotting itself.
Example
-------
>>> import os
>>> import numpy
>>> from ximpol import XIMPOL_IRF
>>>
>>> file_path = os.path.join(XIMPOL_IRF,'fits','xipe_baseline.mrf')
>>> modf = xModulationFactor(file_path)
>>> x = numpy.arange(1, 10, 1)
>>> print(modf(x))
>>> modf.view()
More interestingly, it can generate random `phi` values, given a vector
of event energies and corresponding vectors (or simple floats) representing
the polarization degree and angle corresponding to the energies themselves.
Internally, any `xModulationFactor` has an `xAzimuthalResponseGenerator`
object and when the `xModulationFactor.rvs_phi()` method is called,
the polarization degree is multiplied by the modulation factor of the
detector, evaluated at the right energy, and converted into a visibility
value, after which the underlying `xAzimuthalResponseGenerator.rvs_phi()`
is called.
Example
-------
>>> import os
>>> import numpy
>>> from ximpol import XIMPOL_IRF
>>>
>>> file_path = os.path.join(XIMPOL_IRF,'fits','xipe_baseline.mrf')
>>> modf = xModulationFactor(file_path)
>>> # Throw 100000 random energy values.
>>> energy = numpy.random.uniform(1, 10, 100000)
>>> # This will create an array of 100000 phi values where the visibility
>>> # of the modulation is tracking the modulation factor of the polarimeter
>>> # (the phase is constant at 45 degrees).
>>> phi = modf.rvs_phi(energy, 1., numpy.radians(45))
"""
def __init__(self, mrf_file_path):
"""Constructor.
"""
logger.info('Reading modulation factor data from %s...' % mrf_file_path)
self.hdu_list = fits.open(mrf_file_path)
self.hdu_list.info()
_data = self.hdu_list['MODFRESP'].data
_x = 0.5*(_data.field('ENERG_LO') + _data.field('ENERG_HI'))
_y = _data.field('MODFRESP')
fmt = dict(xname='Energy', xunits='keV', yname='Modulation factor',
optimize=True, tolerance=1e-4)
xInterpolatedUnivariateSplineLinear.__init__(self, _x, _y, **fmt)
self.generator = xAzimuthalResponseGenerator()
[docs] def rvs_phi(self, energy, polarization_degree, polarization_angle):
"""Return random variates for a given array of values of energy,
polarization degree and polarization angle.
Arguments
---------
energy : array
An array of energy values. (The function returns an equal-length
array of phi values.)
polarization_degree : array or float
The polarization degree, in [0--1]. (This can either be a vector
or an array of the same length as `energy`.)
polarization_angle : array or float
The polarization angle, in radians. (This can either be a vector or
an array of the same length as `energy`.)
"""
try:
min_degree = polarization_degree.min()
max_degree = polarization_degree.max()
except AttributeError:
# This is catching the case where the polarization degree is
# constant and is passed through as a float.
min_degree = max_degree = polarization_degree
if max_degree > 1:
abort('The polarization degree must be <= 1')
if min_degree < 0:
abort('The polarization degree must be >= 0')
visibility = self(energy)*polarization_degree
return self.generator.rvs_phi(visibility, polarization_angle)
[docs] def weighted_average(self, energy):
"""Return the weighted average of the mudulation factor given an
array of energies.
.. math::
\\mu_{\\rm eff} =
Arguments
---------
energy : array
An array of energy values.
"""
return self(energy).sum()/len(energy)
[docs] def view(self, show=True, **kwargs):
"""Overloaded method for the xpirfview application.
"""
self.plot(show=show, **kwargs)
if __name__ == '__main__':
main()