Source code for lime.io

__all__ = [
    'save_cfg',
    'load_cfg',
    'load_frame',
    'load_spatial_mask',
    'save_frame',
    'log_parameters_calculation',
    'log_to_HDU',
    '_LOG_EXPORT_RECARR',
    '_LOG_EXPORT',
    '_LOG_COLUMNS',
    'lime_cfg']

import os
import configparser
import logging
import numpy as np
import pandas as pd

from sys import exit
from pathlib import Path
from collections.abc import Sequence

from astropy.io import fits
from lime.archives.tables import table_fluxes

try:
    import openpyxl
    openpyxl_check = True
    from openpyxl.utils.dataframe import dataframe_to_rows

except ImportError:
    openpyxl_check = False

try:
    import asdf
    asdf_check = True
except ImportError:
    asdf_check = False


try:
    import tomllib
except ModuleNotFoundError:
    import tomli as tomllib

try:
    import toml
    toml_check = True
except ImportError:
    toml_check = False

_logger = logging.getLogger('LiMe')

# Reading file with the format and export status for the measurements
_LIME_FOLDER = Path(__file__).parent
_params_table_file = _LIME_FOLDER/'resources/types_params.txt'
_PARAMS_CONF_TABLE = pd.read_csv(_params_table_file, sep=r'\s+', header=0, index_col=0)

# Read lime configuration .toml
_conf_path = _LIME_FOLDER/'lime.toml'
with open(_conf_path, mode="rb") as fp:
    lime_cfg = tomllib.load(fp)

# Convert null entries to None
for obs_type in ('long_slit', 'cube'):
    for inst in lime_cfg[f'instrument_params'][obs_type].keys():
        for param, value in lime_cfg[f'instrument_params'][obs_type][inst].items():
            if value == 'null':
                lime_cfg[f'instrument_params'][obs_type][inst][param] = None

# Dictionary with the parameter formart
_LOG_COLUMNS = dict(zip(_PARAMS_CONF_TABLE.index.values,
                        _PARAMS_CONF_TABLE.loc[:, 'Norm_by_flux':'dtype'].values))

# Parameters notation latex formatDictionary with the parameter formart
_LOG_COLUMNS_LATEX = dict(zip(_PARAMS_CONF_TABLE.index.values,
                          _PARAMS_CONF_TABLE.loc[:, 'latex_label'].values))

# Array with the parameters to be included in the output log
_LOG_EXPORT = _PARAMS_CONF_TABLE.loc[_PARAMS_CONF_TABLE.Export_log.to_numpy().astype(bool)].index.to_numpy()
_LOG_EXPORT_TYPES = _PARAMS_CONF_TABLE.loc[_PARAMS_CONF_TABLE.Export_log.to_numpy().astype(bool)].dtype.to_numpy()
_LOG_EXPORT_DICT = dict(zip(_LOG_EXPORT, _LOG_EXPORT_TYPES))
_LOG_EXPORT_RECARR = np.dtype(list(_LOG_EXPORT_DICT.items()))

# Attributes from the profile fittings
_ATTRIBUTES_PROFILE = _PARAMS_CONF_TABLE.loc[_PARAMS_CONF_TABLE.Profile_attributes.to_numpy().astype(bool)].index.to_numpy()
_RANGE_PROFILE_FIT = np.arange(_ATTRIBUTES_PROFILE.size)

# Attributes with measurements for log
_ATTRIBUTES_FIT = _PARAMS_CONF_TABLE.loc[_PARAMS_CONF_TABLE.Fit_attributes.to_numpy().astype(bool)].index.to_numpy()
_RANGE_ATTRIBUTES_FIT = np.arange(_ATTRIBUTES_FIT.size)

# Dictionary with the parameter dtypes
_LOG_TYPES_DICT = dict(zip(_PARAMS_CONF_TABLE.index.to_numpy(),
                           _PARAMS_CONF_TABLE.dtype.to_numpy()))


class LiMe_Error(Exception):
    """LiMe exception function"""


def strtobool(val):
    """Convert a string representation of truth to true (1) or false (0).

    True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
    are 'n', 'no', 'f', 'false', 'off', and '0'.  Raises ValueError if
    'val' is anything else.
    """
    val = val.lower()
    if val in ('y', 'yes', 't', 'true', 'on', '1'):
        return 1
    elif val in ('n', 'no', 'f', 'false', 'off', '0'):
        return 0
    else:
        raise ValueError("invalid truth value %r" % (val,))


