Source code for aspired.flux_calibration

# -*- coding: utf-8 -*-
import datetime
import difflib
import json
import logging
import os
import pkg_resources

import numpy as np
from plotly import graph_objects as go
from plotly import io as pio
from scipy import signal
from scipy import interpolate as itp

try:
    from spectres import spectres_numba as spectres
except ImportError as err:
    print(err)
    from spectres import spectres

from .spectrum1D import Spectrum1D
from .util import get_continuum

base_dir = os.path.dirname(__file__)

__all__ = ["StandardLibrary", "FluxCalibration"]


class StandardLibrary:
    def __init__(
        self,
        verbose=True,
        logger_name="StandardLibrary",
        log_level="INFO",
        log_file_folder="default",
        log_file_name=None,
    ):
        """
        This class handles flux calibration by comparing the extracted and
        wavelength-calibrated standard observation to the "ground truth"
        from

        https://github.com/iraf-community/iraf/tree/master/noao/lib/onedstandards
        https://www.eso.org/sci/observing/tools/standards/spectra.html

        See explanation notes at those links for details.

        Parameters
        ----------
        verbose: bool (Default: True)
            Set to False to suppress all verbose warnings, except for
            critical failure.
        logger_name: str (Default: StandardLibrary)
            This will set the name of the logger, if the name is used already,
            it will reference to the existing logger. This will be the
            first part of the default log file name unless log_file_name is
            provided.
        log_level: str (Default: 'INFO')
            Four levels of logging are available, in decreasing order of
            information and increasing order of severity: (1) DEBUG, (2) INFO,
            (3) WARNING, (4) ERROR and (5) CRITICAL. WARNING means that
            there is suboptimal operations in some parts of that step. ERROR
            means that the requested operation cannot be performed, but the
            software can handle it by either using the default setting or
            skipping the operation. CRITICAL means that the requested
            operation cannot be resolved without human interaction, this is
            most usually coming from missing data.
        log_file_folder: None or str (Default: "default")
            Folder in which the file is save, set to default to save to the
            current path.
        log_file_name: None or str (Default: None)
            File name of the log, set to None to logging.warning to screen
            only.

        """

        # Set-up logger
        self.logger = logging.getLogger(logger_name)
        if (log_level == "CRITICAL") or (not verbose):
            self.logger.setLevel(logging.CRITICAL)
        elif log_level == "ERROR":
            self.logger.setLevel(logging.ERROR)
        elif log_level == "WARNING":
            self.logger.setLevel(logging.WARNING)
        elif log_level == "INFO":
            self.logger.setLevel(logging.INFO)
        elif log_level == "DEBUG":
            self.logger.setLevel(logging.DEBUG)
        else:
            raise ValueError("Unknonw logging level.")

        formatter = logging.Formatter(
            "[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] "
            "%(message)s",
            datefmt="%a, %d %b %Y %H:%M:%S",
        )

        if log_file_name is None:
            # Only print log to screen
            self.handler = logging.StreamHandler()
        else:
            if log_file_name == "default":
                log_file_name = "{}_{}.log".format(
                    logger_name,
                    datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S"),
                )
            # Save log to file
            if log_file_folder == "default":
                log_file_folder = ""

            self.handler = logging.FileHandler(
                os.path.join(log_file_folder, log_file_name), "a+"
            )

        self.handler.setFormatter(formatter)
        if self.logger.hasHandlers():
            self.logger.handlers.clear()
        self.logger.addHandler(self.handler)

        self.verbose = verbose

        self._load_standard_dictionary()

    def _load_standard_dictionary(self):
        """
        Load the dictionaries containing the names of all the standard stars.

        """

        self.lib_to_uname = json.load(
            open(
                pkg_resources.resource_filename(
                    "aspired", "standards/lib_to_uname.json"
                )
            )
        )
        self.uname_to_lib = json.load(
            open(
                pkg_resources.resource_filename(
                    "aspired", "standards/uname_to_lib.json"
                )
            )
        )

    def _get_eso_standard(self):
        """
        Formatter for reading the ESO standards.

        """

        folder = self.library

        # first letter of the file name
        if self.ftype == "flux":

            filename = "f"

        else:

            filename = "m"

        # the rest of the file name
        filename += self.target

        # the extension
        filename += ".dat"

        filepath = os.path.join(base_dir, "standards", folder, filename)

        f = np.loadtxt(filepath)

        self.standard_wave_true = f[:, 0]
        self.standard_fluxmag_true = f[:, 1]

        if self.ftype == "flux":

            if self.library != "esoxshooter":

                self.standard_fluxmag_true /= 10.0**16.0

    def _get_ing_standard(self):
        """
        Formatter for reading the ING standards.

        """

        folder = self.library.split("_")[0]

        # the first part of the file name
        filename = self.target
        extension = self.library.split("_")[-1]

        # last letter (or nothing) of the file name
        if self.ftype == "flux":

            # .mas only contain magnitude files
            if extension == "mas":

                filename += "a"

            if (filename == "g24" or filename == "g157") and (
                extension == "fg"
            ):

                filename += "a"

            if (filename == "h102") and (extension == "sto"):

                filename += "a"

        else:

            filename += "a"

        # the extension
        filename += "." + extension

        filepath = os.path.join(base_dir, "standards", folder, filename)

        f = open(filepath)
        wave = []
        fluxmag = []
        for line in f.readlines():

            if line[0] in ["*", "S"]:

                if line.startswith("SET .Z.UNITS = "):

                    # remove all special characters and white spaces
                    unit = "".join(
                        e for e in line.split('"')[1].lower() if e.isalnum()
                    )

            else:

                li = line.strip().strip(":").split()
                wave.append(li[0])
                fluxmag.append(li[1])

        f.close()
        self.standard_wave_true = np.array(wave).astype("float")
        self.standard_fluxmag_true = np.array(fluxmag).astype("float")

        # See https://www.stsci.edu/~strolger/docs/UNITS.txt for the unit
        # conversion.
        if self.ftype == "flux":

            # Trap the ones without flux files
            if (
                extension == "mas"
                or filename == "g24a.fg"
                or filename == "g157a.fg"
                or filename == "h102a.sto"
            ):

                self.standard_fluxmag_true = (
                    10.0 ** (-(self.standard_fluxmag_true / 2.5))
                    * 3630.780548
                    / 3.33564095e4
                    / self.standard_wave_true**2
                )

            # convert milli-Jy into F_lambda
            if unit == "mjy":

                self.standard_fluxmag_true = (
                    self.standard_fluxmag_true
                    * 1e-3
                    * 2.99792458e-05
                    / self.standard_wave_true**2
                )

            # convert micro-Jy into F_lambda
            if unit == "microjanskys":

                self.standard_fluxmag_true = (
                    self.standard_fluxmag_true
                    * 1e-6
                    * 2.99792458e-05
                    / self.standard_wave_true**2
                )

    def _get_iraf_standard(self):
        """
        Formatter for reading the iraf standards.

        """

        folder = self.library

        # file name and extension
        filename = self.target + ".dat"

        filepath = os.path.join(base_dir, "standards", folder, filename)

        f = np.loadtxt(filepath, skiprows=1)

        self.standard_wave_true = f[:, 0]
        self.standard_fluxmag_true = f[:, 1]

        # iraf is always in AB magnitude
        if self.ftype == "flux":

            # Convert from AB mag to flux
            self.standard_fluxmag_true = (
                10.0 ** (-(self.standard_fluxmag_true / 2.5))
                * 3630.780548
                / 3.33564095e4
                / self.standard_wave_true**2
            )

    def lookup_standard_libraries(self, target, cutoff=0.4):
        """
        Check if the requested standard and library exist. Return the three
        most similar words if the requested one does not exist. See

            https://docs.python.org/3.7/library/difflib.html

        Parameters
        ----------
        target: str
            Name of the standard star
        cutoff: float (Default: 0.4)
            The similarity toleranceold
            [0 (completely different) - 1 (identical)]

        """

        # Load the list of targets in the requested library
        try:

            libraries = self.uname_to_lib[target.lower()]
            return libraries, True

        except Exception as e:

            self.logger.warning(str(e))

            # If the requested target is not in any library, suggest the
            # closest match, Top 5 are returned.
            # difflib uses Gestalt pattern matching.
            target_list = difflib.get_close_matches(
                target.lower(),
                list(self.uname_to_lib.keys()),
                n=5,
                cutoff=cutoff,
            )

            if len(target_list) > 0:

                self.logger.warning(
                    "Requested standard star cannot be found, a list of "
                    + "the closest matching names are returned: {}".format(
                        target_list
                    )
                )

                return target_list, False

            else:
                error_msg = (
                    "Please check the name of your standard "
                    + "star, nothing share a similarity above {}.".format(
                        cutoff
                    )
                )
                self.logger.critical(error_msg)
                raise ValueError(error_msg)

    def load_standard(self, target, library=None, ftype="flux", cutoff=0.4):
        """
        Read the standard flux/magnitude file. And return the wavelength and
        flux/mag. The units of the data are always in

        | wavelength: A
        | flux:       ergs / cm / cm / s / A
        | mag:        mag (AB)

        Parameters
        ----------
        target: string
            Name of the standard star
        library: string (Default: None)
            Name of the library of standard star
        ftype: string (Default: 'flux')
            'flux' or 'mag'
        cutoff: float (Default: 0.4)
            The toleranceold for the word similarity in the range of [0, 1].

        """

        self.target = target.lower()
        self.ftype = ftype.lower()
        self.cutoff = cutoff

        libraries, success = self.lookup_standard_libraries(self.target)

        if success:

            if np.in1d([library], libraries):

                self.library = library

            else:

                self.library = libraries[0]

                self.logger.warning(
                    "The requested standard star cannot be found in the "
                    "given library,  or the library is not specified. "
                    "ASPIRED is using " + self.library + "."
                )

        else:

            # If not, search again with the first one returned from lookup.
            self.target = libraries[0]
            libraries, _ = self.lookup_standard_libraries(self.target)
            self.library = libraries[0]

            self.logger.warning(
                "The requested library does not exist, "
                + self.library
                + " is used because it has the closest matching name."
            )

        if not self.verbose:

            if library is None:

                # Use the default library order
                self.logger.warning(
                    "Standard library is not given, "
                    + self.library
                    + " is used."
                )

        if self.library.startswith("iraf"):

            self._get_iraf_standard()

        if self.library.startswith("ing"):

            self._get_ing_standard()

        if self.library.startswith("eso"):

            self._get_eso_standard()

    def inspect_standard(
        self,
        display=True,
        renderer="default",
        width=1280,
        height=720,
        return_jsonstring=False,
        save_fig=False,
        fig_type="iframe+png",
        filename=None,
        open_iframe=False,
    ):
        """
        Display the standard star plot.

        Parameters
        ----------
        display: bool (Default: True)
            Set to True to display disgnostic plot.
        renderer: string (Default: default)
            plotly renderer options.
        width: int/float (Default: 1280)
            Number of pixels in the horizontal direction of the outputs
        height: int/float (Default: 720)
            Number of pixels in the vertical direction of the outputs
        return_jsonstring: bool (Default: False)
            set to True to return json string that can be rendered by Plotly
            in any support language.
        save_fig: bool (default: False)
            Save an image if set to True. Plotly uses the pio.write_html()
            or pio.write_image(). The support format types should be provided
            in fig_type.
        fig_type: string (default: 'iframe+png')
            Image type to be saved, choose from:
            jpg, png, svg, pdf and iframe. Delimiter is '+'.
        filename: str (Default: None)
            Filename for the output, all of them will share the same name but
            will have different extension.
        open_iframe: bool (Default: False)
            Open the iframe in the default browser if set to True.

        Returns
        -------
        JSON strings if return_jsonstring is set to True.

        """

        fig = go.Figure(
            layout=dict(
                autosize=False,
                height=height,
                width=width,
                updatemenus=list(
                    [
                        dict(
                            active=0,
                            buttons=list(
                                [
                                    dict(
                                        label="Log Scale",
                                        method="update",
                                        args=[
                                            {"visible": [True, True]},
                                            {
                                                "title": "Log scale",
                                                "yaxis": {"type": "log"},
                                            },
                                        ],
                                    ),
                                    dict(
                                        label="Linear Scale",
                                        method="update",
                                        args=[
                                            {"visible": [True, False]},
                                            {
                                                "title": "Linear scale",
                                                "yaxis": {"type": "linear"},
                                            },
                                        ],
                                    ),
                                ]
                            ),
                        )
                    ]
                ),
                title="Log scale",
            )
        )

        # show the image on the top
        fig.add_trace(
            go.Scatter(
                x=self.standard_wave_true,
                y=self.standard_fluxmag_true,
                line=dict(color="royalblue", width=4),
            )
        )

        fig.update_layout(
            title=self.library + ": " + self.target + " " + self.ftype,
            xaxis_title=r"$\text{Wavelength / A}$",
            yaxis_title=(
                r"$\text{Flux / ergs cm}^{-2} \text{s}^{-1}" + "\text{A}^{-1}$"
            ),
            hovermode="closest",
            showlegend=False,
        )

        if filename is None:

            filename = "standard"

        if save_fig:

            fig_type_split = fig_type.split("+")

            for t in fig_type_split:

                if t == "iframe":

                    pio.write_html(
                        fig, filename + "." + t, auto_open=open_iframe
                    )

                elif t in ["jpg", "png", "svg", "pdf"]:

                    pio.write_image(fig, filename + "." + t)

        if display:

            if renderer == "default":

                fig.show()

            else:

                fig.show(renderer)

        if return_jsonstring:

            return fig.to_json()


