# -*- coding: utf-8 -*-
import logging
import numpy as np
from scipy import interpolate as itp
from scipy import ndimage
from statsmodels.nonparametric.smoothers_lowess import lowess
def bfixpix(data, badmask, n=4, retdat=False):
"""
Replace pixels flagged as nonzero in a bad-pixel mask with the
average of their nearest four good neighboring pixels.
Taken and updated from Ian Crossfield's code
https://www.lpl.arizona.edu/~ianc/python/_modules/nsdata.html#bfixpix
Parameters
----------
data: numpy array (N, M)
badmask: numpy array (N, M)
Bad pixel mask.
n: int
number of nearby, good pixels to average over
retdat: bool
If True, return an array instead of replacing-in-place and do
_not_ modify input array `data`. This is always True if a 1D
array is input!
Returns
-------
another numpy array (if retdat is True)
"""
nx, ny = data.shape
badx, bady = np.nonzero(badmask)
nbad = len(badx)
if retdat:
data = np.array(data, copy=True)
for i in range(nbad):
rad = 0
numNearbyGoodPixels = 0
while numNearbyGoodPixels < n:
rad += 1
xmin = max(0, badx[i] - rad)
xmax = min(nx, badx[i] + rad)
ymin = max(0, bady[i] - rad)
ymax = min(ny, bady[i] + rad)
x = np.arange(nx)[xmin : xmax + 1]
y = np.arange(ny)[ymin : ymax + 1]
yy, xx = np.meshgrid(y, x)
rr = abs(xx + 1j * yy) * (
1.0 - badmask[xmin : xmax + 1, ymin : ymax + 1]
)
numNearbyGoodPixels = (rr > 0).sum()
closestDistances = np.unique(np.sort(rr[rr > 0])[0:n])
numDistances = len(closestDistances)
localSum = 0.0
localDenominator = 0.0
for j in range(numDistances):
localSum += data[xmin : xmax + 1, ymin : ymax + 1][
rr == closestDistances[j]
].sum()
localDenominator += (rr == closestDistances[j]).sum()
data[badx[i], bady[i]] = 1.0 * localSum / localDenominator
if retdat:
ret = data
else:
ret = None
return ret
def create_cutoff_mask(
data, cutoff=65535.0, grow=False, iterations=1, diagonal=False
):
"""
Create a simple mask from a numpy.ndarray, pixel values above
(or below) the specified cutoff value(s) are masked as *BAD* pixels as
True. If only one value is given, it will be treated as the upper limit.
Parameters
----------
data: numpy.ndarray
Image data to be used for generating saturation mask
cutoff: float
This sets the (lower and) upper limit of electron count.
grow: bool
Set to True to grow the mask, see `grow_mask()`
iterations: int
The number of pixel growth along the Cartesian axes directions.
diagonal: boolean
Set to True to grow in the diagonal directions.
Return
------
cutoff_mask: numpy.ndarray
Any pixel outside the cutoff values will be masked as True (bad).
"""
if isinstance(cutoff, (list, np.ndarray)):
if len(cutoff) == 2:
lower_limit = cutoff[0]
upper_limit = cutoff[1]
else:
err_msg = (
"Please supply a list or array for the cutoff. "
+ "The given cutoff is {} and and a size of {}.".format(
cutoff, len(cutoff)
)
)
logging.error(err_msg)
raise RuntimeError(err_msg)
elif isinstance(cutoff, (int, float)):
lower_limit = -1e10
upper_limit = cutoff
else:
err_msg = (
"Please supply a numeric value for the cutoff. "
+ "The given cutoff is {} of type {}.".format(cutoff, type(cutoff))
)
logging.error(err_msg)
raise RuntimeError(err_msg)
cutoff_mask = (data > upper_limit) | (data < lower_limit)
if grow:
cutoff_mask = grow_mask(
cutoff_mask, iterations=iterations, diagonal=diagonal
)
if (data > upper_limit).any():
logging.warning("Saturated pixels detected.")
return cutoff_mask, True
else:
return cutoff_mask, False
def create_bad_pixel_mask(data, grow=False, iterations=1, diagonal=False):
"""
Create a simple mask from a 2D numpy.ndarray, pixel with non-numeric
values will be masked as bad pixels (True).
Parameters
----------
data: numpy.ndarray
Image data to be used for generating saturation mask
grow: bool
Set to True to grow the mask, see `grow_mask()`
iterations: int
The number of pixel growth along the Cartesian axes directions.
diagonal: boolean
Set to True to grow in the diagonal directions.
Return
------
bad_pixel_mask: numpy.ndarray
Any pixel outside the cutoff values will be masked as True (bad).
"""
bad_pixel_mask = ~np.isfinite(data) | np.isnan(data)
if grow:
bad_pixel_mask = grow_mask(
mask=bad_pixel_mask, iterations=iterations, diagonal=diagonal
)
if bad_pixel_mask.any():
logging.warning("Bad pixels detected.")
return bad_pixel_mask, True
else:
return bad_pixel_mask, False
def grow_mask(mask, iterations, diagonal):
"""
This extends the mask by the given "iterations".
The schematic of the combination of iterations and diagonal parameters to
grow from 1 pixel to 5 by 5:
.. code-block:: python
0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 iterations = 1, diagonal = False 0 0 1 0 0
0 0 1 0 0 ------------------------------> 0 1 1 1 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 iterations = 1, diagonal = True 0 1 1 1 0
0 0 1 0 0 ------------------------------> 0 1 1 1 0
0 0 0 0 0 0 1 1 1 0
0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 iterations = 2, diagonal = False 0 1 1 1 0
0 0 1 0 0 ------------------------------> 1 1 1 1 1
0 0 0 0 0 0 1 1 1 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 1 1 1 1 1
0 0 0 0 0 iterations = 2, diagonal = True 1 1 1 1 1
0 0 1 0 0 ------------------------------> 1 1 1 1 1
0 0 0 0 0 1 1 1 1 1
0 0 0 0 0 1 1 1 1 1
These two will arrive at the same final mask.
0 0 0 0 0 0 1 1 1 0
0 0 1 0 0 iterations = 1, diagonal = True 1 1 1 1 1
0 1 1 1 0 ------------------------------> 1 1 1 1 1
0 0 1 0 0 1 1 1 1 1
0 0 0 0 0 0 1 1 1 0
0 0 0 0 0 0 1 1 1 0
0 1 1 1 0 iterations = 1, diagonal = False 1 1 1 1 1
0 1 1 1 0 ------------------------------> 1 1 1 1 1
0 1 1 1 0 1 1 1 1 1
0 0 0 0 0 0 1 1 1 0
Parameters
----------
mask: numpy.ndarray
Input mask.
iterations: int
The number of pixel growth along the Cartesian axes directions.
diagonal: boolean
Set to True to grow in the diagonal directions.
"""
if diagonal:
struct = ndimage.generate_binary_structure(2, 2)
else:
struct = ndimage.generate_binary_structure(2, 1)
mask_grown = ndimage.binary_dilation(
input=mask, structure=struct, iterations=iterations
)
return mask_grown
def get_continuum(x, y, **kwargs):
"""
This is a wrapper function of the lowess function from statsmodels that
uses a different frac default value that is more appropriate in getting
a first guess continuum which reject "outliers" much more aggressively.
This function also takes in values in a Pythonic way that of providing
arguments: "first x then y".
Parameters
----------
x: list or numpy.ndarray
Absicissa (conventionally the first number of a coordinate pair)
y: list or numpy.ndarray
Ordinate (conventionally the second number of a coordinate pair)
**kwargs: dict
The keyword arguments for the lowess function.
"""
if "lowess_frac" not in kwargs:
kwargs["frac"] = 0.01
else:
kwargs["frac"] = kwargs["lowess_frac"]
# dictionary pass by value so it's safe to pop
kwargs.pop("lowess_frac")
if "return_sorted" not in kwargs:
kwargs["return_sorted"] = False
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
mask = np.isfinite(y) & ~np.isnan(y) & (y > 0.0)
x_smoothed = x[mask]
y_smoothed = lowess(y[mask], x_smoothed, **kwargs)
return itp.interp1d(
x_smoothed, y_smoothed, kind="cubic", fill_value="extrapolate"
)(x)