Source code for lime.workflow

import logging

import numpy as np
from pathlib import Path
from astropy.io import fits
from time import time
from lmfit.models import PolynomialModel

import lime
from lime.tools import ProgressBar, join_fits_files, extract_wcs_header, pd_get, au
from lime.rsrc_manager import lineDB
from lime.fitting.lines import LineFitting, signal_to_noise_rola, sigma_corrections, k_gFWHM, velocity_to_wavelength_band, profiles_computation, linear_continuum_computation
from lime.transitions import Line, lines_frame
from lime.retrieve.line_bands import determine_line_groups, groupify_lines_df
from lime.io import check_file_dataframe, check_file_array_mask, log_to_HDU, results_to_log, load_frame, LiMe_Error, check_fit_conf, lime_cfg
from lime.fitting.redshift import RedshiftFitting
from lime.plotting.plots import spec_continuum_calculation
from scipy import stats

try:
    import aspect
    aspect_check = True
except ImportError:
    aspect_check = False


_logger = logging.getLogger('LiMe')


def review_bands(spec, line, min_line_pixels=3, min_cont_pixels=2, user_cont_source='central', user_err_from_bands=False):

    # Check if the line bands are provided
    if line.mask is None:
        _logger.warning(f"Line {line} was not found on the input bands database. It won't be measured")
        return None

    # Check if the line is within the w3, w4 limits
    limit_blue, limit_red = spec.wave.compressed()[0], spec.wave.compressed()[-1]
    if ((line.mask[2] * (1 + spec.redshift)) < limit_blue) or ((line.mask[3] * (1 + spec.redshift)) > limit_red):
        _logger.warning(f"Line {line} bands are outside spectrum wavelengh range: w3 < w_min_rest ({line.mask[2]} < {limit_blue}) or"
                                                                               f" w4 > w_max_rest ({line.mask[3]} > {limit_red})"
                                                                               f" it won't be measured")
        return None

    # Check if the spectrum does not have the error arr but the user has requested it
    if spec.err_flux is None and user_err_from_bands is False:
        _logger.warning(f'The observation does not have an error spectrum but the fit command has requested not to use '
                        f'the adjacent bands to compute the uncertainty. Please set the "user_err_from_bands=True" to perform'
                        f' a measurement.')
        return None

    # Compute the line and adjacent continua indeces:
    idcsEmis, idcsCont = line.index_bands(spec.wave, spec.redshift)

    if user_cont_source == 'fit':
        if spec.cont is None:
            raise LiMe_Error(f"The continuum has not been fit. Please run 'Spectrum.fit.continuum' before measuring lines"
                             f"or change the 'user_cont_source'.")
            idcsCont = idcsEmis

    # Logic for the bands source
    cont_from_bands = True if user_cont_source == 'adjacent' else False

    # Check if all the flux entries are masked
    emis_flux, cont_flux = spec.flux[idcsEmis], spec.flux[idcsCont]
    if np.all(emis_flux.mask):
        _logger.warning(f"Line {line} flux is fully masked. It won't be measured")
        return None
    if np.all(cont_flux.mask) and cont_from_bands:
        _logger.warning(f"Line {line} adjacent continua flux is fully masked. It won't be measured")
        return None

    # Check if all the flux entries are zero
    if not np.any(emis_flux):
        _logger.warning(f"Line {line} flux entries are all 0. It won't be measured")
        return None
    if not np.any(cont_flux) and cont_from_bands:
        _logger.warning(f"Line {line} continuum flux entries are all 0. It won't be measured")
        return None

    # Check if the line selection is too narrow
    if np.sum(~emis_flux.mask) < min_line_pixels:
        _logger.warning(f"Line {line} has only {np.sum(~emis_flux.mask)} pixels. It won't be measured")
        return None

    # Check if the continua selection is too narrow
    if (np.sum(~cont_flux.mask) < min_cont_pixels) and cont_from_bands:
        _logger.warning(f"Line {line} continuum bands have only {np.sum(~cont_flux.mask)} pixels. It won't be measured")
        return None

    return idcsEmis, idcsCont


def import_line_kinematics(line, z_cor, log, fit_conf):

    # Check if imported kinematics come from blended component
    for idx_child, child_label in enumerate(line.list_comps):

        # Check for kinem order
        parent_label = fit_conf.get(f'{child_label}_kinem')
        if (parent_label is not None) and line.group == 'b':

            # Tied kinematics in blended profile
            if parent_label in line.list_comps:
                idx_parent = line.list_comps.index(parent_label)
                factor = f'{line.list_comps[idx_child].wavelength / line.list_comps[idx_parent].wavelength:0.8f}'
                fit_conf[f'{child_label}_center'] = {'expr': f'{factor}*{parent_label}_center'}
                fit_conf[f'{child_label}_sigma'] = {'expr': f'{factor}*{parent_label}_sigma'}

            # Import kinematics from previously measured
            elif parent_label in log.index:
                mu_parent = log.loc[parent_label, ['center', 'center_err']].to_numpy()
                sigma_parent = log.loc[parent_label, ['sigma', 'sigma_err']].to_numpy()
                wave_ratio = line.list_comps[idx_child].wavelength/log.loc[parent_label, 'wavelength']

                center_child_arr = wave_ratio * (mu_parent / z_cor)
                sigma_child_arr = wave_ratio * sigma_parent

                # Store the value on the dictionary
                fit_conf[f'{child_label}_center'] = {'value': center_child_arr[0], 'vary': False}
                fit_conf[f'{child_label}_sigma'] = {'value': sigma_child_arr[0], 'vary': False}

                # Error for the propagation
                fit_conf[f'{child_label}_center_err'] = center_child_arr[1]
                fit_conf[f'{child_label}_sigma_err'] = sigma_child_arr[1]

            # Line has not been measured before found
            else:
                _logger.info(f'\n{parent_label} has not been found on the input lines frame for {child_label} kinematics export.'
                             f'\n - Please check you are using the right line label and that the line has been measured prior '
                             f'prior to the current fitting.')

    return


def check_cube_bands(input_bands, mask_list, fit_cfg):

    if input_bands is None:

        # Recover the mask_configuration as a list
        for mask_name in mask_list:

            mask_fit_cfg = fit_cfg.get(f'{mask_name}_line_fitting')

            missing_mask = False
            if mask_fit_cfg is not None:
                if mask_fit_cfg.get('bands') is None:
                    missing_mask = True
            else:
                missing_mask = True

            if missing_mask:
                error_message = 'No input "bands" provided. In this case you need to include the \n' \
                                f'you need to specify an "bands=log_file_address" entry the ' \
                                f'"[{mask_name}_file]" of your fitting configuration file'
                raise LiMe_Error(error_message)

    return


def recover_level_conf(fit_cfg, mask_key, default_key):

    default_cfg = fit_cfg.get(f'{default_key}_line_fitting') if default_key is not None else None
    mask_cfg = fit_cfg.get(f'{mask_key}_line_fitting') if mask_key is not None else None

    # Case there are not leveled entries
    if (default_cfg is None) and (mask_cfg is None):
        output_conf = fit_cfg

    # Proceed to update the levels
    else:

        # Default configuration
        default_conf = {} if default_cfg is None else default_cfg
        default_detect = default_conf.get('line_detection')

        # Mask conf
        mask_conf = {} if mask_cfg is None else mask_cfg
        mask_detect = mask_conf.get('line_detection')

        # Update the levels
        output_conf = {**default_conf, **mask_conf}

        # If no line detection don't add it
        if mask_detect is not None:
            output_conf['line_detection'] = mask_detect
        elif default_detect is not None:
            output_conf['line_detection'] = default_detect
        else:
            pass

    return output_conf