[docs]class FluxCalibration(StandardLibrary): def __init__( self, verbose=True, logger_name="FluxCalibration", log_level="INFO", log_file_folder="default", log_file_name=None, ): """ Initialise a FluxCalibration object. Parameters ---------- verbose: bool (Default: True) Set to False to suppress all verbose warnings, except for critical failure. logger_name: str (Default: FluxCalibration) This will set the name of the logger, if the name is used already, it will reference to the existing logger. This will be the first part of the default log file name unless log_file_name is provided. log_level: str (Default: 'INFO') Four levels of logging are available, in decreasing order of information and increasing order of severity: CRITICAL, DEBUG, INFO, WARNING, ERROR log_file_folder: None or str (Default: "default") Folder in which the file is save, set to default to save to the current path. log_file_name: None or str (Default: None) File name of the log, set to None to logging.warning to screen only. """ # Set-up logger self.logger = logging.getLogger(logger_name) if (log_level == "CRITICAL") or (not verbose): logging.basicConfig(level=logging.CRITICAL) if log_level == "ERROR": logging.basicConfig(level=logging.ERROR) if log_level == "WARNING": logging.basicConfig(level=logging.WARNING) if log_level == "INFO": logging.basicConfig(level=logging.INFO) if log_level == "DEBUG": logging.basicConfig(level=logging.DEBUG) formatter = logging.Formatter( "[%(asctime)s] %(levelname)s [%(filename)s:%(lineno)d] " "%(message)s", datefmt="%a, %d %b %Y %H:%M:%S", ) if log_file_name is None: # Only logging.warning log to screen handler = logging.StreamHandler() else: if log_file_name == "default": log_file_name = "{}_{}.log".format( logger_name, datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S"), ) # Save log to file if log_file_folder == "default": log_file_folder = "" handler = logging.FileHandler( os.path.join(log_file_folder, log_file_name), "a+" ) handler.setFormatter(formatter) self.logger.addHandler(handler) self.verbose = verbose self.logger_name = logger_name self.log_level = log_level self.log_file_folder = log_file_folder self.log_file_name = log_file_name # Load the dictionary super().__init__( verbose=self.verbose, logger_name=self.logger_name, log_level=self.log_level, log_file_folder=self.log_file_folder, log_file_name=self.log_file_name, ) self.verbose = verbose self.spectrum1D = Spectrum1D( spec_id=0, verbose=self.verbose, logger_name=self.logger_name, log_level=self.log_level, log_file_folder=self.log_file_folder, log_file_name=self.log_file_name, ) self.target_spec_id = None self.standard_wave_true = None self.standard_fluxmag_true = None self.count_continuum = None self.flux_continuum = None
[docs] def from_spectrum1D(self, spectrum1D, merge=False, overwrite=False): """ This function copies all the info from the spectrum1D, because users may supply different level/combination of reduction, everything is copied from the spectrum1D even though in most cases only a None will be passed. By default, this is passing object by reference by default, so it directly modifies the spectrum1D supplied. By setting merger to True, it copies the data into the Spectrum1D in the FluxCalibration object. Parameters ---------- spectrum1D: Spectrum1D object The Spectrum1D to be referenced or copied. merge: bool (Default: False) Set to True to copy everything over to the local Spectrum1D, hence FluxCalibration will not be acting on the Spectrum1D outside. """ if merge: self.spectrum1D.merge(spectrum1D, overwrite=overwrite) else: self.spectrum1D = spectrum1D self.spectrum1D_imported = True
def remove_spectrum1D(self): self.spectrum1D = Spectrum1D( spec_id=0, verbose=self.verbose, logger_name=self.logger_name, log_level=self.log_level, log_file_folder=self.log_file_folder, log_file_name=self.log_file_name, ) self.spectrum1D_imported = False
[docs] def load_standard(self, target, library=None, ftype="flux", cutoff=0.4): """ Read the standard flux/magnitude file. And return the wavelength and flux/mag. The units of the data are always in | wavelength: A | flux: ergs / cm / cm / s / A | mag: mag (AB) Parameters ---------- target: string Name of the standard star library: string (Default: None) Name of the library of standard star ftype: string (Default: 'flux') 'flux' or 'mag' cutoff: float (Default: 0.4) The toleranceold for the word similarity in the range of [0, 1]. """ super().load_standard( target=target, library=library, ftype=ftype, cutoff=cutoff ) # the best target and library found can be different from the input self.spectrum1D.add_standard_star( library=self.library, target=self.target )
[docs] def add_standard(self, wavelength, count, count_err=None, count_sky=None): """ Add spectrum (wavelength, count, count_err & count_sky). Parameters ---------- wavelength: 1-d array The wavelength at each pixel of the trace. count: 1-d array The summed count at each column about the trace. count_err: 1-d array (Default: None) the uncertainties of the count values count_sky: 1-d array (Default: None) The integrated sky values along each column, suitable for subtracting from the output of ap_extract """ self.spectrum1D.add_wavelength(wavelength) self.spectrum1D.add_count(count, count_err, count_sky)
[docs] def get_telluric_profile( self, wave, flux, continuum, mask_range=[[6850, 6960], [7580, 7700]], return_function=False, ): """ Getting the Telluric absorption profile from the continuum of the standard star spectrum. Parameters ---------- wave: list or 1-d array (N) Wavelength. flux: list or 1-d array (N) Flux. continuum: list or 1-d array (N) Continuum Flux. mask_range: list of list list of lists with 2 values indicating the range marked by each of the Telluric regions. return_function: bool (Default: False) Set to True to explicitly return the interpolated function of the Telluric profile. """ telluric_profile = np.zeros_like(wave) # Get the continuum of the spectrum # This should give the POSITIVE values over the telluric region residual = continuum - flux for m in mask_range: # Get the indices for the two sides of the masking region # at the native pixel scale left_of_mask = np.where(wave <= m[0])[0] right_of_mask = np.where(wave >= m[1])[0] if (len(left_of_mask) == 0) or (len(right_of_mask) == 0): continue left_telluric_start = int(max(left_of_mask)) right_telluric_end = int(min(right_of_mask)) + 1 telluric_profile[ left_telluric_start:right_telluric_end ] = residual[left_telluric_start:right_telluric_end] # normalise the profile telluric_factor = np.ptp(telluric_profile) telluric_profile /= telluric_factor telluric_func = itp.interp1d( wave, telluric_profile, fill_value="extrapolate" ) self.spectrum1D.add_telluric_func(telluric_func) self.spectrum1D.add_telluric_profile(telluric_profile) self.spectrum1D.add_telluric_factor(telluric_factor) if return_function: return telluric_func, telluric_profile, telluric_factor
[docs] def inspect_telluric_profile( self, display=True, renderer="default", width=1280, height=720, return_jsonstring=False, save_fig=False, fig_type="iframe+png", filename=None, open_iframe=False, ): """ Display the Telluric profile. Parameters ---------- display: bool (Default: True) Set to True to display disgnostic plot. renderer: string (Default: 'default') plotly renderer options. width: int/float (Default: 1280) Number of pixels in the horizontal direction of the outputs height: int/float (Default: 720) Number of pixels in the vertical direction of the outputs return_jsonstring: bool (Default: False) set to True to return json string that can be rendered by Plotly in any support language. save_fig: bool (default: False) Save an image if set to True. Plotly uses the pio.write_html() or pio.write_image(). The support format types should be provided in fig_type. fig_type: string (default: 'iframe+png') Image type to be saved, choose from: jpg, png, svg, pdf and iframe. Delimiter is '+'. filename: str (Default: None) Filename for the output, all of them will share the same name but will have different extension. open_iframe: bool (Default: False) Open the iframe in the default browser if set to True. Returns ------- JSON strings if return_jsonstring is set to True. """ fig = go.Figure( layout=dict( autosize=False, height=height, width=width, title="Log scale" ) ) # show the image on the top self.logger.info(np.asarray(self.spectrum1D.wave)) fig.add_trace( go.Scatter( x=np.asarray(self.spectrum1D.wave), y=self.spectrum1D.telluric_func( np.asarray(self.spectrum1D.wave) ), line=dict(color="royalblue", width=4), name="Count / s (Observed)", ) ) fig.update_layout( hovermode="closest", title="Telluric Profile", showlegend=True, xaxis_title=r"$\text{Wavelength / A}$", yaxis_title="Arbitrary", yaxis=dict(title="Count / s"), legend=go.layout.Legend( x=0, y=1, traceorder="normal", font=dict(family="sans-serif", size=12, color="black"), bgcolor="rgba(0,0,0,0)", ), ) if filename is None: filename = "telluric_profile" if save_fig: fig_type_split = fig_type.split("+") for t in fig_type_split: if t == "iframe": pio.write_html( fig, filename + "." + t, auto_open=open_iframe ) elif t in ["jpg", "png", "svg", "pdf"]: pio.write_image(fig, filename + "." + t) if display: if renderer == "default": fig.show() else: fig.show(renderer) if return_jsonstring: return fig.to_json()
[docs] def get_sensitivity( self, k=3, method="interpolate", mask_range=[[6850.0, 6960.0], [7580.0, 7700.0]], mask_fit_order=1, mask_fit_size=3, smooth=False, slength=5, sorder=3, return_function=True, sens_deg=7, **kwargs ): """ The sensitivity curve is computed by dividing the true values by the wavelength calibrated standard spectrum, which is resampled with the spectres.spectres(). The curve is then interpolated with a cubic spline by default and is stored as a scipy interp1d object. A Savitzky-Golay filter is available for smoothing before the interpolation but it is not used by default. 6850 - 6960, 7575 - 7700, and 8925 - 9050 A are masked by default. Parameters ---------- k: integer [1,2,3,4,5 only] The order of the spline. method: str (Default: interpolate) This should be either 'interpolate' of 'polynomial'. Note that the polynomial is computed from the interpolated function. The default is interpolate because it is much more stable at the wavelength limits of a spectrum in an automated system. mask_range: None or list of list Masking out regions of Telluric absorption when fitting the sensitivity curve. None for no mask. List of list has the pattern [[min_pix_1, max_pix_1], [min_pix_2, max_pix_2],...] mask_fit_order: int (Default: 1) Order of polynomial to be fitted over the masked regions mask_fit_size: int (Default: 3) Number of "pixels" to be fitted on each side of the masked regions. smooth: bool (Default: False) set to smooth the input spectrum with scipy.signal.savgol_filter slength: int (Default: 5) SG-filter window size sorder: int (Default: 3) SG-filter polynomial order return_function: bool (Default: True) Set to True to explicity return the interpolated function of the sensitivity curve. sens_deg: int (Default: 7) The degree of polynomial of the sensitivity curve, only used if the method is 'polynomial'. **kwargs: keyword arguments for passing to the LOWESS functionj for getting the continuum, see `statsmodels.nonparametric.smoothers_lowess.lowess()` Returns ------- A callable function as a function of wavelength if return_function is set to True. """ # resampling both the observed and the database standard spectra # in unit of flux per second. The higher resolution spectrum is # resampled to match the lower resolution one. count = np.asarray(getattr(self.spectrum1D, "count")) count_err = np.asarray(getattr(self.spectrum1D, "count_err")) wave = np.asarray(getattr(self.spectrum1D, "wave")) print(wave) print(count) if getattr(self.spectrum1D, "count_continuum") is None: self.spectrum1D.add_count_continuum( get_continuum(wave, count, **kwargs) ) count = np.asarray(getattr(self.spectrum1D, "count_continuum")) # If the median resolution of the observed spectrum is higher than # the literature one if np.nanmedian(np.array(np.ediff1d(wave))) < np.nanmedian( np.array(np.ediff1d(self.standard_wave_true)) ): standard_count, _ = spectres( np.array(self.standard_wave_true).reshape(-1), np.array(wave).reshape(-1), np.array(count).reshape(-1), np.array(count_err).reshape(-1), verbose=True, ) standard_flux_true = self.standard_fluxmag_true standard_wave_true = self.standard_wave_true # If the median resolution of the observed spectrum is lower than # the literature one else: standard_count = count # standard_flux_err = count_err standard_flux_true = spectres( np.array(wave).reshape(-1), np.array(self.standard_wave_true).reshape(-1), np.array(self.standard_fluxmag_true).reshape(-1), verbose=True, ) standard_wave_true = wave # apply a Savitzky-Golay filter to remove noise and Telluric lines if smooth: standard_count = signal.savgol_filter( standard_count, slength, sorder ) # Set the smoothing parameters self.spectrum1D.add_smoothing(smooth, slength, sorder) # Get the sensitivity curve sensitivity = standard_flux_true / standard_count sensitivity_masked = sensitivity.copy() if mask_range is not None: for m in mask_range: # If the mask is partially outside the spectrum, ignore if ( (m[0] < min(standard_wave_true)) or (m[0] < min(wave)) or (m[1] > max(standard_wave_true)) or (m[1] > max(wave)) ): continue # Get the indices for the two sides of the masking region left_end = ( int(np.max(np.where(standard_wave_true <= m[0]))) + 1 ) left_start = int(left_end - mask_fit_size) right_start = int(np.min(np.where(standard_wave_true >= m[1]))) right_end = int(right_start + mask_fit_size) + 1 # Get the wavelengths of the two sides wave_temp = np.concatenate( ( standard_wave_true[left_start:left_end], standard_wave_true[right_start:right_end], ) ) # Get the sensitivity of the two sides sensitivity_temp = np.concatenate( ( sensitivity[left_start:left_end], sensitivity[right_start:right_end], ) ) finite_mask = ( ~np.isnan(sensitivity_temp) & (sensitivity_temp > 0.0) & np.isfinite(sensitivity_temp) ) # Fit the polynomial across the masked region coeff = np.polynomial.polynomial.polyfit( wave_temp[finite_mask], sensitivity_temp[finite_mask], mask_fit_order, ) # Replace the snsitivity values with the fitted values sensitivity_masked[ left_end:right_start ] = np.polynomial.polynomial.polyval( standard_wave_true[left_end:right_start], coeff ) mask = np.isfinite(np.log10(sensitivity_masked)) & ~np.isnan( np.log10(sensitivity_masked) ) sensitivity_masked = sensitivity_masked[mask] standard_wave_masked = standard_wave_true[mask] standard_flux_masked = standard_flux_true[mask] if method == "interpolate": tck = itp.splrep( standard_wave_masked, np.log10(sensitivity_masked), k=k ) def sensitivity_func(x): return itp.splev(x, tck) elif method == "polynomial": coeff = np.polynomial.polynomial.polyfit( standard_wave_masked, np.log10(sensitivity_masked), deg=sens_deg, ) def sensitivity_func(x): return np.polynomial.polynomial.polyval(x, coeff) else: error_msg = "{} is not implemented.".format(method) self.logger.critical(error_msg) raise NotImplementedError(error_msg) self.spectrum1D.add_sensitivity(sensitivity_masked) self.spectrum1D.add_literature_standard( standard_wave_masked, standard_flux_masked ) # Add to each Spectrum1D object self.spectrum1D.add_sensitivity_func(sensitivity_func) self.spectrum1D.add_count_continuum(count) self.spectrum1D.add_flux_continuum(count * sensitivity_func(wave)) ( telluric_func, telluric_profile, telluric_factor, ) = self.get_telluric_profile( wave, getattr(self.spectrum1D, "count") * 10.0 ** sensitivity_func(wave), count * 10.0 ** sensitivity_func(wave), mask_range=mask_range, return_function=True, ) self.spectrum1D.add_telluric_func(telluric_func) self.spectrum1D.add_telluric_profile(telluric_profile) self.spectrum1D.add_telluric_factor(telluric_factor) if return_function: return sensitivity_func
[docs] def add_sensitivity_func(self, sensitivity_func): """ parameters ---------- sensitivity_func: callable function Interpolated sensivity curve object. """ # Add to both science and standard spectrum_list self.spectrum1D.add_sensitivity_func(sensitivity_func=sensitivity_func) self.spectrum1D.add_literature_standard( self.standard_wave_true, self.standard_fluxmag_true )
def save_sensitivity_func(self): pass
[docs] def inspect_sensitivity( self, display=True, renderer="default", width=1280, height=720, return_jsonstring=False, save_fig=False, fig_type="iframe+png", filename=None, open_iframe=False, ): """ Display the computed sensitivity curve. Parameters ---------- display: bool (Default: True) Set to True to display disgnostic plot. renderer: string (Default: 'default') plotly renderer options. width: int/float (Default: 1280) Number of pixels in the horizontal direction of the outputs height: int/float (Default: 720) Number of pixels in the vertical direction of the outputs return_jsonstring: bool (Default: False) set to True to return json string that can be rendered by Plotly in any support language. save_fig: bool (default: False) Save an image if set to True. Plotly uses the pio.write_html() or pio.write_image(). The support format types should be provided in fig_type. fig_type: string (default: 'iframe+png') Image type to be saved, choose from: jpg, png, svg, pdf and iframe. Delimiter is '+'. filename: str (Default: None) Filename for the output, all of them will share the same name but will have different extension. open_iframe: bool (Default: False) Open the iframe in the default browser if set to True. Returns ------- JSON strings if return_jsonstring is set to True. """ wave_literature = getattr(self.spectrum1D, "wave_literature") flux_literature = getattr(self.spectrum1D, "flux_literature") sensitivity = getattr(self.spectrum1D, "sensitivity") sensitivity_func = getattr(self.spectrum1D, "sensitivity_func") library = getattr(self.spectrum1D, "library") target = getattr(self.spectrum1D, "target") fig = go.Figure( layout=dict( autosize=False, height=height, width=width, updatemenus=list( [ dict( active=0, buttons=list( [ dict( label="Log Scale", method="update", args=[ {"visible": [True, True]}, { "title": "Log scale", "yaxis": {"type": "log"}, }, ], ), dict( label="Linear Scale", method="update", args=[ {"visible": [True, False]}, { "title": "Linear scale", "yaxis": {"type": "linear"}, }, ], ), ] ), ) ] ), title="Log scale", ) ) # show the image on the top fig.add_trace( go.Scatter( x=wave_literature, y=flux_literature, line=dict(color="royalblue", width=4), name="Count / s (Observed)", ) ) fig.add_trace( go.Scatter( x=wave_literature, y=sensitivity, yaxis="y2", line=dict(color="firebrick", width=4), name="Sensitivity Curve", ) ) fig.add_trace( go.Scatter( x=wave_literature, y=10.0 ** sensitivity_func(wave_literature), yaxis="y2", line=dict(color="black", width=2), name="Best-fit Sensitivity Curve", ) ) fig.update_layout( title=library + ": " + target, yaxis_title="Count / s" ) fig.update_layout( hovermode="closest", showlegend=True, xaxis_title=r"$\text{Wavelength / A}$", yaxis=dict(title="Count / s"), yaxis2=dict( title="Sensitivity Curve", type="log", anchor="x", overlaying="y", side="right", ), legend=go.layout.Legend( x=0, y=1, traceorder="normal", font=dict(family="sans-serif", size=12, color="black"), bgcolor="rgba(0,0,0,0)", ), ) if filename is None: filename = "senscurve" if save_fig: fig_type_split = fig_type.split("+") for t in fig_type_split: if t == "iframe": pio.write_html( fig, filename + "." + t, auto_open=open_iframe ) elif t in ["jpg", "png", "svg", "pdf"]: pio.write_image(fig, filename + "." + t) if display: if renderer == "default": fig.show() else: fig.show(renderer) if return_jsonstring: return fig.to_json()
[docs] def apply_flux_calibration( self, target_spectrum1D, inspect=False, wave_min=3500.0, wave_max=8500.0, display=False, renderer="default", width=1280, height=720, return_jsonstring=False, save_fig=False, fig_type="iframe+png", filename=None, open_iframe=False, ): """ Apply the computed sensitivity curve. And resample the spectra to match the highest resolution (the smallest wavelength bin) part of the spectrum. Note: This function directly modify the *target_spectrum1D*. Note 2: the wave_min and wave_max are for DISPLAY purpose only. Parameters ---------- target_spectrum1D: Spectrum1D object The spectrum to be flux calibrated. inspect: bool (Default: False) Set to True to create/display/save figure wave_min: float (Default: 3500) Minimum wavelength to display wave_max: float (Default: 8500) Maximum wavelength to display display: bool (Default: False) Set to True to display disgnostic plot. renderer: string (Default: 'default') plotly renderer options. width: int/float (Default: 1280) Number of pixels in the horizontal direction of the outputs height: int/float (Default: 720) Number of pixels in the vertical direction of the outputs return_jsonstring: bool (Default: False) set to True to return json string that can be rendered by Plotly in any support language. save_fig: bool (default: False) Save an image if set to True. Plotly uses the pio.write_html() or pio.write_image(). The support format types should be provided in fig_type. fig_type: string (default: 'iframe+png') Image type to be saved, choose from: jpg, png, svg, pdf and iframe. Delimiter is '+'. filename: str (Default: None) Filename for the output, all of them will share the same name but will have different extension. open_iframe: bool (Default: False) Open the iframe in the default browser if set to True. Returns ------- JSON strings if return_jsonstring is set to True. """ self.target_spec_id = getattr(target_spectrum1D, "spec_id") wave = getattr(target_spectrum1D, "wave") wave_resampled = getattr(target_spectrum1D, "wave_resampled") if wave_resampled is None: wave_resampled = wave count = getattr(target_spectrum1D, "count") count_err = getattr(target_spectrum1D, "count_err") count_sky = getattr(target_spectrum1D, "count_sky") if getattr(target_spectrum1D, "count_continuum") is None: target_spectrum1D.add_count_continuum(get_continuum(wave, count)) count_continuum = getattr(target_spectrum1D, "count_continuum") # apply the flux calibration sensitivity_func = getattr(self.spectrum1D, "sensitivity_func") sensitivity = 10.0 ** sensitivity_func(wave) flux = sensitivity * count flux_resampled = spectres( np.array(wave_resampled).reshape(-1), np.array(wave).reshape(-1), np.array(flux).reshape(-1), verbose=True, ) if count_err is None: flux_err_resampled = np.zeros_like(flux_resampled) else: flux_err = sensitivity * count_err flux_err_resampled = spectres( np.array(wave_resampled).reshape(-1), np.array(wave).reshape(-1), np.array(flux_err).reshape(-1), verbose=True, ) if count_sky is None: flux_sky_resampled = np.zeros_like(flux_resampled) else: flux_sky = sensitivity * count_sky flux_sky_resampled = spectres( np.array(wave_resampled).reshape(-1), np.array(wave).reshape(-1), np.array(flux_sky).reshape(-1), verbose=True, ) flux_continuum = sensitivity * count_continuum flux_continuum_resampled = spectres( np.array(wave_resampled).reshape(-1), np.array(wave).reshape(-1), np.array(flux_continuum).reshape(-1), verbose=True, ) flux_continuum_resampled[np.isnan(flux_continuum_resampled)] = 0.0 count_continuum_resampled = spectres( np.array(wave_resampled).reshape(-1), np.array(wave).reshape(-1), np.array(count_continuum).reshape(-1), verbose=True, ) target_spectrum1D.add_flux_continuum(flux_continuum) target_spectrum1D.add_flux_resampled_continuum( flux_continuum_resampled ) target_spectrum1D.add_count_resampled_continuum( count_continuum_resampled ) # Only computed for diagnostic sensitivity_resampled = spectres( np.array(wave_resampled).reshape(-1), np.array(wave).reshape(-1), np.array(sensitivity).reshape(-1), verbose=True, ) flux_resampled[np.isnan(flux_resampled)] = 0.0 flux_err_resampled[np.isnan(flux_err_resampled)] = 0.0 flux_sky_resampled[np.isnan(flux_sky_resampled)] = 0.0 target_spectrum1D.add_flux(flux, flux_err, flux_sky) target_spectrum1D.add_flux_resampled( flux_resampled, flux_err_resampled, flux_sky_resampled ) target_spectrum1D.add_sensitivity(sensitivity) target_spectrum1D.add_sensitivity_resampled(sensitivity_resampled) # Add the rest of the flux calibration parameters target_spectrum1D.merge(self.spectrum1D) if inspect: wave_mask = (np.array(wave_resampled).reshape(-1) > wave_min) & ( np.array(wave_resampled).reshape(-1) < wave_max ) flux_low = ( np.nanpercentile( np.array(flux_resampled).reshape(-1)[wave_mask], 5 ) / 1.5 ) flux_high = ( np.nanpercentile( np.array(flux_resampled).reshape(-1)[wave_mask], 95 ) * 1.5 ) flux_mask = (np.array(flux_resampled).reshape(-1) > flux_low) & ( np.array(flux_resampled).reshape(-1) < flux_high ) flux_min = np.log10( np.nanmin(np.array(flux_resampled).reshape(-1)[flux_mask]) ) flux_max = np.log10( np.nanmax(np.array(flux_resampled).reshape(-1)[flux_mask]) ) fig = go.Figure( layout=dict( autosize=False, height=height, width=width, updatemenus=list( [ dict( active=0, buttons=list( [ dict( label="Log Scale", method="update", args=[ {"visible": [True, True]}, { "title": "Log scale", "yaxis": {"type": "log"}, }, ], ), dict( label="Linear Scale", method="update", args=[ {"visible": [True, False]}, { "title": "Linear scale", "yaxis": { "type": "linear" }, }, ], ), ] ), ) ] ), title="Log scale", ) ) # show the image on the top fig.add_trace( go.Scatter( x=wave_resampled, y=flux_resampled, line=dict(color="royalblue"), name="Flux", ) ) if flux_err is not None: fig.add_trace( go.Scatter( x=wave_resampled, y=flux_err_resampled, line=dict(color="firebrick"), name="Flux Uncertainty", ) ) if flux_sky is not None: fig.add_trace( go.Scatter( x=wave_resampled, y=flux_sky_resampled, line=dict(color="orange"), name="Sky Flux", ) ) if flux_continuum is not None: fig.add_trace( go.Scatter( x=wave_resampled, y=flux_continuum_resampled, line=dict(color="grey"), name="Continuum", ) ) fig.update_layout( hovermode="closest", showlegend=True, xaxis=dict(title="Wavelength / A", range=[wave_min, wave_max]), yaxis=dict( title="Flux", range=[flux_min, flux_max], type="log" ), legend=go.layout.Legend( x=0, y=1, traceorder="normal", font=dict(family="sans-serif", size=12, color="black"), bgcolor="rgba(0,0,0,0)", ), ) if filename is None: filename = "spectrum_" + str(self.target_spec_id) else: filename = ( os.path.splitext(filename)[0] + "_" + str(self.target_spec_id) ) if save_fig: fig_type_split = fig_type.split("+") for t in fig_type_split: if t == "iframe": pio.write_html( fig, filename + "." + t, auto_open=open_iframe ) elif t in ["jpg", "png", "svg", "pdf"]: pio.write_image(fig, filename + "." + t) if display: if renderer == "default": fig.show() else: fig.show(renderer) if return_jsonstring: return fig.to_json()
[docs] def create_fits( self, output="count+count_resampled+flux+flux_resampled", empty_primary_hdu=True, recreate=False, ): """ Parameters ---------- output: String Type of data to be saved, the order is fixed (in the order of the following description), but the options are flexible. The input strings are delimited by "+", trace: 2 HDUs Trace, and trace width (pixel) count: 4 HDUs Count, uncertainty, sky, optimal flag, and weight (pixel) arc_spec: 3 HDUs 1D arc spectrum, arc line pixels, and arc line effective pixels wavecal: 1 HDU Polynomial coefficients for wavelength calibration wavelength: 1 HDU Wavelength of each pixel count_resampled: 3 HDUs Resampled Count, uncertainty, and sky (wavelength) flux: 4 HDUs Flux, uncertainty, sky, and sensitivity (pixel) flux_resampled: 4 HDUs Flux, uncertainty, sky, and sensitivity (wavelength) empty_primary_hdu: bool (Default: True) Set to True to leave the Primary HDU blank (Default: True) recreate: bool (Default: False) Set to True to overwrite the FITS data and header. """ # If flux is calibrated self.spectrum1D.create_fits( output=output, empty_primary_hdu=empty_primary_hdu, recreate=recreate, )
[docs] def save_fits( self, output="count_resampled+sensitivity_resampled+flux_resampled", filename="fluxcal", empty_primary_hdu=True, overwrite=False, recreate=False, ): """ Save the reduced data to disk, with a choice of any combination of the data that are already present in the Spectrum1D, see below the 'output' parameters for details. Parameters ---------- output: String Type of data to be saved, the order is fixed (in the order of the following description), but the options are flexible. The input strings are delimited by "+". Because a FluxCalibration only requires a subset of all the data, only a few data products are guaranteed to exist. count: 4 HDUs Count, uncertainty, sky, optimal flag, and weight (pixel) count_resampled: 3 HDUs Resampled Count, uncertainty, and sky (wavelength) wavelength: 1 HDU Wavelength of each pixel flux: 4 HDUs Flux, uncertainty, sky, and sensitivity (pixel) flux_resampled: 4 HDUs Flux, uncertainty, sky, and sensitivity (wavelength) filename: String Disk location to be written to. Default is at where the process/subprocess is execuated. empty_primary_hdu: bool (Default: True) Set to True to leave the Primary HDU blank (Default: True) overwrite: bool Default is False. recreate: bool (Default: False) Set to True to overwrite the FITS data and header. """ # Fix the names and extensions if self.target_spec_id is not None: filename = ( os.path.splitext(filename)[0] + "_" + str(self.target_spec_id) ) else: filename = os.path.splitext(filename)[0] self.spectrum1D.save_fits( output=output, filename=filename, overwrite=overwrite, recreate=recreate, empty_primary_hdu=empty_primary_hdu, )
[docs] def save_csv( self, output="sensitivity_resampled+flux_resampled", filename="fluxcal", overwrite=False, recreate=False, ): """ Save the reduced data to disk, with a choice of any combination of the data that are already present in the Spectrum1D, see below the 'output' parameters for details. Parameters ---------- output: String Type of data to be saved, the order is fixed (in the order of the following description), but the options are flexible. The input strings are delimited by "+". Because a FluxCalibration only requires a subset of all the data, only a few data products are guaranteed to exist. count: 4 HDUs Count, uncertainty, sky, and weight (pixel) count_resampled: 3 HDUs Resampled Count, uncertainty, and sky (wavelength) wavelength: 1 HDU Wavelength of each pixel flux: 4 HDUs Flux, uncertainty, sky, and sensitivity (pixel) flux_resampled: 4 HDUs Flux, uncertainty, sky, and sensitivity (wavelength) filename: String (Default: None) Disk location to be written to. Default is at where the process/subprocess is execuated. overwrite: bool (Default: False) Set to True to allow overwriting the FITS data at the file destination. recreate: bool (Default: False) Set to True to regenerate the FITS data and header. """ # Fix the names and extensions if self.target_spec_id is not None: filename = ( os.path.splitext(filename)[0] + "_" + str(self.target_spec_id) ) else: filename = os.path.splitext(filename)[0] self.spectrum1D.save_csv( output=output, filename=filename, overwrite=overwrite, recreate=recreate, )
[docs] def get_spectrum1D(self): """ Return the spectrum1D object. """ return self.spectrum1D