Source code for lime.fitting.lines

import logging

import numpy as np
from lmfit.models import Model
from lmfit import fit_report
from scipy.interpolate import interp1d
from scipy.special import wofz
from scipy.optimize import curve_fit
from lime.io import LiMe_Error
from lime.tools import compute_FWHM0, mult_err_propagation
import warnings
import re

_logger = logging.getLogger('LiMe')
_VERBOSE_WARNINGS = 'ignore'

c_KMpS = 299792.458  # Speed of light in Km/s

k_GaussArea = np.sqrt(2 * np.pi)
sqrt2 = np.sqrt(2)

k_gFWHM = 2 * np.sqrt(2 * np.log(2))

k_eFWHM = 2 * np.log(2)

TARGET_PERCENTILES = np.array([0, 1, 5, 10, 50, 90, 95, 99, 100])

lime_rng = np.random.default_rng()

# Atomic mass constant
amu = 1.66053906660e-27 # Kg
tiny = 1.0e-15

# Boltzmann constant
k_Boltzmann = 1.380649e-23 # m^2 kg s^-2 K^-1

# Dictionary with atomic masses https://www.ciaaw.org/atomic-weights.htm
ATOMIC_MASS = {'H': (1.00784+1.00811)/2 * amu,
               'He': 4.002602 * amu,
               'C': (12.0096+12.0116)/2 * amu,
               'N': (14.00643 + 14.00728)/2 * amu,
               'O':  (15.99903 + 15.99977)/2 * amu,
               'Ne': 20.1797 * amu,
               'S': (32.059 + 32.076)/2 * amu,
               'Cl': (35.446 + 35.457)/2 * amu,
               'Ar': (39.792 + 39.963)/2 * amu,
               'Fe': 55.845 * amu}


_AMP_PAR = dict(value=None, min=0, max=np.inf, vary=True, expr=None)
_CENTER_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=True, expr=None)
_SIG_PAR = dict(value=None, min=0, max=np.inf, vary=True, expr=None)
_FRAC_PAR = dict(value=None, min=0, max=1, vary=True, expr=None)
_AREA_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=True, expr=None)
_ALPHA_PAR = dict(value=None, min=np.inf, max=0, vary=True, expr=None)

_A_PAR = dict(value=None, min=0, max=np.inf, vary=True, expr=None)
_B_PAR = dict(value=None, min=0, max=np.inf, vary=True, expr=None)
_C_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=True, expr=None)

_AMP_ABS_PAR = dict(value=None, min=-np.inf, max=0, vary=True, expr=None)

_SLOPE_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=False, expr=None)
_INTER_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=False, expr=None)

_SLOPE_FIX_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=False, expr=None)
_INTER_FIX_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=False, expr=None)
_SLOPE_FREE_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=True, expr=None)
_INTER_FREE_PAR = dict(value=None, min=-np.inf, max=np.inf, vary=True, expr=None)


def const_cont_model(cont_array, prefix='cont_', allow_scale=False):
    """
    Build a Model that returns the supplied continuum array.
    If allow_scale=True, include a 'scale' parameter (free by default).
    If allow_scale=False, include 'scale' but freeze it at 1.0 so it's constant.
    """
    cont_array = np.asarray(cont_array)

    def array_component(x, scale=1.0):
        # x is ignored for values, but we check shape for safety
        if np.shape(x) != np.shape(cont_array):
            raise ValueError("continuum array and x must have the same shape")
        return scale * cont_array

    m = Model(array_component, prefix=prefix)
    params = m.make_params(scale=1.0)

    if not allow_scale:
        params[f'{prefix}scale'].set(vary=False)
        # freeze at 1.0 -> truly constant
    return m, params

def linear_model(x, m_cont, n_cont):
    """Linear line formulation"""
    return m_cont * x + n_cont


def gaussian_model(x, amp, center, sigma):
    """1-d gaussian curve : gaussian(x, amp, cen, wid)"""
    return amp * np.exp(-0.5 * (((x-center)/sigma) * ((x-center)/sigma)))


def lorentz_model(x, amp, center, sigma):
    """1-d lorentzian profile : lorentz(x, amp, cen, sigma)"""
    # A * (gamma / 2) ** 2 / ((x - x0) ** 2 + (gamma / 2) ** 2)
    # return (amp / np.pi) * (np.square(sigma) / (np.square(x-center) - np.square(sigma)))
    return amp * ((sigma*sigma) / ((x - center)*(x - center) + sigma*sigma))


def voigt_model(x, amp, center, sigma, gamma):
    # z = ((x-center) + 1j*gamma) / (sigma*sqrt2)
    # return amp * np.real(wofz(z)) / (sigma * k_GaussArea)
    z = (x-center + 1j*gamma) / max(tiny, (sigma*sqrt2))
    return amp*np.real(wofz(z)) / max(tiny, (sigma*k_GaussArea))


def exponential_model(x, amp, center, alpha):

    return amp * np.exp(-alpha * np.abs(x - center))


def pseudo_voigt_model(x, amp, center, sigma, frac):
    return frac * gaussian_model(x, amp, center, sigma) + (1 - frac) * lorentz_model(x, amp, center, sigma)


def broken_powerlaw_model(x, a, b, c, alpha):

    # alpha_in = np.where((x > (center - wbreak)) & (x < (center + wbreak)), 0, alpha)

    # Compute symmetric power law
    wave_shift_abs = np.abs(x - c)
    y = a * np.power(wave_shift_abs, -alpha)

    # Set broken region to zero
    idcs_core = wave_shift_abs < b
    y[idcs_core] = 0.0

    return y


