#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Core of the Reducer class to perform image reduction"""
import copy
import datetime
import logging
import os
from typing import Union
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.nddata import CCDData
from astroscrappy import detect_cosmics
from ccdproc import Combiner
from plotly import graph_objects as go
from plotly import io as pio
from .util import bfixpix, create_bad_pixel_mask, create_cutoff_mask
__all__ = ["ImageReduction", "ImageReducer"]
class ImageReducer:
"""
This class is not intented for quality data reduction, it exists for
completeness such that users can produce a minimal pipeline with
a single pacakge. Users should preprocess calibration frames, for
example, we cannot properly combine arc frames taken with long and
short exposures for wavelength calibration with both bright and faint
lines; fringing correction with flat frames; light frames with various
exposure times.
"""
def __init__(
self,
verbose: bool = True,
logger_name: str = "Reducer",
log_level: str = "info",
log_file_folder: str = "default",
log_file_name: str = None,
):
"""
The initialisation only sets up the logger.
Parameters
----------
verbose: boolean (Default: True)
Set to False to suppress all verbose warnings, except for
critical failure.
logger_name: str (Default: ImageReduction)
This will set the name of the logger, if the name is used already,
it will reference to the existing logger. This will be the
first part of the default log file name unless log_file_name is
provided.
log_level: str (Default: 'INFO')
Four levels of logging are available, in decreasing order of
information and increasing order of severity: (1) DEBUG, (2) INFO,
(3) WARNING, (4) ERROR and (5) CRITICAL. WARNING means that
there is suboptimal operations in some parts of that step. ERROR
means that the requested operation cannot be performed, but the
software can handle it by either using the default setting or
skipping the operation. CRITICAL means that the requested
operation cannot be resolved without human interaction, this is
most usually coming from missing data.
log_file_folder: None or str (Default: "default")
Folder in which the file is save, set to default to save to the
current path.
log_file_name: None or str (Default: None)
File name of the log, set to None to print to screen only.
"""
# Set-up logger
self.logger = logging.getLogger(logger_name)
if not isinstance(verbose, bool):
raise ValueError("Please provide a boolean value for `verbose`.")
if (log_level.lower() == "critical") or (not verbose):
self.logger.setLevel(logging.CRITICAL)
elif log_level.lower() == "error":
self.logger.setLevel(logging.ERROR)
elif log_level.lower() == "warning":
self.logger.setLevel(logging.WARNING)
elif log_level.lower() == "info":
self.logger.setLevel(logging.INFO)
elif log_level.lower() == "debug":
self.logger.setLevel(logging.DEBUG)
else:
raise ValueError(f"Unknonw logging level: {log_level}.")
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%a, %d %b %Y %H:%M:%S",
)
if log_file_name is None:
# Only print log to screen
self.handler = logging.StreamHandler()
else:
if log_file_name == "default":
t_str = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
log_file_name = f"{logger_name}_{t_str}.log"
# Save log to file
if log_file_folder == "default":
log_file_folder = ""
self.handler = logging.FileHandler(
os.path.join(log_file_folder, log_file_name), "a+"
)
self.handler.setFormatter(formatter)
if self.logger.hasHandlers():
self.logger.handlers.clear()
self.logger.addHandler(self.handler)
self.saxis_default = 1
self.combinetype_light_default = "median"
self.sigma_clipping_light_default = True
self.clip_low_light_default = 5.0
self.clip_high_light_default = 5.0
self.exptime_light_default = 1.0
self.exptime_light_keyword_default = None
self.combinetype_dark_default = "median"
self.sigma_clipping_dark_default = True
self.clip_low_dark_default = 5.0
self.clip_high_dark_default = 5.0
self.exptime_dark_default = 1.0
self.exptime_dark_keyword_default = None
self.combinetype_flat_default = "median"
self.sigma_clipping_flat_default = True
self.clip_low_flat_default = 5.0
self.clip_high_flat_default = 5.0
self.exptime_flat_default = 1.0
self.exptime_flat_keyword_default = None
self.combinetype_bias_default = "median"
self.sigma_clipping_bias_default = False
self.clip_low_bias_default = 5.0
self.clip_high_bias_default = 5.0
self.combinetype_arc_default = "median"
self.sigma_clipping_arc_default = False
self.clip_low_arc_default = 5.0
self.clip_high_arc_default = 5.0
self.cosmicray_default = False
self.gain_default = 1.0
self.readnoise_default = 0.0
self.fsmode_default = "convolve"
self.psfmodel_default = "gaussy"
self.cr_kwargs_default = {}
self.heal_pixels_default = False
self.cutoff_default = 60000.0
self.grow_default = False
self.iterations_default = 1
self.diagonal_default = False
# FITS keyword standard recommends XPOSURE, but most observatories
# use EXPTIME for supporting iraf. Also included a few other keywords
# which are the proxy-exposure times at best. ASPIRED will use the
# first keyword found on the list, if all failed, an exposure time of
# 1 second will be applied. A warning will be promted.
self.exptime_keyword_list = [
"XPOSURE",
"EXPOSURE",
"EXPTIME",
"EXPOSED",
"TELAPSED",
"ELAPSED",
]
self.saxis = self.saxis_default
self.combinetype_light = self.combinetype_light_default
self.sigma_clipping_light = self.sigma_clipping_light_default
self.clip_low_light = self.clip_low_light_default
self.clip_high_light = self.clip_high_light_default
self.exptime_light = self.exptime_light_default
self.exptime_light_keyword = self.exptime_light_keyword_default
self.combinetype_dark = self.combinetype_dark_default
self.sigma_clipping_dark = self.sigma_clipping_dark_default
self.clip_low_dark = self.clip_low_dark_default
self.clip_high_dark = self.clip_high_dark_default
self.exptime_dark = self.exptime_dark_default
self.exptime_dark_keyword = self.exptime_dark_keyword_default
self.combinetype_flat = self.combinetype_flat_default
self.sigma_clipping_flat = self.sigma_clipping_flat_default
self.clip_low_flat = self.clip_low_flat_default
self.clip_high_flat = self.clip_high_flat_default
self.exptime_flat = self.exptime_flat_default
self.exptime_flat_keyword = self.exptime_flat_keyword_default
self.combinetype_bias = self.combinetype_bias_default
self.sigma_clipping_bias = self.sigma_clipping_bias_default
self.clip_low_bias = self.clip_low_bias_default
self.clip_high_bias = self.clip_high_bias_default
self.combinetype_arc = self.combinetype_arc_default
self.sigma_clipping_arc = self.sigma_clipping_arc_default
self.clip_low_arc = self.clip_low_arc_default
self.clip_high_arc = self.clip_high_arc_default
self.cosmicray = self.cosmicray_default
self.gain = self.gain_default
self.readnoise = self.readnoise_default
self.fsmode = self.fsmode_default
self.psfmodel = self.psfmodel_default
self.cr_kwargs = self.cr_kwargs_default
self.heal_pixels = self.heal_pixels_default
self.cutoff = self.cutoff_default
self.grow = self.grow_default
self.iterations = self.iterations_default
self.diagonal = self.diagonal_default
self.bias_list = []
self.dark_list = []
self.flat_list = []
self.arc_list = []
self.light = []
self.bias_main = None
self.dark_main = None
self.flat_main = None
self.arc_main = None
self.light_main = None
self.flat_reduced = None
self.light_reduced = None
self.image_fits = None
self.bad_pixel_mask = None
self.bad_pixels = False
self.saturation_mask = None
self.saturated = False
self.bad_mask = None
self.pixel_healed = False
self.light_filename = []
self.arc_filename = []
self.dark_filename = []
self.flat_filename = []
self.bias_filename = []
self.light_ccddata = []
self.arc_ccddata = []
self.dark_ccddata = []
self.flat_ccddata = []
self.bias_ccddata = []
self.light_header = []
self.arc_header = []
self.dark_header = []
self.flat_header = []
self.bias_header = []
self.light_time = []
self.arc_time = []
self.dark_time = []
self.flat_time = []
self.light_total_time = 0.0
self.dark_total_time = 0.0
self.flat_total_time = 0.0
self.light_average_time = 0.0
self.light_stack_weighted = False
self.dark_stack_weighted = False
self.flat_stack_weighted = False
# For the filelist
self.delimiter = None
self.ftype = None
self.filelist = None
self.filelist_abspath = None
self.imtype = None
self.impath = None
self.hdunum = None
self.bias_list = None
self.dark_list = None
self.flat_list = None
self.arc_list = None
self.light_list = None
self.bias_hdunum = None
self.dark_hdunum = None
self.flat_hdunum = None
self.arc_hdunum = None
self.light_hdunum = None
def add_filelist(
self,
filelist: str,
ftype: str = "csv",
delimiter: str = None,
auto_load: bool = True,
):
"""
If a field-flattened 2D spectrum is already avaialble, it can be
the only listed item. Set it as a 'light' frame.
Parameters
----------
filelist: str
File location, does not support URL
ftype: str (Default: "csv")
One of csv, tsv and ascii. Default is csv.
delimiter: str (Default: None)
Set the delimiter. This overrides ftype.
auto_load: bool (Default: True)
Set to load the files automatically, this will overwrite the
filepath and data that are already loaded. This will also skip
the set_properties().
"""
if delimiter is not None:
self.delimiter = delimiter
self.ftype = None
else:
self.ftype = ftype
if ftype == "csv":
self.delimiter = ","
elif ftype == "tsv":
self.delimiter = "\t"
elif ftype == "ascii":
self.delimiter = " "
# import file with first column as image type and second column as
# file path
if isinstance(filelist, str):
if os.path.isabs(filelist):
self.filelist = filelist
else:
self.filelist = os.path.abspath(filelist)
self.logger.debug("The filelist is: %s", self.filelist)
# Check if running on Windows
if os.name == "nt":
self.filelist_abspath = self.filelist.rsplit("\\", 1)[0]
else:
self.filelist_abspath = self.filelist.rsplit("/", 1)[0]
self.logger.debug(
"The absolute path of the filelist is: %s",
self.filelist_abspath,
)
self.logger.info("Loading filelist from %s.", self.filelist)
self.filelist = np.loadtxt(
self.filelist, delimiter=self.delimiter, dtype="U", ndmin=2
)
if np.shape(self.filelist)[1] == 3:
self.logger.debug("filelist contains 3 columns.")
elif np.shape(self.filelist)[1] == 2:
self.logger.debug("filelist contains 2 column.")
else:
error_msg = (
"Please provide a text file with 2 or 3 "
+ "columns: where the first column is the image type "
+ "and the second column is the file path, and optional "
+ "third column being the #HDU."
)
self.logger.critical(error_msg)
raise RuntimeError(error_msg)
elif isinstance(filelist, np.ndarray):
self.filelist = filelist
self.filelist_abspath = ""
self.logger.info("Loading filelist from an numpy.ndarray.")
if np.shape(self.filelist)[1] == 3:
self.logger.debug("filelist contains 3 columns.")
elif np.shape(self.filelist)[1] == 2:
self.logger.debug("filelist contains 2 columns.")
else:
error_msg = (
"Please provide a numpy.ndarray with at "
+ "least 2 columns: where the first column is the "
+ "image type and the second column is the file "
+ "path, and optional third column being the #HDU."
)
self.logger.critical(error_msg)
raise RuntimeError(error_msg)
else:
error_msg = "Please provide a file path to the file list."
self.logger.critical(error_msg)
raise TypeError(error_msg)
if np.shape(self.filelist)[1] == 3:
self.logger.debug("filelist contains 3 columns.")
self.imtype = self.filelist[:, 0].astype("object")
self.impath = self.filelist[:, 1].astype("object")
self.hdunum = self.filelist[:, 2].astype("int")
else:
self.logger.debug("filelist contains 2 columns.")
self.imtype = self.filelist[:, 0].astype("object")
self.impath = self.filelist[:, 1].astype("object")
self.hdunum = np.zeros(len(self.imtype)).astype("int")
for i, path_i in enumerate(self.impath):
if not os.path.isabs(path_i):
self.impath[i] = os.path.join(
self.filelist_abspath, path_i.strip()
)
self.logger.debug(self.impath[i])
# If there are multiple rows, which is in most of the cases
self.bias_list = self.impath[self.imtype == "bias"]
self.dark_list = self.impath[self.imtype == "dark"]
self.flat_list = self.impath[self.imtype == "flat"]
self.arc_list = self.impath[self.imtype == "arc"]
self.light_list = self.impath[self.imtype == "light"]
self.bias_hdunum = self.hdunum[self.imtype == "bias"]
self.dark_hdunum = self.hdunum[self.imtype == "dark"]
self.flat_hdunum = self.hdunum[self.imtype == "flat"]
self.arc_hdunum = self.hdunum[self.imtype == "arc"]
self.light_hdunum = self.hdunum[self.imtype == "light"]
# Load the data
if auto_load:
self.load_data()
def set_saxis(self, saxis: int = None):
"""
Set the orientation of the image.
Parameters
----------
saxis: 0, 1 or None
0 for a spectrum going horizontally, 1 for vertically.
"""
if saxis is None:
if "SAXIS" in self.light_ccddata[0].header:
self.saxis = int(self.light_ccddata[0].header["SAXIS"])
else:
self.saxis = 1
elif np.in1d(saxis, [0, 1]).any():
self.saxis = saxis
else:
self.saxis = 1
self.logger.info(f"Saxis is found/set to be {self.saxis}.")
def set_properties(
self,
saxis: int = -1,
combinetype_light: Union[int, str] = -1,
sigma_clipping_light: Union[int, float] = -1,
clip_low_light: Union[int, float] = -1,
clip_high_light: Union[int, float] = -1,
exptime_light: Union[int, float] = -1,
exptime_light_keyword: Union[int, str] = -1,
combinetype_arc: Union[int, str] = -1,
sigma_clipping_arc: Union[int, float] = -1,
clip_low_arc: Union[int, float] = -1,
clip_high_arc: Union[int, float] = -1,
combinetype_dark: Union[int, str] = -1,
sigma_clipping_dark: Union[int, float] = -1,
clip_low_dark: Union[int, float] = -1,
clip_high_dark: Union[int, float] = -1,
exptime_dark: Union[int, float] = -1,
exptime_dark_keyword: Union[int, str] = -1,
combinetype_bias: Union[int, str] = -1,
sigma_clipping_bias: Union[int, float] = -1,
clip_low_bias: Union[int, float] = -1,
clip_high_bias: Union[int, float] = -1,
combinetype_flat: Union[int, str] = -1,
sigma_clipping_flat: Union[int, float] = -1,
clip_low_flat: Union[int, float] = -1,
clip_high_flat: Union[int, float] = -1,
exptime_flat: Union[int, float] = -1,
exptime_flat_keyword: Union[int, str] = -1,
cosmicray: Union[int, bool] = -1,
gain: Union[int, float] = -1,
readnoise: Union[int, float] = -1,
fsmode: Union[int, str] = -1,
psfmodel: Union[int, str] = -1,
heal_pixels: Union[int, bool] = -1,
cutoff: Union[int, float] = -1,
grow: Union[int, bool] = -1,
iterations: int = -1,
diagonal: Union[int, bool] = -1,
**kwargs,
):
"""
Parameters
----------
sxais: int, 0 or 1 (Default: None)
OVERRIDE the SAXIS value in the FITS header, or to provide the
SAXIS if it does not exist
combinetype_light: str (Default: 'median')
'average' or 'median' for CCDproc.Combiner.average_combine() and
CCDproc.Combiner.median_combine(). All the frame types follow
the same combinetype.
sigma_clipping_light: bool (Default: True)
Perform sigma clipping if set to True. sigma is computed with the
numpy.ma.std method
clip_low_light: float (Default: 5)
Set the lower threshold of the sigma clipping
clip_high_light: float (Default: 5)
Set the upper threshold of the sigma clipping
exptime_light: float (Default: -1)
OVERRIDE the exposure time value in the FITS header, or to provide
one if the keyword does not exist
exptime_light_keyword: str (Default: -1)
HDU keyword for the exposure time of the light frame
combinetype_dark: str (Default: 'median')
'average' or 'median' for CCDproc.Combiner.average_combine() and
CCDproc.Combiner.median_combine(). All the frame types follow
the same combinetype.
sigma_clipping_dark: bool (Default: True)
Perform sigma clipping if set to True. sigma is computed with the
numpy.ma.std method
clip_low_dark: float (Default: 5)
Set the lower threshold of the sigma clipping
clip_high_dark: float (Default: 5)
Set the upper threshold of the sigma clipping
exptime_dark: float (Default: -1)
OVERRIDE the exposure time value in the FITS header, or to provide
one if the keyword does not exist
exptime_dark_keyword: str (Default: -1)
HDU keyword for the exposure time of the dark frame
combinetype_bias: str (Default: 'median')
'average' or 'median' for CCDproc.Combiner.average_combine() and
CCDproc.Combiner.median_combine(). All the frame types follow
the same combinetype.
sigma_clipping_bias: bool (Default: False)
Perform sigma clipping if set to True. sigma is computed with the
numpy.ma.std method
clip_low_bias: float (Default: 5)
Set the lower threshold of the sigma clipping
clip_high_bias: float (Default: 5)
Set the upper threshold of the sigma clipping
combinetype_flat: str (Default: 'median')
'average' or 'median' for CCDproc.Combiner.average_combine() and
CCDproc.Combiner.median_combine(). All the frame types follow
the same combinetype.
sigma_clipping_flat: bool (Default: True)
Perform sigma clipping if set to True. sigma is computed with the
numpy.ma.std method
clip_low_flat: float (Default: 5)
Set the lower threshold of the sigma clipping
clip_high_flat: float (Default: 5)
Set the upper threshold of the sigma clipping
exptime_flat: float (Default: -1)
OVERRIDE the exposure time value in the FITS header, or to provide
one if the keyword does not exist
exptime_flat_keyword: str (Default: -1)
HDU keyword for the exposure time of the flat frame
cosmicray: bool (Default: False)
Set to True to remove cosmic rays, this directly alter the reduced
image data. We only explicitly include the 4 most important
parameters in this function: `gain`, `readnoise`, `fsmode`, and
`psfmodel`, the rest can be configured with kwargs.
gain: float (Default: 1.0)
Gain of the image (electrons / ADU). We always need to work in
electrons for cosmic ray detection.
readnoise: float (Default: 0.0)
Read noise of the image (electrons). Used to generate the noise
model of the image.
fsmode: str (Default: 'convolve')
Method to build the fine structure image: `median`: Use the median
filter in the standard LA Cosmic algorithm. `convolve`: Convolve
the image with the psf kernel to calculate the fine structure
image.
psfmodel: str (Default: 'gaussy')
Model to use to generate the psf kernel if fsmode is `convolve`
and psfk is None. The current choices are Gaussian and Moffat
profiles. `gauss` and 'moffat' produce circular PSF kernels. The
`gaussx` and `gaussy` produce Gaussian kernels in the x and y
directions respectively. `gaussxy` and `gaussyx` apply the
Gaussian kernels in the x then the y direction, and first y then
x direction, respectively.
heal_pixels: bool (Deafult: False)
Set to True to attempt to heal bad pixels by taking the median
value of neighbouring pixels.
cutoff: float (Default: 60000.)
This sets the (lower and) upper limit of electron count.
grow: bool (Default: False)
Set to True to grow the mask, see `grow_mask()`
iterations: int (Default: 1)
The number of pixel growth along the Cartesian axes directions.
diagonal: bool (Default: False)
Set to True to grow in the diagonal directions.
**kwargs:
Extra keyword arguments for the astroscrappy.detect_cosmics:
https://astroscrappy.readthedocs.io/en/latest/api/
astroscrappy.detect_cosmics.html
The default setting is::
astroscrappy.detect_cosmics(indat, inmask=None, bkg=None,
var=None, sigclip=4.5, sigfrac=0.3, objlim=5.0, gain=1.0,
readnoise=6.5, satlevel=65536.0, niter=4, sepmed=True,
cleantype='meanmask', fsmode='median', psfmodel='gauss',
psffwhm=2.5, psfsize=7, psfk=None, psfbeta=4.765,
verbose=False)
"""
self.saxis = saxis
self.set_light_properties(
combinetype_light=combinetype_light,
sigma_clipping_light=sigma_clipping_light,
clip_low_light=clip_low_light,
clip_high_light=clip_high_light,
exptime_light=exptime_light,
exptime_light_keyword=exptime_light_keyword,
)
self.set_dark_properties(
combinetype_dark=combinetype_dark,
sigma_clipping_dark=sigma_clipping_dark,
clip_low_dark=clip_low_dark,
clip_high_dark=clip_high_dark,
exptime_dark=exptime_dark,
exptime_dark_keyword=exptime_dark_keyword,
)
self.set_flat_properties(
combinetype_flat=combinetype_flat,
sigma_clipping_flat=sigma_clipping_flat,
clip_low_flat=clip_low_flat,
clip_high_flat=clip_high_flat,
exptime_flat=exptime_flat,
exptime_flat_keyword=exptime_flat_keyword,
)
self.set_bias_properties(
combinetype_bias=combinetype_bias,
sigma_clipping_bias=sigma_clipping_bias,
clip_low_bias=clip_low_bias,
clip_high_bias=clip_high_bias,
)
self.set_arc_properties(
combinetype_arc=combinetype_arc,
sigma_clipping_arc=sigma_clipping_arc,
clip_low_arc=clip_low_arc,
clip_high_arc=clip_high_arc,
)
self.set_cosmic_properties(
cosmicray=cosmicray,
fsmode=fsmode,
psfmodel=psfmodel,
kwargs=kwargs,
)
self.set_detector_properties(
gain=gain,
readnoise=readnoise,
heal_pixels=heal_pixels,
cutoff=cutoff,
grow=grow,
iterations=iterations,
diagonal=diagonal,
)
def set_light_properties(
self,
combinetype_light: Union[int, str] = -1,
sigma_clipping_light: Union[int, float] = -1,
clip_low_light: Union[int, float] = -1,
clip_high_light: Union[int, float] = -1,
exptime_light: Union[int, float] = -1,
exptime_light_keyword: Union[int, str] = -1,
):
"""
Set the properties of the light frame. -1 means stay the same, None
means use the default value, and any other valid input for the
respective argument. See set_properties() for the argument
description.
"""
# combinetype_light
if combinetype_light is None:
self.combinetype_light = self.combinetype_light_default
self.logger.warning(
"Unknown combinetype_light, it is set as "
+ f"{self.combinetype_light}."
)
elif isinstance(combinetype_light, (float, int)):
pass
elif isinstance(combinetype_light, str):
if combinetype_light in ["average", "median"]:
# use the given readnoise value
self.combinetype_light = combinetype_light
self.logger.info(
f"combinetype_light is set to {self.combinetype_light}."
)
else:
self.combinetype_light = self.combinetype_light_default
self.logger.warning(
"Unknown combinetype_light, it is set as "
+ f"{self.combinetype_light}."
)
else:
self.combinetype_light = self.combinetype_light_default
self.logger.warning(
"Unknown combinetype_light, it is set as "
+ f"{self.combinetype_light}.",
)
# sigma_clipping_light
if sigma_clipping_light is None:
self.sigma_clipping_light = self.sigma_clipping_light_default
self.logger.warning(
"Unknown sigma_clipping_light, it is set to "
+ f"{self.sigma_clipping_light}.",
)
elif isinstance(sigma_clipping_light, (float, int)):
pass
elif isinstance(sigma_clipping_light, bool):
self.sigma_clipping_light = sigma_clipping_light
else:
self.sigma_clipping_light = self.sigma_clipping_light_default
self.logger.warning(
"Unknown sigma_clipping_light, it is set to %s.",
self.sigma_clipping_light,
)
# clip_low_light
if clip_low_light is None:
self.clip_low_light = self.clip_low_light_default
self.logger.warning(
"Unknown sigma_clipping_light, it is set to %s.",
self.clip_low_light,
)
elif isinstance(clip_low_light, (float, int)):
if clip_low_light > 0:
self.clip_low_light = clip_low_light
else:
pass
else:
self.clip_low_light = self.clip_low_light_default
self.logger.warning(
"Unknown clip_low_light, it is set to %s.", self.clip_low_light
)
# clip_high_light
if clip_high_light is None:
self.clip_high_light = self.clip_high_light_default
self.logger.warning(
"Unknown sigma_clipping_light, it is set to %s.",
self.clip_high_light,
)
elif isinstance(clip_high_light, (float, int)):
if clip_high_light > 0:
self.clip_high_light = clip_high_light
else:
pass
else:
self.clip_high_light = self.clip_high_light_default
self.logger.warning(
"Unknown clip_high_light, it is set to %s.",
self.clip_high_light,
)
# exptime_light
if exptime_light is None:
self.exptime_light = 1.0
self.logger.warning("Unknown exptime_light, it is set to 1.0.")
elif isinstance(exptime_light, (float, int)):
if exptime_light > 0:
self.exptime_light = exptime_light
else:
pass
else:
self.exptime_light = self.exptime_light_default
self.logger.warning(
"Unknown exptime_light, it is set to %s.", self.exptime_light
)
# exptime_light_keyword
if exptime_light_keyword is None:
self.exptime_light_keyword = self.exptime_light_keyword_default
self.logger.warning(
"Unknown exptime_light_keyword, it is set to %s.",
self.exptime_light_keyword,
)
elif isinstance(exptime_light_keyword, (float, int)):
pass
elif isinstance(exptime_light_keyword, str):
self.exptime_light_keyword = exptime_light_keyword
else:
self.exptime_light_keyword = self.exptime_light_keyword_default
self.logger.warning(
"Unknown exptime_light_keyword, it is set to %s.",
self.exptime_light_keyword,
)
def set_dark_properties(
self,
combinetype_dark: Union[int, str] = -1,
sigma_clipping_dark: Union[int, float] = -1,
clip_low_dark: Union[int, float] = -1,
clip_high_dark: Union[int, float] = -1,
exptime_dark: Union[int, float] = -1,
exptime_dark_keyword: Union[int, str] = -1,
):
"""
Set the properties of the dark frame. -1 means stay the same, None
means use the default value, and any other valid input for the
respective argument. See set_properties() for the argument
description.
"""
if combinetype_dark is None:
self.combinetype_dark = self.combinetype_dark_default
self.logger.warning(
"Unknown combinetype_dark, it is set as %s.",
self.combinetype_dark,
)
else:
if isinstance(combinetype_dark, (float, int)):
pass
elif isinstance(combinetype_dark, str):
if combinetype_dark in ["average", "median"]:
# use the given readnoise value
self.combinetype_dark = combinetype_dark
self.logger.info(
"combinetype_dark is set to %s.", self.combinetype_dark
)
else:
self.combinetype_dark = self.combinetype_dark_default
self.logger.warning(
"Unknown combinetype_dark, it is set as %s.",
self.combinetype_dark,
)
else:
self.combinetype_dark = self.combinetype_dark_default
self.logger.warning(
"Unknown combinetype_dark, it is set as %s.",
self.combinetype_dark,
)
if sigma_clipping_dark is None:
self.sigma_clipping_dark = self.sigma_clipping_dark_default
self.logger.warning(
"Unknown sigma_clipping_dark, it is set to %s.",
self.sigma_clipping_dark,
)
elif isinstance(sigma_clipping_dark, (float, int)):
pass
elif isinstance(sigma_clipping_dark, bool):
self.sigma_clipping_dark = sigma_clipping_dark
else:
self.sigma_clipping_dark = self.sigma_clipping_dark_default
self.logger.warning(
"Unknown sigma_clipping_dark, it is set to %s.",
self.sigma_clipping_dark,
)
# clip_low_dark
if clip_low_dark is None:
self.clip_low_dark = self.clip_low_dark_default
self.logger.warning(
"Unknown sigma_clipping_dark, it is set to %s.",
self.clip_low_dark,
)
elif isinstance(clip_low_dark, (float, int)):
if clip_low_dark > 0:
self.clip_low_dark = clip_low_dark
else:
pass
else:
self.clip_low_dark = self.clip_low_dark_default
self.logger.warning(
"Unknown clip_low_dark, it is set to %s.", self.clip_low_dark
)
# clip_high_dark
if clip_high_dark is None:
self.clip_high_dark = self.clip_high_dark_default
self.logger.warning(
"Unknown sigma_clipping_dark, it is set to %s.",
self.clip_high_dark,
)
elif isinstance(clip_high_dark, (float, int)):
if clip_high_dark > 0:
self.clip_high_dark = clip_high_dark
else:
pass
else:
self.clip_high_dark = self.clip_high_dark_default
self.logger.warning(
"Unknown clip_high_dark, it is set to %s.", self.clip_high_dark
)
# exptime_dark
if exptime_dark is None:
self.exptime_dark = self.exptime_dark_default
self.logger.warning(
"Unknown sigma_clipping_dark, it is set to %s.",
self.exptime_dark,
)
else:
if isinstance(exptime_dark, (float, int)):
if exptime_dark > 0:
self.exptime_dark = exptime_dark
else:
pass
else:
self.exptime_dark = self.exptime_dark_default
self.logger.warning(
"Unknown exptime_dark, it is set to %s.", self.exptime_dark
)
# exptime_dark_keyword
if exptime_dark_keyword is None:
self.exptime_dark_keyword = self.exptime_dark_default
self.logger.warning(
"Unknown exptime_dark_keyword, it is set to %s.",
self.exptime_dark_keyword,
)
else:
if isinstance(exptime_dark_keyword, (float, int)):
pass
elif isinstance(exptime_dark_keyword, str):
self.exptime_dark_keyword = exptime_dark_keyword
else:
self.exptime_dark_keyword = self.exptime_dark_keyword_default
self.logger.warning(
"Unknown exptime_dark_keyword, it is set to %s.",
self.exptime_dark_keyword,
)
def set_flat_properties(
self,
combinetype_flat: Union[int, str] = -1,
sigma_clipping_flat: Union[int, float] = -1,
clip_low_flat: Union[int, float] = -1,
clip_high_flat: Union[int, float] = -1,
exptime_flat: Union[int, float] = -1,
exptime_flat_keyword: Union[int, str] = -1,
):
"""
Set the properties of the flat frame. -1 means stay the same, None
means use the default value, and any other valid input for the
respective argument. See set_properties() for the argument
description.
"""
# combinetype_flat
if combinetype_flat is None:
self.combinetype_flat = self.combinetype_flat_default
self.logger.warning(
"Unknown combinetype_flat, it is set as %s.",
self.combinetype_flat,
)
elif isinstance(combinetype_flat, (float, int)):
pass
elif isinstance(combinetype_flat, str):
if combinetype_flat in ["average", "median"]:
# use the given readnoise value
self.combinetype_flat = combinetype_flat
self.logger.info(
"combinetype_flat is set to %s.", self.combinetype_flat
)
else:
self.combinetype_flat = self.combinetype_flat_default
self.logger.warning(
"Unknown combinetype_flat, it is set as %s.",
self.combinetype_flat,
)
else:
self.combinetype_flat = self.combinetype_flat_default
self.logger.warning(
"Unknown combinetype_flat, it is set as %s.",
self.combinetype_flat,
)
# sigma_clipping_flat
if sigma_clipping_flat is None:
self.sigma_clipping_flat = self.sigma_clipping_flat_default
self.logger.warning(
"Unknown sigma_clipping_flat, it is set to %s.",
self.sigma_clipping_flat,
)
elif isinstance(sigma_clipping_flat, (float, int)):
pass
elif isinstance(sigma_clipping_flat, bool):
self.sigma_clipping_flat = sigma_clipping_flat
else:
self.sigma_clipping_flat = self.sigma_clipping_flat_default
self.logger.warning(
"Unknown sigma_clipping_flat, it is set to %s.",
self.sigma_clipping_flat,
)
# clip_low_flat
if clip_low_flat is None:
self.clip_low_flat = self.clip_low_flat_default
self.logger.warning(
"Unknown sigma_clipping_flat, it is set to %s.",
self.clip_low_flat,
)
elif isinstance(clip_low_flat, (float, int)):
if clip_low_flat > 0:
self.clip_low_flat = clip_low_flat
else:
pass
else:
self.clip_low_flat = self.clip_low_flat_default
self.logger.warning(
"Unknown clip_low_flat, it is set to %s.", self.clip_low_flat
)
# clip_high_flat
if clip_high_flat is None:
self.clip_high_flat = self.clip_high_flat_default
self.logger.warning(
"Unknown sigma_clipping_flat, it is set to %s.",
self.clip_high_flat,
)
elif isinstance(clip_high_flat, (float, int)):
if clip_high_flat > 0:
self.clip_high_flat = clip_high_flat
else:
pass
else:
self.clip_high_flat = self.clip_high_flat_default
self.logger.warning(
"Unknown clip_high_flat, it is set to %s.", self.clip_high_flat
)
# exptime_flat
if exptime_flat is None:
self.exptime_flat = self.exptime_flat_default
self.logger.warning(
"Unknown sigma_clipping_flat, it is set to %s.",
self.exptime_flat,
)
elif isinstance(exptime_flat, (float, int)):
if exptime_flat > 0:
self.exptime_flat = exptime_flat
else:
pass
else:
self.exptime_flat = self.exptime_flat_default
self.logger.warning(
"Unknown exptime_flat, it is set to %s.", self.exptime_flat
)
# exptime_flat_keyword
if exptime_flat_keyword is None:
self.exptime_flat_keyword = self.exptime_flat_keyword_default
self.logger.warning(
"Unknown exptime_flat_keyword, it is set to %s.",
self.exptime_flat_keyword,
)
elif isinstance(exptime_flat_keyword, (float, int)):
pass
elif isinstance(exptime_flat_keyword, str):
self.exptime_flat_keyword = exptime_flat_keyword
else:
self.exptime_flat_keyword = self.exptime_flat_keyword_default
self.logger.warning(
"Unknown exptime_flat_keyword, it is set to %s.",
self.exptime_flat_keyword,
)
def set_bias_properties(
self,
combinetype_bias: Union[int, str] = -1,
sigma_clipping_bias: Union[int, float] = -1,
clip_low_bias: Union[int, float] = -1,
clip_high_bias: Union[int, float] = -1,
):
"""
Set the properties of the bias frame. -1 means stay the same, None
means use the default value, and any other valid input for the
respective argument. See set_properties() for the argument
description.
"""
if combinetype_bias is None:
self.combinetype_bias = self.combinetype_bias_default
self.logger.warning(
"Unknown combinetype_bias, it is set as %s.",
self.combinetype_bias,
)
elif isinstance(combinetype_bias, (float, int)):
pass
elif isinstance(combinetype_bias, str):
if combinetype_bias in ["average", "median"]:
# use the given readnoise value
self.combinetype_bias = combinetype_bias
self.logger.info(
"combinetype_bias is set to %s.", self.combinetype_bias
)
else:
self.combinetype_bias = self.combinetype_bias_default
self.logger.warning(
"Unknown combinetype_bias, it is set as median."
)
else:
self.combinetype_bias = self.combinetype_bias_default
self.logger.warning(
"Unknown combinetype_bias, it is set as median."
)
if sigma_clipping_bias is None:
self.sigma_clipping_bias = self.sigma_clipping_bias_default
self.logger.warning(
"Unknown sigma_clipping_bias, it is set to %s.",
self.combinetype_bias,
)
elif isinstance(sigma_clipping_bias, (float, int)):
pass
elif isinstance(sigma_clipping_bias, bool):
self.sigma_clipping_bias = sigma_clipping_bias
else:
self.sigma_clipping_bias = self.sigma_clipping_bias_default
self.logger.warning(
"Unknown sigma_clipping_bias, it is set to %s.",
self.sigma_clipping_bias_default,
)
if clip_low_bias is None:
self.clip_low_bias = self.clip_high_bias_default
self.logger.warning(
"Unknown sigma_clipping_bias, it is set to %s.",
self.clip_high_bias,
)
elif isinstance(clip_low_bias, (float, int)):
if clip_low_bias > 0:
self.clip_low_bias = clip_low_bias
else:
pass
else:
self.clip_low_bias = self.clip_high_bias_default
self.logger.warning(
"Unknown clip_low_bias, it is set to %s.", self.clip_high_bias
)
if clip_high_bias is None:
self.clip_high_bias = self.clip_high_bias_default
self.logger.warning(
"Unknown sigma_clipping_bias, it is set to %s.",
self.clip_high_bias,
)
elif isinstance(clip_high_bias, (float, int)):
if clip_high_bias > 0:
self.clip_high_bias = clip_high_bias
else:
pass
else:
self.clip_high_bias = self.clip_high_bias_default
self.logger.warning(
"Unknown clip_high_bias, it is set to %s.", self.clip_high_bias
)
def set_arc_properties(
self,
combinetype_arc: Union[int, str] = -1,
sigma_clipping_arc: Union[int, float] = -1,
clip_low_arc: Union[int, float] = -1,
clip_high_arc: Union[int, float] = -1,
):
"""
Set the properties of the arc frame. -1 means stay the same, None
means use the default value, and any other valid input for the
respective argument. See set_properties() for the argument
description.
"""
# combinetype_arc
if combinetype_arc is None:
self.combinetype_arc = self.combinetype_arc_default
self.logger.warning(
"Unknown combinetype_arc, it is set as %s.",
self.combinetype_arc_default,
)
elif isinstance(combinetype_arc, (float, int)):
pass
elif isinstance(combinetype_arc, str):
if combinetype_arc in ["average", "median"]:
# use the given readnoise value
self.combinetype_arc = combinetype_arc
self.logger.info(
"combinetype_arc is set to %s.", self.combinetype_arc
)
else:
self.combinetype_arc = self.combinetype_arc_default
self.logger.warning(
"Unknown combinetype_arc, it is set as %s.",
self.combinetype_arc,
)
else:
self.combinetype_arc = self.combinetype_arc_default
self.logger.warning(
"Unknown combinetype_arc, it is set as %s.",
self.combinetype_arc,
)
# sigma_clipping_arc
if sigma_clipping_arc is None:
self.sigma_clipping_arc = self.sigma_clipping_arc_default
self.logger.warning(
"Unknown sigma_clipping_arc, it is set to %s.",
self.sigma_clipping_arc,
)
else:
if isinstance(sigma_clipping_arc, (float, int)):
pass
elif isinstance(sigma_clipping_arc, bool):
self.sigma_clipping_arc = sigma_clipping_arc
else:
self.sigma_clipping_arc = self.sigma_clipping_arc_default
self.logger.warning(
"Unknown sigma_clipping_arc, it is set to %s.",
self.sigma_clipping_arc,
)
# clip_low_arc
if clip_low_arc is None:
self.clip_low_arc = self.clip_low_arc_default
self.logger.warning(
"Unknown sigma_clipping_arc, it is set to %s.",
self.clip_low_arc,
)
elif isinstance(clip_low_arc, (float, int)):
if clip_low_arc > 0:
self.clip_low_arc = clip_low_arc
else:
pass
else:
self.clip_low_arc = self.clip_low_arc_default
self.logger.warning("Unknown clip_low_arc, it is set to 5.")
# clip_high_arc
if clip_high_arc is None:
self.clip_high_arc = self.clip_high_arc_default
self.logger.warning(
"Unknown sigma_clipping_arc, it is set to %s.",
self.clip_high_arc,
)
elif isinstance(clip_high_arc, (float, int)):
if clip_high_arc > 0:
self.clip_high_arc = clip_high_arc
else:
pass
else:
self.clip_high_arc = self.clip_high_arc_default
self.logger.warning(
"Unknown clip_high_arc, it is set to %s.", self.clip_high_arc
)
def set_cosmic_properties(
self,
cosmicray: Union[int, bool] = -1,
fsmode: Union[int, str] = -1,
psfmodel: Union[int, str] = -1,
kwargs: Union[int, dict] = -1,
):
"""
Set the properties for the cosmic ray rejection with AstroScrappy.
See set_properties() for the argument description.
"""
# cosmicray
if isinstance(cosmicray, int):
if cosmicray == 1:
self.cosmicray = True
elif cosmicray == 0:
self.cosmicray = False
else:
pass
elif isinstance(cosmicray, bool):
self.cosmicray = cosmicray
else:
self.cosmicray = self.cosmicray_default
self.logger.warning(
"Unknown cosmicray, it is set to %s.", self.cosmicray
)
# fsmode in detect_cosmics()
if fsmode is None:
self.fsmode = self.fsmode_default
self.logger.warning(
"Unknown fsmode, it is set as %s.", self.fsmode_default
)
elif isinstance(fsmode, (float, int)):
pass
elif isinstance(fsmode, str):
if fsmode in ["convolve", "median"]:
# use the given fsmode value
self.fsmode = fsmode
self.logger.info("fsmode is set to %s.", self.fsmode)
else:
self.fsmode = self.fsmode_default
self.logger.warning(
"Unknown fsmode, it is set as %s.", self.fsmode
)
else:
self.fsmode = self.fsmode_default
self.logger.warning(
"Unknown fsmode, it is set as %s.", self.fsmode
)
# psfmodel in detect_cosmics() (and the two added modes)
if psfmodel is None:
self.psfmodel = self.psfmodel_default
self.logger.warning(
"psfmodel is given as None, it is set as %s.",
self.psfmodel_default,
)
elif isinstance(psfmodel, (float, int)):
pass
elif isinstance(psfmodel, str):
if psfmodel in [
"gauss",
"gaussx",
"gaussy",
"gaussxy",
"gaussyx",
"moffat",
]:
# use the given psfmodel value
self.psfmodel = psfmodel
self.logger.info("psfmodel is set to %s.", self.psfmodel)
else:
self.psfmodel = self.psfmodel_default
self.logger.warning(
"Unknown psfmodel, it is set as %s.", self.psfmodel
)
else:
self.psfmodel = self.psfmodel_default
self.logger.warning(
"Unknown psfmodel, it is set as %s.", self.psfmodel
)
# extra keyword arguments for detect_cosmics()
if kwargs is None:
self.cr_kwargs = self.cr_kwargs_default
if isinstance(kwargs, (float, int)):
pass
elif isinstance(kwargs, dict):
self.cr_kwargs = kwargs
else:
self.cr_kwargs = self.cr_kwargs_default
self.logger.warning(
"Unknown cr_kwargs, it is set as %s.", self.cr_kwargs
)
def set_detector_properties(
self,
gain: Union[int, float] = -1,
readnoise: Union[int, float] = -1,
heal_pixels: Union[int, bool] = -1,
cutoff: Union[int, float] = -1,
grow: Union[int, bool] = -1,
iterations: Union[int, float] = -1,
diagonal: Union[int, bool] = -1,
):
"""
Set the properties for the detector. See set_properties() for the
argument description.
"""
# gain
if isinstance(gain, (float, int)):
if gain > 0:
self.gain = gain
else:
pass
else:
self.gain = self.gain_default
# readnoise
if isinstance(readnoise, (float, int)):
if readnoise >= 0:
self.readnoise = readnoise
else:
pass
else:
self.readnoise = self.readnoise_default
# heal_pixels
if isinstance(heal_pixels, (float, int)):
if heal_pixels == 1:
self.heal_pixels = True
elif heal_pixels == 0:
self.heal_pixels = False
else:
pass
elif isinstance(heal_pixels, bool):
self.heal_pixels = heal_pixels
else:
self.heal_pixels = self.heal_pixels_default
self.logger.warning(
"Unknown heal_pixels, it is set to %s.",
self.heal_pixels_default,
)
# cutoff
if isinstance(cutoff, (float, int)):
if cutoff > 0:
self.cutoff = cutoff
else:
pass
else:
self.cutoff = self.cutoff_default
# grow
if isinstance(grow, (float, int)):
if grow == 1:
self.grow = True
elif grow == 0:
self.grow = False
else:
pass
elif isinstance(grow, bool):
self.grow = grow
else:
self.grow = self.grow_default
self.logger.warning(
"Unknown grow, it is set to %s.", self.grow_default
)
# iterations
if isinstance(iterations, (float, int)):
if iterations >= 0:
self.iterations = iterations
else:
pass
else:
self.iterations = self.iterations_default
# diagonal
if isinstance(diagonal, (float, int)):
if diagonal == 1:
self.diagonal = True
elif diagonal == 0:
self.diagonal = False
else:
pass
elif isinstance(diagonal, bool):
self.diagonal = diagonal
else:
self.diagonal = self.diagonal_default
self.logger.warning(
"Unknown diagonal, it is set to %s.", self.diagonal_default
)
def load_data(self):
"""
Load the data listed in the filelist and apply the property setting
as provided by the various set properties commands.
"""
# If there is no science frames, nothing to process.
assert self.light_list.size > 0, "There is no light frame."
# Only load the science data, other types of image data are loaded by
# separate methods.
for i in range(self.light_list.size):
# Open all the light frames
self.logger.debug("Loading light frame: %s.", self.light_list[i])
light = fits.open(self.light_list[i])[self.light_hdunum[i]]
(
light_data,
light_header,
light_exposure_time,
) = self._get_data_and_header(light, self.exptime_light_keyword)
self.add_light(light_data, light_header, light_exposure_time)
# Cosmic ray cleaning
if self.cosmicray:
self.logger.info(
"Removing cosmic rays in mode: %s.", self.psfmodel
)
if self.fsmode == "convolve":
if self.psfmodel == "gaussyx":
self.light_ccddata[i].data = detect_cosmics(
self.light_ccddata[i].data / self.gain,
gain=self.gain,
readnoise=self.readnoise,
fsmode="convolve",
psfmodel="gaussy",
**self.cr_kwargs,
)[1]
self.light_ccddata[i].data = detect_cosmics(
self.light_ccddata[i].data / self.gain,
gain=self.gain,
readnoise=self.readnoise,
fsmode="convolve",
psfmodel="gaussx",
**self.cr_kwargs,
)[1]
elif self.psfmodel == "gaussxy":
self.light_ccddata[i].data = detect_cosmics(
self.light_ccddata[i].data / self.gain,
gain=self.gain,
readnoise=self.readnoise,
fsmode="convolve",
psfmodel="gaussx",
**self.cr_kwargs,
)[1]
self.light_ccddata[i].data = detect_cosmics(
self.light_ccddata[i].data / self.gain,
gain=self.gain,
readnoise=self.readnoise,
fsmode="convolve",
psfmodel="gaussy",
**self.cr_kwargs,
)[1]
else:
self.light_ccddata[i].data = detect_cosmics(
self.light_ccddata[i].data / self.gain,
gain=self.gain,
readnoise=self.readnoise,
fsmode="convolve",
psfmodel=self.psfmodel,
**self.cr_kwargs,
)[1]
elif self.fsmode == "median":
self.light_ccddata[i].data = detect_cosmics(
self.light_ccddata[i].data / self.gain,
gain=self.gain,
readnoise=self.readnoise,
fsmode=self.fsmode,
psfmodel=self.psfmodel,
**self.cr_kwargs,
)[1]
else:
self.logger.error("Unknown fsmode: %s", self.fsmode)
self.logger.debug("Light frame header: %s.", self.light_header[i])
self.logger.debug(
"Appending light filename: %s.",
self.light_list[i].split("/")[-1],
)
self.light_filename.append(self.light_list[i].split("/")[-1])
saturation_mask, saturated = create_cutoff_mask(
self.light_ccddata[i].data,
self.cutoff,
self.grow,
self.iterations,
self.diagonal,
)
if self.saturation_mask is None:
self.saturation_mask = saturation_mask
self.saturated = saturated
else:
self.saturation_mask *= saturation_mask
self.saturated *= saturated
# FITS keyword standard for the spectral direction, if FITS header
# does not contain SAXIS, the image in assumed to have the spectra
# going across (left to right corresponds to blue to red). All frames
# get rotated in the anti-clockwise direction if the first light frame
# has a verticle spectrum (top to bottom corresponds to blue to red).
self.set_saxis(self.saxis)
# Get the arcs if available
if len(self.arc_list) > 0:
# Combine the arcs
for i in range(self.arc_list.size):
# Open all the light frames
arc = fits.open(self.arc_list[i])[self.arc_hdunum[i]]
arc_data, arc_header, _ = self._get_data_and_header(arc)
self.add_arc(arc_data, arc_header)
self.logger.debug("Arc frame header: %s.", self.arc_header[i])
self.logger.debug(
"Appending arc filename: %s.",
self.arc_list[i].split("/")[-1],
)
self.arc_filename.append(self.arc_list[i].split("/")[-1])
# combine the arc frames
self.arc_main, _ = self.combine_arc()
# Get the darks if available
if len(self.dark_list) > 0:
for i in range(self.dark_list.size):
# Open all the dark frames
dark = fits.open(self.dark_list[i])[self.dark_hdunum[i]]
(
dark_data,
dark_header,
dark_exposure_time,
) = self._get_data_and_header(dark, self.exptime_dark_keyword)
self.add_dark(dark_data, dark_header, dark_exposure_time)
self.logger.debug(
"Dark frame header: %s.", self.dark_header[i]
)
self.logger.debug(
"Appending dark filename: %s.",
self.dark_list[i].split("/")[-1],
)
self.dark_filename.append(self.dark_list[i].split("/")[-1])
# combine the arc frames
self.dark_main, self.dark_stack_weighted = self.combine_dark()
# Get the flats if available
if len(self.flat_list) > 0:
for i in range(self.flat_list.size):
# Open all the flatfield frames
flat = fits.open(self.flat_list[i])[self.flat_hdunum[i]]
(
flat_data,
flat_header,
flat_exposure_time,
) = self._get_data_and_header(flat, self.exptime_flat_keyword)
self.add_flat(flat_data, flat_header, flat_exposure_time)
self.logger.debug(
"Flat frame header: %s.", self.flat_header[i]
)
self.logger.debug(
"Appending flat filename: %s.",
self.flat_list[i].split("/")[-1],
)
self.flat_filename.append(self.flat_list[i].split("/")[-1])
# combine the arc frames
self.flat_main, self.flat_stack_weighted = self.combine_flat()
# Get the biases if available
if len(self.bias_list) > 0:
for i in range(self.bias_list.size):
# Open all the flatfield frames
bias = fits.open(self.bias_list[i])[self.bias_hdunum[i]]
bias_data, bias_header, _ = self._get_data_and_header(bias)
self.add_bias(bias_data, bias_header)
self.logger.debug(
"Flat frame header: %s.", self.bias_header[i]
)
self.logger.debug(
"Appending flat filename: %s.",
self.bias_list[i].split("/")[-1],
)
self.bias_filename.append(self.bias_list[i].split("/")[-1])
# combine the arc frames
self.bias_main = self.combine_bias()
def add_light(
self,
light: Union[np.ndarray, CCDData],
header: fits.Header,
exposure_time: Union[int, float],
):
"""
Add the light frame.
Parameters
----------
light: 2-d array or CCDData object
The light image (i.e. science/target frame)
header: astropy header object
The FITS header
exposure_time: float
The exposure time of the frame.
"""
if isinstance(light, np.ndarray):
light = CCDData(light.astype("float"), unit=u.ct)
self.light_ccddata.append(light)
self.light_header.append(header)
self.light_time.append(exposure_time)
def add_arc(self, arc: Union[np.ndarray, CCDData], header: fits.Header):
"""
Add the arc frame.
Parameters
----------
arc: 2-d array or CCDData object
The arc image
header: astropy header object
The FITS header
"""
if isinstance(arc, np.ndarray):
arc = CCDData(arc.astype("float"), unit=u.ct)
self.arc_ccddata.append(arc)
self.arc_header.append(header)
def add_flat(
self,
flat: Union[np.ndarray, CCDData],
header: fits.Header,
exposure_time: Union[int, float],
):
"""
Add the flat frame.
Parameters
----------
flat: 2-d array or CCDData object
The flat image
header: astropy header object
The FITS header
exposure_time: float
The exposure time of the frame.
"""
if isinstance(flat, np.ndarray):
flat = CCDData(flat.astype("float"), unit=u.ct)
self.flat_ccddata.append(flat)
self.flat_header.append(header)
self.flat_time.append(exposure_time)
def add_dark(
self,
dark: Union[np.ndarray, CCDData],
header: fits.Header,
exposure_time: Union[int, float],
):
"""
Add the dark frame.
Parameters
----------
dark: 2-d array or CCDData object
The dark image
header: astropy header object
The FITS header
exposure_time: float
The exposure time of the frame.
"""
if isinstance(dark, np.ndarray):
dark = CCDData(dark.astype("float"), unit=u.ct)
self.dark_ccddata.append(dark)
self.dark_header.append(header)
self.dark_time.append(exposure_time)
def add_bias(self, bias: Union[np.ndarray, CCDData], header: fits.Header):
"""
Add the bias frame.
Parameters
----------
bias: 2-d array or CCDData object
The bias image
header: astropy header object
The FITS header
"""
if isinstance(bias, np.ndarray):
bias = CCDData(bias.astype("float"), unit=u.ct)
self.bias_ccddata.append(bias)
self.bias_header.append(header)
def _get_data_and_header(
self,
input_data: Union[fits.hdu.hdulist.HDUList, str],
exptime_keyword: str = None,
):
if isinstance(input_data, fits.hdu.hdulist.HDUList):
input_data = input_data[0]
self.logger.warning(
"An HDU list is provided, only the first HDU will be read."
)
input_shape = np.shape(input_data.data)
self.logger.debug("data.data has shape %s.", input_shape)
# Normal case
if len(input_shape) == 2:
self.logger.debug("input_data.data is 2 dimensional.")
input_ccddata = CCDData(input_data.data.astype("float"), unit=u.ct)
input_header = input_data.header
# Try to trap common error when saving FITS file
# Case with multiple image extensions, we only take the first one
elif len(input_shape) == 3:
self.logger.debug("input_data.data is 3 dimensional.")
input_ccddata = CCDData(
input_data.data[0].astype("float"), unit=u.ct
)
input_header = input_data.header
# Case with an extra bracket when saving
elif len(input_shape) == 4:
self.logger.debug(
"input_data.data is 4 dimensional, there is most "
"likely an extra bracket, attempting to go in "
"one level."
)
# In case it in a multiple extension format, we take the
# first one only
if len(np.shape(input_data.data[0])) == 3:
input_ccddata = CCDData(
input_data.data[0][0].astype("float"), unit=u.ct
)
input_header = input_data.header
else:
input_ccddata = CCDData(
input_data.data[0].astype("float"), unit=u.ct
)
input_header = input_data.header
else:
error_msg = (
"Please check the shape/dimension of the input_data frame, "
"it is probably empty or has an atypical format. The shape "
f"of the data is: {input_shape}. The data type is: "
f"{type(input_data)}."
)
self.logger.critical(error_msg)
raise RuntimeError(error_msg)
exposure_time = None
# Get the exposure time for the frame
if isinstance(exptime_keyword, str):
if exptime_keyword in input_header:
exposure_time = input_header[exptime_keyword]
self.logger.info(
"Exposure time found with the supplied keyword."
)
else:
pass
else:
pass
# If the exposure_time is still None, loop through the default list
if exposure_time is None:
if np.in1d(self.exptime_keyword_list, input_header).any():
# Get the exposure time for the light frames
exptime_keyword_idx = int(
np.where(np.in1d(self.exptime_keyword_list, input_header))[
0
][0]
)
exptime_keyword = self.exptime_keyword_list[
exptime_keyword_idx
]
exposure_time = input_header[exptime_keyword]
self.logger.info(
"Exposure time found from the backup keyword list."
)
else:
# If exposure time cannot be found from the header and
# user failed to supply the exposure time, use 1 second
self.logger.warning(
"Exposure time cannot be found. "
"1 second is used as the exposure time."
)
exposure_time = 1.0
return input_ccddata, input_header, exposure_time
def combine_light(
self,
combinetype_light: str = None,
sigma_clipping_light: Union[int, float] = None,
clip_low_light: Union[int, float] = None,
clip_high_light: Union[int, float] = None,
):
"""
Combine the light frameself.light_ccddatas. The parameters provide here OVERRIDE those
set previously. Use with caution.
"""
if combinetype_light is not None:
self.combinetype_light = combinetype_light
if sigma_clipping_light is not None:
self.sigma_clipping_light = sigma_clipping_light
if clip_low_light is not None:
self.clip_low_light = clip_low_light
if clip_high_light is not None:
self.clip_high_light = clip_high_light
return self._combine(
self.light_ccddata,
self.light_time,
self.combinetype_light,
self.sigma_clipping_light,
self.clip_low_light,
self.clip_high_light,
)
def combine_arc(
self,
combinetype_arc: str = None,
sigma_clipping_arc: Union[int, float] = None,
clip_low_arc: Union[int, float] = None,
clip_high_arc: Union[int, float] = None,
):
"""
Combine the arc frames. The parameters provide here OVERRIDE those
set previously. Use with caution.
"""
if combinetype_arc is not None:
self.combinetype_arc = combinetype_arc
if sigma_clipping_arc is not None:
self.sigma_clipping_arc = sigma_clipping_arc
if clip_low_arc is not None:
self.clip_low_arc = clip_low_arc
if clip_high_arc is not None:
self.clip_high_arc = clip_high_arc
return self._combine(
self.arc_ccddata,
None,
self.combinetype_arc,
self.sigma_clipping_arc,
self.clip_low_arc,
self.clip_high_arc,
)
def combine_flat(
self,
combinetype_flat: str = None,
sigma_clipping_flat: Union[int, float] = None,
clip_low_flat: Union[int, float] = None,
clip_high_flat: Union[int, float] = None,
):
"""
Combine the flat frames. The parameters provide here OVERRIDE those
set previously. Use with caution.
"""
if combinetype_flat is not None:
self.combinetype_flat = combinetype_flat
if sigma_clipping_flat is not None:
self.sigma_clipping_flat = sigma_clipping_flat
if clip_low_flat is not None:
self.clip_low_flat = clip_low_flat
if clip_high_flat is not None:
self.clip_high_flat = clip_high_flat
return self._combine(
self.flat_ccddata,
self.flat_time,
self.combinetype_flat,
self.sigma_clipping_flat,
self.clip_low_flat,
self.clip_high_flat,
)
def combine_dark(
self,
combinetype_dark: str = None,
sigma_clipping_dark: Union[int, float] = None,
clip_low_dark: Union[int, float] = None,
clip_high_dark: Union[int, float] = None,
):
"""
Combine the dark frames. The parameters provide here OVERRIDE those
set previously. Use with caution.
"""
if combinetype_dark is not None:
self.combinetype_dark = combinetype_dark
if sigma_clipping_dark is not None:
self.sigma_clipping_dark = sigma_clipping_dark
if clip_low_dark is not None:
self.clip_low_dark = clip_low_dark
if clip_high_dark is not None:
self.clip_high_dark = clip_high_dark
return self._combine(
self.dark_ccddata,
self.dark_time,
self.combinetype_dark,
self.sigma_clipping_dark,
self.clip_low_dark,
self.clip_high_dark,
)
def combine_bias(
self,
combinetype_bias: str = None,
sigma_clipping_bias: Union[int, float] = None,
clip_low_bias: Union[int, float] = None,
clip_high_bias: Union[int, float] = None,
):
"""
Combine the bias frames. The parameters provide here OVERRIDE those
set previously. Use with caution.
"""
if combinetype_bias is not None:
self.combinetype_bias = combinetype_bias
if sigma_clipping_bias is not None:
self.sigma_clipping_bias = sigma_clipping_bias
if clip_low_bias is not None:
self.clip_low_bias = clip_low_bias
if clip_high_bias is not None:
self.clip_high_bias = clip_high_bias
return self._combine(
self.bias_ccddata,
None,
self.combinetype_bias,
self.sigma_clipping_bias,
self.clip_low_bias,
self.clip_high_bias,
)
def _combine(
self,
ccd_data: CCDData,
exptime: list,
combine_type: str,
sigma_clipping: float,
clip_low: float,
clip_high: float,
):
# Put data into a Combiner
combiner = Combiner(ccd_data)
weighted = False
# Apply sigma clipping
if sigma_clipping:
combiner.sigma_clipping(
low_thresh=clip_low, high_thresh=clip_high, func=np.ma.median
)
# Image combine by median or average
if combine_type == "median":
combined_ccddata = combiner.median_combine()
elif combine_type == "average":
# weighted combine if exposure times are available
if exptime:
if len(exptime) == len(ccd_data):
combiner.weights = np.array(exptime)
weighted = True
else:
pass
combined_ccddata = combiner.average_combine()
else:
self.logger.error("Unknown combinetype.")
raise RuntimeError(f"Unknown combinetype: {combine_type}.")
# Free memory
combiner = None
return combined_ccddata, weighted
def _bias_subtract(self):
"""
Perform bias subtraction if bias frames are available.
"""
# Put data into a Combiner
self.bias_main, _ = self.combine_bias()
# Bias subtract
if self.bias_main is None:
self.logger.error(
"Main flat is not available, frame will not be flattened."
)
else:
self.light_reduced = self.light_reduced.subtract(self.bias_main)
def _dark_subtract(self):
"""
Perform dark subtraction if dark frames are available
"""
self.dark_main, self.dark_stack_weighted = self.combine_dark()
if not self.dark_filename:
# Dark subtraction adjusted for exposure time
self.light_reduced = self.light_reduced.subtract(
self.dark_main.divide(
np.nanmean(self.dark_time) / np.nanmean(self.light_time)
)
)
self.logger.info("Light frame is dark subtracted.")
def _flatfield(self):
"""
Perform field flattening if flat frames are available
"""
self.flat_main, self.flat_stack_weighted = self.combine_flat()
# Field-flattening
if self.flat_main is None:
self.logger.warning(
"Main flat is not available, frame will not be flattened."
)
else:
self.flat_reduced = copy.deepcopy(self.flat_main)
# Dark subtract the flat field
if self.dark_main is None:
self.logger.warning(
"Main dark is not available, main "
"flat will not be dark subtracted."
)
else:
self.flat_reduced = self.flat_reduced.subtract(self.dark_main)
self.logger.info("Flat frame is flat subtracted.")
# Bias subtract the flat field
if self.bias_main is None:
self.logger.warning(
"Main bias is not available, main "
"flat will not be bias subtracted."
)
else:
self.flat_reduced = self.flat_reduced.subtract(self.bias_main)
self.logger.info("Flat frame is bias subtracted.")
self.flat_reduced = self.flat_reduced / np.nanmean(
self.flat_reduced
)
# Flattenning the light frame
self.light_reduced = self.light_reduced / self.flat_reduced
self.logger.info("Light frame is flattened.")
def create_bad_pixel_mask(
self,
grow: bool = False,
iterations: int = 1,
diagonal: bool = False,
create_bad_mask: bool = True,
):
"""
Heal the bad pixels by taking the average of their n-nearest
good neighboring pixels. See more at util.bfixpix(). If you
wish to have fine control of the the bad mask creation, please
use the util.create_cutoff_mask() and util.create_bad_mask()
manually; or supply your own the bad masks.
Parameters
----------
grow: bool (Default: False)
Set to True to grow the mask, see `grow_mask()`
iterations: int (Default: 1)
The number of pixel growth along the Cartesian axes directions.
diagonal: bool (Default: False)
Set to True to grow in the diagonal directions.
create_bad_mask: bool (Deafult: True)
If set to True, combine the the bad_pixel_mask and saturation_mask
"""
self.bad_pixel_mask, self.bad_pixels = create_bad_pixel_mask(
self.light_reduced, grow, iterations, diagonal
)
if create_bad_mask:
self.create_bad_mask()
def create_bad_mask(self):
"""
Create mask for bad pixels, and saturated pixels.
"""
if self.bad_pixel_mask is None:
self.create_bad_pixel_mask(create_bad_mask=False)
if self.saturation_mask is None:
saturation_mask, saturated = create_cutoff_mask(
self.light_reduced,
self.cutoff,
self.grow,
self.iterations,
self.diagonal,
)
if self.saturation_mask is None:
self.saturation_mask = saturation_mask
self.saturated = saturated
else:
self.saturation_mask *= saturation_mask
self.saturated *= saturated
self.bad_mask = self.saturation_mask | self.bad_pixel_mask
def reduce(self):
"""
Perform data reduction using the frames provided.
"""
if self.light_main is None:
self.light_main, self.light_stack_weighted = self.combine_light()
self.light_total_time = np.sum(self.light_time)
if self.combinetype_light == "median":
self.light_average_time = np.median(self.light_time)
elif self.combinetype_light == "mean":
self.light_average_time = np.mean(self.light_time)
self.light_reduced = self.light_main
# Process the arc
if len(self.arc_ccddata) > 0:
if self.arc_main is None:
self.arc_main, _ = self.combine_arc()
# Bias subtraction
if len(self.bias_ccddata) > 0:
if self.bias_main is None:
self.bias_main, _ = self.combine_bias()
self._bias_subtract()
else:
self.logger.warning(
"No bias frames. Bias subtraction is not performed."
)
# Dark subtraction
if len(self.dark_ccddata) > 0:
if self.dark_main is None:
self.dark_main, self.dark_stack_weighted = self.combine_dark()
self.dark_total_time = np.sum(self.dark_time)
self._dark_subtract()
else:
self.logger.warning(
"No dark frames. Dark subtraction is not performed."
)
# Field flattening
if len(self.flat_ccddata) > 0:
if self.flat_main is None:
self.flat_main, self.flat_stack_weighted = self.combine_flat()
self.flat_total_time = np.sum(self.flat_time)
self._flatfield()
else:
self.logger.warning(
"No flat frames. Field-flattening is not performed."
)
# Construct a FITS object of the reduced frame
self.light_reduced = np.array((self.light_reduced))
# Create bad pixel mask
self.create_bad_pixel_mask()
# Heal pixel immediately
if self.heal_pixels:
self.heal_bad_pixels()
# rotate the frame by 90 degrees anti-clockwise if saxis is 0
if self.saxis == 0:
self.light_reduced = np.rot90(self.light_reduced)
self.bad_mask = np.rot90(self.bad_mask)
self.bad_pixel_mask = np.rot90(self.bad_pixel_mask)
self.saturation_mask = np.rot90(self.saturation_mask)
self.arc_main = np.rot90(self.arc_main)
def heal_bad_pixels(self, bad_mask: np.ndarray = None, n_nearest: int = 4):
"""
Heal the bad pixels by taking the average of their n-nearest
good neighboring pixels. See more at util.bfixpix(). By default,
this is not used, becase any automatic tampering of data is
not a good idea.
Parameters
----------
bad_mask: numpy.ndarray
The bad pixel mask for healing. If it is not provided, it
will be computed from the reduced data.
n_nearest: int
Number of nearest, good pixels to average over.
"""
if bad_mask is None:
if self.bad_mask is None:
self.create_bad_mask()
self.logger.info("Created a bad_mask using the reduced data.")
elif np.shape(bad_mask) == np.shape(self.light_reduced):
self.bad_mask = bad_mask
self.logger.info("Bad_mask is set to the supplied mask.")
else:
err_msg = (
"The bad_mask provided has to be the "
+ "same shape as the data."
)
self.logger.error(err_msg)
raise RuntimeError(err_msg)
bfixpix(self.light_reduced, self.bad_mask, n_nearest)
self.pixel_healed = True
def create_image_fits(self):
"""
Put the reduced data in FITS format with an image header.
"""
self.image_fits = fits.ImageHDU(self.light_reduced)
self.logger.info("Appending the header from the first light frame.")
self.image_fits.header = self.light_header[0]
# Add the names of all the light frames to header
if len(self.light_filename) > 0:
for i, filename in enumerate(self.light_filename):
self.logger.debug(
"Light frame: %s is added to the header.",
filename,
)
self.image_fits.header.set(
keyword="light" + str(i + 1),
value=filename,
comment="Light frames",
)
# Add the names of all the biad frames to header
if len(self.bias_filename) > 0:
for i, filename in enumerate(self.bias_filename):
self.logger.debug(
"Bias frame: %s is added to the header.",
filename,
)
self.image_fits.header.set(
keyword="bias" + str(i + 1),
value=filename,
comment="Bias frames",
)
# Add the names of all the dark frames to header
if len(self.dark_filename) > 0:
for i, filename in enumerate(self.dark_filename):
self.logger.debug(
"Dark frame: %s is added to the header.",
filename,
)
self.image_fits.header.set(
keyword="dark" + str(i + 1),
value=filename,
comment="Dark frames",
)
# Add the names of all the flat frames to header
if len(self.flat_filename) > 0:
for i, filename in enumerate(self.flat_filename):
self.logger.debug(
"Flat frame: %s is added to the header.",
filename,
)
self.image_fits.header.set(
keyword="flat" + str(i + 1),
value=filename,
comment="Flat frames",
)
# Add all the other keywords
self.image_fits.header.set(
keyword="COMBTYPE",
value=self.combinetype_light,
comment="Type of image combine of the light frames.",
)
self.image_fits.header.set(
keyword="SIGCLIP",
value=self.sigma_clipping_light,
comment="True if the light frames are sigma clipped.",
)
self.image_fits.header.set(
keyword="CLIPLOW",
value=self.clip_low_light,
comment="Lower threshold of sigma clipping of the light frames.",
)
self.image_fits.header.set(
keyword="CLIPHIG",
value=self.clip_high_light,
comment="Higher threshold of sigma clipping of the light frames.",
)
self.image_fits.header.set(
keyword="XPOSURE",
value=self.light_total_time,
comment="Total exposure time of the light frames.",
)
self.image_fits.header.set(
keyword="AXPOSURE",
value=self.light_average_time,
comment="Average exposure time of the light frames.",
)
self.image_fits.header.set(
keyword="WEIGHT",
value=self.light_stack_weighted,
comment="True if frame are weighted by exposure time in combine.",
)
self.image_fits.header.set(
keyword="KEYWORD",
value=self.exptime_light_keyword,
comment=(
"Automatically identified exposure time keyword of the "
"light frames."
),
)
self.image_fits.header.set(
keyword="DCOMTYPE",
value=self.combinetype_dark,
comment="Type of image combine of the dark frames.",
)
self.image_fits.header.set(
keyword="DSIGCLIP",
value=self.sigma_clipping_dark,
comment="True if the dark frames are sigma clipped.",
)
self.image_fits.header.set(
keyword="DCLIPLOW",
value=self.clip_low_dark,
comment="Lower threshold of sigma clipping of the dark frames.",
)
self.image_fits.header.set(
keyword="DCLIPHIG",
value=self.clip_high_dark,
comment="Higher threshold of sigma clipping of the dark frames.",
)
self.image_fits.header.set(
keyword="DXPOSURE",
value=self.dark_total_time,
comment="Total exposure time of the dark frames.",
)
self.image_fits.header.set(
keyword="DWEIGHT",
value=self.dark_stack_weighted,
comment="True if frame are weighted by exposure time in combine.",
)
self.image_fits.header.set(
keyword="DKEYWORD",
value=self.exptime_dark_keyword,
comment="Automatically identified exposure time keyword of the "
+ "dark frames.",
)
self.image_fits.header.set(
keyword="BCOMTYPE",
value=self.combinetype_bias,
comment="Type of image combine of the bias frames.",
)
self.image_fits.header.set(
keyword="BSIGCLIP",
value=self.sigma_clipping_bias,
comment="True if the dark frames are sigma clipped.",
)
self.image_fits.header.set(
keyword="BCLIPLOW",
value=self.clip_low_bias,
comment="Lower threshold of sigma clipping of the bias frames.",
)
self.image_fits.header.set(
keyword="BCLIPHIG",
value=self.clip_high_bias,
comment="Higher threshold of sigma clipping of the bias frames.",
)
self.image_fits.header.set(
keyword="FCOMTYPE",
value=self.combinetype_flat,
comment="Type of image combine of the flat frames.",
)
self.image_fits.header.set(
keyword="FSIGCLIP",
value=self.sigma_clipping_flat,
comment="True if the flat frames are sigma clipped.",
)
self.image_fits.header.set(
keyword="FCLIPLOW",
value=self.clip_low_flat,
comment="Lower threshold of sigma clipping of the flat frames.",
)
self.image_fits.header.set(
keyword="FCLIPHIG",
value=self.clip_high_flat,
comment="Higher threshold of sigma clipping of the flat frames.",
)
self.image_fits.header.set(
keyword="FXPOSURE",
value=self.flat_total_time,
comment="Total exposure time of the flat frames.",
)
self.image_fits.header.set(
keyword="FWEIGHT",
value=self.flat_stack_weighted,
comment="True if frame are weighted by exposure time in combine.",
)
self.image_fits.header.set(
keyword="CCLEANED",
value=self.cosmicray,
comment="Indicate if cosmic ray cleaning was performed.",
)
self.image_fits.header.set(
keyword="CGAIN",
value=self.gain,
comment="Gain value used for cosmic ray cleaning.",
)
self.image_fits.header.set(
keyword="CRDNOISE",
value=self.readnoise,
comment="Readnoise value used for cosmic ray cleaning.",
)
self.image_fits.header.set(
keyword="CFSMODE",
value=self.fsmode,
comment="The fine structure mode for cosmic ray cleaning.",
)
self.image_fits.header.set(
keyword="CPSFMOD",
value=self.psfmodel,
comment="The PSF model used for cosmic ray cleaning.",
)
self.image_fits.header.set(
keyword="CUTOFF",
value=self.cutoff,
comment="The upper and lower limit of the good pixel values.",
)
self.image_fits.header.set(
keyword="GROW",
value=self.grow,
comment="Indicate if the bad pixel mask is grown outward.",
)
self.image_fits.header.set(
keyword="ITERATE",
value=self.iterations,
comment="The number of pixels the bad pixel mask is grown.",
)
self.image_fits.header.set(
keyword="DIAGONAL",
value=self.diagonal,
comment=(
"If False, ITERATE is the number of pixels grown "
"in Mahattan distance."
),
)
self.image_fits.header.set(
keyword="HEALED",
value=self.pixel_healed,
comment="Indicate if the pixels are healed (i.e. altered!).",
)
if self.cr_kwargs is not None:
for key, value in self.cr_kwargs.items():
self.image_fits.header.set(
keyword=key, value=value, comment=key
)
def save_fits(
self,
filename: str = "reduced_image",
extension: str = "fits",
overwrite: bool = False,
):
"""
Save the reduced image to disk.
Parameters
----------
filename: str
Disk location to be written to. Default is at where the
process/subprocess is execuated.
extension: str
File extension without the dot.
overwrite: bool
Default is False.
"""
if filename[-5:] == ".fits":
filename = filename[:-5]
if filename[-4:] == ".fit":
filename = filename[:-4]
self.create_image_fits()
self.image_fits = fits.PrimaryHDU(
self.image_fits.data, self.image_fits.header
)
# Save file to disk
self.image_fits.writeto(
filename + "." + extension, overwrite=overwrite
)
self.logger.info("FITS file saved to %s.", filename + "." + extension)
def save_masks(
self,
filename: str = "reduced_image_mask",
extension: str = "fits",
overwrite: bool = False,
):
"""
Save the reduced image to disk.
Parameters
----------
filename: str
Disk location to be written to. Default is at where the
process/subprocess is execuated.
extension: str
File extension without the dot.
overwrite: bool
Default is False.
"""
if filename[-5:] == ".fits":
filename = filename[:-5]
if filename[-4:] == ".fit":
filename = filename[:-4]
bad_mask_fits = fits.PrimaryHDU(self.bad_mask.astype("int"))
bad_pixel_mask_fits = fits.ImageHDU(self.bad_pixel_mask.astype("int"))
saturation_mask_fits = fits.ImageHDU(
self.saturation_mask.astype("int")
)
output_hdu = fits.HDUList(
[bad_mask_fits, bad_pixel_mask_fits, saturation_mask_fits]
)
# Save file to disk
output_hdu.writeto(filename + "." + extension, overwrite=overwrite)
self.logger.info("FITS file saved to %s.", filename + "." + extension)
def inspect(
self,
log: bool = True,
display: bool = True,
renderer: str = "default",
width: int = 1280,
height: int = 720,
return_jsonstring: bool = False,
save_fig: bool = False,
fig_type: str = "iframe+png",
filename: str = None,
open_iframe: bool = False,
):
"""
Display the reduced image with a supported plotly renderer or export
as json strings.
Parameters
----------
log: bool
Log the ADU count per second in the display. Default is True.
display: bool
Set to True to display disgnostic plot.
renderer: str
plotly renderer options.
width: int/float
Number of pixels in the horizontal direction of the outputs
height: int/float
Number of pixels in the vertical direction of the outputs
return_jsonstring: bool (Default: False)
set to True to return json string that can be rendered by Plotly
in any support language.
save_fig: bool (default: False)
Save an image if set to True. Plotly uses the pio.write_html()
or pio.write_image(). The support format types should be provided
in fig_type.
fig_type: string (default: 'iframe+png')
Image type to be saved, choose from:
jpg, png, svg, pdf and iframe. Delimiter is '+'.
filename: str (Default: None)
Filename for the output, all of them will share the same name but
will have different extension.
open_iframe: bool (Default: False)
Open the save_iframe in the default browser if set to True.
Returns
-------
JSON strings if return_jsonstring is set to True.
"""
if log:
fig = go.Figure(
data=go.Heatmap(
z=np.log10(self.light_reduced), colorscale="Viridis"
)
)
else:
fig = go.Figure(
data=go.Heatmap(z=self.light_reduced, colorscale="Viridis")
)
fig.update_layout(
yaxis_title="Spatial Direction / pixel",
xaxis=dict(
zeroline=False,
showgrid=False,
title="Spectral Direction / pixel",
),
bargap=0,
hovermode="closest",
showlegend=False,
autosize=False,
height=height,
width=width,
)
if filename is None:
filename = "reduced_image"
if save_fig:
fig_type_split = fig_type.split("+")
for f_type in fig_type_split:
if f_type == "iframe":
pio.write_html(
fig, filename + "." + f_type, auto_open=open_iframe
)
elif f_type in ["jpg", "png", "svg", "pdf"]:
pio.write_image(fig, filename + "." + f_type)
if display:
if renderer == "default":
fig.show()
else:
fig.show(renderer)
if return_jsonstring:
return fig.to_json()
def list_files(self):
"""
Print the file input list.
"""
for i in self.filelist:
print(i)
[docs]
class ImageReduction(ImageReducer):
"""
Wrapper class of ImageReduction that will be deprecated in the next
release.
"""
def __init_subclass__(self):
import warnings
warnings.warn(
"Class has been renamed `ImageReducer`", DeprecationWarning, 2
)