# -*- coding: utf-8 -*-
import datetime
import logging
import os
from typing import Callable, Union
import astropy
import numpy as np
from astropy.io import fits
from rascal.calibrator import Calibrator
from scipy import interpolate as itp
__all__ = ["SpectrumOneD"]
[docs]
class SpectrumOneD:
"""
Base class of a 1D spectral object to hold the information of each
extracted spectrum and the raw headers if was provided during the
data reduction. The FITS or CSV file output are all done here.
"""
def __init__(
self,
spec_id: Union[np.ndarray, list, int] = None,
verbose: bool = True,
logger_name: str = "SpectrumOneD",
log_level: str = "INFO",
log_file_folder: str = "default",
log_file_name: str = None,
):
"""
Initialise the object with a logger.
Parameters
----------
spec_id: int (Default: None)
The ID corresponding to the spectrum_oned object. Note that this
ID is unique in each reduction only.
verbose: boolean (Default: True)
Set to False to suppress all verbose warnings, except for
critical failure.
logger_name: str (Default: SpectrumOneD)
This will set the name of the logger, if the name is used already,
it will reference to the existing logger. This will be the
first part of the default log file name unless log_file_name is
provided.
log_level: str (Default: 'INFO')
Four levels of logging are available, in decreasing order of
information and increasing order of severity: (1) DEBUG, (2) INFO,
(3) WARNING, (4) ERROR and (5) CRITICAL. WARNING means that
there is suboptimal operations in some parts of that step. ERROR
means that the requested operation cannot be performed, but the
software can handle it by either using the default setting or
skipping the operation. CRITICAL means that the requested
operation cannot be resolved without human interaction, this is
most usually coming from missing data.
log_file_folder: None or str (Default: "default")
Folder in which the file is save, set to default to save to the
current path.
log_file_name: None or str (Default: None)
File name of the log, set to None to self.logger.warning to screen
only.
"""
# Set-up logger
self.logger = logging.getLogger(logger_name)
if (log_level.lower() == "critical") or (not verbose):
self.logger.setLevel(logging.CRITICAL)
self.log_level = "critical"
elif log_level.lower() == "error":
self.logger.setLevel(logging.ERROR)
self.log_level = "error"
elif log_level.lower() == "warning":
self.logger.setLevel(logging.WARNING)
self.log_level = "warning"
elif log_level.lower() == "info":
self.logger.setLevel(logging.INFO)
self.log_level = "info"
elif log_level.lower() == "debug":
self.logger.setLevel(logging.DEBUG)
self.log_level = "debug"
else:
raise ValueError("Unknonw logging level.")
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%a, %d %b %Y %H:%M:%S",
)
if log_file_name is None:
# Only print log to screen
self.handler = logging.StreamHandler()
else:
if log_file_name == "default":
t_str = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
log_file_name = "{logger_name}_{t_str}.log"
# Save log to file
if log_file_folder == "default":
log_file_folder = ""
self.handler = logging.FileHandler(
os.path.join(log_file_folder, log_file_name), "a+"
)
self.handler.setFormatter(formatter)
if self.logger.hasHandlers():
self.logger.handlers.clear()
self.logger.addHandler(self.handler)
# spectrum ID
if spec_id is None:
self.spec_id = 0
elif isinstance(spec_id, int):
self.spec_id = spec_id
else:
error_msg = (
"spec_id has to be of type int, "
+ "{} is given.".format(type(spec_id))
)
self.logger.critical(error_msg)
raise ValueError(error_msg)
# Reduction Meta-data
self.time_of_reduction = datetime.datetime.now().strftime(
"%Y_%m_%d_%H_%M_%S"
)
# Raw image headers
self.spectrum_header = None
self.arc_header = None
self.standard_header = None
# Detector properties
self.gain = None
self.readnoise = None
self.exptime = None
self.seeing = None
# Observing Condition
self.relative_humidity = None
self.pressure = None
self.temperature = None
self.airmass = None
# Trace properties
self.trace = None
self.trace_sigma = None
self.len_trace = None
self.effective_pixel = None
self.pixel_mapping_itp = None
self.widthdn = None
self.widthup = None
self.sepdn = None
self.sepup = None
self.skywidthdn = None
self.skywidthup = None
self.extraction_type = None
self.count = None
self.count_err = None
self.count_sky = None
self.var = None
self.profile = None
self.profile_func = None
# Wavelength calibration properties
self.arc_spec = None
self.peaks = None
self.peaks_refined = None
# fit constrains
self.calibrator = None
self.min_atlas_wavelength = None
self.max_atlas_wavelength = None
self.num_slopes = None
self.range_tolerance = None
self.fit_tolerance = None
self.fit_type = None
self.fit_deg = None
self.candidate_tolerance = None
self.linearity_tolerance = None
self.ransac_tolerance = None
self.num_candidates = None
self.xbins = None
self.ybins = None
self.brute_force = None
# fit config
self.sample_size = None
self.top_n = None
self.max_tries = None
self.intput_coeff = None
self.linear = None
self.weighted = None
self.filter_close = None
# fit output
self.fit_coeff = None
self.matched_peaks = None
self.matched_atlas = None
self.rms = None
self.residual = None
self.peak_utilisation = None
self.atlas_utilisation = None
# fit output
self.fit_coeff_rascal = None
self.matched_peaks_rascal = None
self.matched_atlas_rascal = None
self.rms_rascal = None
self.residual_rascal = None
self.peak_utilisation_rascal = None
self.atlas_utilisation_rascal = None
# fit output
self.fit_coeff_refine = None
self.matched_peaks_refine = None
self.matched_atlas_refine = None
self.rms_refine = None
self.residual_refine = None
self.peak_utilisation_refine = None
self.atlas_utilisation_refine = None
# fitted solution
self.wave = None
self.wave_bin = None
self.wave_start = None
self.wave_end = None
self.wave_resampled = None
self.count_resampled = None
self.count_err_resampled = None
self.count_sky_resampled = None
# Fluxes
self.flux = None
self.flux_err = None
self.flux_sky = None
self.flux_atm_ext_corrected = None
self.flux_err_atm_ext_corrected = None
self.flux_sky_atm_ext_corrected = None
self.flux_telluric_corrected = None
self.flux_err_telluric_corrected = None
self.flux_sky_telluric_corrected = None
self.flux_atm_ext_telluric_corrected = None
self.flux_err_atm_ext_telluric_corrected = None
self.flux_sky_atm_ext_telluric_corrected = None
self.flux_resampled = None
self.flux_err_resampled = None
self.flux_sky_resampled = None
self.flux_resampled_atm_ext_corrected = None
self.flux_err_resampled_atm_ext_corrected = None
self.flux_sky_resampled_atm_ext_corrected = None
self.flux_resampled_telluric_corrected = None
self.flux_err_resampled_telluric_corrected = None
self.flux_sky_resampled_telluric_corrected = None
self.flux_resampled_atm_ext_telluric_corrected = None
self.flux_err_resampled_atm_ext_telluric_corrected = None
self.flux_sky_resampled_atm_ext_telluric_corrected = None
# Continuum
self.count_continuum = None
self.count_resampled_continuum = None
self.flux_continuum = None
self.flux_resampled_continuum = None
# Sensitivity curve smoothing properties
self.smooth = None
self.lowess_kwargs = None
# standard star
self.library = None
self.target = None
# Tellurics
self.telluric_func = None
self.telluric_profile = None
self.telluric_profile_resampled = None
self.telluric_factor = 1.0
self.telluric_nudge_factor = 1.0
# Sensitivity curve properties
self.sensitivity = None
self.sensitivity_resampled = None
self.sensitivity_func = None
self.wave_literature = None
self.flux_literature = None
# Atmospheric extinction properties
self.atm_ext = None
self.atm_ext_resampled = None
# HDU lists for output
self.trace_hdulist = None
self.count_hdulist = None
self.weight_map_hdulist = None
self.arc_spec_hdulist = None
self.arc_lines_hdulist = None
self.wavecal_hdulist = None
self.wavelength_hdulist = None
self.wavelength_resampled_hdulist = None
self.count_resampled_hdulist = None
self.sensitivity_hdulist = None
self.flux_hdulist = None
self.atm_ext_hdulist = None
self.flux_atm_ext_corrected_hdulist = None
self.telluric_profile_hdulist = None
self.flux_telluric_corrected_hdulist = None
self.flux_atm_ext_telluric_corrected_hdulist = None
self.sensitivity_resampled_hdulist = None
self.flux_resampled_hdulist = None
self.atm_ext_resampled_hdulist = None
self.flux_resampled_atm_ext_corrected_hdulist = None
self.telluric_profile_resampled_hdulist = None
self.flux_resampled_telluric_corrected_hdulist = None
self.flux_resampled_atm_ext_telluric_corrected_hdulist = None
# FITS output properties
self.hdu_output = None
self.empty_primary_hdu = True
self.hdu_name = {
1: "trace",
2: "count",
3: "weight_map",
4: "arc_spec",
5: "arc_lines",
6: "wavecal_coefficients",
7: "wavelength",
8: "wavelength_resampled",
9: "count_resampled",
10: "sensitivity",
11: "flux",
12: "atm_ext",
13: "flux_atm_ext_corrected",
14: "telluric_profile",
15: "flux_telluric_corrected",
16: "flux_atm_ext_telluric_corrected",
17: "sensitivity_resampled",
18: "flux_resampled",
19: "atm_ext_resampled",
20: "flux_resampled_atm_ext_corrected",
21: "telluric_profile_resampled",
22: "flux_resampled_telluric_corrected",
23: "flux_resampled_atm_ext_telluric_corrected",
}
self.hdu_derscription = {
1: "Pixel positions of the trace in the spatial direction, "
+ "Width of the trace",
2: "Count, Count uncertainty, Sky count",
3: "Weight map of the extration (variance)",
4: "1D Arc spectrum",
5: "Arc line position, Arc line effective position",
6: "Polynomial coefficients for wavelength calibration",
7: "The pixel-to-wavelength mapping",
8: "The pixel-to-wavelength mapping in the resampled coordiates",
9: "Resampled count, Resampled count uncertainty, "
+ "Resampled sky count",
10: "Sensitivity curve",
11: "Flux, Flux uncertainty, Sky flux",
12: "Atmospheric extinction correction factor",
13: "Flux (atmospheric extinction corrected), "
+ "Flux uncertainty (atmospheric extinction corrected), "
+ "Sky flux (atmospheric extinction corrected)",
14: "Telluric absorption profile",
15: "Flux (telluric absorption corrected), "
+ "Flux Uncertainty (telluric absorption corrected), "
+ "Sky Flux (telluric absorption corrected)",
16: "Flux (atmospheric extinction and telluric absorption "
+ "corrected), Flux Uncertainty (atmospheric extinction and "
+ "telluric absorption corrected), Sky flux (atmospheric "
+ "extinction and telluric absorption corrected)",
17: "Resampled sensitivity curve",
18: "Resampled flux, Resampled flux uncertainty, "
+ "Resampled sky flux",
19: "Atmospheric extinction correction factor (resampled)",
20: "Flux (resampled, atmospheric extinction), "
+ "Flux uncertainty (resampled, atmospheric extinction), "
+ "Sky flux (resampled, atmospheric extinction)",
21: "Telluric absorption profile (resampled)",
22: "Flux (resampled, telluric absorption corrected), "
+ "Flux uncertainty (resampled, telluric absorption corrected), "
+ "Sky flux (resampled, telluric absorption corrected)",
23: "Flux (atmospheric extinction and telluric absorption "
+ "corrected), Flux uncertainty (atmospheric extinction and "
+ "telluric absorption corrected), Sky flux (atmospheric "
+ "extinction and telluric absorption corrected)",
}
self.ext_name = {
1: ["trace", "trace_sigma"],
2: ["count", "count_err", "count_sky"],
3: ["weight_map"],
4: ["arc_spec"],
5: ["peaks", "peaks_refined"],
6: ["polynomials"],
7: ["wavelength"],
8: ["wavelength_resampled"],
9: [
"count_resampled",
"count_err_resampled",
"count_sky_resampled",
],
10: ["sensitivity"],
11: ["flux", "flux_err", "flux_sky"],
12: ["atm_ext"],
13: [
"flux_atm_ext_corrected",
"flux_err_atm_ext_corrected",
"flux_sky_atm_ext_corrected",
],
14: ["telluric_profile"],
15: [
"flux_telluric_corrected",
"flux_err_telluric_corrected",
"flux_sky_telluric_corrected",
],
16: [
"flux_atm_ext_telluric_corrected",
"flux_err_atm_ext_telluric_corrected",
"flux_sky_atm_ext_telluric_corrected",
],
17: ["sensitivity_resampled"],
18: ["flux_resampled", "flux_err_resampled", "flux_sky_resampled"],
19: ["atm_ext_resampled"],
20: [
"flux_resampled_atm_ext_corrected",
"flux_err_resampled_atm_ext_corrected",
"flux_sky_resampled_atm_ext_corrected",
],
21: ["telluric_profile_resampled"],
22: [
"flux_resampled_telluric_corrected",
"flux_err_resampled_telluric_corrected",
"flux_sky_resampled_telluric_corrected",
],
23: [
"flux_resampled_atm_ext_telluric_corrected",
"flux_err_resampled_atm_ext_telluric_corrected",
"flux_sky_resampled_atm_ext_telluric_corrected",
],
}
self.header = {}
for d1, d2 in zip(
self.hdu_name.values(), self.hdu_derscription.values()
):
self.header[d1] = d2
# The order in which HDUs are arranged
self.hdu_order = {v: k for k, v in self.hdu_name.items()}
# The HDU availability
self.hdu_content = dict.fromkeys(self.hdu_order, False)
# Set the counters of HDUs
number_of_hdus = {
1: 2,
2: 3,
3: 1,
4: 1,
5: 2,
6: 1,
7: 1,
8: 1,
9: 3,
10: 1,
11: 3,
12: 1,
13: 3,
14: 1,
15: 3,
16: 3,
17: 1,
18: 3,
19: 1,
20: 3,
21: 1,
22: 3,
23: 3,
}
self.n_hdu = {}
for d1, d2 in zip(self.hdu_name.values(), number_of_hdus.values()):
self.n_hdu[d1] = d2
[docs]
def merge(self, spectrum_oned, overwrite: bool = False):
"""
This function copies all the info from the supplied spectrum_oned to
this one, including the spec_id.
Parameters
----------
spectrum_oned: SpectrumOneD object
The source SpectrumOneD to be deep copied over.
overwrite: boolean (Default: False)
Set to True to overwrite all the data in this SpectrumOneD.
"""
for attr, value in self.__dict__.items():
if attr == "spec_id":
if getattr(spectrum_oned, attr) != 0:
setattr(self, attr, value)
if getattr(self, attr) is None or []:
setattr(self, attr, getattr(spectrum_oned, attr))
if overwrite:
setattr(self, attr, getattr(spectrum_oned, attr))
else:
# if not overwrite, do nothing
pass
[docs]
def add_standard_header(
self, header: Union[fits.Header, list, np.ndarray]
):
"""
Add a header for the standard star that it is flux calibrated
against. The, if opted for, automatic atmospheric extinction
correction is applied based on the airmass found in this header.
Typically put the header of the raw 2D spectral image of the
standard star here. Some automated operations rely on reading
from the header.
Parameters
----------
header: astropy.io.fits.Header object
"""
if header is not None:
if isinstance(header, fits.Header):
self.standard_header = header
self.logger.info("standard_header is stored.")
elif isinstance(header[0], fits.Header):
self.standard_header = header[0]
self.logger.info("standard_header is stored.")
else:
self.logger.error(
"Unsupported type of header is provided: {}. "
"Only astropy.io.fits.Header or such in a list or array "
"can be accepted.".format(type(header))
)
[docs]
def remove_standard_header(self):
"""
Remove the FITS header of the standard.
"""
self.standard_header = None
[docs]
def add_trace(
self,
trace: Union[list, np.ndarray],
trace_sigma: Union[list, np.ndarray],
effective_pixel: Union[list, np.ndarray] = None,
):
"""
Add the trace of the spectrum.
Parameters
----------
trace: list or numpy.ndarray (N)
The spatial pixel value (can be sub-pixel) of the trace at each
spectral position.
trace_sigma: list or numpy.ndarray (N)
Standard deviation of the Gaussian profile of a trace
effective_pixel: list or numpy array (Default: None)
The pixel position of the trace in the dispersion direction.
This should be provided if you wish to override the default
range (num_pix), for example, in the case of accounting for chip
gaps (10 pixels) in a 3-CCD setting, you should provide
[0,1,2,...90, 100,101,...190, 200,201,...290]
"""
assert isinstance(
trace, (list, np.ndarray)
), "trace has to be a list or a numpy array"
assert isinstance(
trace_sigma, (list, np.ndarray)
), "trace_sigma has to be a list or a numpy array"
assert len(trace_sigma) == len(trace), "trace and trace_sigma have to "
" be the same size."
if effective_pixel is None:
effective_pixel = list(np.arange(len(trace)).astype("int"))
else:
assert isinstance(
effective_pixel, (list, np.ndarray)
), "effective_pixel has to be a list or a numpy array"
assert len(effective_pixel) == len(
trace
), "trace and effective_pixel have "
"to be the same size."
pixel_mapping_itp = itp.interp1d(
np.arange(len(trace)),
effective_pixel,
kind="cubic",
fill_value="extrapolate",
)
# Only add if all assertions are passed.
self.trace = trace
self.trace_sigma = trace_sigma
self.len_trace = len(trace)
self.add_effective_pixel(effective_pixel)
self.add_pixel_mapping_itp(pixel_mapping_itp)
[docs]
def remove_trace(self):
"""
Remove the trace, trace_sigma and len_trace, also remove the effective_pixel
and the interpolation function that maps the pixel values to the
effective pixel values.
"""
self.trace = None
self.trace_sigma = None
self.len_trace = None
self.remove_effective_pixel()
self.remove_pixel_mapping_itp()
[docs]
def add_aperture(
self,
widthdn: int,
widthup: int,
sepdn: int,
sepup: int,
skywidthdn: int,
skywidthup: int,
):
"""
The size of the aperture in which the spectrum is extracted. This is
merely the limit where extraction is performed, it does not hold the
information of the weighting::
................................. ^
Sky ................................. | skywidthup
................................. v
................................. ^
................................. | sepup
................................. v
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ^
--------------------------------- | widthup
Spectrum ================================= v ^
--------------------------------- | widthdn
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ v
................................. ^
................................. | sepdn
................................. v
................................. ^
Sky ................................. | skywidthdn
................................. v
Parameters
----------
widthdn: real positive number
The aperture size on the bottom side of the spectrum.
widthup: real positive number
The aperture size on the top side of the spectrum.
sepdn: real positive number
The gap between the spectrum and the sky region on the bottom
side of the spectrum.
sepup: real positive number
The gap between the spectrum and the sky region on the top
side of the spectrum.
skywidthdn: real positive number
The sky region on the bottom side of the spectrum.
skywidthup: real positive number
The sky region on the top side of the spectrum.
"""
assert np.isfinite(widthdn), "widthdn has to be finite."
assert np.isfinite(widthup), "widthup has to be finite."
assert np.isfinite(sepdn), "sepdn has to be finite."
assert np.isfinite(sepup), "sepup has to be finite."
assert np.isfinite(skywidthdn), "skywidthdn has to be finite."
assert np.isfinite(skywidthup), "skywidthup has to be finite."
self.widthdn = widthdn
self.widthup = widthup
self.sepdn = sepdn
self.sepup = sepup
self.skywidthdn = skywidthdn
self.skywidthup = skywidthup
[docs]
def add_count(
self,
count: Union[list, np.ndarray],
count_err: Union[list, np.ndarray] = None,
count_sky: Union[list, np.ndarray] = None,
):
"""
Add the photoelectron counts and the associated optional
uncertainty and sky counts.
Parameters
----------
count: 1-d array
The summed count at each column along the trace.
count_err: 1-d array
the uncertainties of the count values
count_sky: 1-d array
The integrated sky values along each column, suitable for
subtracting from the output of ap_extract
"""
# Check data type
assert isinstance(
count, (list, np.ndarray)
), "count has to be a list or a numpy array"
if count_err is not None:
assert isinstance(
count_err, (list, np.ndarray)
), "count_err has to be a list or a numpy array"
assert len(count_err) == len(
count
), "count_err has to be the same size as count"
if count_sky is not None:
assert isinstance(
count_sky, (list, np.ndarray)
), "count_sky has to be a list or a numpy array"
assert len(count_sky) == len(
count
), "count_sky has to be the same size as count"
self.count = count
# Only add if they are provided
if count_err is not None:
self.count_err = count_err
else:
self.count_err = np.zeros_like(self.count)
if count_sky is not None:
self.count_sky = count_sky
else:
self.count_sky = np.zeros_like(self.count)
if self.effective_pixel is None:
effective_pixel = list(np.arange(len(count)).astype("int"))
self.add_effective_pixel(effective_pixel)
else:
assert len(self.effective_pixel) == len(count), (
"count and effective_pixel have " + "to be the same size."
)
[docs]
def remove_count(self):
"""
Remove the count, count_err and count_sky.
"""
self.count = None
self.count_err = None
self.count_sky = None
[docs]
def add_variances(self, var: Union[list, np.ndarray]):
"""
Add the weight map of the extraction.
Parameters
----------
var: 2-d array
The weigh map of the input image for extraction.
"""
self.var = var
[docs]
def remove_variances(self):
"""
Remove the variances.
"""
self.var = None
[docs]
def add_line_spread_profile_upsampled(
self, line_spread_profile_upsampled: astropy.modeling.Fittable1DModel
):
"""
Add the empirical line spread profile as measured from the upsampled image.
Parameters
----------
profile_func: a fitted astropy.modeling.Fittable1DModel
The fitted trace profile.
"""
self.line_spread_profile_upsampled = line_spread_profile_upsampled
[docs]
def remove_line_spread_profile_upsampled(self):
"""
Remove the fitted trace profile.
"""
self.line_spread_profile_upsampled = None
[docs]
def add_line_spread_profile(
self, line_spread_profile: astropy.modeling.Fittable1DModel
):
"""
Add the empirical line spread profile as measured.
Parameters
----------
profile_func: a fitted astropy.modeling.Fittable1DModel
The fitted trace profile.
"""
self.line_spread_profile = line_spread_profile
[docs]
def remove_line_spread_profile(self):
"""
Remove the fitted trace profile.
"""
self.line_spread_profile = None
[docs]
def add_profile_func(self, profile_func: Callable):
"""
Add the fitted trace profile.
Parameters
----------
profile_func: a fitted astropy.modeling.Fittable1DModel
The fitted trace profile.
"""
self.profile_func = profile_func
[docs]
def remove_profile_func(self):
"""
Remove the fitted trace profile.
"""
self.profile_func = None
[docs]
def add_profile(self, profile: Union[list, np.ndarray]):
"""
Add the extraction profile (generated from the profile_func).
Parameters
----------
profile: 1-d array
The weight function of the extraction profile.
"""
self.profile = profile
[docs]
def remove_profile(self):
"""
Remove the extraction profile.
"""
self.profile = None
[docs]
def add_arc_spec(self, arc_spec: Union[list, np.ndarray]):
"""
Add the extracted 1D spectrum of the arc.
Parameters
----------
arc_spec: 1-d array
The photoelectron count of the spectrum of the arc lamp.
"""
assert isinstance(
arc_spec, (list, np.ndarray)
), "arc_spec has to be a list or a numpy array"
self.arc_spec = arc_spec
[docs]
def remove_arc_spec(self):
"""
Remove the 1D spectrum of the arc.
"""
self.arc_spec = None
[docs]
def add_effective_pixel(self, effective_pixel: Union[list, np.ndarray]):
"""
Add the pixel list, which contain the effective pixel values that
has accounted for chip gaps and non-integer pixel value.
Parameters
----------
effective_pixel: 1-d array
The effective position of the pixel.
"""
assert isinstance(
effective_pixel, (list, np.ndarray)
), "effective_pixel has to be a list or a numpy array"
self.effective_pixel = effective_pixel
[docs]
def remove_effective_pixel(self):
"""
Remove the pixel list.
"""
self.effective_pixel = None
[docs]
def add_pixel_mapping_itp(self, pixel_mapping_itp: Callable):
"""
Add the interpolated callable function that convert raw pixel
values into effective pixel values.
Parameters
----------
pixel_mapping_itp: callable function
The mapping function for raw-effective pixel position.
"""
assert callable(
pixel_mapping_itp
), "pixel_mapping_itp has to be a Callable function."
self.pixel_mapping_itp = pixel_mapping_itp
[docs]
def remove_pixel_mapping_itp(self):
"""
Remove the interpolation function that maps the pixel values to the
effective pixel value.
"""
self.pixel_mapping_itp = None
[docs]
def add_peaks(self, peaks: Union[list, np.ndarray]):
"""
Add the pixel values (int) where arc lines are located.
Parameters
----------
peaks: 1-d array
The pixel value of the identified peaks.
"""
assert isinstance(
peaks, (list, np.ndarray)
), "peaks has to be a list or a numpy array"
self.peaks = peaks
[docs]
def remove_peaks(self):
"""
Remove the list of peaks.
"""
self.peaks = None
[docs]
def add_peaks_refined(self, peaks_refined: Union[list, np.ndarray]):
"""
Add the refined pixel values (float) where arc lines are located.
Parameters
----------
peaks: 1-d array
The pixel value of the refined peak positions.
"""
assert isinstance(
peaks_refined, (list, np.ndarray)
), "peaks_refined has to be a list or a numpy array"
self.peaks_refined = peaks_refined
[docs]
def remove_peaks_refined(self):
"""
Remove the list of refined peaks.
"""
self.peaks_refined = None
[docs]
def add_peaks_wave(self, peaks_wave: Union[list, np.ndarray]):
"""
Add the wavelength (Angstrom) of the arc lines.
Parameters
----------
peaks: 1-d array
The wavelength value of the peaks.
"""
assert isinstance(
peaks_wave, (list, np.ndarray)
), "peaks_wave has to be a list or a numpy array"
self.peaks_wave = peaks_wave
[docs]
def remove_peaks_wave(self):
"""
Remove the list of wavelengths of the arc lines.
"""
self.peaks_wave = None
[docs]
def add_calibrator(self, calibrator: Calibrator):
"""
Add a RASCAL Calibrator object.
Parameters
----------
calibrator: rascal.Calibrator()
A RASCAL Calibrator object.
"""
assert isinstance(calibrator, Calibrator)
self.calibrator = calibrator
[docs]
def remove_calibrator(self):
"""
Remove the Calibrator.
"""
self.calibrator = None
[docs]
def add_atlas_wavelength_range(
self, min_atlas_wavelength: float, max_atlas_wavelength: float
):
"""
Add the allowed range of wavelength calibration.
Parameters
----------
min_atlas_wavelength: float
The minimum wavelength of the atlas.
max_atlas_wavelength: float
The maximum wavelength of the atlas.
"""
assert np.isfinite(
min_atlas_wavelength
), "min_atlas_wavelength has to be finite."
assert np.isfinite(
max_atlas_wavelength
), "max_atlas_wavelength has to be finite."
self.min_atlas_wavelength = min_atlas_wavelength
self.max_atlas_wavelength = max_atlas_wavelength
[docs]
def remove_atlas_wavelength_range(self):
"""
Remove the atlas wavelength range.
"""
self.min_atlas_wavelength = None
self.max_atlas_wavelength = None
[docs]
def add_min_atlas_intensity(self, min_atlas_intensity: float):
"""
Add the minimum allowed line intensity (theoretical NIST value)
that were used for wavelength calibration.
Parameters
----------
min_atlas_intensity: float
The minimum line strength used to get the atlas.
"""
assert np.isfinite(
min_atlas_intensity
), "min_atlas_intensity has to be finite."
self.min_atlas_intensity = min_atlas_intensity
[docs]
def remove_min_atlas_intensity(self):
"""
Remove the minimum atlas intensity.
"""
self.min_atlas_intensity = None
[docs]
def add_min_atlas_distance(self, min_atlas_distance: float):
"""
Add the minimum allowed line distance (only consider lines that
passed the minimum intensity threshold) that were used for
wavelength calibration.
Parameters
----------
min_atlas_distance: float
Minimum wavelength separation between neighbouring lines.
"""
assert np.isfinite(
min_atlas_distance
), "min_atlas_distance has to be finite."
self.min_atlas_distance = min_atlas_distance
[docs]
def remove_min_atlas_distance(self):
"""
Remove the minimum distance between lines to be accepted.
"""
self.min_atlas_distance = None
[docs]
def add_gain(self, gain: float):
"""
Add the gain value of the spectral image. This value can be
different from the one in the header as this can be overwritten
by an user input, while the header value is raw.
Parameters
----------
gain: float
The gain value of the detector.
"""
assert np.isfinite(gain), "gain has to be finite."
self.gain = gain
[docs]
def remove_gain(self):
"""
Remove the gain value.
"""
self.gain = None
[docs]
def add_readnoise(self, readnoise: float):
"""
Add the readnoise value of the spectral image. This value can be
different from the one in the header as this can be overwritten
by an user input, while the header value is raw.
Parameters
----------
readnoise: float
The read noise value of the detector.
"""
assert np.isfinite(readnoise), "readnoise has to be finite."
self.readnoise = readnoise
[docs]
def remove_readnoise(self):
"""
Remove the readnoise value.
"""
self.readnoise = None
[docs]
def add_exptime(self, exptime: float):
"""
Add the exposure time of the spectral image. This value can be
different from the one in the header as this can be overwritten
by an user input, while the header value is raw.
Parameters
----------
exptime: float
The exposure time of the input image.
"""
assert np.isfinite(exptime), "exptime has to be finite."
self.exptime = exptime
[docs]
def remove_exptime(self):
"""
Remove the exptime value.
"""
self.exptime = None
[docs]
def add_airmass(self, airmass: float):
"""
Add the airmass when the observation was carried out. This value
can be different from the one in the header as this can be
overwritten by an user input, while the header value is raw.
Parameters
----------
airmass: float
The effective airmass during the exposure.
"""
assert np.isfinite(airmass), "airmass has to be finite."
self.airmass = airmass
[docs]
def remove_airmass(self):
"""
Remove the airmass value.
"""
self.airmass = None
[docs]
def add_seeing(self, seeing: float):
"""
Add the seeing when the observation was carried out. This value
can be different from the one in the header as this can be
overwritten by an user input, while the header value is raw.
Parameters
----------
seeing: float
The effective seeing of the observation.
"""
assert np.isfinite(seeing), "airmass has to be finite."
self.seeing = seeing
[docs]
def remove_seeing(self):
"""
Remove the seeing value.
"""
self.seeing = None
[docs]
def add_weather_condition(
self, pressure: float, temperature: float, relative_humidity: float
):
"""
Add the pressure, temperature and relative humidity when the spectral
image was taken. These value can be different from the ones in the
header as this can be overwritten by an user input, while the header
value is raw.
Parameters
----------
pressure: float
The air pressure during the observation (in pascal).
temperature: float
The temperature during the observation (in Kelvin).
relative_hhumidity: float
The relative humidity during the observation (between 0 and 1).
"""
assert np.isfinite(pressure), "pressure has to be finite."
assert np.isfinite(temperature), "temperature has to be finite."
assert np.isfinite(
relative_humidity
), "relative_humidity has to be finite."
self.pressure = pressure
self.temperature = temperature
self.relative_humidity = relative_humidity
[docs]
def remove_weather_condition(self):
"""
Remove the Pressure, Temperature and Humidity.
"""
self.pressure = None
self.temperature = None
self.relative_humidity = None
[docs]
def add_fit_type(self, fit_type: str):
"""
Add the kind of polynomial used for fitting the pixel-to-wavelength
function.
Parameters
----------
fit_type: str
The type of polynomial for fitting the pixel-wavelength function.
"""
assert type(fit_type) == str, "fit_type has to be a string"
assert fit_type in ["poly", "leg", "cheb"], "fit_type must be "
"(1) poly(nomial); (2) leg(endre); or (3) cheb(yshev)."
self.fit_type = fit_type
[docs]
def remove_fit_type(self):
"""
Remove the chosen type of polynomial.
"""
self.fit_type = None
[docs]
def add_fit_coeff(self, fit_coeff: Union[list, np.ndarray]):
"""
Add the polynomial co-efficients of the pixel-to-wavelength
function. Note that this overwrites the wavelength calibrated
fit coefficients.
Parameters
----------
fit_coeff: list or 1-d array
The set of coefficients of the pixel-wavelength function.
"""
assert isinstance(
fit_coeff, (list, np.ndarray)
), "fit_coeff has to be a list or a numpy array."
self.fit_coeff = fit_coeff
[docs]
def remove_fit_coeff(self):
"""
Remove the polynomial co-efficients of the pixel-to-wavelength
function.
"""
self.fit_coeff = None
[docs]
def add_calibrator_properties(
self,
num_pix: int,
effective_pixel: Union[list, np.ndarray],
plotting_library: str,
):
"""
Add the properties of the RASCAL Calibrator.
Parameters
----------
num_pix: int
The number of pixels in the dispersion direction
effective_pixel: list or numpy array
The pixel position of the trace in the dispersion direction.
This should be provided if you wish to override the default
range(num_pix), for example, in the case of accounting
for chip gaps (10 pixels) in a 3-CCD setting, you should provide
[0,1,2,...90, 100,101,...190, 200,201,...290]
plotting_library : string
Choose between matplotlib and plotly.
"""
self.num_pix = num_pix
self.plotting_library = plotting_library
self.effective_pixel = effective_pixel
[docs]
def remove_calibrator_properties(self):
"""
Remove the properties of the RASCAL Calibrator as recorded in ASPIRED.
This does NOT remove the calibrator properites INSIDE the calibrator.
"""
self.num_pix = None
self.effective_pixel = None
self.plotting_library = None
[docs]
def add_hough_properties(
self,
num_slopes: int,
xbins: int,
ybins: int,
min_wavelength: float,
max_wavelength: float,
range_tolerance: float,
linearity_tolerance: float,
):
"""
Add the hough transform configuration of the RASCAL Calibrator.
Parameters
----------
num_slopes: int
Number of slopes to consider during Hough transform
xbins: int
Number of bins for Hough accumulation
ybins: int
Number of bins for Hough accumulation
min_wavelength: float
Minimum wavelength of the spectrum.
max_wavelength: float
Maximum wavelength of the spectrum.
range_tolerance: float
Estimation of the error on the provided spectral range
e.g. 3000-5000 with tolerance 500 will search for
solutions that may satisfy 2500-5500
linearity_tolerance: float
A toleranceold (Ansgtroms) which defines some padding around the
range tolerance to allow for non-linearity. This should be the
maximum expected excursion from linearity.
"""
self.num_slopes = num_slopes
self.xbins = xbins
self.ybins = ybins
self.min_wavelength = min_wavelength
self.max_wavelength = max_wavelength
self.range_tolerance = range_tolerance
self.linearity_tolerance = linearity_tolerance
[docs]
def remove_hough_properties(self):
"""
Remove the hough transform configuration of the RASCAL Calibrator.
This does NOT remove the hough transform configuration INSIDE the
calibrator.
"""
self.num_slopes = None
self.xbins = None
self.ybins = None
self.min_wavelength = None
self.max_wavelength = None
self.range_tolerance = None
self.linearity_tolerance = None
[docs]
def add_ransac_properties(
self,
sample_size: int,
top_n_candidate: int,
linear: bool,
filter_close: bool,
ransac_tolerance: float,
candidate_weighted: bool,
hough_weight: float,
minimum_matches: int,
minimum_peak_utilisation: float,
minimum_fit_error: float,
):
"""
Add the RANSAC properties of the RASCAL Calibrator.
Parameters
----------
sample_size: int
Number of pixel-wavelength hough pairs to be used for each arc line
being picked.
top_n_candidate: int
Top ranked lines to be fitted.
linear: bool
True to use the hough transformed gradient, otherwise, use the
known polynomial.
filter_close: bool
Remove the pairs that are out of bounds in the hough space.
ransac_tolerance: float
The distance criteria (Angstroms) to be considered an inlier to a
fit. This should be close to the size of the expected residuals on
the final fit (e.g. 1A is typical)
candidate_weighted: bool
Set to True to down-weight pairs that are far from the fit.
hough_weight: float or None
Set to use the hough space to weigh the fit. The theoretical
optimal weighting is unclear. The larger the value, the heavily it
relies on the overdensity in the hough space for a good fit.
minimum_matches: int
Minimum number of matches to accept the solution.
minimum_peak_utilisation: float
Minimum percentage of peaks used in the solution.
minimum_fit_error: float
Minimum fitting error. Probably only useful in simulated noiseless
case.
"""
self.sample_size = sample_size
self.top_n_candidate = top_n_candidate
self.linear = linear
self.filter_close = filter_close
self.ransac_tolerance = ransac_tolerance
self.candidate_weighted = candidate_weighted
self.hough_weight = hough_weight
self.minimum_matches = minimum_matches
self.minimum_peak_utilisation = minimum_peak_utilisation
self.minimum_fit_error = minimum_fit_error
[docs]
def remove_ransac_properties(self):
"""
Remove the RANSAC properties of the RASCAL Calibrator.
"""
self.sample_size = None
self.top_n_candidate = None
self.linear = None
self.filter_close = None
self.ransac_tolerance = None
self.candidate_weighted = None
self.hough_weight = None
self.minimum_matches = None
self.minimum_peak_utilisation = None
self.minimum_fit_error = None
[docs]
def add_fit_output_final(
self,
fit_coeff: Union[list, np.ndarray],
matched_peaks: Union[list, np.ndarray],
matched_atlas: Union[list, np.ndarray],
rms: float,
residual: Union[list, np.ndarray],
peak_utilisation: float,
atlas_utilisation: float,
):
"""
Add the final accepted polynomial solution.
Parameters
----------
fit_coeff: list
Set the baseline of the least square fit. If no fits outform this
set of polynomial coefficients, this will be used as the best fit.
matched_peaks: list
List of matched peaks
matched_atlas: list
List of matched atlas lines
rms: float
The root-mean-squared of the fit
residual: list
The residual of each fitted peak
peak_utilisation: float
The fraction of the input peaks used in the fit
atlas_utilisation: float
The fraction of the input atlas used in the fit
"""
# add assertion here
self.fit_coeff = fit_coeff
self.matched_peaks = matched_peaks
self.matched_atlas = matched_atlas
self.rms = rms
self.residual = residual
self.peak_utilisation = peak_utilisation
self.atlas_utilisation = atlas_utilisation
[docs]
def remove_fit_output_final(self):
"""
Remove the accepted polynomial solultion.
"""
self.fit_coeff = None
self.matched_peaks = None
self.matched_atlas = None
self.rms = None
self.residual = None
self.peak_utilisation = None
self.atlas_utilisation = None
[docs]
def add_fit_output_rascal(
self,
fit_coeff: Union[list, np.ndarray],
matched_peaks: Union[list, np.ndarray],
matched_atlas: Union[list, np.ndarray],
rms: float,
residual: Union[list, np.ndarray],
peak_utilisation: float,
atlas_utilisation: float,
):
"""
Add the RASCAL polynomial solution.
Parameters
----------
fit_coeff: list
Set the baseline of the least square fit. If no fits outform this
set of polynomial coefficients, this will be used as the best fit.
matched_peaks: list
List of matched peaks
matched_atlas: list
List of matched atlas lines
rms: float
The root-mean-squared of the fit
residual: list
The residual of each fitted peak
peak_utilisation: float
The fraction of the input peaks used in the fit
atlas_utilisation: float
The fraction of the input atlas used in the fit
"""
# add assertion here
self.fit_coeff_rascal = fit_coeff
self.matched_peaks_rascal = matched_peaks
self.matched_atlas_rascal = matched_atlas
self.rms_rascal = rms
self.residual_rascal = residual
self.peak_utilisation_rascal = peak_utilisation
self.atlas_utilisation_rascal = atlas_utilisation
self.add_fit_output_final(
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
)
[docs]
def remove_fit_output_rascal(self):
"""
Remove the RASCAL polynomial solution.
"""
self.fit_coeff_rascal = None
self.matched_peaks_rascal = None
self.matched_atlas_rascal = None
self.rms_rascal = None
self.residual_rascal = None
self.peak_utilisation_rascal = None
self.atlas_utilisation_rascal = None
[docs]
def add_fit_output_refine(
self,
fit_coeff: Union[list, np.ndarray],
matched_peaks: Union[list, np.ndarray],
matched_atlas: Union[list, np.ndarray],
rms: float,
residual: Union[list, np.ndarray],
peak_utilisation: float,
atlas_utilisation: float,
):
"""
Add the refined RASCAL polynomial solution.
Parameters
----------
fit_coeff: list
Set the baseline of the least square fit. If no fits outform this
set of polynomial coefficients, this will be used as the best fit.
matched_peaks: list
List of matched peaks
matched_atlas: list
List of matched atlas lines
rms: float
The root-mean-squared of the fit
residual: list
The residual of each fitted peak
peak_utilisation: float
The fraction of the input peaks used in the fit
atlas_utilisation: float
The fraction of the input atlas used in the fit
"""
# add assertion here
self.fit_coeff_refine = fit_coeff
self.matched_peaks_refine = matched_peaks
self.matched_atlas_refine = matched_atlas
self.rms_refine = rms
self.residual_refine = residual
self.peak_utilisation_refine = peak_utilisation
self.atlas_utilisation_refine = atlas_utilisation
self.add_fit_output_final(
fit_coeff,
matched_peaks,
matched_atlas,
rms,
residual,
peak_utilisation,
atlas_utilisation,
)
[docs]
def remove_fit_output_refine(self):
"""
Remove the refined RASCAL polynomial solution.
"""
self.fit_coeff_refine = None
self.matched_peaks_refine = None
self.matched_atlas_refine = None
self.rms_refine = None
self.residual_refine = None
self.peak_utilisation_refine = None
self.atlas_utilisation_refine = None
[docs]
def add_wavelength(self, wave: Union[list, np.ndarray]):
"""
Add the wavelength of each effective pixel.
Parameters
----------
wave: list or 1d-array
The wavelength values at each effective pixel.
"""
self.wave = wave
# Note that the native pixels have varing bin size.
self.wave_bin = np.nanmedian(np.array(np.ediff1d(wave)))
self.wave_start = np.min(wave)
self.wave_end = np.max(wave)
[docs]
def remove_wavelength(self):
"""
Remove the wavelength of each effective pixel.
"""
self.wave = None
[docs]
def add_wavelength_resampled(
self, wave_resampled: Union[list, np.ndarray]
):
"""
Add the wavelength of the resampled spectrum which has an evenly
distributed wavelength spacing.
Parameters
----------
wave: list or 1d-array
The resampled wavelength values.
"""
# We assume that the resampled spectrum has fixed bin size
self.wave_bin = np.nanmedian(np.array(np.ediff1d(wave_resampled)))
self.wave_start = np.min(wave_resampled)
self.wave_end = np.max(wave_resampled)
self.wave_resampled = wave_resampled
[docs]
def remove_wavelength_resampled(self):
"""
Add the resampled wavelength.
"""
self.wave_bin = None
self.wave_start = None
self.wave_end = None
self.wave_resampled = None
[docs]
def add_count_resampled(
self,
count_resampled: Union[list, np.ndarray],
count_err_resampled: Union[list, np.ndarray],
count_sky_resampled: Union[list, np.ndarray],
):
"""
Add the photoelectron counts of the resampled spectrum which has
an evenly distributed wavelength spacing.
Parameters
----------
count_resampled: list or 1d-array
The resampled photoelectron count.
count_err_resampled: list or 1d-array
The uncertainty of the resampled photoelectron count.
count_sky_resampled: list or 1d-array
The background sky level of the resampled photoelectron count.
"""
self.count_resampled = count_resampled
self.count_err_resampled = count_err_resampled
self.count_sky_resampled = count_sky_resampled
[docs]
def remove_count_resampled(self):
"""
Remove the photoelectron counts of the resampled spectrum.
"""
self.count_resampled = None
self.count_err_resampled = None
self.count_sky_resampled = None
[docs]
def add_standard_star(self, library: str, target: str):
"""
Add the name of the standard star and its source.
Parameters
----------
library: str
The name of the colelction of standard stars.
target: str
The name of the standard star.
"""
self.library = library
self.target = target
[docs]
def remove_standard_star(self):
"""
Remove the name of the standard star and its source.
"""
self.library = None
self.target = None
[docs]
def add_smoothing(self, smooth: bool, **kwargs: dict):
"""
Add the SG smoothing parameters for computing the sensitivity curve.
Parameters
----------
smooth: bool
Indicate if smoothing was applied.
kwargs: dict
Other keyword arguments passed to the lowess smoothing function
"""
self.smooth = smooth
self.lowess_kwargs = kwargs
[docs]
def remove_smoothing(self):
"""
Remove the smoothing parameters for computing the sensitivity curve.
"""
self.smooth = None
self.slength = None
self.sorder = None
[docs]
def add_sensitivity_func(self, sensitivity_func: Callable):
"""
Add the callable function of the sensitivity curve.
Parameters
----------
sensitivity_func: callable function
The sensitivity curve as a function of wavelength.
"""
self.sensitivity_func = sensitivity_func
[docs]
def remove_sensitivity_func(self):
"""
Remove the sensitivity curve function.
"""
self.sensitivity_func = None
[docs]
def add_sensitivity(self, sensitivity: Union[list, np.ndarray]):
"""
Add the sensitivity values for each pixel (the list from dividing
the literature standard by the photoelectron count).
Parameters
----------
sensitivity: list or 1-d array
The sensitivity at the effective pixel.
"""
self.sensitivity = sensitivity
[docs]
def remove_sensitivity(self):
"""
Remove the sensitivity values.
"""
self.sensitivity = None
[docs]
def add_sensitivity_resampled(
self, sensitivity_resampled: Union[list, np.ndarray]
):
"""
Add the sensitivity after the spectrum is resampled.
Parameters
----------
sensitivity: list or 1-d array
The sensitivity in the reasmpled space.
"""
self.sensitivity_resampled = sensitivity_resampled
[docs]
def remove_sensitivity_resampled(self):
"""
Remove the sensitivity after the spectrum is resampled.
"""
self.sensitivity_resampled = None
[docs]
def add_literature_standard(
self,
wave_literature: Union[list, np.ndarray],
flux_literature: Union[list, np.ndarray],
):
"""
Add the literature wavelength and flux values of the standard star
used for calibration.
Parameters
----------
wave_literature: list or 1-d array
The wavelength values of the literature standard used.
flux_literature: list or 1-d array
The flux values of the literature standard used.
"""
self.wave_literature = wave_literature
self.flux_literature = flux_literature
[docs]
def remove_literature_standard(self):
"""
Remove the literature wavelength and flux values of the standard
star used for calibration
"""
self.wave_literature = None
self.flux_literature = None
[docs]
def add_count_continuum(self, count_continuum: Union[list, np.ndarray]):
"""
Add the continuum count value (should be the same size as count).
Parameters
----------
count_continuum: list or 1-d array
The photoelectron count of the continuum.
"""
self.count_continuum = count_continuum
[docs]
def remove_count_continuum(self):
"""
Remove the continuum count values.
"""
self.count_continuum = None
[docs]
def add_count_resampled_continuum(
self, count_resampled_continuum: Union[list, np.ndarray]
):
"""
Add the continuum count_resampled value (should be the same size as
count_resampled).
Parameters
----------
count_resampled_continuum: list or 1-d array
The photoelectron count of the continuum at the resampled
wavelength.
"""
self.count_resampled_continuum = count_resampled_continuum
[docs]
def remove_count_resampled_continuum(self):
"""
Remove the continuum count_resampled values.
"""
self.count_resampled_continuum = None
[docs]
def add_flux_continuum(self, flux_continuum: Union[list, np.ndarray]):
"""
Add the continuum flux value (should be the same size as flux).
Parameters
----------
flux_continuum: list or 1-d array
The flux of the continuum.
"""
self.flux_continuum = flux_continuum
[docs]
def remove_flux_continuum(self):
"""
Remove the continuum flux values.
"""
self.flux_continuum = None
[docs]
def add_telluric_func(self, telluric_func: Callable):
"""
Add the Telluric interpolated function.
Parameters
----------
telluric: Callable function
A callable function to interpolate the telluric features [0, 1]
"""
self.telluric_func = telluric_func
[docs]
def remove_telluric_func(self):
"""
Remove the Telluric interpolated function.
"""
self.telluric_func = None
[docs]
def add_telluric_profile(self, telluric_profile: Union[list, np.ndarray]):
"""
Add the Telluric profile - relative intensity at each pixel.
Parameters
----------
telluric_profile: list or numpy.ndarray
The relative intensity of the telluric absorption strength [0, 1]
"""
self.telluric_profile = telluric_profile
[docs]
def remove_telluric_profile(self):
"""
Remove the Telluric profile.
"""
self.telluric_profile = None
[docs]
def add_telluric_factor(self, telluric_factor: float):
"""
Add the Telluric factor.
Parameters
----------
telluric_factor: float
The multiplier to the telluric profile that minimises the sum
of the deviation of corrected spectrum from the continuum
spectrum.
"""
self.telluric_factor = telluric_factor
[docs]
def remove_telluric_factor(self):
"""
Remove the Telluric factor.
"""
self.telluric_factor = None
[docs]
def add_telluric_nudge_factor(self, telluric_nudge_factor: float):
"""
Add the Telluric nudge factor.
Parameters
----------
telluric_nudge_factor: float
The multiplier to the telluric profile that minimises the sum
of the deviation of corrected spectrum from the continuum
spectrum.
"""
self.telluric_nudge_factor = telluric_nudge_factor
[docs]
def remove_telluric_nudge_factor(self):
"""
Remove the Telluric nudge factor.
"""
self.telluric_nudge_factor = None
[docs]
def add_flux(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the flux and the associated uncertainty and sky background
in the raw pixel sampling.
Parameters
----------
flux: list or numpy.ndarray
The flux at each extracted column of pixels.
flux_err: list or numpy.ndarray
The uncertainty in flux at each extracted column of pixels.
flux_sky: list or numpy.ndarray
The flux of the background sky at each extracted column of pixels.
"""
self.flux = flux
self.flux_err = flux_err
self.flux_sky = flux_sky
[docs]
def remove_flux(self):
"""
Remove the flux, uncertainty of flux, and background sky flux.
"""
self.flux = None
self.flux_err = None
self.flux_sky = None
[docs]
def add_atm_ext(self, atm_ext: Union[list, np.ndarray]):
"""
Add the atmospheric extinction correction factor in the native
wavelengths.
Parameters
----------
atm_ext: list or numpy.ndarray
The atmospheric absorption corrected flux at each extracted
column of pixels.
"""
self.atm_ext = atm_ext
[docs]
def remove_atm_ext(self):
"""
Remove the atmospheric extinction correction factor in the native
wavelengthss.
"""
self.atm_ext = None
[docs]
def add_flux_atm_ext_corrected(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the atmospheric extinction corrected flux and the associated
uncertainty and sky background in the raw pixel sampling.
Parameters
----------
flux: list or numpy.ndarray
The atmospheric absorption corrected flux at each extracted
column of pixels.
flux_err: list or numpy.ndarray
The atmospheric absorption corrected uncertainty in flux at each
extracted column of pixels.
flux_sky: list or numpy.ndarray
The atmospheric absorption corrected flux of the background sky
at each extracted column of pixels.
"""
self.flux_atm_ext_corrected = flux
self.flux_err_atm_ext_corrected = flux_err
self.flux_sky_atm_ext_corrected = flux_sky
[docs]
def remove_flux_atm_ext_corrected(self):
"""
Remove the atmospheric absorption corrected flux, uncertainty of flux,
and background sky flux in the raw pixel sampling.
"""
self.flux_atm_ext_corrected = None
self.flux_err_atm_ext_corrected = None
self.flux_sky_atm_ext_corrected = None
[docs]
def add_flux_telluric_corrected(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the telluric flux and the associated uncertainty and sky background
in the raw pixel sampling.
Parameters
----------
flux: list or numpy.ndarray
The atmospheric absorption corrected flux at each extracted
column of pixels.
flux_err: list or numpy.ndarray
The atmospheric absorption corrected uncertainty in flux at each
extracted column of pixels.
flux_sky: list or numpy.ndarray
The atmospheric absorption corrected flux of the background sky
at each extracted column of pixels.
"""
self.flux_telluric_corrected = flux
self.flux_err_telluric_corrected = flux_err
self.flux_sky_telluric_corrected = flux_sky
[docs]
def remove_flux_telluric_corrected(self):
"""
Remove the telluric corrected flux, uncertainty of flux,
and background sky flux in the raw pixel sampling.
"""
self.flux_telluric_corrected = None
self.flux_err_telluric_corrected = None
self.flux_sky_telluric_corrected = None
[docs]
def add_flux_atm_ext_telluric_corrected(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the atmospheric extinction and telluric corrected flux and the
associated uncertainty and sky background in the raw pixel sampling.
Parameters
----------
flux: list or numpy.ndarray
The atmospheric absorption corrected flux at each extracted
column of pixels.
flux_err: list or numpy.ndarray
The atmospheric absorption corrected uncertainty in flux at each
extracted column of pixels.
flux_sky: list or numpy.ndarray
The atmospheric absorption corrected flux of the background sky
at each extracted column of pixels.
"""
self.flux_atm_ext_telluric_corrected = flux
self.flux_err_atm_ext_telluric_corrected = flux_err
self.flux_sky_atm_ext_telluric_corrected = flux_sky
[docs]
def remove_flux_atm_ext_telluric_corrected(self):
"""
Remove the atmospheric absorption and telluric corrected flux,
uncertainty of flux, and background sky flux in the raw pixel
sampling.
"""
self.flux_atm_ext_telluric_corrected = None
self.flux_err_atm_ext_telluric_corrected = None
self.flux_sky_atm_ext_telluric_corrected = None
[docs]
def add_flux_resampled(
self,
flux_resampled: Union[list, np.ndarray],
flux_err_resampled: Union[list, np.ndarray],
flux_sky_resampled: Union[list, np.ndarray],
):
"""
Add the flux and the associated uncertainty and sky background
of the resampled spectrum.
Parameters
----------
flux: list or numpy.ndarray
The flux at the resampled wavelenth.
flux_err: list or numpy.ndarray
The uncertainty in flux at the resampled wavelenth.
flux_sky: list or numpy.ndarray
The flux of the background sky at the resampled wavelenth.
"""
self.flux_resampled = flux_resampled
self.flux_err_resampled = flux_err_resampled
self.flux_sky_resampled = flux_sky_resampled
[docs]
def remove_flux_resampled(self):
"""
Remove the flux, uncertainty of flux, and background sky flux
of the resampled spectrum.
"""
self.flux_resampled = None
self.flux_err_resampled = None
self.flux_sky_resampled = None
[docs]
def add_atm_ext_resampled(
self, atm_ext_resampled: Union[list, np.ndarray]
):
"""
Add the atmospheric extinction correction factor in the resampled
wavelengths.
Parameters
----------
atm_ext_resampled: list or numpy.ndarray
The atmospheric absorption corrected flux at the resampled
wavelengths.
"""
self.atm_ext_resampled = atm_ext_resampled
[docs]
def remove_atm_ext_resampled(self):
"""
Remove the atmospheric extinction correction factor in the resampled
wavelengths.
"""
self.atm_ext_resampled = None
[docs]
def add_flux_resampled_atm_ext_corrected(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the flux and the associated uncertainty and sky background
of the resampled spectrumg.
Parameters
----------
flux: list or numpy.ndarray
The atmospheric absorption corrected flux at each extracted
column of pixels.
flux_err: list or numpy.ndarray
The atmospheric absorption corrected uncertainty in flux at each
extracted column of pixels.
flux_sky: list or numpy.ndarray
The atmospheric absorption corrected flux of the background sky
at each extracted column of pixels.
"""
self.flux_resampled_atm_ext_corrected = flux
self.flux_err_resampled_atm_ext_corrected = flux_err
self.flux_sky_resampled_atm_ext_corrected = flux_sky
[docs]
def remove_flux_resampled_atm_ext_corrected(self):
"""
Remove the atmospheric absorption corrected flux, uncertainty of flux,
and background sky flux of the resampled spectrum.
"""
self.flux_resampled_atm_ext_corrected = None
self.flux_err_resampled_atm_ext_corrected = None
self.flux_sky_resampled_atm_ext_corrected = None
[docs]
def add_telluric_profile_resampled(
self, telluric_profile_resampled: Union[list, np.ndarray]
):
"""
Add the telluric absorption profile in the resampled wavelengths.
Parameters
----------
telluric_profile_resampled: list or numpy.ndarray
The atmospheric absorption corrected flux at each extracted
column of pixels.
"""
self.telluric_profile_resampled = telluric_profile_resampled
[docs]
def remove_telluric_profile_resampled(self):
"""
Remove the telluric absorption profile in the resampled wavelengthss.
"""
self.telluric_profile_resampled = None
[docs]
def add_flux_resampled_telluric_corrected(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the flux and the associated uncertainty and sky background
of the resampled spectrum.
Parameters
----------
flux: list or numpy.ndarray
The telluric corrected flux at each extracted
column of pixels.
flux_err: list or numpy.ndarray
The telluric corrected uncertainty in flux at each
extracted column of pixels.
flux_sky: list or numpy.ndarray
The telluric corrected flux of the background sky
at each extracted column of pixels.
"""
self.flux_resampled_telluric_corrected = flux
self.flux_err_resampled_telluric_corrected = flux_err
self.flux_sky_resampled_telluric_corrected = flux_sky
[docs]
def remove_flux_resampled_telluric_corrected(self):
"""
Remove the atmospheric absorption corrected flux, uncertainty of flux,
and background sky flux of the resampled spectrum.
"""
self.flux_resampled_telluric_corrected = None
self.flux_err_resampled_telluric_corrected = None
self.flux_sky_resampled_telluric_corrected = None
[docs]
def add_flux_resampled_atm_ext_telluric_corrected(
self,
flux: Union[list, np.ndarray],
flux_err: Union[list, np.ndarray],
flux_sky: Union[list, np.ndarray],
):
"""
Add the flux and the associated uncertainty and sky background
of the resampled spectrum.
Parameters
----------
flux: list or numpy.ndarray
The atmospheric absorption and telluric corrected flux at each
extracted column of pixels.
flux_err: list or numpy.ndarray
The atmospheric absorption and telluric corrected uncertainty in
flux at each extracted column of pixels.
flux_sky: list or numpy.ndarray
The atmospheric absorption and telluric corrected flux of the
background sky at each extracted column of pixels.
"""
self.flux_resampled_atm_ext_telluric_corrected = flux
self.flux_err_resampled_atm_ext_telluric_corrected = flux_err
self.flux_sky_resampled_atm_ext_telluric_corrected = flux_sky
[docs]
def remove_flux_resampled_atm_ext_telluric_corrected(self):
"""
Remove the atmospheric absorption and telluric corrected flux,
uncertainty of flux, and background sky flux of the resampled
spectrum.
"""
self.flux_resampled_atm_ext_telluric_corrected = None
self.flux_err_resampled_atm_ext_telluric_corrected = None
self.flux_sky_resampled_atm_ext_telluric_corrected = None
def _modify_imagehdu_data(
self, hdulist: list, idx: int, method: str, *args: str
):
"""
Wrapper function to modify the data of an ImageHDU object.
"""
method_to_call = getattr(hdulist[idx].data, method)
method_to_call(*args)
def _modify_imagehdu_header(
self, hdulist: list, idx: int, method: str, *args: str
):
"""
Wrapper function to modify the header of an ImageHDU object.
e.g.
method = 'set'
args = 'BUNIT', 'Angstroms'
method_to_call(*args) becomes hdu[idx].header.set('BUNIT', 'Angstroms')
"""
method_to_call = getattr(hdulist[idx].header, method)
method_to_call(*args)
[docs]
def create_trace_fits(self):
"""
Create an ImageHDU for the trace.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
trace_ImageHDU = fits.ImageHDU(
self.trace, header=self.spectrum_header
)
trace_sigma_ImageHDU = fits.ImageHDU(
self.trace_sigma, header=self.spectrum_header
)
else:
trace_ImageHDU = fits.ImageHDU(self.trace)
trace_sigma_ImageHDU = fits.ImageHDU(self.trace_sigma)
# Create an empty HDU list and populate with ImageHDUs
self.trace_hdulist = fits.HDUList()
self.trace_hdulist += [trace_ImageHDU]
self.trace_hdulist += [trace_sigma_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["trace"]]
# Add the trace
self.modify_trace_header(0, "set", "EXTNAME", hdu_names[0])
self.modify_trace_header(0, "set", "LABEL", "Trace")
self.modify_trace_header(0, "set", "CRPIX1", 1)
self.modify_trace_header(0, "set", "CDELT1", 1)
self.modify_trace_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_trace_header(0, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_trace_header(0, "set", "CUNIT1", "Pixel")
self.modify_trace_header(0, "set", "BUNIT", "Pixel (Spatial)")
# Add the trace_sigma
self.modify_trace_header(1, "set", "EXTNAME", hdu_names[1])
self.modify_trace_header(1, "set", "LABEL", "Trace width")
self.modify_trace_header(1, "set", "CRPIX1", 1)
self.modify_trace_header(1, "set", "CDELT1", 1)
self.modify_trace_header(
1, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_trace_header(1, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_trace_header(1, "set", "CUNIT1", "Number of Pixels")
self.modify_trace_header(1, "set", "BUNIT", "Pixel (Spatial)")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("trace ImageHDU cannot be created.")
self.trace_hdulist = None
[docs]
def create_count_fits(self):
"""
Create an ImageHDU for the extracted spectrum in photoelectron counts.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
count_ImageHDU = fits.ImageHDU(
self.count, header=self.spectrum_header
)
count_err_ImageHDU = fits.ImageHDU(
self.count_err, header=self.spectrum_header
)
count_sky_ImageHDU = fits.ImageHDU(
self.count_sky, header=self.spectrum_header
)
else:
count_ImageHDU = fits.ImageHDU(self.count)
count_err_ImageHDU = fits.ImageHDU(self.count_err)
count_sky_ImageHDU = fits.ImageHDU(self.count_sky)
# Create an empty HDU list and populate with ImageHDUs
self.count_hdulist = fits.HDUList()
self.count_hdulist += [count_ImageHDU]
self.count_hdulist += [count_err_ImageHDU]
self.count_hdulist += [count_sky_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["count"]]
# Add the count
self.modify_count_header(0, "set", "WIDTHDN", self.widthdn)
self.modify_count_header(0, "set", "WIDTHUP", self.widthup)
self.modify_count_header(0, "set", "SEPDN", self.sepdn)
self.modify_count_header(0, "set", "SEPUP", self.sepup)
self.modify_count_header(0, "set", "SKYDN", self.skywidthdn)
self.modify_count_header(0, "set", "SKYUP", self.skywidthup)
self.modify_count_header(0, "set", "XTYPE", self.extraction_type)
self.modify_count_header(0, "set", "EXTNAME", hdu_names[0])
self.modify_count_header(0, "set", "LABEL", "Electron count")
self.modify_count_header(0, "set", "CRPIX1", 1)
self.modify_count_header(0, "set", "CDELT1", 1)
self.modify_count_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_count_header(0, "set", "CTYPE1", " Pixel (Dispersion)")
self.modify_count_header(0, "set", "CUNIT1", "Pixel")
self.modify_count_header(0, "set", "BUNIT", "electron")
self.modify_count_header(0, "set", "XPOSURE", self.exptime)
self.modify_count_header(0, "set", "GAIN", self.gain)
self.modify_count_header(0, "set", "RNOISE", self.readnoise)
self.modify_count_header(0, "set", "SEEING", self.seeing)
self.modify_count_header(0, "set", "AIRMASS", self.airmass)
# Add the uncertainty count
self.modify_count_header(1, "set", "EXTNAME", hdu_names[1])
self.modify_count_header(
1, "set", "LABEL", "Electron count (Uncertainty)"
)
self.modify_count_header(1, "set", "CRPIX1", 1)
self.modify_count_header(1, "set", "CDELT1", 1)
self.modify_count_header(
1, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_count_header(1, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_count_header(1, "set", "CUNIT1", "Pixel")
self.modify_count_header(1, "set", "BUNIT", "electron")
# Add the sky count
self.modify_count_header(2, "set", "EXTNAME", hdu_names[2])
self.modify_count_header(2, "set", "LABEL", "Electron count (Sky)")
self.modify_count_header(2, "set", "CRPIX1", 1)
self.modify_count_header(2, "set", "CDELT1", 1)
self.modify_count_header(
2, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_count_header(2, "set", "CTYPE1", "Pixel (Dispersion)")
self.modify_count_header(2, "set", "CUNIT1", "Pixel")
self.modify_count_header(2, "set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("count ImageHDU cannot be created.")
self.count_hdulist = None
[docs]
def create_weight_map_fits(self):
"""
Create an ImageHDU for the extraction profile weight function.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
weight_map_ImageHDU = fits.ImageHDU(
self.var, header=self.spectrum_header
)
else:
weight_map_ImageHDU = fits.ImageHDU(self.var)
# Create an empty HDU list and populate with ImageHDUs
self.weight_map_hdulist = fits.HDUList()
self.weight_map_hdulist += [weight_map_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["weight_map"]]
# Add the extraction weights
self.modify_weight_map_header("set", "EXTNAME", hdu_names[0])
self.modify_weight_map_header(
"set", "LABEL", "Optimal extraction profile"
)
if self.var is not None:
self.modify_weight_map_header("set", "CRVAL1", len(self.var))
self.modify_weight_map_header("set", "CRPIX1", 1)
self.modify_weight_map_header("set", "CDELT1", 1)
self.modify_weight_map_header(
"set", "CTYPE1", "Pixel (Spatial)"
)
self.modify_weight_map_header("set", "CUNIT1", "Pixel")
self.modify_weight_map_header("set", "BUNIT", "weights")
else:
self.modify_weight_map_header(
"set", "COMMENT", "Extraction Profile is not available."
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("A weight map ImageHDU cannot be created.")
self.weight_map_hdulist = None
[docs]
def create_count_resampled_fits(self):
"""
Create an ImageHDU for the extracted spectrum in photoelectron count
at the resampled wavelengths.
"""
try:
# Use the header of the spectrum
if self.spectrum_header is not None:
count_resampled_ImageHDU = fits.ImageHDU(
self.count_resampled, header=self.spectrum_header
)
count_err_resampled_ImageHDU = fits.ImageHDU(
self.count_err_resampled, header=self.spectrum_header
)
count_sky_resampled_ImageHDU = fits.ImageHDU(
self.count_sky_resampled, header=self.spectrum_header
)
else:
count_resampled_ImageHDU = fits.ImageHDU(self.count_resampled)
count_err_resampled_ImageHDU = fits.ImageHDU(
self.count_err_resampled
)
count_sky_resampled_ImageHDU = fits.ImageHDU(
self.count_sky_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.count_resampled_hdulist = fits.HDUList()
self.count_resampled_hdulist += [count_resampled_ImageHDU]
self.count_resampled_hdulist += [count_err_resampled_ImageHDU]
self.count_resampled_hdulist += [count_sky_resampled_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["count_resampled"]]
# Add the resampled count
self.modify_count_resampled_header(
0, "set", "EXTNAME", hdu_names[0]
)
self.modify_count_resampled_header(
0, "set", "LABEL", "Resampled electron count"
)
self.modify_count_resampled_header(0, "set", "CRPIX1", 1.00e00)
self.modify_count_resampled_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_count_resampled_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_count_resampled_header(
0, "set", "CTYPE1", "Wavelength"
)
self.modify_count_resampled_header(0, "set", "CUNIT1", "Angstroms")
self.modify_count_resampled_header(0, "set", "BUNIT", "electron")
self.modify_count_resampled_header(0, "set", "BUNIT", "electron")
self.modify_count_resampled_header(
0, "set", "XPOSURE", self.exptime
)
self.modify_count_resampled_header(0, "set", "GAIN", self.gain)
self.modify_count_resampled_header(
0, "set", "RNOISE", self.readnoise
)
self.modify_count_resampled_header(0, "set", "SEEING", self.seeing)
self.modify_count_resampled_header(
0, "set", "AIRMASS", self.airmass
)
# Add the resampled uncertainty count
self.modify_count_resampled_header(
1, "set", "EXTNAME", hdu_names[1]
)
self.modify_count_resampled_header(
1, "set", "LABEL", "Resampled electron count (Uncertainty)"
)
self.modify_count_resampled_header(1, "set", "CRPIX1", 1.00e00)
self.modify_count_resampled_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_count_resampled_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_count_resampled_header(
1, "set", "CTYPE1", "Wavelength"
)
self.modify_count_resampled_header(1, "set", "CUNIT1", "Angstroms")
self.modify_count_resampled_header(1, "set", "BUNIT", "electron")
# Add the resampled sky count
self.modify_count_resampled_header(
2, "set", "EXTNAME", hdu_names[2]
)
self.modify_count_resampled_header(
2, "set", "LABEL", "Resampled electron count (Sky)"
)
self.modify_count_resampled_header(2, "set", "CRPIX1", 1.00e00)
self.modify_count_resampled_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_count_resampled_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_count_resampled_header(
2, "set", "CTYPE1", "Wavelength"
)
self.modify_count_resampled_header(2, "set", "CUNIT1", "Angstroms")
self.modify_count_resampled_header(2, "set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("count_resampled ImageHDU cannot be created.")
self.count_resampled_hdulist = None
[docs]
def create_arc_spec_fits(self):
"""
Create an ImageHDU for the spectrum of the arc lamp.
"""
try:
# Use the header of the arc
if self.arc_header is not None:
arc_spec_ImageHDU = fits.ImageHDU(
self.arc_spec, header=self.arc_header
)
else:
arc_spec_ImageHDU = fits.ImageHDU(self.arc_spec)
# Create an empty HDU list and populate with ImageHDUs
self.arc_spec_hdulist = fits.HDUList()
self.arc_spec_hdulist += [arc_spec_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["arc_spec"]]
# Add the arc spectrum
self.modify_arc_spec_header(0, "set", "EXTNAME", hdu_names[0])
self.modify_arc_spec_header(
0, "set", "LABEL", "Electron count (Arc)"
)
self.modify_arc_spec_header(0, "set", "CRPIX1", 1)
self.modify_arc_spec_header(0, "set", "CDELT1", 1)
self.modify_arc_spec_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_arc_spec_header(
0, "set", "CTYPE1", "Pixel (Dispersion)"
)
self.modify_arc_spec_header(0, "set", "CUNIT1", "Pixel")
self.modify_arc_spec_header(0, "set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("arc_spec ImageHDU cannot be created.")
self.arc_spec_hdulist = None
[docs]
def create_arc_lines_fits(self):
"""
Create an ImageHDU for the spectrum of the arc lamp.
"""
try:
# Use the header of the arc
if self.arc_header is not None:
peaks_ImageHDU = fits.ImageHDU(
self.peaks, header=self.arc_header
)
peaks_refined_ImageHDU = fits.ImageHDU(
self.peaks_refined, header=self.arc_header
)
else:
peaks_ImageHDU = fits.ImageHDU(self.peaks)
peaks_refined_ImageHDU = fits.ImageHDU(self.peaks_refined)
# Create an empty HDU list and populate with ImageHDUs
self.arc_lines_hdulist = fits.HDUList()
self.arc_lines_hdulist += [peaks_ImageHDU]
self.arc_lines_hdulist += [peaks_refined_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["arc_lines"]]
# Add the peaks in native pixel value
self.modify_arc_lines_header(0, "set", "EXTNAME", hdu_names[0])
self.modify_arc_lines_header(
0, "set", "LABEL", "Peaks (Detector pixel)"
)
self.modify_arc_lines_header(0, "set", "BUNIT", "Pixel")
# Add the peaks in effective pixel value
self.modify_arc_lines_header(1, "set", "EXTNAME", hdu_names[1])
self.modify_arc_lines_header(
1, "set", "LABEL", "Peaks (Effective pixel)"
)
self.modify_arc_lines_header(1, "set", "BUNIT", "Pixel")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("arc_lines ImageHDU cannot be created.")
self.arc_lines_hdulist = None
[docs]
def create_wavecal_fits(self):
"""
Create an ImageHDU for the polynomial coeffcients of the
pixel-wavelength mapping function.
"""
try:
# Use the header of the arc
if self.arc_header is not None:
wavecal_ImageHDU = fits.ImageHDU(
self.fit_coeff, header=self.arc_header
)
else:
wavecal_ImageHDU = fits.ImageHDU(self.fit_coeff)
# Create an empty HDU list and populate with ImageHDUs
self.wavecal_hdulist = fits.HDUList()
self.wavecal_hdulist += [wavecal_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["wavecal_coefficients"]]
# Add the wavelength calibration header info
self.modify_wavecal_header("set", "FTYPE", self.fit_type)
self.modify_wavecal_header("set", "FDEG", self.fit_deg)
self.modify_wavecal_header("set", "FFRMS", self.rms)
self.modify_wavecal_header(
"set", "ATLWMIN", self.min_atlas_wavelength
)
self.modify_wavecal_header(
"set", "ATLWMAX", self.max_atlas_wavelength
)
self.modify_wavecal_header("set", "NSLOPES", self.num_slopes)
self.modify_wavecal_header("set", "RNGTOL", self.range_tolerance)
self.modify_wavecal_header("set", "FITTOL", self.fit_tolerance)
self.modify_wavecal_header(
"set", "CANTHRE", self.candidate_tolerance
)
self.modify_wavecal_header(
"set", "LINTHRE", self.linearity_tolerance
)
self.modify_wavecal_header("set", "RANTHRE", self.ransac_tolerance)
self.modify_wavecal_header("set", "NUMCAN", self.num_candidates)
self.modify_wavecal_header("set", "XBINS", self.xbins)
self.modify_wavecal_header("set", "YBINS", self.ybins)
self.modify_wavecal_header("set", "BRUTE", self.brute_force)
self.modify_wavecal_header("set", "SAMSIZE", self.sample_size)
self.modify_wavecal_header("set", "TOPN", self.top_n)
self.modify_wavecal_header("set", "MAXTRY", self.max_tries)
self.modify_wavecal_header("set", "INCOEFF", self.intput_coeff)
self.modify_wavecal_header("set", "LINEAR", self.linear)
self.modify_wavecal_header("set", "W8ED", self.weighted)
self.modify_wavecal_header("set", "FILTER", self.filter_close)
self.modify_wavecal_header("set", "PUSAGE", self.peak_utilisation)
self.modify_wavecal_header("set", "EXTNAME", hdu_names[0])
self.modify_wavecal_header(
"set", "LABEL", "Wavelength calibration coefficients"
)
self.modify_wavecal_header("set", "CRPIX1", 1.00e00)
self.modify_wavecal_header("set", "CDELT1", self.wave_bin)
self.modify_wavecal_header("set", "CRVAL1", self.wave_start)
self.modify_wavecal_header("set", "CTYPE1", "Wavelength")
self.modify_wavecal_header("set", "CUNIT1", "Angstroms")
self.modify_wavecal_header("set", "BUNIT", "electron")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("wavecal ImageHDU cannot be created.")
self.wavecal_hdulist = None
[docs]
def create_wavelength_fits(self):
"""
Create an ImageHDU for the wavelength at each of the native pixel.
"""
try:
# Put the data in an ImageHDU
# Use the header of the arc
if self.arc_header is not None:
wavelength_ImageHDU = fits.ImageHDU(
self.wave, header=self.arc_header
)
else:
wavelength_ImageHDU = fits.ImageHDU(self.wave)
# Create an empty HDU list and populate with the ImageHDU
self.wavelength_hdulist = fits.HDUList()
self.wavelength_hdulist += [wavelength_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["wavelength"]]
# Add the calibrated wavelength
self.modify_wavelength_header("set", "EXTNAME", hdu_names[0])
self.modify_wavelength_header(
"set", "LABEL", "Pixel-wavelength mapping"
)
self.modify_wavelength_header("set", "CRPIX1", 1)
self.modify_wavelength_header("set", "CDELT1", 1)
self.modify_wavelength_header(
"set", "CRVAL1", self.effective_pixel[0]
)
self.modify_wavelength_header(
"set", "CTYPE1", "Pixel (Dispersion)"
)
self.modify_wavelength_header("set", "CUNIT1", "Pixel")
self.modify_wavelength_header("set", "BUNIT", "Angstroms")
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("wavelength ImageHDU cannot be created.")
self.wavelength_hdulist = None
[docs]
def create_wavelength_resampled_fits(self):
"""
Create an ImageHDU for the wavelength at each resampled position.
"""
try:
# Put the data in an ImageHDU
# Use the header of the arc
if self.arc_header is not None:
wavelength_resampled_ImageHDU = fits.ImageHDU(
self.wave_resampled, header=self.arc_header
)
else:
wavelength_resampled_ImageHDU = fits.ImageHDU(
self.wave_resampled
)
# Create an empty HDU list and populate with the ImageHDU
self.wavelength_resampled_hdulist = fits.HDUList()
self.wavelength_resampled_hdulist += [
wavelength_resampled_ImageHDU
]
hdu_names = self.ext_name[self.hdu_order["wavelength_resampled"]]
# Add the calibrated wavelength
self.modify_wavelength_resampled_header(
"set", "EXTNAME", hdu_names[0]
)
self.modify_wavelength_resampled_header(
"set", "LABEL", "Wavelength at resampled position"
)
self.modify_wavelength_resampled_header("set", "CRPIX1", 1)
self.modify_wavelength_resampled_header("set", "CDELT1", 1)
self.modify_wavelength_resampled_header(
"set", "CRVAL1", self.effective_pixel[0]
)
self.modify_wavelength_resampled_header(
"set", "CTYPE1", "Pixel (Dispersion)"
)
self.modify_wavelength_resampled_header("set", "CUNIT1", "Pixel")
self.modify_wavelength_resampled_header(
"set", "BUNIT", "Angstroms"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"wavelength_resampled ImageHDU cannot be created."
)
self.wavelength_resampled_hdulist = None
[docs]
def create_sensitivity_fits(self):
"""
Create an ImageHDU for the sensitivity at each of the native pixel.
"""
try:
# Put the data in ImageHDUs
# Use the header of the standard
if self.standard_header is not None:
sensitivity_ImageHDU = fits.ImageHDU(
self.sensitivity, header=self.standard_header
)
else:
sensitivity_ImageHDU = fits.ImageHDU(self.sensitivity)
# Create an empty HDU list and populate with ImageHDUs
self.sensitivity_hdulist = fits.HDUList()
self.sensitivity_hdulist += [sensitivity_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["sensitivity"]]
self.modify_sensitivity_header("set", "EXTNAME", hdu_names[0])
self.modify_sensitivity_header("set", "LABEL", "Sensitivity")
self.modify_sensitivity_header("set", "CRPIX1", 1.00e00)
self.modify_sensitivity_header("set", "CDELT1", 1)
self.modify_sensitivity_header(
"set", "CRVAL1", self.effective_pixel[0]
)
self.modify_sensitivity_header("set", "CTYPE1", "Pixel")
self.modify_sensitivity_header("set", "CUNIT1", "Pixel")
self.modify_sensitivity_header(
"set", "BUNIT", "erg/(s*cm**2*Angstrom)/Count"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("sensitivity ImageHDU cannot be created.")
self.sensitivity_hdulist = None
[docs]
def create_flux_fits(self):
"""
Create an ImageHDU for the flux calibrated spectrum at each native
pixel.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data in ImageHDUs
flux_ImageHDU = fits.ImageHDU(self.flux, header=header)
flux_err_ImageHDU = fits.ImageHDU(self.flux_err, header=header)
flux_sky_ImageHDU = fits.ImageHDU(self.flux_sky, header=header)
else:
flux_ImageHDU = fits.ImageHDU(self.flux)
flux_err_ImageHDU = fits.ImageHDU(self.flux_err)
flux_sky_ImageHDU = fits.ImageHDU(self.flux_sky)
# Create an empty HDU list and populate with ImageHDUs
self.flux_hdulist = fits.HDUList()
self.flux_hdulist += [flux_ImageHDU]
self.flux_hdulist += [flux_err_ImageHDU]
self.flux_hdulist += [flux_sky_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["flux"]]
# Note that wave_start is the centre of the starting bin
self.modify_flux_header(0, "set", "EXTNAME", hdu_names[0])
self.modify_flux_header(0, "set", "LABEL", "Flux")
self.modify_flux_header(0, "set", "CRPIX1", 1.00e00)
self.modify_flux_header(0, "set", "CDELT1", 1)
self.modify_flux_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_header(0, "set", "CTYPE1", "Pixel")
self.modify_flux_header(0, "set", "CUNIT1", "Pixel")
self.modify_flux_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_header(0, "set", "XPOSURE", self.exptime)
self.modify_flux_header(0, "set", "GAIN", self.gain)
self.modify_flux_header(0, "set", "RNOISE", self.readnoise)
self.modify_flux_header(0, "set", "SEEING", self.seeing)
self.modify_flux_header(0, "set", "AIRMASS", self.airmass)
self.modify_flux_header(1, "set", "EXTNAME", hdu_names[1])
self.modify_flux_header(1, "set", "LABEL", "Flux (Uncertainty)")
self.modify_flux_header(1, "set", "CRPIX1", 1.00e00)
self.modify_flux_header(1, "set", "CDELT1", 1)
self.modify_flux_header(
1, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_header(1, "set", "CTYPE1", "Pixel")
self.modify_flux_header(1, "set", "CUNIT1", "Pixel")
self.modify_flux_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_header(2, "set", "EXTNAME", hdu_names[2])
self.modify_flux_header(2, "set", "LABEL", "Flux (Sky)")
self.modify_flux_header(2, "set", "CRPIX1", 1.00e00)
self.modify_flux_header(2, "set", "CDELT1", 1)
self.modify_flux_header(
2, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_header(2, "set", "CTYPE1", "Pixel")
self.modify_flux_header(2, "set", "CUNIT1", "Pixel")
self.modify_flux_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("flux ImageHDU cannot be created.")
self.flux_hdulist = None
[docs]
def create_atm_ext_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at the resampled wavelength.
"""
try:
header = None
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
atm_ext_ImageHDU = fits.ImageHDU(self.atm_ext, header=header)
else:
# Put the data in ImageHDUs
atm_ext_ImageHDU = fits.ImageHDU(self.atm_ext)
# Create an empty HDU list and populate with ImageHDUs
self.atm_ext_hdulist = fits.HDUList()
self.atm_ext_hdulist += [atm_ext_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["atm_ext"]]
# Note that wave_start is the centre of the starting bin
self.modify_atm_ext_header("set", "EXTNAME", hdu_names[0])
self.modify_atm_ext_header(
"set", "LABEL", "Atmopheric extinction correction factor"
)
self.modify_atm_ext_header("set", "CRPIX1", 1.00e00)
self.modify_atm_ext_header("set", "CDELT1", self.wave_bin)
self.modify_atm_ext_header("set", "CRVAL1", self.wave_start)
self.modify_atm_ext_header("set", "CTYPE1", "Wavelength")
self.modify_atm_ext_header("set", "CUNIT1", " ")
self.modify_atm_ext_header("set", "BUNIT", " ")
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning("atm_ext ImageHDU cannot be created.")
self.atm_ext_hdulist = None
[docs]
def create_flux_atm_ext_corrected_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at each native pixel.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data in ImageHDUs
flux_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_atm_ext_corrected, header=header
)
flux_err_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_atm_ext_corrected, header=header
)
flux_sky_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_atm_ext_corrected, header=header
)
else:
flux_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_atm_ext_corrected
)
flux_err_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_atm_ext_corrected
)
flux_sky_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_atm_ext_corrected
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_atm_ext_corrected_hdulist = fits.HDUList()
self.flux_atm_ext_corrected_hdulist += [
flux_atm_ext_corrected_ImageHDU
]
self.flux_atm_ext_corrected_hdulist += [
flux_err_atm_ext_corrected_ImageHDU
]
self.flux_atm_ext_corrected_hdulist += [
flux_sky_atm_ext_corrected_ImageHDU
]
hdu_names = self.ext_name[self.hdu_order["flux_atm_ext_corrected"]]
# Note that wave_start is the centre of the starting bin
self.modify_flux_atm_ext_corrected_header(
0, "set", "EXTNAME", hdu_names[0]
)
self.modify_flux_atm_ext_corrected_header(
0, "set", "LABEL", "Flux atmospheric extinction corrected"
)
self.modify_flux_atm_ext_corrected_header(
0, "set", "CRPIX1", 1.00e00
)
self.modify_flux_atm_ext_corrected_header(0, "set", "CDELT1", 1)
self.modify_flux_atm_ext_corrected_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_atm_ext_corrected_header(
0, "set", "CTYPE1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
0, "set", "CUNIT1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_atm_ext_corrected_header(
1,
"set",
"EXTNAME",
hdu_names[1],
)
self.modify_flux_atm_ext_corrected_header(
1,
"set",
"LABEL",
"Flux atmospheric extinction corrected (Uncertainty)",
)
self.modify_flux_atm_ext_corrected_header(
1, "set", "CRPIX1", 1.00e00
)
self.modify_flux_atm_ext_corrected_header(1, "set", "CDELT1", 1)
self.modify_flux_atm_ext_corrected_header(
1, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_atm_ext_corrected_header(
1, "set", "CTYPE1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
1, "set", "CUNIT1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_atm_ext_corrected_header(
2,
"set",
"EXTNAME",
hdu_names[2],
)
self.modify_flux_atm_ext_corrected_header(
2,
"set",
"LABEL",
"Flux atmospheric extinction corrected (Sky)",
)
self.modify_flux_atm_ext_corrected_header(
2, "set", "CRPIX1", 1.00e00
)
self.modify_flux_atm_ext_corrected_header(2, "set", "CDELT1", 1)
self.modify_flux_atm_ext_corrected_header(
2, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_atm_ext_corrected_header(
2, "set", "CTYPE1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
2, "set", "CUNIT1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"flux_atm_ext_corrected ImageHDU cannot be created."
)
self.flux_atm_ext_corrected_hdulist = None
[docs]
def create_telluric_profile_fits(self):
"""
Create an ImageHDU for the Telluric absorption profile at resampled
wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
telluric_profile_ImageHDU = fits.ImageHDU(
self.telluric_profile, header=header
)
else:
# Put the data in ImageHDUs
telluric_profile_ImageHDU = fits.ImageHDU(
self.telluric_profile
)
# Create an empty HDU list and populate with ImageHDUs
self.telluric_profile_hdulist = fits.HDUList()
self.telluric_profile_hdulist += [telluric_profile_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["telluric_profile"]]
# Note that wave_start is the centre of the starting bin
self.modify_telluric_profile_header("set", "EXTNAME", hdu_names[0])
self.modify_telluric_profile_header(
"set", "LABEL", "Telluric absorption profile"
)
self.modify_telluric_profile_header("set", "CRPIX1", 1.00e00)
self.modify_telluric_profile_header("set", "CDELT1", self.wave_bin)
self.modify_telluric_profile_header(
"set", "CRVAL1", self.wave_start
)
self.modify_telluric_profile_header("set", "CTYPE1", "Wavelength")
self.modify_telluric_profile_header("set", "CUNIT1", "")
self.modify_telluric_profile_header("set", "BUNIT", "")
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning("telluric_profile ImageHDU cannot be created.")
self.telluric_profile_hdulist = None
[docs]
def create_flux_telluric_corrected_fits(self):
"""
Create an ImageHDU for the telluric corrected flux calibrated
spectrum at each native pixel.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data in ImageHDUs
flux_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_telluric_corrected, header=header
)
flux_err_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_telluric_corrected, header=header
)
flux_sky_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_telluric_corrected, header=header
)
else:
flux_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_telluric_corrected
)
flux_err_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_telluric_corrected
)
flux_sky_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_telluric_corrected
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_telluric_corrected_hdulist = fits.HDUList()
self.flux_telluric_corrected_hdulist += [
flux_telluric_corrected_ImageHDU
]
self.flux_telluric_corrected_hdulist += [
flux_err_telluric_corrected_ImageHDU
]
self.flux_telluric_corrected_hdulist += [
flux_sky_telluric_corrected_ImageHDU
]
hdu_names = self.ext_name[
self.hdu_order["flux_telluric_corrected"]
]
# Note that wave_start is the centre of the starting bin
self.modify_flux_telluric_corrected_header(
0, "set", "EXTNAME", hdu_names[0]
)
self.modify_flux_telluric_corrected_header(
0, "set", "LABEL", "Flux telluric corrected"
)
self.modify_flux_telluric_corrected_header(
0, "set", "CRPIX1", 1.00e00
)
self.modify_flux_telluric_corrected_header(0, "set", "CDELT1", 1)
self.modify_flux_telluric_corrected_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_telluric_corrected_header(
0, "set", "CTYPE1", "Pixel"
)
self.modify_flux_telluric_corrected_header(
0, "set", "CUNIT1", "Pixel"
)
self.modify_flux_telluric_corrected_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_telluric_corrected_header(
1, "set", "EXTNAME", hdu_names[1]
)
self.modify_flux_telluric_corrected_header(
1, "set", "LABEL", "Flux telluric correct (Uncertainty)"
)
self.modify_flux_telluric_corrected_header(
1, "set", "CRPIX1", 1.00e00
)
self.modify_flux_telluric_corrected_header(1, "set", "CDELT1", 1)
self.modify_flux_telluric_corrected_header(
1, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_telluric_corrected_header(
1, "set", "CTYPE1", "Pixel"
)
self.modify_flux_telluric_corrected_header(
1, "set", "CUNIT1", "Pixel"
)
self.modify_flux_telluric_corrected_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_telluric_corrected_header(
2, "set", "EXTNAME", hdu_names[2]
)
self.modify_flux_telluric_corrected_header(
2, "set", "LABEL", "Flux telluric corrected (Sky)"
)
self.modify_flux_telluric_corrected_header(
2, "set", "CRPIX1", 1.00e00
)
self.modify_flux_telluric_corrected_header(2, "set", "CDELT1", 1)
self.modify_flux_telluric_corrected_header(
2, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_telluric_corrected_header(
2, "set", "CTYPE1", "Pixel"
)
self.modify_flux_telluric_corrected_header(
2, "set", "CUNIT1", "Pixel"
)
self.modify_flux_telluric_corrected_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"flux_telluric_corrected ImageHDU cannot be created."
)
self.flux_telluric_corrected_hdulist = None
[docs]
def create_flux_atm_ext_telluric_corrected_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at each native pixel.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data in ImageHDUs
flux_atm_ext_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_atm_ext_telluric_corrected, header=header
)
flux_err_atm_ext_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_atm_ext_telluric_corrected, header=header
)
flux_sky_atm_ext_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_atm_ext_telluric_corrected, header=header
)
else:
flux_atm_ext_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_atm_ext_telluric_corrected
)
flux_err_atm_ext_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_atm_ext_telluric_corrected
)
flux_sky_atm_ext_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_atm_ext_telluric_corrected
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_atm_ext_telluric_corrected_hdulist = fits.HDUList()
self.flux_atm_ext_telluric_corrected_hdulist += [
flux_atm_ext_telluric_corrected_ImageHDU
]
self.flux_atm_ext_telluric_corrected_hdulist += [
flux_err_atm_ext_telluric_corrected_ImageHDU
]
self.flux_atm_ext_telluric_corrected_hdulist += [
flux_sky_atm_ext_telluric_corrected_ImageHDU
]
hdu_names = self.ext_name[
self.hdu_order["flux_atm_ext_telluric_corrected"]
]
# Note that wave_start is the centre of the starting bin
self.modify_flux_atm_ext_telluric_corrected_header(
0,
"set",
"EXTNAME",
hdu_names[0],
)
self.modify_flux_atm_ext_telluric_corrected_header(
0,
"set",
"LABEL",
"Flux atmospheric extinction telluric corrected",
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "CRPIX1", 1.00e00
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "CDELT1", 1
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "CTYPE1", "Pixel"
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "CUNIT1", "Pixel"
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "XPOSURE", self.exptime
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "GAIN", self.gain
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "RNOISE", self.readnoise
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "SEEING", self.seeing
)
self.modify_flux_atm_ext_telluric_corrected_header(
0, "set", "AIRMASS", self.airmass
)
self.modify_flux_atm_ext_telluric_corrected_header(
1,
"set",
"EXTNAME",
hdu_names[1],
)
self.modify_flux_atm_ext_telluric_corrected_header(
1,
"set",
"LABEL",
"Flux atmospheric extinction telluric corrected (Uncertainty)",
)
self.modify_flux_atm_ext_telluric_corrected_header(
1, "set", "CRPIX1", 1.00e00
)
self.modify_flux_atm_ext_telluric_corrected_header(
1, "set", "CDELT1", 1
)
self.modify_flux_atm_ext_telluric_corrected_header(
1, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_atm_ext_telluric_corrected_header(
1, "set", "CTYPE1", "Pixel"
)
self.modify_flux_atm_ext_telluric_corrected_header(
1, "set", "CUNIT1", "Pixel"
)
self.modify_flux_atm_ext_corrected_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_atm_ext_corrected_header(
2,
"set",
"EXTNAME",
hdu_names[2],
)
self.modify_flux_atm_ext_telluric_corrected_header(
2,
"set",
"LABEL",
"Flux atmospheric extinction telluric corrected (Sky)",
)
self.modify_flux_atm_ext_telluric_corrected_header(
2, "set", "CRPIX1", 1.00e00
)
self.modify_flux_atm_ext_telluric_corrected_header(
2, "set", "CDELT1", 1
)
self.modify_flux_atm_ext_telluric_corrected_header(
2, "set", "CRVAL1", self.effective_pixel[0]
)
self.modify_flux_atm_ext_telluric_corrected_header(
2, "set", "CTYPE1", "Pixel"
)
self.modify_flux_atm_ext_telluric_corrected_header(
2, "set", "CUNIT1", "Pixel"
)
self.modify_flux_atm_ext_telluric_corrected_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"flux_atm_ext_telluric_corrected ImageHDU cannot be created."
)
self.flux_atm_ext_telluric_corrected_hdulist = None
[docs]
def create_sensitivity_resampled_fits(self):
"""
Create an ImageHDU for the sensitivity at the resampled wavelength.
"""
try:
# Put the data in ImageHDUs
sensitivity_resampled_ImageHDU = fits.ImageHDU(
self.sensitivity_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.sensitivity_resampled_hdulist = fits.HDUList()
self.sensitivity_resampled_hdulist += [
sensitivity_resampled_ImageHDU
]
hdu_names = self.ext_name[self.hdu_order["sensitivity_resampled"]]
self.modify_sensitivity_resampled_header(
"set", "EXTNAME", hdu_names[0]
)
self.modify_sensitivity_resampled_header(
"set", "LABEL", "Sensitivity"
)
self.modify_sensitivity_resampled_header("set", "CRPIX1", 1.00e00)
self.modify_sensitivity_resampled_header("set", "CDELT1", 1)
self.modify_sensitivity_resampled_header(
"set", "CRVAL1", self.effective_pixel[0]
)
self.modify_sensitivity_resampled_header("set", "CTYPE1", "Pixel")
self.modify_sensitivity_resampled_header("set", "CUNIT1", "Pixel")
self.modify_sensitivity_resampled_header(
"set", "BUNIT", "erg/(s*cm**2*Angstrom)/Count"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("sensitivity ImageHDU cannot be created.")
self.sensitivity_resampled_hdulist = None
[docs]
def create_flux_resampled_fits(self):
"""
Create an ImageHDU for the flux calibrated spectrum at the resampled
wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data in ImageHDUs
flux_resampled_ImageHDU = fits.ImageHDU(
self.flux_resampled, header=header
)
flux_err_resampled_ImageHDU = fits.ImageHDU(
self.flux_err_resampled, header=header
)
flux_sky_resampled_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled, header=header
)
else:
# Put the data in ImageHDUs
flux_resampled_ImageHDU = fits.ImageHDU(self.flux_resampled)
flux_err_resampled_ImageHDU = fits.ImageHDU(
self.flux_err_resampled
)
flux_sky_resampled_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_resampled_hdulist = fits.HDUList()
self.flux_resampled_hdulist += [flux_resampled_ImageHDU]
self.flux_resampled_hdulist += [flux_err_resampled_ImageHDU]
self.flux_resampled_hdulist += [flux_sky_resampled_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["flux_resampled"]]
# Note that wave_start is the centre of the starting bin
self.modify_flux_resampled_header(
0, "set", "EXTNAME", hdu_names[0]
)
self.modify_flux_resampled_header(
0, "set", "LABEL", "Flux resampled"
)
self.modify_flux_resampled_header(0, "set", "CRPIX1", 1.00e00)
self.modify_flux_resampled_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_header(0, "set", "CTYPE1", "Wavelength")
self.modify_flux_resampled_header(0, "set", "CUNIT1", "Angstroms")
self.modify_flux_resampled_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_header(
0, "set", "XPOSURE", self.exptime
)
self.modify_flux_resampled_header(0, "set", "GAIN", self.gain)
self.modify_flux_resampled_header(
0, "set", "RNOISE", self.readnoise
)
self.modify_flux_resampled_header(0, "set", "SEEING", self.seeing)
self.modify_flux_resampled_header(
0, "set", "AIRMASS", self.airmass
)
self.modify_flux_resampled_header(
1, "set", "EXTNAME", hdu_names[1]
)
self.modify_flux_resampled_header(
1, "set", "LABEL", "Flux resampled (Uncertainty)"
)
self.modify_flux_resampled_header(1, "set", "CRPIX1", 1.00e00)
self.modify_flux_resampled_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_header(1, "set", "CTYPE1", "Wavelength")
self.modify_flux_resampled_header(1, "set", "CUNIT1", "Angstroms")
self.modify_flux_resampled_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_header(
2, "set", "EXTNAME", hdu_names[2]
)
self.modify_flux_resampled_header(
2, "set", "LABEL", "Flux resampled (Sky)"
)
self.modify_flux_resampled_header(2, "set", "CRPIX1", 1.00e00)
self.modify_flux_resampled_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_header(2, "set", "CTYPE1", "Wavelength")
self.modify_flux_resampled_header(2, "set", "CUNIT1", "Angstroms")
self.modify_flux_resampled_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.error(str(e))
# Set it to None if the above failed
self.logger.error("flux_resampled ImageHDU cannot be created.")
self.flux_resampled_hdulist = None
[docs]
def create_atm_ext_resampled_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at the resampled wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
atm_ext_resampled_ImageHDU = fits.ImageHDU(
self.atm_ext_resampled, header=header
)
else:
# Put the data in ImageHDUs
atm_ext_resampled_ImageHDU = fits.ImageHDU(
self.atm_ext_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.atm_ext_resampled_hdulist = fits.HDUList()
self.atm_ext_resampled_hdulist += [atm_ext_resampled_ImageHDU]
hdu_names = self.ext_name[self.hdu_order["atm_ext_resampled"]]
# Note that wave_start is the centre of the starting bin
self.modify_atm_ext_resampled_header(
"set", "EXTNAME", hdu_names[0]
)
self.modify_atm_ext_resampled_header(
"set", "LABEL", "Atmopheric extinction correction factor"
)
self.modify_atm_ext_resampled_header("set", "CRPIX1", 1.00e00)
self.modify_atm_ext_resampled_header(
"set", "CDELT1", self.wave_bin
)
self.modify_atm_ext_resampled_header(
"set", "CRVAL1", self.wave_start
)
self.modify_atm_ext_resampled_header("set", "CTYPE1", "Wavelength")
self.modify_atm_ext_resampled_header("set", "CUNIT1", "")
self.modify_atm_ext_resampled_header("set", "BUNIT", "")
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"atm_ext_resampled ImageHDU cannot be created."
)
self.atm_ext_resampled_hdulist = None
[docs]
def create_flux_resampled_atm_ext_corrected_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at the resampled wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
flux_resampled_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_resampled_atm_ext_corrected, header=header
)
flux_err_resampled_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_resampled_atm_ext_corrected, header=header
)
flux_sky_resampled_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled_atm_ext_corrected, header=header
)
else:
# Put the data in ImageHDUs
flux_resampled_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_resampled_atm_ext_corrected
)
flux_err_resampled_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_resampled_atm_ext_corrected
)
flux_sky_resampled_atm_ext_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled_atm_ext_corrected
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_resampled_atm_ext_corrected_hdulist = fits.HDUList()
self.flux_resampled_atm_ext_corrected_hdulist += [
flux_resampled_atm_ext_corrected_ImageHDU
]
self.flux_resampled_atm_ext_corrected_hdulist += [
flux_err_resampled_atm_ext_corrected_ImageHDU
]
self.flux_resampled_atm_ext_corrected_hdulist += [
flux_sky_resampled_atm_ext_corrected_ImageHDU
]
hdu_names = self.ext_name[
self.hdu_order["flux_resampled_atm_ext_corrected"]
]
# Note that wave_start is the centre of the starting bin
self.modify_flux_resampled_atm_ext_corrected_header(
0,
"set",
"EXTNAME",
hdu_names[0],
)
self.modify_flux_resampled_atm_ext_corrected_header(
0,
"set",
"LABEL",
"Flux resampled atmospheric extinction corrected",
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "XPOSURE", self.exptime
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "GAIN", self.gain
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "RNOISE", self.readnoise
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "SEEING", self.seeing
)
self.modify_flux_resampled_atm_ext_corrected_header(
0, "set", "AIRMASS", self.airmass
)
self.modify_flux_resampled_atm_ext_corrected_header(
1,
"set",
"EXTNAME",
hdu_names[1],
)
self.modify_flux_resampled_atm_ext_corrected_header(
1,
"set",
"LABEL",
"Flux resampled atmospheric extinction corrected (Uncertainty)",
)
self.modify_flux_resampled_atm_ext_corrected_header(
1, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_atm_ext_corrected_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_atm_ext_corrected_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_atm_ext_corrected_header(
1, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_atm_ext_corrected_header(
1, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_atm_ext_corrected_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_atm_ext_corrected_header(
2,
"set",
"EXTNAME",
hdu_names[2],
)
self.modify_flux_resampled_atm_ext_corrected_header(
2,
"set",
"LABEL",
"Flux resampled atmospheric extinction corrected (Sky)",
)
self.modify_flux_resampled_atm_ext_corrected_header(
2, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_atm_ext_corrected_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_atm_ext_corrected_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_atm_ext_corrected_header(
2, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_atm_ext_corrected_header(
2, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_atm_ext_corrected_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"flux_resampled_atm_ext_corrected ImageHDU cannot be created."
)
self.flux_resampled_atm_ext_corrected_hdulist = None
[docs]
def create_telluric_profile_resampled_fits(self):
"""
Create an ImageHDU for the Telluric absorption profile at resampled
wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
telluric_profile_resampled_ImageHDU = fits.ImageHDU(
self.telluric_profile_resampled, header=header
)
else:
# Put the data in ImageHDUs
telluric_profile_resampled_ImageHDU = fits.ImageHDU(
self.telluric_profile_resampled
)
# Create an empty HDU list and populate with ImageHDUs
self.telluric_profile_resampled_hdulist = fits.HDUList()
self.telluric_profile_resampled_hdulist += [
telluric_profile_resampled_ImageHDU
]
hdu_names = self.ext_name[
self.hdu_order["telluric_profile_resampled"]
]
# Note that wave_start is the centre of the starting bin
self.modify_telluric_profile_resampled_header(
"set", "EXTNAME", hdu_names[0]
)
self.modify_telluric_profile_resampled_header(
"set", "LABEL", "Telluric absorption profile"
)
self.modify_telluric_profile_resampled_header(
"set", "CRPIX1", 1.00e00
)
self.modify_telluric_profile_resampled_header(
"set", "CDELT1", self.wave_bin
)
self.modify_telluric_profile_resampled_header(
"set", "CRVAL1", self.wave_start
)
self.modify_telluric_profile_resampled_header(
"set", "CTYPE1", "Wavelength"
)
self.modify_telluric_profile_resampled_header("set", "CUNIT1", "")
self.modify_telluric_profile_resampled_header("set", "BUNIT", "")
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"telluric_profile_resampled ImageHDU cannot be created."
)
self.telluric_profile_resampled_hdulist = None
[docs]
def create_flux_resampled_telluric_corrected_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at the resampled wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
flux_resampled_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_resampled_telluric_corrected, header=header
)
flux_err_resampled_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_resampled_telluric_corrected, header=header
)
flux_sky_resampled_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled_telluric_corrected, header=header
)
else:
# Put the data in ImageHDUs
flux_resampled_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_resampled_telluric_corrected
)
flux_err_resampled_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_err_resampled_telluric_corrected
)
flux_sky_resampled_telluric_corrected_ImageHDU = fits.ImageHDU(
self.flux_sky_resampled_telluric_corrected
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_resampled_telluric_corrected_hdulist = fits.HDUList()
self.flux_resampled_telluric_corrected_hdulist += [
flux_resampled_telluric_corrected_ImageHDU
]
self.flux_resampled_telluric_corrected_hdulist += [
flux_err_resampled_telluric_corrected_ImageHDU
]
self.flux_resampled_telluric_corrected_hdulist += [
flux_sky_resampled_telluric_corrected_ImageHDU
]
hdu_names = self.ext_name[
self.hdu_order["flux_resampled_telluric_corrected"]
]
# Note that wave_start is the centre of the starting bin
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "EXTNAME", hdu_names[0]
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "LABEL", "Flux resampled telluric corrected"
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "XPOSURE", self.exptime
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "GAIN", self.gain
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "RNOISE", self.readnoise
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "SEEING", self.seeing
)
self.modify_flux_resampled_telluric_corrected_header(
0, "set", "AIRMASS", self.airmass
)
self.modify_flux_resampled_telluric_corrected_header(
1,
"set",
"EXTNAME",
hdu_names[1],
)
self.modify_flux_resampled_telluric_corrected_header(
1,
"set",
"LABEL",
"Flux resampled telluric corrected (Uncertainty)",
)
self.modify_flux_resampled_telluric_corrected_header(
1, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_telluric_corrected_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_telluric_corrected_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_telluric_corrected_header(
1, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_telluric_corrected_header(
1, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_telluric_corrected_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "EXTNAME", hdu_names[2]
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "LABEL", "Flux resampled telluric corrected (Sky)"
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_telluric_corrected_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"flux_resampled_telluric_corrected ImageHDU cannot be created."
)
self.flux_resampled_telluric_corrected_hdulist = None
[docs]
def create_flux_resampled_atm_ext_telluric_corrected_fits(self):
"""
Create an ImageHDU for the atmospheric extinction corrected flux
calibrated spectrum at the resampled wavelength.
"""
try:
header = None
# Use the header of the standard
if self.spectrum_header is not None:
header = self.spectrum_header
if header is not None:
# Put the data and header in ImageHDUs
flux_resampled_atm_ext_telluric_corrected_ImageHDU = (
fits.ImageHDU(
self.flux_resampled_atm_ext_telluric_corrected,
header=header,
)
)
flux_err_resampled_atm_ext_telluric_corrected_ImageHDU = (
fits.ImageHDU(
self.flux_err_resampled_atm_ext_telluric_corrected,
header=header,
)
)
flux_sky_resampled_atm_ext_telluric_corrected_ImageHDU = (
fits.ImageHDU(
self.flux_sky_resampled_atm_ext_telluric_corrected,
header=header,
)
)
else:
# Put the data in ImageHDUs
flux_resampled_atm_ext_telluric_corrected_ImageHDU = (
fits.ImageHDU(
self.flux_resampled_atm_ext_telluric_corrected
)
)
flux_err_resampled_atm_ext_telluric_corrected_ImageHDU = (
fits.ImageHDU(
self.flux_err_resampled_atm_ext_telluric_corrected
)
)
flux_sky_resampled_atm_ext_telluric_corrected_ImageHDU = (
fits.ImageHDU(
self.flux_sky_resampled_atm_ext_telluric_corrected
)
)
# Create an empty HDU list and populate with ImageHDUs
self.flux_resampled_atm_ext_telluric_corrected_hdulist = (
fits.HDUList()
)
self.flux_resampled_atm_ext_telluric_corrected_hdulist += [
flux_resampled_atm_ext_telluric_corrected_ImageHDU
]
self.flux_resampled_atm_ext_telluric_corrected_hdulist += [
flux_err_resampled_atm_ext_telluric_corrected_ImageHDU
]
self.flux_resampled_atm_ext_telluric_corrected_hdulist += [
flux_sky_resampled_atm_ext_telluric_corrected_ImageHDU
]
hdu_names = self.ext_name[
self.hdu_order["flux_resampled_atm_ext_telluric_corrected"]
]
# Note that wave_start is the centre of the starting bin
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0,
"set",
"EXTNAME",
hdu_names[0],
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0,
"set",
"LABEL",
"Flux resampled atmospheric extinction telluric corrected",
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "XPOSURE", self.exptime
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "GAIN", self.gain
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "RNOISE", self.readnoise
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "SEEING", self.seeing
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
0, "set", "AIRMASS", self.airmass
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1,
"set",
"EXTNAME",
hdu_names[1],
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1,
"set",
"LABEL",
(
"Flux resampled atmospheric extinction telluric corrected"
" (Uncertainty)"
),
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
1, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2,
"set",
"EXTNAME",
hdu_names[2],
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2,
"set",
"LABEL",
(
"Flux resampled atmospheric extinction telluric corrected"
" (Sky)"
),
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2, "set", "CRPIX1", 1.00e00
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2, "set", "CDELT1", self.wave_bin
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2, "set", "CRVAL1", self.wave_start
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2, "set", "CTYPE1", "Wavelength"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2, "set", "CUNIT1", "Angstroms"
)
self.modify_flux_resampled_atm_ext_telluric_corrected_header(
2, "set", "BUNIT", "erg/(s*cm**2*Angstrom)"
)
except Exception as e:
self.logger.warning(str(e))
# Set it to None if the above failed
self.logger.warning(
"flux_resampled_atm_ext_telluric_corrected ImageHDU cannot "
"be created."
)
self.flux_resampled_atm_ext_telluric_corrected_hdulist = None
[docs]
def remove_trace_fits(self):
"""
Remove the trace FITS HDUList.
"""
self.trace_hdulist = None
[docs]
def remove_count_fits(self):
"""
Remove the count FITS HDUList.
"""
self.count_hdulist = None
[docs]
def remove_count_resampled_fits(self):
"""
Remove the count_resampled FITS HDUList.
"""
self.count_resampled_hdulist = None
[docs]
def remove_arc_spec_fits(self):
"""
Remove the arc_spec FITS HDUList.
"""
self.arc_spec_hdulist = None
[docs]
def remove_wavecal_fits(self):
"""
Remove the wavecal FITS HDUList.
"""
self.wavecal_hdulist = None
[docs]
def remove_wavelength_fits(self):
"""
Remove the wavelength FITS HDUList.
"""
self.wavelength_hdulist = None
[docs]
def remove_weight_map_fits(self):
"""
Remove the weight_map FITS HDUList.
"""
self.weight_map_hdulist = None
[docs]
def remove_flux_fits(self):
"""
Remove the flux FITS HDUList.
"""
self.flux_hdulist = None
[docs]
def remove_atm_ext_fits(self):
"""
Remove the atm_ext FITS HDUList.
"""
self.atm_ext_hdulist = None
[docs]
def remove_flux_atm_ext_corrected_fits(self):
"""
Remove the flux_resampled_atm_ext_corrected FITS HDUList.
"""
self.flux_atm_ext_corrected_hdulist = None
[docs]
def remove_telluric_profile_fits(self):
"""
Remove the telluric_profile FITS HDUList.
"""
self.telluric_profile_hdulist = None
[docs]
def remove_flux_telluric_corrected_fits(self):
"""
Remove the flux_resampled_telluric_corrected FITS HDUList.
"""
self.flux_telluric_corrected_hdulist = None
[docs]
def remove_flux_atm_ext_telluric_corrected_fits(self):
"""
Remove the flux_resampled_atm_ext_telluric_corrected FITS HDUList.
"""
self.flux_atm_ext_telluric_corrected_hdulist = None
[docs]
def remove_wavelength_resampled_fits(self):
"""
Remove the resampled wavelength FITS HDUList.
"""
self.wavelength_resampled_hdulist = None
[docs]
def remove_sensitivity_resampled_fits(self):
"""
Remove the sensitivity_resampled FITS HDUList.
"""
self.sensitivity_resampled_hdulist = None
[docs]
def remove_flux_resampled_fits(self):
"""
Remove the flux_resampled FITS HDUList.
"""
self.flux_resampled_hdulist = None
[docs]
def remove_atm_ext_resampled_fits(self):
"""
Remove the atm_ext_resampled FITS HDUList.
"""
self.atm_ext_resampled_hdulist = None
[docs]
def remove_flux_resampled_atm_ext_corrected_fits(self):
"""
Remove the flux_resampled_atm_ext_corrected FITS HDUList.
"""
self.flux_resampled_atm_ext_corrected_hdulist = None
[docs]
def remove_telluric_profile_resampled_fits(self):
"""
Remove the telluric_profile_resampled FITS HDUList.
"""
self.telluric_profile_resampled_hdulist = None
[docs]
def remove_flux_resampled_telluric_corrected_fits(self):
"""
Remove the flux_resampled_telluric_corrected FITS HDUList.
"""
self.flux_resampled_telluric_corrected_hdulist = None
[docs]
def remove_flux_resampled_atm_ext_telluric_corrected_fits(self):
"""
Remove the flux_resampled_atm_ext_telluric_corrected FITS HDUList.
"""
self.flux_resampled_atm_ext_telluric_corrected_hdulist = None
[docs]
def create_fits(
self,
output: str = "*",
recreate: bool = True,
empty_primary_hdu: bool = True,
return_hdu_list: bool = False,
):
"""
Create a HDU list, with a choice of any combination of the
data, see below the 'output' parameters for details.
Parameters
----------
output: String
Type of data to be saved, the order is fixed (in the order of
the following description), but the options are flexible. The
input strings are delimited by "+":
trace: 2 HDUs
Trace, and trace width (pixel)
count: 3 HDUs
Count, uncertainty, and sky (pixel)
weight_map: 1 HDU
Weight (pixel)
arc_spec: 1 HDU
1D arc spectrum
arc_lines: 2 HDUs
arc line position (pixel), and arc
line effective position (pixel)
wavecal_coefficients: 1 HDU
Polynomial coefficients for wavelength calibration
wavelength: 1 HDU
Wavelength of each pixel
wavelength_resampled: 1 HDU
Wavelength of each resampled position
count_resampled: 3 HDUs
Resampled Count, uncertainty, and sky (wavelength)
sensitivity: 1 HDU
Sensitivity (pixel)
flux: 3 HDUs
Flux, uncertainty, and sky (pixel)
atm_ext: 1 HDU
Atmospheric extinction correction factor
flux_atm_ext_corrected: 3 HDUs
Atmospheric extinction corrected flux, uncertainty, and
sky (pixel)
telluric_profile: 1 HDU
Telluric absorption profile
flux_telluric_corrected: 3 HDUs
Telluric corrected flux, uncertainty, and
sky (pixel)
flux_atm_ext_telluric_corrected: 3 HDUs
Atmospheric extinction and telluric corrected flux,
uncertainty, and sky (pixel)
sensitivity_resampled: 1 HDU
Sensitivity (wavelength)
flux_resampled: 4 HDUs
Flux, uncertainty, and sky (wavelength)
atm_ext_resampled: 1 HDU
Atmospheric extinction correction factor
flux_resampled_atm_ext_corrected: 3 HDUs
Atmospheric extinction corrected flux, uncertainty, and
sky (wavelength)
telluric_profile_resampled: 1 HDU
Telluric absorption profile
flux_resampled_telluic_corrected: 3 HDUs
Telluric corrected flux, uncertainty, and
sky (wavelength)
flux_resampled_atm_ext_telluric_corrected: 3 HDUs
Atmospheric extinction and telluric corrected flux,
uncertainty, and sky (wavelength)
recreate: boolean (Default: False)
Set to True to overwrite the FITS data and header.
empty_primary_hdu: boolean (Default: True)
Set to True to leave the Primary HDU blank.
return_hdu_list: boolean (Default: False)
Set to True to return the HDU List.
"""
if output == "*":
output = (
"trace+count+weight_map+arc_spec+arc_lines+"
"wavecal_coefficients+wavelength+wavelength_resampled+"
"count_resampled+sensitivity+flux+atm_ext+"
"flux_atm_ext_corrected+telluric_profile+"
"flux_telluric_corrected+flux_atm_ext_telluric_corrected+"
"sensitivity_resampled+flux_resampled+atm_ext_resampled+"
"flux_resampled_atm_ext_corrected+"
"telluric_profile_resampled+"
"flux_resampled_telluic_corrected+"
"flux_resampled_atm_ext_telluric_corrected"
)
self.logger.info(f"{output} is read.")
output_split = output.split("+")
self.logger.info(f"HDUs of {output_split} are to be created.")
# If to recreate the FITS, set all contents to False
if recreate:
for k, v in self.hdu_content.items():
self.hdu_content[k] = False
self.logger.info("HDUList is cleared.")
# If the requested list of HDUs is already good to go
if set([k for k, v in self.hdu_content.items() if v]) == set(
output_split
):
self.logger.info("HDUList is ready to go.")
# If there is an empty primary HDU, but requested without
if self.empty_primary_hdu & (not empty_primary_hdu):
self.hdu_output.pop(0)
self.empty_primary_hdu = False
# If there is not an empty primary HDU, but requested one
elif (not self.empty_primary_hdu) & empty_primary_hdu:
self.hdu_output.insert(0, fits.PrimaryHDU())
self.empty_primary_hdu = True
# Otherwise, the self.hdu_output does not need to be modified
else:
pass
# If the requested list is different or empty, (re)create the list
else:
self.hdu_output = None
self.logger.info("Populating the HDUList now.")
# Empty list for appending HDU lists
hdu_output = fits.HDUList()
# If leaving the primary HDU empty
if empty_primary_hdu:
hdu_output.append(fits.PrimaryHDU())
# Join the list(s)
if "trace" in output_split:
if not self.hdu_content["trace"]:
self.logger.info("Creating trace HDU now.")
self.create_trace_fits()
self.logger.info("Created trace HDU.")
if (self.trace_hdulist is not None) and (
self.trace is not None
):
hdu_output += self.trace_hdulist
self.hdu_content["trace"] = True
if "count" in output_split:
if not self.hdu_content["count"]:
self.logger.info("Creating count HDU now.")
self.create_count_fits()
self.logger.info("Created trace HDU.")
if (self.count_hdulist is not None) and (
self.count is not None
):
hdu_output += self.count_hdulist
self.hdu_content["count"] = True
if "weight_map" in output_split:
if not self.hdu_content["weight_map"]:
self.logger.info("Creating weight_map HDU now.")
self.create_weight_map_fits()
self.logger.info("Created weight_map HDU.")
if (self.weight_map_hdulist is not None) and (
self.var is not None
):
hdu_output += self.weight_map_hdulist
self.hdu_content["weight_map"] = True
if "arc_spec" in output_split:
if not self.hdu_content["arc_spec"]:
self.logger.info("Creating arc_spec HDU now.")
self.create_arc_spec_fits()
self.logger.info("Created arc_spec HDU.")
if (self.arc_spec_hdulist is not None) and (
self.arc_spec is not None
):
hdu_output += self.arc_spec_hdulist
self.hdu_content["arc_spec"] = True
if "arc_lines" in output_split:
if not self.hdu_content["arc_lines"]:
self.logger.info("Creating arc_lines HDU now.")
self.create_arc_lines_fits()
self.logger.info("Created HDU.")
if self.arc_lines_hdulist is not None:
hdu_output += self.arc_lines_hdulist
self.hdu_content["arc_lines"] = True
if "wavecal_coefficients" in output_split:
if not self.hdu_content["wavecal_coefficients"]:
self.logger.info("Creat wavecal_coefficients HDU now.")
self.create_wavecal_fits()
self.logger.info("Creat wavecal_coefficients HDU.")
if (self.wavecal_hdulist is not None) and (
self.fit_coeff is not None
):
hdu_output += self.wavecal_hdulist
self.hdu_content["wavecal_coefficients"] = True
if "wavelength" in output_split:
if not self.hdu_content["wavelength"]:
self.logger.info("Creating wavelength HDU now.")
self.create_wavelength_fits()
self.logger.info("Created wavelength HDU.")
if (self.wavelength_hdulist is not None) and (
self.wave is not None
):
hdu_output += self.wavelength_hdulist
self.hdu_content["wavelength"] = True
if "wavelength_resampled" in output_split:
if not self.hdu_content["wavelength_resampled"]:
self.logger.info("Creating wavelength_resampled HDU now.")
self.create_wavelength_resampled_fits()
self.logger.info("Created wavelength_resampled HDU.")
if (self.wavelength_resampled_hdulist is not None) and (
self.wave_resampled is not None
):
hdu_output += self.wavelength_resampled_hdulist
self.hdu_content["wavelength_resampled"] = True
if "count_resampled" in output_split:
if not self.hdu_content["count_resampled"]:
self.logger.info("Creating count_resampled HDU now.")
self.create_count_resampled_fits()
self.logger.info("Created count_resampled HDU.")
if (self.count_resampled_hdulist is not None) and (
self.count_resampled is not None
):
hdu_output += self.count_resampled_hdulist
self.hdu_content["count_resampled"] = True
if "sensitivity" in output_split:
if not self.hdu_content["sensitivity"]:
self.logger.info("Creating sensitivity HDU now.")
self.create_sensitivity_fits()
self.logger.info("Created sensitivity HDU.")
if (self.sensitivity_hdulist is not None) and (
self.sensitivity is not None
):
hdu_output += self.sensitivity_hdulist
self.hdu_content["sensitivity"] = True
if "flux" in output_split:
if not self.hdu_content["flux"]:
self.logger.info("Creating flux HDU now.")
self.create_flux_fits()
self.logger.info("Created flux HDU.")
if (self.flux_hdulist is not None) and (self.flux is not None):
hdu_output += self.flux_hdulist
self.hdu_content["flux"] = True
if "atm_ext" in output_split:
if not self.hdu_content["atm_ext"]:
self.logger.info("Creating atm_ext HDU now.")
self.create_atm_ext_fits()
self.logger.info("Created atm_ext HDU.")
if (self.atm_ext_hdulist is not None) and (
self.atm_ext is not None
):
hdu_output += self.atm_ext_hdulist
self.hdu_content["atm_ext"] = True
if "flux_atm_ext_corrected" in output_split:
if not self.hdu_content["flux_atm_ext_corrected"]:
self.logger.info(
"Creating flux_atm_ext_corrected HDU now."
)
self.create_flux_atm_ext_corrected_fits()
self.logger.info("Created flux_atm_ext_corrected HDU.")
if (self.flux_atm_ext_corrected_hdulist is not None) and (
self.flux_atm_ext_corrected is not None
):
hdu_output += self.flux_atm_ext_corrected_hdulist
self.hdu_content["flux_atm_ext_corrected"] = True
if "telluric_profile" in output_split:
if not self.hdu_content["telluric_profile"]:
self.logger.info("Creating telluric_profile HDU now.")
self.create_telluric_profile_fits()
self.logger.info("Created telluric_profile HDU.")
if (self.telluric_profile_hdulist is not None) and (
self.telluric_profile is not None
):
hdu_output += self.telluric_profile_hdulist
self.hdu_content["telluric_profile"] = True
if "flux_telluric_corrected" in output_split:
if not self.hdu_content["flux_telluric_corrected"]:
self.logger.info(
"Creating flux_telluric_corrected HDU now."
)
self.create_flux_telluric_corrected_fits()
self.logger.info("Created flux_telluric_corrected HDU.")
if (self.flux_telluric_corrected_hdulist is not None) and (
self.flux_telluric_corrected is not None
):
hdu_output += self.flux_telluric_corrected_hdulist
self.hdu_content["flux_telluric_corrected"] = True
if "flux_atm_ext_telluric_corrected" in output_split:
if not self.hdu_content["flux_atm_ext_telluric_corrected"]:
self.logger.info(
"Creating flux_atm_ext_telluric_corrected HDU now."
)
self.create_flux_atm_ext_telluric_corrected_fits()
self.logger.info(
"Created flux_atm_ext_telluric_corrected HDU."
)
if (
self.flux_atm_ext_telluric_corrected_hdulist is not None
) and (self.flux_atm_ext_telluric_corrected is not None):
hdu_output += self.flux_atm_ext_telluric_corrected_hdulist
self.hdu_content["flux_atm_ext_telluric_corrected"] = True
if "sensitivity_resampled" in output_split:
if not self.hdu_content["sensitivity_resampled"]:
self.logger.info("Creating sensitivity_resampled HDU now.")
self.create_sensitivity_resampled_fits()
self.logger.info("Created sensitivity_resampled HDU.")
if (self.sensitivity_resampled_hdulist is not None) and (
self.sensitivity_resampled is not None
):
hdu_output += self.sensitivity_resampled_hdulist
self.hdu_content["sensitivity_resampled"] = True
if "flux_resampled" in output_split:
if not self.hdu_content["flux_resampled"]:
self.logger.info("Creating flux_resampled HDU now.")
self.create_flux_resampled_fits()
self.logger.info("Created flux_resampled HDU.")
if (self.flux_resampled_hdulist is not None) and (
self.flux_resampled is not None
):
hdu_output += self.flux_resampled_hdulist
self.hdu_content["flux_resampled"] = True
if "atm_ext_resampled" in output_split:
if not self.hdu_content["atm_ext_resampled"]:
self.logger.info("Creating atm_ext_resampled HDU now.")
self.create_atm_ext_resampled_fits()
self.logger.info("Created atm_ext_resampled HDU.")
if (self.atm_ext_resampled_hdulist is not None) and (
self.atm_ext_resampled is not None
):
hdu_output += self.atm_ext_resampled_hdulist
self.hdu_content["atm_ext_resampled"] = True
if "flux_resampled_atm_ext_corrected" in output_split:
if not self.hdu_content["flux_resampled_atm_ext_corrected"]:
self.logger.info(
"Creating flux_resampled_atm_ext_corrected HDU now."
)
self.create_flux_resampled_atm_ext_corrected_fits()
self.logger.info(
"Created flux_resampled_atm_ext_corrected HDU."
)
if (
self.flux_resampled_atm_ext_corrected_hdulist is not None
) and (self.flux_resampled_atm_ext_corrected is not None):
hdu_output += self.flux_resampled_atm_ext_corrected_hdulist
self.hdu_content["flux_resampled_atm_ext_corrected"] = True
if "telluric_profile_resampled" in output_split:
if not self.hdu_content["telluric_profile_resampled"]:
self.logger.info(
"Creating telluric_profile_resampled HDU now."
)
self.create_telluric_profile_resampled_fits()
self.logger.info("Created telluric_profile_resampled HDU.")
if (self.telluric_profile_resampled_hdulist is not None) and (
self.telluric_profile_resampled is not None
):
hdu_output += self.telluric_profile_resampled_hdulist
self.hdu_content["telluric_profile_resampled"] = True
if "flux_resampled_telluric_corrected" in output_split:
if not self.hdu_content["flux_resampled_telluric_corrected"]:
self.logger.info(
"Creating flux_resampled_telluric_corrected HDU now."
)
self.create_flux_resampled_telluric_corrected_fits()
self.logger.info(
"Created flux_resampled_telluric_corrected HDU."
)
if (
self.flux_resampled_telluric_corrected_hdulist is not None
) and (self.flux_resampled_telluric_corrected is not None):
hdu_output += (
self.flux_resampled_telluric_corrected_hdulist
)
self.hdu_content[
"flux_resampled_telluric_corrected"
] = True
if "flux_resampled_atm_ext_telluric_corrected" in output_split:
if not self.hdu_content[
"flux_resampled_atm_ext_telluric_corrected"
]:
self.logger.info(
"Creating flux_resampled_atm_ext_telluric_corrected HDU"
" now."
)
self.create_flux_resampled_atm_ext_telluric_corrected_fits()
self.logger.info(
"Created flux_resampled_atm_ext_telluric_corrected HDU."
)
if (
self.flux_resampled_atm_ext_telluric_corrected_hdulist
is not None
) and (
self.flux_resampled_atm_ext_telluric_corrected is not None
):
hdu_output += (
self.flux_resampled_atm_ext_telluric_corrected_hdulist
)
self.hdu_content[
"flux_resampled_atm_ext_telluric_corrected"
] = True
# If the primary HDU is not chosen to be empty
if empty_primary_hdu:
hdu_output.update_extend()
self.empty_primary_hdu = True
else:
# Convert the first HDU to PrimaryHDU
hdu_output[0] = fits.PrimaryHDU(
hdu_output[0].data, hdu_output[0].header
)
hdu_output.update_extend()
self.empty_primary_hdu = False
self.hdu_output = hdu_output
if return_hdu_list:
return self.hdu_output
[docs]
def save_fits(
self,
output: str,
filename: str,
overwrite: bool = False,
recreate: bool = True,
empty_primary_hdu: bool = True,
create_folder: bool = False,
):
"""
Save the reduced data to disk, with a choice of any combination of the
data, see below the 'output' parameters for details.
Parameters
----------
output: String
Type of data to be saved, the order is fixed (in the order of
the following description), but the options are flexible. The
input strings are delimited by "+",
trace: 2 HDUs
Trace, and trace width (pixel)
count: 3 HDUs
Count, uncertainty, and sky (pixel)
weight_map: 1 HDU
Weight (pixel)
arc_spec: 1 HDU
1D arc spectrum
arc_lines: 2 HDUs
Arc line position (pixel), and arc line effective
position (pixel)
wavecal: 1 HDU
Polynomial coefficients for wavelength calibration
wavelength: 1 HDU
Wavelength of each pixel
wavelength_resampled: 1 HDU
Wavelength of each resampled position
count_resampled: 3 HDUs
Resampled Count, uncertainty, and sky (wavelength)
sensitivity: 1 HDU
Sensitivity (pixel)
flux: 3 HDUs
Flux, uncertainty, and sky (pixel)
atm_ext: 1 HDU
Atmospheric extinction correction factor
flux_atm_ext_corrected: 3 HDUs
Atmospheric extinction corrected flux, uncertainty, and
sky (pixel)
telluric_profile: 1 HDU
Telluric absorption profile
flux_telluric_corrected: 3 HDUs
Telluric corrected flux, uncertainty, and
sky (pixel)
flux_atm_ext_telluric_corrected: 3 HDUs
Atmospheric extinction and telluric corrected flux,
uncertainty, and sky (pixel)
sensitivity_resampled: 1 HDU
Sensitivity (wavelength)
flux_resampled: 4 HDUs
Flux, uncertainty, and sky (wavelength)
atm_ext_resampled: 1 HDU
Atmospheric extinction correction factor
flux_resampled_atm_ext_corrected: 3 HDUs
Atmospheric extinction corrected flux, uncertainty, and
sky (wavelength)
telluric_profile_resampled: 1 HDU
Telluric absorption profile
flux_resampled_telluic_corrected: 3 HDUs
Telluric corrected flux, uncertainty, and
sky (wavelength)
flux_resampled_atm_ext_telluric_corrected: 3 HDUs
Atmospheric extinction and telluric corrected flux,
uncertainty, and sky (wavelength)
filename: str
Filename for the output, all of them will share the same name but
will have different extension.
overwrite: boolean
Default is False.
recreate: boolean (Default: False)
Set to True to overwrite the FITS data and header.
empty_primary_hdu: boolean (Default: True)
Set to True to leave the Primary HDU blank (Default: True)
create_folder: boolean (Default: False)
Create folder if not exist. Use with caution.
"""
self.create_fits(
output, recreate=recreate, empty_primary_hdu=empty_primary_hdu
)
# create the director if not exist
if create_folder:
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
# Save file to disk
if os.path.splitext(filename)[-1].lower() in [".fits", ".fit", ".fts"]:
self.hdu_output.writeto(
filename, overwrite=overwrite, output_verify="fix+ignore"
)
else:
self.hdu_output.writeto(
filename + ".fits",
overwrite=overwrite,
output_verify="fix+ignore",
)
[docs]
def save_csv(
self,
output: str,
filename: str,
overwrite: bool = False,
recreate: bool = True,
create_folder: bool = False,
):
"""
Save the reduced data to disk, with a choice of any combination of the
5 sets of data, see below the 'output' parameters for details.
Parameters
----------
output: String
Type of data to be saved, the order is fixed (in the order of
the following description), but the options are flexible. The
input strings are delimited by "+",
trace: 2 HDUs
Trace, and trace width (pixel)
count: 3 HDUs
Count, uncertainty, and sky (pixel)
weight_map: 1 HDU
Weight (pixel)
arc_spec: 1 HDU
1D arc spectrum
arc_lines: 2 HDUs
Arc line position (pixel), and arc line effective
position (pixel)
wavecal: 1 HDU
Polynomial coefficients for wavelength calibration
wavelength: 1 HDU
Wavelength of each pixel
wavelength_resampled: 1 HDU
Wavelength of each resampled position
count_resampled: 3 HDUs
Resampled Count, uncertainty, and sky (wavelength)
sensitivity: 1 HDU
Sensitivity (pixel)
flux: 3 HDUs
Flux, uncertainty, and sky (pixel)
atm_ext: 1 HDU
Atmospheric extinction correction factor
flux_atm_ext_corrected: 3 HDUs
Atmospheric extinction corrected flux, uncertainty, and
sky (pixel)
telluric_profile: 1 HDU
Telluric absorption profile
flux_telluric_corrected: 3 HDUs
Telluric corrected flux, uncertainty, and
sky (pixel)
flux_atm_ext_telluric_corrected: 3 HDUs
Atmospheric extinction and telluric corrected flux,
uncertainty, and sky (pixel)
sensitivity_resampled: 1 HDU
Sensitivity (wavelength)
flux_resampled: 4 HDUs
Flux, uncertainty, and sky (wavelength)
atm_ext_resampled: 1 HDU
Atmospheric extinction correction factor
flux_resampled_atm_ext_corrected: 3 HDUs
Atmospheric extinction corrected flux, uncertainty, and
sky (wavelength)
telluric_profile_resampled: 1 HDU
Telluric absorption profile
flux_resampled_telluic_corrected: 3 HDUs
Telluric corrected flux, uncertainty, and
sky (wavelength)
flux_resampled_atm_ext_telluric_corrected: 3 HDUs
Atmospheric extinction and telluric corrected flux,
uncertainty, and sky (wavelength)
filename: String
Disk location to be written to. Default is at where the
process/subprocess is execuated.
overwrite: boolean
Default is False.
recreate: boolean (Default: False)
Set to True to overwrite the FITS data and header.
create_folder: boolean (Default: False)
Create folder if not exist. Use with caution.
"""
self.create_fits(output, recreate=recreate, empty_primary_hdu=False)
# create the director if not exist
if create_folder:
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
output_split = output.split("+")
start = 0
for output_type in self.hdu_order.keys():
if (output_type in output_split) & self.hdu_content[output_type]:
logging.info(f"Exporting {output_type} to CSV.")
end = start + self.n_hdu[output_type]
logging.info(f"The HDU index is from {start} to {end - 1}.")
if output_type != "arc_lines":
output_data = np.column_stack(
[hdu.data for hdu in self.hdu_output[start:end]]
)
if overwrite or (
not os.path.exists(
filename + "_" + output_type + ".csv"
)
):
np.savetxt(
filename + "_" + output_type + ".csv",
output_data,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
filename
+ f"_{output_type}.csv cannot be saved to disk. "
+ "Please check the path and/or set overwrite to "
+ "True."
)
else:
output_data_arc_peaks = self.hdu_output[start].data
output_data_arc_peaks_refined = self.hdu_output[
start + 1
].data
if overwrite or (
not os.path.exists(filename + "_arc_peaks.csv")
):
np.savetxt(
f"{filename}_arc_peaks.csv",
output_data_arc_peaks,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
f"{filename}_arc_peaks.csv cannot be saved to "
+ "disk. Please check the path and/or set "
+ "overwrite to True."
)
if overwrite or (
not os.path.exists(filename + "_arc_peaks_refined.csv")
):
np.savetxt(
filename + "_arc_peaks_refined.csv",
output_data_arc_peaks_refined,
delimiter=",",
header=self.header[output_type],
)
else:
self.logger.warning(
f"{filename}_arc_peaks_refined.csv cannot be "
+ "saved to disk. Please check the path and/or "
+ "set overwrite to True."
)
start = end
logging.info(f"Exported {output_type} to CSV successfully.")