# -*- coding: utf-8 -*-
import copy
import datetime
import logging
import os
import numpy as np
from astropy.io import fits
from scipy import interpolate as itp
__all__ = ["Spectrum1D"]
[docs]class Spectrum1D:
"""
Base class of a 1D spectral object to hold the information of each
extracted spectrum and the raw headers if was provided during the
data reduction. The FITS or CSV file output are all done here.
"""
def __init__(
self,
spec_id=None,
verbose=True,
logger_name="Spectrum1D",
log_level="INFO",
log_file_folder="default",
log_file_name=None,
):
"""
Initialise the object with a logger.
Parameters
----------
spec_id: int (Default: None)
The ID corresponding to the spectrum1D object. Note that this
ID is unique in each reduction only.
verbose: boolean (Default: True)
Set to False to suppress all verbose warnings, except for
critical failure.
logger_name: str (Default: Spectrum1D)
This will set the name of the logger, if the name is used already,
it will reference to the existing logger. This will be the
first part of the default log file name unless log_file_name is
provided.
log_level: str (Default: 'INFO')
Four levels of logging are available, in decreasing order of
information and increasing order of severity: (1) DEBUG, (2) INFO,
(3) WARNING, (4) ERROR and (5) CRITICAL. WARNING means that
there is suboptimal operations in some parts of that step. ERROR
means that the requested operation cannot be performed, but the
software can handle it by either using the default setting or
skipping the operation. CRITICAL means that the requested
operation cannot be resolved without human interaction, this is
most usually coming from missing data.
log_file_folder: None or str (Default: "default")
Folder in which the file is save, set to default to save to the
current path.
log_file_name: None or str (Default: None)
File name of the log, set to None to self.logger.warning to screen
only.
"""
# Set-up logger
self.logger = logging.getLogger(logger_name)
if (log_level == "CRITICAL") or (not verbose):
self.logger.setLevel(logging.CRITICAL)
elif log_level == "ERROR":
self.logger.setLevel(logging.ERROR)
elif log_level == "WARNING":
self.logger.setLevel(logging.WARNING)
elif log_level == "INFO":
self.logger.setLevel(logging.INFO)
elif log_level == "DEBUG":
self.logger.setLevel(logging.DEBUG)
else:
raise ValueError("Unknonw logging level.")
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] "
"%(message)s",
datefmt="%a, %d %b %Y %H:%M:%S",
)
if log_file_name is None:
# Only print log to screen
self.handler = logging.StreamHandler()
else:
if log_file_name == "default":
log_file_name = "{}_{}.log".format(
logger_name,
datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S"),
)
# Save log to file
if log_file_folder == "default":
log_file_folder = ""
self.handler = logging.FileHandler(
os.path.join(log_file_folder, log_file_name), "a+"
)
self.handler.setFormatter(formatter)
if self.logger.hasHandlers():
self.logger.handlers.clear()
self.logger.addHandler(self.handler)
# spectrum ID
if spec_id is None:
self.spec_id = 0
elif type(spec_id) == int:
self.spec_id = spec_id
else:
error_msg = (
"spec_id has to be of type int, "
+ "{} is given.".format(type(spec_id))
)
self.logger.critical(error_msg)
raise ValueError(error_msg)
# Reduction Meta-data
self.time_of_reduction = datetime.datetime.now().strftime(
"%Y_%m_%d_%H_%M_%S"
)
# Raw image headers
self.spectrum_header = None
self.arc_header = None
self.standard_header = None
# Detector properties
self.gain = None
self.readnoise = None
self.exptime = None
self.seeing = None
# Observing Condition
self.relative_humidity = None
self.pressure = None
self.temperature = None
self.airmass = None
# Trace properties
self.trace = None
self.trace_sigma = None
self.len_trace = None
self.pixel_list = None
self.pixel_mapping_itp = None
self.widthdn = None
self.widthup = None
self.sepdn = None
self.sepup = None
self.skywidthdn = None
self.skywidthup = None
self.extraction_type = None
self.count = None
self.count_err = None
self.count_sky = None
self.var = None
self.profile = None
self.profile_func = None
# Wavelength calibration properties
self.arc_spec = None
self.peaks = None
self.peaks_refined = None
# fit constrains
self.calibrator = None
self.min_atlas_wavelength = None
self.max_atlas_wavelength = None
self.num_slopes = None
self.range_tolerance = None
self.fit_tolerance = None
self.fit_type = None
self.fit_deg = None
self.candidate_tolerance = None
self.linearity_tolerance = None
self.ransac_tolerance = None
self.num_candidates = None
self.xbins = None
self.ybins = None
self.brute_force = None
# fit config
self.sample_size = None
self.top_n = None
self.max_tries = None
self.intput_coeff = None
self.linear = None
self.weighted = None
self.filter_close = None
# fit output
self.fit_coeff = None
self.matched_peaks = None
self.matched_atlas = None
self.rms = None
self.residual = None
self.peak_utilisation = None
self.atlas_utilisation = None
# fit output
self.fit_coeff_rascal = None
self.matched_peaks_rascal = None
self.matched_atlas_rascal = None
self.rms_rascal = None
self.residual_rascal = None
self.peak_utilisation_rascal = None
self.atlas_utilisation_rascal = None
# fit output
self.fit_coeff_refine = None
self.matched_peaks_refine = None
self.matched_atlas_refine = None
self.rms_refine = None
self.residual_refine = None
self.peak_utilisation_refine = None
self.atlas_utilisation_refine = None
# fitted solution
self.wave = None
self.wave_bin = None
self.wave_start = None
self.wave_end = None
self.wave_resampled = None
self.arc_spec_resampled = None
self.count_resampled = None
self.count_err_resampled = None
self.count_sky_resampled = None
# Fluxes
self.flux = None
self.flux_err = None
self.flux_sky = None
self.flux_resampled = None
self.flux_err_resampled = None
self.flux_sky_resampled = None
# Continuum
self.count_continuum = None
self.count_resampled_continuum = None
self.flux_continuum = None
self.flux_resampled_continuum = None
# Sensitivity curve smoothing properties
self.smooth = None
self.slength = None
self.sorder = None
# Tellurics
self.telluric_func = None
self.telluric_profile = None
self.telluric_profile_resampled = None
self.telluric_factor = 0.0
self.telluric_factor_resampled = 0.0
# Sensitivity curve properties
self.sensitivity = None
self.sensitivity_resampled = None
self.sensitivity_func = None
self.wave_literature = None
self.flux_literature = None
# Atmospheric extinction properties
self.standard_flux_extinction_fraction = None
self.standard_flux_resampled_extinction_fraction = None
self.science_flux_extinction_fraction = None
self.science_flux_resampled_extinction_fraction = None
# HDU lists for output
self.trace_hdulist = None
self.count_hdulist = None
self.arc_spec_hdulist = None
self.wavecal_hdulist = None
self.count_resampled_hdulist = None
self.flux_hdulist = None
self.flux_resampled_hdulist = None
self.wavelength_hdulist = None
# FITS output properties
self.hdu_output = None
self.empty_primary_hdu = True
self.header = {
"flux_resampled": "Resampled Flux, Resampled Flux Uncertainty, Resampled Sky Flux, "
"Resampled Sensitivity Curve",
"count_resampled": "Resampled Count, Resampled Count Uncertainty, "
"Resampled Sky Count",
"arc_spec": "1D Arc Spectrum, Arc Line Position, Arc Line Effective Position",
"wavecal": "Polynomial coefficients for wavelength calibration",
"wavelength": "The pixel-to-wavelength mapping",
"flux": "Flux, Flux Uncertainty, Sky Flux",
"sensitivity": "Sensitivity Curve",
"weight_map": "Weight map of the extration (variance)",
"count": "Count, Count Uncertainty, Sky Count",
"trace": "Pixel positions of the trace in the spatial direction, "
"Width of the trace",
}
self.n_hdu = {
"trace": 2,
"count": 3,
"weight_map": 1,
"arc_spec": 3,
"wavecal": 1,
"wavelength": 1,
"count_resampled": 3,
"sensitivity": 1,
"flux": 3,
"sensitivity_resampled": 1,
"flux_resampled": 3,
}
self.hdu_order = {
"trace": 0,
"count": 1,
"weight_map": 2,
"arc_spec": 3,
"wavecal": 4,
"wavelength": 5,
"count_resampled": 6,
"sensitivity": 7,
"flux": 8,
"sensitivity_resampled": 9,
"flux_resampled": 10,
}
self.hdu_content = {
"trace": False,
"count": False,
"weight_map": False,
"arc_spec": False,
"wavecal": False,
"wavelength": False,
"count_resampled": False,
"sensitivity": False,
"flux": False,
"sensitivity_resampled": False,
"flux_resampled": False,
}
[docs] def merge(self, spectrum1D, overwrite=False):
"""
This function copies all the info from the supplied spectrum1D to
this one, including the spec_id.
Parameters
----------
spectrum1D: Spectrum1D object
The source Spectrum1D to be deep copied over.
overwrite: boolean (Default: False)
Set to True to overwrite all the data in this Spectrum1D.
"""
for attr, value in self.__dict__.items():
if attr == "spec_id":
if getattr(spectrum1D, attr) != 0:
setattr(self, attr, value)
if getattr(self, attr) is None or []:
setattr(self, attr, getattr(spectrum1D, attr))
if overwrite:
setattr(self, attr, getattr(spectrum1D, attr))
else:
# if not overwrite, do nothing
pass
[docs] def add_standard_header(self, header):
"""
Add a header for the standard star that it is flux calibrated
against. The, if opted for, automatic atmospheric extinction
correction is applied based on the airmass found in this header.
Typically put the header of the raw 2D spectral image of the
standard star here. Some automated operations rely on reading
from the header.
Parameters
----------
header: astropy.io.fits.Header object
"""
if header is not None:
if type(header) == fits.Header:
self.standard_header = header
self.logger.info("standard_header is stored.")
elif type(header[0]) == fits.Header:
self.standard_header = header[0]
self.logger.info("standard_header is stored.")
else:
self.logger.error(
"Unsupported type of header is provided: {}. "
"Only astropy.io.fits.Header or such in a list or array "
"can be accepted.".format(type(header))
)
[docs] def remove_standard_header(self):
"""
Remove the FITS header of the standard.
"""
self.standard_header = None
[docs] def add_trace(self, trace, trace_sigma, pixel_list=None):
"""
Add the trace of the spectrum.
Parameters
----------
trace: list or numpy.ndarray (N)
The spatial pixel value (can be sub-pixel) of the trace at each
spectral position.
trace_sigma: list or numpy.ndarray (N)
Standard deviation of the Gaussian profile of a trace
pixel_list: list or numpy array (Default: None)
The pixel position of the trace in the dispersion direction.
This should be provided if you wish to override the default
range (num_pix), for example, in the case of accounting for chip
gaps (10 pixels) in a 3-CCD setting, you should provide
[0,1,2,...90, 100,101,...190, 200,201,...290]
"""
assert isinstance(
trace, (list, np.ndarray)
), "trace has to be a list or a numpy array"
assert isinstance(
trace_sigma, (list, np.ndarray)
), "trace_sigma has to be a list or a numpy array"
assert len(trace_sigma) == len(trace), "trace and trace_sigma have to "
" be the same size."
if pixel_list is None:
pixel_list = list(np.arange(len(trace)).astype("int"))
else:
assert isinstance(
pixel_list, (list, np.ndarray)
), "pixel_list has to be a list or a numpy array"
assert len(pixel_list) == len(trace), "trace and pixel_list have "
"to be the same size."
pixel_mapping_itp = itp.interp1d(
np.arange(len(trace)),
pixel_list,
kind="cubic",
fill_value="extrapolate",
)
# Only add if all assertions are passed.
self.trace = trace
self.trace_sigma = trace_sigma
self.len_trace = len(trace)
self.add_pixel_list(pixel_list)
self.add_pixel_mapping_itp(pixel_mapping_itp)
[docs] def remove_trace(self):
"""
Remove the trace, trace_sigma and len_trace, also remove the pixel_list
and the interpolation function that maps the pixel values to the
effective pixel values.
"""
self.trace = None
self.trace_sigma = None
self.len_trace = None
self.remove_pixel_list()
self.remove_pixel_mapping_itp()
[docs] def add_aperture(
self, widthdn, widthup, sepdn, sepup, skywidthdn, skywidthup
):
"""
The size of the aperture in which the spectrum is extracted. This is
merely the limit where extraction is performed, it does not hold the
information of the weighting::
................................. ^
Sky ................................. | skywidthup
................................. v
................................. ^
................................. | sepup
................................. v
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ^
--------------------------------- | widthup
Spectrum ================================= v ^
--------------------------------- | widthdn
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ v
................................. ^
................................. | sepdn
................................. v
................................. ^
Sky ................................. | skywidthdn
................................. v
Parameters
----------
widthdn: real positive number
The aperture size on the bottom side of the spectrum.
widthup: real positive number
The aperture size on the top side of the spectrum.
sepdn: real positive number
The gap between the spectrum and the sky region on the bottom
side of the spectrum.
sepup: real positive number
The gap between the spectrum and the sky region on the top
side of the spectrum.
skywidthdn: real positive number
The sky region on the bottom side of the spectrum.
skywidthup: real positive number
The sky region on the top side of the spectrum.
"""
assert np.isfinite(widthdn), "widthdn has to be finite."
assert np.isfinite(widthup), "widthup has to be finite."
assert np.isfinite(sepdn), "sepdn has to be finite."
assert np.isfinite(sepup), "sepup has to be finite."
assert np.isfinite(skywidthdn), "skywidthdn has to be finite."
assert np.isfinite(skywidthup), "skywidthup has to be finite."
self.widthdn = widthdn
self.widthup = widthup
self.sepdn = sepdn
self.sepup = sepup
self.skywidthdn = skywidthdn
self.skywidthup = skywidthup
[docs] def add_count(self, count, count_err=None, count_sky=None):
"""
Add the photoelectron counts and the associated optional
uncertainty and sky counts.
Parameters
----------
count: 1-d array
The summed count at each column along the trace.
count_err: 1-d array
the uncertainties of the count values
count_sky: 1-d array
The integrated sky values along each column, suitable for
subtracting from the output of ap_extract
"""
# Check data type
assert isinstance(
count, (list, np.ndarray)
), "count has to be a list or a numpy array"
if count_err is not None:
assert isinstance(
count_err, (list, np.ndarray)
), "count_err has to be a list or a numpy array"
assert len(count_err) == len(
count
), "count_err has to be the same size as count"
if count_sky is not None:
assert isinstance(
count_sky, (list, np.ndarray)
), "count_sky has to be a list or a numpy array"
assert len(count_sky) == len(
count
), "count_sky has to be the same size as count"
self.count = count
# Only add if they are provided
if count_err is not None:
self.count_err = count_err
else:
self.count_err = np.zeros_like(self.count)
if count_sky is not None:
self.count_sky = count_sky
else:
self.count_sky = np.zeros_like(self.count)
if self.pixel_list is None:
pixel_list = list(np.arange(len(count)).astype("int"))
self.add_pixel_list(pixel_list)
else:
assert len(self.pixel_list) == len(count), (
"count and pixel_list have " + "to be the same size."
)
[docs] def remove_count(self):
"""
Remove the count, count_err and count_sky.
"""
self.count = None
self.count_err = None
self.count_sky = None
[docs] def add_variances(self, var):
"""
Add the weight map of the extraction.
Parameters
----------
var: 2-d array
The weigh map of the input image for extraction.
"""
self.var = var
[docs] def remove_variances(self):
"""
Remove the variances.
"""
self.var = None
[docs] def add_profile_func(self, profile_func):
"""
Add the fitted trace profile.
Parameters
----------
profile_func: a fitted astropy.model
The fitted trace profile.
"""
self.profile_func = profile_func
[docs] def remove_profile_func(self):
"""
Remove the fitted trace profile.
"""
self.profile_func = None
[docs] def add_profile(self, profile):
"""
Add the extraction profile.
Parameters
----------
profile: 1-d array
The weight function of the extraction profile.
"""
self.profile = profile
[docs] def remove_profile(self):
"""
Remove the extraction profile.
"""
self.profile = None
[docs] def add_arc_spec(self, arc_spec):
"""
Add the extracted 1D spectrum of the arc.
Parameters
----------
arc_spec: 1-d array
The photoelectron count of the spectrum of the arc lamp.
"""
assert isinstance(
arc_spec, (list, np.ndarray)
), "arc_spec has to be a list or a numpy array"
self.arc_spec = arc_spec
[docs] def remove_arc_spec(self):
"""
Remove the 1D spectrum of the arc.
"""
self.arc_spec = None
[docs] def add_pixel_list(self, pixel_list):
"""
Add the pixel list, which contain the effective pixel values that
has accounted for chip gaps and non-integer pixel value.
Parameters
----------
pixel_list: 1-d array
The effective position of the pixel.
"""
assert isinstance(
pixel_list, (list, np.ndarray)
), "pixel_list has to be a list or a numpy array"
self.pixel_list = pixel_list
[docs] def remove_pixel_list(self):
"""
Remove the pixel list.
"""
self.pixel_list = None
[docs] def add_pixel_mapping_itp(self, pixel_mapping_itp):
"""
Add the interpolated callable function that convert raw pixel
values into effective pixel values.
Parameters
----------
pixel_mapping_itp: callable function
The mapping function for raw-effective pixel position.
"""
assert (
type(pixel_mapping_itp) == itp.interpolate.interp1d
), "pixel_mapping_itp has to be a "
"scipy.interpolate.interpolate.interp1d object."
self.pixel_mapping_itp = pixel_mapping_itp
[docs] def remove_pixel_mapping_itp(self):
"""
Remove the interpolation function that maps the pixel values to the
effective pixel value.
"""
self.pixel_mapping_itp = None
[docs] def add_peaks(self, peaks):
"""
Add the pixel values (int) where arc lines are located.
Parameters
----------
peaks: 1-d array
The pixel value of the identified peaks.
"""
assert isinstance(
peaks, (list, np.ndarray)
), "peaks has to be a list or a numpy array"
self.peaks = peaks
[docs] def remove_peaks(self):
"""
Remove the list of peaks.
"""
self.peaks = None
[docs] def add_peaks_refined(self, peaks_refined):
"""
Add the refined pixel values (float) where arc lines are located.
Parameters
----------
peaks: 1-d array
The pixel value of the refined peak positions.
"""
assert isinstance(
peaks_refined, (list, np.ndarray)
), "peaks_refined has to be a list or a numpy array"
self.peaks_refined = peaks_refined
[docs] def remove_peaks_refined(self):
"""
Remove the list of refined peaks.
"""
self.peaks_refined = None
[docs] def add_peaks_wave(self, peaks_wave):
"""
Add the wavelength (Angstrom) of the arc lines.
Parameters
----------
peaks: 1-d array
The wavelength value of the peaks.
"""
assert isinstance(
peaks_wave, (list, np.ndarray)
), "peaks_wave has to be a list or a numpy array"
self.peaks_wave = peaks_wave
[docs] def remove_peaks_wave(self):
"""
Remove the list of wavelengths of the arc lines.
"""
self.peaks_wave = None
[docs] def add_calibrator(self, calibrator):
"""
Add a RASCAL Calibrator object.
Parameters
----------
calibrator: rascal.Calibrator()
A RASCAL Calibrator object.
"""
self.calibrator = calibrator
[docs] def remove_calibrator(self):
"""
Remove the Calibrator.
"""
self.calibrator = None
[docs] def add_atlas_wavelength_range(
self, min_atlas_wavelength, max_atlas_wavelength
):
"""
Add the allowed range of wavelength calibration.
Parameters
----------
min_atlas_wavelength: float
The minimum wavelength of the atlas.
max_atlas_wavelength: float
The maximum wavelength of the atlas.
"""
assert np.isfinite(
min_atlas_wavelength
), "min_atlas_wavelength has to be finite."
assert np.isfinite(
max_atlas_wavelength
), "max_atlas_wavelength has to be finite."
self.min_atlas_wavelength = min_atlas_wavelength
self.max_atlas_wavelength = max_atlas_wavelength
[docs] def remove_atlas_wavelength_range(self):
"""
Remove the atlas wavelength range.
"""
self.min_atlas_wavelength = None
self.max_atlas_wavelength = None
[docs] def add_min_atlas_intensity(self, min_atlas_intensity):
"""
Add the minimum allowed line intensity (theoretical NIST value)
that were used for wavelength calibration.
Parameters
----------
min_atlas_intensity: float
The minimum line strength used to get the atlas.
"""
assert np.isfinite(
min_atlas_intensity
), "min_atlas_intensity has to be finite."
self.min_atlas_intensity = min_atlas_intensity
[docs] def remove_min_atlas_intensity(self):
"""
Remove the minimum atlas intensity.
"""
self.min_atlas_intensity = None
[docs] def add_min_atlas_distance(self, min_atlas_distance):
"""
Add the minimum allowed line distance (only consider lines that
passed the minimum intensity threshold) that were used for
wavelength calibration.
Parameters
----------
min_atlas_distance: float
Minimum wavelength separation between neighbouring lines.
"""
assert np.isfinite(
min_atlas_distance
), "min_atlas_distance has to be finite."
self.min_atlas_distance = min_atlas_distance
[docs] def remove_min_atlas_distance(self):
"""
Remove the minimum distance between lines to be accepted.
"""
self.min_atlas_distance = None
[docs] def add_gain(self, gain):
"""
Add the gain value of the spectral image. This value can be
different from the one in the header as this can be overwritten
by an user input, while the header value is raw.
Parameters
----------
gain: float
The gain value of the detector.
"""
assert np.isfinite(gain), "gain has to be finite."
self.gain = gain
[docs] def remove_gain(self):
"""
Remove the gain value.
"""
self.gain = None
[docs] def add_readnoise(self, readnoise):
"""
Add the readnoise value of the spectral image. This value can be
different from the one in the header as this can be overwritten
by an user input, while the header value is raw.
Parameters
----------
readnoise: float
The read noise value of the detector.
"""
assert np.isfinite(readnoise), "readnoise has to be finite."
self.readnoise = readnoise
[docs] def remove_readnoise(self):
"""
Remove the readnoise value.
"""
self.readnoise = None
[docs] def add_exptime(self, exptime):
"""
Add the exposure time of the spectral image. This value can be
different from the one in the header as this can be overwritten
by an user input, while the header value is raw.
Parameters
----------
exptime: float
The exposure time of the input image.
"""
assert np.isfinite(exptime), "exptime has to be finite."
self.exptime = exptime
[docs] def remove_exptime(self):
"""
Remove the exptime value.
"""
self.exptime = None
[docs] def add_airmass(self, airmass):
"""
Add the airmass when the observation was carried out. This value
can be different from the one in the header as this can be
overwritten by an user input, while the header value is raw.
Parameters
----------
airmass: float
The effective airmass during the exposure.
"""
assert np.isfinite(airmass), "airmass has to be finite."
self.airmass = airmass
[docs] def remove_airmass(self):
"""
Remove the airmass value.
"""
self.airmass = None
[docs] def add_seeing(self, seeing):
"""
Add the seeing when the observation was carried out. This value
can be different from the one in the header as this can be
overwritten by an user input, while the header value is raw.
Parameters
----------
seeing: float
The effective seeing of the observation.
"""
assert np.isfinite(seeing), "airmass has to be finite."
self.seeing = seeing
[docs] def remove_seeing(self):
"""
Remove the seeing value.
"""
self.seeing = None
[docs] def add_weather_condition(self, pressure, temperature, relative_humidity):
"""
Add the pressure, temperature and relative humidity when the spectral
image was taken. These value can be different from the ones in the
header as this can be overwritten by an user input, while the header
value is raw.
Parameters
----------
pressure: float
The air pressure during the observation (in pascal).
temperature: float
The temperature during the observation (in Kelvin).
relative_hhumidity: float
The relative humidity during the observation (between 0 and 1).
"""
assert np.isfinite(pressure), "pressure has to be finite."
assert np.isfinite(temperature), "temperature has to be finite."
assert np.isfinite(
relative_humidity
), "relative_humidity has to be finite."
self.pressure = pressure
self.temperature = temperature
self.relative_humidity = relative_humidity
[docs] def remove_weather_condition(self):
"""
Remove the Pressure, Temperature and Humidity.
"""
self.pressure = None
self.temperature = None
self.relative_humidity = None
[docs] def add_fit_type(self, fit_type):
"""
Add the kind of polynomial used for fitting the pixel-to-wavelength
function.
Parameters
----------
fit_type: str
The type of polynomial for fitting the pixel-wavelength function.
"""
assert type(fit_type) == str, "fit_type has to be a string"
assert fit_type in ["poly", "leg", "cheb"], "fit_type must be "
"(1) poly(nomial); (2) leg(endre); or (3) cheb(yshev)."
self.fit_type = fit_type
[docs] def remove_fit_type(self):
"""
Remove the chosen type of polynomial.
"""
self.fit_type = None
[docs] def add_fit_coeff(self, fit_coeff):
"""
Add the polynomial co-efficients of the pixel-to-wavelength
function. Note that this overwrites the wavelength calibrated
fit coefficients.
Parameters
----------
fit_coeff: list or 1-d array
The set of coefficients of the pixel-wavelength function.
"""
assert isinstance(
fit_coeff, (list, np.ndarray)
), "fit_coeff has to be a list or a numpy array."
self.fit_coeff = fit_coeff
[docs] def remove_fit_coeff(self):
"""
Remove the polynomial co-efficients of the pixel-to-wavelength
function.
"""
self.fit_coeff = None
[docs] def add_calibrator_properties(
self, num_pix, pixel_list, plotting_library, log_level
):
"""
Add the properties of the RASCAL Calibrator.
Parameters
----------
num_pix: int
The number of pixels in the dispersion direction
pixel_list: list or numpy array
The pixel position of the trace in the dispersion direction.
This should be provided if you wish to override the default
range(num_pix), for example, in the case of accounting
for chip gaps (10 pixels) in a 3-CCD setting, you should provide
[0,1,2,...90, 100,101,...190, 200,201,...290]
plotting_library : string
Choose between matplotlib and plotly.
log_level : string
Choose from {CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET}.
"""
self.num_pix = num_pix
self.plotting_library = plotting_library
self.log_level = log_level
self.pixel_list = pixel_list
[docs] def remove_calibrator_properties(self):
"""
Remove the properties of the RASCAL Calibrator as recorded in ASPIRED.
This does NOT remove the calibrator properites INSIDE the calibrator.
"""
self.num_pix = None
self.pixel_list = None
self.plotting_library = None
self.log_level = None
[docs] def add_hough_properties(
self,
num_slopes,
xbins,
ybins,
min_wavelength,
max_wavelength,
range_tolerance,
linearity_tolerance,
):
"""
Add the hough transform configuration of the RASCAL Calibrator.
Parameters
----------
num_slopes: int
Number of slopes to consider during Hough transform
xbins: int
Number of bins for Hough accumulation
ybins: int
Number of bins for Hough accumulation
min_wavelength: float
Minimum wavelength of the spectrum.
max_wavelength: float
Maximum wavelength of the spectrum.
range_tolerance: float
Estimation of the error on the provided spectral range
e.g. 3000-5000 with tolerance 500 will search for
solutions that may satisfy 2500-5500
linearity_tolerance: float
A toleranceold (Ansgtroms) which defines some padding around the
range tolerance to allow for non-linearity. This should be the
maximum expected excursion from linearity.
"""
self.num_slopes = num_slopes
self.xbins = xbins
self.ybins = ybins
self.min_wavelength = min_wavelength
self.max_wavelength = max_wavelength
self.range_tolerance = range_tolerance
self.linearity_tolerance = linearity_tolerance
[docs] def remove_hough_properties(self):
"""
Remove the hough transform configuration of the RASCAL Calibrator.
This does NOT remove the hough transform configuration INSIDE the
calibrator.
"""
self.num_slopes = None
self.xbins = None
self.ybins = None
self.min_wavelength = None
self.max_wavelength = None
self.range_tolerance = None
self.linearity_tolerance = None
[docs] def add_ransac_properties(
self,
sample_size,
top_n_candidate,
linear,
filter_close,
ransac_tolerance,
candidate_weighted,
hough_weight,
minimum_matches,
minimum_peak_utilisation,
minimum_fit_error,
):
"""
Add the RANSAC properties of the RASCAL Calibrator.
Parameters
----------
sample_size: int
Number of pixel-wavelength hough pairs to be used for each arc line
being picked.
top_n_candidate: int
Top ranked lines to be fitted.
linear: bool
True to use the hough transformed gradient, otherwise, use the
known polynomial.
filter_close: bool
Remove the pairs that are out of bounds in the hough space.
ransac_tolerance: float
The distance criteria (Angstroms) to be considered an inlier to a
fit. This should be close to the size of the expected residuals on
the final fit (e.g. 1A is typical)
candidate_weighted: bool
Set to True to down-weight pairs that are far from the fit.
hough_weight: float or None
Set to use the hough space to weigh the fit. The theoretical
optimal weighting is unclear. The larger the value, the heavily it
relies on the overdensity in the hough space for a good fit.
"""
self.sample_size = sample_size
self.top_n_candidate = top_n_candidate
self.linear = linear
self.filter_close = filter_close
self.ransac_tolerance = ransac_tolerance
self.candidate_weighted = candidate_weighted
self.hough_weight = hough_weight
self.minimum_matches = minimum_matches
self.minimum_peak_utilisation = minimum_peak_utilisation
self.minimum_fit_error = minimum_fit_error
[docs] def remove_ransac_properties(self):
"""
Remove the RANSAC properties of the RASCAL Calibrator.
"""
self.sample_size = None
self.top_n_candidate = None
self.linear = None
self.filter_close = None
self.ransac_tolerance = None
self.candidate_weighted = None
self.hough_weight = None
self.minimum_matches = None
self.minimum_peak_utilisation = None
self.minimum_fit_error = None
[docs] def add_fit_output_final(
self,
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
):
"""
Add the final accepted polynomial solution.
Parameters
----------
fit_coeff: list
Set the baseline of the least square fit. If no fits outform this
set of polynomial coefficients, this will be used as the best fit.
matched_peaks: list
List of matched peaks
matched_atlas: list
List of matched atlas lines
rms: float
The root-mean-squared of the fit
residual: list
The residual of each fitted peak
peak_utilisation: float
The fraction of the input peaks used in the fit
atlas_utilisation: float
The fraction of the input atlas used in the fit
"""
# add assertion here
self.fit_coeff = fit_coeff
self.matched_peaks = matched_peaks
self.matched_atlas = matched_atlas
self.rms = rms
self.residual = residual
self.peak_utilisation = peak_utilisation
self.atlas_utilisation = atlas_utilisation
[docs] def remove_fit_output_final(self):
"""
Remove the accepted polynomial solultion.
"""
self.fit_coeff = None
self.matched_peaks = None
self.matched_atlas = None
self.rms = None
self.residual = None
self.peak_utilisation = None
self.atlas_utilisation = None
[docs] def add_fit_output_rascal(
self,
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
):
"""
Add the RASCAL polynomial solution.
Parameters
----------
fit_coeff: list
Set the baseline of the least square fit. If no fits outform this
set of polynomial coefficients, this will be used as the best fit.
matched_peaks: list
List of matched peaks
matched_atlas: list
List of matched atlas lines
rms: float
The root-mean-squared of the fit
residual: list
The residual of each fitted peak
peak_utilisation: float
The fraction of the input peaks used in the fit
atlas_utilisation: float
The fraction of the input atlas used in the fit
"""
# add assertion here
self.fit_coeff_rascal = fit_coeff
self.matched_peaks_rascal = matched_peaks
self.matched_atlas_rascal = matched_atlas
self.rms_rascal = rms
self.residual_rascal = residual
self.peak_utilisation_rascal = peak_utilisation
self.atlas_utilisation_rascal = atlas_utilisation
self.add_fit_output_final(
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
)
[docs] def remove_fit_output_rascal(self):
"""
Remove the RASCAL polynomial solution.
"""
self.fit_coeff_rascal = None
self.matched_peaks_rascal = None
self.matched_atlas_rascal = None
self.rms_rascal = None
self.residual_rascal = None
self.peak_utilisation_rascal = None
self.atlas_utilisation_rascal = None
[docs] def add_fit_output_refine(
self,
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
):
"""
Add the refined RASCAL polynomial solution.
Parameters
----------
fit_coeff: list
Set the baseline of the least square fit. If no fits outform this
set of polynomial coefficients, this will be used as the best fit.
matched_peaks: list
List of matched peaks
matched_atlas: list
List of matched atlas lines
rms: float
The root-mean-squared of the fit
residual: list
The residual of each fitted peak
peak_utilisation: float
The fraction of the input peaks used in the fit
atlas_utilisation: float
The fraction of the input atlas used in the fit
"""
# add assertion here
self.fit_coeff_refine = fit_coeff
self.matched_peaks_refine = matched_peaks
self.matched_atlas_refine = matched_atlas
self.rms_refine = rms
self.residual_refine = residual
self.peak_utilisation_refine = peak_utilisation
self.atlas_utilisation_refine = atlas_utilisation
self.add_fit_output_final(
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
)
[docs] def remove_fit_output_refine(self):
"""
Remove the refined RASCAL polynomial solution.
"""
self.fit_coeff_refine = None
self.matched_peaks_refine = None
self.matched_atlas_refine = None
self.rms_refine = None
self.residual_refine = None
self.peak_utilisation_refine = None
self.atlas_utilisation_refine = None
[docs] def add_wavelength(self, wave):
"""
Add the wavelength of each effective pixel.
Parameters
----------
wave: list or 1d-array
The wavelength values at each effective pixel.
"""
self.wave = wave
# Note that the native pixels have varing bin size.
self.wave_bin = np.nanmedian(np.array(np.ediff1d(wave)))
self.wave_start = np.min(wave)
self.wave_end = np.max(wave)
[docs] def remove_wavelength(self):
"""
Remove the wavelength of each effective pixel.
"""
self.wave = None
[docs] def add_wavelength_resampled(self, wave_resampled):
"""
Add the wavelength of the resampled spectrum which has an evenly
distributed wavelength spacing.
Parameters
----------
wave: list or 1d-array
The resampled wavelength values.
"""
# We assume that the resampled spectrum has fixed bin size
self.wave_bin = np.nanmedian(np.array(np.ediff1d(wave_resampled)))
self.wave_start = np.min(wave_resampled)
self.wave_end = np.max(wave_resampled)
self.wave_resampled = wave_resampled
[docs] def remove_wavelength_resampled(self):
"""
Add the resampled wavelength.
"""
self.wave_bin = None
self.wave_start = None
self.wave_end = None
self.wave_resampled = None
[docs] def add_count_resampled(
self, count_resampled, count_err_resampled, count_sky_resampled
):
"""
Add the photoelectron counts of the resampled spectrum which has
an evenly distributed wavelength spacing.
Parameters
----------
count_resampled: list or 1d-array
The resampled photoelectron count.
count_err_resampled: list or 1d-array
The uncertainty of the resampled photoelectron count.
count_sky_resampled: list or 1d-array
The background sky level of the resampled photoelectron count.
"""
self.count_resampled = count_resampled
self.count_err_resampled = count_err_resampled
self.count_sky_resampled = count_sky_resampled
[docs] def remove_count_resampled(self):
"""
Remove the photoelectron counts of the resampled spectrum.
"""
self.count_resampled = None
self.count_err_resampled = None
self.count_sky_resampled = None
[docs] def add_standard_star(self, library, target):
"""
Add the name of the standard star and its source.
Parameters
----------
library: str
The name of the colelction of standard stars.
target: str
The name of the standard star.
"""
self.library = library
self.target = target
[docs] def remove_standard_star(self):
"""
Remove the name of the standard star and its source.
"""
self.library = None
self.target = None
[docs] def add_smoothing(self, smooth, slength, sorder):
"""
Add the SG smoothing parameters for computing the sensitivity curve.
Parameters
----------
smooth: bool
Indicate if smoothing was applied.
slength: int
The size of the smoothing window.
sorder: int
The order of polynomial used for SG smoothing.
"""
self.smooth = smooth
self.slength = slength
self.sorder = sorder
[docs] def remove_smoothing(self):
"""
Remove the smoothing parameters for computing the sensitivity curve.
"""
self.smooth = None
self.slength = None
self.sorder = None
[docs] def add_sensitivity_func(self, sensitivity_func):
"""
Add the callable function of the sensitivity curve.
Parameters
----------
sensitivity_func: callable function
The sensitivity curve as a function of wavelength.
"""
self.sensitivity_func = sensitivity_func
[docs] def remove_sensitivity_func(self):
"""
Remove the sensitivity curve function.
"""
self.sensitivity_func = None
[docs] def add_sensitivity(self, sensitivity):
"""
Add the sensitivity values for each pixel (the list from dividing
the literature standard by the photoelectron count).
Parameters
----------
sensitivity: list or 1-d array
The sensitivity at the effective pixel.
"""
self.sensitivity = sensitivity
[docs] def remove_sensitivity(self):
"""
Remove the sensitivity values.
"""
self.sensitivity = None
[docs] def add_sensitivity_resampled(self, sensitivity_resampled):
"""
Add the sensitivity after the spectrum is resampled.
Parameters
----------
sensitivity: list or 1-d array
The sensitivity in the reasmpled space.
"""
self.sensitivity_resampled = sensitivity_resampled
[docs] def remove_sensitivity_resampled(self):
"""
Remove the sensitivity after the spectrum is resampled.
"""
self.sensitivity_resampled = None
[docs] def add_literature_standard(self, wave_literature, flux_literature):
"""
Add the literature wavelength and flux values of the standard star
used for calibration.
Parameters
----------
wave_literature: list or 1-d array
The wavelength values of the literature standard used.
flux_literature: list or 1-d array
The flux values of the literature standard used.
"""
self.wave_literature = wave_literature
self.flux_literature = flux_literature
[docs] def remove_literature_standard(self):
"""
Remove the literature wavelength and flux values of the standard
star used for calibration
"""
self.wave_literature = None
self.flux_literature = None
[docs] def add_count_continuum(self, count_continuum):
"""
Add the continuum count value (should be the same size as count).
Parameters
----------
count_continuum: list or 1-d array
The photoelectron count of the continuum.
"""
self.count_continuum = count_continuum
[docs] def remove_count_continuum(self):
"""
Remove the continuum count values.
"""
self.count_continuum = None
[docs] def add_count_resampled_continuum(self, count_resampled_continuum):
"""
Add the continuum count_resampled value (should be the same size as
count_resampled).
Parameters
----------
count_resampled_continuum: list or 1-d array
The photoelectron count of the continuum at the resampled
wavelength.
"""
self.count_resampled_continuum = count_resampled_continuum
[docs] def remove_count_resampled_continuum(self):
"""
Remove the continuum count_resampled values.
"""
self.count_resampled_continuum = None
[docs] def add_flux_continuum(self, flux_continuum):
"""
Add the continuum flux value (should be the same size as flux).
Parameters
----------
flux_continuum: list or 1-d array
The flux of the continuum.
"""
self.flux_continuum = flux_continuum
[docs] def remove_flux_continuum(self):
"""
Remove the continuum flux values.
"""
self.flux_continuum = None
[docs] def add_flux_resampled_continuum(self, flux_resampled_continuum):
"""
Add the continuum flux_resampled value (should be the same size as
flux_resampled).
Parameters
----------
flux_resampled_continuum: list or 1-d array
The flux of the continuum at the resampled wavelength.
"""
self.flux_resampled_continuum = flux_resampled_continuum
[docs] def remove_flux_resampled_continuum(self):
"""
Remove the continuum flux_resampled values.
"""
self.flux_resampled_continuum = None
[docs] def add_telluric_func(self, telluric_func):
"""
Add the Telluric interpolated function.
Parameters
----------
telluric: Callable function
A callable function to interpolate the telluric features [0, 1]
"""
self.telluric_func = telluric_func
[docs] def remove_telluric_func(self):
"""
Remove the Telluric interpolated function.
"""
self.telluric_func = None
[docs] def add_telluric_profile(self, telluric_profile):
"""
Add the Telluric profile - relative intensity at each pixel.
Parameters
----------
telluric_profile: list or numpy.ndarray
The relative intensity of the telluric absorption strength [0, 1]
"""
self.telluric_profile = telluric_profile
[docs] def remove_telluric_profile(self):
"""
Remove the Telluric profile.
"""
self.telluric_profile = None
[docs] def add_telluric_profile_resampled(self, telluric_profile_resampled):
"""
Add the Telluric profile - relative intensity at each resampled
wavelength.
Parameters
----------
telluric_profile: list or numpy.ndarray
The relative intensity of the telluric absorption strength [0, 1]
"""
self.telluric_profile_resampled = telluric_profile_resampled
[docs] def remove_telluric_profile_resampled(self):
"""
Remove the Telluric profile at the resampled wavelengths.
"""
self.telluric_profile_resampled = None
[docs] def add_telluric_factor(self, telluric_factor):
"""
Add the Telluric factor.
Parameters
----------
telluric_factor: float
The multiplier to the telluric profile that minimises the sum
of the deviation of corrected spectrum from the continuum
spectrum.
"""
self.telluric_factor = telluric_factor
[docs] def remove_telluric_factor(self):
"""
Remove the Telluric factor.
"""
self.telluric_factor = None
[docs] def add_telluric_factor_resampled(self, telluric_factor_resampled):
"""
Add the Telluric factor found using the resampled flux.
Parameters
----------
telluric_factor_resampled: float
The multiplier to the telluric profile that minimises the sum
of the deviation of corrected spectrum from the resampled
continuum spectrum.
"""
self.telluric_factor_resampled = telluric_factor_resampled
[docs] def remove_telluric_factor_resampled(self):
"""
Remove the Telluric factor for the resampled spectrum.
"""
self.telluric_factor_resampled = None
[docs] def add_flux(self, flux, flux_err, flux_sky):
"""
Add the flux and the associated uncertainty and sky background
in the raw pixel sampling.
Parameters
----------
flux: list or numpy.ndarray
The flux at each extracted column of pixels.
flux_err: list or numpy.ndarray
The uncertainty in flux at each extracted column of pixels.
flux_sky: list or numpy.ndarray
The flux of the background sky at each extracted column of pixels.
"""
self.flux = flux
self.flux_err = flux_err
self.flux_sky = flux_sky
[docs] def remove_flux(self):
"""
Remove the flux, uncertainty of flux, and background sky flux.
"""
self.flux = None
self.flux_err = None
self.flux_sky = None
[docs] def add_flux_resampled(
self, flux_resampled, flux_err_resampled, flux_sky_resampled
):
"""
Add the flux and the associated uncertainty and sky background
of the resampled spectrum.
Parameters
----------
flux: list or numpy.ndarray
The flux at the resampled wavelenth.
flux_err: list or numpy.ndarray
The uncertainty in flux at the resampled wavelenth.
flux_sky: list or numpy.ndarray
The flux of the background sky at the resampled wavelenth.
"""
self.flux_resampled = flux_resampled
self.flux_err_resampled = flux_err_resampled
self.flux_sky_resampled = flux_sky_resampled
[docs] def remove_flux_resampled(self):
"""
Remove the flux, uncertainty of flux, and background sky flux
of the resampled spectrum.
"""
self.flux_resampled = None
self.flux_err_resampled = None
self.flux_sky_resampled = None
def _modify_imagehdu_data(self, hdulist, idx, method, *args):
"""
Wrapper function to modify the data of an ImageHDU object.
"""
method_to_call = getattr(hdulist[idx].data, method)
method_to_call(*args)
def _modify_imagehdu_header(self, hdulist, idx, method, *args):
"""
Wrapper function to modify the header of an ImageHDU object.
e.g.
method = 'set'
args = 'BUNIT', 'Angstroms'
method_to_call(*args) becomes hdu[idx].header.set('BUNIT', 'Angstroms')
"""
method_to_call = getattr(hdulist[idx].header, method)
method_to_call(*args)
[docs] def create_trace_fits(self):
"""
Create an ImageHDU for the trace.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
trace_ImageHDU = fits.ImageHDU(
self.trace, header=self.spectrum_header
)
trace_sigma_ImageHDU = fits.ImageHDU(
self.trace_sigma, header=self.spectrum_header
)
else:
trace_ImageHDU = fits.ImageHDU(self.trace)
trace_sigma_ImageHDU = fits.ImageHDU(self.trace_sigma)
# Create an empty HDU list and populate with ImageHDUs
self.trace_hdulist = fits.HDUList()
self.trace_hdulist += [trace_ImageHDU]
self.trace_hdulist += [trace_sigma_ImageHDU]
# Add the trace
self.modify_trace_header(0, "set", "EXTNAME", "Trace")
self.modify_trace_header(0, "set", "LABEL", "Trace")
self.modify_trace_header(0, "set", "CRPIX1", 1)
self.modify_trace_header(0, "set", "CDELT1", 1)
self.modify_trace_header(0, "set", "CRVAL1", self.pixel_list[0])
self.modify_trace_header(0, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_trace_header(0, "set", "CUNIT1", "Pixel")
self.modify_trace_header(0, "set", "BUNIT", "Pixel (Spatial)")
# Add the trace_sigma
self.modify_trace_header(1, "set", "EXTNAME", "Trace width/sigma")
self.modify_trace_header(1, "set", "LABEL", "Trace width/sigma")
self.modify_trace_header(1, "set", "CRPIX1", 1)
self.modify_trace_header(1, "set", "CDELT1", 1)
self.modify_trace_header(1, "set", "CRVAL1", self.pixel_list[0])
self.modify_trace_header(1, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_trace_header(1, "set", "CUNIT1", "Number of Pixels")
self.modify_trace_header(1, "set", "BUNIT", "Pixel (Spatial)")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("trace ImageHDU cannot be created.")
self.trace_hdulist = None
[docs] def create_count_fits(self):
"""
Create an ImageHDU for the extracted spectrum in photoelectron counts.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
count_ImageHDU = fits.ImageHDU(
self.count, header=self.spectrum_header
)
count_err_ImageHDU = fits.ImageHDU(
self.count_err, header=self.spectrum_header
)
count_sky_ImageHDU = fits.ImageHDU(
self.count_sky, header=self.spectrum_header
)
else:
count_ImageHDU = fits.ImageHDU(self.count)
count_err_ImageHDU = fits.ImageHDU(self.count_err)
count_sky_ImageHDU = fits.ImageHDU(self.count_sky)
# Create an empty HDU list and populate with ImageHDUs
self.count_hdulist = fits.HDUList()
self.count_hdulist += [count_ImageHDU]
self.count_hdulist += [count_err_ImageHDU]
self.count_hdulist += [count_sky_ImageHDU]
# Add the count
self.modify_count_header(0, "set", "WIDTHDN", self.widthdn)
self.modify_count_header(0, "set", "WIDTHUP", self.widthup)
self.modify_count_header(0, "set", "SEPDN", self.sepdn)
self.modify_count_header(0, "set", "SEPUP", self.sepup)
self.modify_count_header(0, "set", "SKYDN", self.skywidthdn)
self.modify_count_header(0, "set", "SKYUP", self.skywidthup)
self.modify_count_header(0, "set", "XTYPE", self.extraction_type)
self.modify_count_header(0, "set", "EXTNAME", "Electron Count")
self.modify_count_header(0, "set", "LABEL", "Electron Count")
self.modify_count_header(0, "set", "CRPIX1", 1)
self.modify_count_header(0, "set", "CDELT1", 1)
self.modify_count_header(0, "set", "CRVAL1", self.pixel_list[0])
self.modify_count_header(0, "set", "CTYPE1", " Pixel (Dispersion)")
self.modify_count_header(0, "set", "CUNIT1", "Pixel")
self.modify_count_header(0, "set", "BUNIT", "electron")
# Add the uncertainty count
self.modify_count_header(
1, "set", "EXTNAME", "Electron Count (Uncertainty)"
)
self.modify_count_header(
1, "set", "LABEL", "Electron Count (Uncertainty)"
)
self.modify_count_header(1, "set", "CRPIX1", 1)
self.modify_count_header(1, "set", "CDELT1", 1)
self.modify_count_header(1, "set", "CRVAL1", self.pixel_list[0])
self.modify_count_header(1, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_count_header(1, "set", "CUNIT1", "Pixel")
self.modify_count_header(1, "set", "BUNIT", "electron")
# Add the sky count
self.modify_count_header(
2, "set", "EXTNAME", "Electron Count (Sky)"
)
self.modify_count_header(2, "set", "LABEL", "Electron Count (Sky)")
self.modify_count_header(2, "set", "CRPIX1", 1)
self.modify_count_header(2, "set", "CDELT1", 1)
self.modify_count_header(2, "set", "CRVAL1", self.pixel_list[0])
self.modify_count_header(2, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_count_header(2, "set", "CUNIT1", "Pixel")
self.modify_count_header(2, "set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("count ImageHDU cannot be created.")
self.count_hdulist = None
[docs] def create_weight_map_fits(self):
"""
Create an ImageHDU for the extraction profile weight function.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
weight_map_ImageHDU = fits.ImageHDU(
self.var, header=self.spectrum_header
)
else:
weight_map_ImageHDU = fits.ImageHDU(self.var)
# Create an empty HDU list and populate with ImageHDUs
self.weight_map_hdulist = fits.HDUList()
self.weight_map_hdulist += [weight_map_ImageHDU]
# Add the extraction weights
self.modify_weight_map_header(
"set", "EXTNAME", "Optimal Extraction Profile"
)
self.modify_weight_map_header(
"set", "LABEL", "Optimal Extraction Profile"
)
if self.var is not None:
self.modify_weight_map_header("set", "CRVAL1", len(self.var))
self.modify_weight_map_header("set", "CRPIX1", 1)
self.modify_weight_map_header("set", "CDELT1", 1)
self.modify_weight_map_header(
"set", "CTYPE1", "Pixel (Spatial)"
)
self.modify_weight_map_header("set", "CUNIT1", "Pixel")
self.modify_weight_map_header("set", "BUNIT", "weights")
else:
self.modify_weight_map_header(
"set", "COMMENT", "Extraction Profile is not available."
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("A weight map ImageHDU cannot be created.")
self.weight_map_hdulist = None
[docs] def create_count_resampled_fits(self):
"""
Create an ImageHDU for the extracted spectrum in photoelectron count
at the resampled wavelengths.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
count_resampled_ImageHDU = fits.ImageHDU(
self.count_resampled, header=self.spectrum_header
)
count_err_resampled_ImageHDU = fits.ImageHDU(
self.count_err_resampled, header=self.spectrum_header
)
count_sky_resampled_ImageHDU = fits.ImageHDU(
self.count_sky_resampled, header=self.spectrum_header
)
else:
count_resampled_ImageHDU = fits.ImageHDU(self.count_resampled)
count_err_resampled_ImageHDU = fits.ImageHDU(
self.count_err_resampled
)
count_sky_resampled_ImageHDU = fits.ImageHDU(
self.count_sky_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.count_resampled_hdulist = fits.HDUList()
self.count_resampled_hdulist += [count_resampled_ImageHDU]
self.count_resampled_hdulist += [count_err_resampled_ImageHDU]
self.count_resampled_hdulist += [count_sky_resampled_ImageHDU]
# Add the resampled count
self.modify_count_resampled_header(
0, "set", "EXTNAME", "Resampled Electron Count"
)
self.modify_count_resampled_header(
0, "set", "LABEL", "Resampled Electron Count"
)
self.modify_count_resampled_header(0, "set", "CRPIX1", 1.00e00)
self.modify_count_resampled_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_count_resampled_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_count_resampled_header(
0, "set", "CTYPE1", "Wavelength"
)
self.modify_count_resampled_header(0, "set", "CUNIT1", "Angstroms")
self.modify_count_resampled_header(0, "set", "BUNIT", "electron")
# Add the resampled uncertainty count
self.modify_count_resampled_header(
1, "set", "EXTNAME", "Resampled Electron Count (Uncertainty)"
)
self.modify_count_resampled_header(
1, "set", "LABEL", "Resampled Electron Count (Uncertainty)"
)
self.modify_count_resampled_header(1, "set", "CRPIX1", 1.00e00)
self.modify_count_resampled_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_count_resampled_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_count_resampled_header(
1, "set", "CTYPE1", "Wavelength"
)
self.modify_count_resampled_header(1, "set", "CUNIT1", "Angstroms")
self.modify_count_resampled_header(1, "set", "BUNIT", "electron")
# Add the resampled sky count
self.modify_count_resampled_header(
2, "set", "EXTNAME", "Resampled Electron Count (Sky)"
)
self.modify_count_resampled_header(
2, "set", "LABEL", "Resampled Electron Count (Sky)"
)
self.modify_count_resampled_header(2, "set", "CRPIX1", 1.00e00)
self.modify_count_resampled_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_count_resampled_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_count_resampled_header(
2, "set", "CTYPE1", "Wavelength"
)
self.modify_count_resampled_header(2, "set", "CUNIT1", "Angstroms")
self.modify_count_resampled_header(2, "set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("count_resampled ImageHDU cannot be created.")
self.count_resampled_hdulist = None
[docs] def create_arc_spec_fits(self):
"""
Create an ImageHDU for the spectrum of the arc lamp.
"""
try:
# Use the header of the arc
if self.arc_header is not None:
arc_spec_ImageHDU = fits.ImageHDU(
self.arc_spec, header=self.arc_header
)
peaks_ImageHDU = fits.ImageHDU(
self.peaks, header=self.arc_header
)
peaks_refined_ImageHDU = fits.ImageHDU(
self.peaks_refined, header=self.arc_header
)
else:
arc_spec_ImageHDU = fits.ImageHDU(self.arc_spec)
peaks_ImageHDU = fits.ImageHDU(self.peaks)
peaks_refined_ImageHDU = fits.ImageHDU(self.peaks_refined)
# Create an empty HDU list and populate with ImageHDUs
self.arc_spec_hdulist = fits.HDUList()
self.arc_spec_hdulist += [arc_spec_ImageHDU]
self.arc_spec_hdulist += [peaks_ImageHDU]
self.arc_spec_hdulist += [peaks_refined_ImageHDU]
# Add the arc spectrum
self.modify_arc_spec_header(
0, "set", "EXTNAME", "Arc Electron Count"
)
self.modify_arc_spec_header(
0, "set", "LABEL", "Arc Electron Count"
)
self.modify_arc_spec_header(0, "set", "CRPIX1", 1)
self.modify_arc_spec_header(0, "set", "CDELT1", 1)
self.modify_arc_spec_header(0, "set", "CRVAL1", self.pixel_list[0])
self.modify_arc_spec_header(
0, "set", "CTYPE1", "Pixel (Dispersion)"
)
self.modify_arc_spec_header(0, "set", "CUNIT1", "Pixel")
self.modify_arc_spec_header(0, "set", "BUNIT", "electron")
# Add the peaks in native pixel value
self.modify_arc_spec_header(
1, "set", "EXTNAME", "Peaks (Detector Pixel)"
)
self.modify_arc_spec_header(
1, "set", "LABEL", "Peaks (Detector Pixel)"
)
self.modify_arc_spec_header(1, "set", "BUNIT", "Pixel")
# Add the peaks in effective pixel value
self.modify_arc_spec_header(
2, "set", "EXTNAME", "Peaks (Effective Pixel)"
)
self.modify_arc_spec_header(
2, "set", "LABEL", "Peaks (Effective Pixel)"
)
self.modify_arc_spec_header(2, "set", "BUNIT", "Pixel")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("arc_spec ImageHDU cannot be created.")
self.arc_spec_hdulist = None
[docs] def create_wavecal_fits(self):
"""
Create an ImageHDU for the polynomial coeffcients of the
pixel-wavelength mapping function.
"""
try:
# Use the header of the arc
if self.arc_header is not None:
wavecal_ImageHDU = fits.ImageHDU(
self.fit_coeff, header=self.arc_header
)
else:
wavecal_ImageHDU = fits.ImageHDU(self.fit_coeff)
# Create an empty HDU list and populate with ImageHDUs
self.wavecal_hdulist = fits.HDUList()
self.wavecal_hdulist += [wavecal_ImageHDU]
# Add the wavelength calibration header info
self.modify_wavecal_header("set", "FTYPE", self.fit_type)
self.modify_wavecal_header("set", "FDEG", self.fit_deg)
self.modify_wavecal_header("set", "FFRMS", self.rms)
self.modify_wavecal_header(
"set", "ATLWMIN", self.min_atlas_wavelength
)
self.modify_wavecal_header(
"set", "ATLWMAX", self.max_atlas_wavelength
)
self.modify_wavecal_header("set", "NSLOPES", self.num_slopes)
self.modify_wavecal_header("set", "RNGTOL", self.range_tolerance)
self.modify_wavecal_header("set", "FITTOL", self.fit_tolerance)
self.modify_wavecal_header(
"set", "CANTHRE", self.candidate_tolerance
)
self.modify_wavecal_header(
"set", "LINTHRE", self.linearity_tolerance
)
self.modify_wavecal_header("set", "RANTHRE", self.ransac_tolerance)
self.modify_wavecal_header("set", "NUMCAN", self.num_candidates)
self.modify_wavecal_header("set", "XBINS", self.xbins)
self.modify_wavecal_header("set", "YBINS", self.ybins)
self.modify_wavecal_header("set", "BRUTE", self.brute_force)
self.modify_wavecal_header("set", "SAMSIZE", self.sample_size)
self.modify_wavecal_header("set", "TOPN", self.top_n)
self.modify_wavecal_header("set", "MAXTRY", self.max_tries)
self.modify_wavecal_header("set", "INCOEFF", self.intput_coeff)
self.modify_wavecal_header("set", "LINEAR", self.linear)
self.modify_wavecal_header("set", "W8ED", self.weighted)
self.modify_wavecal_header("set", "FILTER", self.filter_close)
self.modify_wavecal_header("set", "PUSAGE", self.peak_utilisation)
self.modify_wavecal_header("set", "EXTNAME", "Peak Electron Count")
self.modify_wavecal_header("set", "LABEL", "Peak Electron Count")
self.modify_wavecal_header("set", "CRPIX1", 1.00e00)
self.modify_wavecal_header("set", "CDELT1", self.wave_bin)
self.modify_wavecal_header("set", "CRVAL1", self.wave_start)
self.modify_wavecal_header("set", "CTYPE1", "Wavelength")
self.modify_wavecal_header("set", "CUNIT1", "Angstroms")
self.modify_wavecal_header("set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("wavecal ImageHDU cannot be created.")
self.wavecal_hdulist = None
[docs] def create_wavelength_fits(self):
"""
Create an ImageHDU for the wavelength at each of the native pixel.
"""
try:
# Put the data in an ImageHDU
# Use the header of the arc
if self.arc_header is not None:
wavelength_ImageHDU = fits.ImageHDU(
self.wave, header=self.arc_header
)
else:
wavelength_ImageHDU = fits.ImageHDU(self.wave)
# Create an empty HDU list and populate with the ImageHDU
self.wavelength_hdulist = fits.HDUList()
self.wavelength_hdulist += [wavelength_ImageHDU]
# Add the calibrated wavelength
self.modify_wavelength_header(
"set", "EXTNAME", "Pixel-Wavelength Mapping"
)
self.modify_wavelength_header(
"set", "LABEL", "Pixel-Wavelength Mapping"
)
self.modify_wavelength_header("set", "CRPIX1", 1)
self.modify_wavelength_header("set", "CDELT1", 1)
self.modify_wavelength_header("set", "CRVAL1", self.pixel_list[0])
self.modify_wavelength_header(
"set", "CTYPE1", "Pixel (Dispersion)"
)
self.modify_wavelength_header("set", "CUNIT1", "Pixel")
self.modify_wavelength_header("set", "BUNIT", "Angstroms")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("wavelength ImageHDU cannot be created.")
self.wavelength_hdulist = None
[docs] def create_sensitivity_fits(self):
"""
Create an ImageHDU for the sensitivity at each of the native pixel.
"""
try:
# Put the data in ImageHDUs
# Use the header of the standard
if self.standard_header is not None:
sensitivity_ImageHDU = fits.ImageHDU(
self.sensitivity, header=self.standard_header
)
else:
sensitivity_ImageHDU = fits.ImageHDU(self.sensitivity)
# Create an empty HDU list and populate with ImageHDUs
self.sensitivity_hdulist = fits.HDUList()
self.sensitivity_hdulist += [sensitivity_ImageHDU]
self.modify_sensitivity_header("set", "EXTNAME", "Sensitivity")
self.modify_sensitivity_header("set", "LABEL", "Sensitivity")
self.modify_sensitivity_header("set", "CRPIX1", 1.00e00)
self.modify_sensitivity_header("set", "CDELT1", 1)
self.modify_sensitivity_header("set", "CRVAL1", self.pixel_list[0])
self.modify_sensitivity_header("set", "CTYPE1", "Pixel")
self.modify_sensitivity_header("set", "CUNIT1", "Pixel")
self.modify_sensitivity_header(
"set", "BUNIT", "erg/(s*cm**2*Angstrom)/Count"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("sensitivity ImageHDU cannot be created.")
self.sensitivity_hdulist = None
[docs] def create_flux_fits(self):
"""
Create an ImageHDU for the flux calibrated spectrum at each native
pixel.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = copy.deepcopy(self.spectrum_header)
if self.standard_header is not None:
if header is None:
header = copy.deepcopy(self.standard_header)
else:
header += self.standard_header
if self.arc_header is not None:
if header is None:
header = copy.deepcopy(self.arc_header)
else:
header += self.arc_header
if header is not None:
# Put the data in ImageHDUs
flux_ImageHDU = fits.ImageHDU(self.flux, header=header)
flux_err_ImageHDU = fits.ImageHDU(self.flux_err, header=header)
flux_sky_ImageHDU = fits.ImageHDU(self.flux_sky, header=header)
else:
flux_ImageHDU = fits.ImageHDU(self.flux)
flux_err_ImageHDU = fits.ImageHDU(self.flux_err)
flux_sky_ImageHDU = fits.ImageHDU(self.flux_sky)
# Create an empty HDU list and populate with ImageHDUs
self.flux_hdulist = fits.HDUList()
self.flux_hdulist += [flux_ImageHDU]
self.flux_hdulist += [flux_err_ImageHDU]
self.flux_hdulist += [flux_sky_ImageHDU]
# Note that wave_start is the centre of the starting bin
self.modify_flux_header(0, "set", "EXTNAME", "Flux")
self.modify_flux_header(0, "set", "LABEL", "Flux")
self.modify_flux_header(0, "set", "CRPIX1", 1.00e00)
self.modify_flux_header(0, "set", "CDELT1", 1)
self.modify_flux_header(0, "set", "CRVAL1", self.pixel_list[0])
self.modify_flux_header(0, "set", "CTYPE1", "Pixel")
self.modify_flux_header(0, "set", "CUNIT1", "Pixel")
self.modify_flux_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_header(1, "set", "EXTNAME", "Flux (Uncertainty)")
self.modify_flux_header(1, "set", "LABEL", "Flux (Uncertainty)")
self.modify_flux_header(1, "set", "CRPIX1", 1.00e00)
self.modify_flux_header(1, "set", "CDELT1", 1)
self.modify_flux_header(1, "set", "CRVAL1", self.pixel_list[0])
self.modify_flux_header(1, "set", "CTYPE1", "Pixel")
self.modify_flux_header(1, "set", "CUNIT1", "Pixel")
self.modify_flux_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_header(2, "set", "EXTNAME", "Flux (Sky)")
self.modify_flux_header(2, "set", "LABEL", "Flux (Sky)")
self.modify_flux_header(2, "set", "CRPIX1", 1.00e00)
self.modify_flux_header(2, "set", "CDELT1", 1)
self.modify_flux_header(2, "set", "CRVAL1", self.pixel_list[0])
self.modify_flux_header(2, "set", "CTYPE1", "Pixel")
self.modify_flux_header(2, "set", "CUNIT1", "Pixel")
self.modify_flux_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("flux ImageHDU cannot be created.")
self.flux_hdulist = None
[docs] def create_sensitivity_resampled_fits(self):
"""
Create an ImageHDU for the sensitivity at the resampled wavelength.
"""
try:
# Put the data in ImageHDUs
sensitivity_resampled_ImageHDU = fits.ImageHDU(
self.sensitivity_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.sensitivity_resampled_hdulist = fits.HDUList()
self.sensitivity_resampled_hdulist += [
sensitivity_resampled_ImageHDU
]
self.modify_sensitivity_resampled_header(
"set", "EXTNAME", "Resampled Sensitivity"
)
self.modify_sensitivity_resampled_header(
"set", "LABEL", "Resampled Sensitivity"
)
self.modify_sensitivity_resampled_header("set", "CRPIX1", 1.00e00)
self.modify_sensitivity_resampled_header("set", "CDELT1", 1)
self.modify_sensitivity_resampled_header(
"set", "CRVAL1", self.pixel_list[0]
)
self.modify_sensitivity_resampled_header("set", "CTYPE1", "Pixel")
self.modify_sensitivity_resampled_header("set", "CUNIT1", "Pixel")
self.modify_sensitivity_resampled_header(
"set", "BUNIT", "erg/(s*cm**2*Angstrom)/Count"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("sensitivity ImageHDU cannot be created.")
self.sensitivity_resampled_hdulist = None
[docs] def create_flux_resampled_fits(self):
"""
Create an ImageHDU for the flux calibrated spectrum at the resampled
wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = copy.deepcopy(self.spectrum_header)
if self.standard_header is not None:
if header is None:
header = copy.deepcopy(self.standard_header)
else:
header += self.standard_header
if self.arc_header is not None:
if header is None:
header = copy.deepcopy(self.arc_header)
else:
header += self.arc_header
if header is not None:
# Put the data in ImageHDUs
flux_resampled_ImageHDU = fits.ImageHDU(
self.flux_resampled, header=header
)
flux_err_resampled_ImageHDU = fits.ImageHDU(
self.flux_err_resampled, header=header
)
flux_sky_resampled_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled, header=header
)
else:
# Put the data in ImageHDUs
flux_resampled_ImageHDU = fits.ImageHDU(self.flux_resampled)
flux_err_resampled_ImageHDU = fits.ImageHDU(
self.flux_err_resampled
)
flux_sky_resampled_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_resampled_hdulist = fits.HDUList()
self.flux_resampled_hdulist += [flux_resampled_ImageHDU]
self.flux_resampled_hdulist += [flux_err_resampled_ImageHDU]
self.flux_resampled_hdulist += [flux_sky_resampled_ImageHDU]
# Note that wave_start is the centre of the starting bin
self.modify_flux_resampled_header(
0, "set", "EXTNAME", "Resampled Flux"
)
self.modify_flux_resampled_header(
0, "set", "LABEL", "Resampled Flux"
)
self.modify_flux_resampled_header(0, "set", "CRPIX1", 1.00e00)
self.modify_flux_resampled_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_header(0, "set", "CTYPE1", "Wavelength")
self.modify_flux_resampled_header(0, "set", "CUNIT1", "Angstroms")
self.modify_flux_resampled_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_header(
1, "set", "EXTNAME", "Resampled Flux (Uncertainty)"
)
self.modify_flux_resampled_header(
1, "set", "LABEL", "Resampled Flux (Uncertainty)"
)
self.modify_flux_resampled_header(1, "set", "CRPIX1", 1.00e00)
self.modify_flux_resampled_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_header(1, "set", "CTYPE1", "Wavelength")
self.modify_flux_resampled_header(1, "set", "CUNIT1", "Angstroms")
self.modify_flux_resampled_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_header(
2, "set", "EXTNAME", "Resampled Flux (Sky)"
)
self.modify_flux_resampled_header(
2, "set", "LABEL", "Resampled Flux (Sky)"
)
self.modify_flux_resampled_header(2, "set", "CRPIX1", 1.00e00)
self.modify_flux_resampled_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_header(2, "set", "CTYPE1", "Wavelength")
self.modify_flux_resampled_header(2, "set", "CUNIT1", "Angstroms")
self.modify_flux_resampled_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("flux_resampled ImageHDU cannot be created.")
self.flux_resampled_hdulist = None
[docs] def remove_trace_fits(self):
"""
Remove the trace FITS HDUList.
"""
self.trace_hdulist = None
[docs] def remove_count_fits(self):
"""
Remove the count FITS HDUList.
"""
self.count_hdulist = None
[docs] def remove_count_resampled_fits(self):
"""
Remove the count_resampled FITS HDUList.
"""
self.count_resampled_hdulist = None
[docs] def remove_arc_spec_fits(self):
"""
Remove the arc_spec FITS HDUList.
"""
self.arc_spec_hdulist = None
[docs] def remove_wavecal_fits(self):
"""
Remove the wavecal FITS HDUList.
"""
self.wavecal_hdulist = None
[docs] def remove_flux_resampled_fits(self):
"""
Remove the flux_resampled FITS HDUList.
"""
self.flux_resampled_hdulist = None
[docs] def remove_wavelength_fits(self):
"""
Remove the wavelength FITS HDUList.
"""
self.wavelength_hdulist = None
[docs] def remove_weight_map_fits(self):
"""
Remove the weight_map FITS HDUList.
"""
self.weight_map = None
[docs] def remove_flux_fits(self):
"""
Remove the flux FITS HDUList.
"""
self.flux = None
[docs] def remove_wavelength_resampled_fits(self):
"""
Remove the resampled wavelength FITS HDUList.
"""
self.wavelength_resampled_hdulist = None
[docs] def remove_sensitivity_resampled_fits(self):
"""
Remove the sensitivity_resampled FITS HDUList.
"""
self.sensitivity_resampled = None
[docs] def create_fits(
self,
output,
recreate=True,
empty_primary_hdu=True,
return_hdu_list=False,
):
"""
Create a HDU list, with a choice of any combination of the
data, see below the 'output' parameters for details.
Parameters
----------
output: String
Type of data to be saved, the order is fixed (in the order of
the following description), but the options are flexible. The
input strings are delimited by "+":
trace: 2 HDUs
Trace, and trace width (pixel)
count: 3 HDUs
Count, uncertainty, and sky (pixel)
weight_map: 1 HDU
Weight (pixel)
arc_spec: 3 HDUs
1D arc spectrum, arc line position (pixel), and arc
line effective position (pixel)
wavecal: 1 HDU
Polynomial coefficients for wavelength calibration
wavelength: 1 HDU
Wavelength of each pixel
count_resampled: 3 HDUs
Resampled Count, uncertainty, and sky (wavelength)
sensitivity: 1 HDU
Sensitivity (pixel)
flux: 4 HDUs
Flux, uncertainty, and sky (pixel)
sensitivity_resampled: 1 HDU
Sensitivity (wavelength)
flux_resampled: 4 HDUs
Flux, uncertainty, and sky (wavelength)
recreate: boolean (Default: False)
Set to True to overwrite the FITS data and header.
empty_primary_hdu: boolean (Default: True)
Set to True to leave the Primary HDU blank.
return_hdu_list: boolean (Default: False)
Set to True to return the HDU List.
"""
output_split = output.split("+")
if recreate:
for k, v in self.hdu_content.items():
self.hdu_content[k] = False
# If the requested list of HDUs is already good to go
if set([k for k, v in self.hdu_content.items() if v]) == set(
output_split
):
# If there is an empty primary HDU, but requested without
if self.empty_primary_hdu & (not empty_primary_hdu):
self.hdu_output.pop(0)
self.empty_primary_hdu = False
# If there is not an empty primary HDU, but requested one
elif (not self.empty_primary_hdu) & empty_primary_hdu:
self.hdu_output.insert(0, fits.PrimaryHDU())
self.empty_primary_hdu = True
# Otherwise, the self.hdu_output does not need to be modified
else:
pass
# If the requested list is different or empty, (re)create the list
else:
self.hdu_output = None
# Empty list for appending HDU lists
hdu_output = fits.HDUList()
# If leaving the primary HDU empty
if empty_primary_hdu:
hdu_output.append(fits.PrimaryHDU())
# Join the list(s)
if "trace" in output_split:
if not self.hdu_content["trace"]:
self.create_trace_fits()
hdu_output += self.trace_hdulist
self.hdu_content["trace"] = True
self.logger.info('Added trace ImageHDU.')
if "count" in output_split:
if not self.hdu_content["count"]:
self.create_count_fits()
hdu_output += self.count_hdulist
self.hdu_content["count"] = True
self.logger.info('Added count ImageHDU.')
if "weight_map" in output_split:
if not self.hdu_content["weight_map"]:
self.create_weight_map_fits()
hdu_output += self.weight_map_hdulist
self.hdu_content["weight_map"] = True
self.logger.info('Added weight_map ImageHDU.')
if "arc_spec" in output_split:
if not self.hdu_content["arc_spec"]:
self.create_arc_spec_fits()
hdu_output += self.arc_spec_hdulist
self.hdu_content["arc_spec"] = True
self.logger.info('Added arc_spec ImageHDU.')
if "wavecal" in output_split:
if not self.hdu_content["wavecal"]:
self.create_wavecal_fits()
hdu_output += self.wavecal_hdulist
self.hdu_content["wavecal"] = True
self.logger.info('Added wavecal ImageHDU.')
if "wavelength" in output_split:
if not self.hdu_content["wavelength"]:
self.create_wavelength_fits()
hdu_output += self.wavelength_hdulist
self.hdu_content["wavelength"] = True
self.logger.info('Added wavelength ImageHDU.')
if "count_resampled" in output_split:
if not self.hdu_content["count_resampled"]:
self.create_count_resampled_fits()
hdu_output += self.count_resampled_hdulist
self.hdu_content["count_resampled"] = True
self.logger.info('Added count_resampled ImageHDU.')
if "sensitivity" in output_split:
if not self.hdu_content["sensitivity"]:
self.create_sensitivity_fits()
hdu_output += self.sensitivity_hdulist
self.hdu_content["sensitivity"] = True
self.logger.info('Added sensitivity ImageHDU.')
if "flux" in output_split:
if not self.hdu_content["flux"]:
self.create_flux_fits()
hdu_output += self.flux_hdulist
self.hdu_content["flux"] = True
self.logger.info('Added flux ImageHDU.')
if "sensitivity_resampled" in output_split:
if not self.hdu_content["sensitivity_resampled"]:
self.create_sensitivity_resampled_fits()
hdu_output += self.sensitivity_resampled_hdulist
self.hdu_content["sensitivity_resampled"] = True
self.logger.info('Added sensitivity_resampled ImageHDU.')
if "flux_resampled" in output_split:
if not self.hdu_content["flux_resampled"]:
self.create_flux_resampled_fits()
hdu_output += self.flux_resampled_hdulist
self.hdu_content["flux_resampled"] = True
self.logger.info('Added flux_resampled ImageHDU.')
# If the primary HDU is not chosen to be empty
if not empty_primary_hdu:
# Convert the first HDU to PrimaryHDU
hdu_output[0] = fits.PrimaryHDU(
hdu_output[0].data, hdu_output[0].header
)
hdu_output.update_extend()
self.empty_primary_hdu = False
self.logger.info('Updated the HDU extend without an empty PrimaryHDU.')
else:
hdu_output.update_extend()
self.empty_primary_hdu = True
self.logger.info('Updated the HDU extend with an empty PrimaryHDU.')
self.hdu_output = hdu_output
if return_hdu_list:
return self.hdu_output
[docs] def save_fits(
self,
output,
filename,
overwrite=False,
recreate=False,
empty_primary_hdu=True,
):
"""
Save the reduced data to disk, with a choice of any combination of the
data, see below the 'output' parameters for details.
Parameters
----------
output: String
Type of data to be saved, the order is fixed (in the order of
the following description), but the options are flexible. The
input strings are delimited by "+",
trace: 2 HDUs
Trace, and trace width (pixel)
count: 3 HDUs
Count, uncertainty, and sky (pixel)
weight_map: 1 HDU
Weight (pixel)
arc_spec: 3 HDUs
1D arc spectrum, arc line pixels, and arc line effective
pixels
wavecal: 1 HDU
Polynomial coefficients for wavelength calibration
wavelength: 1 HDU
Wavelength of each pixel
count_resampled: 3 HDUs
Resampled Count, uncertainty, and sky (wavelength)
sensitivity: 1 HDU
Sensitivity (pixel)
flux: 4 HDUs
Flux, uncertainty, and sky (pixel)
sensitivity: 1 HDU
Sensitivity (wavelength)
flux_resampled: 4 HDUs
Flux, uncertainty, and sky (wavelength)
filename: str
Filename for the output, all of them will share the same name but
will have different extension.
overwrite: boolean
Default is False.
recreate: boolean (Default: False)
Set to True to overwrite the FITS data and header.
empty_primary_hdu: boolean (Default: True)
Set to True to leave the Primary HDU blank (Default: True)
"""
self.create_fits(
output, recreate=recreate, empty_primary_hdu=empty_primary_hdu
)
# Save file to disk
self.hdu_output.writeto(filename + ".fits", overwrite=overwrite)
[docs] def save_csv(self, output, filename, overwrite, recreate):
"""
Save the reduced data to disk, with a choice of any combination of the
5 sets of data, see below the 'output' parameters for details.
Parameters
----------
output: String
Type of data to be saved, the order is fixed (in the order of
the following description), but the options are flexible. The
input strings are delimited by "+",
trace: 2 HDUs
Trace, and trace width (pixel)
count: 4 HDUs
Count, uncertainty, sky, and optimal flag (pixel)
weight_map: 1 HDU
Weight (pixel)
arc_spec: 3 HDUs
1D arc spectrum, arc line pixels, and arc line effective
pixels
wavecal: 1 HDU
Polynomial coefficients for wavelength calibration
wavelength: 1 HDU
Wavelength of each pixel
count_resampled: 3 HDUs
Resampled Count, uncertainty, and sky (wavelength)
sensitivity: 1 HDU
Sensitivity (pixel)
flux: 4 HDUs
Flux, uncertainty, and sky (pixel)
sensitivity: 1 HDU
Sensitivity (wavelength)
flux_resampled: 3 HDUs
Flux, uncertainty, and sky (wavelength)
filename: String
Disk location to be written to. Default is at where the
process/subprocess is execuated.
overwrite: boolean
Default is False.
recreate: boolean (Default: False)
Set to True to overwrite the FITS data and header.
"""
self.create_fits(output, recreate=recreate, empty_primary_hdu=False)
output_split = output.split("+")
start = 0
for output_type in self.hdu_order.keys():
if output_type in output_split:
end = start + self.n_hdu[output_type]
if output_type != "arc_spec":
output_data = np.column_stack(
[hdu.data for hdu in self.hdu_output[start:end]]
)
if overwrite or (
not os.path.exists(
filename + "_" + output_type + ".csv"
)
):
np.savetxt(
filename + "_" + output_type + ".csv",
output_data,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
filename
+ "_"
+ output_type
+ ".csv cannot be saved to disk. Please check "
+ "the path and/or set overwrite to True."
)
else:
output_data_arc_spec = self.hdu_output[start].data
output_data_arc_peaks = self.hdu_output[start + 1].data
output_data_arc_peaks_refined = self.hdu_output[
start + 2
].data
if overwrite or (
not os.path.exists(filename + "_arc_spec.csv")
):
np.savetxt(
filename + "_arc_spec.csv",
output_data_arc_spec,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
filename
+ "_arc_spec.csv cannot be saved to disk. Please "
+ "check the path and/or set overwrite to True."
)
if overwrite or (
not os.path.exists(filename + "_arc_peaks.csv")
):
np.savetxt(
filename + "_arc_peaks.csv",
output_data_arc_peaks,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
filename
+ "_arc_peaks.csv cannot be saved to disk. Please "
+ "check the path and/or set overwrite to True."
)
if overwrite or (
not os.path.exists(filename + "_arc_peaks_refined.csv")
):
np.savetxt(
filename + "_arc_peaks_refined.csv",
output_data_arc_peaks_refined,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
filename
+ "_arc_peaks_refined.csv cannot be saved to "
+ "disk. Please check the path and/or set "
+ "overwrite to True."
)
start = end