Source code for aspired.flux_calibration

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""For Flux Calibration"""

import datetime
import difflib
import json
import logging
import os
from typing import Callable, Union

import numpy as np
import pkg_resources
from plotly import graph_objects as go
from plotly import io as pio
from scipy import interpolate as itp
from scipy import signal
from spectresc import spectres
from statsmodels.nonparametric.smoothers_lowess import lowess

from .spectrum_oneD import SpectrumOneD

base_dir = os.path.dirname(__file__)

__all__ = ["StandardLibrary", "FluxCalibration"]


class StandardLibrary:
    """
    This class handles flux calibration by comparing the extracted and
    wavelength-calibrated standard observation to the "ground truth"
    from

    https://github.com/iraf-community/iraf/tree/master/noao/lib/onedstandards
    https://www.eso.org/sci/observing/tools/standards/spectra.html

    See explanation notes at those links for details.

    Parameters
    ----------
    verbose: bool (Default: True)
        Set to False to suppress all verbose warnings, except for
        critical failure.
    logger_name: str (Default: StandardLibrary)
        This will set the name of the logger, if the name is used already,
        it will reference to the existing logger. This will be the
        first part of the default log file name unless log_file_name is
        provided.
    log_level: str (Default: 'INFO')
        Four levels of logging are available, in decreasing order of
        information and increasing order of severity: (1) DEBUG, (2) INFO,
        (3) WARNING, (4) ERROR and (5) CRITICAL. WARNING means that
        there is suboptimal operations in some parts of that step. ERROR
        means that the requested operation cannot be performed, but the
        software can handle it by either using the default setting or
        skipping the operation. CRITICAL means that the requested
        operation cannot be resolved without human interaction, this is
        most usually coming from missing data.
    log_file_folder: None or str (Default: "default")
        Folder in which the file is save, set to default to save to the
        current path.
    log_file_name: None or str (Default: None)
        File name of the log, set to None to logging.warning to screen
        only.

    """

    def __init__(
        self,
        verbose: bool = True,
        logger_name: str = "StandardLibrary",
        log_level: str = "INFO",
        log_file_folder: str = "default",
        log_file_name: str = None,
    ):
        # Set-up logger
        self.logger = logging.getLogger(logger_name)
        if (log_level == "CRITICAL") or (not verbose):
            self.logger.setLevel(logging.CRITICAL)
        elif log_level == "ERROR":
            self.logger.setLevel(logging.ERROR)
        elif log_level == "WARNING":
            self.logger.setLevel(logging.WARNING)
        elif log_level == "INFO":
            self.logger.setLevel(logging.INFO)
        elif log_level == "DEBUG":
            self.logger.setLevel(logging.DEBUG)
        else:
            raise ValueError("Unknonw logging level.")

        formatter = logging.Formatter(
            "[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
            datefmt="%a, %d %b %Y %H:%M:%S",
        )

        if log_file_name is None:
            # Only print log to screen
            self.handler = logging.StreamHandler()
        else:
            if log_file_name == "default":
                d_str = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
                log_file_name = f"{logger_name}_{d_str}.log"
            # Save log to file
            if log_file_folder == "default":
                log_file_folder = ""

            self.handler = logging.FileHandler(
                os.path.join(log_file_folder, log_file_name), "a+"
            )

        self.handler.setFormatter(formatter)
        if self.logger.hasHandlers():
            self.logger.handlers.clear()
        self.logger.addHandler(self.handler)

        self.verbose = verbose

        self.standard_fluxmag_true = False
        self.standard_wave_true = False

        self.library = None
        self.target = None
        self.designation = None
        self.ftype = "flux"
        self.cutoff = 0.4

        self.designation_to_lib_filename = None
        self.lib_to_filename = None
        self.lib_to_designation = None
        self.filename_to_lib = None

        self.spectrum_oned_imported = False

        self._load_standard_dictionary()

    def _load_standard_dictionary(self):
        """
        Load the dictionaries containing the names of all the standard stars.

        """

        self.designation_to_lib_filename = json.load(
            open(
                pkg_resources.resource_filename(
                    "aspired", "standards/designation_to_lib_filename.json"
                ),
                encoding="ascii",
            )
        )
        self.lib_to_filename = json.load(
            open(
                pkg_resources.resource_filename(
                    "aspired", "standards/lib_to_filename.json"
                ),
                encoding="ascii",
            )
        )
        self.lib_to_designation = json.load(
            open(
                pkg_resources.resource_filename(
                    "aspired", "standards/lib_to_designation.json"
                ),
                encoding="ascii",
            )
        )
        self.filename_to_lib = json.load(
            open(
                pkg_resources.resource_filename(
                    "aspired", "standards/filename_to_lib.json"
                ),
                encoding="ascii",
            )
        )

        self.designation_list = self.designation_to_lib_filename.keys()
        self.lib_list = self.lib_to_filename.keys()
        self.filename_list = self.filename_to_lib.keys()

    def _get_eso_standard(self):
        """
        Formatter for reading the ESO standards.

        """

        folder = self.library

        # first letter of the file name
        if self.ftype == "flux":
            filename = "f"

        else:
            filename = "m"

        # match the target designation to the file name
        filename += self.designation_to_lib_filename[self.designation][
            self.library
        ]

        # the extension
        filename += ".dat"

        filepath = os.path.join(base_dir, "standards", folder, filename)

        _f = np.loadtxt(filepath)

        self.standard_wave_true = _f[:, 0]
        self.standard_fluxmag_true = _f[:, 1]

        if self.ftype == "flux":
            if self.library != "esoxshooter":
                self.standard_fluxmag_true /= 10.0**16.0

    def _get_ing_standard(self):
        """
        Formatter for reading the ING standards.

        """

        folder = self.library

        # the first part of the file name
        filename = self.designation_to_lib_filename[self.designation][
            self.library
        ]
        extension = self.library[3:]

        # last letter (or nothing) of the file name
        if self.ftype == "flux":
            if (filename == "g24" or filename == "g157") and (
                extension == "fg"
            ):
                filename += "a"

            if (filename == "h102") and (extension == "sto"):
                filename += "a"

        else:
            filename += "a"

        # the extension
        filename += "." + extension

        filepath = os.path.join(base_dir, "standards", folder, filename)

        _f = open(filepath, encoding="ascii")
        wave = []
        fluxmag = []
        for line in _f.readlines():
            if line[0] in ["*", "S"]:
                if line.startswith("SET .Z.UNITS = "):
                    # remove all special characters and white spaces
                    unit = "".join(
                        e for e in line.split('"')[1].lower() if e.isalnum()
                    )

            else:
                _li = line.strip().strip(":").split()
                wave.append(_li[0])
                fluxmag.append(_li[1])

        _f.close()
        self.standard_wave_true = np.array(wave).astype("float")
        self.standard_fluxmag_true = np.array(fluxmag).astype("float")

        # See https://www.stsci.edu/~strolger/docs/UNITS.txt for the unit
        # conversion.
        if self.ftype == "flux":
            # Trap the ones without flux files
            if (
                extension == "mas"
                or filename == "g24a.fg"
                or filename == "g157a.fg"
                or filename == "h102a.sto"
            ):
                self.standard_fluxmag_true = (
                    10.0 ** (-(self.standard_fluxmag_true / 2.5))
                    * 3630.780548
                    / 3.33564095e4
                    / self.standard_wave_true**2
                )

            # convert milli-Jy into F_lambda
            if unit == "mjy":
                self.standard_fluxmag_true = (
                    self.standard_fluxmag_true
                    * 1e-3
                    * 2.99792458e-05
                    / self.standard_wave_true**2
                )

            # convert micro-Jy into F_lambda
            if unit == "microjanskys":
                self.standard_fluxmag_true = (
                    self.standard_fluxmag_true
                    * 1e-6
                    * 2.99792458e-05
                    / self.standard_wave_true**2
                )

    def _get_iraf_standard(self):
        """
        Formatter for reading the iraf standards.

        """

        folder = self.library

        # file name and extension
        filename = (
            self.designation_to_lib_filename[self.designation][self.library]
            + ".dat"
        )

        filepath = os.path.join(base_dir, "standards", folder, filename)

        _f = np.loadtxt(filepath, skiprows=1)

        self.standard_wave_true = _f[:, 0]
        self.standard_fluxmag_true = _f[:, 1]

        # iraf is always in AB magnitude
        if self.ftype == "flux":
            # Convert from AB mag to flux
            self.standard_fluxmag_true = (
                10.0 ** (-(self.standard_fluxmag_true / 2.5))
                * 3630.780548
                / 3.33564095e4
                / self.standard_wave_true**2
            )

    def lookup_standard_libraries(self, target: str, cutoff: float = 0.4):
        """
        Check if the requested standard and library exist. Return the three
        most similar words if the requested one does not exist. See

            https://docs.python.org/3.7/library/difflib.html

        This method tries to match the designation of the standard star
        that is available on SIMBAD or as named by the Isaac Newton Group of
        Telescopes. All comparisons are performed in lower case, all space
        and special symbols (expect ".", "+", "-", "_") are stripped.

        Parameters
        ----------
        target: str
            Name/designation of the standard star
        cutoff: float (Default: 0.4)
            The similarity toleranceold
            [0 (completely different) - 1 (identical)]

        """

        library_list = []
        filename_list = []

        if not isinstance(target, str):
            error_msg = (
                f"Target name has to be of type str, {type(target)} is"
                " provided."
            )
            self.logger.critical(error_msg)

        # If the provided designation exists
        if target.lower() in self.designation_list:
            _filename_list = self.designation_to_lib_filename[target.lower()]

            exact_match = True

        else:
            # If the requested target is not in any library, suggest the
            # closest match, Top 5 are returned.
            # difflib uses Gestalt pattern matching.
            designation_list = difflib.get_close_matches(
                target.lower(),
                list(self.designation_to_lib_filename.keys()),
                n=5,
                cutoff=cutoff,
            )

            for designation in designation_list:
                _filename_list = self.designation_to_lib_filename[designation]

            exact_match = False

        for libname, filename in _filename_list.items():
            filename_list.append(filename)
            library_list.append(libname)

        if len(filename_list) > 0:
            self.logger.warning(
                "Requested standard star cannot be found, a list of"
                " the closest matching names are returned:"
                f" {filename_list}."
            )
        else:
            error_msg = (
                (
                    "Please check the name of your standard star, nothing "
                    f"share a similarity above {cutoff}."
                ),
            )
            self.logger.critical(error_msg)
            raise ValueError(error_msg)

        # Return pair(s) of filename and library
        return [
            (f, l) for l, f in zip(library_list, filename_list)
        ], exact_match

    def lookup_closet_match_in_library(
        self, target: str, library: str, cutoff: float = 0.2
    ):
        """
        Check if the requested standard and library exist. Only if the
        similarity is better than (by default) 0.2 a target name will be returned. See

            https://docs.python.org/3.11/library/difflib.html

        Parameters
        ----------
        target: str
            Desination of the standard star
        library: str
            Name of the library
        cutoff: float (Default: 0.4)
            The toleranceold for the word similarity in the range of [0, 1].

        Return:
        -------
        The closest designation to the target in that (closet) library.

        """

        # Load the list of libraries
        library_name = difflib.get_close_matches(
            library, list(self.lib_to_designation.keys()), n=1, cutoff=cutoff
        )

        if library_name == []:
            return None, None

        else:
            library_name = library_name[0]

        # difflib uses Gestalt pattern matching.
        target_designation = difflib.get_close_matches(
            target.lower(),
            self.lib_to_designation[library_name],
            n=1,
            cutoff=cutoff,
        )
        if target_designation == []:
            target_designation = None

        else:
            target_designation = target_designation[0]

        return target_designation, library_name

    def load_standard(
        self,
        target: str,
        library: str = None,
        ftype: str = "flux",
        cutoff: float = 0.4,
    ):
        """
        Read the standard flux/magnitude file. And return the wavelength and
        flux/mag. The units of the data are always in

        | wavelength: A
        | flux:       ergs / cm / cm / s / A
        | mag:        mag (AB)

        Parameters
        ----------
        target: string
            Name of the standard star
        library: string (Default: None)
            Name of the library of standard star
        ftype: string (Default: 'flux')
            'flux' or 'mag'
        cutoff: float (Default: 0.4)
            The toleranceold for the word similarity in the range of [0, 1].

        """

        self.target = target.lower()
        self.ftype = ftype.lower()
        self.cutoff = cutoff

        # If there is a close match from the user-provided library, use
        # that first, it will only accept the library and target if the
        # cutoff is above 0.4
        self.library = library
        if self.library is not None:
            (
                _designation,
                _library,
            ) = self.lookup_closet_match_in_library(self.target, self.library)

            if _designation is not None:
                self.designation = _designation
                self.library = _library
                self.logger.info(
                    f"The requested standard star {self.target} is found in the"
                    f" given library {self.library}."
                )

        # If not, search again with the first one returned from lookup.
        else:
            libraries, success = self.lookup_standard_libraries(self.target)

            if success:
                self.target, self.library = libraries[0]

                if not np.in1d([library], libraries):
                    self.logger.warning(
                        "The requested standard star cannot be found in the"
                        " given library, or the library is not specified."
                        f" ASPIRED is using {self.library}."
                    )

            else:
                # When success is Flase, the libraries is a list of standards
                self.target, self.library = libraries[0]

                self.logger.warning(
                    f"The requested library does not exist, {self.library} "
                    "is used because it has the closest matching name."
                )

            self.designation, _ = self.lookup_closet_match_in_library(
                self.target, self.library
            )

        if self.library.startswith("iraf"):
            self._get_iraf_standard()

        if self.library.startswith("ing"):
            self._get_ing_standard()

        if self.library.startswith("eso"):
            self._get_eso_standard()

    def inspect_standard(
        self,
        display: bool = True,
        renderer: str = "default",
        width: int = 1280,
        height: int = 720,
        return_jsonstring: bool = False,
        save_fig: bool = False,
        fig_type: str = "iframe+png",
        filename: str = None,
        open_iframe: bool = False,
    ):
        """
        Display the standard star plot.

        Parameters
        ----------
        display: bool (Default: True)
            Set to True to display disgnostic plot.
        renderer: string (Default: default)
            plotly renderer options.
        width: int/float (Default: 1280)
            Number of pixels in the horizontal direction of the outputs
        height: int/float (Default: 720)
            Number of pixels in the vertical direction of the outputs
        return_jsonstring: bool (Default: False)
            set to True to return json string that can be rendered by Plotly
            in any support language.
        save_fig: bool (default: False)
            Save an image if set to True. Plotly uses the pio.write_html()
            or pio.write_image(). The support format types should be provided
            in fig_type.
        fig_type: string (default: 'iframe+png')
            Image type to be saved, choose from:
            jpg, png, svg, pdf and iframe. Delimiter is '+'.
        filename: str (Default: None)
            Filename for the output, all of them will share the same name but
            will have different extension.
        open_iframe: bool (Default: False)
            Open the iframe in the default browser if set to True.

        Returns
        -------
        JSON strings if return_jsonstring is set to True.

        """

        fig = go.Figure(
            layout=dict(
                autosize=False,
                height=height,
                width=width,
                updatemenus=list(
                    [
                        dict(
                            active=0,
                            buttons=list(
                                [
                                    dict(
                                        label="Log Scale",
                                        method="update",
                                        args=[
                                            {"visible": [True, True]},
                                            {
                                                "title": "Log scale",
                                                "yaxis": {"type": "log"},
                                            },
                                        ],
                                    ),
                                    dict(
                                        label="Linear Scale",
                                        method="update",
                                        args=[
                                            {"visible": [True, False]},
                                            {
                                                "title": "Linear scale",
                                                "yaxis": {"type": "linear"},
                                            },
                                        ],
                                    ),
                                ]
                            ),
                        )
                    ]
                ),
                title="Log scale",
            )
        )

        # show the image on the top
        fig.add_trace(
            go.Scatter(
                x=self.standard_wave_true,
                y=self.standard_fluxmag_true,
                line=dict(color="royalblue", width=4),
            )
        )

        fig.update_layout(
            title=self.library + ": " + self.target + " " + self.ftype,
            xaxis_title=r"$\text{Wavelength / A}$",
            yaxis_title=(
                r"$\text{Flux / ergs cm}^{-2} \text{s}^{-1}" + "\text{A}^{-1}$"
            ),
            hovermode="closest",
            showlegend=False,
        )

        if filename is None:
            filename = "standard"

        if save_fig:
            fig_type_split = fig_type.split("+")

            for _t in fig_type_split:
                if _t == "iframe":
                    pio.write_html(
                        fig, filename + "." + _t, auto_open=open_iframe
                    )

                elif _t in ["jpg", "png", "svg", "pdf"]:
                    pio.write_image(fig, filename + "." + _t)

        if display:
            if renderer == "default":
                fig.show()

            else:
                fig.show(renderer)

        if return_jsonstring:
            return fig.to_json()


[docs]class FluxCalibration(StandardLibrary): """ For flux calibration using iraf, ING and ESO standards. """ def __init__( self, verbose: bool = True, logger_name: str = "FluxCalibration", log_level: str = "INFO", log_file_folder: str = "default", log_file_name: str = None, ): """ Initialise a FluxCalibration object. Parameters ---------- verbose: bool (Default: True) Set to False to suppress all verbose warnings, except for critical failure. logger_name: str (Default: FluxCalibration) This will set the name of the logger, if the name is used already, it will reference to the existing logger. This will be the first part of the default log file name unless log_file_name is provided. log_level: str (Default: 'INFO') Four levels of logging are available, in decreasing order of information and increasing order of severity: CRITICAL, DEBUG, INFO, WARNING, ERROR log_file_folder: None or str (Default: "default") Folder in which the file is save, set to default to save to the current path. log_file_name: None or str (Default: None) File name of the log, set to None to logging.warning to screen only. """ # Set-up logger self.logger = logging.getLogger(logger_name) if (log_level == "CRITICAL") or (not verbose): logging.basicConfig(level=logging.CRITICAL) if log_level == "ERROR": logging.basicConfig(level=logging.ERROR) if log_level == "WARNING": logging.basicConfig(level=logging.WARNING) if log_level == "INFO": logging.basicConfig(level=logging.INFO) if log_level == "DEBUG": logging.basicConfig(level=logging.DEBUG) formatter = logging.Formatter( "[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] %(message)s", datefmt="%a, %d %b %Y %H:%M:%S", ) if log_file_name is None: # Only logging.warning log to screen handler = logging.StreamHandler() else: if log_file_name == "default": d_str = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") log_file_name = f"{logger_name}_{d_str}.log" # Save log to file if log_file_folder == "default": log_file_folder = "" handler = logging.FileHandler( os.path.join(log_file_folder, log_file_name), "a+" ) handler.setFormatter(formatter) self.logger.addHandler(handler) self.verbose = verbose self.logger_name = logger_name self.log_level = log_level self.log_file_folder = log_file_folder self.log_file_name = log_file_name # Load the dictionary super().__init__( verbose=self.verbose, logger_name=self.logger_name, log_level=self.log_level, log_file_folder=self.log_file_folder, log_file_name=self.log_file_name, ) self.verbose = verbose self.spectrum_oned = SpectrumOneD( spec_id=0, verbose=self.verbose, logger_name=self.logger_name, log_level=self.log_level, log_file_folder=self.log_file_folder, log_file_name=self.log_file_name, ) self.target_spec_id = None self.standard_wave_true = None self.standard_fluxmag_true = None self.count_continuum = None self.flux_continuum = None
[docs] def from_spectrum_oned( self, spectrum_oned: SpectrumOneD, merge: bool = False, overwrite: bool = False, ): """ This function copies all the info from the spectrum_oned, because users may supply different level/combination of reduction, everything is copied from the spectrum_oned even though in most cases only a None will be passed. By default, this is passing object by reference by default, so it directly modifies the spectrum_oned supplied. By setting merger to True, it copies the data into the SpectrumOneD in the FluxCalibration object. Parameters ---------- spectrum_oned: SpectrumOneD object The SpectrumOneD to be referenced or copied. merge: bool (Default: False) Set to True to copy everything over to the local SpectrumOneD, hence FluxCalibration will not be acting on the SpectrumOneD outside. """ if merge: self.spectrum_oned.merge(spectrum_oned, overwrite=overwrite) else: self.spectrum_oned = spectrum_oned self.spectrum_oned_imported = True
[docs] def remove_spectrum_oned(self): """ Delete the spectrum_oned object. """ self.spectrum_oned = SpectrumOneD( spec_id=0, verbose=self.verbose, logger_name=self.logger_name, log_level=self.log_level, log_file_folder=self.log_file_folder, log_file_name=self.log_file_name, ) self.spectrum_oned_imported = False
[docs] def load_standard( self, target: str, library: str = None, ftype: str = "flux", cutoff: float = 0.4, ): """ Read the standard flux/magnitude file. And return the wavelength and flux/mag. The units of the data are always in | wavelength: A | flux: ergs / cm / cm / s / A | mag: mag (AB) Parameters ---------- target: string Name of the standard star library: string (Default: None) Name of the library of standard star ftype: string (Default: 'flux') 'flux' or 'mag' cutoff: float (Default: 0.4) The toleranceold for the word similarity in the range of [0, 1]. """ super().load_standard( target=target, library=library, ftype=ftype, cutoff=cutoff ) # the best target and library found can be different from the input self.spectrum_oned.add_standard_star( library=self.library, target=self.target )
[docs] def add_standard( self, wavelength: Union[np.ndarray, list], count: Union[np.ndarray, list], count_err: Union[np.ndarray, list] = None, count_sky: Union[np.ndarray, list] = None, ): """ Add spectrum (wavelength, count, count_err & count_sky). Parameters ---------- wavelength: 1-d array The wavelength at each pixel of the trace. count: 1-d array The summed count at each column about the trace. count_err: 1-d array (Default: None) the uncertainties of the count values count_sky: 1-d array (Default: None) The integrated sky values along each column, suitable for subtracting from the output of ap_extract """ self.spectrum_oned.add_wavelength(wavelength) self.spectrum_oned.add_count(count, count_err, count_sky)
[docs] def get_telluric_profile( self, wave: Union[np.ndarray, list], flux: Union[np.ndarray, list], continuum: Union[np.ndarray, list], mask_range: Union[np.ndarray, list] = [[6850, 6960], [7580, 7700]], return_function: bool = False, ): """ Getting the Telluric absorption profile from the continuum of the standard star spectrum. Parameters ---------- wave: list or 1-d array (N) Wavelength. flux: list or 1-d array (N) Flux. continuum: list or 1-d array (N) Continuum Flux. mask_range: list of list list of lists with 2 values indicating the range marked by each of the Telluric regions. return_function: bool (Default: False) Set to True to explicitly return the interpolated function of the Telluric profile. """ telluric_profile = np.zeros_like(wave) # Get the continuum of the spectrum # This should give the POSITIVE values over the telluric region residual = continuum - flux for m_range in mask_range: # Get the indices for the two sides of the masking region # at the native pixel scale left_of_mask = np.where(wave <= m_range[0])[0] right_of_mask = np.where(wave >= m_range[1])[0] if (len(left_of_mask) == 0) or (len(right_of_mask) == 0): continue left_telluric_start = int(max(left_of_mask)) right_telluric_end = int(min(right_of_mask)) + 1 telluric_profile[ left_telluric_start:right_telluric_end ] = residual[left_telluric_start:right_telluric_end] # normalise the profile telluric_factor = np.ptp(telluric_profile) telluric_profile /= telluric_factor # If the spectrum doesn't cover any given telluric mask regions if np.isnan(telluric_profile).all(): telluric_profile = np.zeros_like(telluric_profile) telluric_func = itp.interp1d( wave, telluric_profile, fill_value="extrapolate" ) self.spectrum_oned.add_telluric_func(telluric_func) self.spectrum_oned.add_telluric_profile(telluric_profile) self.spectrum_oned.add_telluric_factor(telluric_factor) if return_function: return telluric_func, telluric_profile, telluric_factor
[docs] def inspect_telluric_profile( self, display: bool = True, renderer: str = "default", width: int = 1280, height: int = 720, return_jsonstring: bool = False, save_fig: bool = False, fig_type: str = "iframe+png", filename: str = None, open_iframe: bool = False, ): """ Display the Telluric profile. Parameters ---------- display: bool (Default: True) Set to True to display disgnostic plot. renderer: string (Default: 'default') plotly renderer options. width: int/float (Default: 1280) Number of pixels in the horizontal direction of the outputs height: int/float (Default: 720) Number of pixels in the vertical direction of the outputs return_jsonstring: bool (Default: False) set to True to return json string that can be rendered by Plotly in any support language. save_fig: bool (default: False) Save an image if set to True. Plotly uses the pio.write_html() or pio.write_image(). The support format types should be provided in fig_type. fig_type: string (default: 'iframe+png') Image type to be saved, choose from: jpg, png, svg, pdf and iframe. Delimiter is '+'. filename: str (Default: None) Filename for the output, all of them will share the same name but will have different extension. open_iframe: bool (Default: False) Open the iframe in the default browser if set to True. Returns ------- JSON strings if return_jsonstring is set to True. """ fig = go.Figure( layout=dict( autosize=False, height=height, width=width, title="Log scale" ) ) # show the image on the top self.logger.debug(np.asarray(self.spectrum_oned.wave)) self.logger.debug( self.spectrum_oned.telluric_func( np.asarray(self.spectrum_oned.wave) ) ) fig.add_trace( go.Scatter( x=np.asarray(self.spectrum_oned.wave), y=self.spectrum_oned.telluric_func( np.asarray(self.spectrum_oned.wave) ), line=dict(color="royalblue", width=4), name="Flux", ) ) fig.update_layout( hovermode="closest", title="Telluric Profile", showlegend=True, xaxis_title=r"$\text{Wavelength / A}$", yaxis_title="Arbitrary", yaxis=dict(title="Flux"), legend=go.layout.Legend( x=0, y=1, traceorder="normal", font=dict(family="sans-serif", size=12, color="black"), bgcolor="rgba(0,0,0,0)", ), ) if filename is None: filename = "telluric_profile" if save_fig: fig_type_split = fig_type.split("+") for f_type in fig_type_split: if f_type == "iframe": pio.write_html( fig, filename + "." + f_type, auto_open=open_iframe ) elif f_type in ["jpg", "png", "svg", "pdf"]: pio.write_image(fig, filename + "." + f_type) if display: if renderer == "default": fig.show() else: fig.show(renderer) if return_jsonstring: return fig.to_json()
[docs] def get_sensitivity( self, k: int = 3, method: str = "interpolate", mask_range: Union[np.ndarray, list] = [ [6850.0, 6960.0], [7580.0, 7700.0], ], mask_fit_order: int = 1, mask_fit_size: int = 3, smooth: bool = True, return_function: bool = True, sens_deg: int = 7, use_continuum: bool = False, **kwargs: str, ): """ The sensitivity curve is computed by dividing the true values by the wavelength calibrated standard spectrum, which is resampled with the spectres.spectres(). The curve is then interpolated with a cubic spline by default and is stored as a scipy interp1d object. 6850 - 6960, 7575 - 7700, and 8925 - 9050 A are masked by default. Parameters ---------- k: integer [1,2,3,4,5 only] The order of the spline. method: str (Default: interpolate) This should be either 'interpolate' of 'polynomial'. Note that the polynomial is computed from the interpolated function. The default is interpolate because it is much more stable at the wavelength limits of a spectrum in an automated system. mask_range: None or list of list Masking out regions of Telluric absorption when fitting the sensitivity curve. None for no mask. List of list has the pattern [[min_pix_1, max_pix_1], [min_pix_2, max_pix_2],...] mask_fit_order: int (Default: 1) Order of polynomial to be fitted over the masked regions mask_fit_size: int (Default: 3) Number of "pixels" to be fitted on each side of the masked regions. smooth: bool (Default: False) set to smooth the input spectrum with a lowess function with statsmodels return_function: bool (Default: True) Set to True to explicity return the interpolated function of the sensitivity curve. sens_deg: int (Default: 7) The degree of polynomial of the sensitivity curve, only used if the method is 'polynomial'. use_continuum: bool (Default: False) Set to True to use continuum for finding the sensitivity function. If used, the smoothing filter will be applied on the continuum. **kwargs: keyword arguments for passing to the LOWESS function for getting the continuum, see `statsmodels.nonparametric.smoothers_lowess.lowess()` Returns ------- A callable function as a function of wavelength if return_function is set to True. """ # resampling both the observed and the database standard spectra # in unit of flux per second. The higher resolution spectrum is # resampled to match the lower resolution one. if use_continuum: count = np.array(getattr(self.spectrum_oned, "count_continuum")) else: count = np.array(getattr(self.spectrum_oned, "count")) count_err = np.array(getattr(self.spectrum_oned, "count_err")) wave = np.array(getattr(self.spectrum_oned, "wave")) exptime = getattr(self.spectrum_oned, "exptime") if exptime is None: exptime = 1.0 self.logger.info( ( "The exposure time used for computing sensitivity curve " f"is {exptime} seconds." ), ) # Mask regions before smoothing and avoiding telluric absorptions if mask_range is not None: for m in mask_range: # If the mask is partially outside the spectrum, ignore if (m[0] < min(wave)) or (m[1] > max(wave)): continue # Get the indices for the two sides of the masking region left_end = int(max(np.where(wave <= m[0])[0])) + 1 left_start = int(left_end - mask_fit_size) right_start = int(min(np.where(wave >= m[1])[0])) right_end = int(right_start + mask_fit_size) + 1 # Get the wavelengths of the two sides wave_temp = np.concatenate( ( wave[left_start:left_end], wave[right_start:right_end], ) ) # Get the count of the two sides count_temp = np.concatenate( ( count[left_start:left_end], count[right_start:right_end], ) ) finite_mask = ( ~np.isnan(count_temp) & (count_temp > 0.0) & np.isfinite(count_temp) ) # Fit the polynomial across the masked region coeff = np.polynomial.polynomial.polyfit( wave_temp[finite_mask], count_temp[finite_mask], mask_fit_order, ) # Replace the snsitivity values with the fitted values count[left_end:right_start] = np.polynomial.polynomial.polyval( wave[left_end:right_start], coeff ) # This applies a lowess filter from statsmodels that # uses a different lowess_frac default value that is more appropriate in # getting a first guess continuum which reject "outliers" much more # aggressively. if smooth: if kwargs is None: kwargs = {} if "frac" not in kwargs: kwargs["frac"] = 0.1 if "return_sorted" not in kwargs: kwargs["return_sorted"] = False count = lowess(count, wave, **kwargs) # Set the smoothing parameters self.spectrum_oned.add_smoothing(smooth, **kwargs) # If the median resolution of the observed spectrum is higher than # the literature one if np.nanmedian(np.array(np.ediff1d(wave))) < np.nanmedian( np.array(np.ediff1d(self.standard_wave_true)) ): standard_count, _ = spectres( np.array(self.standard_wave_true).reshape(-1), np.array(wave).reshape(-1), np.array(count).reshape(-1), np.array(count_err).reshape(-1), verbose=True, ) standard_flux_true = self.standard_fluxmag_true standard_wave_true = self.standard_wave_true # If the median resolution of the observed spectrum is lower than # the literature one else: standard_count = count # standard_flux_err = count_err standard_flux_true = spectres( np.array(wave).reshape(-1), np.array(self.standard_wave_true).reshape(-1), np.array(self.standard_fluxmag_true).reshape(-1), verbose=True, ) standard_wave_true = wave # Get the sensitivity curve and convert the unit to per second sensitivity = standard_flux_true / (standard_count / exptime) mask = np.isfinite(np.log10(sensitivity)) & ~np.isnan( np.log10(sensitivity) ) sensitivity_masked = sensitivity.copy()[mask] standard_wave_masked = standard_wave_true[mask] standard_flux_masked = standard_flux_true[mask] if method == "interpolate": tck = itp.splrep( standard_wave_masked, np.log10(sensitivity_masked), k=k ) def sensitivity_func(_x): return itp.splev(_x, tck) elif method == "polynomial": coeff = np.polynomial.polynomial.polyfit( standard_wave_masked, np.log10(sensitivity_masked), deg=sens_deg, ) def sensitivity_func(x): return np.polynomial.polynomial.polyval(x, coeff) else: error_msg = "{method} is not implemented." self.logger.critical(error_msg) raise NotImplementedError(error_msg) self.spectrum_oned.add_sensitivity(sensitivity_masked) self.spectrum_oned.add_literature_standard( standard_wave_masked, standard_flux_masked ) # Add to each SpectrumOneD object self.spectrum_oned.add_sensitivity_func(sensitivity_func) if return_function: return sensitivity_func
[docs] def add_sensitivity_func(self, sensitivity_func: Callable): """ parameters ---------- sensitivity_func: callable function Interpolated sensivity curve object (in unit of per second). """ # Add to both science and standard spectrum_list self.spectrum_oned.add_sensitivity_func( sensitivity_func=sensitivity_func ) self.spectrum_oned.add_literature_standard( self.standard_wave_true, self.standard_fluxmag_true )
[docs] def save_sensitivity_func(self): """ Saving the sensitivity function to disk, to be implemented. """ pass
[docs] def inspect_sensitivity( self, display: bool = True, renderer: str = "default", width: int = 1280, height: int = 720, return_jsonstring: bool = False, save_fig: bool = False, fig_type: str = "iframe+png", filename: str = None, open_iframe: bool = False, ): """ Display the computed sensitivity curve. Parameters ---------- display: bool (Default: True) Set to True to display disgnostic plot. renderer: string (Default: 'default') plotly renderer options. width: int/float (Default: 1280) Number of pixels in the horizontal direction of the outputs height: int/float (Default: 720) Number of pixels in the vertical direction of the outputs return_jsonstring: bool (Default: False) set to True to return json string that can be rendered by Plotly in any support language. save_fig: bool (default: False) Save an image if set to True. Plotly uses the pio.write_html() or pio.write_image(). The support format types should be provided in fig_type. fig_type: string (default: 'iframe+png') Image type to be saved, choose from: jpg, png, svg, pdf and iframe. Delimiter is '+'. filename: str (Default: None) Filename for the output, all of them will share the same name but will have different extension. open_iframe: bool (Default: False) Open the iframe in the default browser if set to True. Returns ------- JSON strings if return_jsonstring is set to True. """ wave_literature = getattr(self.spectrum_oned, "wave_literature") flux_literature = getattr(self.spectrum_oned, "flux_literature") sensitivity = getattr(self.spectrum_oned, "sensitivity") sensitivity_func = getattr(self.spectrum_oned, "sensitivity_func") library = getattr(self.spectrum_oned, "library") target = getattr(self.spectrum_oned, "target") fig = go.Figure( layout=dict( autosize=False, height=height, width=width, updatemenus=list( [ dict( active=0, buttons=list( [ dict( label="Log Scale", method="update", args=[ {"visible": [True, True]}, { "title": "Log scale", "yaxis": {"type": "log"}, }, ], ), dict( label="Linear Scale", method="update", args=[ {"visible": [True, False]}, { "title": "Linear scale", "yaxis": {"type": "linear"}, }, ], ), ] ), ) ] ), title="Log scale", ) ) # show the image on the top fig.add_trace( go.Scatter( x=wave_literature, y=flux_literature, line=dict(color="royalblue", width=4), name="Count / s (Observed)", ) ) fig.add_trace( go.Scatter( x=wave_literature, y=sensitivity, yaxis="y2", line=dict(color="firebrick", width=4), name="Sensitivity Curve", ) ) fig.add_trace( go.Scatter( x=wave_literature, y=10.0 ** sensitivity_func(wave_literature), yaxis="y2", line=dict(color="black", width=2), name="Best-fit Sensitivity Curve", ) ) fig.update_layout( title=library + ": " + target, yaxis_title="Count / s" ) fig.update_layout( hovermode="closest", showlegend=True, xaxis_title=r"$\text{Wavelength / A}$", yaxis=dict(title="Count / s"), yaxis2=dict( title="Sensitivity Curve", type="log", anchor="x", overlaying="y", side="right", ), legend=go.layout.Legend( x=0, y=1, traceorder="normal", font=dict(family="sans-serif", size=12, color="black"), bgcolor="rgba(0,0,0,0)", ), ) if filename is None: filename = "senscurve" if save_fig: fig_type_split = fig_type.split("+") for f_type in fig_type_split: if f_type == "iframe": pio.write_html( fig, filename + "." + f_type, auto_open=open_iframe ) elif f_type in ["jpg", "png", "svg", "pdf"]: pio.write_image(fig, filename + "." + f_type) if display: if renderer == "default": fig.show() else: fig.show(renderer) if return_jsonstring: return fig.to_json()
[docs] def apply_flux_calibration( self, target_spectrum_oned: SpectrumOneD, inspect: bool = False, wave_min: float = None, wave_max: float = None, display: bool = False, renderer: str = "default", width: int = 1280, height: int = 720, return_jsonstring: bool = False, save_fig: bool = False, fig_type: str = "iframe+png", filename: str = None, open_iframe: bool = False, ): """ Apply the computed sensitivity curve. And resample the spectra to match the highest resolution (the smallest wavelength bin) part of the spectrum. Note: This function directly modify the *target_spectrum_oned*. Note 2: the wave_min and wave_max are for DISPLAY purpose only. Parameters ---------- target_spectrum_oned: SpectrumOneD object The spectrum to be flux calibrated. inspect: bool (Default: False) Set to True to create/display/save figure wave_min: float (Default: None -> 3500) Minimum wavelength to display wave_max: float (Default: None -> 8500) Maximum wavelength to display display: bool (Default: False) Set to True to display disgnostic plot. renderer: string (Default: 'default') plotly renderer options. width: int/float (Default: 1280) Number of pixels in the horizontal direction of the outputs height: int/float (Default: 720) Number of pixels in the vertical direction of the outputs return_jsonstring: bool (Default: False) set to True to return json string that can be rendered by Plotly in any support language. save_fig: bool (default: False) Save an image if set to True. Plotly uses the pio.write_html() or pio.write_image(). The support format types should be provided in fig_type. fig_type: string (default: 'iframe+png') Image type to be saved, choose from: jpg, png, svg, pdf and iframe. Delimiter is '+'. filename: str (Default: None) Filename for the output, all of them will share the same name but will have different extension. open_iframe: bool (Default: False) Open the iframe in the default browser if set to True. Returns ------- JSON strings if return_jsonstring is set to True. """ self.target_spec_id = getattr(target_spectrum_oned, "spec_id") wave = getattr(target_spectrum_oned, "wave") count = getattr(target_spectrum_oned, "count") count_err = getattr(target_spectrum_oned, "count_err") count_sky = getattr(target_spectrum_oned, "count_sky") exptime = getattr(target_spectrum_oned, "exptime") if exptime is None: exptime = 1.0 self.logger.info( f"The exposure time used for flux calibration is {exptime} seconds." ) # apply the flux calibration sensitivity_func = getattr(self.spectrum_oned, "sensitivity_func") sensitivity = 10.0 ** sensitivity_func(wave) / exptime flux = sensitivity * count if count_err is not None: flux_err = sensitivity * count_err if count_sky is not None: flux_sky = sensitivity * count_sky target_spectrum_oned.add_flux(flux, flux_err, flux_sky) target_spectrum_oned.add_sensitivity(sensitivity) # Add the rest of the flux calibration parameters target_spectrum_oned.merge(self.spectrum_oned) if wave_min is None: wave_min = 3500.0 if wave_max is None: wave_max = 8500.0 if inspect: wave_mask = (np.array(wave).reshape(-1) > wave_min) & ( np.array(wave).reshape(-1) < wave_max ) flux_low = ( np.nanpercentile(np.array(flux).reshape(-1)[wave_mask], 5) / 1.5 ) flux_high = ( np.nanpercentile(np.array(flux).reshape(-1)[wave_mask], 95) * 1.5 ) flux_mask = (np.array(flux).reshape(-1) > flux_low) & ( np.array(flux).reshape(-1) < flux_high ) flux_min = np.log10( np.nanmin(np.array(flux).reshape(-1)[flux_mask]) ) flux_max = np.log10( np.nanmax(np.array(flux).reshape(-1)[flux_mask]) ) fig = go.Figure( layout=dict( autosize=False, height=height, width=width, updatemenus=list( [ dict( active=0, buttons=list( [ dict( label="Log Scale", method="update", args=[ {"visible": [True, True]}, { "title": "Log scale", "yaxis": {"type": "log"}, }, ], ), dict( label="Linear Scale", method="update", args=[ {"visible": [True, False]}, { "title": "Linear scale", "yaxis": { "type": "linear" }, }, ], ), ] ), ) ] ), title="Log scale", ) ) # show the image on the top fig.add_trace( go.Scatter( x=wave, y=flux, line=dict(color="royalblue"), name="Flux", ) ) if flux_err is not None: fig.add_trace( go.Scatter( x=wave, y=flux_err, line=dict(color="firebrick"), name="Flux Uncertainty", ) ) if flux_sky is not None: fig.add_trace( go.Scatter( x=wave, y=flux_sky, line=dict(color="orange"), name="Sky Flux", ) ) fig.update_layout( hovermode="closest", showlegend=True, xaxis=dict(title="Wavelength / A", range=[wave_min, wave_max]), yaxis=dict( title="Flux", range=[flux_min, flux_max], type="log" ), legend=go.layout.Legend( x=0, y=1, traceorder="normal", font=dict(family="sans-serif", size=12, color="black"), bgcolor="rgba(0,0,0,0)", ), ) if filename is None: filename = "spectrum_" + str(self.target_spec_id) else: filename = ( os.path.splitext(filename)[0] + "_" + str(self.target_spec_id) ) if save_fig: fig_type_split = fig_type.split("+") for f_type in fig_type_split: if f_type == "iframe": pio.write_html( fig, filename + "." + f_type, auto_open=open_iframe ) elif f_type in ["jpg", "png", "svg", "pdf"]: pio.write_image(fig, filename + "." + f_type) if display: if renderer == "default": fig.show() else: fig.show(renderer) if return_jsonstring: return fig.to_json()
[docs] def create_fits( self, output: str = "count+wavelength+sensitivity+flux", empty_primary_hdu: bool = True, recreate: bool = False, ): """ Parameters ---------- output: String Type of data to be saved, the order is fixed (in the order of the following description), but the options are flexible. The input strings are delimited by "+", count: 4 HDUs Count, uncertainty, sky, optimal flag, and weight (pixel) wavelength: 1 HDU Wavelength of each pixel sensitivity: 1 HDU Sensitivity (pixel) flux: 4 HDUs Flux, uncertainty, and sky empty_primary_hdu: bool (Default: True) Set to True to leave the Primary HDU blank (Default: True) recreate: bool (Default: False) Set to True to overwrite the FITS data and header. """ # If flux is calibrated self.spectrum_oned.create_fits( output=output, empty_primary_hdu=empty_primary_hdu, recreate=recreate, )
[docs] def save_fits( self, output: str = "count+wavelength+sensitivity+flux", filename: str = "fluxcal", empty_primary_hdu: bool = True, overwrite: bool = False, recreate: bool = False, ): """ Save the reduced data to disk, with a choice of any combination of the data that are already present in the SpectrumOneD, see below the 'output' parameters for details. Parameters ---------- output: String Type of data to be saved, the order is fixed (in the order of the following description), but the options are flexible. The input strings are delimited by "+". Because a FluxCalibration only requires a subset of all the data, only a few data products are guaranteed to exist. count: 4 HDUs Count, uncertainty, sky, optimal flag, and weight (pixel) wavelength: 1 HDU Wavelength of each pixel sensitivity: 1 HDU Sensitivity (pixel) flux: 4 HDUs Flux, uncertainty, and sky filename: String Disk location to be written to. Default is at where the process/subprocess is execuated. empty_primary_hdu: bool (Default: True) Set to True to leave the Primary HDU blank (Default: True) overwrite: bool Default is False. recreate: bool (Default: False) Set to True to overwrite the FITS data and header. """ # Fix the names and extensions if self.target_spec_id is not None: filename = ( os.path.splitext(filename)[0] + "_" + str(self.target_spec_id) ) else: filename = os.path.splitext(filename)[0] self.spectrum_oned.save_fits( output=output, filename=filename, overwrite=overwrite, recreate=recreate, empty_primary_hdu=empty_primary_hdu, )
[docs] def save_csv( self, output: str = "count+wavelength+sensitivity+flux", filename: str = "fluxcal", overwrite: bool = False, recreate: bool = False, ): """ Save the reduced data to disk, with a choice of any combination of the data that are already present in the SpectrumOneD, see below the 'output' parameters for details. Parameters ---------- output: String Type of data to be saved, the order is fixed (in the order of the following description), but the options are flexible. The input strings are delimited by "+". Because a FluxCalibration only requires a subset of all the data, only a few data products are guaranteed to exist. count: 4 HDUs Count, uncertainty, sky, optimal flag, and weight (pixel) wavelength: 1 HDU Wavelength of each pixel sensitivity: 1 HDU Sensitivity (pixel) flux: 4 HDUs Flux, uncertainty, and sky filename: String (Default: None) Disk location to be written to. Default is at where the process/subprocess is execuated. overwrite: bool (Default: False) Set to True to allow overwriting the FITS data at the file destination. recreate: bool (Default: False) Set to True to regenerate the FITS data and header. """ # Fix the names and extensions if self.target_spec_id is not None: filename = ( os.path.splitext(filename)[0] + "_" + str(self.target_spec_id) ) else: filename = os.path.splitext(filename)[0] self.spectrum_oned.save_csv( output=output, filename=filename, overwrite=overwrite, recreate=recreate, )
[docs] def get_spectrum_oned(self): """ Return the spectrum_oned object. """ return self.spectrum_oned