Source code for lime.tools

__all__ = ['unit_conversion',
           'extract_fluxes',
           'normalize_fluxes',
           'redshift_calculation',
           'save_parameter_maps']

import logging
import numpy as np
import pandas as pd
from sys import stdout
from pathlib import Path

from astropy import units as au
from astropy.units.core import CompositeUnit, IrreducibleUnit, Unit

from astropy.io import fits

from lime.io import LiMe_Error, load_frame, log_to_HDU

_logger = logging.getLogger('LiMe')

# Arrays for roman numerals conversion
VAL_LIST = [1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1]
SYB_LIST = ["M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"]


dict_units = {'flam': au.erg/au.s/au.cm**2/au.AA, 'FLAM': au.erg/au.s/au.cm**2/au.AA,
              'fnu': au.erg/au.s/au.cm**2/au.Hz, 'FNU': au.erg/au.s/au.cm**2/au.Hz,
              'photlam': au.photon/au.s/au.cm**2/au.AA, 'PHOTLAM': au.photon/au.s/au.cm**2/au.AA,
              'photnu': au.photon/au.s/au.cm**2/au.Hz, 'PHOTNU': au.photon/au.s/au.cm**2/au.Hz}
au.set_enabled_aliases(dict_units)


PARAMETER_LATEX_DICT = {'Flam': r'$F_{\lambda}$',
                        'Fnu': r'$F_{\nu}$',
                        'SN_line': r'$\frac{S}{N}_{line}$',
                        'SN_cont': r'$\frac{S}{N}_{cont}$'}


def unique_line_arr(frame, return_index = False):

    idcs_s = frame.group_label == 'none'
    idcs_m = pd.Series(frame.index.str.endswith('_m'), index=frame.index)

    # Initialize with False
    idcs_b = pd.Series(False, index=frame.index)
    idcs_components = frame.loc[~(idcs_s | idcs_m), 'group_label'].str.split('+').str[0].unique()
    idcs_components = idcs_b.index.isin(idcs_components)
    idcs_b[idcs_components] = True

    # Get merged lines in blended lines
    m_terms = (frame.loc[idcs_b, 'group_label'].apply(lambda x: [term for term in x.split('+')
                                                                 if term.endswith('_m')]))
    flat_m_terms = [item for sublist in m_terms if sublist for item in sublist]

    if len(flat_m_terms) > 0:
        idcs_m_in_b = pd.Series(False, index=frame.index)
        idcs_m_in_b.loc[flat_m_terms] = True
    else:
        idcs_m_in_b = pd.Series(False, index=frame.index)

    idcs_selection = (idcs_s | idcs_m | idcs_b) & ~idcs_m_in_b

    # Return a series with the same length as the original index
    if return_index:
        return idcs_selection

    # Return an array
    else:
        return idcs_selection[idcs_selection].index


def mult_err_propagation(nominal_array, err_array, result):

    err_result = result * np.sqrt(np.sum(np.power(err_array/nominal_array, 2)))

    return err_result