def hdu_to_log_df(file_path, page_name):

    with fits.open(file_path) as hdul:
        hdu_log = hdul[page_name].data

    df_log = pd.DataFrame.from_records(data=hdu_log, index='index')

    return df_log


def parse_lime_cfg(toml_cfg, fit_cfg_suffix='_line_fitting'):

    # Convert the configuration entries from the string format if possible
    if fit_cfg_suffix is not None:
        for section, items in toml_cfg.items():
            if section.endswith(fit_cfg_suffix):
                for i_key, i_value in items.items():
                    try:
                        toml_cfg[section][i_key] = format_option_value(i_value, i_key, section)
                    except:
                        _logger.critical(f'Failure to convert entry: "{i_key} = {i_value}" at section [{section}] ')

    return toml_cfg


# Function to load configuration file
[docs] def load_cfg(file_address, fit_cfg_suffix='_line_fitting'): """ Load a LiMe configuration file (TOML) and normalize LiMe-specific sections. This reads a TOML configuration file and, for any section whose name ends with `fit_cfg_suffix`, converts its key/value pairs to the formats expected by LiMe's line-fitting routines. Parameters ---------- file_address : str or pathlib.Path Path to the input configuration file (.toml). fit_cfg_suffix : str, optional Section-name suffix that identifies LiMe line-fitting configuration blocks to be normalized. Default is ``"_line_fitting"``. Returns ------- dict Parsed configuration mapping with LiMe sections converted where applicable. Raises ------ LiMe_Error If the file does not exist at `file_address`. tomllib.TOMLDecodeError If the TOML file cannot be parsed. Notes ----- - Sections ending with `fit_cfg_suffix` are passed to ``parse_lime_cfg`` for normalization. - If an item within a `fit_cfg_suffix` section cannot be converted, a critical warning is emitted (handled inside ``parse_lime_cfg``), but the rest of the configuration is still returned when possible. - Requires Python 3.11+ for ``tomllib``. Examples -------- >>> cfg = load_cfg("settings.toml") >>> cfg["my_model_line_fitting"]["method"] 'gaussian' """ file_path = Path(file_address) # Open the file if file_path.is_file(): # Toml file with open(file_path, mode="rb") as fp: cfg_lime = tomllib.load(fp) else: raise LiMe_Error(f'The configuration file was not found at: {file_address}') # Convert the configuration entries from the string format if possible cfg_lime = parse_lime_cfg(cfg_lime, fit_cfg_suffix) return cfg_lime
# Function to save SpecSyzer configuration file def save_cfg(output_file, param_dict, section_name=None, clear_section=False): """ This function safes the input dictionary into a configuration file. If no section is provided the input dictionary overwrites the data """ # TODO for line_fitting/models save dictionaries as line output_path = Path(output_file) if output_path.suffix == '.toml': # TODO review convert numpy arrays and floats64 if toml_check: toml_dict = param_dict if section_name is None else {section_name: param_dict} with open(output_file, "w") as f: toml.dump(toml_dict, f) else: raise LiMe_Error(f'toml library is not installed. Toml files cannot be saved') # Creating a new file (overwritting old if existing) else: if section_name == None: # Check all entries are dictionaries values_list = [*param_dict.values()] section_check = all(isinstance(x, dict) for x in values_list) assert section_check, f'ERROR: Dictionary for {output_file} cannot be converted to configuration file. Confirm all its values are dictionaries' output_cfg = configparser.ConfigParser() output_cfg.optionxform = str # Loop throught he sections and options to create the files for section_name, options_dict in param_dict.items(): output_cfg.add_section(section_name) for option_name, option_value in options_dict.items(): option_formatted = formatStringOutput(option_value, option_name, section_name) output_cfg.set(section_name, option_name, option_formatted) # Save to a text format with open(output_file, 'w') as f: output_cfg.write(f) # Updating old file else: # Confirm file exists file_check = os.path.isfile(output_file) # Load original cfg if file_check: output_cfg = configparser.ConfigParser() output_cfg.optionxform = str output_cfg.read(output_file) # Create empty cfg else: output_cfg = configparser.ConfigParser() output_cfg.optionxform = str # Clear section upon request if clear_section: if output_cfg.has_section(section_name): output_cfg.remove_section(section_name) # Add new section if it is not there if not output_cfg.has_section(section_name): output_cfg.add_section(section_name) # Map key values to the expected format and store them for option_name, option_value in param_dict.items(): option_formatted = formatStringOutput(option_value, option_name, section_name) output_cfg.set(section_name, option_name, option_formatted) # Save to a text file with open(output_file, 'w') as f: output_cfg.write(f) return
[docs] def load_frame(fname, page: str = 'FRAME', levels: list = ['id', 'line']): """ Loads a lines frame (pandas.DataFrame) from various file formats. Supported inputs include plain text tables, CSV, FITS HDUs, Excel sheets, ASDF nodes, and Streamlit ``UploadedFile`` objects. If the resulting table contains the columns listed in ``levels``, a MultiIndex is reconstructed by setting those columns as the index. Parameters ---------- fname : str or pathlib.Path or UploadedFile Path to the lines log file or a Streamlit ``UploadedFile``. When a path is provided, the file type is inferred from the suffix. page : str, optional HDU name (FITS) / sheet name (Excel) / node key (ASDF) to read from. Used only for ``.fits``, ``.xlsx``/``.xls``, and ``.asdf`` inputs. Default is ``"FRAME"``. levels : list of str, optional Column names to use for reconstructing a MultiIndex via ``DataFrame.set_index``. If all names in ``levels`` are present as columns, they are set as the index. Default is ``["id", "line"]``. Returns ------- pandas.DataFrame The loaded lines log. For text/CSV/Excel/ASDF the first column is treated as the initial index during read; if all ``levels`` are present as columns, they are set as a MultiIndex on return. Raises ------ LiMe_Error If ``fname`` is a path and the file does not exist. SystemExit If the file exists but cannot be opened/parsed (wraps a ``ValueError``). Notes ----- Detected formats and readers: * ``.fits``: read via ``hdu_to_log_df(log_path, page)``. * ``.xlsx`` / ``.xls``: read via ``pandas.read_excel(..., sheet_name=page, header=0, index_col=0)``. * ``.asdf``: read node ``page`` and build a DataFrame from records; index set from the ``"index"`` field. * ``.txt``: whitespace-separated via ``pandas.read_csv(..., sep=r"\\s+", header=0, index_col=0, comment="#")``. * ``.csv``: comma-separated via ``pandas.read_csv(..., sep=",", header=0, index_col=0, comment="#")``. * ``UploadedFile`` (Streamlit): treated like a whitespace-separated text table. Examples -------- Load an Excel sheet named ``FRAME`` and restore a MultiIndex of (``id``, ``line``): >>> df = load_frame("lines.xlsx", page="FRAME", levels=["id", "line"]) Load a FITS HDU named ``FRAME``: >>> df = load_frame("lines.fits", page="FRAME") """ # Check file is at path if type(fname).__name__ != 'UploadedFile': log_path = Path(fname) if not log_path.is_file(): raise LiMe_Error(f'No lines log found at {fname}\n') file_name, file_type = log_path.name, log_path.suffix else: file_name, file_type = fname, 'UploadedFile' try: # Fits file: if file_type == '.fits': log = hdu_to_log_df(log_path, page) # Excel table elif file_type in ['.xlsx' or '.xls']: log = pd.read_excel(log_path, sheet_name=page, header=0, index_col=0) # ASDF file elif file_type == '.asdf': with asdf.open(log_path) as af: log_RA = af[page] log = pd.DataFrame.from_records(log_RA, columns=log_RA.dtype.names) log.set_index('index', inplace=True) # # Change 'nan' to np.nan # idcs_nan_str = log['group_label'] == 'none' # log.loc[idcs_nan_str, 'group_label'] = None # Text file elif file_type == '.txt': log = pd.read_csv(log_path, sep=r'\s+', header=0, index_col=0, comment='#') # Uploaded file from streamlit elif file_type == 'UploadedFile': log = pd.read_csv(file_name, sep=r'\s+', header=0, index_col=0, comment='#') elif file_type == '.csv': log = pd.read_csv(log_path, sep=',', header=0, index_col=0, comment='#') else: _logger.warning(f'File type {file_type} is not recognized. This can cause issues reading the log.') log = pd.read_csv(log_path, sep=r'\s+', header=0, index_col=0) except ValueError as e: exit(f'\nERROR: LiMe could not open {file_type} file at {log_path}\n{e}') # Restore levels if multi-index if log.columns.isin(levels).sum() == len(levels): log.set_index(levels, inplace=True) return log
[docs] def save_frame(fname, dataframe, page='FRAME', parameters='all', header=None, column_dtypes=None, safe_version=True, **kwargs): """ Save a lines frame (pandas.DataFrame) to disk in one of several supported formats. The output format is inferred from the file extension. Supported formats include plain text tables, FITS HDUs, ASDF trees, and Excel sheets. Optional metadata such as FITS/ASDF headers and custom column data types can be provided. Parameters ---------- fname : str or pathlib.Path Destination file path. The extension determines the file format. Supported extensions are ``.txt``, ``.fits``, ``.asdf``, and ``.xlsx``. dataframe : pandas.DataFrame Lines log to be saved. parameters : list or {"all"}, optional Columns to include in the output. If ``"all"``, all DataFrame columns are written. Default is ``"all"``. page : str, optional HDU name (for FITS) or sheet name (for Excel) to use when writing. Default is ``"FRAME"``. header : dict, optional Additional metadata to include in the output file. For FITS and ASDF files, this dictionary is added to the file header. column_dtypes : str, type, or dict, optional Data type conversion mapping for the output FITS record array. - If a string or type, all columns are cast to that type. - If a dictionary, specify a mapping of column names or indices (zero-indexed) to their desired data types. This argument overrides LiMe’s default FITS formatting. safe_version : bool, optional If ``True``, the current LiMe version is saved as a footnote or page header in the output log. Default is ``True``. Raises ------ ValueError If the file extension is unsupported or the DataFrame cannot be written in the specified format. Notes ----- - For FITS and Excel outputs, the target HDU or sheet is named according to ``page``. - FITS headers can be extended using ``header``. - Custom column data types can be enforced via ``column_dtypes``. - The function ensures compatibility with LiMe’s internal data formats. Examples -------- Save a DataFrame to a FITS file with a custom header: >>> header = {"OBSERVER": "V. PΓ©rez", "INSTRUME": "MEGARA"} >>> save_frame("lines.fits", df, header=header) Save selected columns to an Excel sheet named ``FRAME``: >>> save_frame("lines.xlsx", df, parameters=["profile_flux", "profile_flux_err", "eqw"], page="FRAME") """ # Confirm file path exits log_path = Path(fname) assert log_path.parent.exists(), LiMe_Error(f'- ERROR: Output lines log folder not found: {log_path.parent}') file_name, file_type = log_path.name, log_path.suffix if len(dataframe.index) > 0: # In case of multi-index dataframe # TODO add option to lime.save_frame that the input frame is a lime.Sample if isinstance(dataframe.index, pd.MultiIndex): log = dataframe.reset_index() # TODO why do I need this else: log = dataframe # Slice the log if the user provides a list of columns if parameters != 'all': # TODO change this to None if isinstance(dataframe.index, pd.MultiIndex): parameters_list = np.atleast_1d(list(dataframe.index.names) + list(parameters)) else: parameters_list = np.atleast_1d(parameters) # lines_log = log[parameters_list] lines_log = log[log.columns.intersection(parameters_list)] else: lines_log = log # Default txt log with the complete information if file_type == '.txt': with open(log_path, 'wb') as output_file: pd.set_option('multi_sparse', False) string_DF = lines_log.to_string() # Add meta params for key, value in kwargs.items(): string_DF += f'\n#{key}:{value}' output_file.write(string_DF.encode('UTF-8')) # CSV elif file_type == '.csv': with open(log_path, 'wb') as output_file: pd.set_option('multi_sparse', False) string_DF = lines_log.to_csv(sep=',', na_rep='NaN') # Add meta params for key, value in kwargs.items(): string_DF += f'\n#{key}:{value}' output_file.write(string_DF.encode('UTF-8')) # Pdf fluxes table elif file_type == '.pdf' or file_type == '.tex': table_fluxes(lines_log, log_path.parent / log_path.stem, table_type=file_type[1:], header_format_latex=_LOG_COLUMNS_LATEX, lines_notation=log.latex_label.values, **kwargs) # Log in a fits format elif file_type == '.fits': if isinstance(lines_log, pd.DataFrame): # Initiate the header header = {} if header is None else header # Add the meta parameters for key, value in kwargs.items(): header[key] = value lineLogHDU = log_to_HDU(lines_log, ext_name=page, column_dtypes=column_dtypes, header_dict=header) if log_path.is_file(): # TODO this strategy is slow for many 2_guides try: fits.update(log_path, data=lineLogHDU.data, header=lineLogHDU.header, extname=lineLogHDU.name, verify=True) except KeyError: fits.append(log_path, data=lineLogHDU.data, header=lineLogHDU.header, extname=lineLogHDU.name) else: hdul = fits.HDUList([fits.PrimaryHDU(), lineLogHDU]) hdul.writeto(log_path, overwrite=True, output_verify='fix') # Log in excel format elif file_type == '.xlsx' or file_type == '.xls': # Check openpyxl is installed else leave if openpyxl_check: # New excel if not log_path.is_file(): with pd.ExcelWriter(log_path) as writer: lines_log.to_excel(writer, sheet_name=page) if len(kwargs) > 0: sheet_name = f'LiMe_{kwargs["LiMe"]}' df_empty = pd.DataFrame.from_dict(kwargs, orient='index', columns=['values']) df_empty.to_excel(writer, sheet_name=sheet_name, index=True) # Updating existing file else: wb = openpyxl.load_workbook(log_path) # Remove if existing and create anew if page in wb.sheetnames: wb.remove(wb[page]) sheet = wb.create_sheet(page, index=0) # Add data one row at a time for i, row in enumerate(dataframe_to_rows(lines_log, index=True, header=True)): if len(row) > 1: # TOFIX Remove the frozen list logic sheet.append(row) # Save the data wb.save(log_path) else: _logger.critical(f'openpyxl is not installed. Lines log {fname} could not be saved') # Advance Scientific Storage Format elif file_type == '.asdf': tree = {page: lines_log.to_records(index=True, column_dtypes=_LOG_TYPES_DICT, index_dtypes='<U50')} # Create new file if not log_path.is_file(): af = asdf.AsdfFile(tree) af.write_to(log_path) # Update file else: with asdf.open(log_path, mode='rw') as af: af.tree.update(tree) af.update() # Extension not recognised else: raise LiMe_Error(f'output extension "{file_type}" was not recognised for file {log_path}') # Empty log else: _logger.info('The output log has no measurements. An output file will not be saved') return
def results_to_log(line, log, norm_flux): # Loop through the line components for i, comp in enumerate(line.list_comps if line.group == 'b' else [line]): # Identifiers log.at[comp.label, 'particle'] = comp.particle.label log.at[comp.label, 'wavelength'] = comp.wavelength log.at[comp.label, 'latex_label'] = comp.latex_label log.at[comp.label, 'group_label'] = 'none' if comp.group_label is None else comp.group_label log.at[comp.label, 'profile'] = comp.profile log.at[comp.label, 'shape'] = comp.shape # Add bands wavelengths log.at[comp.label, 'w1'] = line.mask[0] log.at[comp.label, 'w2'] = line.mask[1] log.at[comp.label, 'w3'] = line.mask[2] log.at[comp.label, 'w4'] = line.mask[3] log.at[comp.label, 'w5'] = line.mask[4] log.at[comp.label, 'w6'] = line.mask[5] # Pixel mask log.at[comp.label, 'pixel_mask'] = line.pixel_mask # Treat every line for j in _RANGE_ATTRIBUTES_FIT: param = _ATTRIBUTES_FIT[j] param_value = line.measurements.__getattribute__(param) # Get component parameter if _LOG_COLUMNS[param][3] and (param_value is not None): param_value = param_value[i] # De-normalize if _LOG_COLUMNS[param][0]: if param_value is not None: param_value = param_value * norm_flux # Store in dataframe log.at[comp.label, param] = param_value return def check_file_dataframe(df_variable, variable_type=pd.DataFrame, ext='FRAME', sample_levels=['id', 'line'], copy_input=True, verbose=True): if isinstance(df_variable, variable_type): if copy_input: output = df_variable.copy() else: output = df_variable elif isinstance(df_variable, (str, Path)): input_path = Path(df_variable) if input_path.is_file(): output = load_frame(df_variable, page=ext, levels=sample_levels) else: if verbose: _logger.warning(f'Lines bands file not found at {df_variable}') output = None else: output = df_variable return output def check_fit_conf(fit_conf, default_key, obj_key, update_default=True, group_list=None, fit_cfg_suffix='_line_fitting', line_detection=False): # Check that there is an input configuration if fit_conf is not None: # Check if we have a file if not isinstance(fit_conf, dict): input_cfg = load_cfg(fit_conf, fit_cfg_suffix) else: input_cfg = fit_conf.copy() # If requested, check that the group/mask configurations are there if group_list is not None: for mask_name in group_list: mask_fit_cfg = input_cfg.get(f'{mask_name}_line_fitting') missing_mask = False if mask_fit_cfg is not None: if mask_fit_cfg.get('bands') is None: missing_mask = True else: missing_mask = True if missing_mask: error_message = 'No input "bands" provided. In this case you need to include the \n' \ f'you need to specify an "bands=log_file_address" entry the ' \ f'"[{mask_name}_file]" of your fitting configuration file' raise LiMe_Error(error_message) # Recover the configuration expected for the object default_cfg = input_cfg.get(f'{default_key}_line_fitting') if default_key is not None else None custom_cfg = input_cfg.get(f'{obj_key}_line_fitting') if obj_key is not None else None # Case there are not level entries if (default_cfg is None) and (custom_cfg is None): output_cfg = input_cfg # Proceed to update the levels else: # Default configuration default_cfg = {} if default_cfg is None else default_cfg default_detect = default_cfg.get('line_detection', {}) # Custom configuration custom_cfg = {} if custom_cfg is None else custom_cfg custom_detect = custom_cfg.get('line_detection', {}) # Update default configuration if requested else use only custom output_cfg = {**default_cfg, **custom_cfg} if update_default else (custom_cfg if custom_cfg else default_cfg) # Update default detection if requested else use only custom if line_detection: output_cfg['line_detection'] = default_detect.update(custom_detect) if update_default else\ (custom_detect if custom_detect else default_detect) else: output_cfg = {} # Check the detection entries are included if line_detection: if output_cfg.get('continuum') is not None: if output_cfg['continuum'].get('degree_list') is None: _logger.critical(f'Automatic line detection but the input configuration does not include a ' f'"degree_list" key') if output_cfg['continuum'].get('emis_threshold') is None: _logger.critical(f'TAutomatic line detection but the input configuration does not include a ' f'"emis_threshold" key') else: _logger.critical(f'Automatic line detection but the input configuration does not include a ' f'"continuum" entry with a "degree_list" and "emis_threshold" keys') return output_cfg def check_numeric_value(s): # Function to check if variable can be converte to float else leave as string try: output = float(s) return output except ValueError: return s # Function to map a string to its variable-type def format_option_value(entry_value, key_label, section_label='', float_format=None, nan_format='nan'): output_variable = entry_value # Dictionary blended lines if isinstance(entry_value, str): output_variable = {} try: keys_and_values = entry_value.split(',') for pair in keys_and_values: # Conversion for parameter class atributes if ':' in pair: key, value = pair.split(':') if value == 'None': output_variable[key] = None elif key in ['value', 'min', 'max']: output_variable[key] = float(value) elif key == 'vary': output_variable[key] = strtobool(value) == 1 else: output_variable[key] = value elif '_mask' in key_label: output_variable = entry_value # Conversion for non-parameter class atributes (only str conversion possible) else: output_variable = check_numeric_value(entry_value) except: raise LiMe_Error(f'Failure to convert configuration entry: {key_label} = {entry_value} in section {section_label}') return output_variable def log_parameters_calculation(input_log, parameter_list, formulae_list): # Load the log if necessary file: file_check = False if isinstance(input_log, pd.DataFrame): log_df = input_log elif isinstance(input_log, (str, Path)): file_check = True log_df = load_frame(input_log) else: _logger.critical( f'Not a recognize log format. Please use a pandas dataframe or a string/Path object for file {input_log}') exit(1) # Parse the combined expression expr = '' for col, formula in zip(parameter_list, formulae_list): expr += f'{col}={formula}\n' # Compute the new parameters log_df.eval(expr=expr, inplace=True) # Save to the previous location if file_check: save_frame(log_df, input_log) return def log_to_HDU(log, ext_name=None, column_dtypes=None, header_dict=None): # For non empty logs if not log.empty: if column_dtypes is None: column_dtypes = _LOG_TYPES_DICT if header_dict is None: header_dict = {} linesSA = log.to_records(index=True, column_dtypes=column_dtypes, index_dtypes='<U50') linesCol = fits.ColDefs(linesSA) hdr = fits.Header(header_dict) if isinstance(header_dict, dict) else header_dict linesHDU = fits.BinTableHDU.from_columns(linesCol, name=ext_name, header=hdr) # Empty log else: linesHDU = None return linesHDU def formatStringOutput(value, key, section_label=None, float_format=None, nan_format='nan'): # TODO this one should be the default option # TODO add more cases for dicts # Check None entry if value is not None: # Check string entry if isinstance(value, str): formatted_value = value else: # Case of an array scalarVariable = True if isinstance(value, (Sequence, np.ndarray)): # Confirm is not a single value array if len(value) == 1: value = value[0] # Case of an array else: scalarVariable = False formatted_value = ','.join([str(item) for item in value]) if scalarVariable: # Case single float if isinstance(value, str): formatted_value = value else: if np.isnan(value): formatted_value = nan_format else: formatted_value = str(value) else: formatted_value = 'None' return formatted_value def load_spatial_mask(mask_file, mask_list=None, return_coords=False): # Masks array container spatial_mask_dict = {} # Limit the analysis to some masks mask_list_check = False if mask_list is None else True # Check that mask is there: if mask_file is not None: mask_file = Path(mask_file) if mask_file.is_file(): with fits.open(mask_file) as maskHDULs: # Save the fits data to restore later counter = 0 for HDU in maskHDULs: ext_name = HDU.name if HDU.name != 'PRIMARY': if mask_list_check: if ext_name in mask_list: spatial_mask_dict[ext_name] = (HDU.data.astype('bool'), HDU.header) counter += 1 else: spatial_mask_dict[ext_name] = (HDU.data.astype('bool'), HDU.header) counter += 1 # Warn if the mask file does not contain any of the expected masks if counter == 0: _logger.warning(f'No masks extensions were found in file {mask_file}') else: _logger.warning(f'No mask file was found at {mask_file.as_posix()}.') # Return the mask as a set of coordinates (no headers) if return_coords: for mask_name, mask_data in spatial_mask_dict.items(): idcs_spaxels = np.argwhere(mask_data[0]) spatial_mask_dict[mask_name] = idcs_spaxels return spatial_mask_dict def check_file_array_mask(var, mask_list=None): # Check if file if isinstance(var, (str, Path)): input = Path(var) if input.is_file(): mask_dict = load_spatial_mask(var, mask_list) else: raise LiMe_Error(f'No spatial mask file at {Path(var).as_posix()}') # Array elif isinstance(var, (np.ndarray, list)): # Re-adjust the variable var = np.ndarray(var, ndmin=3) masks_array = np.squeeze(np.array_split(var, var.shape[0], axis=0)) # Confirm boolean array if masks_array.dtype != bool: _logger.warning(f'The input mask array should have a boolean variables (True/False)') # Incase user gives a str mask_list = [mask_list] if isinstance(mask_list, str) else mask_list # Check if there is a mask list if len(mask_list) == 0: mask_list = [f'SPMASK{i}' for i in range(masks_array.shape[0])] # Case the number of masks names and arrays is different elif masks_array.shape[0] != len(mask_list): _logger.warning(f'The number of input spatial mask arrays is different than the number of mask names') # Everything is fine else: mask_list = mask_list # Create mask dict with empty headers mask_dict = dict(zip(mask_list, (masks_array, {}))) else: raise LiMe_Error(f'Input mask format {type(input)} is not recognized for a mask file. Please declare a fits file, a' f' numpy array or a list/array of numpy arrays') return mask_dict # _PARENT_BANDS = load_frame(_LINES_DATABASE_FILE)