def check_compound_line_exclusion(line, lines_df):

    # Confirm the dataframe includes the group of lines
    group_label = pd_get(lines_df, line, 'group_label', transform='none', nan_to_none=True)

    # Confirm if the line is in the group of lines
    if group_label is not None:
        comp_list = group_label.split('+')
        measure_check = False if line in comp_list else True
    else:
        measure_check = True

    return measure_check


def continuum_model_fit(x_array, y_array, idcs, degree):

    poly3Mod = PolynomialModel(prefix=f'poly_{degree}', degree=degree)
    poly3Params = poly3Mod.guess(y_array[idcs], x=x_array[idcs])

    try:
        poly3Out = poly3Mod.fit(y_array[idcs], poly3Params, x=x_array[idcs])
        cont_fit = poly3Out.eval(x=x_array)

    except TypeError:
        _logger.warning(f'- The continuum fitting polynomial has more degrees ({degree}) than data points')
        cont_fit = np.full(x_array.size, np.nan)

    return cont_fit


def res_power_approx(wavelength_arr):

    delta_lambda = np.ediff1d(wavelength_arr, to_end=0)
    delta_lambda[-1] = delta_lambda[-2]

    return wavelength_arr/delta_lambda



class SpecRetriever:

    def __init__(self, spectrum):

        self._spec = spectrum

        return

    def lines_frame(self, band_vsigma=70, n_sigma=4, adjust_central_band=True, instrumental_correction=True,
                    exclude_bands_masked=True, map_band_vsigma=None, grouped_lines=None, automatic_grouping=False,
                    Rayleigh_threshold=2, fit_cfg=None, default_cfg_prefix='default', obj_cfg_prefix=None,
                    update_default=True, line_list=None, particle_list=None, sig_digits=4, ref_bands=None,
                    vacuum_waves=False, update_labels=False, update_latex=False, rejected_lines=None, components=None,
                    save_group_label=False):

        # Remove the mask from the wavelength array if necessary
        wave_intvl = self._spec.wave.compressed()

        # Check configuration format
        in_cfg = check_fit_conf(fit_cfg, default_cfg_prefix, obj_cfg_prefix, update_default) if fit_cfg else None

        # Get the parameters from configuration file if provided
        map_band_vsigma = in_cfg['map_band_vsigma'] if in_cfg and ('map_band_vsigma' in in_cfg) else map_band_vsigma
        rejected_lines = in_cfg['rejected_lines'] if in_cfg and ('rejected_lines' in in_cfg) else rejected_lines
        grouped_lines = in_cfg['grouped_lines'] if in_cfg and ('grouped_lines' in in_cfg) else grouped_lines

        # Crop the bands to match the observation
        bands = lines_frame(wave_intvl, line_list, particle_list, redshift=self._spec.redshift,
                            units_wave=self._spec.units_wave, sig_digits=sig_digits, ref_bands=ref_bands,
                            vacuum_waves=vacuum_waves, update_labels=update_labels, update_latex=update_latex,
                            rejected_lines=rejected_lines)

        # Compute the resolving power if necessary
        if self._spec.res_power is not None:
            res_power = self._spec.res_power
        else:
            res_power = res_power_approx(wave_intvl) if (instrumental_correction or fit_cfg is not None) else None

        # Adjust the middle bands to match the line width
        if adjust_central_band:

            # Expected transitions in the observed frame
            lambda_obs = bands.wavelength.to_numpy() * (1 + self._spec.redshift)

            # Add correction for the instrumental broadening
            if instrumental_correction:

                # Indexes for the lines emission peak
                idcs = np.searchsorted(wave_intvl, lambda_obs)

                # Use the instrumental resolution if available
                delta_lambda_inst = lambda_obs / (res_power[idcs] * k_gFWHM)

            # Constant velocity width
            else:
                delta_lambda_inst = 0

            # Use unique or specific velocity sigma for the bands
            if map_band_vsigma is not None:
                band_vsigma = np.full(lambda_obs.size, band_vsigma)
                for idx in bands.index.get_indexer(map_band_vsigma.keys()):
                    if idx > -1:
                        band_vsigma[idx] = map_band_vsigma[bands.index[idx]]

            # Convert to spectral width
            delta_lambda = velocity_to_wavelength_band(n_sigma, band_vsigma, lambda_obs, delta_lambda_inst)

            # Add new values to database in the rest frame
            bands['w3'] = (lambda_obs - delta_lambda) / (1 + self._spec.redshift)
            bands['w4'] = (lambda_obs + delta_lambda) / (1 + self._spec.redshift)

        # Remove from the output bands those which have all their pixels masked
        if exclude_bands_masked:
            idcs_w3_w4 = np.searchsorted(self._spec.wave.data/(1+self._spec.redshift), bands.loc[:, 'w3':'w4'])
            idcs_valid = [idx for idx, start, end in zip(bands.index, idcs_w3_w4[:, 0], idcs_w3_w4[:, 1])
                          if not np.all(self._spec.flux[start:end].mask)]
            bands = bands.loc[idcs_valid]

        # Combine the blended/merged lines in the bands table
        if in_cfg:

            # Determine the line groups
            groups_dict = determine_line_groups(self._spec, bands, in_cfg, grouped_lines, automatic_grouping, n_sigma,
                                                Rayleigh_threshold)

            # Apply the changes
            groupify_lines_df(bands, in_cfg, groups_dict, self._spec, save_group_label)

        # Filter the table to match the line detections
        if components:
            if aspect_check:
                if self._spec.infer.pred_arr is not None:

                    # Create masks for all intervals
                    starts = bands.w3.to_numpy()[:, None] * (1 + self._spec.redshift)
                    ends = bands.w4.to_numpy()[:, None] * (1 + self._spec.redshift)

                    # Check if x values fall within each interval
                    in_intervals = (self._spec.wave.data >= starts) & (self._spec.wave.data < ends)

                    # Check where y equals the target category
                    shape_indexes = [aspect.cfg['shape_number'][comp] for comp in components]
                    is_target_category = np.isin(self._spec.infer.pred_arr, shape_indexes)

                    # Combine the masks to count target_category occurrences in each interval
                    counts = np.sum(in_intervals & is_target_category, axis=1)

                    # Check which intervals satisfy the minimum count condition
                    idcs = counts >= 4
                    bands = bands.loc[idcs]

                else:
                    raise LiMe_Error(f'No components data. Please run the components detection algorithm')
            else:
                raise LiMe_Error(f'Aspect is not installed')

        return bands

    def spectrum(self, fname=None, line_label=None, ref_frame=None, split_components=False, **kwargs):

        # Headers for the default list
        headers = np.array(["wave", "flux", "err_flux", "pixel_mask"])

        # Use the observation frame if none is provided
        frame = self._spec.frame if ref_frame is None else ref_frame

        # By default report complete spectrum
        idcs = (0, None)

        # If a line is provided get indexes for the bands limits
        line_measured = False
        if line_label is not None:
            if frame is not None:
                if line_label in frame.index:
                    bands_limits = frame.loc[line_label, 'w1':'w6']
                    idcs_bands = np.searchsorted(self._spec.wave.data, bands_limits * (1 + self._spec.redshift))
                    idcs = (idcs_bands[0], idcs_bands[5])
                    line_measured = True
                else:
                    _logger.warning(f'Line {line_label} not found on observation frame')
            else:
                _logger.warning(f'No lines measured on object')

        # Compute the bands
        if line_measured:

            # Declare line object and the components and its components from the frame
            line = Line.from_transition(line_label, data_frame=frame)
            line_list = line.list_comps

            # Compute the linear components
            gaussian_arr = profiles_computation(line_list, frame, 1 + self._spec.redshift, line.profile,
                                                x_array=self._spec.wave.data[idcs[0]: idcs[1]])
            linear_arr = linear_continuum_computation(line_list, frame, 1 + self._spec.redshift, x_array=self._spec.wave.data[idcs[0]: idcs[1]])

            # Determine which component you want to extract:
            if split_components is False:
                gaussian_arr = gaussian_arr.sum(axis=1) + linear_arr[:, 0]
                gaussian_arr = gaussian_arr.reshape(-1, 1)
                line_hdrs = [line_label]
            else:
                gaussian_arr = gaussian_arr + linear_arr[:, 0][:, np.newaxis]
                line_hdrs = line_list

            # Add the line list to the headers
            headers = np.append(headers, line_hdrs)

        # Container for the data
        out_arr = np.full((self._spec.wave.data[idcs[0]: idcs[1]].size, len(headers)), np.nan)

        # Fill the array:
        out_arr[:, 0] = self._spec.wave.data[idcs[0]: idcs[1]]
        out_arr[:, 1] = self._spec.flux.data[idcs[0]: idcs[1]] * self._spec.norm_flux

        # Err array if it exists
        if self._spec.err_flux is not None:
            out_arr[:, 2] = self._spec.err_flux[idcs[0]: idcs[1]].data * self._spec.norm_flux

        # Pixel mask if any is invalid
        if np.any(self._spec.wave.mask):
            out_arr[:, 3] = self._spec.wave[idcs[0]: idcs[1]].mask

        # Add the components
        if line_measured:
            for i, line_comp in enumerate(line_hdrs):
                out_arr[:, 4 + i] = gaussian_arr[:, i]

        # Crop array if some columns are missing
        nan_columns = np.zeros(out_arr.shape[1]).astype(bool)
        nan_columns[:4] = np.all(np.isnan(out_arr[:, :4]), axis=0)
        out_arr = out_arr[:, ~nan_columns]

        # Headers
        headers = headers[~nan_columns]

        # Formatting for the data
        spec_hdrs_list = ['%.18e', '%.18e', '%.18e', '%d']
        spec_hdrs_list = spec_hdrs_list + ['%.18e'] * len(line_hdrs) if line_measured else spec_hdrs_list
        array_fmt = np.array(spec_hdrs_list)
        array_fmt = list(array_fmt[~nan_columns])

        # Update defaults with user-provided values
        default_kwargs = {"fmt": array_fmt, "delimiter": ' '}
        default_kwargs.update(kwargs)

        # Create header
        if default_kwargs.get('header') is None:
            default_kwargs['header'] = default_kwargs['delimiter'].join(headers)

        # Dictionary with parameters
        if 'footer' not in default_kwargs:
            footer_dict = {'LiMe': f"v{lime_cfg['metadata']['version']}",
                            'units_wave': self._spec.units_wave, 'units_flux':  self._spec.units_flux,
                            'redshift': self._spec.redshift, 'norm_flux': self._spec.norm_flux, 'id_label': self._spec.label}
            footer_str = "\n".join(f"{key}:{value}" for key, value in footer_dict.items())
            default_kwargs['footer'] = footer_str

        # Return a recarray with the spectrum data
        if fname is None:
            output = np.core.records.fromarrays([out_arr[:, i] for i in range(out_arr.shape[1])], names=list(headers))

        # Save to a file
        else:
            np.savetxt(fname, out_arr, **default_kwargs)
            output = None

        return output

    def rebinned(self, disp_intvl=None, pixel_width=None, pixel_number=None, constant_pixel_width=True,
                 return_spectrum=False):

        """
        Rebin the spectrum onto a new dispersion grid and return binned flux (and errors).

        Exactly **one** of ``disp_intvl``, ``pixel_width``, or ``pixel_number`` must be
        provided to define the target bins:

        - ``disp_intvl``: explicit bin **edges** (1D array, strictly increasing).
        - ``pixel_width``: constant bin width (same units as wavelength); bins span the
          full observed range ``[wave[0], wave[-1])``.
        - ``pixel_number``: group pixels by this integer factor. If ``constant_pixel_width``
          is ``True`` (default), uses an average native dispersion to build uniform-width
          bins; otherwise, takes every ``pixel_number``-th native wavelength as an edge
          (non-uniform bin widths).

        The binned flux is the mean flux per bin. If an uncertainty array is available,
        per-bin errors are combined as the square-root of the sum of variances divided
        by the number of contributing samples.

        Parameters
        ----------
        disp_intvl : array-like, optional
            Monotonic array of bin **edges** for the target dispersion grid. If provided,
            it overrides the other arguments. Values outside the native spectral range
            will trigger a warning.
        pixel_width : float, optional
            Desired constant bin width (in wavelength units). Used to generate uniform
            bin edges covering the full native wavelength range.
        pixel_number : int, optional
            Number of native pixels to aggregate per bin. Must be ``>= 2``. If a
            non-integer is given, it is rounded and a message is logged.
        constant_pixel_width : bool, optional
            When rebinned by ``pixel_number``:
            - If ``True`` (default), compute a uniform bin width as the average native
              spacing times ``pixel_number``.
            - If ``False``, construct edges by taking every ``pixel_number``-th native
              wavelength (non-uniform bins).
        return_spectrum : bool, optional
            If ``False`` (default), return the binned wavelength, flux and flux uncertainty arrays.
            If ``True``, return a new :class:`lime.Spectrum` instance containing the binned data.

        Returns
        -------
        disp_centers : numpy.ndarray
            Binned wavelength array (same units as disp_intvl if provided).
        flux_binned : numpy.ndarray
            Binned flux array.
        err_binned : numpy.ndarray or None
            Binned flux uncertainty array.
        spectrum : lime.Spectrum, optional
            Returned only if ``return_spectrum=True``. A new spectrum object with the binned data.

        Raises
        ------
        ValueError
            If none or more than one of ``disp_intvl``, ``pixel_width``, ``pixel_number``
            are provided, or if ``pixel_number < 2``.

        Notes
        -----
        - ``disp_intvl`` is interpreted as **edges** (length = ``nbins + 1``); the
          returned ``disp_centers`` are computed as edge midpoints.
        - Binning uses ``scipy.stats.binned_statistic(..., statistic='mean')`` for flux.
        - If the spectrum contains masked or NaN values, ensure they are appropriately
          handled upstream; NaNs within a bin will propagate to the mean.
        """

        # Check only one approach is necessary
        provided = {"dispersion inteval": disp_intvl, "pixel width": pixel_width, "pixel number": pixel_number}
        active = [name for name, val in provided.items() if val is not None]

        # Enforce only one rebinning array
        if len(active) == 0:
            raise ValueError("No arguments were provided to compute the new spectral resolution")

        if len(active) > 1:
            raise ValueError(f"Arguments {active} are mutually exclusive. Please only one.")

        # Extract the spectrum data
        mask_arr = self._spec.wave.mask
        wave_arr = self._spec.wave.data
        flux_arr = self._spec.flux.data

        # Check the input disespersion interval
        if disp_intvl is not None:

            # Limit warnings
            if disp_intvl[0] < wave_arr[0]:
                _logger.warning(f'The input lower dispersion value is below the spectral range: disp_intvl {disp_intvl[0]} < {wave_arr[0]}')
            if disp_intvl[-1] > wave_arr[-1]:
                _logger.warning(f'The input higher dispersion value is above the spectral range: disp_intvl {disp_intvl[-1]} > {wave_arr[-1]}')
            bin_width = np.nanmean(np.diff(disp_intvl))

        else:

            # Compute the wavelength range based on the pixel width
            if pixel_width is not None:
                disp_intvl = np.arange(wave_arr[0], wave_arr[-1], pixel_width)
                bin_width = pixel_width

            # Compute the wavelength range based on a number of pixels
            else:

                # Make sure the inputs make sense good values
                if pixel_number is not None and pixel_number >= 2:
                    if not float(pixel_number).is_integer():
                        _logger.info(f'The input pixel number has been rounded from {pixel_number} to {round(pixel_number)}')
                    pixel_number = round(pixel_number)
                else:
                    raise ValueError(f" In the number of pixels rebinning you need the input value must be above 1.")

                # Calculate the difference:
                if constant_pixel_width:
                    bin_width = np.nanmean(np.diff(wave_arr)) * pixel_number
                    disp_intvl = np.arange(wave_arr[0], wave_arr[-1], bin_width)
                else:
                    disp_intvl = wave_arr[pixel_number::pixel_number]
                    bin_width = disp_intvl[-1] - disp_intvl[-2]

        # Make the binning calculation
        flux_binned, edges, binnumber = stats.binned_statistic(wave_arr, flux_arr, statistic='mean', bins=disp_intvl)
        disp_intvl = disp_intvl[:-1] + bin_width/2

        if self._spec.err_flux is not None:
            err_arr = self._spec.err_flux.data
            sum_sq_errors = np.bincount(binnumber, weights=err_arr ** 2)
            bin_counts = np.bincount(binnumber)
            N_bins = flux_binned.size
            sum_sq_errors_filtered = sum_sq_errors[1: N_bins + 1]
            bin_counts_filtered = bin_counts[1: N_bins + 1]
            err_binned = np.sqrt(sum_sq_errors_filtered) / bin_counts_filtered
        else:
            err_binned = None

        if not return_spectrum:
            return disp_intvl, flux_binned, err_binned

        else:
            return lime.Spectrum(input_wave=disp_intvl,
                                 input_flux=flux_binned * self._spec.norm_flux if self._spec.norm_flux else flux_binned,
                                 input_err=err_binned * self._spec.norm_flux if self._spec.norm_flux else err_binned,
                                 redshift=self._spec.redshift, res_power=self._spec.res_power,
                                 units_wave=self._spec.units_wave, units_flux=self._spec.units_flux,
                                 norm_flux=self._spec.norm_flux)

    def normalization(self, return_spectrum=False, **kwargs):

        """
        Normalize the spectrum by its continuum.

        If the continuum has not been previously computed (``self._spec.cont is None``),
        this method automatically fits it by calling
        ``self._spec.fit.continuum(**kwargs)`` before performing the normalization.

        The normalized flux is defined as ``flux / continuum``. If a flux uncertainty
        array is available, the normalized uncertainty is propagated assuming
        independent errors in the flux and continuum.

        Parameters
        ----------
        return_spectrum : bool, optional
            If ``False`` (default), return the normalized flux and uncertainty arrays.
            If ``True``, return a new :class:`lime.Spectrum` instance containing the
            normalized data.
        **kwargs
            Additional keyword arguments passed directly to
            ``lime.Spectrum.fit.continuum`` when the continuum needs to be computed.

        Returns
        -------
        flux_norm : astropy.units.Quantity
            Continuum-normalized flux array.
        err_norm : astropy.units.Quantity or bool
            Uncertainty of the normalized flux. If the original spectrum does not
            contain a flux uncertainty array, this returns ``False``.
        spectrum : lime.Spectrum, optional
            Returned only if ``return_spectrum=True``. A new spectrum object with
            dimensionless flux units and ``norm_flux=1``.

        Notes
        -----
        - The uncertainty propagation follows:

          ``σ(F/C) = |F/C| * sqrt[(σ_F / F)^2 + (σ_C / C)^2]``

          where ``F`` is the flux and ``C`` is the continuum.
        - The returned spectrum preserves the original wavelength grid, redshift,
          spectral resolution, and pixel mask.
        - The continuum is computed only once and cached in the parent spectrum.
        """

        # Compute the object continuum if not provided
        if self._spec.cont is None:
            self._spec.fit.continuum(**kwargs)

        # Normalize the flux
        flux_norm = self._spec.flux / self._spec.cont

        # Normalize the flux uncertainty
        if self._spec.err_flux is not None:
            err_norm = np.abs(flux_norm) * np.sqrt((self._spec.err_flux / self._spec.flux) ** 2 +
                                                   (self._spec.cont_std / self._spec.cont) ** 2)
        else:
            err_norm = False

        # Return the normalized flux and uncertainty array
        if not return_spectrum:
            return flux_norm, err_norm

        # Return a LiMe spectrum
        else:
            return lime.Spectrum(self._spec.wave.data, flux_norm.data, err_norm.data, redshift=self._spec.redshift,
                                 units_wave=self._spec.units_wave, units_flux=au.dimensionless_unscaled, norm_flux=1,
                                 res_power=self._spec.res_power, pixel_mask=flux_norm.mask)

    def upper_line_limit(self, line, bands=None, signal_to_noise=8, err_from_bands=False, continua_sigma=True):

        # Use frame if available
        bands = self._spec.frame if bands is None else check_file_dataframe(bands)

        # Check if line is avaible to calculations
        if (bands is None) or (line not in bands.index):
            raise LiMe_Error('Upper flux limit requires the line band. Please input the lines frame or include in the '
                             'spec.lines_frame measurements' if bands is None else f'Line "{line}" not found in the input bands')

        line = Line.from_transition(line, data_frame=bands)
        idcs_central, idcs_continua = line.index_bands(self._spec.wave, self._spec.redshift)
        lambda_central = line.wavelength * (1 + self._spec.redshift)

        if err_from_bands:
            sigma_cont = np.mean(self._spec.err_flux[idcs_central]) * self._spec.norm_flux
        else:
            sigma_cont = line.measurements.cont_err

        print(line.measurements.intg_flux)
        print(line.measurements.profile_flux)

        delta_lambda = np.mean(np.diff(self._spec.wave[idcs_central]))

        R_line = np.mean(self._spec.res_power[idcs_central])

        g_const = np.sqrt(np.pi/(2*np.log(2)))

        upper_flux = signal_to_noise * sigma_cont * np.sqrt(g_const * (lambda_central / R_line) * delta_lambda)

        print('lambda_central', lambda_central)
        print('delta_lambda',delta_lambda)
        print('sigma_cont',sigma_cont)
        print('R_line',R_line)
        print('g_const',g_const)
        print('upper_flux',upper_flux)

        return upper_flux