def int_to_roman(num):
    i, roman_num = 0, ''
    while num > 0:
        for _ in range(num // VAL_LIST[i]):
            roman_num += SYB_LIST[i]
            num -= VAL_LIST[i]
        i += 1
    return roman_num


def pd_get(df, row, column, default=None, transform=None, nan_to_none=False):

    # Fast get from dataframe
    try:
        cell = df.at[row, column]
    except KeyError:
        cell = default

    # Transform the value from the dataframe to the default if requested
    if transform is not None:
        cell = default if cell == transform else cell

    # Transform nan to None # TODO is this all necessary
    if nan_to_none and (cell is not None):
        if isinstance(cell, float):
            cell = None if np.isnan(cell) else cell

    return cell


def extract_fluxes(log, flux_type='mixture', sample_level='line', column_name='line_flux', column_positions=None):

    if flux_type not in ('mixture', 'intg', 'profile'):
        raise LiMe_Error(f'Flux type "{flux_type}" is not recognized please one of "intg", "profile", or "mixture" ')

    # Get indeces of blended lines
    if not isinstance(log.index, pd.MultiIndex):
        idcs_blended = (log['group_label'] != 'none') & (~log.index.str.endswith('_m'))
    else:
        if sample_level not in log.index.names:
            raise LiMe_Error(f'Input log does not have a index level with column "{sample_level}"')

        idcs_blended = (log['group_label'] != 'none') & (~log.index.get_level_values('line').str.endswith('_m'))

    # Mixture models: Integrated fluxes for all lines except blended
    if flux_type == 'mixture' and np.any(idcs_blended):
        obsFlux = log['intg_flux'].to_numpy(copy=True)
        obsErr = log['intg_flux_err'].to_numpy(copy=True)
        obsFlux[idcs_blended.values] = log.loc[idcs_blended.values, 'profile_flux'].to_numpy(copy=True)
        obsErr[idcs_blended.values] = log.loc[idcs_blended.values, 'profile_flux_err'].to_numpy(copy=True)

    # Use the one requested by the user
    else:
        obsFlux = log[f'{flux_type}_flux'].to_numpy(copy=True)
        obsErr = log[f'{flux_type}_flux_err'].to_numpy(copy=True)

    # Add columns to input dataframe
    output_fluxes = [obsFlux, obsErr]
    if column_name is not None:
        column_positions = 0 if column_positions is None else column_positions
        if column_name in log.columns:
            log.loc[f'{column_name}'] = output_fluxes[0]
            log.loc[f'{column_name}_err'] = output_fluxes[1]
        else:
            log.insert(loc=column_positions, column=f'{column_name}', value=output_fluxes[0])
            log.insert(loc=column_positions + 1, column=f'{column_name}_err', value=output_fluxes[1])
        function_return = None
    else:
        function_return = output_fluxes

    return function_return


def check_lines_normalization(input_lines, norm_line, log):

    # Single or multi-index behaviour
    single_index_check = not isinstance(log.index, pd.MultiIndex)

    # If not input lines use all of them
    if input_lines is None:
        input_lines = log.index.to_numpy() if single_index_check else log.index.get_level_values('line').unique()

    # In case there is only one input line as str
    input_lines = [input_lines] if isinstance(input_lines, str) else input_lines

    # 1) One-norm in norm_line  # 2) Multiple-norm in norm_line # 3) Multiple-norm in input_lines

    # Cases 1 and 2
    if norm_line is not None:

        # Unique normalization
        if isinstance(norm_line, str) or (len(norm_line) == 1):

            if single_index_check:
                line_list = list(log.loc[log.index.isin(input_lines)].index.to_numpy())
            else:
                line_list = list(log.loc[log.index.get_level_values('line').isin(input_lines)].index.get_level_values('line').unique())

            norm_list = [norm_line] * len(line_list)

        # Multiple normalizations
        else:
            if len(input_lines) and len(norm_line):
                line_list, norm_list = [], []
                candidate_lines = log.index if single_index_check else log.index.get_level_values('line')
                for i, line in enumerate(input_lines):
                    if (line in candidate_lines) and (norm_line[i] in candidate_lines):
                        line_list.append(line)
                        norm_list.append(norm_line[i])
            else:
                raise LiMe_Error(f'The number of normalization lines does not match the number of input lines:\n'
                                   f'- Model lines ({input_lines}): {input_lines}\n'
                                   f'- Norm lines  ({len(norm_line)}): {norm_line}')

    # Case 3
    else:

        # Split the nominators and denominators
        line_list, norm_list = [], []
        for ratio in input_lines:

            if '/' not in ratio:
                raise LiMe_Error(f'Input line list must use "/" with their normalization (for example H1_6563A/H1_4861A)\n'
                                   f'The input ratio: {ratio} does not have it. Try to specify a "norm_list" argument instead.')

            nomin_i, denom_i = ratio.replace(" ", "").split('/')
            line_list.append(nomin_i)
            norm_list.append(denom_i)

    return line_list, norm_list


def normalize_fluxes(log, line_list=None, norm_list=None, flux_column='profile_flux', column_name='line_flux',
                     column_position=0, column_normalization_name='norm_line', sample_levels=['id', 'line']):

    '''
    If the normalization line is not available, no operation is added.
    '''

    # Check columns present in log
    if (flux_column not in log.columns) or (f'{flux_column}_err' not in log.columns):
        raise LiMe_Error(f'Input log is missing "{flux_column}" or "{flux_column}_err" columns')

    # Check the normalization for the lines
    line_array, norm_array = check_lines_normalization(line_list, norm_list, log)

    # Add new columns if necessary
    if column_name not in log.columns:
        log.insert(loc=column_position, column=f'{column_name}', value=np.nan)
        log.insert(loc=column_position+1, column=f'{column_name}_err', value=np.nan)

    # Add new line with normalization by default
    if column_normalization_name not in log.columns:
        log.insert(loc=column_position+2, column=column_normalization_name, value='none')

    # Loop throught the lines to compute their normalization
    single_index = not isinstance(log.index, pd.MultiIndex)
    for i in np.arange(len(line_array)):

        numer, denom = line_array[i], norm_array[i]
        numer_flux, denom_flux = None, None

        # Single-index dataframe
        if single_index:
            idcs_ratios = numer
            if (numer in log.index) and (denom in log.index):
                numer_flux = log.loc[numer, flux_column]
                numer_err = log.loc[numer, f'{flux_column}_err']

                denom_flux = log.loc[denom, flux_column]
                denom_err = log.loc[denom, f'{flux_column}_err']

        #Multi-index dataframe
        else:

            if numer == denom:  # Same line normalization
                idcs_slice = log.index.get_level_values(sample_levels[-1]) == numer
                df_slice = log.loc[idcs_slice]
            else:  # Rest cases
                idcs_slice = log.index.get_level_values(sample_levels[-1]).isin([numer, denom])
                grouper = log.index.droplevel('line')
                idcs_slice = pd.Series(idcs_slice).groupby(grouper).transform('sum').ge(2).to_numpy()
                df_slice = log.loc[idcs_slice]

            # Get fluxes
            if df_slice.size > 0:
                num_slice = df_slice.xs(numer, level=sample_levels[-1], drop_level=False)
                numer_flux = num_slice[flux_column].to_numpy()
                numer_err = num_slice[f'{flux_column}_err'].to_numpy()

                denom_slice = df_slice.xs(denom, level=sample_levels[-1], drop_level=False)
                denom_flux = denom_slice[flux_column].to_numpy()
                denom_err = denom_slice[f'{flux_column}_err'].to_numpy()

                idcs_ratios = num_slice.index

        # Compute the ratios with error propagation
        ratio_array, ratio_err = None, None
        if (numer_flux is not None) and (denom_flux is not None):
            ratio_array = numer_flux / denom_flux
            ratio_err = ratio_array * np.sqrt(np.power(numer_err / numer_flux, 2) + np.power(denom_err / denom_flux, 2))

        # Store in dataframe (with empty columns)
        if (ratio_array is not None) and (ratio_err is not None):
            log.loc[idcs_ratios, f'{column_name}'] = ratio_array
            log.loc[idcs_ratios, f'{column_name}_err'] = ratio_err

            # Store normalization line
            if column_normalization_name is not None:
                log.loc[idcs_ratios, f'{column_normalization_name}'] = denom

    return


def redshift_calculation(input_log, line_list=None, weight_parameter=None, min_err_pct=None, obj_label='spec_0'):

    if str(type(input_log)) in ["<class 'lime.observations.Spectrum'>", "<class 'lime.observations.Cube'>", "<class 'lime.observations.Sample'>"]:
        input_log = input_log.frame

    # Check if single or multi-index
    sample_check = isinstance(input_log.index, pd.MultiIndex)

    # Check the weighted parameter presence
    if weight_parameter is not None:
        if weight_parameter not in input_log.columns:
            raise LiMe_Error(f'The parameter {weight_parameter} is not found on the input lines log headers')

    # Check input line is not a string
    line_list = np.array(line_list, ndmin=1) if isinstance(line_list, str) else line_list

    if sample_check:
        levels = input_log.index.names
        id_list = input_log.index.droplevel(levels[-1]).unique()
    else:
        id_list = np.array([obj_label])
        levels = None

    # Container for redshifts
    z_df = pd.DataFrame(index=id_list, columns=['z_mean', 'z_std', 'lines', 'weight'])
    if sample_check:
        z_df.rename_axis(index=levels[:-1], inplace=True)

    # Loop through the ids
    for idx in id_list:

        # Slice to the object log
        if not sample_check:
            df_slice = input_log
        else:
            df_slice = input_log.xs(idx, level=levels[0])

        # Get the lines requested
        if line_list is not None:
            idcs_slice = df_slice.index.isin(line_list)
            df_slice = df_slice.loc[idcs_slice]

        # Exclude lines which failed to be measured
        df_slice = df_slice.loc[df_slice.profile_flux_err.notnull()]

        # Exclude error lines:
        if min_err_pct is not None:
            idcs_slice = df_slice.center_err.to_numpy() / df_slice.center.to_numpy() <= min_err_pct
            df_slice = df_slice.loc[idcs_slice]

        # Check the line has lines
        n_lines = len(df_slice.index)
        if n_lines > 0:
            z_array = (df_slice['center']/df_slice['wavelength'] - 1).to_numpy()
            z_err_array = df_slice.center_err.to_numpy() / df_slice.center.to_numpy()
            line_array = df_slice.index.to_numpy()

            # Just one line
            if n_lines == 1:
                z_mean = z_array[0]
                z_std = df_slice.center_err.to_numpy()[0]/df_slice.center.to_numpy()[0]

            # Multiple lines
            else:

                # Not weighted parameter
                if weight_parameter is None:
                    z_mean = z_array.mean()
                    z_std = z_array.std()

                # With a weighted parameter
                else:

                    # Get normalized errors weight values which are positive...
                    w_array = df_slice[weight_parameter].to_numpy()
                    idcs_pos = w_array > 0
                    w_array = w_array[idcs_pos]/np.sum(w_array[idcs_pos])

                    # Only use possitive values
                    z_array, z_err_array, line_array = z_array[idcs_pos], z_err_array[idcs_pos], line_array[idcs_pos]

                    z_mean = np.sum(w_array * z_array)/np.sum(w_array)
                    z_std = np.sqrt(np.sum(w_array * np.square(z_err_array)) / np.square(np.sum(w_array)))

            obsLineList = ','.join(list(line_array))

        else:
            z_mean, z_std, obsLineList = np.nan, np.nan, None

        # Add to dataframe
        z_df.loc[idx, 'z_mean':'weight'] = z_mean, z_std, obsLineList, weight_parameter

    return z_df


def compute_FWHM0(idx_peak, spec_flux, delta_wave, cont_flux, emission_check=True):

    """

    :param idx_peak:
    :param spec_flux:
    :param delta_wave:
    :param cont_flux:
    :param emission_check:
    :return:
    """

    i = idx_peak
    i_final = 0 if delta_wave < 0 else spec_flux.size - 1

    if emission_check:
        while (spec_flux[i] >= cont_flux[i]) and (i != i_final):
            i += delta_wave
    else:
        while (spec_flux[i] <= cont_flux[i]) and (i != i_final):
            i += delta_wave

    return i


def blended_label_from_log(line, log):

    # Default values: single line
    blended_check = False
    group_label = None

    if line in log.index:

        if 'group_label' in log.columns:

            if log.loc[line, 'group_label'] == 'none':
                group_label = None
            elif line.endswith('_m'):
                group_label = log.loc[line, 'group_label']
            else:
                blended_check = True
                group_label = log.loc[line, 'group_label']
    else:
        # TODO this causes and error if we forget the '_b' componentes in the configuration file need to check input cfg
        _logger.warning(f'The line {line} was not found on the input log. If you are specifying the components of a '
                        f'blended line in the fitting configuration, make sure you are not missing the "_b" subscript')

    return blended_check, group_label


[docs] def unit_conversion(in_units, out_units, wave_array=None, flux_array=None, dispersion_units=None, decimals=None): """ Convert a wavelength or flux array from one physical unit to another. This function converts input quantities (wavelength or flux) from ``in_units`` to ``out_units`` using the `Astropy Units module <https://docs.astropy.org/en/stable/units/>`_. For flux conversions, the user must also provide the corresponding wavelength array and its dispersion units to correctly apply the spectral-density equivalency. Parameters ---------- in_units : str Units of the input array. Must be a valid `Astropy unit string <https://docs.astropy.org/en/stable/units/#module-astropy.units>`_ (e.g., ``"Angstrom"``, ``"Hz"``, ``"Jy"``, ``"erg / (s cm2 Angstrom)"``). out_units : str Target units for the conversion. Must also be a valid Astropy unit string. wave_array : numpy.ndarray, optional Wavelength array. Required for flux conversions (``flux_array`` not ``None``). Used to define the frequency–wavelength relationship in the conversion. flux_array : numpy.ndarray, optional Flux array to convert. If ``None``, the function assumes that the conversion applies to ``wave_array`` instead. dispersion_units : str, optional Units of the wavelength array, required when converting flux densities (e.g., ``"Angstrom"`` or ``"Hz"``). Used to define the spectral density equivalency. decimals : int, optional Number of decimal places to round the output values. If ``None``, no rounding is applied. Returns ------- numpy.ndarray Converted array (wavelength or flux) expressed in ``out_units`` and stripped of unit metadata. Notes ----- - The conversion is performed through `astropy.units.Unit` objects and their corresponding equivalencies: * For wavelength conversions: ``astropy.units.spectral()``. * For flux conversions: ``astropy.units.spectral_density(wave_array)``. - Ensure that both input and output units are compatible for the desired transformation (e.g., wavelength ↔ frequency or flux per unit wavelength ↔ flux per unit frequency). - When converting flux densities, always provide both ``wave_array`` and ``dispersion_units`` to avoid unit-equivalency errors. Examples -------- Convert wavelength from Angstroms to nanometers: >>> import numpy as np >>> wave_nm = unit_conversion("Angstrom", "nm", wave_array=np.array([5000, 6000])) >>> wave_nm array([500., 600.]) Convert flux density from Fλ (erg s⁻¹ cm⁻² Å⁻¹) to Fν (erg s⁻¹ cm⁻² Hz⁻¹): >>> flux_fnu = unit_conversion("erg / (s cm2 Angstrom)", "erg / (s cm2 Hz)", ... wave_array=np.array([5000, 6000]), ... flux_array=np.array([1e-14, 2e-14]), ... dispersion_units="Angstrom", decimals=5) >>> flux_fnu array([3.33e-27, 6.67e-27]) """ # Converting the wavelength array if flux_array is None: input_array = wave_array * au.Unit(in_units) output_array = input_array.to(au.Unit(out_units), equivalencies=au.spectral()) # Converting the flux array else: input_array = flux_array * au.Unit(in_units) dispersion_array = wave_array * au.Unit(dispersion_units) output_array = input_array.to(au.Unit(out_units), au.spectral_density(dispersion_array)) # Remove the units output_array = output_array.value # Round to decimal places output_array = output_array if decimals is None else np.round(output_array, decimals) return output_array
def parse_unit_convertion(observation, wave_units_out, flux_units_out): # Recover the pixel mask pixel_mask = observation.flux.mask # Remove existing normalization old_norm = observation.norm_flux if ((observation.norm_flux != 1) and (observation.norm_flux is not None)) else 1 input_wave = observation.wave if pixel_mask is None else observation.wave.data input_flux = observation.flux * old_norm if pixel_mask is None else observation.flux.data * old_norm if observation.err_flux is not None: input_err = observation.err_flux * old_norm if pixel_mask is None else observation.err_flux * old_norm else: input_err = None # Get the new units or use old wave_units_out = observation.units_wave if wave_units_out is None else wave_units_out flux_units_out = observation.units_flux if flux_units_out is None else flux_units_out # Flux conversion if flux_units_out != observation.units_flux: # Spectrum if len(input_flux.shape) == 1: # Flux conversion output_flux = unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave, flux_array=input_flux, dispersion_units=observation.units_wave) # Flux uncertainty conversion output_err = None if input_err is None else unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave, flux_array=input_err, dispersion_units=observation.units_wave) # Cube elif len(input_flux.shape) == 3: output_flux = np.empty_like(input_flux) output_err = np.empty_like(input_err) if input_err is not None else None for i in np.arange(input_flux.shape[0]): output_flux[i, :, :] = unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave[i], flux_array=input_flux[i, :, :], dispersion_units=observation.units_wave) if input_err is not None: output_err[i, :, :] = unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave[i], flux_array=input_err[i, :, :], dispersion_units=observation.units_wave) # Not recognized else: raise LiMe_Error(f'Input flux for unit conversion array size is not recognized:\n' f'-- Please use 1-dimension array for lime.Spectrum and 3-dimension array for lime.Cube') else: output_flux, output_err = input_flux, input_err # Wavelength conversion if wave_units_out != observation.units_wave: output_wave = unit_conversion(observation.units_wave, wave_units_out, wave_array=input_wave) else: output_wave = input_wave return wave_units_out, flux_units_out, output_wave, output_flux, output_err, pixel_mask def join_fits_files(log_file_list, output_address, delete_after_join=False, levels=['id', 'line']): """ This functions combines multiple log files into single *.fits* file. The user can request to the delete the individual logs after the individual logs have been combined. If the case of individual *.fits* the function loop through the individual HDU and add them to the output file. Currently, this is not available to other multi-page files (such as .xlsx or .asdf) :param log_file_list: Input list of log files. :type log_file_list: list :param output_address: String or path for the output combined log file. :type output_address: str, Path, optional :param delete_after_join: Delete individual files after joining them. The default value is False :type output_address: bool, optional :param levels: Indexes name list for MultiIndex dataframes. The default value is ['id', 'line']. :type levels: list, optional :return: """ # Confirm is a path output_address = Path(output_address) # Create new HDU for the combined file with a new PrimaryHDU hdulist = fits.HDUList([fits.PrimaryHDU()]) # Progress bar n_log = len(log_file_list) pbar = ProgressBar('bar', f'log files combined') # Iterate through the file paths, open each FITS file, and append the non-primary HDUs to hdulist missing_files = [] for i, log_path in enumerate(log_file_list): log_path = Path(log_path) pbar.output_message(i, n_log, pre_text="", post_text=None) if log_path.is_file(): ext = log_path.suffix # Fits file if ext == '.fits': with fits.open(log_path) as hdul: for j, hdu in enumerate(hdul): if j == 0: if not isinstance(hdu, fits.PrimaryHDU): hdu_i = fits.BinTableHDU(data=hdu.data, header=hdu.header, name=hdu.name, character_as_bytes=False) else: hdu_i = None else: hdu_i = fits.BinTableHDU(data=hdu.data, header=hdu.header, name=hdu.name, character_as_bytes=False) # Append if hdu_i is not None: hdulist.append(hdu_i) # Remaining types else: df_i = load_frame(log_path, levels=levels) name_i = log_path.stem hdu_i = log_to_HDU(df_i, ext_name=name_i) # Append if hdu_i is not None: hdulist.append(hdu_i) # Append to the list # with fits.open(log_path) as hdulist_i: # # for i, hdu in enumerate(hdulist_i): # if i == 0: # if not isinstance(hdu, fits.PrimaryHDU): # hdulist.append(fits.TableHDU(data=hdu.data, header=hdu.header, name=hdu.name)) # else: # hdulist.append(fits.TableHDU(data=hdu.data, header=hdu.header, name=hdu.name)) else: missing_files.append(log_path) # Save to a combined file hdulist.writeto(output_address, overwrite=True, output_verify='ignore') hdulist.close() # Warn of missing files if len(missing_files) > 0: _logger.info(f"Warning these files were missing: {missing_files}") # Delete individual files if requested if delete_after_join: if len(missing_files) == 0: for log_path in log_file_list: log_path.unlink() else: _logger.info("The individual masks won't be deleted") return def check_units(units_wave, units_flux): # Check if input are already astropy units units_wave = au.Unit(units_wave) if not isinstance(units_wave, (IrreducibleUnit, CompositeUnit, Unit)) else units_wave units_flux = au.Unit(units_flux) if not isinstance(units_flux, (IrreducibleUnit, CompositeUnit, Unit)) else units_flux return units_wave, units_flux
[docs] def save_parameter_maps(lines_log_file, output_folder, param_list, line_list, mask_file=None, mask_list='all', image_shape=None, log_ext_suffix='_LINELOG', spaxel_fill_value=np.nan, output_file_prefix=None, header=None, wcs=None): """ Convert the measurements from a cube FITS log into 2D parameter maps. For each requested parameter in ``param_list`` and each line in ``line_list``, this function builds a 2D image (spaxel grid) by reading the per-spaxel measurement pages from ``lines_log_file``. The resulting images are written as multi-extension FITS files to ``output_folder``—one output file per parameter, with one ImageHDU page per line. Spaxels to include can be specified via a spatial mask FITS file (``mask_file``). If multiple mask extensions exist, select a subset with ``mask_list`` or use ``'all'`` to include all masks. If no mask is provided, you must supply ``image_shape=(ny, nx)`` to define the output grid (this computation can be lengthy). Parameters ---------- lines_log_file : str or pathlib.Path Path to the FITS file containing per-spaxel line measurements. Each spaxel must be stored in an extension named ``"{j}-{i}{log_ext_suffix}"`` (e.g., ``"25-30_LINELOG"``). output_folder : str or pathlib.Path Existing directory where output parameter maps will be saved. param_list : list of str Measurement fields to extract and map (e.g., ``["flux", "EW", "v_med"]``). These must be columns present in each spaxel page of ``lines_log_file``. line_list : list of str Line labels to include as pages in the output files. Each label must appear in the spaxel log’s ``index`` (line name) column. mask_file : str or pathlib.Path, optional FITS file with one or more binary masks (nonzero→included). If provided, only spaxels selected by the union of masks in ``mask_list`` are mapped. mask_list : {'all'} or list of str, optional Which mask extensions from ``mask_file`` to use. ``'all'`` (default) includes every non-PRIMARY extension. Otherwise, provide a list of extension names. image_shape : tuple of int, optional Output image shape ``(ny, nx)`` used when no ``mask_file`` is provided. Required if ``mask_file`` is ``None``. log_ext_suffix : str, optional Suffix used in spaxel extension names inside ``lines_log_file``. Default is ``"_LINELOG"``. spaxel_fill_value : float, optional Value used to fill pixels with missing data or spaxels not found in the log. Default is ``numpy.nan``. output_file_prefix : str, optional Prefix prepended to each output filename. For parameter ``P``, the file is saved as ``{prefix}{P}.fits``. Default is ``None`` (no prefix). header : dict, optional Extra header cards to add. If a key matching ``"{param}-{line}"`` exists, that nested dict is used for that page; otherwise ``header`` is treated as a global header for all pages of a parameter file. wcs : astropy.wcs.WCS, optional WCS used to populate spatial coordinate keywords in each ImageHDU header. See `Astropy WCS <https://docs.astropy.org/en/stable/wcs/index.html>`_. Returns ------- None Writes one FITS file per parameter to ``output_folder``. Each file contains an ImageHDU per line in ``line_list``. Notes ----- - **Spaxel HDU naming:** The function expects pages in the ``lines_log_file`` to be named exactly ``"{j}-{i}{log_ext_suffix}"`` where ``j`` and ``i`` are integer **(row, column)** indices of the cube spaxel. - **Mask handling:** When ``mask_file`` includes multiple mask extensions, the selected masks are combined (logical OR) to form the spaxel set. - **Headers:** Each page includes ``LINE`` (line label) and ``PARAM`` (parameter name). WCS metadata are added if ``wcs`` is provided. Custom header cards from ``header`` are merged afterward. Examples -------- Create maps for flux and velocity dispersion of two lines, using all masks: >>> save_parameter_maps( ... lines_log_file="linelog.fits", ... output_folder="maps/", ... param_list=["flux", "sigma"], ... line_list=["O3_5007A", "H1_6563A"], ... mask_file="spatial_masks.fits", ... mask_list="all", ... log_ext_suffix="_LINELOG", ... output_file_prefix="galaxy_" ... ) Build maps without a mask file (provide image shape explicitly): >>> save_parameter_maps( ... lines_log_file="linelog.fits", ... output_folder="maps/", ... param_list=["EW"], ... line_list=["H1_4861A"], ... image_shape=(74, 76) ... ) """ assert Path(lines_log_file).is_file(), f'- ERROR: lines log at {lines_log_file} not found' assert Path(output_folder).is_dir(), f'- ERROR: Output parameter maps folder {output_folder} not found' # Compile the list of voxels to recover the provided masks if mask_file is not None: assert Path(mask_file).is_file(), f'- ERROR: mask file at {mask_file} not found' with fits.open(mask_file) as maskHDUs: # Get the list of mask extensions if mask_list == 'all': if ('PRIMARY' in maskHDUs) and (len(maskHDUs) > 1): mask_list = [] for i, HDU in enumerate(maskHDUs): mask_name = HDU.name if mask_name != 'PRIMARY': mask_list.append(mask_name) mask_list = np.array(mask_list) else: mask_list = np.array(['PRIMARY']) else: mask_list = np.array(mask_list, ndmin=1) # Combine all the mask voxels into one for i, mask_name in enumerate(mask_list): if i == 0: mask_array = maskHDUs[mask_name].data image_shape = mask_array.shape else: assert image_shape == maskHDUs[mask_name].data.shape, '- ERROR: Input masks do not have the same dimensions' mask_array += maskHDUs[mask_name].data # Convert to boolean mask_array = mask_array.astype(bool) # List of spaxels in list [(idx_j, idx_i), ...] format spaxel_list = np.argwhere(mask_array) # No mask file is provided and the user just defines an image size tupple (nY, nX) else: mask_array = np.ones(image_shape).astype(bool) spaxel_list = np.argwhere(mask_array) # Generate containers for the data: images_dict = {} for param in param_list: # Make sure is an array and loop throuh them for line in line_list: images_dict[f'{param}-{line}'] = np.full(image_shape, spaxel_fill_value) # Loop through the spaxels and fill the parameter images n_spaxels = spaxel_list.shape[0] spaxel_range = np.arange(n_spaxels) pbar = ProgressBar('bar', f'spaxels from file') with fits.open(lines_log_file) as logHDUs: for i_spaxel in spaxel_range: idx_j, idx_i = spaxel_list[i_spaxel] spaxel_ref = f'{idx_j}-{idx_i}{log_ext_suffix}' post_text = f'"{lines_log_file}" read ({n_spaxels} total)' pbar.output_message(i_spaxel, n_spaxels, pre_text="", post_text=post_text) # progress_bar(i_spaxel, n_spaxels, post_text=f'of spaxels from file ({lines_log_file}) read ({n_spaxels} total spaxels)') # Confirm log extension exists if spaxel_ref in logHDUs: # Recover extension data log_data = logHDUs[spaxel_ref].data log_lines = log_data['index'] # Loop through the parameters and the lines: for param in param_list: idcs_log = np.argwhere(np.in1d(log_lines, line_list)) for i_line in idcs_log: images_dict[f'{param}-{log_lines[i_line][0]}'][idx_j, idx_i] = log_data[param][i_line][0] # New line after the rustic progress bar print() # Recover coordinates from the wcs to store in the headers: hdr_coords = extract_wcs_header(wcs, drop_axis='spectral') # Save the parameter maps as individual fits files with one line per page output_file_prefix = '' if output_file_prefix is None else output_file_prefix for param in param_list: # Primary header paramHDUs = fits.HDUList() paramHDUs.append(fits.PrimaryHDU()) # ImageHDU for the parameter maps for line in line_list: # Create page header with the default data hdr_i = fits.Header() hdr_i['LINE'] = (line, 'Line label') hdr_i['PARAM'] = (param, 'LiMe parameter label') # Add WCS information if hdr_coords is not None: hdr_i.update(hdr_coords) # Add user information if header is not None: page_hdr = header.get(f'{param}-{line}', None) page_hdr = header if page_hdr is None else page_hdr hdr_i.update(page_hdr) # Create page HDU entry HDU_i = fits.ImageHDU(name=line, data=images_dict[f'{param}-{line}'], header=hdr_i, ver=1) paramHDUs.append(HDU_i) # Write to new file output_file = Path(output_folder)/f'{output_file_prefix}{param}.fits' paramHDUs.writeto(output_file, overwrite=True, output_verify='fix') return
def extract_wcs_header(wcs, drop_axis=None): if wcs is not None: # Remove 3rd dimensional axis if present if drop_axis is not None: if drop_axis == 'spectral': input_wcs = wcs.dropaxis(2) if wcs.naxis == 3 else wcs elif drop_axis == 'spatial': if wcs.naxis == 3: input_wcs = wcs.dropaxis(1) input_wcs = wcs.dropaxis(0) else: raise LiMe_Error(f'Fits coordinates axis: "{drop_axis}" not recognized. Please use: "spectral" or' f' "spatial"') else: input_wcs = wcs # Convert to HDU header hdr_coords = input_wcs.to_fits()[0].header else: hdr_coords = None return hdr_coords class ProgressBar: def __init__(self, message_type=None, count_type='bar'): self.output_message = None self.count_type = count_type if message_type is None: self.output_message = self.no_output if message_type == 'bar': self.output_message = self.progress_bar if message_type == 'counter': self.output_message = self.counter return def progress_bar(self, i, i_max, pre_text, post_text, n_bar=10): post_text = "" if post_text is None else post_text j = (i + 1) / i_max stdout.write('\r') message = f'[{"=" * int(n_bar * j):{n_bar}s}] {int(100 * j)}% of {self.count_type} {post_text}' stdout.write(message) stdout.flush() return def no_output(self, i, i_max, pre_text, post_text, n_bar=10): return def counter(self, i, i_max, pre_text, post_text, n_bar=10): print(post_text) return