def broken_powerlaw_model_ratio(x, a, b, c, alpha):

    # alpha_in = np.where((x > (center - wbreak)) & (x < (center + wbreak)), 0, alpha)

    # Compute symmetric power law
    wave_shift_abs = np.abs(x - c)
    y = a * np.power(wave_shift_abs, -alpha)

    # Set broken region to zero
    idcs_core = wave_shift_abs < b
    y[idcs_core] = 0.0

    return y


def pseudo_power_model(x, amp, center, sigma, alpha, frac):

    return frac * gaussian_model(x, amp, center, sigma) + (1 - frac) * broken_powerlaw_model(x, amp, sigma, center, alpha)


def g_FWHM(line, idx):

    sigma = line.sigma[idx]

    return k_gFWHM * sigma


def l_FWHM(line, idx):

    sigma = line.sigma[idx]

    return 2 * sigma


def v_FWHM(line, idx):

    # Approximation Kielkopf
    FWHM_gauss = g_FWHM(line, idx)
    FWHM_lorentz = l_FWHM(line, idx)

    return 0.5346 * FWHM_lorentz + np.sqrt(0.2166 * FWHM_lorentz * FWHM_lorentz + FWHM_gauss * FWHM_gauss)


def p_FWHM(line, idx):

    FWHM_power = np.nan

    return FWHM_power


def e_FWHM(line, idx):

    alpha = line.alpha[idx]

    return k_eFWHM / alpha


def pp_FWHM(line, idx):

    FWHM_gauss = g_FWHM(line, idx)

    return FWHM_gauss


def gaussian_area(line, idx, n_steps):

    amp = lime_rng.normal(line.amp[idx], line.amp_err[idx], n_steps)
    sigma = lime_rng.normal(line.sigma[idx], line.sigma_err[idx], n_steps)

    # amp = np.random.normal(line.amp[idx], line.amp_err[idx], n_steps)
    # sigma = np.random.normal(line.sigma[idx], line.sigma_err[idx], n_steps)

    # area = 2.5066282746 * line.amp[idx] * line.amp[idx]
    # area_err = area * np.sqrt(np.square(line.amp_err[idx] / line.amp[idx]) +
    #                           np.square(line.sigma_err[idx] / line.sigma[idx]))

    return 2.5066282746 * amp * sigma


def lorentz_area(line, idx, n_steps):

    amp = np.random.normal(line.amp[idx], line.amp_err[idx], n_steps)
    sigma = np.random.normal(line.sigma[idx], line.sigma_err[idx], n_steps)

    return 3.14159265 * amp * sigma


def voigt_area(line, idx, n_steps):

    amp = np.random.normal(line.amp[idx], line.amp_err[idx], n_steps)
    sigma = np.random.normal(line.sigma[idx], line.sigma_err[idx], n_steps)
    gamma = np.random.normal(line.gamma[idx], line.gamma_err[idx], n_steps)

    return gaussian_area(amp, sigma) + lorentz_area(amp, gamma)


def pseudo_voigt_area(line, idx, n_steps):

    amp = np.random.normal(line.amp[idx], line.amp_err[idx], n_steps)
    sigma = np.random.normal(line.sigma[idx], line.sigma_err[idx], n_steps)
    frac = np.random.normal(line.frac[idx], line.frac_err[idx], n_steps)

    return frac * (2.5066282746 * amp * sigma) + (1 - frac) * (3.14159265 * amp * sigma)


def power_area(line, idx, n_steps):

    return np.full(n_steps, np.nan)


def exp_area(line, idx, n_steps):

    amp = np.random.normal(line.amp[idx], line.amp_err[idx], n_steps)
    alpha = np.random.normal(line.alpha[idx], line.alpha[idx], n_steps)

    return 2.5066282746 * amp * 1/alpha


def pseudo_power_area(line, idx, n_steps):

    amp = np.random.normal(line.amp[idx], line.amp_err[idx], n_steps)
    sigma = np.random.normal(line.sigma[idx], line.sigma_err[idx], n_steps)
    frac = np.random.normal(line.frac[idx], line.frac_err[idx], n_steps)

    return frac * (2.5066282746 * amp * sigma) + (1 - frac) * (3.14159265 * amp * sigma)


def velocity_to_wavelength_band(n_sigma, band_velocity_sigma, lambda_obs, delta_instr):

    return n_sigma * ((band_velocity_sigma / c_KMpS) * lambda_obs + delta_instr)


ALL_PARAMS = np.array(['m_cont', 'n_cont', 'amp', 'center', 'sigma', 'gamma', 'alpha', 'frac', 'a', 'b', 'c'])

PROFILE_PARAMS = dict(g = ['amp', 'center', 'sigma'],
                      l = ['amp', 'center', 'sigma'],
                      v = ['amp', 'center', 'sigma', 'gamma'],
                      pv = ['amp', 'center', 'sigma', 'frac'],
                      pp = ['amp', 'center', 'sigma', 'alpha', 'frac'],
                      p = ['a', 'b', 'c', 'alpha'],
                      e = ['amp', 'center', 'alpha'])

PROFILE_ABBREV = dict(g = 'Gaussian',
                      l = 'Lorentzian',
                      v = 'Voigt',
                      pv = 'Pseudo-Voigt',
                      pp = 'Pseudo-Power law',
                      p = 'Broken Power law',
                      e = 'Exponential')

PROFILE_FUNCTIONS =  {'g': gaussian_model, 'l': lorentz_model, 'v': voigt_model,
                      'pv': pseudo_voigt_model, 'pp': pseudo_power_model,
                      'p': broken_powerlaw_model, 'e': exponential_model}

AREA_FUNCTIONS = {'g': gaussian_area, 'l': lorentz_area, 'v': voigt_area,
                  'pv': pseudo_voigt_area, 'pp': pseudo_power_area,
                  'p': power_area, 'e': exp_area}

FWHM_FUNCTIONS =  {'g': g_FWHM, 'l': l_FWHM, 'v': v_FWHM, 'pv': v_FWHM, 'pp': pp_FWHM, 'p': p_FWHM, 'e': e_FWHM}


