Source code for lime.tools

__all__ = ['unit_conversion',
           'extract_fluxes',
           'normalize_fluxes',
           'redshift_calculation',
           'save_parameter_maps']

import logging
import numpy as np
import pandas as pd

from .io import LiMe_Error, load_frame, log_to_HDU
from sys import stdout

from astropy import units as au
from astropy.units.core import CompositeUnit, IrreducibleUnit, Unit

from astropy.io import fits
from pathlib import Path

_logger = logging.getLogger('LiMe')

# Arrays for roman numerals conversion
VAL_LIST = [1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1]
SYB_LIST = ["M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"]


dict_units = {'flam': au.erg/au.s/au.cm**2/au.AA, 'FLAM': au.erg/au.s/au.cm**2/au.AA,
              'fnu': au.erg/au.s/au.cm**2/au.Hz, 'FNU': au.erg/au.s/au.cm**2/au.Hz,
              'photlam': au.photon/au.s/au.cm**2/au.AA, 'PHOTLAM': au.photon/au.s/au.cm**2/au.AA,
              'photnu': au.photon/au.s/au.cm**2/au.Hz, 'PHOTNU': au.photon/au.s/au.cm**2/au.Hz}
au.set_enabled_aliases(dict_units)



PARAMETER_LATEX_DICT = {'Flam': r'$F_{\lambda}$',
                        'Fnu': r'$F_{\nu}$',
                        'SN_line': r'$\frac{S}{N}_{line}$',
                        'SN_cont': r'$\frac{S}{N}_{cont}$'}


def mult_err_propagation(nominal_array, err_array, result):

    err_result = result * np.sqrt(np.sum(np.power(err_array/nominal_array, 2)))

    return err_result
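

# A minimal sketch of the rule above (illustrative values): the relative errors of the
# factors add in quadrature, e.g. a product of (2 +/- 0.1) and (5 +/- 0.2):
def _example_mult_err_propagation():
    nominal = np.array([2.0, 5.0])
    errors = np.array([0.1, 0.2])
    # 10 * sqrt((0.1/2)**2 + (0.2/5)**2) ~= 0.64
    return mult_err_propagation(nominal, errors, result=2.0 * 5.0)
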

# Number conversion to Roman style
def int_to_roman(num):
    i, roman_num = 0, ''
    while num > 0:
        for _ in range(num // VAL_LIST[i]):
            roman_num += SYB_LIST[i]
            num -= VAL_LIST[i]
        i += 1
    return roman_num
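

# Quick illustration of the conversion above (values checked against the VAL_LIST/SYB_LIST tables):
def _example_int_to_roman():
    return int_to_roman(4), int_to_roman(2024)  # -> ('IV', 'MMXXIV')
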


def pd_get(df, row, column, default=None, transform=None):

    # Fast get from dataframe
    try:
        cell = df.at[row, column]
    except KeyError:
        cell = default

    # Replace the value with the default if it matches the 'transform' sentinel
    if transform is not None:
        cell = default if cell == transform else cell

    return cell
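

# A minimal sketch of pd_get (hypothetical dataframe): missing labels fall back to
# "default", and a sentinel value can be mapped to the default via "transform":
def _example_pd_get():
    df = pd.DataFrame({'profile_flux': [1.5]}, index=['H1_6563A'])
    flux = pd_get(df, 'H1_6563A', 'profile_flux')                      # 1.5
    missing = pd_get(df, 'O3_5007A', 'profile_flux', default=np.nan)   # nan, row not present
    return flux, missing
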


# Favoured method to get line fluxes: integrated fluxes for single lines and profile fluxes for blended components
def extract_fluxes(log, flux_type='mixture', sample_level='line', column_name='line_flux', column_positions=None):

    if flux_type not in ('mixture', 'intg', 'profile'):
        raise LiMe_Error(f'Flux type "{flux_type}" is not recognized. Please use one of "intg", "profile" or "mixture".')

    # Get indices of the blended lines
    if not isinstance(log.index, pd.MultiIndex):
        idcs_blended = (log['group_label'] != 'none') & (~log.index.str.endswith('_m'))
    else:
        if sample_level not in log.index.names:
            raise LiMe_Error(f'Input log does not have an index level named "{sample_level}"')

        idcs_blended = (log['group_label'] != 'none') & (~log.index.get_level_values('line').str.endswith('_m'))

    # Mixture models: Integrated fluxes for all lines except blended
    if flux_type == 'mixture' and np.any(idcs_blended):
        obsFlux = log['intg_flux'].to_numpy(copy=True)
        obsErr = log['intg_flux_err'].to_numpy(copy=True)
        obsFlux[idcs_blended.values] = log.loc[idcs_blended.values, 'profile_flux'].to_numpy(copy=True)
        obsErr[idcs_blended.values] = log.loc[idcs_blended.values, 'profile_flux_err'].to_numpy(copy=True)

    # Use the one requested by the user
    else:
        obsFlux = log[f'{flux_type}_flux'].to_numpy(copy=True)
        obsErr = log[f'{flux_type}_flux_err'].to_numpy(copy=True)

    # Add columns to input dataframe
    output_fluxes = [obsFlux, obsErr]
    if column_name is not None:
        column_positions = 0 if column_positions is None else column_positions
        if column_name in log.columns:
            log[f'{column_name}'] = output_fluxes[0]
            log[f'{column_name}_err'] = output_fluxes[1]
        else:
            log.insert(loc=column_positions, column=f'{column_name}', value=output_fluxes[0])
            log.insert(loc=column_positions + 1, column=f'{column_name}_err', value=output_fluxes[1])
        function_return = None
    else:
        function_return = output_fluxes

    return function_return
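

# A minimal usage sketch for extract_fluxes (illustrative; assumes a single-index log
# with the standard "intg" and "profile" flux columns from the LiMe measurements):
def _example_extract_fluxes(log):
    # In-place mode: adds the 'line_flux' and 'line_flux_err' columns and returns None
    extract_fluxes(log, flux_type='mixture', column_name='line_flux')
    # Array mode: column_name=None returns the [flux, error] arrays instead
    flux_arr, err_arr = extract_fluxes(log, flux_type='intg', column_name=None)
    return flux_arr, err_arr
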


def check_lines_normalization(input_lines, norm_line, log):

    # Single or multi-index behaviour
    single_index_check = not isinstance(log.index, pd.MultiIndex)

    # If no input lines are given, use all of them
    if input_lines is None:
        input_lines = log.index.to_numpy() if single_index_check else log.index.get_level_values('line').unique()

    # In case there is only one input line as str
    input_lines = [input_lines] if isinstance(input_lines, str) else input_lines

    # Three input patterns: 1) a single norm in norm_line, 2) multiple norms in norm_line,
    # 3) "line/norm" ratios in input_lines

    # Cases 1 and 2
    if norm_line is not None:

        # Unique normalization
        if isinstance(norm_line, str) or (len(norm_line) == 1):

            if single_index_check:
                line_list = list(log.loc[log.index.isin(input_lines)].index.to_numpy())
            else:
                line_list = list(log.loc[log.index.get_level_values('line').isin(input_lines)].index.get_level_values('line').unique())

            norm_line = norm_line if isinstance(norm_line, str) else norm_line[0]
            norm_list = [norm_line] * len(line_list)

        # Multiple normalizations
        else:
            if len(input_lines) == len(norm_line):
                line_list, norm_list = [], []
                candidate_lines = log.index if single_index_check else log.index.get_level_values('line')
                for i, line in enumerate(input_lines):
                    if (line in candidate_lines) and (norm_line[i] in candidate_lines):
                        line_list.append(line)
                        norm_list.append(norm_line[i])
            else:
                raise LiMe_Error(f'The number of normalization lines does not match the number of input lines:\n'
                                 f'- Input lines ({len(input_lines)}): {input_lines}\n'
                                 f'- Norm lines ({len(norm_line)}): {norm_line}')

    # Case 3
    else:

        # Split the numerators and denominators
        line_list, norm_list = [], []
        for ratio in input_lines:

            if '/' not in ratio:
                raise LiMe_Error(f'Input line ratios must use "/" to separate the line from its normalization '
                                 f'(for example H1_6563A/H1_4861A).\n'
                                 f'The input ratio "{ratio}" does not. Try to specify a "norm_list" argument instead.')

            nomin_i, denom_i = ratio.replace(" ", "").split('/')
            line_list.append(nomin_i)
            norm_list.append(denom_i)

    return line_list, norm_list
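

# The three accepted input patterns (hypothetical line labels, assumed present in the log):
def _example_check_lines_normalization(log):
    one_norm = check_lines_normalization(['O3_5007A', 'N2_6584A'], 'H1_4861A', log)
    per_line = check_lines_normalization(['O3_5007A', 'N2_6584A'], ['H1_4861A', 'H1_6563A'], log)
    ratios = check_lines_normalization(['O3_5007A/H1_4861A', 'N2_6584A/H1_6563A'], None, log)
    return one_norm, per_line, ratios
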


# Compute the flux ratios (line normalizations)
def normalize_fluxes(log, line_list=None, norm_list=None, flux_column='profile_flux', column_name='line_flux',
                     column_position=0, column_normalization_name='norm_line', sample_levels=['id', 'line']):

    '''
    Compute the flux ratios between the lines in "line_list" and the normalizations in "norm_list", storing the
    results in place. If the normalization line is not available, no operation is performed for that entry.
    '''

    # Check columns present in log
    if (flux_column not in log.columns) or (f'{flux_column}_err' not in log.columns):
        raise LiMe_Error(f'Input log is missing "{flux_column}" or "{flux_column}_err" columns')

    # Check the normalization for the lines
    line_array, norm_array = check_lines_normalization(line_list, norm_list, log)

    # Add new columns if necessary
    if column_name not in log.columns:
        log.insert(loc=column_position, column=f'{column_name}', value=np.nan)
        log.insert(loc=column_position+1, column=f'{column_name}_err', value=np.nan)

    # Add the normalization line column by default
    if (column_normalization_name is not None) and (column_normalization_name not in log.columns):
        log.insert(loc=column_position+2, column=column_normalization_name, value='none')

    # Loop through the lines to compute their normalization
    single_index = not isinstance(log.index, pd.MultiIndex)
    for i in np.arange(len(line_array)):

        numer, denom = line_array[i], norm_array[i]
        numer_flux, denom_flux = None, None

        # Single-index dataframe
        if single_index:
            idcs_ratios = numer
            if (numer in log.index) and (denom in log.index):
                numer_flux = log.loc[numer, flux_column]
                numer_err = log.loc[numer, f'{flux_column}_err']

                denom_flux = log.loc[denom, flux_column]
                denom_err = log.loc[denom, f'{flux_column}_err']

        # Multi-index dataframe
        else:

            if numer == denom:  # Same line normalization
                idcs_slice = log.index.get_level_values(sample_levels[-1]) == numer
                df_slice = log.loc[idcs_slice]
            else:  # Remaining cases
                idcs_slice = log.index.get_level_values(sample_levels[-1]).isin([numer, denom])
                grouper = log.index.droplevel('line')
                idcs_slice = pd.Series(idcs_slice).groupby(grouper).transform('sum').ge(2).to_numpy()
                df_slice = log.loc[idcs_slice]

            # Get fluxes
            if df_slice.size > 0:
                num_slice = df_slice.xs(numer, level=sample_levels[-1], drop_level=False)
                numer_flux = num_slice[flux_column].to_numpy()
                numer_err = num_slice[f'{flux_column}_err'].to_numpy()

                denom_slice = df_slice.xs(denom, level=sample_levels[-1], drop_level=False)
                denom_flux = denom_slice[flux_column].to_numpy()
                denom_err = denom_slice[f'{flux_column}_err'].to_numpy()

                idcs_ratios = num_slice.index

        # Compute the ratios with error propagation
        ratio_array, ratio_err = None, None
        if (numer_flux is not None) and (denom_flux is not None):
            ratio_array = numer_flux / denom_flux
            ratio_err = ratio_array * np.sqrt(np.power(numer_err / numer_flux, 2) + np.power(denom_err / denom_flux, 2))

        # Store in dataframe (with empty columns)
        if (ratio_array is not None) and (ratio_err is not None):
            log.loc[idcs_ratios, f'{column_name}'] = ratio_array
            log.loc[idcs_ratios, f'{column_name}_err'] = ratio_err

            # Store normalization line
            if column_normalization_name is not None:
                log.loc[idcs_ratios, f'{column_normalization_name}'] = denom

    return
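

# A minimal usage sketch for normalize_fluxes (illustrative; assumes the log contains the
# 'profile_flux' and 'profile_flux_err' columns and the hypothetical H1_4861A line):
def _example_normalize_fluxes(log):
    # Normalize every line by H-beta; the ratios are stored in place in 'line_flux'
    normalize_fluxes(log, norm_list='H1_4861A')
    return log[['line_flux', 'line_flux_err', 'norm_line']]
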


# Get Weighted redshift from lines
def redshift_calculation(input_log, line_list=None, weight_parameter=None, obj_label='spec_0'):

    # TODO accept LiMe objects as input log

    # Check the weighted parameter presence
    if weight_parameter is not None:
        if weight_parameter not in input_log.columns:
            raise LiMe_Error(f'The parameter "{weight_parameter}" is not found in the input lines log columns')

    # Convert a single input line string into an array
    line_list = np.array(line_list, ndmin=1) if isinstance(line_list, str) else line_list

    # Check if single or multi-index
    sample_check = isinstance(input_log.index, pd.MultiIndex)

    if sample_check:
        levels = input_log.index.names
        id_list = input_log.index.droplevel(levels[-1]).unique()
    else:
        id_list = np.array([obj_label])
        levels = None

    # Container for redshifts
    z_df = pd.DataFrame(index=id_list, columns=['z_mean', 'z_std', 'lines', 'weight'])
    if sample_check:
        z_df.rename_axis(index=levels[:-1], inplace=True)

    # Loop through the ids
    for idx in id_list:

        # Slice to the object log
        if not sample_check:
            df_slice = input_log
        else:
            df_slice = input_log.xs(idx, level=levels[0])

        # Get the lines requested
        if line_list is not None:
            idcs_slice = df_slice.index.isin(line_list)
            df_slice = df_slice.loc[idcs_slice]

        # Check the slice has lines
        n_lines = len(df_slice.index)
        if n_lines > 0:
            z_array = (df_slice['center']/df_slice['wavelength'] - 1).to_numpy()
            obsLineList = ','.join(df_slice.index.values)

            # Just one line
            if n_lines == 1:
                z_mean = z_array[0]
                z_std = df_slice.center_err.to_numpy()[0]/df_slice.wavelength.to_numpy()[0]

            # Multiple lines
            else:

                # Not weighted parameter
                if weight_parameter is None:
                    z_mean = z_array.mean()
                    z_std = z_array.std()

                # With a weighted parameter
                else:
                    w_array = df_slice[weight_parameter]
                    z_err_array = df_slice.center_err.to_numpy()/df_slice.wavelength.to_numpy()

                    z_mean = np.sum(w_array * z_array)/np.sum(w_array)
                    z_std = np.sqrt(np.sum(np.power(w_array, 2) * np.power(z_err_array, 2)) / np.sum(np.power(w_array, 2)))

        else:
            z_mean, z_std, obsLineList = np.nan, np.nan, None

        # Add to dataframe
        z_df.loc[idx, 'z_mean':'weight'] = z_mean, z_std, obsLineList, weight_parameter

    return z_df
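

# A minimal usage sketch for redshift_calculation (illustrative; the log must include the
# 'center', 'center_err' and 'wavelength' columns for the requested lines):
def _example_redshift_calculation(log):
    # Unweighted mean redshift from two (hypothetical) lines
    z_df = redshift_calculation(log, line_list=['H1_6563A', 'O3_5007A'])
    # Weighted alternative, using any numeric log column as the weight
    z_df_w = redshift_calculation(log, line_list=['H1_6563A', 'O3_5007A'], weight_parameter='profile_flux')
    return z_df, z_df_w
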


def compute_FWHM0(idx_peak, spec_flux, delta_wave, cont_flux, emission_check=True):

    """

    :param idx_peak:
    :param spec_flux:
    :param delta_wave:
    :param cont_flux:
    :param emission_check:
    :return:
    """

    i = idx_peak
    i_final = 0 if delta_wave < 0 else spec_flux.size - 1

    if emission_check:
        while (spec_flux[i] >= cont_flux[i]) and (i != i_final):
            i += delta_wave
    else:
        while (spec_flux[i] <= cont_flux[i]) and (i != i_final):
            i += delta_wave

    return i
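

# A sketch of the scan above (synthetic arrays): starting from the peak pixel, the index
# moves in steps of delta_wave (+1 right, -1 left) until the flux crosses the continuum:
def _example_compute_FWHM0():
    flux = np.array([1.0, 1.0, 3.0, 6.0, 3.0, 1.0, 1.0])
    cont = np.full(flux.size, 1.5)
    right = compute_FWHM0(idx_peak=3, spec_flux=flux, delta_wave=1, cont_flux=cont)    # -> 5
    left = compute_FWHM0(idx_peak=3, spec_flux=flux, delta_wave=-1, cont_flux=cont)    # -> 1
    return left, right
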


def blended_label_from_log(line, log):

    # Default values: single line
    blended_check = False
    group_label = None

    if line in log.index:

        if 'group_label' in log.columns:

            if log.loc[line, 'group_label'] == 'none':
                group_label = None
            elif line.endswith('_m'):
                group_label = log.loc[line, 'group_label']
            else:
                blended_check = True
                group_label = log.loc[line, 'group_label']
    else:
        # TODO this causes an error if we forget the '_b' components in the configuration file; need to check the input cfg
        _logger.warning(f'The line {line} was not found on the input log. If you are specifying the components of a '
                        f'blended line in the fitting configuration, make sure you are not missing the "_b" subscript')

    return blended_check, group_label


def latex_science_float(f, dec=2):
    float_str = f'{f:.{dec}g}'
    if "e" in float_str:
        base, exponent = float_str.split("e")
        return r"{0} \times 10^{{{1}}}".format(base, int(exponent))
    else:
        return float_str
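

# Example outputs (checked by hand): a plain float stays as-is, a float with an exponent
# is recast for LaTeX math mode:
def _example_latex_science_float():
    return latex_science_float(0.00341), latex_science_float(3.41e-8)  # '0.0034', '3.4 \times 10^{-8}'
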


def unit_conversion(in_units, out_units, wave_array=None, flux_array=None, dispersion_units=None, decimals=None):

    """

    This function converts the input array (wavelength or flux) from the ``in_units`` into the requested ``out_units``.

    .. attention::
        Due to the nature of the ``flux_array``, the user also needs to include the ``wave_array`` and its units via
        the ``dispersion_units`` argument.

    The user can also provide the number of ``decimals`` to round the output array.

    :param in_units: Input array units
    :type in_units: str

    :param out_units: Output array units
    :type out_units: str

    :param wave_array: Wavelength array
    :type wave_array: numpy.array

    :param flux_array: Flux array
    :type flux_array: numpy.array

    :param dispersion_units: Units of the dispersion (wavelength) array, required for flux conversions.
    :type dispersion_units: str, optional

    :param decimals: Number of decimals to round the output array. The default value is None (no rounding).
    :type decimals: int, optional

    """

    # Converting the wavelength array
    if flux_array is None:
        input_mask = wave_array.mask if np.ma.isMaskedArray(wave_array) else None
        input_array = wave_array * in_units if input_mask is None else wave_array.data * in_units
        output_array = input_array.to(au.Unit(out_units))
        output_array = output_array.value  # Remove the units

    # Converting the flux array
    else:
        input_mask = flux_array.mask if np.ma.isMaskedArray(flux_array) else None
        input_array = flux_array * in_units if input_mask is None else flux_array.data * in_units
        w_array = wave_array.data * dispersion_units if np.ma.isMaskedArray(wave_array) else wave_array * dispersion_units
        output_array = input_array.to(au.Unit(out_units), au.spectral_density(w_array))
        output_array = output_array.value  # Remove the units

    # Reapply the mask
    output_array = output_array if input_mask is None else np.ma.masked_array(output_array, input_mask)

    # Round to the requested decimal places
    output_array = output_array if decimals is None else np.round(output_array, decimals)

    return output_array
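

# A minimal usage sketch for unit_conversion (illustrative values): wavelength arrays
# convert directly, while flux arrays also need the dispersion array and its units so
# astropy can apply the spectral_density equivalency:
def _example_unit_conversion():
    wave = np.array([4861.0, 6563.0])
    wave_nm = unit_conversion(au.AA, 'nm', wave_array=wave)
    flux_fnu = unit_conversion(au.Unit('FLAM'), 'FNU', wave_array=wave,
                               flux_array=np.array([1e-14, 2e-14]), dispersion_units=au.AA)
    return wave_nm, flux_fnu
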
def refraction_index_air_vacuum(wavelength_array, units='A'):

    # TODO add warnings for issues with units
    refraction_index = (1 + 1e-6 * (287.6155 + 1.62887 / np.power(wavelength_array * 0.0001, 2)
                                    + 0.01360 / np.power(wavelength_array * 0.0001, 4)))

    return refraction_index


def format_line_mask_option(entry_value, wave_array):

    # Check if several entries
    formatted_value = entry_value.split(',') if ',' in entry_value else [f'{entry_value}']

    # Check if interval or single pixel mask
    for i, element in enumerate(formatted_value):
        if '-' in element:
            formatted_value[i] = element.split('-')
        else:
            element = float(element)
            pix_width = (np.diff(wave_array).mean())/2
            formatted_value[i] = [element - pix_width, element + pix_width]

    formatted_value = np.array(formatted_value).astype(float)

    return formatted_value


def define_masks(wavelength_array, masks_array, merge_continua=True, line_mask_entry='no', line=None):

    # Make sure it is a matrix
    # TODO warning for mask outside limits
    masks_array = np.atleast_2d(masks_array)

    # Warn if the bands fall outside the wavelength range
    if np.any(masks_array[:, 0] < wavelength_array[0]) or np.any(masks_array[:, 5] > wavelength_array[-1]):
        warn_message = ('below', masks_array[:, 0]) if np.any(masks_array[:, 0] < wavelength_array[0]) else \
                       ('above', masks_array[:, 5])
        _logger.warning(f'Bands for {line} are {warn_message[0]} the wavelength range: {warn_message[1]}.')

    # Check if it is a masked array
    if np.ma.isMaskedArray(wavelength_array):
        wave_arr = wavelength_array.data
    else:
        wave_arr = wavelength_array

    # Remove masked pixels from this function wavelength array
    if line_mask_entry != 'no':

        # Convert cfg mask string to limits
        line_mask_limits = format_line_mask_option(line_mask_entry, wave_arr)

        # Get masked indices
        idcsMask = (wave_arr[:, None] >= line_mask_limits[:, 0]) & (wave_arr[:, None] <= line_mask_limits[:, 1])
        idcsValid = ~idcsMask.sum(axis=1).astype(bool)[:, None]

    else:
        idcsValid = np.ones(wave_arr.size).astype(bool)[:, None]

    # Find the indices for the six points in the spectrum
    idcsW = np.searchsorted(wave_arr, masks_array)

    # Emission region
    idcsLineRegion = ((wave_arr[idcsW[:, 2]] <= wave_arr[:, None]) &
                      (wave_arr[:, None] <= wave_arr[idcsW[:, 3]]) & idcsValid).squeeze()

    # Return left and right continua merged in one array
    if merge_continua:
        idcsContRegion = (((wave_arr[idcsW[:, 0]] <= wave_arr[:, None]) &
                           (wave_arr[:, None] <= wave_arr[idcsW[:, 1]])) |
                          ((wave_arr[idcsW[:, 4]] <= wave_arr[:, None]) &
                           (wave_arr[:, None] <= wave_arr[idcsW[:, 5]])) & idcsValid).squeeze()
        outputs = idcsLineRegion, idcsContRegion

    # Return left and right continua in separated arrays
    else:
        idcsContLeft = ((wave_arr[idcsW[:, 0]] <= wave_arr[:, None]) &
                        (wave_arr[:, None] <= wave_arr[idcsW[:, 1]]) & idcsValid).squeeze()
        idcsContRight = ((wave_arr[idcsW[:, 4]] <= wave_arr[:, None]) &
                         (wave_arr[:, None] <= wave_arr[idcsW[:, 5]]) & idcsValid).squeeze()
        outputs = idcsLineRegion, idcsContLeft, idcsContRight

    return outputs
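

# A minimal sketch of the band machinery above (synthetic numbers): a six-value band
# [w1..w6] selects the line region (w3-w4) and the two continua (w1-w2 and w5-w6):
def _example_define_masks():
    wave = np.linspace(4800.0, 4900.0, 101)
    band = np.array([4820.0, 4840.0, 4850.0, 4870.0, 4880.0, 4895.0])
    idcs_line, idcs_cont = define_masks(wave, band)
    return wave[idcs_line], wave[idcs_cont]
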
def join_fits_files(log_file_list, output_address, delete_after_join=False, levels=['id', 'line']):

    """
    This function combines multiple log files into a single *.fits* file. The user can request the deletion of the
    individual log files after they have been combined.

    In the case of individual *.fits* files, the function loops through the individual HDUs and adds them to the
    output file. Currently, this is not available for other multi-page files (such as .xlsx or .asdf).

    :param log_file_list: Input list of log files.
    :type log_file_list: list

    :param output_address: String or path for the output combined log file.
    :type output_address: str, Path

    :param delete_after_join: Delete the individual files after joining them. The default value is False.
    :type delete_after_join: bool, optional

    :param levels: Index name list for MultiIndex dataframes. The default value is ['id', 'line'].
    :type levels: list, optional

    :return:

    """

    # Confirm it is a path
    output_address = Path(output_address)

    # Create a new HDU list for the combined file with a new PrimaryHDU
    hdulist = fits.HDUList([fits.PrimaryHDU()])

    # Progress bar
    n_log = len(log_file_list)
    pbar = ProgressBar('bar', 'log files combined')

    # Iterate through the file paths, open each FITS file and append the non-primary HDUs to hdulist
    missing_files = []
    for i, log_path in enumerate(log_file_list):

        log_path = Path(log_path)
        pbar.output_message(i, n_log, pre_text="", post_text=None)

        if log_path.is_file():
            ext = log_path.suffix

            # Fits file
            if ext == '.fits':
                with fits.open(log_path) as hdul:
                    for j, hdu in enumerate(hdul):
                        if j == 0:
                            if not isinstance(hdu, fits.PrimaryHDU):
                                hdu_i = fits.BinTableHDU(data=hdu.data, header=hdu.header, name=hdu.name,
                                                         character_as_bytes=False)
                            else:
                                hdu_i = None
                        else:
                            hdu_i = fits.BinTableHDU(data=hdu.data, header=hdu.header, name=hdu.name,
                                                     character_as_bytes=False)

                        # Append
                        if hdu_i is not None:
                            hdulist.append(hdu_i)

            # Remaining types
            else:
                df_i = load_frame(log_path, levels=levels)
                name_i = log_path.stem
                hdu_i = log_to_HDU(df_i, ext_name=name_i)

                # Append
                if hdu_i is not None:
                    hdulist.append(hdu_i)

        else:
            missing_files.append(log_path)

    # Save to a combined file
    hdulist.writeto(output_address, overwrite=True, output_verify='ignore')
    hdulist.close()

    # Warn of missing files
    if len(missing_files) > 0:
        _logger.warning(f'These files were missing: {missing_files}')

    # Delete individual files if requested
    if delete_after_join:
        if len(missing_files) == 0:
            for log_path in log_file_list:
                Path(log_path).unlink()
        else:
            _logger.info("The individual logs won't be deleted")

    return
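

# A minimal usage sketch for join_fits_files (hypothetical file names):
def _example_join_fits_files():
    log_files = ['obj1_log.fits', 'obj2_log.fits']
    join_fits_files(log_files, 'combined_log.fits', delete_after_join=False)
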
def check_units(units_wave, units_flux):

    # Check if the inputs are already astropy units
    units_wave = au.Unit(units_wave) if not isinstance(units_wave, (IrreducibleUnit, CompositeUnit, Unit)) else units_wave
    units_flux = au.Unit(units_flux) if not isinstance(units_flux, (IrreducibleUnit, CompositeUnit, Unit)) else units_flux

    return units_wave, units_flux


def save_parameter_maps(lines_log_file, output_folder, param_list, line_list, mask_file=None, mask_list='all',
                        image_shape=None, log_ext_suffix='_LINELOG', spaxel_fill_value=np.nan,
                        output_file_prefix=None, header=None, wcs=None):

    """
    This function converts a line measurements log from an IFS cube into a set of 2D parameter maps.

    The parameter ".fits" files are saved into the ``output_folder``. These files are named after the parameters in
    the ``param_list`` with the optional prefix from the ``output_file_prefix`` argument. These files have one page
    per line in the ``line_list`` argument.

    The user can provide a spatial mask file address from which to recover the spaxels with line measurements. If the
    mask ``.fits`` file contains several pages, the user can provide a ``mask_list`` with the ones to explore.
    Otherwise, all mask pages will be used.

    .. attention::
        The user can provide an ``image_shape`` tuple to generate the output parameter maps instead. However, for a
        large image size this approach may require a long time to query the log file pages.

    The expected page name in the input ``lines_log_file`` is "idx_j-idx_i_log_ext_suffix", where "idx_j" and "idx_i"
    are the y and x array coordinates of the cube, by default ``log_ext_suffix='_LINELOG'``.

    The output ``.fits`` parameter page header includes the ``PARAM`` and ``LINE`` entries with the parameter and
    line labels respectively. The user should also include a ``wcs`` argument to export the astronomical coordinates
    to the output files. The user can add additional information via the ``header`` argument.

    :param lines_log_file: Fits file with IFU cube line measurements.
    :type lines_log_file: str, pathlib.Path

    :param output_folder: Output folder to save the maps.
    :type output_folder: str, pathlib.Path

    :param param_list: List of parameters to map.
    :type param_list: list

    :param line_list: List of lines to map.
    :type line_list: list

    :param mask_file: Address of a binary spatial mask file.
    :type mask_file: str, pathlib.Path

    :param mask_list: Mask name list to explore on the ``mask_file``.
    :type mask_list: list, optional

    :param image_shape: Array with the image spatial size.
    :type image_shape: list, tuple, optional

    :param log_ext_suffix: Suffix for the lines log extensions. The default value is "_LINELOG".
    :type log_ext_suffix: str, optional

    :param spaxel_fill_value: Map filling value for empty pixels. The default value is "numpy.nan".
    :type spaxel_fill_value: float, optional

    :param output_file_prefix: Prefix for the output parameter ".fits" files. The default value is None.
    :type output_file_prefix: str, optional

    :param header: Dictionary for the parameter ".fits" file header.
    :type header: dict, optional

    :param wcs: Observation `world coordinate system <https://docs.astropy.org/en/stable/wcs/index.html>`_.
    :type wcs: astropy WCS, optional

    """

    assert Path(lines_log_file).is_file(), f'- ERROR: lines log at {lines_log_file} not found'
    assert Path(output_folder).is_dir(), f'- ERROR: Output parameter maps folder {output_folder} not found'

    # Compile the list of voxels to recover from the provided masks
    if mask_file is not None:

        assert Path(mask_file).is_file(), f'- ERROR: mask file at {mask_file} not found'

        with fits.open(mask_file) as maskHDUs:

            # Get the list of mask extensions
            if mask_list == 'all':
                if ('PRIMARY' in maskHDUs) and (len(maskHDUs) > 1):
                    mask_list = []
                    for i, HDU in enumerate(maskHDUs):
                        mask_name = HDU.name
                        if mask_name != 'PRIMARY':
                            mask_list.append(mask_name)
                    mask_list = np.array(mask_list)
                else:
                    mask_list = np.array(['PRIMARY'])
            else:
                mask_list = np.array(mask_list, ndmin=1)

            # Combine all the mask voxels into one
            for i, mask_name in enumerate(mask_list):
                if i == 0:
                    mask_array = maskHDUs[mask_name].data
                    image_shape = mask_array.shape
                else:
                    assert image_shape == maskHDUs[mask_name].data.shape, \
                        '- ERROR: Input masks do not have the same dimensions'
                    mask_array += maskHDUs[mask_name].data

            # Convert to boolean
            mask_array = mask_array.astype(bool)

            # List of spaxels in [(idx_j, idx_i), ...] format
            spaxel_list = np.argwhere(mask_array)

    # No mask file is provided and the user just defines an image size tuple (nY, nX)
    else:
        mask_array = np.ones(image_shape).astype(bool)
        spaxel_list = np.argwhere(mask_array)

    # Generate the containers for the data
    images_dict = {}
    for param in param_list:
        for line in line_list:
            images_dict[f'{param}-{line}'] = np.full(image_shape, spaxel_fill_value)

    # Loop through the spaxels and fill the parameter images
    n_spaxels = spaxel_list.shape[0]
    spaxel_range = np.arange(n_spaxels)
    pbar = ProgressBar('bar', 'spaxels from file')

    with fits.open(lines_log_file) as logHDUs:

        for i_spaxel in spaxel_range:
            idx_j, idx_i = spaxel_list[i_spaxel]
            spaxel_ref = f'{idx_j}-{idx_i}{log_ext_suffix}'

            post_text = f'of spaxels from file ({lines_log_file}) read ({n_spaxels} total spaxels)'
            pbar.output_message(i_spaxel, n_spaxels, pre_text="", post_text=post_text)

            # Confirm the log extension exists
            if spaxel_ref in logHDUs:

                # Recover the extension data
                log_data = logHDUs[spaxel_ref].data
                log_lines = log_data['index']

                # Loop through the parameters and the lines
                for param in param_list:
                    idcs_log = np.argwhere(np.in1d(log_lines, line_list))
                    for i_line in idcs_log:
                        images_dict[f'{param}-{log_lines[i_line][0]}'][idx_j, idx_i] = log_data[param][i_line][0]

    # New line after the rustic progress bar
    print()

    # Recover the coordinates from the wcs to store in the headers
    hdr_coords = extract_wcs_header(wcs, drop_axis='spectral')

    # Save the parameter maps as individual fits files with one line per page
    output_file_prefix = '' if output_file_prefix is None else output_file_prefix
    for param in param_list:

        # Primary header
        paramHDUs = fits.HDUList()
        paramHDUs.append(fits.PrimaryHDU())

        # ImageHDU for the parameter maps
        for line in line_list:

            # Create the page header with the default data
            hdr_i = fits.Header()
            hdr_i['LINE'] = (line, 'Line label')
            hdr_i['PARAM'] = (param, 'LiMe parameter label')

            # Add the WCS information
            if hdr_coords is not None:
                hdr_i.update(hdr_coords)

            # Add the user information
            if header is not None:
                page_hdr = header.get(f'{param}-{line}', None)
                page_hdr = header if page_hdr is None else page_hdr
                hdr_i.update(page_hdr)

            # Create the page HDU entry
            HDU_i = fits.ImageHDU(name=line, data=images_dict[f'{param}-{line}'], header=hdr_i, ver=1)
            paramHDUs.append(HDU_i)

        # Write to a new file
        output_file = Path(output_folder)/f'{output_file_prefix}{param}.fits'
        paramHDUs.writeto(output_file, overwrite=True, output_verify='fix')

    return


def extract_wcs_header(wcs, drop_axis=None):

    if wcs is not None:

        # Remove the 3rd dimensional axis if present
        if drop_axis is not None:
            if drop_axis == 'spectral':
                input_wcs = wcs.dropaxis(2) if wcs.naxis == 3 else wcs
            elif drop_axis == 'spatial':
                input_wcs = wcs.dropaxis(1).dropaxis(0) if wcs.naxis == 3 else wcs
            else:
                raise LiMe_Error(f'Fits coordinates axis: "{drop_axis}" not recognized. Please use: "spectral" or '
                                 f'"spatial"')
        else:
            input_wcs = wcs

        # Convert to a HDU header
        hdr_coords = input_wcs.to_fits()[0].header

    else:
        hdr_coords = None

    return hdr_coords


class ProgressBar:

    def __init__(self, message_type=None, count_type='bar'):

        self.output_message = None
        self.count_type = count_type

        if message_type is None:
            self.output_message = self.no_output

        if message_type == 'bar':
            self.output_message = self.progress_bar

        return

    def progress_bar(self, i, i_max, pre_text, post_text, n_bar=10):

        post_text = "" if post_text is None else post_text
        j = (i + 1) / i_max
        stdout.write('\r')
        message = f'[{"=" * int(n_bar * j):{n_bar}s}] {int(100 * j)}% of {self.count_type} {post_text}'
        stdout.write(message)
        stdout.flush()

        return

    def no_output(self, i, i_max, pre_text, post_text, n_bar=10):
        return
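

# A minimal sketch of the ProgressBar helper (illustrative): 'bar' mode writes an
# in-place text bar to stdout, while message_type=None silences the output entirely.
def _example_progress_bar():
    pbar = ProgressBar('bar', 'items processed')
    for i in range(20):
        pbar.output_message(i, 20, pre_text="", post_text=None)
    print()  # newline after the in-place bar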