Source code for lime.transitions

import logging

import numpy as np
import pandas as pd
from pandas import DataFrame
from numpy.core.fromnumeric import argmin

from lime.io import (_LOG_COLUMNS, check_file_dataframe, LiMe_Error, _RANGE_ATTRIBUTES_FIT, _ATTRIBUTES_FIT, load_frame,
                     _LIME_FOLDER)
from lime.tools import pd_get, au, unit_conversion
from lime import rsrc_manager

_DEFAULT_PROFILE = 'g'
_DEFAULT_SHAPE = 'emi'

_logger = logging.getLogger('LiMe')

_COMPs_KEYS = {'k': 'kinem',
               'p': 'profile',
               's': 'shape',
               't': 'trans'}

ELEMENTS_DICT = dict(H='Hydrogen', He='Helium', Li='Lithium', Be='Beryllium', B='Boron', C='Carbon', N='Nitrogen', O='Oxygen', F='Fluorine',
                     Ne='Neon', Na='Sodium', Mg='Magnesium', Al='Aluminum', Si='Silicon', P='Phosphorus', S='Sulfur',
                     Cl='Chlorine', Ar='Argon', K='Potassium', Ca='Calcium', Sc='Scandium', Ti='Titanium', V='Vanadium',
                     Cr='Chromium', Mn='Manganese', Fe='Iron', Ni='Nickel', Co='Cobalt', Cu='Copper', Zn='Zinc',
                     Ga='Gallium', Ge='Germanium', As='Arsenic', Se='Selenium', Br='Bromine', Kr='Krypton', Rb='Rubidium',
                     Sr='Strontium', Y='Yttrium', Zr='Zirconium', Nb='Niobium', Mo='Molybdenum', Tc='Technetium',
                     Ru='Ruthenium', Rh='Rhodium', Pd='Palladium', Ag='Silver', Cd='Cadmium', In='Indium', Sn='Tin',
                     Sb='Antimony', Te='Tellurium', I='Iodine', Xe='Xenon', Cs='Cesium', Ba='Barium', La='Lanthanum',
                     Ce='Cerium', Pr='Praseodymium', Nd='Neodymium', Pm='Promethium', Sm='Samarium', Eu='Europium',
                     Gd='Gadolinium', Tb='Terbium', Dy='Dysprosium', Ho='Holmium', Er='Erbium', Tm='Thulium',
                     Yb='Ytterbium', Lu='Lutetium', Hf='Hafnium', Ta='Tantalum', W='Tungsten', Re='Rhenium', Os='Osmium',
                     Ir='Iridium', Pt='Platinum', Au='Gold', Hg='Mercury', Tl='Thallium', Pb='Lead', Bi='Bismuth',
                     Th='Thorium', Pa='Protactinium', U='Uranium', Np='Neptunium', Pu='Plutonium', Am='Americium',
                     Cm='Curium', Bk='Berkelium', Cf='Californium', Es='Einsteinium', Fm='Fermium', Md='Mendelevium',
                     No='Nobelium', Lr='Lawrencium', Rf='Rutherfordium', Db='Dubnium', Sg='Seaborgium', Bh='Bohrium',
                     Hs='Hassium', Mt='Meitnerium', Ds='Darmstadtium', Rg='Roentgenium', Cn='Copernicium', Nh='Nihonium',
                     Fl='Flerovium', Mc='Moscovium', Lv='Livermorium', Ts='Tennessine', Og='Oganesson')


VAL_LIST = [1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1]
SYB_LIST = ["M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"]


# Reading file with the format and export status for the measurements
_DATABASE_FILE = rf'{_LIME_FOLDER}/resources/lines_database_v2.0.0.txt'

def check_lines_frame_units(frame):

    if 'units_wave' in frame.columns:
        return au.Unit(frame["units_wave"].iat[0])
    else:
        return au.Unit(Line.from_transition(frame.index[0], data_frame=frame).units_wave)


class LinesDatabase:

    def __init__(self, frame_address=None, default_shape=None, default_profile=None):

        # Default_lines database
        self.frame_address = _DATABASE_FILE if frame_address is None else frame_address
        self.frame = load_frame(self.frame_address)

        # Default values
        self._vacuum_check = False
        self._shape = 'emi' if default_shape is None else default_shape
        self._profile = 'g' if default_profile is None else default_profile
        self.set_units_wave()

        return

    def set_database(self, wave_intvl=None, line_list=None, particle_list=None, redshift=None, units_wave='Angstrom',
                     sig_digits=4, vacuum_waves=False, ref_bands=None, update_labels=False, update_latex=False,
                     exclude_lines=None, default_shape=None, default_profile=None):

        # Reload the database at each modification to avoid contamination
        ref_bands = load_frame(self.frame_address) if ref_bands is None else ref_bands

        self.frame = lines_frame(wave_intvl, line_list, particle_list, redshift, units_wave, sig_digits=sig_digits,
                                 vacuum_waves=vacuum_waves, ref_bands=ref_bands, update_labels=update_labels,
                                 update_latex=update_latex, rejected_lines=exclude_lines)

        self._vacuum_check = vacuum_waves

        self.set_units_wave()

        if default_shape:
            self.set_shape(default_shape)

        if default_profile:
            self.set_profile(default_profile)

        return

    def reset(self, frame_address=None, default_shape=None, default_profile=None):

        self.__init__(frame_address, default_shape, default_profile)

        return

    def copy(self):

        return self.frame.copy()

    def set_shape(self, value):
        self._shape = value
        return

    def set_profile(self, value):
        self._profile = value
        return

    def set_units_wave(self):

        self._units_wave = check_lines_frame_units(self.frame)

        return

    def get_shape(self):
        return self._shape

    def get_profile(self):
        return self._profile

    def get_units(self):
        return self._units_wave


rsrc_manager.lineDB = LinesDatabase(_DATABASE_FILE)