[docs] def show_profile_parameters(profile_params=PROFILE_PARAMS, profile_abbrev=PROFILE_ABBREV): """ Display the available emission line profile models and their parameters. Parameters ---------- profile_params : dict, optional Dictionary mapping profile identifier characters (e.g., ``"g"``, ``"l"``) to lists of parameter names (e.g., ``["amplitude", "center", "sigma"]``). Default is :data:`PROFILE_PARAMS`. profile_abbrev : dict, optional Dictionary mapping profile identifier characters to their descriptive names (e.g., ``{"g": "Gaussian", "l": "Lorentzian"}``). Default is :data:`PROFILE_ABBREV`. Examples -------- Display all registered profiles: >>> show_profile_parameters() Example output: .. code-block:: text Available profiles (with their identifying character) and their parameters: - Gaussian "g": ['amp', 'center', 'sigma'] - Lorentzian "l": ['amp', 'center', 'sigma'] - Voigt "v": ['amp', 'center', 'sigma', 'gamma'] """ print("\nAvailable profiles (with their identifying character) and their parameters:") for id_character, param_list in profile_params.items(): profile = profile_abbrev[id_character] print(f'- {profile} "{id_character}": {param_list}') return
def signal_to_noise_rola(amp, std_cont, n_pixels): snr = (k_GaussArea/6) * (amp/std_cont) * np.sqrt(n_pixels) return snr def profiles_computation(line_list, log, z_corr, shape_list, x_array=None, interval=('w3', 'w4'), res_factor=100): # All lines are computed with the same wavelength interval: The maximum interval[1]-interval[0] in the log times 3 # and starting at interval[0] values beyond interval[0] are masked if x_array is None: # Create x array wmin_array = log.loc[line_list, interval[0]].to_numpy() * z_corr wmax_array = log.loc[line_list, interval[1]].to_numpy() * z_corr w_mean = np.max(wmax_array - wmin_array) x_zero = np.linspace(0, w_mean, res_factor) x_array = np.add(np.c_[x_zero], wmin_array) # Get the y array depending of the function gaussian_array = np.zeros((res_factor, len(line_list))) # Compile the flux profile for the corresponding shape for i, line_label in enumerate(line_list): profile_function = PROFILE_FUNCTIONS[shape_list[i]] if shape_list[i] == "g" or shape_list[i] == "l": amp_array = log.loc[line_list, 'amp'].to_numpy() center_array = log.loc[line_list, 'center'].to_numpy() sigma_array = log.loc[line_list, 'sigma'].to_numpy() gaussian_array[:, i] = profile_function(x_array, amp_array[i], center_array[i], sigma_array[i])[:, 0] elif shape_list[i] == "v": amp_array = log.loc[line_list, 'amp'].to_numpy() center_array = log.loc[line_list, 'center'].to_numpy() sigma_array = log.loc[line_list, 'sigma'].to_numpy() gamma_array = log.loc[line_list, 'gamma'].to_numpy() gaussian_array[:, i] = profile_function(x_array, amp_array[i], center_array[i], sigma_array[i], gamma_array[i])[:, 0] elif shape_list[i] == "pv": amp_array = log.loc[line_list, 'amp'].to_numpy() center_array = log.loc[line_list, 'center'].to_numpy() sigma_array = log.loc[line_list, 'sigma'].to_numpy() frac_array = log.loc[line_list, 'frac'].to_numpy() gaussian_array[:, i] = profile_function(x_array, amp_array[i], center_array[i], sigma_array[i], frac_array[i])[:, 0] elif shape_list[i] == "e": amp_array = log.loc[line_list, 'amp'].to_numpy() center_array = log.loc[line_list, 'center'].to_numpy() alpha_array = log.loc[line_list, 'alpha'].to_numpy() gaussian_array[:, i] = profile_function(x_array, amp_array[i], center_array[i], alpha_array[i])[:, 0] elif shape_list[i] == "pp": amp_array = log.loc[line_list, 'amp'].to_numpy() center_array = log.loc[line_list, 'center'].to_numpy() sigma_array = log.loc[line_list, 'sigma'].to_numpy() frac_array = log.loc[line_list, 'frac'].to_numpy() alpha_array = log.loc[line_list, 'alpha'].to_numpy() gaussian_array[:, i] = profile_function(x_array, amp_array[i], center_array[i], sigma_array[i], alpha_array[i], frac_array[i])[:, 0] elif shape_list[i] == "p": a_array = log.loc[line_list, 'a'].to_numpy() b_array = log.loc[line_list, 'b'].to_numpy() c_array = log.loc[line_list, 'c'].to_numpy() alpha_array = log.loc[line_list, 'alpha'].to_numpy() gaussian_array[:, i] = profile_function(x_array, a_array[i], b_array[i], c_array[i], alpha_array[i])[:, 0] else: raise LiMe_Error(f'Profile curve "{shape_list[i]}" for line {line_label} is not recognized. Please use ' f'_p-g (gaussian), _p-l (Lorentz) or _p-v (Voigt)') # Esto que hace for i in range(x_array.shape[1]): idcs_nan = x_array[:, i] > wmax_array[i] x_array[idcs_nan, i] = np.nan gaussian_array[idcs_nan, i] = np.nan return x_array, gaussian_array # All lines are computed with the wavelength range provided by the user else: # Profile container gaussian_array = np.zeros((len(x_array), len(line_list))) # Compute the individual profiles for i, comp in enumerate(line_list): profile_function = PROFILE_FUNCTIONS[shape_list[i]] if shape_list[i] in ['g', 'l']: amp = log.loc[comp.label, 'amp'] center = log.loc[comp.label, 'center'] sigma = log.loc[comp.label, 'sigma'] gaussian_array[:, i] = profile_function(x_array, amp, center, sigma) return gaussian_array def linear_continuum_computation(line_list, log, z_corr, res_factor=100, interval=('w3', 'w4'), x_array=None): # All lines are computed with the same wavelength interval: The maximum interval[1]-interval[0] in the log times 3 # and starting at interval[0] values beyond interval[0] are masked if x_array is None: idcs_lines = (log.index.isin(line_list)) m_array = log.loc[idcs_lines, 'm_cont'].values n_array = log.loc[idcs_lines, 'n_cont'].values wmin_array = log.loc[idcs_lines, interval[0]].values * z_corr wmax_array = log.loc[idcs_lines, interval[1]].values * z_corr w_mean = np.max(wmax_array - wmin_array) x_zero = np.linspace(0, w_mean, res_factor) x_array = np.add(np.c_[x_zero], wmin_array) cont_array = m_array * x_array + n_array for i in range(x_array.shape[1]): idcs_nan = x_array[:, i] > wmax_array[i] x_array[idcs_nan, i] = np.nan cont_array[idcs_nan, i] = np.nan return x_array, cont_array # All lines are computed with the wavelength range provided by the user else: cont_array = np.zeros((len(x_array), len(line_list))) for i, comp in enumerate(line_list): m_cont = log.loc[comp.label, 'm_cont'] n_cont = log.loc[comp.label, 'n_cont'] cont_array[:, i] = m_cont * x_array + n_cont return cont_array def is_digit(x): try: float(x) return True except ValueError: return False def sigma_corrections(line, idcs_line, wave_arr, R_arr, temperature): # Thermal correction line.measurements.sigma_thermal = np.full(line.measurements.sigma.size, np.nan) for i in range(line.measurements.sigma_thermal.size): atom_mass = ATOMIC_MASS.get(line.list_comps[i].particle.symbol, np.nan) line.measurements.sigma_thermal[i] = np.sqrt(k_Boltzmann * temperature / atom_mass) / 1000 # Instrumental correction if R_arr is not None: if np.isscalar(R_arr): line.measurements.sigma_instr = np.mean(wave_arr.compressed() / (R_arr * k_gFWHM)) else: mask_data = ~wave_arr.mask line.measurements.sigma_instr = np.mean(wave_arr[mask_data] / (R_arr[idcs_line][mask_data] * k_gFWHM)) wave_arr[mask_data] / (R_arr[idcs_line][mask_data] * k_gFWHM) else: line.measurements.sigma_instr = None return # TODO esto tirarlo? def compute_inst_sigma_array(wave_arr, res_power=None): # Aproximation using the wavelength array if res_power is None: deltalamb_arr = np.diff(wave_arr) R_arr = wave_arr[1:] / deltalamb_arr FWHM_arr = wave_arr[1:] / R_arr sigma_arr = np.zeros(wave_arr.size) sigma_arr[:-1] = FWHM_arr / k_gFWHM sigma_arr[-1] = sigma_arr[-2] # Use the true R_arr else: if wave_arr.size != res_power.size: raise LiMe_Error(f'The size fo the spectrum array and the resolving power array must have the same size') FWHM_arr = wave_arr / res_power sigma_arr = FWHM_arr / k_gFWHM return sigma_arr class ProfileModelCompiler: def __init__(self, line, redshift, user_conf, y, cont_arr=None, narrow_check=False): self.model = None self.params = None self.output = None self.n_comps = None self.ref_wave = None # Define Reference attributes according to single profile or multiple if not line.group == 'b': self.lcomps = [line] self.n_comps = 1 self.ref_wave = np.array([line.wavelength]) * (1 + redshift) else: self.lcomps = line.list_comps self.n_comps = len(line.list_comps) self.ref_wave = line.param_arr('wavelength') * (1 + redshift) # Continuum model if cont_arr is None: self.model = Model(linear_model) self.model.prefix = 'line0_' # Fix or not the continuum self.define_param(0, line, 'm_cont', line.measurements.m_cont, _SLOPE_FIX_PAR, user_conf) self.define_param(0, line, 'n_cont', line.measurements.n_cont, _INTER_FIX_PAR, user_conf) else: self.model = const_cont_model(cont_arr, 'line0_')[0] # Add one gaussian models per component for idx, comp in enumerate(self.lcomps): # Gaussian comp self.model += Model(PROFILE_FUNCTIONS[line.profile], prefix=f'line{idx}_') # Amplitude configuration if comp.shape == 'emi': # Emission min, init, max values min_lim = 0 peak_0 = line.measurements.peak_flux - line.measurements.cont if narrow_check: max_lim = line.measurements.peak_flux - line.measurements.cont + line.measurements.cont_err else: max_lim = line.measurements.peak_flux * 1.5 else: # Absorption trough = np.min(y)# This is necessary in case there are maximum and minimum lines... we need multiple peak_wave, peak_flux options min_lim = trough - line.measurements.cont max_lim = 0 peak_0 = line.measurements.peak_flux * 0.5 - line.measurements.cont # Add the parameters according to the profile match comp.profile: # Gaussian, lorentz and Voigt case 'g': AMP_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) self.define_param(idx, line, 'amp', peak_0, AMP_PAR, user_conf) self.define_param(idx, line, 'center', self.ref_wave[idx], _CENTER_PAR, user_conf, redshift) self.define_param(idx, line, 'sigma', 2*line.measurements.pixelWidth, _SIG_PAR, user_conf) # Lorentz case 'l': AMP_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) self.define_param(idx, line, 'amp', peak_0, AMP_PAR, user_conf) self.define_param(idx, line, 'center', self.ref_wave[idx], _CENTER_PAR, user_conf, redshift) self.define_param(idx, line, 'sigma', 2 * line.measurements.pixelWidth, _SIG_PAR, user_conf) # Gamma param for Voigt case 'v': AMP_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) self.define_param(idx, line, 'amp', peak_0, AMP_PAR, user_conf) self.define_param(idx, line, 'center', self.ref_wave[idx], _CENTER_PAR, user_conf, redshift) self.define_param(idx, line, 'sigma', 2*line.measurements.pixelWidth, _SIG_PAR, user_conf) self.define_param(idx, line, 'gamma', 2*line.measurements.pixelWidth, _SIG_PAR, user_conf) # Frac for Pseudo-Voigt case 'pv': AMP_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) self.define_param(idx, line, 'amp', peak_0, AMP_PAR, user_conf) self.define_param(idx, line, 'center', self.ref_wave[idx], _CENTER_PAR, user_conf, redshift) self.define_param(idx, line, 'sigma', 2*line.measurements.pixelWidth, _SIG_PAR, user_conf) self.define_param(idx, line, 'frac', 0.5, _FRAC_PAR, user_conf) # Exponential profile case 'e': AMP_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) _CENTER_PAR_e = dict(value= self.ref_wave[idx], min=line.mask[0]*(1+redshift), max=line.mask[5]*(1+redshift), vary=True, expr=None) self.define_param(idx, line, 'amp', peak_0, AMP_PAR, user_conf) self.define_param(idx, line, 'center', self.ref_wave[idx], _CENTER_PAR_e, user_conf, redshift) self.define_param(idx, line, 'alpha', 1.0/line.measurements.pixelWidth, _ALPHA_PAR, user_conf) # Frac for Pseudo-Voigt case 'pp': AMP_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) self.define_param(idx, line, 'amp', peak_0, AMP_PAR, user_conf) self.define_param(idx, line, 'center', self.ref_wave[idx], _CENTER_PAR, user_conf, redshift) self.define_param(idx, line, 'sigma', 2*line.measurements.pixelWidth, _SIG_PAR, user_conf) self.define_param(idx, line, 'frac', 0.5, _FRAC_PAR, user_conf) self.define_param(idx, line, 'alpha', 2, _ALPHA_PAR, user_conf) # Power law case 'p': A_PAR = dict(value=None, min=min_lim, max=max_lim, vary=True, expr=None) self.define_param(idx, line, 'a', peak_0, A_PAR, user_conf) self.define_param(idx, line, 'b', self.ref_wave[idx], _B_PAR, user_conf, redshift) self.define_param(idx, line, 'c', 2 * line.measurements.pixelWidth, _C_PAR, user_conf) self.define_param(idx, line, 'alpha', -2, _ALPHA_PAR, user_conf) return def fit(self, line, x, y, err, method): # Unpack the mask for LmFit analysis idcs_good = ~x.mask x_in = x.data[idcs_good] y_in = y.data[idcs_good] # Compute weights if err is None: weights_in = np.full(x[idcs_good].size, 1.0 / line.measurements.cont_err) else: weights_in = 1.0/err[idcs_good].data # Compute model params displaying an error message if failed try: self.params = self.model.make_params() except Exception as e: msg = "Error compiling the line parameters below. Please check the input fitting configuration.\n" for j, (name, conf) in enumerate(self.model.param_hints.items(), start=1): name_comps = name.split('_') msg += f"{line.list_comps[int(name_comps[0][-1])].label}_{'_'.join(name_comps[1:])}: {conf}\n" raise LiMe_Error(msg) # Run the fitting with warnings.catch_warnings(): warnings.simplefilter(_VERBOSE_WARNINGS) self.output = self.model.fit(y_in, self.params, x=x_in, weights=weights_in, method=method, nan_policy='omit') # Check fitting quality self.review_fitting(line) return def measurements_calc(self, line, user_conf, redshift): # Assign continuum values (If not already available from the fit) if getattr(line.measurements, 'm_cont') is not None: for j, param in enumerate(['m_cont', 'n_cont']): param_fit = self.output.params.get(f'line0_{param}', None) if param_fit is not None: param_value = np.nan if param_fit is None else param_fit.value param_err = np.nan if param_fit is None else param_fit.stderr setattr(line.measurements, param, param_value) setattr(line.measurements, f'{param}_err', param_err) # Loop through the line components and assign profile params for i, comp_label in enumerate(self.lcomps): for j, param in enumerate(PROFILE_PARAMS[comp_label.profile]): # Initialize the parameters if i == 0: setattr(line.measurements, param, np.full(self.n_comps, np.nan)) setattr(line.measurements, f'{param}_err', np.full(self.n_comps, np.nan)) # Recover possible component from fit param_fit = self.output.params.get(f'line{i}_{param}', None) param_value = np.nan if param_fit is None else param_fit.value param_err = np.nan if param_fit is None else param_fit.stderr # Assign array curve parameters getattr(line.measurements, param)[i] = param_value getattr(line.measurements, f'{param}_err')[i] = param_err # Initialize array parameters if i == 0: line.measurements.profile_flux = np.full(self.n_comps, np.nan) line.measurements.profile_flux_err = np.full(self.n_comps, np.nan) line.measurements.eqw = np.full(self.n_comps, np.nan) line.measurements.eqw_err = np.full(self.n_comps, np.nan) line.measurements.FWHM_p = np.full(self.n_comps, np.nan) # Check for negative -0.0 # TODO this needs a better place # FIXME -0.0 error if np.signbit(line.measurements.sigma_err[i]): line.measurements.sigma_err[i] = np.nan _logger.warning(f'Negative scale value for amplitude at {comp_label}') if np.signbit(line.measurements.amp_err[i]): line.measurements.amp_err[i] = np.nan _logger.warning(f'Negative scale value for amplitude at {comp_label}') profile_flux_dist = AREA_FUNCTIONS[comp_label.profile](line.measurements, i, 1000) line.measurements.profile_flux[i] = np.mean(profile_flux_dist) line.measurements.profile_flux_err[i] = np.std(profile_flux_dist) # Compute profile flux and uncertainty # measurements.profile_flux[i], measurements.profile_flux_err[i] = AREA_FUNCTIONS[comp_label.profile](measurements, i, 1000) # Compute FWHM_p (Profile Full Width Half Maximum) line.measurements.FWHM_p[i] = FWHM_FUNCTIONS[comp_label.profile](line.measurements, i) # Check parameters error propagation self.review_err_propagation(line.measurements, i, comp_label.label, user_conf, self.output.errorbars, line.group) # Compute the equivalent widths line.measurements.eqw = line.measurements.profile_flux / line.measurements.cont line.measurements.eqw_err = np.abs(line.measurements.eqw) * np.sqrt(np.square(line.measurements.profile_flux_err/line.measurements.profile_flux) + np.square(line.measurements.cont_err/line.measurements.cont)) # Centroid redshifts line.measurements.z_line = line.measurements.center/(self.ref_wave/(1 + redshift)) - 1 # Kinematics line.measurements.v_r = c_KMpS * (line.measurements.center - self.ref_wave) / self.ref_wave line.measurements.v_r_err = c_KMpS * line.measurements.center_err / self.ref_wave line.measurements.sigma_vel = c_KMpS * line.measurements.sigma / self.ref_wave line.measurements.sigma_vel_err = c_KMpS * line.measurements.sigma_err / self.ref_wave # Fitting diagnostics line.measurements.chisqr, line.measurements.redchi = self.output.chisqr, self.output.redchi line.measurements.aic, line.measurements.bic = self.output.aic, self.output.bic # Updated signal-to-noise based on a successful profile fitting if self.output.errorbars: line.measurements.snr_line = signal_to_noise_rola(line.measurements.amp, line.measurements.cont_err, line.measurements.n_pixels) return def define_param(self, idx, line, param_label, param_value, default_conf={}, user_conf={}, z_obj=0): comps = line.list_comps # Line name i.e. H1_6563A_w1, H1_6563A_w1_amp line_label = comps[idx] # LmFit reference line0, line0_amp user_ref = f'{line_label}_{param_label}' param_ref = f'line{idx}_{param_label}' # TODO if min max provided the value should be in the middle # Overwrite default by the one provided by the user if user_ref in user_conf: param_conf = {**default_conf, **user_conf[user_ref]} else: param_conf = default_conf.copy() # Convert from LiMe -> LmFit label configuration in the expr entries if necessary if param_conf['expr'] is not None: # Do not parse expressions for single components if line.group == 'b': expr = param_conf['expr'] for i, comp in enumerate(comps): for g_param in ALL_PARAMS: expr = expr.replace(f'{comp}_{g_param}', f'line{i}_{g_param}') param_conf['expr'] = expr else: param_conf['expr'] = None # Set initial value estimation from spectrum if not provided by the user if user_ref not in user_conf: param_conf['value'] = param_value else: # Special case inequalities: H1_6563A_w1_sigma = '>1.5*H1_6563A_sigma' if param_conf['expr'] is not None: if ('<' in param_conf['expr']) or ('>' in param_conf['expr']): # Create additional parameter ineq_name = f'{param_ref}_ineq' ineq_operation = '*' # TODO add remaining operations # Split number and ref ineq_expr = param_conf['expr'].replace('<', '').replace('>', '') ineq_items = ineq_expr.split(ineq_operation) ineq_linkedParam = ineq_items[0] if not is_digit(ineq_items[0]) else ineq_items[1] ineq_lim = float(ineq_items[0]) if is_digit(ineq_items[0]) else float(ineq_items[1]) # Stablish the inequality configuration: ineq_conf = {} # TODO need to check these limits if '>' in param_conf['expr']: ineq_conf['value'] = ineq_lim * 1.2 ineq_conf['min'] = ineq_lim else: ineq_conf['value'] = ineq_lim * 0.8 ineq_conf['max'] = ineq_lim # Define new param self.model.set_param_hint(name=ineq_name, **ineq_conf) # Prepare definition of param: new_expresion = f'{ineq_name}{ineq_operation}{ineq_linkedParam}' # param_conf = dict(expr=new_expresion) param_conf = {'value': ineq_conf['value'], 'expr': new_expresion} # Case default value is not provided else: if param_conf['value'] is None: param_conf['value'] = param_value # Additional preparation for center parameter: Multiply value, min, max by redshift if '_center' in param_ref: if user_ref in user_conf: param_user_conf = user_conf[user_ref] for param_conf_entry in ('value', 'min', 'max'): if param_conf_entry in param_user_conf: param_conf[param_conf_entry] = param_conf[param_conf_entry] * (1 + z_obj) # In case of parameter is defined by an expresion, but it has no value if (param_conf['value'] is None) and (param_value is not None): param_conf['value'] = param_value # Assign the parameter configuration to the models self.model.set_param_hint(param_ref, **param_conf) return def review_fitting(self, line): if not self.output.errorbars: if line.measurements.observations == 'no': line.measurements.observations = 'No_errorbars' else: line.measurements.observations += 'No_errorbars' _logger.warning(f'Gaussian fit uncertainty estimation failed for {line.label}') return def review_err_propagation(self, data, idx_line, comp, user_conf, error_check, line_group): # Check gaussian flux error if (data.profile_flux_err[idx_line] == 0.0) and (data.amp_err[idx_line] != 0.0) and (data.sigma_err[idx_line] != 0.0): data.profile_flux_err[idx_line] = mult_err_propagation(np.array([data.amp[idx_line], data.sigma[idx_line]]), np.array([data.amp_err[idx_line], data.sigma_err[idx_line]]), data.profile_flux[idx_line]) # Check equivalent width error if line_group == 'b': if (data.eqw_err[idx_line] == 0.0) and (data.cont_err != 0.0) and (data.profile_flux_err[idx_line] != 0.0): data.eqw_err[idx_line] = mult_err_propagation(np.array([data.cont, data.profile_flux[idx_line]]), np.array([data.cont_err, data.profile_flux_err[idx_line]]), data.eqw[idx_line]) # Continuum error if (data.m_cont_err == 0.0) and (data.m_cont != 0.0) and (data.m_cont_err_intg != 0.0): data.m_cont_err = data.m_cont_err_intg if (data.n_cont_err == 0.0) and (data.n_cont != 0.0) and (data.n_cont_err_intg != 0.0): data.n_cont_err = data.n_cont_err_intg # Velocity and sigma error from line kinematics imported from an external line if data.center_err[idx_line] == 0: if user_conf.get(f'{comp}_center_err') is not None: data.center_err[idx_line] = user_conf.get(f'{comp}_center_err', np.nan) data.sigma_err[idx_line] = user_conf.get(f'{comp}_sigma_err', np.nan) return class LineFitting: """Class to measure emission line fluxes and fit them as gaussian curves""" def __init__(self): self.line = None self.profile = None self._narrow_check = False return def integrated_properties(self, line, emis_wave, emis_flux, emis_err, cont_arr, n_steps=1000, min_array_dim=15): # Use default line shape match line.shape: case 'emi': emission_check = True peakIdx = np.argmax(emis_flux) if emission_check else np.argmin(emis_flux) case 'abs': emission_check = False peakIdx = np.argmin(emis_flux) case _: if emis_flux[emis_flux.size // 2] > line.measurements.cont: peakIdx = np.argmax(emis_flux) line.p_shape = True else: peakIdx = np.argmin(emis_flux) line.p_shape = False # Assign values peak/through properties line.measurements.n_pixels = emis_wave.compressed().size line.measurements.peak_wave = emis_wave[peakIdx] line.measurements.peak_flux = emis_flux[peakIdx] line.measurements.pixelWidth = np.diff(emis_wave).mean() # line.measurements.cont = line.measurements.peak_wave * line.measurements.m_cont + line.measurements.n_cont line.measurements.cont = cont_arr[peakIdx] line.measurements.cont_err = emis_err[peakIdx] # Warning if continuum above or below line peak/through if emission_check and (cont_arr[peakIdx] > emis_flux[peakIdx]): _logger.info(f'Line {line.label} introduced as an emission but the line peak is below the continuum level') if emission_check and (cont_arr[peakIdx] > emis_flux[peakIdx]): _logger.info(f'Line {line.label} introduced as an absorption but the line peak is below the continuum level') # Monte Carlo to measure line flux and uncertainty normalNoise = np.random.normal(0.0, emis_err, (n_steps, emis_flux.size)) lineFluxMatrix = emis_flux + normalNoise areasArray = (lineFluxMatrix.sum(axis=1) - cont_arr.sum()) * line.measurements.pixelWidth # Assign integrated fluxes and uncertainty line.measurements.intg_flux = areasArray.mean() line.measurements.intg_flux_err = areasArray.std() # Compute SN_r line.measurements.snr_line = signal_to_noise_rola(line.measurements.peak_flux - line.measurements.cont, line.measurements.cont_err, line.measurements.n_pixels) line.measurements.snr_cont = line.measurements.cont/line.measurements.cont_err # Logic for very small lines snr_array = signal_to_noise_rola(emis_flux - line.measurements.cont, line.measurements.cont_err, 1) if (np.sum(snr_array > 5) < 3) and (line.group != 'b'): self._narrow_check = True else: self._narrow_check = False # Velocity calculations if (line.measurements.n_pixels >= min_array_dim) and (self._narrow_check is False): self.velocity_profile_calc(line.measurements, peakIdx, emis_wave, emis_flux, cont_arr, emission_check, min_array_dim=min_array_dim) # Equivalent width computation (it must be a 1d array to avoid conflict in blended lines) lineContinuumMatrix = cont_arr + normalNoise eqwMatrix = areasArray / lineContinuumMatrix.mean(axis=1) line.measurements.eqw_intg = np.array(eqwMatrix.mean(), ndmin=1) line.measurements.eqw_intg_err = np.array(eqwMatrix.std(), ndmin=1) return def profile_fitting(self, line, x_arr, y_arr, err_arr, cont_arr, user_conf, fit_method='leastsq'): # Compile the Lmfit component models self.profile = ProfileModelCompiler(line, self._spec.redshift, user_conf, y_arr, cont_arr, self._narrow_check) # Fit the models self.profile.fit(line, x_arr, y_arr, err_arr, fit_method) # Store the results into the line attributes self.profile.measurements_calc(line, user_conf, self._spec.redshift) return def continuum_calculation(self, idcs_emis, idcs_cont, user_cont_source, err_from_bands): # Use the continuum bands for the calculation match user_cont_source: case 'adjacent': # Check for zero err err_cont = self._spec.err_flux[idcs_cont].compressed() if self._spec.err_flux is not None else None err_cont = err_cont if np.any(err_cont) else None # Fit the model, including uncertainties params, covariance = curve_fit(linear_model, xdata=self._spec.wave[idcs_cont].compressed(), ydata=self._spec.flux[idcs_cont].compressed(), sigma=err_cont, absolute_sigma=True, check_finite=False) self.line.measurements.m_cont, self.line.measurements.n_cont = params self.line.measurements.m_cont_err_intg, self.line.measurements.n_cont_err_intg = np.sqrt(np.diag(covariance)) cont_arr = self._spec.wave * self.line.measurements.m_cont + self.line.measurements.n_cont case 'central': x, y = self._spec.wave[idcs_emis].compressed(), self._spec.flux[idcs_emis].compressed() err = self._spec.err_flux[idcs_emis].compressed() if self._spec.err_flux is not None else [0, 0] self.line.measurements.m_cont = (y[-1] - y[0]) / (x[-1] - x[0]) self.line.measurements.n_cont = y[0] - self.line.measurements.m_cont * x[0] self.line.measurements.m_cont_err_intg = np.sqrt((err[0] * (-1 / (x[-1] - x[0]))) ** 2 + (err[-1] * (1/(x[-1] - x[0]))) ** 2) self.line.measurements.n_cont_err_intg = np.sqrt(err[0] ** 2 + (self.line.measurements.m_cont_err_intg * (-x[0])) ** 2) cont_arr = self._spec.wave * self.line.measurements.m_cont + self.line.measurements.n_cont case 'fit': x, y = self._spec.wave[idcs_cont].compressed(), self._spec.cont[idcs_cont].compressed() err = self._spec.err_flux[idcs_cont].compressed() if self._spec.err_flux is not None else [0, 0] self.line.measurements.m_cont = (y[-1] - y[0]) / (x[-1] - x[0]) self.line.measurements.n_cont = y[0] - self.line.measurements.m_cont * x[0] self.line.measurements.m_cont_err_intg = np.sqrt((err[0] * (-1 / (x[-1] - x[0]))) ** 2 + (err[-1] * (1/(x[-1] - x[0]))) ** 2) self.line.measurements.n_cont_err_intg = np.sqrt(err[0] ** 2 + (self.line.measurements.m_cont_err_intg * (-x[0])) ** 2) cont_arr = self._spec.cont case _: raise LiMe_Error(f'Continuum source "{user_cont_source}" is not recognized. ' f'Please use "central", "adjacent" and "fit".') # # Initial continuum level value # self.line.measurements.cont = (self.line.measurements.m_cont * self._spec.wave[idcs_emis].compressed().mean() + # self.line.measurements.n_cont) return cont_arr def pixel_error_calculation(self, idcs_continua, user_error_from_bands): # Constant pixel error array from adjacent bands if user_error_from_bands: return np.ma.array(np.full(self._spec.wave.shape, np.std(self._spec.flux[idcs_continua] - (self.line.measurements.m_cont * self._spec.wave[idcs_continua] + self.line.measurements.n_cont))), mask=np.zeros(self._spec.wave.shape, dtype=bool)) # Pixel array else: return self._spec.err_flux def velocity_profile_calc(self, measurements, peakIdx, selection_wave, selection_flux, selection_cont, emission_check, min_array_dim=15): # line, peakIdx, emis_flux, cont_arr, emission_check idx_0 = compute_FWHM0(peakIdx, selection_flux, -1, selection_cont, emission_check) idx_f = compute_FWHM0(peakIdx, selection_flux, 1, selection_cont, emission_check) # Velocity calculations vel_array = (c_KMpS * (selection_wave[idx_0:idx_f] - measurements.peak_wave) / measurements.peak_wave).compressed() # In case the vel_array has length zero: if vel_array.size > min_array_dim: line_flux = selection_flux[idx_0:idx_f].compressed() cont_flux = selection_cont[idx_0:idx_f].compressed() peakIdx = np.argmax(line_flux) percentFluxArray = np.cumsum(line_flux-cont_flux) * measurements.pixelWidth / measurements.intg_flux * 100 if emission_check: blue_range = line_flux[:peakIdx] > measurements.peak_flux/2 red_range = line_flux[peakIdx:] < measurements.peak_flux/2 else: blue_range = line_flux[:peakIdx] < measurements.peak_flux/2 red_range = line_flux[peakIdx:] > measurements.peak_flux/2 # In case the peak is at the edge if (blue_range.size > 2) and (red_range.size > 2): # Integrated FWHM vel_FWHM_blue = vel_array[:peakIdx][np.argmax(blue_range)] vel_FWHM_red = vel_array[peakIdx:][np.argmax(red_range)] measurements.FWHM_i = vel_FWHM_red - vel_FWHM_blue # Interpolation for integrated kinematics percentInterp = interp1d(percentFluxArray, vel_array, kind='slinear', fill_value='extrapolate') velocPercent = percentInterp(TARGET_PERCENTILES) # Bug with masked array median operation if np.ma.isMaskedArray(vel_array): #TODO Lime2.0 removal measurements.v_med = np.ma.median(vel_array) else: measurements.v_med = np.median(vel_array) measurements.w_i = (velocPercent[0] * measurements.peak_wave / c_KMpS) + measurements.peak_wave measurements.v_1 = velocPercent[1] measurements.v_5 = velocPercent[2] measurements.v_10 = velocPercent[3] measurements.v_50 = velocPercent[4] measurements.v_90 = velocPercent[5] measurements.v_95 = velocPercent[6] measurements.v_99 = velocPercent[7] measurements.w_f = (velocPercent[8] * measurements.peak_wave / c_KMpS) + measurements.peak_wave # Full width zero intensity measurements.FWZI = velocPercent[8] - velocPercent[0] W_80 = measurements.v_90 - measurements.v_10 W_90 = measurements.v_95 - measurements.v_5 # # This are not saved... should they # A_factor = ((measurements.v_90 - measurements.v_med) - (measurements.v_med-measurements.v_10)) / W_80 # K_factor = W_90 / (1.397 * measurements.FWHM_i) return def report(self, return_text=False): # Get the Lmfit report report = fit_report(self.profile.output) # Dictionary with the generic entries to line components mapping map_dict = dict(zip([f'line{i}' for i in range(len(self.line.list_comps))], self.line.param_arr('label'))) # Replace the generic names pattern = re.compile(r'(' + '|'.join(map(re.escape, map_dict.keys())) + r')') report = pattern.sub(lambda match: map_dict[match.group()], report) # Print by default, otherwise return the report string if return_text: return report else: print(report)