__all__ = ['unit_conversion',
'extract_fluxes',
'normalize_fluxes',
'redshift_calculation',
'save_parameter_maps']
import logging
import numpy as np
import pandas as pd
from sys import stdout
from pathlib import Path
from astropy import units as au
from astropy.units.core import CompositeUnit, IrreducibleUnit, Unit
from astropy.io import fits
from lime.io import LiMe_Error, load_frame, log_to_HDU
_logger = logging.getLogger('LiMe')
# Arrays for roman numerals conversion
VAL_LIST = [1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1]
SYB_LIST = ["M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"]
dict_units = {'flam': au.erg/au.s/au.cm**2/au.AA, 'FLAM': au.erg/au.s/au.cm**2/au.AA,
'fnu': au.erg/au.s/au.cm**2/au.Hz, 'FNU': au.erg/au.s/au.cm**2/au.Hz,
'photlam': au.photon/au.s/au.cm**2/au.AA, 'PHOTLAM': au.photon/au.s/au.cm**2/au.AA,
'photnu': au.photon/au.s/au.cm**2/au.Hz, 'PHOTNU': au.photon/au.s/au.cm**2/au.Hz}
au.set_enabled_aliases(dict_units)
PARAMETER_LATEX_DICT = {'Flam': r'$F_{\lambda}$',
'Fnu': r'$F_{\nu}$',
'SN_line': r'$\frac{S}{N}_{line}$',
'SN_cont': r'$\frac{S}{N}_{cont}$'}
def unique_line_arr(frame, return_index = False):
idcs_s = frame.group_label == 'none'
idcs_m = pd.Series(frame.index.str.endswith('_m'), index=frame.index)
# Initialize with False
idcs_b = pd.Series(False, index=frame.index)
idcs_components = frame.loc[~(idcs_s | idcs_m), 'group_label'].str.split('+').str[0].unique()
idcs_components = idcs_b.index.isin(idcs_components)
idcs_b[idcs_components] = True
# Get merged lines in blended lines
m_terms = (frame.loc[idcs_b, 'group_label'].apply(lambda x: [term for term in x.split('+')
if term.endswith('_m')]))
flat_m_terms = [item for sublist in m_terms if sublist for item in sublist]
if len(flat_m_terms) > 0:
idcs_m_in_b = pd.Series(False, index=frame.index)
idcs_m_in_b.loc[flat_m_terms] = True
else:
idcs_m_in_b = pd.Series(False, index=frame.index)
idcs_selection = (idcs_s | idcs_m | idcs_b) & ~idcs_m_in_b
# Return a series with the same length as the original index
if return_index:
return idcs_selection
# Return an array
else:
return idcs_selection[idcs_selection].index
def mult_err_propagation(nominal_array, err_array, result):
err_result = result * np.sqrt(np.sum(np.power(err_array/nominal_array, 2)))
return err_result
def int_to_roman(num):
i, roman_num = 0, ''
while num > 0:
for _ in range(num // VAL_LIST[i]):
roman_num += SYB_LIST[i]
num -= VAL_LIST[i]
i += 1
return roman_num
def pd_get(df, row, column, default=None, transform=None, nan_to_none=False):
# Fast get from dataframe
try:
cell = df.at[row, column]
except KeyError:
cell = default
# Transform the value from the dataframe to the default if requested
if transform is not None:
cell = default if cell == transform else cell
# Transform nan to None # TODO is this all necessary
if nan_to_none and (cell is not None):
if isinstance(cell, float):
cell = None if np.isnan(cell) else cell
return cell
def extract_fluxes(log, flux_type='mixture', sample_level='line', column_name='line_flux', column_positions=None):
if flux_type not in ('mixture', 'intg', 'profile'):
raise LiMe_Error(f'Flux type "{flux_type}" is not recognized please one of "intg", "profile", or "mixture" ')
# Get indeces of blended lines
if not isinstance(log.index, pd.MultiIndex):
idcs_blended = (log['group_label'] != 'none') & (~log.index.str.endswith('_m'))
else:
if sample_level not in log.index.names:
raise LiMe_Error(f'Input log does not have a index level with column "{sample_level}"')
idcs_blended = (log['group_label'] != 'none') & (~log.index.get_level_values('line').str.endswith('_m'))
# Mixture models: Integrated fluxes for all lines except blended
if flux_type == 'mixture' and np.any(idcs_blended):
obsFlux = log['intg_flux'].to_numpy(copy=True)
obsErr = log['intg_flux_err'].to_numpy(copy=True)
obsFlux[idcs_blended.values] = log.loc[idcs_blended.values, 'profile_flux'].to_numpy(copy=True)
obsErr[idcs_blended.values] = log.loc[idcs_blended.values, 'profile_flux_err'].to_numpy(copy=True)
# Use the one requested by the user
else:
obsFlux = log[f'{flux_type}_flux'].to_numpy(copy=True)
obsErr = log[f'{flux_type}_flux_err'].to_numpy(copy=True)
# Add columns to input dataframe
output_fluxes = [obsFlux, obsErr]
if column_name is not None:
column_positions = 0 if column_positions is None else column_positions
if column_name in log.columns:
log.loc[f'{column_name}'] = output_fluxes[0]
log.loc[f'{column_name}_err'] = output_fluxes[1]
else:
log.insert(loc=column_positions, column=f'{column_name}', value=output_fluxes[0])
log.insert(loc=column_positions + 1, column=f'{column_name}_err', value=output_fluxes[1])
function_return = None
else:
function_return = output_fluxes
return function_return
def check_lines_normalization(input_lines, norm_line, log):
# Single or multi-index behaviour
single_index_check = not isinstance(log.index, pd.MultiIndex)
# If not input lines use all of them
if input_lines is None:
input_lines = log.index.to_numpy() if single_index_check else log.index.get_level_values('line').unique()
# In case there is only one input line as str
input_lines = [input_lines] if isinstance(input_lines, str) else input_lines
# 1) One-norm in norm_line # 2) Multiple-norm in norm_line # 3) Multiple-norm in input_lines
# Cases 1 and 2
if norm_line is not None:
# Unique normalization
if isinstance(norm_line, str) or (len(norm_line) == 1):
if single_index_check:
line_list = list(log.loc[log.index.isin(input_lines)].index.to_numpy())
else:
line_list = list(log.loc[log.index.get_level_values('line').isin(input_lines)].index.get_level_values('line').unique())
norm_list = [norm_line] * len(line_list)
# Multiple normalizations
else:
if len(input_lines) and len(norm_line):
line_list, norm_list = [], []
candidate_lines = log.index if single_index_check else log.index.get_level_values('line')
for i, line in enumerate(input_lines):
if (line in candidate_lines) and (norm_line[i] in candidate_lines):
line_list.append(line)
norm_list.append(norm_line[i])
else:
raise LiMe_Error(f'The number of normalization lines does not match the number of input lines:\n'
f'- Model lines ({input_lines}): {input_lines}\n'
f'- Norm lines ({len(norm_line)}): {norm_line}')
# Case 3
else:
# Split the nominators and denominators
line_list, norm_list = [], []
for ratio in input_lines:
if '/' not in ratio:
raise LiMe_Error(f'Input line list must use "/" with their normalization (for example H1_6563A/H1_4861A)\n'
f'The input ratio: {ratio} does not have it. Try to specify a "norm_list" argument instead.')
nomin_i, denom_i = ratio.replace(" ", "").split('/')
line_list.append(nomin_i)
norm_list.append(denom_i)
return line_list, norm_list
def normalize_fluxes(log, line_list=None, norm_list=None, flux_column='profile_flux', column_name='line_flux',
column_position=0, column_normalization_name='norm_line', sample_levels=['id', 'line']):
'''
If the normalization line is not available, no operation is added.
'''
# Check columns present in log
if (flux_column not in log.columns) or (f'{flux_column}_err' not in log.columns):
raise LiMe_Error(f'Input log is missing "{flux_column}" or "{flux_column}_err" columns')
# Check the normalization for the lines
line_array, norm_array = check_lines_normalization(line_list, norm_list, log)
# Add new columns if necessary
if column_name not in log.columns:
log.insert(loc=column_position, column=f'{column_name}', value=np.nan)
log.insert(loc=column_position+1, column=f'{column_name}_err', value=np.nan)
# Add new line with normalization by default
if column_normalization_name not in log.columns:
log.insert(loc=column_position+2, column=column_normalization_name, value='none')
# Loop throught the lines to compute their normalization
single_index = not isinstance(log.index, pd.MultiIndex)
for i in np.arange(len(line_array)):
numer, denom = line_array[i], norm_array[i]
numer_flux, denom_flux = None, None
# Single-index dataframe
if single_index:
idcs_ratios = numer
if (numer in log.index) and (denom in log.index):
numer_flux = log.loc[numer, flux_column]
numer_err = log.loc[numer, f'{flux_column}_err']
denom_flux = log.loc[denom, flux_column]
denom_err = log.loc[denom, f'{flux_column}_err']
#Multi-index dataframe
else:
if numer == denom: # Same line normalization
idcs_slice = log.index.get_level_values(sample_levels[-1]) == numer
df_slice = log.loc[idcs_slice]
else: # Rest cases
idcs_slice = log.index.get_level_values(sample_levels[-1]).isin([numer, denom])
grouper = log.index.droplevel('line')
idcs_slice = pd.Series(idcs_slice).groupby(grouper).transform('sum').ge(2).to_numpy()
df_slice = log.loc[idcs_slice]
# Get fluxes
if df_slice.size > 0:
num_slice = df_slice.xs(numer, level=sample_levels[-1], drop_level=False)
numer_flux = num_slice[flux_column].to_numpy()
numer_err = num_slice[f'{flux_column}_err'].to_numpy()
denom_slice = df_slice.xs(denom, level=sample_levels[-1], drop_level=False)
denom_flux = denom_slice[flux_column].to_numpy()
denom_err = denom_slice[f'{flux_column}_err'].to_numpy()
idcs_ratios = num_slice.index
# Compute the ratios with error propagation
ratio_array, ratio_err = None, None
if (numer_flux is not None) and (denom_flux is not None):
ratio_array = numer_flux / denom_flux
ratio_err = ratio_array * np.sqrt(np.power(numer_err / numer_flux, 2) + np.power(denom_err / denom_flux, 2))
# Store in dataframe (with empty columns)
if (ratio_array is not None) and (ratio_err is not None):
log.loc[idcs_ratios, f'{column_name}'] = ratio_array
log.loc[idcs_ratios, f'{column_name}_err'] = ratio_err
# Store normalization line
if column_normalization_name is not None:
log.loc[idcs_ratios, f'{column_normalization_name}'] = denom
return
def redshift_calculation(input_log, line_list=None, weight_parameter=None, min_err_pct=None, obj_label='spec_0'):
if str(type(input_log)) in ["<class 'lime.observations.Spectrum'>", "<class 'lime.observations.Cube'>", "<class 'lime.observations.Sample'>"]:
input_log = input_log.frame
# Check if single or multi-index
sample_check = isinstance(input_log.index, pd.MultiIndex)
# Check the weighted parameter presence
if weight_parameter is not None:
if weight_parameter not in input_log.columns:
raise LiMe_Error(f'The parameter {weight_parameter} is not found on the input lines log headers')
# Check input line is not a string
line_list = np.array(line_list, ndmin=1) if isinstance(line_list, str) else line_list
if sample_check:
levels = input_log.index.names
id_list = input_log.index.droplevel(levels[-1]).unique()
else:
id_list = np.array([obj_label])
levels = None
# Container for redshifts
z_df = pd.DataFrame(index=id_list, columns=['z_mean', 'z_std', 'lines', 'weight'])
if sample_check:
z_df.rename_axis(index=levels[:-1], inplace=True)
# Loop through the ids
for idx in id_list:
# Slice to the object log
if not sample_check:
df_slice = input_log
else:
df_slice = input_log.xs(idx, level=levels[0])
# Get the lines requested
if line_list is not None:
idcs_slice = df_slice.index.isin(line_list)
df_slice = df_slice.loc[idcs_slice]
# Exclude lines which failed to be measured
df_slice = df_slice.loc[df_slice.profile_flux_err.notnull()]
# Exclude error lines:
if min_err_pct is not None:
idcs_slice = df_slice.center_err.to_numpy() / df_slice.center.to_numpy() <= min_err_pct
df_slice = df_slice.loc[idcs_slice]
# Check the line has lines
n_lines = len(df_slice.index)
if n_lines > 0:
z_array = (df_slice['center']/df_slice['wavelength'] - 1).to_numpy()
z_err_array = df_slice.center_err.to_numpy() / df_slice.center.to_numpy()
line_array = df_slice.index.to_numpy()
# Just one line
if n_lines == 1:
z_mean = z_array[0]
z_std = df_slice.center_err.to_numpy()[0]/df_slice.center.to_numpy()[0]
# Multiple lines
else:
# Not weighted parameter
if weight_parameter is None:
z_mean = z_array.mean()
z_std = z_array.std()
# With a weighted parameter
else:
# Get normalized errors weight values which are positive...
w_array = df_slice[weight_parameter].to_numpy()
idcs_pos = w_array > 0
w_array = w_array[idcs_pos]/np.sum(w_array[idcs_pos])
# Only use possitive values
z_array, z_err_array, line_array = z_array[idcs_pos], z_err_array[idcs_pos], line_array[idcs_pos]
z_mean = np.sum(w_array * z_array)/np.sum(w_array)
z_std = np.sqrt(np.sum(w_array * np.square(z_err_array)) / np.square(np.sum(w_array)))
obsLineList = ','.join(list(line_array))
else:
z_mean, z_std, obsLineList = np.nan, np.nan, None
# Add to dataframe
z_df.loc[idx, 'z_mean':'weight'] = z_mean, z_std, obsLineList, weight_parameter
return z_df
def compute_FWHM0(idx_peak, spec_flux, delta_wave, cont_flux, emission_check=True):
"""
:param idx_peak:
:param spec_flux:
:param delta_wave:
:param cont_flux:
:param emission_check:
:return:
"""
i = idx_peak
i_final = 0 if delta_wave < 0 else spec_flux.size - 1
if emission_check:
while (spec_flux[i] >= cont_flux[i]) and (i != i_final):
i += delta_wave
else:
while (spec_flux[i] <= cont_flux[i]) and (i != i_final):
i += delta_wave
return i
def blended_label_from_log(line, log):
# Default values: single line
blended_check = False
group_label = None
if line in log.index:
if 'group_label' in log.columns:
if log.loc[line, 'group_label'] == 'none':
group_label = None
elif line.endswith('_m'):
group_label = log.loc[line, 'group_label']
else:
blended_check = True
group_label = log.loc[line, 'group_label']
else:
# TODO this causes and error if we forget the '_b' componentes in the configuration file need to check input cfg
_logger.warning(f'The line {line} was not found on the input log. If you are specifying the components of a '
f'blended line in the fitting configuration, make sure you are not missing the "_b" subscript')
return blended_check, group_label
[docs]
def unit_conversion(in_units, out_units, wave_array=None, flux_array=None, dispersion_units=None, decimals=None):
"""
Convert a wavelength or flux array from one physical unit to another.
This function converts input quantities (wavelength or flux) from ``in_units`` to
``out_units`` using the `Astropy Units module
<https://docs.astropy.org/en/stable/units/>`_.
For flux conversions, the user must also provide the corresponding wavelength array
and its dispersion units to correctly apply the spectral-density equivalency.
Parameters
----------
in_units : str
Units of the input array. Must be a valid `Astropy unit string
<https://docs.astropy.org/en/stable/units/#module-astropy.units>`_ (e.g.,
``"Angstrom"``, ``"Hz"``, ``"Jy"``, ``"erg / (s cm2 Angstrom)"``).
out_units : str
Target units for the conversion. Must also be a valid Astropy unit string.
wave_array : numpy.ndarray, optional
Wavelength array. Required for flux conversions (``flux_array`` not ``None``).
Used to define the frequency–wavelength relationship in the conversion.
flux_array : numpy.ndarray, optional
Flux array to convert. If ``None``, the function assumes that the conversion
applies to ``wave_array`` instead.
dispersion_units : str, optional
Units of the wavelength array, required when converting flux densities (e.g.,
``"Angstrom"`` or ``"Hz"``). Used to define the spectral density equivalency.
decimals : int, optional
Number of decimal places to round the output values. If ``None``, no rounding
is applied.
Returns
-------
numpy.ndarray
Converted array (wavelength or flux) expressed in ``out_units`` and stripped
of unit metadata.
Notes
-----
- The conversion is performed through `astropy.units.Unit` objects and their corresponding equivalencies:
* For wavelength conversions: ``astropy.units.spectral()``.
* For flux conversions: ``astropy.units.spectral_density(wave_array)``.
- Ensure that both input and output units are compatible for the desired
transformation (e.g., wavelength ↔ frequency or flux per unit wavelength ↔ flux per unit frequency).
- When converting flux densities, always provide both ``wave_array`` and
``dispersion_units`` to avoid unit-equivalency errors.
Examples
--------
Convert wavelength from Angstroms to nanometers:
>>> import numpy as np
>>> wave_nm = unit_conversion("Angstrom", "nm", wave_array=np.array([5000, 6000]))
>>> wave_nm
array([500., 600.])
Convert flux density from Fλ (erg s⁻¹ cm⁻² Å⁻¹) to Fν (erg s⁻¹ cm⁻² Hz⁻¹):
>>> flux_fnu = unit_conversion("erg / (s cm2 Angstrom)", "erg / (s cm2 Hz)",
... wave_array=np.array([5000, 6000]),
... flux_array=np.array([1e-14, 2e-14]),
... dispersion_units="Angstrom", decimals=5)
>>> flux_fnu
array([3.33e-27, 6.67e-27])
"""
# Converting the wavelength array
if flux_array is None:
input_array = wave_array * au.Unit(in_units)
output_array = input_array.to(au.Unit(out_units), equivalencies=au.spectral())
# Converting the flux array
else:
input_array = flux_array * au.Unit(in_units)
dispersion_array = wave_array * au.Unit(dispersion_units)
output_array = input_array.to(au.Unit(out_units), au.spectral_density(dispersion_array))
# Remove the units
output_array = output_array.value
# Round to decimal places
output_array = output_array if decimals is None else np.round(output_array, decimals)
return output_array
def parse_unit_convertion(observation, wave_units_out, flux_units_out):
# Recover the pixel mask
pixel_mask = observation.flux.mask
# Remove existing normalization
old_norm = observation.norm_flux if ((observation.norm_flux != 1) and (observation.norm_flux is not None)) else 1
input_wave = observation.wave if pixel_mask is None else observation.wave.data
input_flux = observation.flux * old_norm if pixel_mask is None else observation.flux.data * old_norm
if observation.err_flux is not None:
input_err = observation.err_flux * old_norm if pixel_mask is None else observation.err_flux * old_norm
else:
input_err = None
# Get the new units or use old
wave_units_out = observation.units_wave if wave_units_out is None else wave_units_out
flux_units_out = observation.units_flux if flux_units_out is None else flux_units_out
# Flux conversion
if flux_units_out != observation.units_flux:
# Spectrum
if len(input_flux.shape) == 1:
# Flux conversion
output_flux = unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave,
flux_array=input_flux, dispersion_units=observation.units_wave)
# Flux uncertainty conversion
output_err = None if input_err is None else unit_conversion(observation.units_flux, flux_units_out,
wave_array=input_wave,
flux_array=input_err,
dispersion_units=observation.units_wave)
# Cube
elif len(input_flux.shape) == 3:
output_flux = np.empty_like(input_flux)
output_err = np.empty_like(input_err) if input_err is not None else None
for i in np.arange(input_flux.shape[0]):
output_flux[i, :, :] = unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave[i],
flux_array=input_flux[i, :, :], dispersion_units=observation.units_wave)
if input_err is not None:
output_err[i, :, :] = unit_conversion(observation.units_flux, flux_units_out, wave_array=input_wave[i],
flux_array=input_err[i, :, :],
dispersion_units=observation.units_wave)
# Not recognized
else:
raise LiMe_Error(f'Input flux for unit conversion array size is not recognized:\n'
f'-- Please use 1-dimension array for lime.Spectrum and 3-dimension array for lime.Cube')
else:
output_flux, output_err = input_flux, input_err
# Wavelength conversion
if wave_units_out != observation.units_wave:
output_wave = unit_conversion(observation.units_wave, wave_units_out, wave_array=input_wave)
else:
output_wave = input_wave
return wave_units_out, flux_units_out, output_wave, output_flux, output_err, pixel_mask
def join_fits_files(log_file_list, output_address, delete_after_join=False, levels=['id', 'line']):
"""
This functions combines multiple log files into single *.fits* file. The user can request to the delete the individual
logs after the individual logs have been combined.
If the case of individual *.fits* the function loop through the individual HDU and add them to the output file. Currently,
this is not available to other multi-page files (such as .xlsx or .asdf)
:param log_file_list: Input list of log files.
:type log_file_list: list
:param output_address: String or path for the output combined log file.
:type output_address: str, Path, optional
:param delete_after_join: Delete individual files after joining them. The default value is False
:type output_address: bool, optional
:param levels: Indexes name list for MultiIndex dataframes. The default value is ['id', 'line'].
:type levels: list, optional
:return:
"""
# Confirm is a path
output_address = Path(output_address)
# Create new HDU for the combined file with a new PrimaryHDU
hdulist = fits.HDUList([fits.PrimaryHDU()])
# Progress bar
n_log = len(log_file_list)
pbar = ProgressBar('bar', f'log files combined')
# Iterate through the file paths, open each FITS file, and append the non-primary HDUs to hdulist
missing_files = []
for i, log_path in enumerate(log_file_list):
log_path = Path(log_path)
pbar.output_message(i, n_log, pre_text="", post_text=None)
if log_path.is_file():
ext = log_path.suffix
# Fits file
if ext == '.fits':
with fits.open(log_path) as hdul:
for j, hdu in enumerate(hdul):
if j == 0:
if not isinstance(hdu, fits.PrimaryHDU):
hdu_i = fits.BinTableHDU(data=hdu.data, header=hdu.header, name=hdu.name,
character_as_bytes=False)
else:
hdu_i = None
else:
hdu_i = fits.BinTableHDU(data=hdu.data, header=hdu.header, name=hdu.name,
character_as_bytes=False)
# Append
if hdu_i is not None:
hdulist.append(hdu_i)
# Remaining types
else:
df_i = load_frame(log_path, levels=levels)
name_i = log_path.stem
hdu_i = log_to_HDU(df_i, ext_name=name_i)
# Append
if hdu_i is not None:
hdulist.append(hdu_i)
# Append to the list
# with fits.open(log_path) as hdulist_i:
#
# for i, hdu in enumerate(hdulist_i):
# if i == 0:
# if not isinstance(hdu, fits.PrimaryHDU):
# hdulist.append(fits.TableHDU(data=hdu.data, header=hdu.header, name=hdu.name))
# else:
# hdulist.append(fits.TableHDU(data=hdu.data, header=hdu.header, name=hdu.name))
else:
missing_files.append(log_path)
# Save to a combined file
hdulist.writeto(output_address, overwrite=True, output_verify='ignore')
hdulist.close()
# Warn of missing files
if len(missing_files) > 0:
_logger.info(f"Warning these files were missing: {missing_files}")
# Delete individual files if requested
if delete_after_join:
if len(missing_files) == 0:
for log_path in log_file_list:
log_path.unlink()
else:
_logger.info("The individual masks won't be deleted")
return
def check_units(units_wave, units_flux):
# Check if input are already astropy units
units_wave = au.Unit(units_wave) if not isinstance(units_wave, (IrreducibleUnit, CompositeUnit, Unit)) else units_wave
units_flux = au.Unit(units_flux) if not isinstance(units_flux, (IrreducibleUnit, CompositeUnit, Unit)) else units_flux
return units_wave, units_flux
[docs]
def save_parameter_maps(lines_log_file, output_folder, param_list, line_list, mask_file=None, mask_list='all',
image_shape=None, log_ext_suffix='_LINELOG', spaxel_fill_value=np.nan, output_file_prefix=None,
header=None, wcs=None):
"""
Convert the measurements from a cube FITS log into 2D parameter maps.
For each requested parameter in ``param_list`` and each line in ``line_list``,
this function builds a 2D image (spaxel grid) by reading the per-spaxel
measurement pages from ``lines_log_file``. The resulting images are written
as multi-extension FITS files to ``output_folder``—one output file per
parameter, with one ImageHDU page per line.
Spaxels to include can be specified via a spatial mask FITS file
(``mask_file``). If multiple mask extensions exist, select a subset with
``mask_list`` or use ``'all'`` to include all masks. If no mask is provided,
you must supply ``image_shape=(ny, nx)`` to define the output grid (this computation can be lengthy).
Parameters
----------
lines_log_file : str or pathlib.Path
Path to the FITS file containing per-spaxel line measurements.
Each spaxel must be stored in an extension named
``"{j}-{i}{log_ext_suffix}"`` (e.g., ``"25-30_LINELOG"``).
output_folder : str or pathlib.Path
Existing directory where output parameter maps will be saved.
param_list : list of str
Measurement fields to extract and map (e.g., ``["flux", "EW", "v_med"]``).
These must be columns present in each spaxel page of ``lines_log_file``.
line_list : list of str
Line labels to include as pages in the output files. Each label must
appear in the spaxel log’s ``index`` (line name) column.
mask_file : str or pathlib.Path, optional
FITS file with one or more binary masks (nonzero→included). If provided,
only spaxels selected by the union of masks in ``mask_list`` are mapped.
mask_list : {'all'} or list of str, optional
Which mask extensions from ``mask_file`` to use. ``'all'`` (default)
includes every non-PRIMARY extension. Otherwise, provide a list of
extension names.
image_shape : tuple of int, optional
Output image shape ``(ny, nx)`` used when no ``mask_file`` is provided.
Required if ``mask_file`` is ``None``.
log_ext_suffix : str, optional
Suffix used in spaxel extension names inside ``lines_log_file``.
Default is ``"_LINELOG"``.
spaxel_fill_value : float, optional
Value used to fill pixels with missing data or spaxels not found in
the log. Default is ``numpy.nan``.
output_file_prefix : str, optional
Prefix prepended to each output filename. For parameter ``P``, the file
is saved as ``{prefix}{P}.fits``. Default is ``None`` (no prefix).
header : dict, optional
Extra header cards to add. If a key matching ``"{param}-{line}"`` exists,
that nested dict is used for that page; otherwise ``header`` is treated
as a global header for all pages of a parameter file.
wcs : astropy.wcs.WCS, optional
WCS used to populate spatial coordinate keywords in each ImageHDU header.
See `Astropy WCS <https://docs.astropy.org/en/stable/wcs/index.html>`_.
Returns
-------
None
Writes one FITS file per parameter to ``output_folder``. Each file
contains an ImageHDU per line in ``line_list``.
Notes
-----
- **Spaxel HDU naming:** The function expects pages in the ``lines_log_file`` to
be named exactly ``"{j}-{i}{log_ext_suffix}"`` where ``j`` and ``i`` are
integer **(row, column)** indices of the cube spaxel.
- **Mask handling:** When ``mask_file`` includes multiple mask extensions,
the selected masks are combined (logical OR) to form the spaxel set.
- **Headers:** Each page includes ``LINE`` (line label) and ``PARAM`` (parameter
name). WCS metadata are added if ``wcs`` is provided. Custom header cards
from ``header`` are merged afterward.
Examples
--------
Create maps for flux and velocity dispersion of two lines, using all masks:
>>> save_parameter_maps(
... lines_log_file="linelog.fits",
... output_folder="maps/",
... param_list=["flux", "sigma"],
... line_list=["O3_5007A", "H1_6563A"],
... mask_file="spatial_masks.fits",
... mask_list="all",
... log_ext_suffix="_LINELOG",
... output_file_prefix="galaxy_"
... )
Build maps without a mask file (provide image shape explicitly):
>>> save_parameter_maps(
... lines_log_file="linelog.fits",
... output_folder="maps/",
... param_list=["EW"],
... line_list=["H1_4861A"],
... image_shape=(74, 76)
... )
"""
assert Path(lines_log_file).is_file(), f'- ERROR: lines log at {lines_log_file} not found'
assert Path(output_folder).is_dir(), f'- ERROR: Output parameter maps folder {output_folder} not found'
# Compile the list of voxels to recover the provided masks
if mask_file is not None:
assert Path(mask_file).is_file(), f'- ERROR: mask file at {mask_file} not found'
with fits.open(mask_file) as maskHDUs:
# Get the list of mask extensions
if mask_list == 'all':
if ('PRIMARY' in maskHDUs) and (len(maskHDUs) > 1):
mask_list = []
for i, HDU in enumerate(maskHDUs):
mask_name = HDU.name
if mask_name != 'PRIMARY':
mask_list.append(mask_name)
mask_list = np.array(mask_list)
else:
mask_list = np.array(['PRIMARY'])
else:
mask_list = np.array(mask_list, ndmin=1)
# Combine all the mask voxels into one
for i, mask_name in enumerate(mask_list):
if i == 0:
mask_array = maskHDUs[mask_name].data
image_shape = mask_array.shape
else:
assert image_shape == maskHDUs[mask_name].data.shape, '- ERROR: Input masks do not have the same dimensions'
mask_array += maskHDUs[mask_name].data
# Convert to boolean
mask_array = mask_array.astype(bool)
# List of spaxels in list [(idx_j, idx_i), ...] format
spaxel_list = np.argwhere(mask_array)
# No mask file is provided and the user just defines an image size tupple (nY, nX)
else:
mask_array = np.ones(image_shape).astype(bool)
spaxel_list = np.argwhere(mask_array)
# Generate containers for the data:
images_dict = {}
for param in param_list:
# Make sure is an array and loop throuh them
for line in line_list:
images_dict[f'{param}-{line}'] = np.full(image_shape, spaxel_fill_value)
# Loop through the spaxels and fill the parameter images
n_spaxels = spaxel_list.shape[0]
spaxel_range = np.arange(n_spaxels)
pbar = ProgressBar('bar', f'spaxels from file')
with fits.open(lines_log_file) as logHDUs:
for i_spaxel in spaxel_range:
idx_j, idx_i = spaxel_list[i_spaxel]
spaxel_ref = f'{idx_j}-{idx_i}{log_ext_suffix}'
post_text = f'"{lines_log_file}" read ({n_spaxels} total)'
pbar.output_message(i_spaxel, n_spaxels, pre_text="", post_text=post_text)
# progress_bar(i_spaxel, n_spaxels, post_text=f'of spaxels from file ({lines_log_file}) read ({n_spaxels} total spaxels)')
# Confirm log extension exists
if spaxel_ref in logHDUs:
# Recover extension data
log_data = logHDUs[spaxel_ref].data
log_lines = log_data['index']
# Loop through the parameters and the lines:
for param in param_list:
idcs_log = np.argwhere(np.in1d(log_lines, line_list))
for i_line in idcs_log:
images_dict[f'{param}-{log_lines[i_line][0]}'][idx_j, idx_i] = log_data[param][i_line][0]
# New line after the rustic progress bar
print()
# Recover coordinates from the wcs to store in the headers:
hdr_coords = extract_wcs_header(wcs, drop_axis='spectral')
# Save the parameter maps as individual fits files with one line per page
output_file_prefix = '' if output_file_prefix is None else output_file_prefix
for param in param_list:
# Primary header
paramHDUs = fits.HDUList()
paramHDUs.append(fits.PrimaryHDU())
# ImageHDU for the parameter maps
for line in line_list:
# Create page header with the default data
hdr_i = fits.Header()
hdr_i['LINE'] = (line, 'Line label')
hdr_i['PARAM'] = (param, 'LiMe parameter label')
# Add WCS information
if hdr_coords is not None:
hdr_i.update(hdr_coords)
# Add user information
if header is not None:
page_hdr = header.get(f'{param}-{line}', None)
page_hdr = header if page_hdr is None else page_hdr
hdr_i.update(page_hdr)
# Create page HDU entry
HDU_i = fits.ImageHDU(name=line, data=images_dict[f'{param}-{line}'], header=hdr_i, ver=1)
paramHDUs.append(HDU_i)
# Write to new file
output_file = Path(output_folder)/f'{output_file_prefix}{param}.fits'
paramHDUs.writeto(output_file, overwrite=True, output_verify='fix')
return
def extract_wcs_header(wcs, drop_axis=None):
if wcs is not None:
# Remove 3rd dimensional axis if present
if drop_axis is not None:
if drop_axis == 'spectral':
input_wcs = wcs.dropaxis(2) if wcs.naxis == 3 else wcs
elif drop_axis == 'spatial':
if wcs.naxis == 3:
input_wcs = wcs.dropaxis(1)
input_wcs = wcs.dropaxis(0)
else:
raise LiMe_Error(f'Fits coordinates axis: "{drop_axis}" not recognized. Please use: "spectral" or'
f' "spatial"')
else:
input_wcs = wcs
# Convert to HDU header
hdr_coords = input_wcs.to_fits()[0].header
else:
hdr_coords = None
return hdr_coords
class ProgressBar:
def __init__(self, message_type=None, count_type='bar'):
self.output_message = None
self.count_type = count_type
if message_type is None:
self.output_message = self.no_output
if message_type == 'bar':
self.output_message = self.progress_bar
if message_type == 'counter':
self.output_message = self.counter
return
def progress_bar(self, i, i_max, pre_text, post_text, n_bar=10):
post_text = "" if post_text is None else post_text
j = (i + 1) / i_max
stdout.write('\r')
message = f'[{"=" * int(n_bar * j):{n_bar}s}] {int(100 * j)}% of {self.count_type} {post_text}'
stdout.write(message)
stdout.flush()
return
def no_output(self, i, i_max, pre_text, post_text, n_bar=10):
return
def counter(self, i, i_max, pre_text, post_text, n_bar=10):
print(post_text)
return