def int_to_roman(num):
    i, roman_num = 0, ''
    while num > 0:
        for _ in range(num // VAL_LIST[i]):
            roman_num += SYB_LIST[i]
            num -= VAL_LIST[i]
        i += 1
    return roman_num


def recover_ionization(string):

    part_len = len(string)
    index = part_len - 1  # Start at the last character
    while index >= 0 and string[index].isdigit():
        index -= 1

    # All characters
    if index == part_len - 1:
        particle, ionization = string, None

    # All numbers
    elif index < 0:
        particle, ionization = string, None

    # Particle and ionization
    else:
        particle, ionization = string[0:index+1], int(string[index+1:])

    return particle, ionization


def recover_transition(particle, input_trans=None):

    # Check for PyNeb particle notation
    if input_trans is None:

        element = ELEMENTS_DICT.get(particle.symbol)
        if (element is not None) and (particle.ionization is not None):

            if particle.label in ('H1', 'He1', 'He2'):
                transition = 'rec'
            else:
                transition = 'col'
        else:
            transition = None

        return transition

    else:
        return input_trans


def particle_notation(particle, transition):

    # Pre-assigned a transition time if particle and ionization
    if transition is None:
        transition = recover_transition(particle)

    if transition is not None:

        try:

            # Get ionization numeral
            particle = particle if not isinstance(particle, str) else Particle.from_label(particle)
            ionization_roman = int_to_roman(particle.ionization)
            part_label = f'{particle.symbol}{ionization_roman}'

            # Collisional excited
            if transition == 'col':
                part_label = f'$[{part_label}]'

            # Semi-forbidden
            elif transition == 'sem':
                part_label = f'${part_label}]'

            # Recombination
            else:
                part_label = f'${part_label}'

        except:
            part_label = f'{particle}-$'

    else:
        part_label = f'{particle}-$'

    return part_label


def air_to_vacuum_function(wave_array, units_wave='AA'):

    r"""
    Converts air wavelengths to vacuum wavelengths using the method of `Greisen et al. (2006, A&A 446, 747) <https://ui.adsabs.harvard.edu/abs/2006A%26A...446..747G/abstract>`_.

    This function applies a wavelength correction to account for the refractive index of air. The input wavelengths are assumed to be measured in air, and
    the output values correspond to vacuum wavelengths. The conversion follows the formula:

    .. math::

        \lambda_{\text{vac}} = \lambda_{\text{air}} \times \left( 1 + 10^{-6} \times
        \left( 287.6155 + 1.62887 \sigma^2 + 0.01360 \sigma^4 \right) \right)

    where:

    .. math::

        \sigma = \frac{1}{\lambda_{\text{air}}^2} \quad \text{(in microns)}.

    :param wave_array: Input array of wavelengths in air.
    :type wave_array: numpy.ndarray or astropy.units.Quantity

    :param units_wave: The wavelength unit (default: ``'AA'`` for Angstroms). If `wave_array` is an `astropy.Quantity`, this parameter is ignored.
    :type units_wave: str, optional

    :return: The converted vacuum wavelengths, in the same unit as the input.
    :rtype: numpy.ndarray or astropy.units.Quantity

    :raises ValueError: If the input wavelength array is not valid.

    **Example Usage:**

    Converting an array of air wavelengths to vacuum wavelengths:

    .. code-block:: python

        import numpy as np
        import astropy.units as u
        from your_module import air_to_vacuum_function

        air_waves = np.array([5000, 6000, 7000])  # Angstroms
        vac_waves = air_to_vacuum_function(air_waves)

        print(vac_waves)  # Output: Vacuum wavelengths in the same unit

    Using Astropy units:

    .. code-block:: python

        air_waves = np.array([5000, 6000, 7000]) * u.AA
        vac_waves = air_to_vacuum_function(air_waves)

        print(vac_waves.to(u.nm))  # Convert output to nanometers

    """

    wave_arr_um = wave_array * au.Unit(units_wave)
    sigma2 = 1/np.square(wave_arr_um.to(au.um).value)

    return wave_array / (1 + 1e-6 * (287.6155 + 1.62887 * sigma2 + 0.01360 * np.square(sigma2)))


def check_units_from_wave(line, str_ion, str_wave, bands):

    # First the input database
    if bands is not None:

        # Check literal unit
        wave = pd_get(bands, line, 'wavelength', nan_to_none=True)
        units = pd_get(bands, line, 'units_wave', nan_to_none=True)

        # Check core element
        if wave is None:
            core_element = f'{str_ion}_{str_wave}'
            wave = pd_get(bands, core_element, 'wavelength', nan_to_none=True)
            units = pd_get(bands, core_element, 'units_wave', nan_to_none=True)

        # Convert to units
        units = au.Unit(units) if units is not None else units

    else:
        wave, units = None, None

    # Second the reference database
    if (units is None) or (wave is None):
        wave = pd_get(rsrc_manager.lineDB.frame, line, 'wavelength', nan_to_none=True)
        units = pd_get(rsrc_manager.lineDB.frame, line, 'units_wave', nan_to_none=True)

        # Convert to units
        units = au.Unit(units) if units is not None else units

    # Third decipher from label
    if (units is None) or (wave is None):

        # First check for Angstroms
        if str_wave[-1] == 'A':
            units = au.Unit('AA')
            wave = float(str_wave[:-1])

        else:
            au_unit = au.Unit(str_wave)
            units = au_unit.bases[0]
            wave = au_unit.scale

    return wave, units


def check_line_in_log(input_wave, log=None, tol=1):

    # Guess the transition if only a wavelength provided
    if not isinstance(input_wave, str):
        if log is not None:

            # Check the wavelength from the wave_obs column or get it from the labels
            if 'wavelength' in log.columns:
                ref_waves = log.wavelength.to_numpy()
            else:
                ion_array, ref_waves = zip(*log.index.str.split('_').to_numpy())
                _wave0, units_wave = check_units_from_wave(ref_waves[0])
                ref_waves = np.char.strip(ref_waves, units_wave).astype(float)

            # Locate the best candidate
            idx_closest = np.argmin(np.abs(ref_waves - input_wave))
            label = log.iloc[idx_closest].name
            _logger.warning(f'The transition wavelength "{input_wave}" has been identified as "{label}"')

            # Check if table rows are not sorted
            if not all(np.diff(ref_waves) >= 0): # TODO we might need to use something else than searchsorted
                _logger.warning(f'\nThe lines log rows are not sorted from lower to higher wavelengths.\nThis can cause '
                                f'issues to identify the lines using the transition wavelength.\nTry to use the string '
                                f'line label')

            return label

        else:
            _logger.critical(f'The line "{input_wave}" could not be identified: A lines log was not provided')

    return input_wave


def latex_from_label(label, particle=None, wave=None, units_wave=None, kinem=None, transition_comp=None, scalar_output=False,
                     decimals=None):

    # Use input values if provided else compute from label
    if (particle is None) or (wave is None) or (wave is None) or (kinem is None) or (transition_comp is None):
        particle, wave, units_wave, kinem, profile_comp, transition_comp = label_composition(label)

    # Reshape to 1d array
    else:
        particle = np.array(particle, ndmin=1)
        wave = np.array(wave, ndmin=1)
        units_wave = np.array(units_wave, ndmin=1)
        kinem = np.array(kinem, ndmin=1)
        transition_comp = np.array(transition_comp, ndmin=1)

    n_items = wave.size
    latex_array = np.empty(n_items).astype('object')

    # Significant figures
    if decimals is None:
        wave = np.round(wave, 0).astype(int)
    else:
        wave = np.round(wave, decimals)

    for i in np.arange(n_items):

        # Particle label
        part_label = particle_notation(particle[i], transition_comp[i])

        # Wavelength and units label
        units_latex = f'{units_wave[i]:latex}'[9:-2].replace(' ',r'\,')

        # Kinematic label
        kinetic_comp = '' if kinem[i] == 0 else f'-k_{kinem[i]}'

        # Combine items
        latex_array[i] = f'{part_label}{wave[i]}{units_latex}{kinetic_comp}$'

    # Scalar output if requested and 1 length array
    if scalar_output:
        if latex_array.size == 1:
            latex_array = latex_array[0]

    return latex_array


def label_composition(line_list, bands=None, default_profile=None):

    # Empty containers for the label componentes
    n_comps = len(line_list)
    particle = [None] * n_comps #empty(n_comps).astype(str)
    wavelength = np.empty(n_comps)
    units_wave = [None] * n_comps
    kinem = np.zeros(n_comps, dtype=int)
    profile_comp = [None] * n_comps
    transition_comp = [None] * n_comps

    # If there isn't an input profile use LiMe default
    default_profile = _DEFAULT_PROFILE if default_profile is None else default_profile

    # Loop through the components and get the components
    for i, line in enumerate(line_list):

        line_items = line.split('_')

        # Check the line has the items
        if len(line_items) < 2:
            raise LiMe_Error(f'The blend/merged component "{line}" in the transition list "{line_list}" does not have a'
                             f' recognised format. Please use a "Particle_WavelengthUnits" format in the configuration'
                             f'file')

        # Particle properties
        particle[i] = Particle.from_label(line_items[0])

        # Wavelength properties
        wavelength[i], units_wave[i] = check_units_from_wave(line, line_items[0], line_items[1], bands)

        # Split the optional components: "H1_1216A_t-rec_k-0_p-g" -> {'t': 'rec', 'k': '0', 'p': 'g'} # TODO better do that with optional_comps
        comp_conf = {optC[0]: optC[2:] for optC in line_items[2:]}

        # Check there are no accepted optional components
        for opt_key in comp_conf.keys():
            if opt_key != 'm' and opt_key != 'b':
                if opt_key not in ['k', 'p', 't', 's']:
                    _logger.warning(f'Optional component "{opt_key}" is not recognised. Please use "_k-", "_p-", "_t-".')

        # Kinematic component
        kinem[i] = int(comp_conf.get('k', 0))

        # Profile component
        profile_comp[i] = comp_conf.get('p', None)
        if profile_comp[i] is None:
            profile_comp[i] = default_profile

        # Transition component
        trans = comp_conf.get('t', None)

        # If none is provided check from the table
        if (trans is None) and (bands is not None):
            trans = pd_get(bands, line, 'transition')

        # Else assume default
        if trans is None:
            trans = recover_transition(particle[i])

        transition_comp[i] = trans

    return particle, wavelength, units_wave, kinem, profile_comp, transition_comp


[docs] def label_decomposition(lines_list, bands=None, fit_conf=None, params_list=('particle', 'wavelength', 'latex_label'), scalar_output=False, verbose=True): """ Decompose LiMe line labels into requested physical/formatting parameters. Given a list of line labels in LiMe notation, this function returns one or more arrays (or scalars) with selected parameters such as the particle label, line wavelength, LaTeX label, kinematic setup, profile, and transition information. When a bands table and/or a fitting configuration are provided, they are used (via :meth:`Line.from_transition`) to enrich or override defaults; otherwise, values are derived from the label and LiMe's internal database. Parameters ---------- lines_list : list of str Sequence of line labels in LiMe notation (e.g., ``["O3_5007A", "H1_4861A"]``). bands : pandas.DataFrame or str or pathlib.Path, optional Bands table, or a file path to a readable bands table. When provided, its information is used by :meth:`Line.from_transition` to refine wavelengths, components, labels, etc. fit_conf : dict, optional Fitting configuration dictionary (e.g., parsed from TOML) used to override defaults (wavelengths, blends, shapes/profiles). params_list : tuple of {"particle", "wavelength", "latex_label", "kinem", "profile_comp", "transition_comp"}, optional Ordered list of output parameters to return. Default is ``("particle", "wavelength", "latex_label")``. scalar_output : bool, optional If ``True`` **and** only a single input label is provided, return scalars instead of 1-D arrays. Default is ``False``. verbose : bool, optional If ``True``, propagate verbose behavior to :meth:`Line.from_transition`. Default is ``True``. Returns ------- tuple A tuple with the same length and order as ``params_list``. Each element is: - For multiple input labels: a 1-D ``ndarray`` with one value per label. - For a single input label and ``scalar_output=True``: a scalar value. The possible entries are: - ``"particle"`` : array of str — particle labels (e.g., ``"O3"``). - ``"wavelength"`` : array of float — wavelengths. - ``"latex_label"`` : array of str — LaTeX-formatted labels. - ``"kinem"`` : array of objects — kinematic configuration per line. - ``"profile_comp"`` : array of objects — line profile identifiers. - ``"transition_comp"`` : array of objects — transition descriptors. Notes ----- - Each line is resolved through :meth:`Line.from_transition(label, fit_conf, bands, verbose=verbose)`. Missing information is filled from LiMe’s default database where possible. - The function constructs an internal DataFrame with columns: ``["particle", "wavelength", "latex_label", "kinem", "profile_comp", "transition_comp"]``, then extracts the columns requested by ``params_list``. - Column ``"wavelength"`` is coerced to numeric. Examples -------- Return particle, wavelength, and LaTeX label arrays for two lines: >>> particles, waves, latex = label_decomposition( ... ["O3_5007A", "H1_4861A"], ... params_list=("particle", "wavelength", "latex_label") ... ) Use a fitting configuration and bands table; request only wavelengths: >>> (waves,) = label_decomposition( ... ["O2_3726A", "O2_3729A"], ... bands=bands_df, ... fit_conf=fit_cfg, ... params_list=("wavelength",) ... ) Single label with scalar output: >>> particle, wl = label_decomposition( ... ["O3_5007A"], ... params_list=("particle", "wavelength"), ... scalar_output=True ... ) >>> isinstance(wl, float) True """ headers = ['particle', 'wavelength', 'latex_label', 'kinem', 'profile_comp', 'transition_comp'] lines_df = pd.DataFrame(index=np.array(lines_list, ndmin=1), columns=headers) # Loop through the lines and derive their properties: for label in lines_df.index: line = Line.from_transition(label, fit_conf, bands, verbose=verbose) lines_df.loc[label, :] = (line.particle.label, line.wavelength, line.latex_label, line.kinem, line.profile, line.trans) # Adjust column types lines_df['wavelength'] = pd.to_numeric(lines_df['wavelength']) # Recover the columns requested by the user output = [] for i, param in enumerate(params_list): output.append(lines_df[param].to_numpy(copy=True)) # If requested and single line, return the input as a scalar if scalar_output and (output[0].shape[0] == 1): output = tuple(item[0] for item in output) else: output = tuple(output) return output
def label_profiling(profile_comp, default_type='emi'): _p_type_list, _p_shape_list = [], [] for prof_comp in profile_comp: _line_p_items = prof_comp.split('-') # Type of profile _p_shape = _line_p_items[0] # In case the user does not provide emi or abs _p_type = 'emi' if len(_line_p_items) == 1 else _line_p_items[1] # Emission or absorption _p_type = True if _p_type == default_type else False # Append to list _p_shape_list.append(_p_shape) _p_type_list.append(_p_type) _p_shape_list, _p_type_list = np.array(_p_shape_list), np.array(_p_type_list) return _p_shape_list, _p_type_list def label_mask_assigning(line_label, input_band, blend_check, merged_check, core_comp): if isinstance(input_band, DataFrame): # Query for input label if pd_get(input_band, line_label, 'w1') is not None: ref_label = line_label # Blended or merged line elif pd_get(input_band, line_label[:-2], 'w1') is not None: ref_label = line_label[:-2] # Case where we introduce a line with a different profile (H1_4861A_l) elif pd_get(input_band, core_comp, 'w1') is not None: ref_label = core_comp # Not found else: ref_label = None # Recover the mask if ref_label is not None: mask = np.array([input_band.at[ref_label, 'w1'], input_band.at[ref_label, 'w2'], input_band.at[ref_label, 'w3'], input_band.at[ref_label, 'w4'], input_band.at[ref_label, 'w5'], input_band.at[ref_label, 'w6']]) else: mask = None # No band elif input_band is None: mask = None # Convert input to numpy array else: mask = np.atleast_1d(input_band) return mask def format_line_mask_option(entry_value, wave_array): # Check if several entries formatted_value = entry_value.split(',') if ',' in entry_value else [f'{entry_value}'] # Check if interval or single pixel mask for i, element in enumerate(formatted_value): if '-' in element: formatted_value[i] = element.split('-') else: element = float(element) pix_width = (np.diff(wave_array).mean())/2 formatted_value[i] = [element-pix_width, element+pix_width] formatted_value = np.array(formatted_value).astype(float) return formatted_value
[docs] def lines_frame(wave_intvl=None, line_list=None, particle_list=None, redshift=None, units_wave='Angstrom', sig_digits=4, vacuum_waves=False, ref_bands=None, update_labels=False, update_latex=False, rejected_lines=None): """ This function returns `LiMe bands database <https://lime-stable.readthedocs.io/en/latest/inputs/n_inputs3_line_bands.html>`_ as a pandas dataframe. If the user provides a wavelength array (``wave_inter``) the output dataframe will be limited to the lines within this wavelength interval. Similarly, the user provides a ``lines_list`` or a ``particle_list`` the output bands will be limited to the these lists. These 2_guides must follow `LiMe notation style <https://lime-stable.readthedocs.io/en/latest/inputs/n_inputs2_line_labels.html>`_ If the user provides a redshift value alongside the wavelength interval (``wave_intvl``) the output bands will be limited to the transitions at that observed range. The user can specify the desired wavelength units using the `astropy string format <https://docs.astropy.org/en/stable/units/ref_api.html>`_ or introducing the `astropy unit variable <https://docs.astropy.org/en/stable/units/index.html>`_. The default value unit is angstroms. The argument ``sig_digits`` determines the number of decimal figures for the line labels. The user can request the output line labels and bands wavelengths in vacuum setting ``vacuum=True``. This conversion is done using the relation from `Greisen et al. (2006) <https://www.aanda.org/articles/aa/abs/2006/05/aa3818-05/aa3818-05.html>`_. Instead of the default LiMe database, the user can provide a ``ref_bands`` dataframe (or the dataframe file address) to use as the reference database. :param wave_intvl: Wavelength interval for output line transitions. :type wave_intvl: list, numpy.array, lime.Spectrum, lime.Cube, optional :param line_list: Line list for output line bands. :type line_list: list, numpy.array, optional :param particle_list: Particle list for output line bands. :type particle_list: list, numpy.array, optional :param redshift: Redshift interval for output line bands. :type redshift: list, numpy.array, optional :param units_wave: Labels and bands wavelength units. The default value is "A". :type units_wave: str, optional :param sig_digits: Number of decimal figures for the line labels. :type sig_digits: int, optional :param vacuum_waves: Set to True for vacuum wavelength values. The default value is False. :type vacuum_waves: bool, optional :param ref_bands: Reference bands dataframe. The default value is None. :type ref_bands: pandas.Dataframe, str, pathlib.Path, optional :return: """ # Use the default lime mask if none provided ref_bands = ref_bands if ref_bands is not None else rsrc_manager.lineDB.frame # Load the reference bands bands_df = check_file_dataframe(ref_bands) # Convert to requested units if units_wave != 'Angstrom': wave_columns = ['wave_vac', 'wavelength', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6'] conversion_factor = unit_conversion(in_units='Angstrom', out_units=units_wave, wave_array=1) bands_df.loc[:, wave_columns] = bands_df.loc[:, wave_columns] * conversion_factor bands_df['units_wave'] = units_wave else: conversion_factor = 1 # First slice by wavelength and redshift idcs_rows = np.ones(bands_df.index.size).astype(bool) if wave_intvl is not None: w_min, w_max = wave_intvl[0], wave_intvl[-1] # Account for redshift redshift = redshift if redshift is not None else 0 if 'wavelength' in bands_df.columns: wave_arr = bands_df['wavelength'] * (1 + redshift) else: wave_arr = label_decomposition(bands_df.index.to_numpy(), params_list=['wavelength'])[0] * conversion_factor # Compare with wavelength values idcs_rows = idcs_rows & (wave_arr >= w_min) & (wave_arr <= w_max) # Second slice by particle if particle_list is not None: idcs_rows = idcs_rows & bands_df.particle.isin(particle_list) # Finally slice by the name of the lines if line_list is not None: idcs_rows = idcs_rows & bands_df.index.isin(line_list) # Convert to vacuum wavelengths if requested but after renaming the labels to keep air if requested if vacuum_waves: bands_df['wavelength'] = bands_df['wave_vac'] bands_lim_columns = ['w1', 'w2', 'w3', 'w4', 'w5', 'w6'] bands_df[bands_lim_columns] = air_to_vacuum_function(bands_df[bands_lim_columns].to_numpy()) # Final table bands_df = bands_df.loc[idcs_rows] # Update the labels if requested if update_labels or update_latex: n_lines = bands_df.index.size labels, latex_list = ([None] * n_lines if update_labels else None, [None] * n_lines if update_latex else None) for i, label_i in enumerate(bands_df.index): line = Line.from_transition(label_i, data_frame=bands_df) line.update_labels(sig_digits=sig_digits) if update_labels: labels[i] = line.label if update_latex: latex_list[i] = line.latex_label if update_latex: bands_df["latex_label"] = latex_list if update_labels: bands_df.rename(index=dict(zip(bands_df.index, labels)), inplace=True) # Exclude lines if rejected_lines is not None: bands_df = bands_df.loc[~bands_df.index.isin(rejected_lines)] return bands_df
def bands_from_measurements(frame, sample_levels=['id', 'line'], sort=True, remove_empty_columns=False, index_dict=None, bands_hdrs=None): """ Re-constructs the lines frames from an output measurements lines frame consolidating the merged and blended lines. Given a LiMe measurements table (or a path to one), this function produces a bands DataFrame with wavelength limits (``w1``–``w6``), representative wavelength, grouping labels, and basic metadata (units, particle, transition, LaTeX label). It supports both single-index and MultiIndex inputs and handles merged/blended groups, including renaming and de-duplication logic. Parameters ---------- frame : pandas.DataFrame or str or pathlib.Path Measurements table or a file path that can be read into a measurements DataFrame. The table is validated/loaded via ``check_file_dataframe``. sample_levels : list of str, optional For MultiIndex inputs, the index levels that identify a line sample (e.g., ``["id", "line"]``). Default is ``["id", "line"]``. sort : bool, optional If ``True``, sort the output by ``["wavelength", "group_label"]``. Default is ``True``. remove_empty_columns : bool, optional If ``True``, drop columns that are entirely ``NaN`` in the result. Default is ``False``. index_dict : dict, optional Mapping to rename output line labels. Applied at the end of processing. bands_hdrs : sequence of str, optional Column set for the output table. When not provided the default is ``("wavelength","w1","w2","w3","w4","w5","w6","group_label","units_wave","particle","trans","latex_label")``. Returns ------- pandas.DataFrame A bands table indexed by line labels (with ``_b`` suffix for blended groups and ``_m`` for merged labels when applicable). Notes ----- - **Single-index frames:** - Single lines are selected where ``group_label == "none"`` and copied directly. - Grouped lines are consolidated by unique ``group_label``, and blended labels get a ``_b`` suffix if they do not already end with ``"_m"``. - **MultiIndex frames:** - Unique line labels are retrieved from the given ``sample_levels``. - For each line and each group in its rows, the band limits (``w1``–``w6``) are computed as the **median** across matching entries. - Labels are determined as: - Single (``group == "none"``): the original line label. - Merged: preserve the ``_m`` label. - Blended: add ``_b`` to the label; if already present in the index, a disambiguated label like ``"{species}-{j}_{rest}_b"`` is created. - A minimal single-row reference DataFrame is built and passed to :meth:`Line.from_transition` to recover canonical metadata (wavelength, units, particle, LaTeX label, mask). - The representative wavelength and units are taken from the line’s reference component; mask values populate ``w1``–``w6``. - **Blended lines with merged components:** - Rows sharing identical band limits (``w1``–``w6``) and with ``group_label != "none"`` are considered duplicates. - Duplicates are collapsed into a single blended entry. The surviving row’s ``group_label`` becomes the ``"+"``-joined list of merged components, and its index is renamed with a ``_b`` suffix (or from ``*_m`` → ``*_b``). - If this auto-generated label isn’t in ``index_dict``, a warning is logged suggesting you add a stable rename. - **Sorting and cleanup:** ``sort`` orders the result; ``remove_empty_columns`` prunes all-NaN columns; ``index_dict`` renames the final index. Examples -------- From a single-index measurements table: >>> bands = bands_from_measurements(frame) With MultiIndex and custom sample levels, dropping empty columns: >>> bands = bands_from_measurements( ... frame, ... sample_levels=["object_id", "line"], ... remove_empty_columns=True ... ) Enforcing stable external IDs: >>> rename_map = {"H1_6563A_b": "H1-N2_6563A_b"} >>> bands = bands_from_measurements(frame, index_dict=rename_map) """ # Load the frame if necessary frame = check_file_dataframe(frame, sample_levels=sample_levels) # Single frame if not isinstance(frame.index, pd.MultiIndex): bands_hdrs = ('wavelength', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6', 'group_label', 'units_wave', 'particle', 'trans', 'latex_label') if bands_hdrs is None else None # Make dataframe with single lines idcs_single = frame.group_label == 'none' bands = frame.reindex(index=frame.loc[idcs_single].index, columns=bands_hdrs) # Get the indexes of grouped lines group_labels_arr = frame.loc[~idcs_single].group_label unique_values, unique_indices = np.unique(group_labels_arr, return_index=True) group_labels_arr = group_labels_arr.iloc[np.sort(unique_indices)] # Make dataframe with grouped lines and add blended suffix bands_group = frame.reindex(index=group_labels_arr.index, columns=bands_hdrs) blended_arr = bands_group.loc[~bands_group.index.to_series().str.endswith('_m')].index.to_numpy() bands_group.rename(index={key: f"{key}_b" for key in blended_arr}, inplace=True) # Combine the dataframes bands = pd.concat([bands, bands_group], axis=0) # Multi-index else: line_list = frame.index.get_level_values(sample_levels).unique() idcs_single = frame.group_label == 'none' bands = frame.reindex(index=frame.loc[idcs_single].index, columns=bands_hdrs) # Loop through the lines for i, line_label in enumerate(line_list): # Exclude kinematic components: if '_k-' not in line_label: df_line = frame.xs(line_label, level=sample_levels, drop_level=False) group_list = df_line.group_label.unique() for j, group in enumerate(group_list): # Get same group entries and compute mean bands df_group = df_line.loc[df_line.group_label == group] bands_limits = np.median(df_group.loc[:, 'w1':'w6'].to_numpy(), axis=0) # Single if group == 'none': entry_label = line_label # Merged and blended else: # Merged if line_label.endswith('_m'): entry_label = line_label # Blended Compute name else: entry_label = f'{line_label}_b' if entry_label in bands.index: comps = line_label.split('_') entry_label = f'{comps[0]}-{j}_{comps[1]}_b' # Re-assign scalar wavelength # Generate single line df with the line information ref_df = df_group.iloc[0,:].copy() ref_df.name = entry_label ref_df['w1':'w6'] = bands_limits ref_df = ref_df.to_frame().T # Define LiMe line and add data to dataframe line = Line.from_transition(entry_label, data_frame=ref_df) # Assign the values: bands.loc[line.label, 'wavelength'] = line.wavelength[line._ref_idx] bands.loc[line.label, 'w1':'w6'] = line.mask bands.loc[line.label, 'group_label'] = line.group_label bands.loc[line.label, 'latex_label'] = line.latex_label bands.loc[line.label, 'units_wave'] = line.units_wave[line._ref_idx] bands.loc[line.label, 'particle'] = line.particle[line._ref_idx] # Check for blended merged lines idcs_merged_blended = bands.duplicated(subset=['w1', 'w2', 'w3', 'w4', 'w5', 'w6'], keep=False) & (bands.group_label != 'none') duplicate_groups = (bands.loc[idcs_merged_blended].groupby(['w1', 'w2', 'w3', 'w4', 'w5', 'w6']).filter(lambda x: len(x) > 1) # Filter for groups with more than one row (i.e., duplicates) .groupby(['w1', 'w2', 'w3', 'w4', 'w5', 'w6']).groups) idcs_repeated_bands = [index.tolist() for index in duplicate_groups.values()] if len(idcs_repeated_bands) > 0: indexes_to_drop, rename_groups = [], {} for group_merged in idcs_repeated_bands: bands.loc[group_merged[0], 'group_label'] = '+'.join(group_merged) indexes_to_drop += group_merged[1:] rename_groups[group_merged[0]] = group_merged[0] + '_b' if not group_merged[0].endswith("_m") else group_merged[0][:-2] + '_b' if index_dict is None or (rename_groups[group_merged[0]] not in index_dict): _logger.warning(f'\nBlended line "{group_merged[0]}" with merged components has been renamed "{rename_groups[group_merged[0]]}={bands.loc[group_merged[0], "group_label"]}"' f'\nIt is adviced rename via index_dict argument to match future measurements.') bands.drop(indexes_to_drop, inplace=True) bands.rename(rename_groups, inplace=True) if sort: bands.sort_values(by=['wavelength', 'group_label'], inplace=True) if remove_empty_columns: bands = bands.dropna(axis=1, how='all') if index_dict is not None: bands.rename(index=index_dict, inplace=True) return bands def construct_classic_notation(line=None, line_params=None): # From line objects if line is not None: if isinstance(line, Line): for comp in line.list_comps: particle_str = particle_notation(comp.particle, comp.trans) wavelength_str = round_sig_clean(comp.wavelength) units_str = f'{au.Unit(comp.units_wave):latex}'[9:-2].replace(' ', r'\,') comp.latex_label = f'{particle_str}{wavelength_str}{units_str}$' line.latex_label = '+'.join(line.param_arr('latex_label')) else: raise LiMe_Error('To reconstruct the line notation please use introduce a "lime.Line" object or a dictionary ' 'with its parameters.') # From dictionary with the items else: particle_str = particle_notation(Particle.from_label(line_params["particle"]), line_params.get("trans")) wavelength_str = round_sig_clean(line_params["wavelength"]) units_str = f'{au.Unit(line_params["units_wave"]):latex}'[9:-2].replace(' ', r'\,') line_params['latex_label'] = f'{particle_str}{wavelength_str}{units_str}$' return def check_continua_bands(idcs, wave_rest, min_width_pixel = 1, min_sep_cont=2, reset_w2_w5=False): if reset_w2_w5: if idcs[1] > idcs[2]: idcs[1] = idcs[2] - min_sep_cont if idcs[0] >= idcs[1]: idcs[0] = idcs[1] - (min_width_pixel + min_sep_cont) if idcs[4] < idcs[3]: idcs[4] = idcs[3] + min_sep_cont if idcs[5] <= idcs[4]: idcs[5] = idcs[4] + (min_width_pixel + min_sep_cont) # Continua bands beyond the spectral wavelength range if idcs[0] < 0: idcs[0] = 0 if idcs[1] < 0: idcs[1] = idcs[0] + min_width_pixel if idcs[5] > wave_rest.size - 1: idcs[5] = wave_rest.size - min_width_pixel if idcs[4] > wave_rest.size - 1: idcs[4] = idcs[5] - min_width_pixel # One pixel bands width if idcs[0] == idcs[1]: idcs[1] = idcs[0] + min_width_pixel if idcs[2] == idcs[3]: idcs[3] = idcs[2] + min_width_pixel if idcs[4] == idcs[5]: idcs[4] = idcs[5] - min_width_pixel return idcs
[docs] class Particle: """ Representation of an atomic or ionic species used in spectral line definitions. A :class:`Particle` object encodes the physical identity of a species through its label, atomic symbol, and ionization stage. It provides convenience methods for reconstructing these attributes from a shorthand label (e.g., ``"O3"`` → oxygen, doubly ionized). Parameters ---------- label : str, optional Canonical particle label (e.g., ``"H1"``, ``"O3"``, ``"He2"``). This string uniquely identifies the species and ionization stage within LiMe. symbol : str, optional Chemical symbol of the species (e.g., ``"O"`` for oxygen, ``"H"`` for hydrogen). ionization : int, optional Ionization stage of the particle, typically an integer where ``1 = neutral``, ``2 = singly ionized``, etc. Attributes ---------- label : str Canonical identifier for the species (e.g., ``"O3"``). symbol : str Atomic symbol. ionization : int Ionization stage (1 = neutral, 2 = singly ionized, ...). Methods ------- from_label(label) Create a :class:`Particle` from a shorthand label string (e.g., ``"O3"``). __eq__(other) Return ``True`` if two particles (or a particle and a label string) are equivalent. __ne__(other) Return ``True`` if two particles are different. Examples -------- Create a particle manually: >>> Particle(label="O3", symbol="O", ionization=3) O3 Construct a particle automatically from a label string: >>> Particle.from_label("He2") He2 Compare particles: >>> Particle.from_label("O3") == Particle.from_label("O3") True >>> Particle.from_label("O3") == "O2" False """ def __init__(self, label: str = None, symbol: str = None, ionization: int = str): self.label = label self.symbol = symbol self.ionization = ionization return
[docs] @classmethod def from_label(cls, label): """ Create a :class:`Particle` instance from a shorthand label string. This class method parses the label into its elemental symbol and ionization stage using :func:`recover_ionization`. Parameters ---------- label : str Species identifier string (e.g., ``"H1"``, ``"O3"``). Returns ------- Particle A :class:`Particle` instance with the corresponding ``symbol`` and ``ionization`` attributes. Examples -------- >>> Particle.from_label("O3") O3 >>> Particle.from_label("He2").symbol 'He' """ symbol, ionization = recover_ionization(label) return cls(label, symbol, ionization)
def __str__(self): return self.label def __repr__(self): return self.label
[docs] def __eq__(self, other): """Overrides the default implementation""" # Compare with strings if isinstance(other, str): return self.label == other # Compare with other particles if isinstance(other, Particle): if (other.label == self.label) and (other.symbol == self.symbol) and (other.ionization == self.ionization): return True else: return False
[docs] def __ne__(self, other): _inequality = True if isinstance(other, Particle): if (other.label == self.label) and (other.symbol == self.symbol) and (other.ionization == self.ionization): _inequality = False return _inequality
class LineMeasurements: def __init__(self): # Measurements attributes self.intg_flux, self.intg_flux_err = None, None self.peak_wave, self.peak_flux = None, None self.eqw, self.eqw_err = None, None self.eqw_intg, self.eqw_intg_err = None, None self.profile_flux, self.profile_flux_err = None, None self.cont, self.cont_err = None, None self.m_cont, self.n_cont = None, None self.m_cont_err, self.n_cont_err = None, None self.m_cont_err_intg, self.n_cont_err_intg = None, None self.amp, self.center, self.sigma, self.gamma = None, None, None, None self.amp_err, self.center_err, self.sigma_err, self.gamma_err = None, None, None, None self.alpha, self.beta = None, None self.alpha_err, self.beta_err = None, None self.frac, self.frac_err = None, None self.a, self.a_err = None, None self.b, self.b_err = None, None self.c, self.c_err = None, None self.z_line = None self.v_r, self.v_r_err = None, None self.pixel_vel = None self.sigma_vel, self.sigma_vel_err = None, None self.sigma_thermal, self.sigma_instr = None, None self.snr_line, self.snr_cont = None, None self.observations, self.comments = 'no', 'no' self.pixel_mask = 'no' self.n_pixels = None self.FWHM_i, self.FWHM_p, self.FWZI = None, None, None self.w_i, self.w_f = None, None self.v_med, self.v_50 = None, None self.v_5, self.v_10 = None, None self.v_90, self.v_95 = None, None self.v_1, self.v_99 = None, None self.chisqr, self.redchi = None, None self.aic, self.bic = None, None self.pixelWidth = None return def check_measurements_table(df): if df is not None: try: df.at[df.index[0], 'profile_flux'] return True except: return False else: return False
[docs] class Line: """ Spectral line container with metadata, grouping, and measurement hooks. A ``Line`` holds the identifying metadata for a spectral feature (e.g., label, particle/ion, rest wavelength), optional grouping information (for blends or multiplets), and references to kinematics/transition/profile details used by LiMe. For grouped lines, a *reference component* is tracked via ``ref_idx`` to define the line’s representative wavelength. Parameters ---------- label : str Human-readable identifier for the line (e.g., ``"H1_4861A"`` or ``"O3_5007A"``). particle : str or Particle, optional Species identifier. Converted to a :class:`Particle` via ``Particle.from_label(particle)``. wavelength : float, optional Rest (or reference) wavelength of the line. Units given by ``units_wave``. units_wave : str, optional Wavelength units (e.g., ``"Angstrom"``). latex_label : str, optional LaTeX-formatted label for rendering in plots or tables. core : Line, optional Core component of a blended/grouped feature. If ``group == "b"`` (blend) and ``core`` is included in ``list_comps``, its index is used as the reference. group_label : str, optional Group identifier for collections of related components (e.g., blend name). group : str, optional Group type flag. When ``"b"``, the line is treated as part of a blend and the reference component is determined as described in **Notes**. list_comps : list of Line, optional Explicit list of component lines comprising a grouped feature. If ``None``, a single-component list ``[self]`` is used. mask : any, optional User-defined mask/flag for downstream processing. kinem : any, optional Kinematic information (e.g., velocity/dispersion constraints) used by fitters. trans : any, optional Transition descriptor passed through ``recover_transition(self.particle, trans)``. profile : any, optional Line profile model identifier (e.g., Gaussian, Voigt). shape : any, optional Optional shape constraints or metadata for modeling. pixel_mask : str or any, optional Pixel mask mode/flag. Defaults to ``"no"`` when not provided. Attributes ---------- label : str particle : Particle Result of ``Particle.from_label(particle)``. wavelength : float units_wave : str latex_label : str or None core : Line or None group_label : str or None group : str or None list_comps : list of Line Component list; defaults to ``[self]`` when not provided. ref_idx : int Index of the reference component within ``list_comps``. mask : any pixel_mask : str or any Defaults to ``"no"`` if not given. kinem : any trans : any Result of ``recover_transition(self.particle, trans)``. profile : any shape : any measurements : any or None Placeholder for later measurement results (populated downstream). Examples -------- Single, standalone line: >>> Hbeta = Line(label="H1_4861A", particle="H1", wavelength=4861.33, units_wave="Angstrom") Blended feature with explicit core component: >>> comp1 = Line(label="O2_3726A", particle="O2", wavelength=3726.03, units_wave="Angstrom") >>> comp2 = Line(label="O2_3729A", particle="O2", wavelength=3728.82, units_wave="Angstrom") >>> OII_blend = Line(label="O2_3726A", group="b", list_comps=[comp1, comp2], core=comp1) >>> OII_blend.ref_idx # uses core component index """ def __init__(self, label, particle=None, wavelength=None, units_wave=None, latex_label=None, core=None, group_label=None, group=None, list_comps=None, mask=None, kinem=None, trans=None, profile=None, shape=None, pixel_mask=None): self.label = label self.particle = Particle.from_label(particle) self.wavelength = wavelength self.units_wave = units_wave self.latex_label = latex_label self.core = core self.group_label = group_label self.group = group self.list_comps = None self.ref_idx = None self.mask = mask self.pixel_mask = 'no' if pixel_mask is None else pixel_mask self.kinem = kinem self.trans = recover_transition(self.particle, trans) self.profile = profile self.shape = shape # Compile the line components self.list_comps = [self] if list_comps is None else list_comps # Get the reference index: if self.wavelength is not None: if self.group != 'b': self.ref_idx = 0 else: if self.core in self.list_comps: self.ref_idx = list_comps.index(self.core) else: self.ref_idx = 0 # For grouped lines without reference wavelength else: self.ref_idx = 0 if self.ref_idx is None else self.ref_idx self.wavelength = self.list_comps[self.ref_idx].wavelength self.units_wave = self.list_comps[self.ref_idx].units_wave # Add measurements variable self.measurements = None return def __str__(self): return self.label def __repr__(self): return self.label def __eq__(self, var): if isinstance(var, str): return self.label == var elif isinstance(var, self.__class__): return self.label == var.label return False
[docs] @classmethod def from_transition(cls, label, fit_cfg=None, data_frame=None, parent_group_label=None, norm_flux=None, def_shape=None, def_profile=None, verbose=True, warn_missing_db=False): """ Construct a :class:`Line` instance from transition label and optional user input fit_cfg and lines frame. This class method compiles all relevant parameters for a spectral line following a defined hierarchy of input sources. At the lowest level, default values are retrieved from LiMe’s internal line database. These can be overridden by entries in the user-provided lines frame, which in turn are superseded by parameters in the fitting configuration (`fit_cfg`). Finally, label suffixes take the highest precedence. Grouped (blended or merged) lines are recursively reconstructed into the ``list_comps`` attribute. Parameters ---------- label : str Identifier of the target spectral line (e.g., ``"O3_5007A"`` or ``"H1_4861A"``). fit_cfg : dictionary, optional Fitting configuration defining line metadata such as wavelength, particle, and/or group components. This dictionary can be generated from a TOML file, where each transition is defined by its label. Example TOML snippet: data_frame : pandas.DataFrame, optional Table containing line measurement data. The expected columns are: ``"wavelength", "wave_vac", "w1", "w2", "w3", "w4", "w5", "w6", "latex_label", "units_wave", "particle", "trans"`` If any of these columns are missing, the function attempts to recover missing information from LiMe’s default database at ``lime.lineDB``. parent_group_label : str, optional Label for the parent group when constructing component lines of a blend or merged group. Used internally during recursive creation. norm_flux : float, optional Normalization factor for fluxes in the measurements table. def_shape : str, optional Default line shape to use if not provided in the configuration or database. Defaults to ``rsrc_manager.lineDB.get_shape()``. def_profile : str, optional Default line profile model to use if not provided in the configuration or database. Defaults to ``rsrc_manager.lineDB.get_profile()``. verbose : bool, optional If ``True``, print informative messages during line reconstruction. warn_missing_db : bool, optional If ``True``, issue warnings when the line label is not found in the LiMe database. Returns ------- Line A fully constructed :class:`Line` instance. For grouped or blended transitions, this object includes a populated ``list_comps`` of component :class:`Line` objects and a combined ``latex_label``. Notes ----- - The function calls :func:`parse_container_data` to merge information from the configuration, measurement table, and LiMe’s default database. - If ``data_frame`` corresponds to a valid measurements table (verified via :func:`check_measurements_table`), the resulting ``Line`` includes a :class:`LineMeasurements` instance loaded with the corresponding data. - Grouped transitions (blends) are recursively reconstructed via :meth:`Line.from_transition` for each component listed in ``line_params['list_comps']``. Examples -------- Create a single emission line directly from the database: >>> Hbeta = Line.from_transition("H1_4861A") If reading the configuration from a TOML file such as the one below: .. code-block:: toml transitions.O2_3726A_m.wavelength = 3728.484 transitions.O2_7325A_m.wavelength = 7325.000 transitions.O2_7325A_b.wavelength = 7325.000 O2_3726A_m = 'O2_3726A+O2_3729A' O2_3726A_b = 'O2_3726A+O2_3729A' The line can be generated as: >>> fit_cfg_dict = Line.load_cfg("conf.toml") >>> OII = Line.from_transition("O2_3726A_m", fit_cfg=fit_cfg_dict) Or including a lines frame: >>> line = Line.from_transition("O3_5007A", fit_cfg=fit_cfg_dict, data_frame="lines_table.txt") """ # Extract line parameters from the input containers line_params = parse_container_data(label, fit_cfg, data_frame, parent_group_label=parent_group_label, def_shape=rsrc_manager.lineDB.get_shape() if def_shape is None else def_shape, def_profile=rsrc_manager.lineDB.get_profile() if def_profile is None else def_profile, verbose=verbose, warn_missing_db=warn_missing_db) # Check if input dataframe has measurements measurements_check = check_measurements_table(data_frame) # Reconstruct de-blended parent: if measurements_check: if (line_params.get('group') is None) and line_params.get('group_label') and parent_group_label is None: line_params['group'] = 'b' line_params['list_comps'] = line_params['group_label'].split('+') line_params['label'] = line_params.get('group_label').split('+')[0] + '_b' # Get the line parameters for additional components if line_params.get('group'): for i, line_i in enumerate(line_params['list_comps']): line_params['list_comps'][i] = Line.from_transition(line_i, fit_cfg, data_frame, parent_group_label=line_params['group_label'], norm_flux=None, def_shape=def_shape, def_profile=def_profile, verbose=False, warn_missing_db=warn_missing_db) # Combine the latex label line_params['latex_label'] = '+'.join([line_i.latex_label for line_i in line_params['list_comps']]) # Create the line and initiate the measurements container line = cls(**line_params) if parent_group_label is None: if measurements_check: line.load_measurements(data_frame, norm_flux=norm_flux) else: line.measurements = LineMeasurements() return line
def param_arr(self, param): out_arr = [None] * len(self.list_comps) for i, comp in enumerate(self.list_comps): out_arr[i] = getattr(comp, param) return np.array(out_arr) def index_bands(self, wavelength_array, redshift, merge_continua=True, just_band_edges=False): # Check the line has associated bands if self.mask is None: raise LiMe_Error(f'The line {self.label} does include bands. Please select another line or update the database.') # Transform the bands into the observed frame bands_obs_arr = np.atleast_1d(self.mask) * (1 + redshift) # Get the wavelength array and mask wave_arr = wavelength_array.data # Find the indeces of the bands idcs_bands = np.searchsorted(wave_arr, bands_obs_arr) # Return the boolean arrays if not just_band_edges: idcs_line = np.zeros(wave_arr.size, dtype=bool) idcs_line[idcs_bands[2]:idcs_bands[3]] = True idcs_blue = np.zeros(wave_arr.size, dtype=bool) idcs_blue[idcs_bands[0]:idcs_bands[1]] = True idcs_red = np.zeros(wave_arr.size, dtype=bool) idcs_red[idcs_bands[4]:idcs_bands[5]] = True # Check for line pixel masking if self.pixel_mask != 'no': line_mask_limits = format_line_mask_option(self.pixel_mask, wave_arr) idcs_mask = (wave_arr[:, None] >= line_mask_limits[:, 0]) & (wave_arr[:, None] <= line_mask_limits[:, 1]) idcs_valid = ~idcs_mask.sum(axis=1).astype(bool)#[:, None] idcs_line = idcs_line & idcs_valid idcs_blue = idcs_blue & idcs_valid idcs_red = idcs_red & idcs_valid # Output as merged continua bands if merge_continua: return idcs_line, idcs_blue | idcs_red # Output as independent continua bands else: return idcs_line, idcs_blue, idcs_red # Return just the edges of the bands else: return idcs_bands def load_measurements(self, data_frame, norm_flux=None): # Reset the measurements self.measurements = LineMeasurements() # Recover measurements from table list_comps = [self.label] if self.group != 'b' else self.param_arr('label') for j in _RANGE_ATTRIBUTES_FIT: param = _ATTRIBUTES_FIT[j] # Get component parameter param_value = data_frame.loc[list_comps, param].to_numpy() # Convert empty arrays to None if np.issubdtype(param_value.dtype, np.floating) and np.isnan(param_value).all(): param_value = None # Get component parameter if param_value is not None and _LOG_COLUMNS[param][3] == 0: param_value = param_value[0] # Normalize if (norm_flux is not None) and _LOG_COLUMNS[param][0] and (param_value is not None): param_value = param_value / norm_flux # Store in line measurements self.measurements.__setattr__(param, param_value) return def update_labels(self, sig_digits=None): # Get core components wavelength new_label = (f'{self.list_comps[self.ref_idx].particle}' f'_{round_sig_clean(self.wavelength, sig_digits)}' f'{"A" if self.units_wave == "Angstrom" else self.units_wave}') # Group suffix group_str = "_b" if self.group == 'b' else "_m" if self.group == 'm' else "" # Get optional components if self.group != 'b': new_label += f'_k-{self.list_comps[self.ref_idx].kinem}' if self.list_comps[self.ref_idx].kinem != 0 else '' new_label += f'_t-{self.list_comps[self.ref_idx].trans}' if '_t-' in self.label else '' new_label += f'_p-{self.list_comps[self.ref_idx].profile}' if '_p-' in self.label else '' new_label += f'_s-{self.list_comps[self.ref_idx].shape}' if '_s-' in self.label else '' # New label self.label = f'{new_label}{group_str}' # New latex label construct_classic_notation(line=self) return
def parse_container_data(label, fit_cfg, data_frame, parent_group_label, def_shape, def_profile, verbose, warn_missing_db=False): # Review line notation and its group structure line_params = get_line_group(label, fit_cfg, data_frame, parent_group_label, verbose) # Check for the line data on the configuration file line_params = get_line_from_cfg(line_params, fit_cfg) # The configuration parameters overwrite those from the database line_params = get_line_from_df(line_params, data_frame, warn_missing_db) # Review input for missing/default parameter values review_input_params(line_params, def_shape, def_profile) return line_params def get_line_group(label, fit_cfg, data_frame, parent_group_label=None, verbose=True): # Container for the parameters line_params = {} # Numerical label if not isinstance(label, str): label = check_line_in_log(label, rsrc_manager.lineDB.frame if data_frame is None else data_frame) # Review label components items = label.split('_') match len(items): # Single transition case 2: group_type, opt_items = None, None # Wrong format case 0 | 1: raise LiMe_Error(f'The line {label} format is not recognized. Please use a "Particle_WavelengthUnits" format.') # Complex transition (H1_6563A_b) or manual formating (Fe3_4658A_s-abs_p-v) case _: # Blended or merged if items[-1][0] in ['b', 'm']: group_type = items[-1][0] opt_items = None if len(items) == 3 else items[2:-1] else: group_type = None opt_items = items[2:] # Untangle the optional components and update the default values: if opt_items is not None: for opt_i in opt_items: key, value = opt_i.split('-') if _COMPs_KEYS.get(key) is not None: line_params[_COMPs_KEYS[key]] = value else: _logger.warning(f'Line {label} has an unrecognized optional item: {opt_i}.' f' - Please use "_k-", "_t-", "_p-", "_s-".') # Get line core label core = f'{items[0]}_{items[1]}' # Confirm grouped lines have specified their components otherwise set to single if group_type: group_label = None if fit_cfg is None else fit_cfg.get(label, None) # Dict 2_guides have preference over database if (group_label is None) and isinstance(data_frame, pd.DataFrame): group_label = pd_get(data_frame, label, column='group_label', transform='none') else: group_label = None list_comps = None if group_label is None else group_label.split('+') # Warn if the grouped line does not have its component label does not have an entry if group_type is not None and list_comps is None: if verbose: _logger.warning(f'The {label} line has a "_{group_type}" suffix but its group lines components were not found ' f'on the input configuration file or lines database. It will be treated a single line') label = label[:-2] group_type = None # Assign parent group_label if child does not have one if group_label is None and parent_group_label is not None: group_label = parent_group_label # Assign essential keys: line_params['label'] = label line_params['core'] = core # Add group keys if they passed all checks if group_type: line_params['group'] = group_type if group_label: line_params['group_label'] = group_label if list_comps: line_params['list_comps'] = list_comps return line_params def get_line_from_cfg(line_params, fit_cfg): # Get the line information from the input configuration file if present if fit_cfg: # Check for transtitions data if fit_cfg.get('transitions'): # Check for the full label if fit_cfg['transitions'].get(line_params['label']): # line_cfg = {**fit_cfg['transitions'].get(line_params['label']), **line_params} # line_cfg.update(line_params) return {**fit_cfg['transitions'].get(line_params['label']), **line_params} # Check for the core label elif fit_cfg['transitions'].get(line_params['core']): line_cfg = fit_cfg['transitions'].get(line_params['core']) line_cfg.update(line_params) return line_cfg else: return line_params # Check for line mask if fit_cfg.get(f"{line_params['label']}_pixel_mask"): line_params['pixel_mask'] = fit_cfg.get(f"{line_params['label']}_pixel_mask") return line_params else: return line_params def get_line_from_df(line_params, input_df, warn_missing_db=False): # Get the line information from the dataframe if available if input_df is not None: if line_params['label'] in input_df.index: row_name = line_params['label'] elif line_params['core'] in input_df.index: row_name = line_params['core'] else: row_name = None else: row_name = None # Try the lines database if row_name is None and line_params['core'] in rsrc_manager.lineDB.frame.index: input_df = rsrc_manager.lineDB.frame row_name = line_params['core'] # Assign the data if row_name is not None: db_params = dict(particle=pd_get(input_df, row_name, 'particle'), wavelength=pd_get(input_df, row_name, 'wavelength'), units_wave=pd_get(input_df, row_name, 'units_wave'), latex_label=pd_get(input_df, row_name, 'latex_label'), group_label=pd_get(input_df, row_name, 'group_label', transform='none'), trans=pd_get(input_df, row_name, 'trans'), kinem=pd_get(input_df, row_name, 'kinem'), profile=pd_get(input_df, row_name, 'profile'), shape=pd_get(input_df, row_name, 'shape'), ) if 'w1' in input_df.columns: db_params['mask'] = [pd_get(input_df, row_name, 'w1'), pd_get(input_df, row_name, 'w2'), pd_get(input_df, row_name, 'w3'), pd_get(input_df, row_name, 'w4'), pd_get(input_df, row_name, 'w5'), pd_get(input_df, row_name, 'w6')] # Configuration file overwrite dataframe db_params.update(line_params) return db_params else: # Print warning if requested for a line not present on the database if warn_missing_db and line_params.get('group') is None: _logger.warning(f'Line {line_params["label"]} was not found on the database') return line_params def review_input_params(line_params, def_shape, def_profile): # Check core components line_items = line_params['label'].split('_') # Particle if line_params.get('particle') is None: line_params['particle'] = line_items[0] # Wavelength (for reference blended lines we assign the reference index later) if (line_params.get('wavelength') is None) and (line_params.get('group') is None): if line_items[1][-1] == 'A': line_params['units_wave'] = au.Unit('AA') line_params['wavelength'] = float(line_items[1][:-1]) else: au_unit = au.Unit(line_items[1]) line_params['units_wave'] = au_unit.bases[0] line_params['wavelength'] = au_unit.scale # Units: if line_params.get('units_wave') is None: line_items = line_params['label'].split('_') line_params['units_wave'] = au.Unit('AA') if line_items[1][-1] else au.Unit(line_items[1]).bases[0] # Kinematic component if line_params.get('kinem') is None: line_params['kinem'] = 0 else: line_params['kinem'] = int(line_params['kinem']) # Profile component if line_params.get('profile') is None: line_params['profile'] = def_profile # Line shape (emi, abs) if line_params.get('shape') is None: line_params['shape'] = def_shape # Review latex label if line_params.get('group') is None: if line_params.get('latex_label') is None: construct_classic_notation(line_params=line_params) if line_params.get('kinem') > 0 and ('_k-' not in line_params['latex_label']): line_params['latex_label'] = line_params['latex_label'][:-1] + f'_k-{line_params["kinem"]}$' return def round_sig_clean(x, sig=4): if x == 0: return 0 rounded = round(x, sig - int(np.floor(np.log10(abs(x)))) - 1) return int(rounded) if np.isclose(rounded % 1, 0) else rounded