# Licensed under GPL version 3 - see LICENSE.rst
'''Sources generate photons with all photon properties: energy, direction, time, polarization.
For objects of type `AstroSource`, the coordinates of the photon
origin on the sky are added to the photon
list. `astropy.coords.SkyCoord` is an object well suited for this
task. These objects can be added to photon tables through the
mechanims of `mixin columns
<http://docs.astropy.org/en/latest/table/index.html#mixin-columns>`). However,
mix-in columns don't (yet) support saving to all table formats or
tables operations such as stacking. Thus, it is much better to include
the coordinates as two columns of floats with names ``ra`` and ``dec``
into the table.
Sources take a `~astropy.coordinates.SkyCoord` from the user to avoid
any ambiguity about the coordinate systme used, but convert this into
plain floats used in the photon table.
'''
import os
from datetime import datetime
import numpy as np
from scipy.stats import expon
from astropy.table import Table
import astropy.units as u
from astropy.coordinates import SkyCoord, SkyOffsetFrame
from ..base import SimulationSequenceElement
from ..math.random import RandomArbitraryPdf
from .. import __version__ as marxsversion
[docs]
@u.quantity_input
def poisson_process(rate: (u.s * u.cm**2)**(-1)):
'''Return a function that generates Poisson distributed times with rate ``rate``.
Parameters
----------
rate : `~astropy.units.quantity.Quantity`
Expectation value for the total rate of photons with unit 1 / cm**2 / s.
Returns
-------
poisson_rate : function
Function that generates Poisson distributed times with rate ``rate``.
'''
if not rate.isscalar:
raise ValueError('"rate" must be scalar.')
@u.quantity_input(exposuretime=u.s)
def poisson_rate(exposuretime: u.s, geomarea: u.cm**2) -> u.s:
'''Generate Poisson distributed times.
Parameters
----------
exposuretime : `~astropy.units.quantity.Quantity`
Exposure time
geomarea : `~astropy.units.quantity.Quantity`
Geometric opening area of telescope
Returns
-------
times : `~astropy.units.quantity.Quantity`
Poisson distributed times.
'''
fullrate = rate * geomarea
# Make 10 % more numbers then we expect to need, because it's random
times = expon.rvs(scale=1./fullrate.to(1 / u.s),
size=int((exposuretime * fullrate * 1.1).to(u.dimensionless_unscaled)))
# If we don't have enough numbers right now, add some more.
while (times.sum() * u.s) < exposuretime:
times = np.hstack([times, expon.rvs(scale=1/fullrate.to(1 / u.s),
size=int(((exposuretime - times.sum() * u.s) * fullrate * 1.1).to(u.dimensionless_unscaled)))])
times = np.cumsum(times) * u.s
return times[times < exposuretime]
return poisson_rate
class SourceSpecificationError(Exception):
pass
[docs]
class Source(SimulationSequenceElement):
'''Base class for all photons sources.
This class provides a very general implementation of photons
sources. Typically, it is not used directly, but a more specialized
subclass, such as `PointSource` for an astronomical source or
`LabPointSource` for a source at a finite distance.
Most of the derived source support the same input argumets as `Source`,
thus they are explained in detail here.
Parameters
----------
flux : `~astropy.units.quantity.Quantity` or callable
This sets the total flux from a source in
photons/time/area. Options are:
- quantity: Constant (not Poisson distributed) flux.
- callable: Function that takes a total exposure time as input and
returns an array
of photon emission times between 0 and the total exposure time.
energy : `~astropy.units.quantity.Quantity` or callable or `~astropy.table.QTable`
This input decides the energy of the emitted photons.
Possible formats are:
- polarization.
- `~astropy.units.quantity.Quantity`: Constant energy.
- `astropy.table.Table`:
Given this table, the code assumes a piecewise flat spectrum.
The "energy" values contain the **upper** limit of each bin,
the "fluxdensity" array the flux density in each bin.
The first entry in the "fluxdensity" array is ignored, because the lower
bound of this bin is undefined.
The code draws an energy from this spectrum for every photon created.
- A function or callable object: This option allows for full
customization. The function must take an array of photon times as
input and return an equal length array of photon energies
`~astropy.units.quantity.Quantity`.
polarization : `~astropy.units.quantity.Quantity`, ``None``, `~astropy.table.QTable`, or callable.
There are several different ways to set the polarization angle of the
photons for a polarized source. In all cases, the angle is measured
North through East. (We ignore the special case of a polarized source
exactly on a pole.)
The default value is ``None`` (unpolarized source).
- ``None`` : An unpolarized source. Every photons is assigned a random
polarization.
- `~astropy.units.quantity.Quantity` :
Constant polarization angle for all photons
- `~astropy.table.Table` :
Table with columns called "angle" and "probabilitydensity".
The summed probability density will automatically be
normalized to one. Given this table, the
code assumes a piecewise constant probability density. The "angle"
values contain the **upper** limit of each bin. The first entry in the
"probabilitydenisty" array is ignored, because the lower bound of this bin
is undefined.
- a callable (function or callable object): This
option allows full customization. The function is called with two
arrays (time and energy values) as input and must return an array of
equal length that contains the polarization angles as
`~astropy.units.quantity.Quantity` object.
geomarea : `astropy.units.Quantity` or ``None``
Geometric opening area of telescope. If ``None`` then the flux must
be given in photons per time, not per time per unit area.
'''
def __init__(self, energy=1*u.keV, flux=1 / u.s / u.cm**2,
polarization=None, geomarea=1*u.cm**2, **kwargs):
self.energy = energy
self.flux = flux
self.polarization = polarization
self.geomarea = 1 if geomarea is None else geomarea
super().__init__(**kwargs)
[docs]
def __call__(self, *args, **kwargs):
return self.generate_photons(*args, **kwargs)
[docs]
@u.quantity_input()
def generate_times(self, exposuretime: u.s):
if callable(self.flux):
return self.flux(exposuretime, self.geomarea)
elif hasattr(self.flux, 'isscalar') and self.flux.isscalar:
return np.arange(0, exposuretime.to(u.s).value,
1. / (self.flux * self.geomarea * u.s).decompose()) * u.s
else:
raise SourceSpecificationError('`flux` must be a quantity or a callable.')
[docs]
@u.quantity_input()
def generate_energies(self, t: u.s) -> u.keV:
n = len(t)
# function
if callable(self.energy):
en = self.energy(t)
if len(en) != n:
raise SourceSpecificationError('`energy` has to return an array of same size as input time array.')
else:
return en
# astropy.table.QTable
elif hasattr(self.energy, 'columns'):
x = self.energy['energy'].to(u.keV).value
y = (self.energy['fluxdensity'][1:]).to((u.s * u.cm**2 * u.keV)**(-1)).value
y = np.hstack(([0], y))
rand = RandomArbitraryPdf(x, y)
return rand(n) * u.keV
# scalar quantity
elif hasattr(self.energy, 'isscalar') and self.energy.isscalar:
return np.ones(n) * self.energy.to(u.keV,
equivalencies=u.spectral())
# anything else
else:
raise SourceSpecificationError('`energy` must be Quantity, function, or have columns "energy" and "fluxdensity".')
[docs]
@u.quantity_input()
def generate_polarization(self, times: u.s, energies: u.keV) -> u.rad:
n = len(times)
# function
if callable(self.polarization):
pol = self.polarization(times, energies)
if len(pol) != n:
raise SourceSpecificationError('`polarization` has to return an array of same size as input time and energy arrays.')
else:
return pol
elif self.polarization is None:
return np.random.uniform(0, 2 * np.pi, n) * u.rad
# astropy.table.QTable
elif hasattr(self.polarization, 'columns'):
x = self.polarization['angle'].to(u.rad).value
y = (self.polarization['probabilitydensity']).to(1/u.rad).value
rand = RandomArbitraryPdf(x, y)
return rand(n) * u.rad
# scalar quantity
elif hasattr(self.polarization, 'isscalar') and self.polarization.isscalar:
return np.ones(n) * self.polarization
else:
raise SourceSpecificationError('`polarization` must be number (angle), callable, None (unpolarized), 2.n array or have fields "angle" (in rad) and "probability".')
[docs]
def generate_photon(self):
raise NotImplementedError
[docs]
@u.quantity_input()
def generate_photons(self, exposuretime: u.s):
'''Central function to generate photons.
Calling this function generates a photon table according to
the `flux`, `energy`, and `polarization` of this source. The
number of photons depends on the total exposure time, which is
a parameter of this function. Depending on the setting for
`flux` the photons could be distributed equally over the
interval 0..exposuretime or follow some other distribution.
Parameters
----------
exposuretime : `astropy.quantity.Quantity`
Total exposure time.
Returns
-------
photons : `astropy.table.Table`
Table with photon properties.
'''
times = self.generate_times(exposuretime)
energies = self.generate_energies(times)
pol = self.generate_polarization(times, energies)
n = len(times)
photons = Table([times.to(u.s).value,
energies.to(u.keV).value,
pol.to(u.rad).value,
np.ones(n)],
names=['time', 'energy', 'polangle', 'probability'])
photons.meta['EXTNAME'] = 'EVENTS'
photons.meta['EXPOSURE'] = (exposuretime.to(u.s).value,
'total exposure time [s]')
#photons.meta['DATE-OBS'] =
photons.meta['CREATOR'] = 'MARXS - Version {0}'.format(marxsversion)
photons.meta["LONGSTRN"] = ("OGIP 1.0", "The OGIP long string convention may be used.")
photons.meta['MARXSVER'] = (marxsversion, 'MARXS version')
now = datetime.now()
photons.meta['SIMDATE'] = (str(now.date()), 'Date simulation was run')
photons.meta['SIMTIME'] = (str(now.time())[:10], 'Time simulation was started')
photons.meta['SIMUSER'] = (os.environ.get('USER', 'unknown user'),
'User running simulation')
photons.meta['SIMHOST'] = (os.environ.get('HOST', 'unknown host'),
'Host system running simulation')
photons['time'].unit = u.s
photons['energy'].unit = u.keV
photons['polangle'].unit = u.rad
return photons
class AstroSource(Source):
'''Astrophysical source with a sky position
Parameters
----------
coords : `astropy.coordinates.SkySoord` (preferred)
Position of the source on the sky. If ``coords`` is not a
`~astropy.coordinates.SkyCoord` object itself, it is used to
initialize such an object. See `~astropy.coordinates.SkyCoord`
for a description of allowed input values.
'''
def __init__(self, **kwargs):
coords = kwargs.pop('coords')
if isinstance(coords, SkyCoord):
self.coords = coords
else:
self.coords = SkyCoord(coords)
if not self.coords.isscalar:
raise ValueError("Coordinate must be scalar, not array.")
super().__init__(**kwargs)
def set_pos(self, photons, coo):
'''Set Ra, Dec of photons in table
This function write Ra, Dec to a table. It is defined here to
make the way `astropy.coordinates.SkyCoord` objects are stored
more uniform. Currently, mixin columns in tables have some
disadvantages, e.g. they cause errors on writing and on
stacking. Thus, we store the coordinates as plain
numbers. Since that format is not unique (e.g. units could be
deg or rad), system could be ICRS, FK4, FK5 or other this
conversion is done here for all astrononimcal sources. This
also makes it easier to change that design in the future.
Parameters
----------
photons : `astropy.table.Table`
Photon table. Columns ``ra`` and ``dec`` will be added or overwritten.
coo : `astropy.coords.SkyCoord`
Photon coordinates
'''
photons['ra'] = coo.icrs.ra.deg
photons['dec'] = coo.icrs.dec.deg
photons['ra'].unit = u.degree
photons['dec'].unit = u.degree
photons.meta['COORDSYS'] = ('ICRS', 'Type of coordinate system')
[docs]
class PointSource(AstroSource):
'''Astrophysical point source.
Parameters
----------
kwargs : see `Source`
Other keyword arguments include ``flux``, ``energy`` and
``polarization``. See `Source` for details.
'''
def __init__(self, **kwargs):
super().__init__(**kwargs)
[docs]
@u.quantity_input
def generate_photons(self, exposuretime: u.s):
photons = super().generate_photons(exposuretime)
self.set_pos(photons, self.coords)
return photons
[docs]
class RadialDistributionSource(AstroSource):
'''Base class for sources where photons follow some radial distribution on the sky
Parameters
----------
radial_distribution : callable
A function that takes an interger as input, which specifies the number
of photons to produce. The output must be an `astropy.units.Quantity`
object with n angles in it.
func_par : object
``radial_distribution`` has access to ``self.func_par`` to
hold function parameters. This could be, e.g. a tuple with
coeffications.
kwargs : see `Source`
Other keyword arguments include ``flux``, ``energy`` and ``polarization``.
See `Source` for details.
'''
def __init__(self, **kwargs):
self.func = kwargs.pop('radial_distribution')
self.func_par = kwargs.pop('func_par', None)
super().__init__(**kwargs)
[docs]
@u.quantity_input
def generate_photons(self, exposuretime: u.s):
'''Photon positions are generated in a frame that is centered on the
coordinates set in ``coords``, then they get transformed into
the global sky system.
'''
photons = super().generate_photons(exposuretime)
relative_frame = SkyOffsetFrame(origin=self.coords)
n = len(photons)
phi = np.random.rand(n) * 2. * np.pi * u.rad
d = self.func(n)
relative_coords = SkyCoord(d * np.sin(phi), d * np.cos(phi),
frame=relative_frame)
origin_coord = relative_coords.transform_to(self.coords)
self.set_pos(photons, origin_coord)
return photons
[docs]
class SphericalDiskSource(RadialDistributionSource):
'''Astrophysical source with the shape of a circle or ring.
The `DiskSource` makes a small angle approximation. In contrast,
this source implements the full spherical geometry at the cost of
running slower. For radii less than a few degrees the difference
is negligible and we recommend use of the faster `DiskSource`.
Parameters
----------
a_inner, a_outer : `astropy.coordinates.Angle`
Inner and outer angle of the ring (e.g. in arcsec).
The default is a disk with no inner hole (``a_inner`` is set to zero.)
'''
def __init__(self, **kwargs):
kwargs['func_par'] = [kwargs.pop('a_outer'),
kwargs.pop('a_inner', 0. * u.rad)]
kwargs['radial_distribution'] = self.radial_distribution
super().__init__(**kwargs)
[docs]
def radial_distribution(self, n):
'''Radial distribution function.
See http://6degreesoffreedom.co/circle-random-sampling/ for an
explanation of how to derive the formula. Note however, that
here we use sin instead of cos because we measure the angle
from the top.
'''
u = np.random.rand(n)
return np.arccos(np.cos(self.func_par[1]) * (1. - u) + u * np.cos(self.func_par[0]))
[docs]
class DiskSource(RadialDistributionSource):
'''Astrophysical source with the shape of a circle or ring.
This source uses a small angle approximation which is valid for
radii less than a few degrees and runs much faster. See
``SphericalDiskSource`` for an implementation using full spherical
geometry.
Parameters
----------
a_inner, a_outer : `astropy.coordinates.Angle`
Inner and outer angle of the ring (e.g. in arcsec).
The default is a disk with no inner hole (``a_inner`` is set to zero.)
'''
def __init__(self, **kwargs):
kwargs['func_par'] = [kwargs.pop('a_outer'),
kwargs.pop('a_inner', 0. * u.rad)]
kwargs['radial_distribution'] = lambda n: np.sqrt(self.func_par[1]**2 +
np.random.rand(n) * (self.func_par[0]**2 - self.func_par[1]**2))
super().__init__(**kwargs)
[docs]
class GaussSource(AstroSource):
'''Astrophysical source with a Gaussian brightness profile.
This source uses a small angle approximation which is valid for
radii less than a few degrees.
Parameters
----------
sigma : `astropy.coordinates.Angle`
Gaussian sigma setting the width of the distribution
'''
def __init__(self, **kwargs):
self.sigma = kwargs.pop('sigma')
super().__init__(**kwargs)
[docs]
@u.quantity_input()
def generate_photons(self, exposuretime: u.s):
'''Photon positions are generated in a frame that is centered on the
coordinates set in ``coords``, then they get transformed into
the global sky system.
'''
photons = super().generate_photons(exposuretime)
relative_frame = SkyOffsetFrame(origin=self.coords)
n = len(photons)
relative_coords = SkyCoord(np.random.normal(scale=self.sigma.value,
size=n) * self.sigma.unit,
np.random.normal(scale=self.sigma.value,
size=n) * self.sigma.unit,
frame=relative_frame)
origin_coord = relative_coords.transform_to(self.coords)
self.set_pos(photons, origin_coord)
return photons
[docs]
class SymbolFSource(AstroSource):
'''Source shaped like the letter F.
This source provides a non-symmetric source for testing purposes.
Parameters
----------
size : `astropy.units.quantity`
angular size
kwargs : see `Source`
Other keyword arguments include ``flux``, ``energy`` and
``polarization``. See `Source` for details.
'''
def __init__(self, **kwargs):
self.size = kwargs.pop('size', 1. * u.degree)
super().__init__(**kwargs)
[docs]
@u.quantity_input()
def generate_photons(self, exposuretime: u.s):
photons = super().generate_photons(exposuretime)
n = len(photons)
elem = np.random.choice(3, size=n)
ra = np.ones(n) * self.coords.icrs.ra
dec = np.ones(n) * self.coords.icrs.dec
size = self.size
ra[elem == 0] += size * np.random.random(np.sum(elem == 0))
ra[elem == 1] += size
dec[elem == 1] += 0.5 * size * np.random.random(np.sum(elem == 1))
ra[elem == 2] += 0.8 * size
dec[elem == 2] += 0.3 * size * np.random.random(np.sum(elem == 2))
self.set_pos(photons, SkyCoord(ra, dec, frame=self.coords))
return photons