class SpecTreatment(LineFitting, RedshiftFitting):

    def __init__(self, spectrum):

        # Instantiate the dependencies
        LineFitting.__init__(self)

        # Lime spectrum object with the scientific data
        self._spec = spectrum
        self.line = None
        self._i_line = 0
        self._n_lines = 0

[docs] def bands(self, label, bands=None, fit_cfg=None, min_method='least_squares', profile=None, shape=None, cont_source='central', err_from_bands=None, temp=10000.0, default_cfg_prefix='default', obj_cfg_prefix=None, update_default=True): """ Fit a spectral line from defined bands (see :ref:`bands documentation <line-bands-doc>`.). This method performs a full line measurement and profile fitting. The line is query from the default line database if no ``bands`` dataframe is provided. The function will also query the input ``fit_cfg`` dictionary if provided for the configuration settings. Parameters ---------- label : str or float Line label in `LiMe notation <https://lime-stable.readthedocs.io/en/latest/inputs/n_inputs2_line_labels.html>`_, or a transition wavelength (in the same units as the spectrum). If a numeric wavelength is given, the corresponding transition is queried from the ``bands`` table (or falling back to the default database if not provided). bands : pandas.DataFrame, str, or pathlib.Path, optional Either: * A bands DataFrame or file path to one. * If ``None``, the default LiMe bands database is used. fit_cfg : dict, str, or pathlib.Path, optional A dictionary or a path to a .toml file. min_method : str, optional Minimization algorithm used by :mod:`lmfit`. Supported methods are listed in the `lmfit.minimizer.Minimizer.minimize <https://lmfit.github.io/lmfit-py/fitting.html#lmfit.minimizer.Minimizer.minimize>`_ documentation. Default is ``"least_squares"``. profile : str, optional Profile type for fitting (e.g., ``"g"`` for Gaussian, ``"l"`` Lorentz, ...). If none is provided, the algorithm will use the default profile from the lines' database. shape : str, optional Line shape for the fitted profiles: "emi" for emission or "abs" for absroption. If none is provided, the algorithm will use the default shape from the lines' database. cont_source : {'central', 'adjacent', 'fitted'}, optional Method used to estimate the continuum level for line fitting. - ``'central'`` — use the edges of the central line band (w₃–w₄). - ``'adjacent'`` — use the adjacent continuum bands (w₁–w₂ and w₅–w₆). - ``'fitted'`` — use a previously fitted continuum model from the spectrum. The default is ``'central'``. err_from_bands : bool or None, optional If ``True``, estimate the pixel uncertainty from the continuum bands. If ``None``, use the spectrum’s ``err_flux`` data or fall back to the continuum regions if not available (False). The default value is None. temp : float, optional Electron temperature in Kelvin used to compute the thermal broadening correction for the fitted lines. Default is 10,000 K. default_cfg_prefix : str, optional Section key prefix for the default configuration in ``fit_cfg``. Default is ``"default"``. obj_cfg_prefix : str, optional Section key prefix for the object-specific configuration in ``fit_cfg``. update_default : bool, optional If ``True`` (default), merge parameters from ``obj_cfg_prefix`` into ``default_cfg_prefix``. If ``False``, the object configuration is used falling back to the default if not available. Returns ------- None The results are stored in the spectrum’s internal ``frame`` attribute and in ``self.line.measurements``. Notes ----- - The method performs the following steps: 1. Parse or retrieve the line definition using :meth:`~lime.Line.from_transition`. 2. Select line and continuum regions based on the provided or default ``bands``. 3. Estimate the continuum level and its uncertainty. 4. Compute non-parametric line properties (e.g., integrated flux). 5. Perform optional profile fitting via :mod:`lmfit` using ``min_method``. 6. Apply instrumental and thermal corrections to measured line widths. 7. Recalculate the signal-to-noise ratio and store all results in the spectrum’s log frame. - Continuum and error estimations are controlled via the ``cont_from_bands`` and ``err_from_bands`` flags. - Thermal broadening corrections use the provided ``temp`` parameter. - All results are written to the current spectrum’s measurement log and accessible through ``Spectrum.frame``. Examples -------- Fit a Gaussian emission line using the default configuration: >>> spec.bands("O3_5007A") Use a custom bands table and configuration dictionary: >>> spec.bands("H1_4861A", bands="my_bands.xlsx", fit_cfg=my_fit_cfg) Change the minimization algorithm and temperature: >>> spec.bands("O2_3726A", min_method="nelder", temp=12000) Fit a line providing the central wavelength directly: >>> spec.bands(5007.0, bands=my_bands_df) """ # Make a copy of the fitting configuration input_conf = check_fit_conf(fit_cfg, default_cfg_prefix, obj_cfg_prefix, update_default) # User 2_guides override default behaviour for the pixel error and the continuum calculation err_from_bands = True if (err_from_bands is None) and (self._spec.err_flux is None) else err_from_bands # Interpret the input line if isinstance(label, str): self.line = Line.from_transition(label, input_conf, data_frame=lineDB.frame if bands is None else check_file_dataframe(bands, copy_input=False), def_shape=shape, def_profile=profile) else: self.line = label # Check the line selection is valid idcs_selection = review_bands(self._spec, self.line, user_cont_source=cont_source, user_err_from_bands=err_from_bands) if idcs_selection is not None: # Unpack the line selections idcs_line, idcs_continua = idcs_selection # Compute line continuum cont_arr = self.continuum_calculation(idcs_line, idcs_continua, cont_source, err_from_bands) # Compute line flux error pixel_err_arr = self.pixel_error_calculation(idcs_continua, err_from_bands) # Non-parametric measurements self.integrated_properties(self.line, self._spec.wave[idcs_line], self._spec.flux[idcs_line], pixel_err_arr[idcs_line], cont_arr[idcs_line]) # Import kinematics if requested import_line_kinematics(self.line, 1 + self._spec.redshift, self._spec.frame, input_conf) # Profile fitting measurements idcs_fitting = idcs_selection[0] if cont_source == 'central' else idcs_selection[0] + idcs_selection[1] self.profile_fitting(self.line, x_arr=self._spec.wave[idcs_fitting], y_arr=self._spec.flux[idcs_fitting], err_arr=pixel_err_arr[idcs_fitting], cont_arr=cont_arr[idcs_fitting] if cont_source == 'fit' else None, user_conf=input_conf, fit_method=min_method) # Instrumental and thermal corrections for the lines sigma_corrections(self.line, idcs_line, self._spec.wave[idcs_line], self._spec.res_power, temp) # Recalculate the SNR with the profile parameters self.line.measurements.snr_line = signal_to_noise_rola(self.line.measurements.amp, self.line.measurements.cont_err, self.line.measurements.n_pixels) # Save the line parameters results_to_log(self.line, self._spec.frame, self._spec.norm_flux) return
[docs] def frame(self, bands, fit_cfg=None, min_method='least_squares', profile=None, shape=None, cont_source='central', err_from_bands=None, temp=10000.0, line_list=None, default_cfg_prefix='default', obj_cfg_prefix=None, update_default=True, line_detection=False, plot_fit=False, progress_output='bar'): """ Measure multiple spectral lines from a bands dataframe (see :ref:`bands documentation <line-bands-doc>`). This method automates the fitting of the lines on the input lines frame. It iterates through all listed transitions, performing continuum estimation, line detection (optional), and profile fitting using the configuration provided in ``fit_cfg``. Parameters ---------- bands : pandas.DataFrame, str, or pathlib.Path Bands table defining the line labels and bands limits (w1 to w6) for each line. fit_cfg : dict, str, or pathlib.Path, optional Fitting configuration dictionary or a path to a TOML configuration file. See the `profile fitting documentation <https://lime-stable.readthedocs.io/en/latest/inputs/n_inputs4_fit_configuration.html>`_. min_method : str, optional Minimization algorithm used by :mod:`lmfit`. See the `lmfit.minimizer.Minimizer.minimize <https://lmfit.github.io/lmfit-py/fitting.html#lmfit.minimizer.Minimizer.minimize>`_ documentation for available options. Default is ``"least_squares"``. profile : str, optional Profile type for fitting (e.g., ``"g"`` for Gaussian, ``"l"`` for Lorentzian). Defaults to the line database entry if omitted. shape : str, optional Line shape keyword (``"emi"`` for emission or ``"abs"`` for absorption). Defaults to the line database entry if omitted. cont_source : {'central', 'adjacent', 'fitted'}, optional Method used to estimate the continuum level for line fitting. - ``'central'`` — use the edges of the central line band (w₃–w₄). - ``'adjacent'`` — use the adjacent continuum bands (w₁–w₂ and w₅–w₆). - ``'fitted'`` — use a previously fitted continuum model from the spectrum. The default is ``'central'``. err_from_bands : bool or None, optional If ``True``, estimate the pixel uncertainty from the continuum bands. If ``None``, use the spectrum’s ``err_flux`` data or fall back to the continuum regions if not available (False). The default value is None. temp : float, optional Electron temperature (K) used to compute the thermal broadening correction. Default is 10,000 K. line_list : list of str, optional Subset of line labels from the bands table to process. If ``None``, all entries in ``bands`` are measured. default_cfg_prefix : str, optional Section key prefix for the default configuration in ``fit_cfg``. Default is ``"default"``. obj_cfg_prefix : str, optional Section key prefix for the object-specific configuration in ``fit_cfg``. update_default : bool, optional If ``True`` (default), merge parameters from ``obj_cfg_prefix`` into ``default_cfg_prefix``. If ``False``, the object configuration is used falling back to the default if not available. line_detection : bool, optional If ``True``, run the continuum fitting and line threshodling to confirme the presence of lines before measurements. The functions parameters must be specified in the ``fit_cfg`` (e.g., entries under ``"peaks_troughs"``, ``"continuum"``). Default is ``False``. plot_fit : bool, optional If ``True``, display the profile fit after each iteration. progress_output : {"bar", "counter", None}, optional Controls progress display in the console. - ``"bar"`` (default): show a dynamic progress bar. - ``"counter"``: print current line number and label. - ``None``: suppress console output. Returns ------- None The resulting measurements are stored in the spectrum’s internal ``frame`` attribute and in ``self.line.measurements``. Notes ----- - This method performs the following sequence for each line: 1. Optionally apply continuum and line detection preprocessing steps if enabled via ``line_detection=True`` and the appropicate keys are found at ``fit_cfg``. 2. Retrieve the line list from the ``bands`` table or the default database. 3. Estimate the continuum level and its uncertainty. 4. Perform non-parametric measurements (e.g., flux, EW, FWHM). 5. Run profile fitting using :mod:`lmfit` according to ``min_method``. 6. Apply instrumental and thermal width corrections (via ``Spectrum.res_power`` and ``temp``). 7. Recalculate SNR and store results in the spectrum’s log frame. - Progress reporting is configurable through ``progress_output``. - Use ``line_detection=True`` to automatically threshold and select only detected lines before fitting. Examples -------- Measure all lines from a bands file: >>> spec.frame("my_bands.xlsx", fit_cfg="my_fit_config.toml") Run the fit with a progress bar: >>> spec.frame(bands_df, progress_output="bar") Limit to a subset of lines: >>> spec.frame(bands_df, line_list=["O3_5007A", "H1_4861A"]) Enable automatic line detection: >>> spec.frame(bands_df, line_detection=True) """ # Check if the lines log is a dataframe or a file address bands = check_file_dataframe(bands) if bands is not None: # Crop the analysis to the target lines if line_list is not None: idcs = bands.index.isin(line_list) bands = bands.loc[idcs] # Load configuration input_conf = check_fit_conf(fit_cfg, default_cfg_prefix, obj_cfg_prefix, update_default=update_default, line_detection=line_detection) # Line detection if requested if line_detection: # Review the configuration entries cont_fit_conf = input_conf.get('continuum', None) if cont_fit_conf: self._spec.fit.continuum(**cont_fit_conf) else: _logger.warning(f'No "continuum" entry in input configuration file. No continuum fitting will be applied') # Perform the line detection detect_conf = input_conf.get('peaks_troughs', {}) if detect_conf: bands = self._spec.infer.peaks_troughs(bands, **detect_conf) else: _logger.warning(f'No "peaks_troughs" entry in input configuration file. No line thresholding will be applied') else: cont_fit_conf, detect_conf = None, None # Define lines to treat through the lines label_list = bands.index.to_numpy() self._n_lines = label_list.size # Loop through the lines if self._n_lines > 0: # On screen progress bar pbar = ProgressBar(progress_output, f'{self._n_lines} lines') if progress_output is not None: print(f'\nLine fitting progress{" (continuum fitting)" if cont_fit_conf is not None else ""}' f'{" (line detection)" if detect_conf is not None else ""}:') for self._i_line in np.arange(self._n_lines): # Ignore line if part of a blended/merge group line = label_list[self._i_line] measure_check = check_compound_line_exclusion(line, bands) if measure_check: # Progress message pbar.output_message(self._i_line, self._n_lines, pre_text="", post_text=f'({line})') # Fit the lines self.bands(line, bands, input_conf, min_method, profile, shape, cont_source=cont_source, err_from_bands=err_from_bands, temp=temp, obj_cfg_prefix=None, default_cfg_prefix=None) if plot_fit: self._spec.plot.bands() else: msg = f'No lines were measured from the input dataframe:\n - line_list: {line_list}\n - line_detection: {line_detection}' _logger.debug(msg) else: _logger.info(f'Not input dataframe. Lines were not measured') return
[docs] def continuum(self, degree_list, emis_threshold, abs_threshold=None, smooth_scale=None, exclude_intvls=None, rest_intvls=False, plot_steps=False, **kwargs): """ Fit the spectrum continuum via polynomial clipping. This routine estimates the continuum by iteratively fitting polynomials and sigma-clipping outliers above (emission) and below (absorption) a flux threshold. At each iteration, points outside configurable residual thresholds are excluded and the polynomial is refitted on the remaining pixels. The user may optionally provide a list of wavelength intervals to be excluded from the continuum fitting. By default, these limits are assumed to be defined in the observed frame. Parameters ---------- degree_list : list of int Polynomial degree to use at each iteration. The number of iterations equals ``len(degree_list)``. emis_threshold : list of float Upper (emission-side) clipping factors, in units of number of residual standard deviation for each iteration. Must have the same length as ``degree_list``. abs_threshold : list of float, optional Lower (absorption-side) clipping factors, also in units of number of residual standard deviation. If ``None``, the values in ``emis_threshold`` are reused for the lower limit. Must match the length of ``degree_list`` when provided. smooth_scale : int, optional Window size (in pixels) for a moving-average smoothing applied to the input flux before fitting. If ``None``, no smoothing is applied. exclude_intvls : list of tuple(float, float), optional List of wavelength intervals (low, high) to exclude from the continuum fitting. By default, intervals are interpreted in the observed frame. rest_intvls : bool, optional If ``True``, the wavelength intervals in ``exclude_intvls`` are assumed to be defined in the rest frame and are converted to the observed frame using ``λ_obs = λ_rest × (1 + z)`` prior to mask computation. plot_steps : bool, optional If ``True``, display a diagnostic plot after each iteration showing the current fit, clipping limits, and kept/rejected pixels. **kwargs Additional keyword arguments forwarded to the plotting helper if ``plot_steps=True`` (e.g., figure size, axis, title customization). Returns ------- None The method updates the spectrum in place, setting: - ``self._spec.cont`` : masked array of the final continuum model - ``self._spec.cont_std`` : float, standard deviation of residuals on kept pixels Notes ----- - **Initialization:** The first iteration seeds the mask using the 16th–84th percentile flux range of unmasked pixels, then fits the initial polynomial. - **Clipping:** After each fit, residuals are computed and the standard deviation is measured over currently kept pixels. New keep/reject limits are: ``low = model - abs_threshold[i] * std`` and ``high = model + emis_threshold[i] * std``. - **Masking:** Existing pixel masks are honored; clipping only modifies the continuum-selection mask on top of the original flux mask. - **Smoothing:** When ``smooth_scale`` is provided, a boxcar (length ``smooth_scale``) is convolved with the flux prior to fitting; the continuum itself is always evaluated on the original wavelength grid. Examples -------- Fit a three-iteration continuum with increasingly restrictive clipping: >>> degrees = [1, 2, 2] >>> thr_hi = [5.0, 3.0, 2.0] # emission-side thresholds (σ) >>> thr_lo = [5.0, 3.0, 2.0] # absorption-side thresholds (σ) >>> spec.fit.continuum(degrees, thr_hi, abs_threshold=thr_lo, smooth_scale=11) Show diagnostic plots at each iteration: >>> spec.fit.continuum([2, 2], [3.0, 2.0], plot_steps=True, title="Continuum fit") Exclude known emission-line regions (defined in the rest frame) from the continuum fit: >>> spec.continuum(degree_list=[3, 2], emis_threshold=[3.0, 2.0], exclude_intvls=[(4861, 4875), (6548, 6584)], rest_intvls=True) """ # Create a pre-Mask based on the original mask if available input_wave = self._spec.wave.data input_flux = self._spec.flux.data input_mask = ~self._spec.flux.mask # Correction if intervals are in the rest frame z_corr = 1 + self._spec.redshift if rest_intvls else 1 # Adjust the mask to exclude wavelengt intervals if exclude_intvls is not None: exclude_intvls = np.asarray(exclude_intvls, dtype=float) # Check the input has the right format if exclude_intvls.ndim != 2 or exclude_intvls.shape[1] != 2: raise ValueError('The argument "exclude_intvls" must be a list of (low, high) pairs') # Loop through the wavelength intervals and add them to the mask exclude_intvls = exclude_intvls * z_corr i0 = np.searchsorted(input_wave, exclude_intvls[:, 0], side="right") i1 = np.searchsorted(input_wave, exclude_intvls[:, 1], side="left") for start, stop in zip(i0, i1): input_mask[start:stop] = False # Smooth the spectrum if smooth_scale is not None: smoothing_window = np.ones(smooth_scale) / smooth_scale input_flux = np.convolve(input_flux, smoothing_window, mode='same') # Loop through the fitting degree abs_threshold = emis_threshold if abs_threshold is None else abs_threshold for i, degree in enumerate(degree_list): # First iteration use percentile limits for an initial fit if i == 0: low_lim, high_lim = np.nanpercentile(input_flux[input_mask], (16, 84)) mask_cont_0 = input_mask & (input_flux >= low_lim) & (input_flux <= high_lim) cont_fit = continuum_model_fit(input_wave, input_flux, mask_cont_0, degree) # Establishing the flux limits std_flux = np.nanstd((input_flux - cont_fit)[input_mask]) low_lim, high_lim = cont_fit - abs_threshold[i] * std_flux, cont_fit + emis_threshold[i] * std_flux # Add new entries to the mask input_mask = input_mask & (input_flux >= low_lim) & (input_flux <= high_lim) # Fit continuum cont_fit = continuum_model_fit(input_wave, input_flux, input_mask, degree) # Compute the continuum and assign replace the value outside the bands the new continuum if plot_steps: input_kwargs = {'title':f'Continuum fitting, iteration ({i+1}/{len(degree_list)})'} input_kwargs.update(kwargs) spec_continuum_calculation(self._spec, input_wave, input_flux, cont_fit, input_mask, low_lim, high_lim, smooth_scale, exclude_intvls, **input_kwargs) # Include the standard deviation of the spectrum for the unmasked pixels self._spec.cont = np.ma.masked_array(cont_fit, self._spec.flux.mask) self._spec.cont_std = np.std((input_flux - cont_fit)[input_mask]) return
class CubeTreatment(LineFitting): def __init__(self, cube): # Instantiate the dependencies LineFitting.__init__(self) # Lime spectrum object with the scientific data self._cube = cube self._spec = None def spatial_mask(self, mask_file, fname, bands=None, fit_cfg=None, mask_list=None, line_list=None, log_ext_suffix='_LINELOG', min_method='least_squares', profile=None, shape=None, cont_source='central', err_from_bands=False, temp=10000.0, default_cfg_prefix='default', update_default=True, line_detection=False, progress_output='bar', plot_fit=False, header=None, join_output_files=True): """ Measure lines across an IFS cube using one or more spatial masks. This routine iterates over spaxels selected by binary masks (from ``mask_file``), measures the requested lines for each spaxel, and writes the results to one or more multi-extension FITS logs. Each spaxel’s measurements are saved in a dedicated extension named ``"{j}-{i}{log_ext_suffix}"`` (e.g., ``"25-30_LINELOG"``). The bands to use for measurements can be supplied globally via ``bands`` or, per-mask, via entries in ``fit_cfg``. Line-fitting behavior is configured via ``fit_cfg`` with a multi-level override scheme (default → mask → spaxel). Parameters ---------- mask_file : str or pathlib.Path or dict or numpy.ndarray Source of spatial masks. Can be a FITS file produced by :meth:`~lime.Cube.spatial_masking`, a dictionary mapping mask names to boolean arrays, or a boolean array. fname : str or pathlib.Path Output path for the combined measurements log (or the base name, if generating one .fits file per mask). bands : pandas.DataFrame, str, or pathlib.Path, optional Bands table (or path) to use for all masks/spaxels. If ``None``, the method will look for a per-mask bands path in ``fit_cfg``. fit_cfg : dict or str or pathlib.Path, optional Fitting configuration (dict or path to a TOML file). Supports a three-level override hierarchy: 1) ``default_cfg_prefix`` section (global defaults), 2) mask-level section named after the mask (e.g., ``"MASK_A"``), 3) spaxel-level section named ``"{j}-{i}_line_fitting"``. mask_list : list of str, optional Subset of masks in ``mask_file`` to process. If ``None``, all masks are used. line_list : list of str, optional Subset of line labels to measure from the bands table. If ``None``, all lines present in the bands are measured. log_ext_suffix : str, optional Suffix appended to each FITS extension name with results. Default is ``"_LINELOG"``. min_method : str, optional Minimization algorithm used by :mod:`lmfit`. See `lmfit.minimizer.Minimizer.minimize <https://lmfit.github.io/lmfit-py/fitting.html#lmfit.minimizer.Minimizer.minimize>`_. Default is ``"least_squares"``. profile : str, optional Profile identifier for fitting (e.g., ``"g"`` for Gaussian). If ``None``, falls back to the line database defaults. shape : str, optional Line shape: ``"emi"`` for emission or ``"abs"`` for absorption. If ``None``, falls back to database defaults. cont_source : {'central', 'adjacent', 'fitted'}, optional Method used to estimate the continuum level for line fitting. - ``'central'`` — use the edges of the central line band (w₃–w₄). - ``'adjacent'`` — use the adjacent continuum bands (w₁–w₂ and w₅–w₆). - ``'fitted'`` — use a previously fitted continuum model from the spectrum. The default is ``'central'``. err_from_bands : bool or None, optional If ``True``, estimate the pixel uncertainty from the continuum bands. If ``None``, use the spectrum’s ``err_flux`` data or fall back to the continuum regions if not available (False). The default value is None. temp : float, optional Electron temperature (K) for thermal broadening corrections. Default is ``10000.0``. default_cfg_prefix : str, optional Section name in ``fit_cfg`` containing global defaults. Default is ``"default"``. update_default : bool, optional If ``True`` (default), apply higher-level overrides by updating lower-level dictionaries (shared keys only). line_detection : bool, optional If ``True``, run the continuum fitting and line threshodling to confirme the presence of lines before measurements. The functions parameters must be specified in the ``fit_cfg`` (e.g., entries under ``"peaks_troughs"``, ``"continuum"``). Default is ``False``. progress_output : {"bar", "counter", None}, optional Console progress reporting mode. Default is ``"bar"``. plot_fit : bool, optional If ``True``, render a plot of each spaxel’s fitted lines during processing. Default is ``False``. header : dict, optional Extra FITS header keywords to add per spaxel page in the output logs. If a key matching the extension name (e.g., ``"25-30_LINELOG"``) exists, that dict is used; otherwise ``header`` is treated as a global header. join_output_files : bool, optional If multiple masks are processed, merge the per-mask logs into a single FITS file named after ``fname``. When ``False``, keep one output file per mask (named ``"{stem}_MASK-{mask}.fits"``). Default is ``True``. Returns ------- None Results are written to disk as one or more FITS files. Each spaxel’s measurements are stored in a binary table extension. Notes ----- - **Configuration hierarchy:** Values are resolved in the order *default → mask → spaxel*. Higher levels **update** shared keys only if ``update_default=True``. Otherwise, the method applies a fallback protocol where only the parameters explicitly defined in each section are used. - **WCS headers:** Spatial WCS keywords are added to each extension header when available, using the parent cube’s WCS metadata. Examples -------- Use a single bands table for all masks and join outputs: >>> cube.obs.spatial_mask( ... mask_file="O3_masks.fits", ... fname="logs/o3_all_masks.fits", ... bands="my_bands.xlsx", ... fit_cfg="fit_config.toml", ... progress_output="bar", ... ) Provide bands per mask via the configuration and keep files separate: >>> cube.obs.spatial_mask( ... mask_file="O3_masks.fits", ... fname="logs/o3_base.fits", ... fit_cfg="fit_config.toml", ... join_output_files=False, ... ) Limit measured lines and enable line detection: >>> cube.obs.spatial_mask( ... mask_file="masks.fits", ... fname="logs/selected_lines.fits", ... line_list=["O3_5007A", "H1_4861A"], ... line_detection=True, ... ) """ if bands is not None: bands = check_file_dataframe(bands) # Check if the mask variable is a file or an array mask_maps = check_file_array_mask(mask_file, mask_list) mask_list = np.array(list(mask_maps.keys())) mask_data_list = list(mask_maps.values()) # Check the mask configuration is included if there are no masks input_masks = mask_list if bands is None else None input_conf = check_fit_conf(fit_cfg, default_key=None, obj_key=None, update_default=update_default, group_list=input_masks) # Check if the output log folder exists fname = Path(fname) address_dir = fname.parent if not address_dir.is_dir(): raise LiMe_Error(f'The folder of the output log file does not exist at {fname}') address_stem = fname.stem # Determine the spaxels to treat at each mask spax_counter, total_spaxels, spaxels_dict = 0, 0, {} for idx_mask, mask_data in enumerate(mask_data_list): spa_mask, hdr_mask = mask_data idcs_spaxels = np.argwhere(spa_mask) total_spaxels += len(idcs_spaxels) spaxels_dict[idx_mask] = idcs_spaxels # Header data hdr_coords = extract_wcs_header(self._cube.wcs, drop_axis='spectral') if self._cube.wcs is not None else None # Loop through the masks n_masks = len(mask_list) mask_log_files_list = [address_dir/f'{address_stem}_MASK-{mask_name}.fits' for mask_name in mask_list] for i in np.arange(n_masks): # HDU_container hdul_log = fits.HDUList([fits.PrimaryHDU()]) # Mask progress indexing mask_name = mask_list[i] mask_hdr = mask_data_list[i][1] idcs_spaxels = spaxels_dict[i] # Recover the fitting configuration mask_conf = check_fit_conf(input_conf, default_cfg_prefix, mask_name) # Load the mask log if provided if bands is None: bands_file = Path(mask_conf['bands']).resolve() if bands_file.exists(): bands_in = load_frame(bands_file) else: err_msg = (f'Bands file not found at: {bands_file}.' f'\n- Resolving from log section - entry: ' f'\n [{mask_name}_line_fitting]' f'\n bands = {mask_conf["bands"]}') raise LiMe_Error(err_msg) else: bands_in = bands # Loop through the spaxels n_spaxels = idcs_spaxels.shape[0] n_lines, start_time = 0, time() print(f'\nSpatial mask {i + 1}/{n_masks}) {mask_name} ({n_spaxels} spaxels)') pbar = ProgressBar(progress_output, f'mask') for j in np.arange(n_spaxels): idx_j, idx_i = idcs_spaxels[j] spaxel_label = f'{idx_j}-{idx_i}' # Get the spaxel fitting configuration spaxel_conf = input_conf.get(f'{spaxel_label}_line_fitting') spaxel_conf = mask_conf if spaxel_conf is None else {**mask_conf, **spaxel_conf} # Spaxel progress message pbar.output_message(j, n_spaxels, pre_text="", post_text=f'(spaxel coordinate. {idx_j}-{idx_i})') # Get spaxel data spaxel = self._cube.get_spectrum(idx_j, idx_i, spaxel_label) # Fit the lines spaxel.fit.frame(bands_in, spaxel_conf, line_list=line_list, min_method=min_method, line_detection=line_detection, profile=profile, shape=shape, cont_source=cont_source, err_from_bands=err_from_bands, temp=temp, progress_output=None, plot_fit=None, obj_cfg_prefix=None, default_cfg_prefix=None, update_default=update_default) # Count the number of measurements n_lines += spaxel.frame.index.size # Create page header with the default data hdr_i = fits.Header() # Add WCS information if hdr_coords is not None: hdr_i.update(hdr_coords) # Add user information if header is not None: page_hdr = header.get(f'{spaxel_label}{log_ext_suffix}', None) page_hdr = header if page_hdr is None else page_hdr hdr_i.update(page_hdr) # Save to a fits file linesHDU = log_to_HDU(spaxel.frame, ext_name=f'{spaxel_label}{log_ext_suffix}', header_dict=hdr_i) if linesHDU is not None: hdul_log.append(linesHDU) # Plot the fittings if requested: if plot_fit: spaxel.plot.spectrum(rest_frame=True) # Save the log at each new mask hdul_log.writeto(mask_log_files_list[i], overwrite=True, output_verify='ignore') hdul_log.close() # Computation time and message end_time = time() elapsed_time = end_time - start_time print(f'\n{n_lines} lines measured in {elapsed_time/60:0.2f} minutes.') if join_output_files: output_comb_file = f'{address_dir/address_stem}.fits' # In case of only one file just rename it if len(mask_list) == 1: mask_0_path = Path(mask_log_files_list[0]) mask_0_path.rename(Path(output_comb_file)) else: print(f'\nJoining spatial log files ({",".join(mask_list)}) -> {output_comb_file}') join_fits_files(mask_log_files_list, output_comb_file, delete_after_join=join_output_files) return