# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Model a fiber spectrograph camera for spectroscopic simulations.
Cameras belong to an :class:`Instrument <specsim.instrument.Instrument>` and
are usually initialized from a configuration used to create
a simulator and then accessible via its ``instrument.cameras`` attribute,
for example:
>>> import specsim.simulator
>>> simulator = specsim.simulator.Simulator('test') # doctest: +IGNORE_OUTPUT
>>> print(np.round(simulator.instrument.cameras[0].read_noise, 1))
2.9 electron / pix2
See :doc:`/api` for examples of changing model parameters defined in the
configuration. No attributes can be changed after a simulator has
been created. File a github issue if you would like to change this.
"""
import numpy as np
import scipy.sparse
import astropy.units as u
[docs]
class Camera(object):
"""Model the response of a single fiber spectrograph camera.
No camera attributes can be changed after an instrument has been
created. File a github issue if you would like to change this.
Parameters
----------
name : str
A brief descriptive name for this camera. Typically a single letter
indicating the wavelength band covered by this camera.
wavelength : astropy.units.Quantity
Array of wavelength bin centers where the instrument response is
calculated, with units. Must be equally spaced.
throughput : numpy.ndarray
Array of throughput values tabulated at each wavelength bin center.
row_size : astropy.units.Quantity
Array of row size values tabulated at each wavelength bin center.
Units are required, e.g. Angstrom / pixel.
fwhm_resolution : astropy.units.Quantity
Array of wavelength resolution FWHM values tabulated at each wavelength
bin center. Units are required, e.g., Angstrom.
neff_spatial : astropy.units.Quantity
Array of effective trace sizes in the spatial (fiber) direction
tabulated at each wavelength bin center. Units are required, e.g.
pixel.
read_noise : astropy.units.Quantity
Camera noise per readout operation. Units are required, e.g. electron.
dark_current : astropy.units.Quantity
Nominal mean dark current from sensor. Units are required, e.g.
electron / hour.
gain : astropy.units.Quantity
CCD amplifier gain. Units are required, e.g., electron / adu.
(This is really 1/gain).
num_sigmas_clip : float
Number of sigmas where the resolution should be clipped when building
a sparse resolution matrix.
output_pixel_size : astropy.units.Quantity
Size of output pixels for this camera. Units are required, e.g.
Angstrom. Must be a multiple of the the spacing of the wavelength
input parameter.
allow_convolution : bool
Set True to precompute the sparse resolution matrix needed by
:meth:`get_output_resolution_matrix`, :meth:`apply_resolution` and
:meth:`downsample`.
"""
def __init__(self, name, wavelength, throughput, row_size,
fwhm_resolution, neff_spatial, read_noise, dark_current,
gain, num_sigmas_clip, output_pixel_size,
allow_convolution=True):
self.name = name
self._wavelength = wavelength.to(self._wavelength_unit).value
self.throughput = throughput
self._row_size = row_size.to(self._wavelength_unit / u.pixel).value
self._fwhm_resolution = fwhm_resolution.to(self._wavelength_unit).value
self._neff_spatial = neff_spatial.to(u.pixel).value
self.read_noise = read_noise
self.dark_current = dark_current
self.gain = gain
self.num_sigmas_clip = num_sigmas_clip
self.allow_convolution = allow_convolution
# The arrays defining the CCD properties must all have identical
# wavelength coverage.
# Since these arrays are generally interpolated, we might have some
# very small values due to roundoff errors. Set those to zero first.
for data in (self._row_size, self._fwhm_resolution, self._neff_spatial):
data[np.abs(data) < 1e-10] = 0.
# Use the row size array to define the CCD coverage.
ccd_nonzero = np.where(self._row_size > 0)[0]
ccd_start, ccd_stop = ccd_nonzero[0], ccd_nonzero[-1] + 1
# Verify that the other arrays have matching coverage.
if (np.any(self._fwhm_resolution[:ccd_start] != 0) or
np.any(self._fwhm_resolution[ccd_stop:] != 0)):
raise RuntimeError('Resolution extends beyond CCD coverage.')
if (np.any(self._neff_spatial[:ccd_start] != 0) or
np.any(self._neff_spatial[ccd_stop:] != 0)):
raise RuntimeError('Spatial Neff extends beyond CCD coverage.')
# CCD properties must be valid across the coverage.
if np.any(self._row_size[ccd_start:ccd_stop] <= 0.):
raise RuntimeError('CCD row size has invalid values <= 0.')
if np.any(self._fwhm_resolution[ccd_start:ccd_stop] <= 0.):
raise RuntimeError('CCD resolution has invalid values <= 0.')
if np.any(self._neff_spatial[ccd_start:ccd_stop] <= 0.):
raise RuntimeError('CCD spatial Neff has invalid values <= 0.')
self.ccd_slice = slice(ccd_start, ccd_stop)
self.ccd_coverage = np.zeros_like(self._wavelength, dtype=bool)
self.ccd_coverage[ccd_start:ccd_stop] = True
self._wavelength_min = self._wavelength[ccd_start]
self._wavelength_max = self._wavelength[ccd_stop - 1]
# Calculate the size of each wavelength bin in units of pixel rows.
self._wavelength_bin_size = np.gradient(self._wavelength)
neff_wavelength = np.zeros_like(self._neff_spatial)
neff_wavelength[self.ccd_slice] = (
self._wavelength_bin_size[self.ccd_slice] /
self._row_size[self.ccd_slice])
# Calculate the effective pixel area contributing to the signal
# in each wavelength bin.
self.neff_pixels = neff_wavelength * self._neff_spatial * u.pixel ** 2
# Calculate the read noise per wavelength bin, assuming that
# readnoise is uncorrelated between pixels (hence the sqrt scaling). The
# value will be zero in pixels that are not used by this camera.
self.read_noise_per_bin = (
self.read_noise * np.sqrt(self.neff_pixels.value) * u.pixel ** 2
).to(u.electron)
# Calculate the dark current per wavelength bin.
self.dark_current_per_bin = (
self.dark_current * self.neff_pixels).to(u.electron / u.s)
# Calculate the RMS resolution assuming a Gaussian PSF.
fwhm_to_sigma = 1. / (2 * np.sqrt(2 * np.log(2)))
self._rms_resolution = fwhm_to_sigma * self._fwhm_resolution
# Find the minimum wavelength that can disperse into the CCD,
# assuming a constant extrapolation of the resolution.
sigma_lo = self._rms_resolution[ccd_start]
min_wave = (self._wavelength[ccd_start] -
self.num_sigmas_clip * sigma_lo)
if min_wave < self._wavelength[0]:
raise RuntimeError(
'Wavelength grid min does not cover {0}-camera response.'
.format(self.name))
matrix_start = np.where(self._wavelength >= min_wave)[0][0]
# Find the maximum wavelength that can disperse into the CCD,
# assuming a constant extrapolation of the resolution.
sigma_hi = self._rms_resolution[ccd_stop - 1]
max_wave = (self._wavelength[ccd_stop - 1] +
self.num_sigmas_clip * sigma_hi)
if max_wave > self._wavelength[-1]:
raise RuntimeError(
'Wavelength grid max does not cover {0}-camera response.'
.format(self.name))
matrix_stop = np.where(self._wavelength <= max_wave)[0][-1] + 1
self.response_slice = slice(matrix_start, matrix_stop)
# Pad the RMS array to cover the full resolution matrix range.
sigma = np.empty((matrix_stop - matrix_start))
sigma[:ccd_start - matrix_start] = sigma_lo
sigma[ccd_start - matrix_start:ccd_stop - matrix_start] = (
self._rms_resolution[ccd_start:ccd_stop])
sigma[ccd_stop - matrix_start:] = sigma_hi
# Calculate the range of wavelengths where the dispersion will
# be evaluated. The evaluation range extends beyond wavelengths that
# can disperse into the CCD in order to calculate the normalization.
wave = self._wavelength[matrix_start:matrix_stop]
min_wave = wave - self.num_sigmas_clip * sigma
max_wave = wave + self.num_sigmas_clip * sigma
eval_start = np.searchsorted(self._wavelength, min_wave)
eval_stop = np.searchsorted(self._wavelength, max_wave) + 1
# The columns of the resolution matrix are clipped to the CCD coverage.
column_start = np.maximum(eval_start, ccd_start)
column_stop = np.minimum(eval_stop, ccd_stop)
column_size = column_stop - column_start
assert np.all(column_size > 0)
# The remaining steps are only necessary to support convolution
# and downsampling.
if not self.allow_convolution:
return
# Prepare start, stop values for slicing eval -> column.
trim_start = column_start - eval_start
trim_stop = column_stop - eval_start
assert np.all(trim_stop > trim_start)
# Prepare a sparse resolution matrix in compressed column format.
matrix_size = np.sum(column_size)
data = np.empty((matrix_size,), float)
indices = np.empty((matrix_size,), int)
indptr = np.empty((len(column_size) + 1,), int)
indptr[0] = 0
indptr[1:] = np.cumsum(column_size)
assert indptr[-1] == matrix_size
# Fill sparse matrix arrays.
sparse_start = 0
for i in range(matrix_stop - matrix_start):
eval_slice = slice(eval_start[i], eval_stop[i])
w = self._wavelength[eval_slice]
dw = self._wavelength_bin_size[eval_slice]
column = dw * np.exp(-0.5 * ((w - wave[i]) / sigma[i]) ** 2)
# Normalize over the full evaluation range.
column /= np.sum(column)
# Trim to the CCD coverage.
s = slice(sparse_start, sparse_start + column_size[i])
data[s] = column[trim_start[i]:trim_stop[i]]
indices[s] = np.arange(column_start[i], column_stop[i]) - ccd_start
sparse_start = s.stop
assert np.all((indices >= 0) & (indices < ccd_stop - ccd_start))
assert s.stop == matrix_size
# Create the matrix in CSC format.
matrix_shape = (ccd_stop - ccd_start, matrix_stop - matrix_start)
self._resolution_matrix = scipy.sparse.csc_matrix(
(data, indices, indptr), shape=matrix_shape)
# Convert to CSR format for faster matrix multiplies.
self._resolution_matrix = self._resolution_matrix.tocsr()
# Initialize downsampled output pixels.
self._output_pixel_size = (
output_pixel_size.to(self._wavelength_unit).value)
# Check that we can downsample simulation pixels to obtain
# output pixels. This check will only work if the simulation
# grid is equally spaced, but no other part of the Camera class
# class currently requires this.
wavelength_step = self._wavelength[1] - self._wavelength[0]
if not np.allclose(np.diff(self._wavelength), wavelength_step):
raise RuntimeError(
'Non-uniform simulation wavelength grid not supported yet.')
self._downsampling = int(round(
self._output_pixel_size / wavelength_step))
if not np.allclose(self._downsampling * wavelength_step,
self._output_pixel_size):
raise ValueError(
'Invalid output_pixel_size {0} for {1} camera: '
.format(output_pixel_size, self.name) +
'must be multiple of {0} {1}.'
.format(wavelength_step, self._wavelength_unit))
# The self._wavelength array stores the centers of fixed-width bins.
# Calculate the edges of the downsampled output pixels. Trim
# any partial output pixel on the high end.
output_min = self._wavelength_min - 0.5 * wavelength_step
output_max = self._wavelength_max + 0.5 * wavelength_step
num_downsampled = int(np.floor(
(output_max - output_min) / self._output_pixel_size))
pixel_edges = (
output_min +
np.arange(num_downsampled + 1) * self._output_pixel_size)
# Save the centers of each output pixel.
self._output_wavelength = 0.5 * (pixel_edges[1:] + pixel_edges[:-1])
# Initialize the parameters used by the downsample() method.
self._output_slice = slice(
self.ccd_slice.start,
self.ccd_slice.start + num_downsampled * self._downsampling)
self._downsampled_shape = (num_downsampled, self._downsampling)
[docs]
def get_output_resolution_matrix(self):
"""Return the output resolution matrix in DIA sparse format.
The output resolution is calculated by summing output pixel
blocks of the full resolution matrix. This is equivalent to
the convolution of our resolution with a boxcar representing
an output pixel. Edge effects are not handled very gracefully
in order to return a square matrix.
The memory required for this operation scales with the number
of non-zero elements in the returned matrix. This matrix is
not used internally and is re-calcuated each time this method
is called.
Returns
-------
:class:`scipy.sparse.dia_matrix`
Square array of resolution matrix elements in the DIA
sparse format.
"""
if not self.allow_convolution:
raise RuntimeError('Camera created with allow_convolution False.')
n = len(self._output_wavelength)
m = self._downsampling
i0 = self.ccd_slice.start - self.response_slice.start
output_slice = slice(i0, i0 + n * m)
# Initialize CSR format arrays for building the output matrix.
indptr_out = np.empty((n + 1,), int)
indices_out = []
data_out = []
row_size = self._resolution_matrix.shape[1]
cols_sum = np.empty(row_size, int)
data_sum = np.empty(row_size, float)
# Loop over rows of the CSR format sparse data.
indices_in = self._resolution_matrix.indices
indptr_in = self._resolution_matrix.indptr
data_in = self._resolution_matrix.data
# Loop over rows in the full resolution matrix.
num_out = 0
for i in range(0, n * m, m):
cols_sum[:] = 0
data_sum[:] = 0.
# Loop over rows that will be combined into a single output row.
for k in range(i, i + m):
packed = slice(indptr_in[k], indptr_in[k + 1])
# Find the columns with data in this row.
expanded = indices_in[packed]
# But shift them to the center to prevent smearing
# If m is even, the center is not a grid point, so
# we add it twice
ic1_ = m // 2 - (k - i)
ic2_ = (m - 1) // 2 - (k - i)
# Count the rows contributing to each column.
cols_sum[expanded + ic1_] += 1
cols_sum[expanded + ic2_] += 1
# Sum the data across rows for each column.
data_sum[expanded + ic1_] += data_in[packed]
data_sum[expanded + ic2_] += data_in[packed]
# Combine into a single output row.
data = data_sum[output_slice].reshape(n, m).mean(axis=1) / 2
counts = cols_sum[output_slice].reshape(n, m).sum(axis=1)
indices = np.where(counts > 0)[0]
indices_out.append(indices)
data_out.append(data[indices])
indptr_out[i // m] = num_out
num_out += len(indices)
indptr_out[(i // m) + 1] = num_out
# Combine row arrays.
data_out = np.hstack(data_out)
indices_out = np.hstack(indices_out)
# Build the output matrix in CSR format.
R = scipy.sparse.csr_matrix((data_out, indices_out, indptr_out), (n, n))
# Convert to DIA format and return.
return R.todia()
[docs]
def downsample(self, data, method=np.sum):
"""Downsample data tabulated on the simulation grid to output pixels.
"""
if not self.allow_convolution:
raise RuntimeError('Camera created with allow_convolution False.')
data = np.asanyarray(data)
if data.shape[0] != len(self._wavelength):
raise ValueError(
'Invalid data shape for downsampling: {0}.'.format(data.shape))
output = data[self._output_slice]
new_shape = self._downsampled_shape + output.shape[1:]
return method(output.reshape(new_shape), axis=1)
[docs]
def apply_resolution(self, flux):
"""
Input should be on the simulation wavelength grid.
Any throughput should already be applied.
"""
if not self.allow_convolution:
raise RuntimeError('Camera created with allow_convolution False.')
flux = np.asarray(flux)
dispersed = np.zeros_like(flux)
dispersed[self.ccd_slice] = self._resolution_matrix.dot(
flux[self.response_slice])
return dispersed
# Canonical wavelength unit used for all internal arrays.
_wavelength_unit = u.Angstrom
@property
def wavelength_min(self):
"""Minimum wavelength covered by this camera's CCD.
"""
return self._wavelength_min * self._wavelength_unit
@property
def wavelength_max(self):
"""Maximum wavelength covered by this camera's CCD.
"""
return self._wavelength_max * self._wavelength_unit
@property
def rms_resolution(self):
"""Array of RMS resolution values.
"""
return self._rms_resolution * self._wavelength_unit
@property
def row_size(self):
"""Array of row sizes in the dispersion direction.
"""
return self._row_size * self._wavelength_unit / u.pixel
@property
def neff_spatial(self):
"""Array of effective pixel dimensions in the spatial (fiber) direction.
"""
return self._neff_spatial * u.pixel
@property
def output_pixel_size(self):
"""Size of output pixels.
Must be a multiple of the simulation wavelength grid.
"""
if not self.allow_convolution:
raise RuntimeError('Camera created with allow_convolution False.')
return self._output_pixel_size * self._wavelength_unit
@property
def output_wavelength(self):
"""Output pixel central wavelengths.
"""
if not self.allow_convolution:
raise RuntimeError('Camera created with allow_convolution False.')
return self._output_wavelength * self._wavelength_unit