Source code for pycrires.util

"""
Module with utility functions for ``pycrires``.
"""

import bz2
import lzma
import os

import numpy as np
import pooch

from beartype import beartype, typing
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter


[docs] @beartype def lowpass_filter(flux: np.ndarray, window_length: int) -> np.ndarray: """ Function that low-pass filters a spectrum. Parameters ---------- flux: np.ndarray Spectrum to low-pass filter window_length : int Length of the Savitsky-Golay filter to use Returns ------- NoneType filtered: np.ndarray The low-pass filtered spectrum """ nans = np.isnan(flux) if np.sum(nans) > 0.5 * flux.size: return np.tile(np.nan, flux.size) window_length = min(window_length, 2 * (np.sum(~nans) // 2) - 1) filtered = flux.copy() filtered[~nans] = np.array(savgol_filter(flux[~nans], window_length, 2)) return filtered
[docs] @beartype def highpass_filter(order_flux: np.ndarray, window_length: int) -> np.ndarray: """ Function that high-pass filters a spectrum. Parameters ---------- flux: np.ndarray Spectrum to high-pass filter window_length : int Length of the Savitsky-Golay filter to use Returns ------- filtered: np.ndarray The high-pass filtered spectrum """ continuum = lowpass_filter(order_flux, window_length) return order_flux - continuum
[docs] @beartype def mask_tellurics( order_flux: np.ndarray, order_wl: np.ndarray, lower_lim: float = 0.5, upper_lim: float = 2.0, fill_val: float = np.nan, ) -> np.ndarray: """ Function that masks telluric absorption or emission lines. Parameters ---------- order_flux: np.ndarray 2D spectrum (N_rows, N_wavelengths) to apply the masking to. order_wl : np.ndarray 2D array (N_rows, N_wavelengths) corresponding to the wavelength at each bin. lower_lim : float Lower limit for the masking. A value of 0.5 will mask all tellurics that have less than 50% flux w.r.t to the continuum envelope. upper_lim : float Upper limit for the masking. A value of 2. will mask all tellurics that have more than 100% excess flux w.r.t to the continuum envelope. fill_val : float Value to fill the masked bins with. Returns ------- masked_order: np.ndarray 2D spectrum (N_rows, N_wavelengths) with applied masking. """ wl = np.nanmedian(order_wl, axis=0) tot_spec = np.nansum(order_flux, axis=0) percentile = np.nanpercentile(tot_spec, 0.3) mask = tot_spec > percentile continuum = np.poly1d(np.polyfit(wl[mask], tot_spec[mask], 3))(wl) cont_normalized_flux = tot_spec / continuum to_mask = (cont_normalized_flux < lower_lim) + (cont_normalized_flux > upper_lim) masked_order = np.copy(order_flux) masked_order[:, to_mask] = fill_val return masked_order
[docs] @beartype def fit_svd_kernel( order_flux: np.ndarray, order_wl: np.ndarray, star_model: np.ndarray, max_shift: int = 50, rcond: float = 1e-3, ) -> np.ndarray: """ Function that tries to determine the line spread function of each row using a Singular Value Decomposition. Parameters ---------- order_flux : np.ndarray 2D spectrum (N_rows, N_wavelengths) to apply the masking to. order_wl : np.ndarray 2D array (N_rows, N_wavelengths) corresponding to the wavelength at each bin. star_model : np.ndarray 2D array (N_rows, N_wavelengths) with the estimated stellar contribution to each row. max_shift : int Maximum allowed shift (in pixels) for the line spread function kernel. rcond : float Cutoff for small singular values in the inversion. Returns ------- result : np.ndarray 2D array (N_rows, N_wavelengths) with the stellar contribution to each row corrected for the local line spread function. """ result = np.copy(order_flux) for i, (spec, wl, model) in enumerate(zip(order_flux, order_wl, star_model)): interpolator = interp1d( wl, model, bounds_error=False, fill_value=np.nanmedian(model) ) shifts = np.arange(-max_shift, max_shift + 1e-10) dlam = np.nanmean(np.diff(wl)) modes = [interpolator(wl + shift * dlam) for shift in shifts] modes = np.array(modes) modes[np.isnan(modes)] = 0 proj_matrix = np.linalg.pinv(modes, rcond=rcond) spec[np.isnan(spec)] = 0 amps = proj_matrix.T.dot(spec) result[i] = amps.dot(modes) return result
[docs] @beartype def flag_outliers( order_flux: np.ndarray, sigma: float = 4.0, fill_value: float = np.nan ) -> np.ndarray: """ Function that flags outliers in a 2D spectrum. Parameters ---------- order_flux: np.ndarray 2D spectrum (N_rows, N_wavelengths) to flag. sigma: float Values with a 'sigma' standard deviations from the mean will be flagged. fill_value: float Value to replace the outliers with Returns ------- order_flux: np.ndarray 2D spectrum (N_rows, N_wavelengths) with flagged values. """ z = (order_flux - np.nanmedian(order_flux, axis=1)[:, np.newaxis]) / np.nanstd( order_flux, axis=1 )[:, np.newaxis] outliers = z > sigma order_flux[outliers] = fill_value return order_flux
class _Gdl: def __init__(self, vsini, epsilon): """ Calculate the broadening profile. Class copied from PyAstronomy package. Parameters ---------- vsini : float Projected rotation speed of the star [km/s] epsilon : float Linear limb-darkening coefficient """ self.vc = vsini / 299792.458 self.eps = epsilon def gdl(self, dl, refwvl, dwl): """ Calculates the broadening profile. Parameters ---------- dl : array 'Delta wavelength': The distance to the reference point in wavelength space [A]. refwvl : array The reference wavelength [A]. dwl : float The wavelength bin size [A]. Returns ------- Broadening profile : array The broadening profile according to Gray. """ self.dlmax = self.vc * refwvl self.c1 = 2.0 * (1.0 - self.eps) / (np.pi * self.dlmax * (1.0 - self.eps / 3.0)) self.c2 = self.eps / (2.0 * self.dlmax * (1.0 - self.eps / 3.0)) result = np.zeros(len(dl)) x = dl / self.dlmax indi = np.where(np.abs(x) < 1.0)[0] result[indi] = self.c1 * np.sqrt(1.0 - x[indi] ** 2) + self.c2 * ( 1.0 - x[indi] ** 2 ) # Correct the normalization for numeric accuracy # The integral of the function is normalized, however, especially in the case # of mild broadening (compared to the wavelength resolution), the discrete # broadening profile may no longer be normalized, which leads to a shift of # the output spectrum, if not accounted for. result /= np.sum(result) * dwl return result
[docs] def fastRotBroad(wvl, flux, epsilon, vsini, effWvl=None): """ Apply rotational broadening using a single broadening kernel. Function copied from PyAstronomy package. The effect of rotational broadening on the spectrum is wavelength dependent, because the Doppler shift depends on wavelength. This function neglects this dependence, which is weak if the wavelength range is not too large. .. note:: numpy.convolve is used to carry out the convolution and "mode = same" is used. Therefore, the output will be of the same size as the input, but it will show edge effects. Parameters ---------- wvl : array The wavelength flux : array The flux epsilon : float Linear limb-darkening coefficient vsini : float Projected rotational velocity in km/s. effWvl : float, optional The wavelength at which the broadening kernel is evaluated. If not specified, the mean wavelength of the input will be used. Returns ------- Broadened spectrum : array The rotationally broadened output spectrum. """ # Check whether wavelength array is evenly spaced sp = wvl[1::] - wvl[0:-1] if abs(max(sp) - min(sp)) > 1e-6: raise ValueError("Input wavelength array is not evenly spaced.") if vsini <= 0.0: raise ValueError("vsini must be positive.") if (epsilon < 0) or (epsilon > 1.0): raise ValueError( "Linear limb-darkening coefficient, epsilon, should be '0 < epsilon < 1'." ) # Wavelength binsize dwl = wvl[1] - wvl[0] if effWvl is None: effWvl = np.mean(wvl) gdl = _Gdl(vsini, epsilon) # The number of bins needed to create the broadening kernel binnHalf = int(np.floor(((vsini / 299792.458) * effWvl / dwl))) + 1 gwvl = (np.arange(4 * binnHalf) - 2 * binnHalf) * dwl + effWvl # Create the broadening kernel dl = gwvl - effWvl g = gdl.gdl(dl, effWvl, dwl) # Remove the zero entries indi = np.where(g > 0.0)[0] g = g[indi] result = np.convolve(flux, g, mode="same") * dwl return result
[docs] @beartype def load_bt_settl_template( t_eff: float, log_g: float, vsini: typing.Optional[float] = None, wl_lims: typing.Tuple[float, float] = (0.8, 3.0), ) -> typing.Tuple[np.ndarray, np.ndarray]: """ Function that loads a BT-SETTL template for the given temperature, surface gravity and vsin(i). Parameters ---------- t_eff: float Effective temperature of the model to load. The grid has a resolution of 100 K, so will be rounded to the nearest value. log_g: float Surface gravity of the model to load. The grid has a resolution of 0.5 dex, so will be rounded to the nearest value. vsini : float, None Rotational velocity of the model to load used for the broadening kernel. The broadening is not applied if the argument is set to ``None``. wl_lim : tuple(float, float) Lower and upper limits (in micron) of the wavelength range to load. Returns ------- np.ndarray Flux of the planetary template (in erg/s). np.ndarray Wavelengths. """ # The file names contain the main parameters of the models: # lte{Teff/10}-{Logg}{[M/H]}a[alpha/H].GRIDNAME.7.spec.gz/bz2/xz t_val = int(np.round(t_eff / 100)) log_g_val = 0.5 * np.round(log_g / 0.5) if t_val < 12: fname = f"lte{t_val:03d}-{log_g_val:.1f}-0.0a+0.0.BT-Settl.spec.7.bz2" else: fname = f"lte{t_val:03d}.0-{log_g_val:.1f}-0.0a+0.0.BT-Settl.spec.7.xz" data_path = os.path.join(os.getcwd(), "bt_settl_spectra") if not os.path.exists(data_path): print("Making local data folder...") os.mkdir(data_path) fpath = os.path.join(data_path, fname) decompressed_fpath = fpath[:-4] if not os.path.exists(decompressed_fpath): if t_val < 12: url = "https://phoenix.ens-lyon.fr/Grids/BT-Settl/CIFIST2011/SPECTRA/" else: url = "https://phoenix.ens-lyon.fr/Grids/BT-Settl/CIFIST2011_2015/SPECTRA/" url += fname print("Downloading BT-SETTL spectra from:", url) downloader = pooch.HTTPDownloader(verify=False) pooch.retrieve( url=url, known_hash=None, fname=fname, path=data_path, progressbar=True, downloader=downloader, ) if t_val < 12: with open(fpath, "rb") as open_file: data = open_file.read() decompressed_data = bz2.decompress(data) else: decompressed_data = lzma.open(fpath).read() with open(decompressed_fpath, "wb") as open_file: open_file.write(decompressed_data) with open(decompressed_fpath, "r", encoding="utf-8") as open_file: data = open_file.readlines() wl = np.zeros(len(data)) flux = np.zeros(len(data)) for i, line in enumerate(data): split_line = line.split() approx_wl = float(split_line[0][:6]) if (approx_wl > wl_lims[0] * 1e4) * (approx_wl < wl_lims[1] * 1e4): try: wl[i] = float(split_line[0]) * 1e-4 flux[i] = 10 ** (float(split_line[1].replace("D", "E")) - 8) except: double_splitted = split_line[0].split("-") wl[i] = float(double_splitted[0]) * 1e-4 if len(split_line) == 2: flux[i] = 10 ** (float(double_splitted[1].replace("D", "E")) - 8) else: flux[i] = 10 ** ( float( (double_splitted[1] + "-" + double_splitted[2]).replace( "D", "E" ) ) - 8 ) if wl_lims is not None: mask = (wl > wl_lims[0]) * (wl < wl_lims[1]) wl, flux = wl[mask], flux[mask] sorting = np.argsort(wl) wl = wl[sorting] flux = flux[sorting] waves_even = np.linspace(np.min(wl), np.max(wl), wl.size * 2) flux_even = interp1d(wl, flux)(waves_even) if vsini is not None: broad_flux = fastRotBroad(waves_even, flux_even, 0.0, vsini) else: broad_flux = np.copy(flux_even) return broad_flux, waves_even * 1e3