#!/usr/bin/env python
#
# Copyright (C) 2015, the ximpol team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU GengReral Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Module encapsulating the event structure and related facilities.
"""
import numpy
import numbers
from astropy.io import fits
from astropy.coordinates import SkyCoord
from ximpol.utils.logging_ import logger
from ximpol.core.fitsio import xPrimaryHDU, xBinTableHDUBase
from ximpol.core.fitsio import FITS_TO_NUMPY_TYPE_DICT
[docs]class xBinTableHDUEvents(xBinTableHDUBase):
"""Binary table description for the EVENTS extension of the observation
output files.
This is partially modeled based on the `XMM-Newton data format
<http://www.mpe.mpg.de/xray/wave/xmm/cookbook/EPIC_PN/event_descr.php>`_.
A recap of the basic FITS data types is `here
<https://pythonhosted.org/pyfits/users_guide/users_table.html>`_.
This bit is separated from the additional Monte Carlo event fields, as
in the future the "real" part of the data format might be stored
elsewhere.
"""
NAME = 'EVENTS'
DATA_SPECS = [
('TIME' , 'D', 's' , 'event time in seconds'),
('PHA' , 'I', None , 'uncorrected event channel'),
('PE_ANGLE', 'E', 'degrees', 'reconstructed photoelectron angle'),
('ENERGY' , 'E', 'KeV' , 'reconstructed event energy'),
('RA' , 'E', 'degrees', 'reconstructed right ascension'),
('DEC' , 'E', 'degrees', 'reconstructed declination')
]
[docs]class xBinTableHDUMonteCarloEvents(xBinTableHDUBase):
"""Binary table description for the EVENTS extension of the observation
output files, including the additional Monte Carlo fields.
"""
NAME = xBinTableHDUEvents.NAME
DATA_SPECS = xBinTableHDUEvents.DATA_SPECS + [
('PHASE' , 'E', None , 'Event phase (for periodic sources)'),
('MC_ENERGY', 'E', 'keV' , 'Monte Carlo event energy'),
('MC_RA' , 'E', 'degrees', 'Monte Carlo right ascension'),
('MC_DEC' , 'E', 'degrees', 'Monte Carlo declination'),
('MC_SRC_ID', 'I', None , 'Monte Carlo souce identifier')
]
[docs]class xBinTableHDUGTI(xBinTableHDUBase):
"""Binary tablefor the good time intervals (GTI).
"""
NAME = 'GTI'
DATA_SPECS = [
('START', 'D', 's', 'GTI start time'),
('STOP' , 'D', 's', 'GTI stop time')
]
[docs]class xBinTableHDURoiTable(xBinTableHDUBase):
"""Binary tablefor the good time intervals (GTI).
"""
NAME = 'ROITABLE'
DATA_SPECS = [
('SRCID' , 'I' , None, 'source identifier'),
('SRCNAME', 'A20', None, 'source name')
]
[docs]class xMonteCarloEventList(dict):
"""Class describing a Monte Carlo event list.
"""
def __init__(self):
"""Constructor.
"""
for name, dtype in xBinTableHDUMonteCarloEvents.spec_names_and_types():
dtype = FITS_TO_NUMPY_TYPE_DICT[dtype]
self[name] = numpy.array([], dtype)
self.length = 0
[docs] def __len__(self):
"""Return the length of the event list.
"""
return self.length
[docs] def set_column(self, name, data):
"""Set a column array.
Arguments
---------
name : string
The name of the column
data : array or number
The actual data to put in the column. (If `data` is a number,\
an array of the proper length is automatically created, assuming\
that the `lenght` class member is defined.)
"""
assert self.has_key(name)
dtype = self[name].dtype
if isinstance(data, numbers.Number):
data = numpy.full(self.length, data, dtype)
if self.length > 0:
assert(len(data) == self.length)
else:
self.length = len(data)
self[name] = data
[docs] def __add__(self, other):
"""Concatenate two event lists.
"""
_list = xMonteCarloEventList()
for name in xBinTableHDUMonteCarloEvents.spec_names():
_list.set_column(name,numpy.append(self[name], other[name]))
return _list
[docs] def sort(self):
"""Sort the event list based on the event time.
"""
_index = numpy.argsort(self['TIME'])
for name in xBinTableHDUMonteCarloEvents.spec_names():
self.set_column(name, self[name][_index])
[docs] def trim(self, mask):
"""Trim all the data columns according to a generic input mask.
"""
assert len(mask) == len(self)
for name in xBinTableHDUMonteCarloEvents.spec_names():
self[name] = self[name][mask]
self.length = numpy.count_nonzero(mask)
[docs] def apply_vignetting(self, aeff, ra_pointing, dec_pointing):
"""Apply the effective area vignetting to the event list.
"""
logger.info('Applying vignetting to the event list...')
num_initial_events = len(self)
evt_ra = self['MC_RA']
evt_dec = self['MC_DEC']
evt_energy = self['MC_ENERGY']
evt_skycoord = SkyCoord(evt_ra, evt_dec, unit='deg')
ref_skyccord = SkyCoord(ra_pointing, dec_pointing, unit='deg')
evt_angsep = evt_skycoord.separation(ref_skyccord).arcmin
vignetting = aeff.vignetting(evt_energy, evt_angsep)
self.trim(numpy.random.random(vignetting.shape) <= vignetting)
logger.info('Done, %d events out of %d remaining.' %\
(len(self), num_initial_events))
[docs] def write_fits(self, file_path, simulation_info):
"""Write the event list and associated ancillary information to file.
Arguments
---------
file_path : str
The path to the output file.
simulation_info :
A generic container with all the relevant information about the
simulation.
Warning
-------
The information about the detector and telescope should be in the
primary header of the IRF tables, and that's where we should be
retrieving it from. (See issue #49.)
"""
primary_hdu = xPrimaryHDU()
roi_model = simulation_info.roi_model
irf_name = simulation_info.irf_name
ebounds_header = simulation_info.edisp.hdu_list['EBOUNDS'].header
gti_list = simulation_info.gti_list
keywords = [
('ROIRA' , roi_model.ra , 'right ascension of the ROI center'),
('ROIDEC' , roi_model.dec, 'declination of the ROI center'),
('EQUINOX' , 2000. , 'equinox for RA and DEC'),
('IRFNAME' , irf_name , 'name of the IRFs used for the MC'),
('TELESCOP', ebounds_header['TELESCOP']),
('INSTRUME', ebounds_header['INSTRUME']),
('DETNAM' , ebounds_header['DETNAM']),
('DETCHANS', ebounds_header['DETCHANS'])
]
primary_hdu.setup_header(keywords)
data = [self[name] for name in\
xBinTableHDUMonteCarloEvents.spec_names()]
event_hdu = xBinTableHDUMonteCarloEvents(data)
_start = numpy.array([gti[0] for gti in gti_list])
_stop = numpy.array([gti[1] for gti in gti_list])
gti_hdu = xBinTableHDUGTI([_start, _stop])
_src_id = numpy.array([src.identifier for src in roi_model.values()])
_src_name = numpy.array([src.name for src in roi_model.values()])
roi_hdu = xBinTableHDURoiTable([_src_id, _src_name])
hdu_list = fits.HDUList([primary_hdu, event_hdu, gti_hdu, roi_hdu])
hdu_list.info()
hdu_list.writeto(file_path, clobber=True)
logger.info('Event list written to %s...' % file_path)
[docs]class xEventFile:
"""Read-mode interface to event files.
"""
def __init__(self, file_path):
"""Constructor.
"""
assert(file_path.endswith('.fits'))
logger.info('Opening input event file %s...' % file_path)
self.hdu_list = fits.open(file_path)
self.hdu_list.info()
self.event_data = self.hdu_list['EVENTS'].data
self.roi_table = self.build_roi_table()
[docs] def close(self):
"""Close the HDU list.
"""
self.hdu_list.close()
[docs] def num_events(self):
"""Return the total number of events in the event file.
"""
return len(self.event_data)
[docs] def file_path(self):
"""Return the path to the underlying file.
"""
return self.hdu_list.filename()
[docs] def primary_keywords(self):
"""Return a list of all the relevant keywords in the
PRIMARY header.
This is used, e.g., to propagate all the necessary information (such as
the ROI and the IRFs used) from the event files to the binned
data files that are created from them.
"""
_header = self.hdu_list['PRIMARY'].header
keywords = []
for key in ['ROIRA', 'ROIDEC', 'EQUINOX', 'IRFNAME', 'TELESCOP',
'INSTRUME', 'DETCHANS']:
keywords.append((key, _header[key], _header.comments[key]))
return keywords
[docs] def build_roi_table(self):
"""Rebuild the ROI table based in the information in the ROITABLE
extension of the event file.
"""
roi_table = {}
_src_id = self.hdu_list['ROITABLE'].data['SRCID']
_src_name = self.hdu_list['ROITABLE'].data['SRCNAME']
for _id, _name in zip(_src_id, _src_name):
roi_table[_name] = _id
return roi_table
[docs] def roi_center(self):
"""Return the righ ascension and declination of the center of the
ROI, as written in the header of the primary extension.
"""
return self.hdu_list['PRIMARY'].header['ROIRA'],\
self.hdu_list['PRIMARY'].header['ROIDEC']
[docs] def irf_name(self):
"""Return the name of the IRF set used to run the simulation.
"""
return self.hdu_list['PRIMARY'].header['IRFNAME']
[docs] def total_good_time(self):
"""Return the sum of all the GTIs in the event file.
"""
_start = self.hdu_list['GTI'].data['START']
_stop = self.hdu_list['GTI'].data['STOP']
return (_stop - _start).sum()
[docs] def min_good_time(self):
"""Return the smaller START time for the GTIs in the event file.
"""
return self.hdu_list['GTI'].data['START'].min()
[docs] def max_good_time(self):
"""Return the largest STOP time for the GTIs in the event file.
"""
return self.hdu_list['GTI'].data['STOP'].max()
[docs] def source_id(self, source_name):
"""Return the source identifier for a given source name in the ROI.
"""
return self.roi_table[source_name]