import logging
import numpy as np
import pandas as pd
from pathlib import Path
from astropy.io import fits
from collections import UserDict
from .tools import unit_conversion, define_masks, extract_fluxes, normalize_fluxes, ProgressBar, check_units, au, extract_wcs_header
from .recognition import LineFinder, DetectionInference
from .plots import SpectrumFigures, SampleFigures, CubeFigures
from .plots_interactive import SpectrumCheck, CubeCheck, SampleCheck
from .io import _LOG_EXPORT_RECARR, save_frame, LiMe_Error, check_file_dataframe, _PARENT_BANDS, \
check_file_array_mask, load_frame
from .read_fits import OpenFits, SPECTRUM_FITS_PARAMS
from .transitions import Line, latex_from_label, air_to_vacuum_function
from .workflow import SpecTreatment, CubeTreatment
from . import Error, __version__
# Module-level logger shared across the LiMe package
_logger = logging.getLogger('LiMe')

# Optional dependency: mplcursors enables interactive pop-up annotations on the plots
try:
    import mplcursors
    mplcursors_check = True
except ImportError:
    mplcursors_check = False

if mplcursors_check:
    # Make the default annotation pop-ups slightly transparent
    # NOTE(review): this imports a private mplcursors module — may break across mplcursors versions
    from mplcursors._mplcursors import _default_annotation_kwargs as popupProps
    popupProps['bbox']['alpha'] = 0.9
def review_sample_levels(log, id_name, file_name, id_level="id", file_level="file", line_level="line"):
    """Return *log* indexed by the standard sample levels.

    A single-index frame is promoted to a multi-index one: [id, line] when no
    file name is given, [id, file, line] otherwise. Frames that already carry a
    multi-index are returned untouched.
    """

    # Multi-index frames are assumed to be structured already by the caller
    if isinstance(log.index, pd.MultiIndex):
        return log

    # The existing single index holds the line labels
    log.index.name = line_level

    # Append the object id level
    log[id_level] = id_name
    log.set_index(id_level, append=True, inplace=True)

    # Append the file level when available and pick the default level ordering
    if file_name is None:
        target_order = [id_level, line_level]
    else:
        log[file_level] = file_name
        log.set_index(file_level, append=True, inplace=True)
        target_order = [id_level, file_level, line_level]

    return log.reorder_levels(target_order)
def check_inputs_arrays(wave, flux, err_flux, lime_object):
    """Validate the wavelength, flux and flux-uncertainty inputs of a Spectrum/Cube.

    Raises a LiMe_Error when an input is not a numpy array or when its number of
    dimensions does not match the target object (1D for Spectrum and for the
    wavelength axis, 3D for Cube fluxes). A missing wave/flux only logs a notice.

    :param wave: wavelength array (expected 1D).
    :param flux: flux array (1D for Spectrum, 3D for Cube).
    :param err_flux: flux uncertainty array, same shape expectations as flux.
    :param lime_object: the Spectrum or Cube instance being constructed.
    :raises LiMe_Error: on non-array inputs or dimension mismatches.
    """

    # Iterate explicitly over the three array inputs. (The previous implementation
    # enumerated locals().items() and took the first three entries, which silently
    # depends on the interpreter's locals ordering — fragile and easy to break.)
    for key, value in (('wave', wave), ('flux', flux), ('err_flux', err_flux)):

        if value is not None:

            # Confirm numpy array inputs
            if isinstance(value, np.ndarray):

                # Confirm dimensions: 1D for Spectrum fluxes and any wavelength axis, 3D for Cube fluxes
                dimensions = len(value.shape)
                spec_check = dimensions == 1 and (isinstance(lime_object, Spectrum) or key == 'wave')
                cube_type = dimensions == 3 and isinstance(lime_object, Cube)

                if not spec_check and not cube_type:
                    raise LiMe_Error(f'The dimensions of the input {key} are {dimensions}.\n'
                                     f'LiMe only recognizes 1D arrays for the wavelength array, \n'
                                     f'1D flux arrays for the Spectrum objects \n'
                                     f'and 3D flux arrays Cube objects.')
            else:
                raise LiMe_Error(f'The input {key} array must be numpy array. The input variable type is a {type(value)}')

        else:
            # A missing uncertainty array is acceptable; missing wave/flux is only notified
            if key in ['wave', 'flux']:
                _logger.info(f'No value has been provided for {key}.')

    return
def check_redshift_norm(redshift, norm_flux, flux_array, units_flux, norm_factor=100):
    """Validate the redshift and resolve the flux normalization for a new observation.

    Returns the (redshift, norm_flux) pair, defaulting the redshift to 0 and
    deriving a normalization from the mean flux when none is provided and the
    flux units carry no scale of their own.
    """

    # Missing redshift: assume a local universe observation
    if redshift is None:
        _logger.info(f'No redshift provided for the spectrum. Assuming local universe observation (z = 0)')
        redshift = 0

    # A negative redshift is suspicious but not fatal
    if redshift < 0:
        _logger.warning(f'Input spectrum redshift has a negative value: z = {redshift}')

    # Derive a normalization when the user did not supply one
    if norm_flux is None:
        if units_flux.scale != 1:
            # The unit scale already keeps the flux magnitudes manageable
            norm_flux = 1
        else:
            norm_flux = np.nanmean(flux_array) / norm_factor
            _logger.info(f'Normalizing input flux by {norm_flux}')

    return redshift, norm_flux
def check_spectrum_axes(lime_object):
    """Warn when only some of the wave/wave_rest/flux arrays carry a pixel mask."""

    # Flag which of the three spectral arrays are numpy masked arrays
    array_labels = ['wave', 'wave_rest', 'flux']
    check_mask = np.array([np.ma.isMaskedArray(getattr(lime_object, label)) for label in array_labels])

    # TODO this one should go at the begining and review inputs
    # A partially masked Spectrum can break the profile fittings: warn about each unmasked array
    if np.any(check_mask) and isinstance(lime_object, Spectrum):
        if not np.all(check_mask):
            for label, flagged in zip(array_labels, check_mask):
                if not flagged:
                    _logger.warning(f'Your {label} array does not include a pixel mask this can caused issues on the fittings')

    return
def check_sample_input_files(log_list, file_list, page_list, id_list):
    """Confirm the Sample input lists are consistent with the "id_list".

    Each provided list must match the id_list length, and at least one of
    log_list/file_list must be given. Raises LiMe_Error otherwise.
    """

    # Every provided list must pair one-to-one with the object ids
    for key, value in {'log_list': log_list, 'file_list': file_list, 'page_list': page_list}.items():
        if value is None:
            continue
        if len(id_list) != len(value):
            raise LiMe_Error(f'The length of the {key} must be the same of the input "id_list".')

    # A sample cannot be built from ids alone
    if log_list is None and file_list is None:
        raise LiMe_Error(f'To define a sample, the user must provide alongside an "id_list" a "log_list" and/or a '
                         f'"file_list".')

    return
def check_sample_file_parser(source, fits_reader, load_function, default_load_function):
    """Resolve the loading mechanism for a Sample: (fits_reader, load_function).

    A user-supplied load function takes precedence (returned as (None, fn));
    otherwise a recognized source with a fits reader is used ((reader, None)).
    Raises LiMe_Error when neither is available.
    """
    # NOTE(review): default_load_function is currently unused — confirm whether it should be a fallback

    # A user load function overrides any instrument reader
    if load_function is not None:
        return None, load_function

    # Fall back on the instrument/survey fits reader
    if (source is not None) and (fits_reader is not None):
        return fits_reader, None

    # No load function nor instrument
    raise LiMe_Error(f'To create a Sample object you need to provide "load_function" or provide a "instrument" '
                     f'supported by LiMe')
def check_sample_levels(levels, necessary_levels=("id", "file")):
    """Warn about each mandatory index level missing from the input *levels*."""

    missing = [level for level in necessary_levels if level not in levels]
    for comp_level in missing:
        _logger.warning(f'Input log levels do not include a "{comp_level}". This can cause issues with LiMe functions')

    return
def cropping_spectrum(crop_waves, input_wave, input_flux, input_err, pixel_mask):
    """Crop the input arrays to the (min, max) wavelength interval in *crop_waves*.

    The sentinels 0 and -1 mean "no lower limit" and "no upper limit" respectively.
    1D fluxes (Spectrum) and 3D fluxes (Cube, cropped along the first axis) are
    supported; any other dimensionality raises a LiMe_Error. With crop_waves=None
    the inputs are returned unchanged.
    """

    if crop_waves is not None:

        # Translate the wavelength limits into index limits (0/-1 sentinels disable a limit)
        low = np.searchsorted(input_wave, crop_waves[0]) if crop_waves[0] != 0 else 0
        high = np.searchsorted(input_wave, crop_waves[1]) if crop_waves[1] != -1 else None
        input_wave = input_wave[low:high]

        n_dims = len(input_flux.shape)

        # Spectrum
        if n_dims == 1:
            input_flux = input_flux[low:high]
            if input_err is not None:
                input_err = input_err[low:high]

        # Cube: crop along the spectral (first) axis
        elif n_dims == 3:
            input_flux = input_flux[low:high, :, :]
            if input_err is not None:
                input_err = input_err[low:high, :, :]

        # Not recognized
        else:
            raise LiMe_Error(f'The dimensions of the input flux are {input_flux.shape}. LiMe only recognized flux 1D '
                             f'arrays for Spectrum objects and 3D arrays for Cube objects')

        if pixel_mask is not None:
            pixel_mask = pixel_mask[low:high]

    return input_wave, input_flux, input_err, pixel_mask
def spec_normalization_masking(input_wave, input_flux, input_err, pixel_mask, redshift, norm_flux):
    """Apply the redshift correction, flux normalization and pixel masking.

    Returns the (wave, wave_rest, flux, err_flux) arrays. Missing inputs yield
    None outputs. (Bug fix: the previous implementation left wave/wave_rest/flux
    unbound — raising UnboundLocalError at return — when input_wave or
    input_flux was None, and only assigned wave when BOTH were present.)

    :param input_wave: observed-frame wavelength array (or None).
    :param input_flux: flux array, 1D or 3D (or None).
    :param input_err: flux uncertainty array, same shape as the flux (or None).
    :param pixel_mask: boolean array with True for rejected pixels (or None).
    :param redshift: observation redshift used to compute the rest frame.
    :param norm_flux: normalization divided out of the flux and uncertainty.
    :return: wave, wave_rest, flux, err_flux
    """

    # Default outputs for missing inputs
    wave, wave_rest, flux, err_flux = None, None, None, None

    # Apply the redshift correction
    if input_wave is not None:
        wave = input_wave
        wave_rest = input_wave / (1 + redshift)

    # Normalize the spectrum flux and uncertainty
    if input_flux is not None:
        flux = input_flux / norm_flux
        err_flux = None if input_err is None else input_err / norm_flux

    # Mask the arrays if requested
    if pixel_mask is not None:

        # Confirm boolean mask
        bool_mask = pixel_mask.astype(bool)

        # Only a 1D mask can be shared with the (1D) wavelength arrays
        if len(pixel_mask.shape) == 1 and wave is not None:
            wave = np.ma.masked_array(wave, bool_mask)
            wave_rest = np.ma.masked_array(wave_rest, bool_mask)

        # Spectrum or Cube spectral masking
        if flux is not None:
            flux = np.ma.masked_array(flux, bool_mask)
        if err_flux is not None:
            err_flux = np.ma.masked_array(err_flux, bool_mask)

    return wave, wave_rest, flux, err_flux
[docs]
def line_bands(wave_intvl=None, lines_list=None, particle_list=None, z_intvl=None, units_wave='Angstrom', decimals=None,
               vacuum=False, ref_bands=None):
    """
    This function returns the `LiMe bands database <https://lime-stable.readthedocs.io/en/latest/inputs/n_inputs3_line_bands.html>`_
    as a pandas dataframe.

    If the user provides a wavelength array (``wave_intvl``), a lime.Spectrum or lime.Cube the output dataframe will be
    limited to the lines within this wavelength interval.

    Similarly, if the user provides a ``lines_list`` or a ``particle_list`` the output bands will be limited to these
    lists. These inputs must follow the `LiMe notation style <https://lime-stable.readthedocs.io/en/latest/inputs/n_inputs2_line_labels.html>`_

    If the user provides a redshift interval (``z_intvl``) alongside the wavelength interval (``wave_intvl``) the output
    bands will be limited to the transitions which can be observed given the two parameters.

    The default line labels and bands ``units_wave`` are angstroms (A), additional options are: um, nm, Hz, cm, mm.

    The argument ``decimals`` determines the number of decimal figures for the line labels.

    The user can request the output line labels and bands wavelengths in vacuum setting ``vacuum=True``. This conversion
    is done using the relation from `Greisen et al. (2006) <https://www.aanda.org/articles/aa/abs/2006/05/aa3818-05/aa3818-05.html>`_.

    Instead of the default LiMe database, the user can provide a ``ref_bands`` dataframe (or the dataframe file address)
    to use as the reference database.

    :param wave_intvl: Wavelength interval for output line transitions.
    :type wave_intvl: list, numpy.array, lime.Spectrum, lime.Cube, optional
    :param lines_list: Line list for output line bands.
    :type lines_list: list, numpy.array, optional
    :param particle_list: Particle list for output line bands.
    :type particle_list: list, numpy.array, optional
    :param z_intvl: Redshift interval for output line bands.
    :type z_intvl: list, numpy.array, optional
    :param units_wave: Labels and bands wavelength units. The default value is "A".
    :type units_wave: str, optional
    :param decimals: Number of decimal figures for the line labels.
    :type decimals: int, optional
    :param vacuum: Set to True for vacuum wavelength values. The default value is False.
    :type vacuum: bool, optional
    :param ref_bands: Reference bands dataframe. The default value is None.
    :type ref_bands: pandas.Dataframe, str, pathlib.Path, optional
    :return: Bands dataframe sliced by the input criteria.
    :rtype: pandas.DataFrame
    """

    # Use the default lime mask if none provided
    if ref_bands is None:
        ref_bands = _PARENT_BANDS

    # Load the reference bands (a dataframe or a file address are both accepted)
    mask_df = check_file_dataframe(ref_bands, pd.DataFrame)

    # Row selection mask: starts including every band, narrowed down by the slicing criteria below
    idcs_rows = np.ones(mask_df.index.size).astype(bool)

    # Convert to vacuum wavelengths if requested
    if vacuum:
        # First the table data (central wavelength plus the six band-edge columns)
        air_columns = ['wavelength', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6']
        mask_df[air_columns] = mask_df[air_columns].apply(air_to_vacuum_function, raw=True)

    # Convert to requested units
    # NOTE(review): the reference table appears to be stored in angstroms — confirm with _PARENT_BANDS
    units_wave = au.Unit(units_wave)
    if units_wave != 'Angstrom':
        conversion_factor = unit_conversion(au.Unit('Angstrom'), units_wave, wave_array=1, dispersion_units='dispersion axis')
        mask_df.loc[:, 'wavelength':'w6'] = mask_df.loc[:, 'wavelength':'w6'] * conversion_factor

    # Reconstruct the latex label to match the (possibly converted) wavelengths
    n_bands = mask_df.index.size
    mask_df['latex_label'] = latex_from_label(None, mask_df['particle'], mask_df['wavelength'],
                                              np.array([units_wave] * n_bands), np.zeros(n_bands),
                                              mask_df['transition'], decimals=decimals)

    # Re-write the line band labels as "particle_wavelength" plus the unit suffix
    particle_array = mask_df['particle'].to_numpy().astype(str)
    wave_array = mask_df['wavelength'].to_numpy()
    wave_array = np.round(wave_array, decimals) if decimals is not None else np.round(wave_array, 0).astype(int)
    wave_array = wave_array.astype(str)
    unit_string = 'A' if units_wave == 'Angstrom' else str(units_wave)

    labels_array = np.core.defchararray.add(particle_array, '_')
    labels_array = np.core.defchararray.add(labels_array, wave_array)
    labels_array = np.core.defchararray.add(labels_array, unit_string)
    mask_df.rename(index=dict(zip(mask_df.index.values, labels_array)), inplace=True)

    # First slice by wavelength and redshift
    if wave_intvl is not None:

        # In case the input is a spectrum or cube, use its rest-frame wavelengths
        if isinstance(wave_intvl, (Spectrum, Cube)):
            wave_intvl = wave_intvl.wave_rest

        # Establish the lower and upper wavelength limits
        if np.ma.isMaskedArray(wave_intvl):
            w_min, w_max = wave_intvl.data[0], wave_intvl.data[-1]
        else:
            w_min, w_max = wave_intvl[0], wave_intvl[-1]

        # Expand the limits by the observable redshift interval when provided
        if z_intvl is not None:
            z_intvl = np.array(z_intvl, ndmin=1)
            w_min, w_max = w_min * (1 + z_intvl[0]), w_max * (1 + z_intvl[-1])

        wavelength_array = mask_df['wavelength']
        idcs_rows = idcs_rows & (wavelength_array >= w_min) & (wavelength_array <= w_max)

    # Second slice by particle
    if particle_list is not None:
        idcs_rows = idcs_rows & mask_df.particle.isin(particle_list)

    # Finally slice by the name of the lines
    if lines_list is not None:
        idcs_rows = idcs_rows & mask_df.index.isin(lines_list)

    return mask_df.loc[idcs_rows]
[docs]
class Spectrum(LineFinder):

    """
    This class creates an astronomical spectrum object for a single observation.

    The user needs to provide wavelength and flux arrays. Additionally, the user can include a flux uncertainty
    array. This uncertainty must be in the same units as the flux. The observation should include its ``redshift``.

    If the flux units result in very small magnitudes, the user should also provide a normalization to make the flux
    magnitude well above zero. Otherwise, the profile fittings are likely to fail. This normalization is removed in the
    output measurements.

    The user can provide a ``pixel_mask`` boolean array with the pixels **to be excluded** from the measurements.

    The default ``units_wave`` are angstroms (Å), additional options are: um, nm, Hz, cm, mm

    The default ``units_flux`` are Flam (erg s^-1 cm^-2 Å^-1), additional options are: Fnu, Jy, mJy, nJy

    The user can also specify an instrument FWHM (``inst_FWHM``), so it can be taken into account during the measurements.

    :cvar fit: Fitting function instance from :class:`lime.workflow.SpecTreatment`.
    :cvar plot: Plotting function instance from :class:`lime.plots.SpectrumFigures`.

    :param input_wave: wavelength array.
    :type input_wave: numpy.array
    :param input_flux: flux array.
    :type input_flux: numpy.array
    :param input_err: flux sigma uncertainty array.
    :type input_err: numpy.array, optional
    :param redshift: observation redshift.
    :type redshift: float, optional
    :param norm_flux: spectrum flux normalization.
    :type norm_flux: float, optional
    :param crop_waves: spectrum (minimum, maximum) values
    :type crop_waves: np.array, tuple, optional
    :param inst_FWHM: Instrumental FWHM.
    :type inst_FWHM: float, optional
    :param units_wave: Wavelength array units. The default value is "A".
    :type units_wave: str, optional
    :param units_flux: Flux array physical units. The default value is "Flam".
    :type units_flux: str, optional
    :param pixel_mask: Boolean array with True values for rejected pixels.
    :type pixel_mask: np.array, optional
    :param id_label: identity label for the spectrum object
    :type id_label: str, optional
    """

    # File manager for a Spectrum created from an observation file
    _fitsMgr = None
    def __init__(self, input_wave=None, input_flux=None, input_err=None, redshift=None, norm_flux=None, crop_waves=None,
                 inst_FWHM=None, units_wave='AA', units_flux='FLAM', pixel_mask=None, id_label=None, review_inputs=True):
        """Declare the spectrum attributes; with ``review_inputs=False`` the data arrays are assigned later
        (see :meth:`from_cube`). See the class docstring for the parameter descriptions."""

        # Load parent classes
        LineFinder.__init__(self)

        # Class attributes (populated by _set_attributes when review_inputs is True)
        self.label = None        # identity label for the spectrum
        self.wave = None         # observed-frame wavelength array
        self.wave_rest = None    # rest-frame wavelength array
        self.flux = None         # flux array (divided by norm_flux)
        self.err_flux = None     # flux uncertainty array (divided by norm_flux)
        self.cont = None         # continuum attributes — not assigned in this file; presumably set by the fitting treatments
        self.cont_std = None
        self.frame = None        # measurements dataframe
        self.redshift = None
        self.norm_flux = None
        self.inst_FWHM = None
        self.units_wave = None
        self.units_flux = None

        # Treatments objects
        self.fit = SpecTreatment(self)
        self.infer = DetectionInference(self)

        # Plotting objects
        self.plot = SpectrumFigures(self)
        self.check = SpectrumCheck(self)

        # Review and assign the attributes data
        if review_inputs:
            self._set_attributes(input_wave, input_flux, input_err, redshift, norm_flux, crop_waves, inst_FWHM,
                                 units_wave, units_flux, pixel_mask, id_label)

        return
    @classmethod
    def from_cube(cls, cube, idx_j, idx_i, label=None):
        """Create a Spectrum from the (idx_j, idx_i) spaxel of a lime.Cube, reusing the cube wavelength arrays,
        units, redshift and normalization."""

        # Skip the standard input review: the cube arrays were already checked and normalized
        spec = cls(review_inputs=False)

        # Class attributes copied/sliced from the parent cube
        spec.label = label
        spec.wave = cube.wave
        spec.wave_rest = cube.wave_rest
        spec.flux = cube.flux[:, idx_j, idx_i]
        spec.err_flux = None if cube.err_flux is None else cube.err_flux[:, idx_j, idx_i]
        spec.norm_flux = cube.norm_flux
        spec.redshift = cube.redshift
        spec.frame = pd.DataFrame(np.empty(0, dtype=_LOG_EXPORT_RECARR))
        spec.inst_FWHM = cube.inst_FWHM
        spec.units_wave = cube.units_wave
        spec.units_flux = cube.units_flux

        # Propagate the spaxel flux mask onto the wavelength arrays
        # NOTE(review): assumes the spaxel flux mask shares the spectral axis length with cube.wave — confirm upstream
        if np.ma.isMaskedArray(spec.flux):
            spec.wave = np.ma.masked_array(spec.wave, cube.flux[:, idx_j, idx_i].mask)
            spec.wave_rest = np.ma.masked_array(cube.wave_rest, cube.flux[:, idx_j, idx_i].mask)

        return spec
@classmethod
def from_file(cls, file_address, instrument, mask_flux_entries=None, **kwargs):
"""
This method creates a lime.Spectrum object from an observational (.fits) file. The user needs to introduce the
file address location and the name of the instrument of survey.
Currently, this method supports NIRSPEC, ISIS, OSIRIS and SDSS as input instrument sources. This method will
lower case the input instrument or survey name.
The user can include list of pixel values to generate a mask from the input file flux entries. For example, if the
user introduces [np.nan, 'negative'] the output spectrum will mask np.nan entries and negative fluxes.
This method provides the instrument observational units and normalization but the user should introduce
the additional LiMe.Spectrum arguments (such as the observation redshift).
:param file_address: Input file location address.
:type file_address: Path, string
:param instrument: Input file instrument or survey name
:type instrument: str
:param mask_flux_entries: List of pixel values to mask from flux array
:type mask_flux_entries: list
:param kwargs: lime.Spectrum arguments.
:return: lime.Spectrum
"""
# Create file manager object to administrate the file source and observation properties
cls._fitsMgr = OpenFits(file_address, instrument, cls.__name__)
# Load the scientific data from the file
fits_args = cls._fitsMgr.parse_data_from_file(cls._fitsMgr.file_address, mask_flux_entries)
# Update the parameters file parameters with the user parameters
obs_args = {**fits_args, **kwargs}
# Create the LiMe object
return cls(**obs_args)
@classmethod
def from_survey(cls, target_id, survey, mask_flux_entries=None, **kwargs):
"""
This method creates a lime.Spectrum object from a survey observational (.fits) file. The user needs to provide an
object ID alongside the calague organization labels to identify the file.
Currently, this method supports the DESI survey. This method will lower case the input survey name.
The user can include list of pixel values to generate a mask from the input file flux entries. For example, if the
user introduces [np.nan, 'negative'] the output spectrum will mask np.nan entries and negative fluxes.
This method provides the arguments necesary to create the LiMe.Spectrum object. However, the user should provide
the indexation values to locate the file on the survey. For example, for the DESI survey these would be the
catalogue (i.e. healpix), program (i.e. dark) and release (fuji).
:param file_address: Input object ID label.
:type file_address: str
:param survey: Input object survey name
:type survey: str
:param mask_flux_entries: List of pixel values to mask from flux array
:type mask_flux_entries: list
:param kwargs: Survey indexation arguments for the object
:return: lime.Spectrum
"""
# Create file manager object to administrate the file source and observation properties
cls._fitsMgr = OpenFits(target_id, survey, cls.__name__)
# Load the scientific data from the file
fits_args = cls._fitsMgr.parse_data_from_url(cls._fitsMgr.file_address, mask_flux_entries, **kwargs)
# Create the LiMe object
return cls(**fits_args)
    def _set_attributes(self, input_wave, input_flux, input_err, redshift, norm_flux, crop_waves, inst_FWHM, units_wave,
                        units_flux, pixel_mask, label):
        """Review the constructor inputs and assign the spectrum data attributes (wavelengths, fluxes, units,
        redshift, normalization). The helpers below are order dependent: units and normalization are resolved
        before the arrays are cropped, normalized and masked."""

        # Class attributes
        self.label = label
        self.inst_FWHM = np.nan if inst_FWHM is None else inst_FWHM

        # Review the inputs (array types and dimensions)
        check_inputs_arrays(input_wave, input_flux, input_err, self)

        # Checks units (converted to astropy units)
        self.units_wave, self.units_flux = check_units(units_wave, units_flux)

        # Check redshift and normalization
        self.redshift, self.norm_flux = check_redshift_norm(redshift, norm_flux, input_flux, self.units_flux)

        # Start cropping the input spectrum if necessary
        input_wave, input_flux, input_err, pixel_mask = cropping_spectrum(crop_waves, input_wave, input_flux, input_err,
                                                                          pixel_mask)

        # Normalization and masking
        self.wave, self.wave_rest, self.flux, self.err_flux = spec_normalization_masking(input_wave, input_flux,
                                                                                         input_err, pixel_mask,
                                                                                         self.redshift, self.norm_flux)

        # Check nan entries and mask quality
        check_spectrum_axes(self)

        # Generate empty dataframe to store measurement use cwd as default storing folder # TODO we are not using this
        self.frame = pd.DataFrame(np.empty(0, dtype=_LOG_EXPORT_RECARR))

        return
def unit_conversion(self, wave_units_out=None, flux_units_out=None, norm_flux=None):
"""
This function converts spectrum wavelength array, the flux array or both arrays units.
The user can also provide a flux normalization for the spectrum flux array.
The wavelength units available are AA (angstroms), um, nm, Hz, cm, mm
The flux units available are Flam (erg s^-1 cm^-2 Å^-1), Fnu (erg s^-1 cm^-2 Hz^-1), Jy, mJy, nJy
:param wave_units_out: Wavelength array units
:type wave_units_out: str, optional
:param flux_units_out: Flux array units
:type flux_units_out: str, optional
:param norm_flux: Flux normalization
:type norm_flux: float, optional
"""
# Remove existing normalization
if (self.norm_flux != 1) and (self.norm_flux is not None):
# Remove mask and normalization
input_mask = self.flux.mask if np.ma.isMaskedArray(self.flux) else None
flux_arr = self.flux.data * self.norm_flux if input_mask is None else self.flux * self.norm_flux
err_arr = self.err_flux.data * self.norm_flux if input_mask is None else self.err_flux * self.norm_flux
# Re-apply mask
self.flux = flux_arr if input_mask is None else np.ma.masked_array(flux_arr, self.flux.mask)
self.err_flux = err_arr if input_mask is None else np.ma.masked_array(err_arr, self.err_flux.mask)
self.norm_flux = None
# Convert the requested units to astropy unit object
wave_units_out = au.Unit(wave_units_out) if wave_units_out is not None else None
flux_units_out = au.Unit(flux_units_out) if flux_units_out is not None else None
# Dispersion axes conversion
if wave_units_out is not None:
# Convert the data
output_wave = unit_conversion(self.units_wave, wave_units_out, wave_array=self.wave)
# Assign the new values
self.wave = output_wave
self.wave_rest = np.ma.masked_array(self.wave.data / (1+self.redshift), self.wave.mask)
self.units_wave = wave_units_out
# Flux axis conversion
if flux_units_out is not None:
# Flux conversion
output_flux = unit_conversion(self.units_flux, flux_units_out, wave_array=self.wave,
flux_array=self.flux, dispersion_units=self.units_wave)
# Flux uncertainty conversion
output_err = None if self.err_flux is None else unit_conversion(self.units_flux, flux_units_out,
wave_array=self.wave,
flux_array=self.err_flux,
dispersion_units=self.units_wave)
# Assign new values
self.flux = output_flux
self.err_flux = None if self.err_flux is None else output_err
self.units_flux = flux_units_out
# Switch the normalization
if norm_flux is not None:
# Remove mask and then apply normalization
input_mask = self.flux.mask if np.ma.isMaskedArray(self.flux) else None
flux_arr = self.flux.data / norm_flux if input_mask is None else self.flux / norm_flux
err_arr = self.err_flux.data / norm_flux if input_mask is None else self.err_flux / norm_flux
# Re-apply mask
self.flux = flux_arr if input_mask is None else np.ma.masked_array(flux_arr, self.flux.mask)
self.err_flux = err_arr if input_mask is None else np.ma.masked_array(err_arr, self.err_flux.mask)
self.norm_flux = norm_flux
return
    def save_frame(self, fname, page='FRAME', param_list='all', header=None, column_dtypes=None,
                   safe_version=True):
        """
        This function saves the spectrum measurements at the ``fname`` provided by the user.

        The accepted extensions are ".txt", ".pdf", ".fits", ".asdf" and ".xlsx".

        For ".fits" and ".xlsx" files the user can provide a page name for the HDU/sheet with the ``page`` argument.
        The default name is "FRAME".

        The user can specify the ``param_list`` to be saved in the output file.

        For ".fits" files the user can provide a dictionary to add to the fits ``header``. The user can provide a
        ``column_dtypes`` string or dictionary for the output fits file record array. This overwrites LiMe default
        formatting and it must have the same columns as the file names.

        :param fname: Output log address.
        :type fname: str, Path
        :param page: Name for the HDU/sheet for ".fits"/".xlsx" files.
        :type page: str, optional
        :param param_list: Output parameters list. The default value is "all"
        :type param_list: list
        :param header: Dictionary for ".fits" and ".asdf" files.
        :type header: dict, optional
        :param column_dtypes: Conversion variable for the `records array <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_records.html>`.
                              for the output fits file. If a string or type, the data type to store all columns. If a dictionary, a mapping of column
                              names and indices (zero-indexed) to specific data types.
        :type column_dtypes: str, dict, optional
        :param safe_version: Save LiMe version as footnote or page header on the output log. The default value is True.
        :type safe_version: bool, optional
        """

        # Meta parameters stored alongside the measurements (version, units, redshift, id)
        meta_params = {'LiMe': __version__,
                       'u_wave': self.units_wave.to_string(),
                       'u_flux': self.units_flux.to_string(),
                       'redshift': self.redshift,
                       'id': self.label}

        # Save the file (output format resolved from the fname extension)
        save_frame(fname, self.frame, page, param_list, header, column_dtypes=column_dtypes,
                   safe_version=safe_version, **meta_params)

        return
    def load_frame(self, fname, page='LINESFRAME'):
        """
        This function loads a lines measurements log as the lime.Spectrum.frame variable.

        The appropriate variables are normalized by the current spectrum flux normalization.

        :param fname: Input log address.
        :type fname: str, Path
        :param page: Name of the HDU/sheet for ".fits"/".xlsx" files.
        :type page: str, optional
        """

        # Load the log file if it is a log file
        log_df = check_file_dataframe(fname, pd.DataFrame, ext=page)

        # Security checks:
        if log_df.index.size > 0:
            line_list = log_df.index.values

            # Get the first line in the log
            line_0 = Line.from_log(line_list[0], log_df, norm_flux=self.norm_flux)

            # Confirm the lines in the log match the units of the spectrum
            # NOTE(review): units_wave appears to be indexable (one entry per profile component) — confirm in transitions.Line
            if line_0.units_wave[0] != self.units_wave:
                _logger.warning(f'Different units in the spectrum dispersion ({self.units_wave}) axis and the lines log'
                                f' in {line_0.units_wave[0]}')

            # Confirm all the log lines have the same units (the unit suffix must appear in every line label)
            au_str = 'A' if line_0.units_wave[0] == 'Angstrom' else str(line_0.units_wave)
            same_units_check = np.flatnonzero(np.core.defchararray.find(line_list.astype(str), au_str) != -1).size == line_list.size
            if not same_units_check:
                _logger.warning(f'The log has lines with different units')

            # Assign the log
            self.frame = log_df

        else:
            _logger.info(f'Log file with 0 entries ({fname})')

        return
def update_redshift(self, redshift):
# Check if it is a masked array
if np.ma.isMaskedArray(self.wave):
input_wave = self.wave.data
input_flux = self.flux.data
input_err = self.err_flux.data
pixel_mask = self.wave.mask
else:
input_wave = self.wave
input_flux = self.flux
input_err = self.err_flux
pixel_mask = None
# Normalization and masking
self.redshift = redshift
self.wave, self.wave_rest, self.flux, self.err_flux = spec_normalization_masking(input_wave, input_flux,
input_err, pixel_mask,
self.redshift, 1)
return
[docs]
class Cube:

    """
    This class creates an astronomical cube for an integral field spectrograph observation.

    The user needs to provide 1D wavelength and 3D flux arrays. Additionally, the user can include a 3D flux uncertainty
    array. This uncertainty must be in the same units as the flux. The cube should include its ``redshift``.

    If the flux units result in very small magnitudes, the user should also provide a normalization to make the flux
    magnitude well above zero. Otherwise, the profile fittings are likely to fail. This normalization is removed in the
    output measurements.

    The default ``units_wave`` are angstroms (Å), additional options are: um, nm, Hz, cm, mm

    The default ``units_flux`` are Flam (erg s^-1 cm^-2 Å^-1), additional options are: Fnu, Jy, mJy, nJy

    The user can also specify an instrument FWHM (``inst_FWHM``), so it can be taken into account during the measurements.

    The user can provide a ``pixel_mask`` boolean 3D array with the pixels **to be excluded** from the measurements.

    The observation object should include an astropy World Coordinate System (``wcs``) to export the spatial coordinate
    system to the measurement files.

    :param input_wave: wavelength 1D array
    :type input_wave: numpy.array
    :param input_flux: flux 3D array
    :type input_flux: numpy.array
    :param input_err: flux sigma uncertainty 3D array.
    :type input_err: numpy.array, optional
    :param redshift: observation redshift.
    :type redshift: float, optional
    :param norm_flux: spectrum flux normalization
    :type norm_flux: float, optional
    :param crop_waves: spectrum (minimum, maximum) values
    :type crop_waves: np.array, tuple, optional
    :param inst_FWHM: Instrumental FWHM.
    :type inst_FWHM: float, optional
    :param units_wave: Wavelength units. The default value is "A"
    :type units_wave: str, optional
    :param units_flux: Flux array physical units. The default value is "Flam"
    :type units_flux: str, optional
    :param pixel_mask: Boolean 3D array with True values for rejected pixels.
    :type pixel_mask: np.array, optional
    :param id_label: identity label for the cube object
    :type id_label: str, optional
    :param wcs: Observation `world coordinate system <https://docs.astropy.org/en/stable/wcs/index.html>`_.
    :type wcs: astropy WCS, optional
    """

    # File manager for a Cube created from an observation file
    _fitsMgr = None
    def __init__(self, input_wave=None, input_flux=None, input_err=None, redshift=None, norm_flux=None, crop_waves=None,
                 inst_FWHM=None, units_wave='AA', units_flux='FLAM', pixel_mask=None, id_label=None, wcs=None):
        """Review the inputs and assign the cube data attributes; see the class docstring for the parameters."""

        # Review the inputs (array types and dimensions)
        check_inputs_arrays(input_wave, input_flux, input_err, self)

        # Class attributes
        self.obj_name = id_label
        self.wave = None         # observed-frame wavelength array (1D)
        self.wave_rest = None    # rest-frame wavelength array (1D)
        self.flux = None         # flux array (3D, divided by norm_flux)
        self.err_flux = None     # flux uncertainty array (3D, divided by norm_flux)
        self.inst_FWHM = np.nan if inst_FWHM is None else inst_FWHM
        self.wcs = wcs

        # Treatments objects
        self.fit = CubeTreatment(self)

        # Plotting objects
        self.plot = CubeFigures(self)
        self.check = CubeCheck(self)

        # Checks units (converted to astropy units)
        self.units_wave, self.units_flux = check_units(units_wave, units_flux)

        # Check redshift and normalization
        self.redshift, self.norm_flux = check_redshift_norm(redshift, norm_flux, input_flux, self.units_flux)

        # Start cropping the input spectrum if necessary
        input_wave, input_flux, input_err, pixel_mask = cropping_spectrum(crop_waves, input_wave, input_flux, input_err,
                                                                          pixel_mask)

        # Spectrum normalization, redshift and mask calculation
        self.wave, self.wave_rest, self.flux, self.err_flux = spec_normalization_masking(input_wave, input_flux,
                                                                                         input_err, pixel_mask,
                                                                                         self.redshift, self.norm_flux)

        # Check nan entries and mask quality
        check_spectrum_axes(self)

        return
@classmethod
def from_file(cls, file_address, instrument, mask_flux_entries=None, **kwargs):
    """
    Create a lime.Cube object from an observational (.fits) file. The user needs to introduce the
    file address location and the name of the instrument or survey.

    Currently, this method supports MANGA and MUSE input instrument sources. The input instrument
    or survey name is lower-cased before the match.

    The user can include a list of pixel values to generate a mask from the input file flux entries. For example,
    introducing ["nan", "negative"] masks np.nan entries and negative fluxes in the output cube.

    This method procures the instrument observation units, normalization and wcs, but the user should introduce the
    remaining LiMe.Cube arguments (such as the observation redshift).

    :param file_address: Input file location address.
    :type file_address: Path, string

    :param instrument: Input file instrument or survey name
    :type instrument: str

    :param mask_flux_entries: List of pixel values to mask from flux array
    :type mask_flux_entries: list

    :param kwargs: lime.Cube arguments.

    :return: lime.Cube

    """
    # NOTE(review): an earlier TODO claimed kwargs were not passing; the obs_args merge below does forward
    # them — confirm the TODO is stale before relying on it

    # File manager administrating the file source and the observation properties
    cls._fitsMgr = OpenFits(file_address, instrument, cls.__name__)

    # Parse the scientific data from the file
    fits_args = cls._fitsMgr.parse_data_from_file(cls._fitsMgr.file_address, mask_flux_entries)

    # User-provided arguments take precedence over the file values
    obs_args = {**fits_args, **kwargs}

    # Create the LiMe object
    return cls(**obs_args)
def spatial_masking(self, line, bands=None, param='flux', contour_pctls=(90, 95, 99), output_address=None,
                    mask_label_prefix=None, header=None):
    """
    This function generates a spatial binary mask for an input ``line``.

    The ``line`` argument provides the label for the mask spatial image. The bands are read from the ``bands``
    dataframe argument.

    The mask calculation can be done as a function of three parameters via the ``param`` argument: "flux"
    is the sum of the flux on the input band, "SN_line" is the signal-to-noise ratio for an emission line and "SN_cont"
    is the signal-to-noise of the continuum. The latter two parameters use the `Rola et al. (1994)
    <https://ui.adsabs.harvard.edu/abs/1994A%26A...287..676R/abstract>`_ definition.

    The number and spread of the binary masks is determined from percentile levels in the ``contour_pctls`` argument.

    If the user provides an ``output_address`` the masks are saved as a ".fits" file. If none is provided the
    function will return an HDUL variable.

    By default, the masks are saved in a ".fits" file with the extension names "MASK_0", "MASK_1"... The user can add a
    prefix to these names with the ``mask_label_prefix`` argument.

    :param line: Line label for the spatial image.
    :type line: str

    :param bands: Bands dataframe (or file address to the dataframe).
    :type bands: pandas.Dataframe, str, path.Pathlib, optional

    :param param: Parameter label for mask calculation. The default value is 'flux'.
    :type param: str

    :param contour_pctls: Sorted percentile values for the binary mask calculation.
    :type contour_pctls: np.array

    :param mask_label_prefix: Prefix for the mask page name in output file
    :type mask_label_prefix: str, optional

    :param output_address: File location to store the mask.
    :type output_address: str, optional

    :param header: Dictionary for mask ".fits" file header
    :type header: dict, optional

    :return: astropy HDUList with the masks, or None when the file was written to disk

    """

    # Check the function inputs: percentiles must be strictly increasing
    contour_pctls = np.atleast_1d(contour_pctls)
    if not np.all(np.diff(contour_pctls) > 0):
        raise Error(f'The mask percentiles ({contour_pctls}) must be in increasing order')
    inver_percentiles = np.flip(contour_pctls)  # reversed: highest percentile first

    if not param in ['flux', 'SN_line', 'SN_cont']:
        raise Error(f'The mask calculation parameter ({param}) is not recognised. Please use "flux", "SN_line", "SN_cont"')

    # Line for the background image
    line_bg = Line(line, bands)

    # Get the band indexes (band limits shifted to the observed frame via 1 + z)
    idcsEmis, idcsCont = define_masks(self.wave, line_bg.mask * (1 + self.redshift), line_mask_entry=line_bg.pixel_mask,
                                      line=line_bg.label)
    signal_slice = self.flux[idcsEmis, :, :]
    signal_slice = signal_slice if not np.ma.isMaskedArray(signal_slice) else signal_slice.data

    # Get indices of all-nan spaxels to exclude them from the analysis
    # NOTE(review): signal_slice is already an unmasked ndarray here, so ".data" yields a memoryview which
    # np.isnan coerces back to an array — presumably harmless, but confirm it is intentional
    idcs_all_nan = np.all(np.isnan(signal_slice.data), axis=0)

    # "flux" parameter: sum of the flux over the line band (param is relabelled with the flux units)
    if param == 'flux':
        param = self.units_flux
        param_image = signal_slice.sum(axis=0)

    # S/N of the continuum: mean over std along the spectral axis
    elif param == 'SN_cont':
        param_image = np.nanmean(signal_slice, axis=0) / np.nanstd(signal_slice, axis=0)

    # S/N of the line (Rola et al. 1994 style estimate from amplitude and continuum noise)
    elif param == 'SN_line':
        n_pixels = np.sum(idcsCont)
        cont_slice = self.flux[idcsCont, :, :]
        cont_slice = cont_slice if not np.ma.isMaskedArray(cont_slice) else cont_slice.data

        Amp_image = np.nanmax(signal_slice, axis=0) - np.nanmean(cont_slice, axis=0)
        std_image = np.nanstd(cont_slice, axis=0)
        param_image = (np.sqrt(2 * n_pixels * np.pi) / 6) * (Amp_image / std_image)

    else:
        raise LiMe_Error(f'Parameter {param} is not recognized please use: "flux", "SN_line" or "SN_cont"')

    # Percentile levels of the parameter image (highest first)
    param_array = np.nanpercentile(param_image, inver_percentiles)

    # Lowest contour level acts as the overall floor of the masks
    min_level = param_array[-1]

    # Containers for the mask parameters
    mask_dict = {}
    param_level = {}
    boundary_dict = {}

    # Loop through the contour levels and compute one ring-shaped mask per level
    for i, n_levels in enumerate(param_array):

        # First ring: everything at or above the highest level
        if i == 0:
            maParamImage = np.ma.masked_where((param_image >= param_array[i]) &
                                              (param_image >= min_level),
                                              param_image)

        # Subsequent rings: between this level (inclusive) and the previous one (exclusive)
        else:
            maParamImage = np.ma.masked_where((param_image >= param_array[i]) &
                                              (param_image < param_array[i - 1]) &
                                              (param_image >= min_level),
                                              param_image)

        # Only keep non-empty masks; all-nan spaxels are always excluded
        if np.sum(maParamImage.mask) > 0:
            mask_dict[f'mask_{i}'] = maParamImage.mask & ~idcs_all_nan
            boundary_dict[f'mask_{i}'] = inver_percentiles[i]
            param_level[f'mask_{i}'] = param_array[i]

    # Use an HDUList as container for the masks
    hdul = fits.HDUList([fits.PrimaryHDU()])

    # Recover coordinates from the wcs to store in the headers (spectral axis dropped for 2D images)
    hdr_coords = extract_wcs_header(self.wcs, drop_axis='spectral')

    for idx_region, region_items in enumerate(mask_dict.items()):
        region_label, region_mask = region_items

        # Metadata for the fits page
        # NOTE(review): this re-unmasking of signal_slice repeats the one performed above and appears redundant
        signal_slice = signal_slice if not np.ma.isMaskedArray(signal_slice) else signal_slice.data
        hdr_i = fits.Header({'PARAM': param,
                             'PARAMIDX': boundary_dict[region_label],
                             'PARAMVAL': param_level[region_label],
                             'NUMSPAXE': np.sum(region_mask)})

        # Add WCS information
        if hdr_coords is not None:
            hdr_i.update(hdr_coords)

        # Add user information: a per-mask header entry takes precedence over the global one
        if header is not None:
            page_hdr = header.get(f'{mask_label_prefix}{region_label}', None)
            page_hdr = header if page_hdr is None else page_hdr
            hdr_i.update(page_hdr)

        # Extension name for the mask
        mask_ext = region_label if mask_label_prefix is None else f'{mask_label_prefix}{region_label}'

        # Mask HDU (boolean mask stored as integers)
        mask_hdu = fits.ImageHDU(name=mask_ext, data=region_mask.astype(int), ver=1, header=hdr_i)
        hdul.append(mask_hdu)

    # Output folder computed from the output address
    output_address = Path(output_address) if output_address is not None else None

    # Save to file when an address is given
    if output_address is not None:
        if output_address.parent.is_dir():
            hdul.writeto(output_address, overwrite=True, output_verify='fix')
            output_func = None
        else:
            raise LiMe_Error(f'Mask could not be saved. Folder not found: {output_address.parent.as_posix()}')

    # Otherwise return the hdul
    else:
        output_func = hdul

    return output_func
def unit_conversion(self, units_wave=None, units_flux=None, norm_flux=None):
    """
    This function converts the cube wavelength array and/or the flux array units.

    The user can also provide a new flux normalization for the cube flux array.

    The wavelength units available are A (angstroms), um, nm, Hz, cm, mm

    The flux units available are Flam (erg s^-1 cm^-2 Å^-1), Fnu (erg s^-1 cm^-2 Hz^-1), Jy, mJy, nJy

    :param units_wave: Wavelength array units
    :type units_wave: str, optional

    :param units_flux: Flux array units
    :type units_flux: str, optional

    :param norm_flux: Flux normalization
    :type norm_flux: float, optional

    """

    # Dispersion axis conversion
    if units_wave is not None:

        # Remove the mask for the conversion
        input_wave = self.wave.data if np.ma.isMaskedArray(self.wave) else self.wave
        output_wave = unit_conversion(self.units_wave, units_wave, wave_array=input_wave)

        # Reflect the new units, re-applying the original pixel mask if there was one
        if np.ma.isMaskedArray(self.wave):
            self.wave = np.ma.masked_array(output_wave, self.wave.mask)
            self.wave_rest = np.ma.masked_array(output_wave/(1+self.redshift), self.wave.mask)
        else:
            self.wave = output_wave
            self.wave_rest = output_wave/(1+self.redshift)
        self.units_wave = units_wave

    # Flux axis conversion
    if units_flux is not None:

        # Remove the masks for the conversion
        input_wave = self.wave.data if np.ma.isMaskedArray(self.wave) else self.wave
        input_flux = self.flux.data if np.ma.isMaskedArray(self.flux) else self.flux
        input_err = self.err_flux.data if np.ma.isMaskedArray(self.err_flux) else self.err_flux

        # TODO this is slow
        flux_shape = input_flux.shape
        if len(flux_shape) == 3:

            # Spatial ranges are only meaningful for a 3D cube (bug fix: they were previously computed before
            # this dimensionality check, raising an IndexError for 1D/2D flux inputs)
            y_range, x_range = np.arange(flux_shape[1]), np.arange(flux_shape[2])

            output_flux = np.empty(flux_shape)
            for j in y_range:
                for i in x_range:
                    # Use the unmasked dispersion array (bug fix: the loop previously passed self.wave, which can
                    # carry a mask, while the error conversion below already used the unmasked input_wave)
                    output_flux[:, j, i] = unit_conversion(self.units_flux, units_flux, wave_array=input_wave,
                                                           flux_array=input_flux[:, j, i],
                                                           dispersion_units=self.units_wave)
        else:
            output_flux = unit_conversion(self.units_flux, units_flux, wave_array=input_wave,
                                          flux_array=input_flux, dispersion_units=self.units_wave)

        if input_err is not None:
            # NOTE: the uncertainty cube is converted in one vectorized call — assumes unit_conversion
            # broadcasts the 1D wavelength array against the error array
            output_err = unit_conversion(self.units_flux, units_flux, wave_array=input_wave,
                                         flux_array=input_err, dispersion_units=self.units_wave)

        # Reflect the new units, re-applying the original masks if present
        if np.ma.isMaskedArray(self.flux):
            self.flux = np.ma.masked_array(output_flux, self.flux.mask)
        else:
            self.flux = output_flux
        if input_err is not None:
            self.err_flux = np.ma.masked_array(output_err, self.err_flux.mask) if np.ma.isMaskedArray(self.err_flux) else output_err
        self.units_flux = units_flux

    # Switch the normalization
    if norm_flux is not None:
        mask_check = np.ma.isMaskedArray(self.flux)

        # Undo the old normalization and apply the new one, preserving the masks
        if mask_check:
            new_flux = self.flux.data * self.norm_flux / norm_flux
            new_err = None if self.err_flux is None else self.err_flux.data * self.norm_flux / norm_flux
            self.flux = np.ma.masked_array(new_flux, self.flux.mask)
            self.err_flux = None if self.err_flux is None else np.ma.masked_array(new_err, self.err_flux.mask)
        else:
            self.flux = self.flux * self.norm_flux / norm_flux
            self.err_flux = None if self.err_flux is None else self.err_flux * self.norm_flux / norm_flux
        self.norm_flux = norm_flux

    return
def get_spectrum(self, idx_j, idx_i, id_label=None):
    """
    Return a lime.Spectrum object for the spaxel at the input array coordinates.

    :param idx_j: y-axis array coordinate
    :type idx_j: int

    :param idx_i: x-axis array coordinate
    :type idx_i: int

    :param id_label: Identity label for spectrum object
    :type id_label: str, optional

    """
    spaxel_spec = Spectrum.from_cube(self, idx_j, idx_i, id_label)

    return spaxel_spec
def export_spaxels(self, output_address, mask_file, mask_list=None, log_ext_suffix='_LINELOG', progress_output='bar'):
    """
    Export the spectra of the spaxels selected by the input spatial masks to a multi-extension ".fits" file.

    One table extension per spaxel is written, named "{idx_j}-{idx_i}{log_ext_suffix}", containing the
    (de-normalized) flux and, when available, the flux uncertainty.

    :param output_address: Output ".fits" file location (its parent folder must exist).
    :param mask_file: Spatial mask file or array accepted by check_file_array_mask.
    :param mask_list: Optional subset of mask names to use from the mask file.
    :param log_ext_suffix: Suffix appended to each spaxel extension name. Default "_LINELOG".
    :param progress_output: Progress display mode forwarded to ProgressBar. Default "bar".
    """

    # Check if the mask variable is a file or an array
    mask_dict = check_file_array_mask(mask_file, mask_list)

    # Unpack mask dictionary: names and (mask array, header) pairs
    mask_list = np.array(list(mask_dict.keys()))
    mask_data_list = list(mask_dict.values())

    # Checks for the data type: uncertainty availability and flux masking
    err_check = False if self.err_flux is None else True
    masked_check = False if np.ma.isMaskedArray(self.flux) is False else True

    # Check if the output log folder exists
    output_address = Path(output_address)
    if not output_address.parent.is_dir():
        raise LiMe_Error(f'The folder of the output log file does not exist at {output_address}')

    # Determine the spaxels to treat at each mask
    total_spaxels, spaxels_dict = 0, {}
    for idx_mask, mask_data in enumerate(mask_data_list):
        # hdr_mask is unpacked but not used below
        spa_mask, hdr_mask = mask_data
        idcs_spaxels = np.argwhere(spa_mask)

        total_spaxels += len(idcs_spaxels)
        spaxels_dict[idx_mask] = idcs_spaxels

    # Spaxel counter to save the data everytime n_save is reached
    # NOTE(review): spax_counter is never incremented or read in this method — presumably a leftover
    spax_counter = 0

    # HDU container for the per-spaxel tables
    hdul = fits.HDUList([fits.PrimaryHDU()])

    # Header data: spatial coordinates only (spatial axes dropped from the cube WCS)
    if self.wcs is not None:
        hdr_coords = extract_wcs_header(self.wcs, drop_axis='spatial')
    else:
        hdr_coords = None

    # Loop through the masks
    n_masks = len(mask_list)
    for i in np.arange(n_masks):

        # Mask progress indexing
        mask_name = mask_list[i]
        idcs_spaxels = spaxels_dict[i]

        # Loop through the spaxels
        n_spaxels = idcs_spaxels.shape[0]
        pbar = ProgressBar(progress_output, f'{n_spaxels} spaxels')
        print(f'\n\nSpatial mask {i + 1}/{n_masks}) {mask_name} ({n_spaxels} spaxels)')

        for j in np.arange(n_spaxels):

            idx_j, idx_i = idcs_spaxels[j]
            spaxel_label = f'{idx_j}-{idx_i}'
            ext_label = f'{spaxel_label}{log_ext_suffix}'

            # Spaxel progress message
            pbar.output_message(j, n_spaxels, pre_text="", post_text=f'(coordinate {spaxel_label})')

            # Recover the spectrum, undoing the flux normalization
            spec_flux = self.flux[:, idx_j, idx_i] * self.norm_flux
            spec_err_flux = self.err_flux[:, idx_j, idx_i] * self.norm_flux if err_check else None

            # Remove mask
            if masked_check:
                spec_flux = spec_flux.data
                spec_err_flux = spec_err_flux.data if err_check else None

            # Convert to table-HDU format
            # NOTE(review): '>f8' (big-endian) is used with uncertainties but '<f8' (little-endian) without —
            # FITS binary convention is big-endian; confirm whether the '<f8' branch is intentional
            if err_check:
                data_array = np.rec.fromarrays([spec_flux, spec_err_flux], dtype=[('flux', '>f8'), ('flux_err', '>f8')])
            else:
                data_array = np.rec.fromarrays([spec_flux], dtype=[('flux', '<f8')])

            # Create spaxel_page
            table_hdu_i = fits.TableHDU(data_array, header=hdr_coords, name=ext_label)
            hdul.append(table_hdu_i)

    # Write all spaxel tables in a single operation
    hdul.writeto(output_address, overwrite=True)
    hdul.close()

    return
# [docs]
class Sample(UserDict, OpenFits):

    """
    This class creates a dictionary-like variable to store LiMe observations; by default it is assumed that these are
    ``Spectrum`` objects.

    The sample is indexed via the input ``sample_log`` parameter, a pandas dataframe, whose levels must be declared via
    the ``levels`` parameter. By default, three levels are assumed: an "id" column and a "file" column specifying the
    object ID and observation file address respectively. The "line" level refers to the label measurements in the
    corresponding log.

    The user can specify more levels via the ``levels`` parameter. However, it is recommended to keep this structure:
    "id" and "file" levels first and the "line" column last.

    To create the LiMe observation variables (``Spectrum`` or ``Cube``) the user needs to specify a ``load_function``.
    This is a python method which declares how the observational files are read and parsed and returns a LiMe object.
    This ``load_function`` must have 4 parameters: ``log_df``, ``obs_idx``, ``folder_obs`` and ``**kwargs``.
    The first and second variable represent the sample ``log`` and a single pandas multi-index entry for the requested
    observation. The ``folder_obs`` and ``**kwargs`` are provided at the ``Sample`` creation:

    The ``folder_obs`` parameter specifies the root file location for the targeted observation file. This root address
    is combined with the corresponding log level ``file`` value. If a ``folder_obs`` is not specified, it is assumed
    that the ``file`` log column contains the absolute file address.

    The ``**kwargs`` argument specifies keyword arguments used in the creation of the ``Spectrum`` or ``Cube`` objects
    such as the ``redshift`` or ``norm_flux`` for example.

    The user may also specify the instrument used for the observation. In this case LiMe will use the inbuilt functions
    to read the supported instruments. This, however, may not contain all the necessary information to create the LiMe
    variable (such as the redshift). In this case, the user can include a load_function which returns a dictionary with
    observation parameters not found on the ".fits" file.

    :param sample_log: multi-index dataframe with the parameter properties belonging to the ``Sample``.
    :type sample_log: pd.Dataframe

    :param levels: levels for the sample log dataframe. By default, these levels are "id", "file", "line".
    :type levels: list

    :param load_function: python method with the instructions to convert the observation file into a LiMe observation.
    :type load_function: python method

    :param instrument: instrument name responsible for the sample observations.
    :type instrument: string, optional.

    :param folder_obs: Root address for the observations' location. This address is combined with the "file" log column value.
    :type folder_obs: string, optional.

    :param kwargs: Additional keyword arguments for the creation of the LiMe observation variables.

    """

    def __init__(self, sample_log, levels=('id', 'file', 'line'), load_function=None, instrument=None, folder_obs=None,
                 units_wave='AA', units_flux='FLAM', **kwargs):

        # Initiate the user dictionary (container for the observations)
        super().__init__()

        # Load parent classes
        OpenFits.__init__(self, folder_obs, instrument, load_function, 'Sample')

        # Function attributes
        self.label_list = None
        self.objects = None
        self.group_list = None
        self.levels = list(levels)

        # Check the levels on the combined labels target log
        check_sample_levels(self.levels)

        # Checks units
        self.units_wave, self.units_flux = check_units(units_wave, units_flux)

        # Sample log dataframe (loaded from file if a file address is given)
        self.frame = check_file_dataframe(sample_log, pd.DataFrame, sample_levels=self.levels)
        self._load_function = load_function
        self.load_params = kwargs

        # Functionality objects
        self.plot = SampleFigures(self)
        self.check = SampleCheck(self)

        # Warn if there is no log
        if self.frame is None:
            _logger.warning(f'Sample was created with a null log')

        return

    @classmethod
    def from_file(cls, id_list, log_list=None, file_list=None, page_list=None, levels=('id', 'file', "line"),
                  load_function=None, instrument=None, folder_obs=None, **kwargs):

        """
        This class creates a dictionary-like variable to store LiMe observations taking a list of observations IDs, line
        logs and a list of files.

        The sample is indexed via the input ``log`` parameter, a pandas dataframe, whose levels must be declared via
        the ``levels`` parameter. By default, three levels are assumed: an "id" column and a "file" column specifying
        the object ID and observation file address respectively. The "line" level refers to the label measurements in
        the corresponding log.

        The user can specify more levels via the ``levels`` parameter. However, it is recommended to keep this
        structure: "id" and "file" levels first and the "line" column last.

        The sample log levels are created from the input values for the ``id_list``, ``log_list`` and ``file_list``
        while the individual logs from each observation are combined where the line labels in the "line" level. If the
        input logs are ".fits" files the user must specify extension name or number via the ``page_list`` parameter.

        To create the LiMe observation variables (``Spectrum`` or ``Cube``) the user needs to specify a
        ``load_function``. This is a python method which declares how the observational files are read and parsed and
        returns a LiMe object. This ``load_function`` must have four parameters: ``log_df``, ``obs_idx``,
        ``folder_obs`` and ``**kwargs``. The first and second variable represent the sample ``log`` and a single pandas
        multi-index entry for the requested observation. The ``folder_obs`` and ``**kwargs`` are provided at the
        ``Sample`` creation:

        The ``folder_obs`` parameter specifies the root file location for the targeted observation file. This root
        address is combined with the corresponding log level ``file`` value. If a ``folder_obs`` is not specified, it
        is assumed that the ``file`` log column contains the absolute file address.

        The ``**kwargs`` argument specifies keyword arguments used in the creation of the ``Spectrum`` or ``Cube``
        objects such as the ``redshift`` or ``norm_flux`` for example.

        :param id_list: List of observation names
        :type id_list: list

        :param log_list: List of observation log data frames or files or pandas data frames
        :type log_list: list

        :param file_list: List of observation files.
        :type file_list: list

        :param page_list: List of extension files or names for the observation ".fits" files
        :type page_list: list

        :param levels: levels for the sample log dataframe. By default, these levels are "id", "file", "line".
        :type levels: list

        :param load_function: python method with the instructions to convert the observation file into a LiMe
                              observation.
        :type load_function: python method

        :param instrument: instrument name responsible for the sample observations.
        :type instrument: string, optional.

        :param folder_obs: Root address for the observations' location. This address is combined with the "file" log
                           column value.
        :type folder_obs: string, optional.

        :param kwargs: Additional keyword arguments for the creation of the LiMe observation variables.

        """

        # Confirm matching length of entries
        check_sample_input_files(log_list, file_list, page_list, id_list)

        # Check the levels on combined labels target log
        check_sample_levels(levels)

        # Loop through observations and combine the log
        df_list = []
        for i, id_spec in enumerate(id_list):

            # Page and spec index
            file_spec = None if file_list is None else file_list[i]
            page_name = page_list[i] if page_list is not None else 'LINESFRAME'

            # Load the log and check the levels
            if log_list is not None:
                log_i = load_frame(log_list[i], page_name, levels)
                df_list.append(review_sample_levels(log_i, id_spec, file_spec))

            # No logs: register the observation with an empty (id, file)-indexed frame so it is not lost
            # (bug fix: the original built a malformed placeholder frame and never appended it to df_list)
            else:
                log_i = pd.DataFrame(index=pd.MultiIndex.from_tuples([(id_spec, file_spec)],
                                                                     names=["id", "file"]))
                df_list.append(log_i)

        sample_log = pd.concat(df_list)

        return cls(sample_log, levels, load_function, instrument, folder_obs, **kwargs)

    def load_function(self, log_df, obs_idx, root_address, **kwargs):
        # Run the user loading function if provided; a dict output means "parameters only, build the object here"

        if self._load_function is not None:
            load_function_output = self._load_function(log_df, obs_idx, root_address, **kwargs)
        else:
            load_function_output = {}

        # Proceed to create the LiMe object if necessary
        if isinstance(load_function_output, dict):

            # Get address of observation (bug fix: when no root folder was given the "file" level holds the
            # absolute address and must not be joined with a None root)
            file_name = obs_idx[log_df.index.names.index('file')]
            file_spec = Path(file_name) if root_address is None else root_address / file_name

            # User provides a data parser
            if self.fits_reader is not None:
                spec_data = self.fits_reader(file_spec)
                fits_args = {'input_wave': spec_data[0], 'input_flux': spec_data[1], 'input_err': spec_data[2],
                             **spec_data[4]}
            else:
                fits_args = {}

            # Create observation (user output takes precedence over the file values)
            obs_args = {**fits_args, **load_function_output}
            obs = Spectrum(**obs_args) if self.spectrum_check else Cube(**obs_args)

        # The user function already returned a LiMe object
        else:
            obs = load_function_output

        return obs

    def __getitem__(self, id_key):

        output = None
        valid_check = self._review_df_indexes()

        # Proceed to selection
        if valid_check:

            # Check if Pandas indexes, numpy boolean or scalar key
            if isinstance(id_key, (pd.Index, pd.MultiIndex, pd.Series)):
                idcs = id_key
            elif isinstance(id_key, (np.ndarray, np.bool_)):
                idcs = self.frame.index[id_key]
            else:
                idcs = self.frame.index.get_level_values('id').isin([id_key])

                # No entry found (bug fix: the original tested "np.all(idcs is False)", which is an identity
                # comparison on an array and can never be True, so missing keys were silently accepted)
                if not np.any(idcs):
                    raise KeyError(id_key)

            # Crop sample
            output = Sample(self.frame.loc[idcs], self.levels, self.load_function, self.source, self.file_address,
                            **self.load_params)

        return output

    def get_observation(self, input_index, default_none=False):

        output = None
        valid_check = self._review_df_indexes()

        if valid_check:

            # Case only ID string
            if isinstance(input_index, str):
                idcs = self.frame.index.get_level_values('id').isin(np.atleast_1d(input_index))

                # No entry found (bug fix: the original "np.all(idcs is False)" check could never trigger)
                if not np.any(idcs):
                    raise KeyError(input_index)

                # Check for logs without lines
                if 'line' not in self.frame.index.names:
                    obj_idcs = self.frame.loc[idcs].index.unique()
                else:
                    obj_idcs = self.frame.loc[idcs].iloc[0].name

            # Assume the caller passed a ready-to-use multi-index entry
            else:
                obj_idcs = input_index

            # Load the LiMe object
            output = self.load_function(self.frame, obj_idcs, self.file_address, **self.load_params)

        return output

    def get_spectrum(self, idx):

        # A boolean pandas series is resolved to a single frame entry before loading
        if isinstance(idx, pd.Series):
            idx_true = self.frame.loc[idx].index
            if idx_true.size > 1:
                raise LiMe_Error(f'Input sample spectrum extraction has more than one existing entry')
            idx_in = self.frame.loc[idx_true].index.values[0]
        else:
            idx_in = idx

        return self.load_function(self.frame, idx_in, self.file_address, **self.load_params)

    @property
    def index(self):
        # Full (multi-)index of the sample frame
        return self.frame.index

    @property
    def loc(self):
        # Direct access to the frame's label-based selector
        return self.frame.loc

    @property
    def ids(self):
        # "id" level values of the sample frame
        return self.frame.index.get_level_values('id')

    @property
    def files(self):
        # "file" level values of the sample frame
        return self.frame.index.get_level_values('file')

    @property
    def lines(self):
        # "line" level values of the sample frame
        return self.frame.index.get_level_values('line')

    @property
    def size(self):
        # Number of entries in the sample frame
        return self.frame.index.size

    def load_frame(self, dataframe, ext='LINESFRAME', sample_levels=('id', 'line')):
        """Load a lines frame from a dataframe or file address and assign it to the sample."""

        # Load the log file if it is a file address (tuple default avoids the mutable-default pitfall)
        log_df = check_file_dataframe(dataframe, pd.DataFrame, ext=ext, sample_levels=list(sample_levels))

        # Security checks:
        if log_df.index.size > 0:

            if self.units_wave is not None:
                line_list = log_df.index.values

                # Get the first line in the log
                # NOTE(review): self.norm_flux is not assigned in Sample.__init__ — presumably set by OpenFits;
                # confirm before relying on this path
                line_0 = Line.from_log(line_list[0], log_df, norm_flux=self.norm_flux)

                # Confirm the lines in the log match the one of the spectrum
                # TODO we need something more advance for the line_0 units
                # if line_0.units_wave != self.units_wave:
                #     _logger.warning(f'Different units in the spectrum dispersion ({self.units_wave}) axis and the '
                #                     f' lines log in {line_0.units_wave}')

                # Confirm all the log lines have the same units (np.char is the supported alias of the
                # deprecated np.core.defchararray namespace)
                same_units_check = np.flatnonzero(np.char.find(line_list.astype(str), line_0.units_wave) != -1).size == line_list.size
                if not same_units_check:
                    _logger.warning(f'The log has lines with different units')

        else:
            _logger.info(f'Log file with 0 entries ({dataframe})')

        # Assign the log
        self.frame = log_df

        return

    def save_frame(self, fname, ext='LINESFRAME', param_list='all', fits_header=None):
        """Save the sample frame to the input file address."""
        save_frame(fname, self.frame, ext, param_list, fits_header)

        return

    def extract_fluxes(self, flux_type='mixture', sample_level='line', column_names='line_flux', column_positions=1):
        # Delegate to the shared tools implementation on the sample frame
        return extract_fluxes(self.frame, flux_type, sample_level, column_names, column_positions)

    def normalize_fluxes(self, normalization_line, flux_entries=['line_flux', 'line_flux_err'], column_names=None,
                         column_positions=[1, 2]):
        # Delegate to the shared tools implementation on the sample frame
        return normalize_fluxes(self.frame, normalization_line, flux_entries, column_names, column_positions)

    def _review_df_indexes(self):
        """Confirm the sample has a frame, a load function and the mandatory "id"/"file" index levels."""

        # Check there is a log
        check = False
        if self.frame is None:
            _logger.info(f'Sample does not contain observations')

        # Check there is load function
        elif self.load_function is None:
            _logger.info(f'The sample does not contain a load_function')

        # Check there is an 'id' index
        elif 'id' not in self.frame.index.names:
            _logger.info(f'The sample log does not contain an "id" index column the observation label')

        # Check there is a 'file' index
        elif 'file' not in self.frame.index.names:
            _logger.info(f'The sample log does not contain a "file" index column with the observation file')

        else:
            check = True

        return check
class ObsManager(OpenFits):
    """
    Internal manager combining a ".fits" reader with an optional user load function to produce LiMe observations
    (``Spectrum`` or ``Cube``) from observation files.
    """

    def __init__(self, file_address, file_source, lime_object, load_function=None, **kwargs):

        # Initialize the .fits reading class
        OpenFits.__init__(self, file_address, file_source, lime_object, load_function)

        # Define attributes
        # NOTE(review): assigning "self.load_function" here shadows the identically-named method defined below
        # for instances of this class — confirm whether that method was meant to have a different name
        self.spectrum_check = False
        self.load_function = None
        self.user_params = None

        # Store the user arguments for the spectra
        self.user_params = kwargs

        # State the type of spectra: a spectrum unless the source is a non-spectrum instrument
        if file_source is None:
            self.spectrum_check = True
        else:
            self.spectrum_check = True if self.source in list(SPECTRUM_FITS_PARAMS.keys()) else False

        # Assign input load function
        if load_function is not None:
            self.load_function = load_function

        # Otherwise fall back to the default parser when a supported instrument reader is available
        elif (file_source is not None) and (self.fits_reader is not None):
            self.load_function = self.default_file_parser

        # No load function nor instrument
        else:
            raise LiMe_Error(f'To create a Sample object you need to provide "load_function" or provide a "instrument" '
                             f'supported by LiMe')

        return

    def load_function(self, file_spec, log_df=None, id_spec=None, **kwargs):
        # NOTE(review): this method is shadowed by the instance attribute of the same name assigned in __init__,
        # and the "self.load_function(...)" call below would recurse infinitely if the method were reachable —
        # verify the intended call target before use

        # Parse the file data (if a reader exists) and merge with the stored user parameters
        default_args = self.fits_reader(file_spec) if self.fits_reader is not None else {}
        user_args = self.user_params if self.user_params is not None else {}

        # Recover the user params
        user_args = user_args if user_args is not None else self.user_params

        # Update with the default arguments (user values take precedence)
        default_args = {**default_args, **user_args}

        # Run the load function
        load_function_output = self.load_function(file_spec, log_df, id_spec, **default_args)

        # Proceed to create LiMe object if necessary (a dict output holds constructor arguments)
        if isinstance(load_function_output, dict):
            obs_args = {**default_args, **load_function_output}
            obs = Spectrum(**obs_args) if self.spectrum_check else Cube(**obs_args)
        else:
            obs = load_function_output

        return obs

    def default_file_parser(self, log_df=None, id_spec=None, **kwargs):
        # NOTE(review): reads "self.id_spec" rather than the "id_spec" argument — looks like a typo; confirm
        # against the callers before changing
        file_spec = self.id_spec[log_df.index.names.index('file')]
        fits_args = self.fits_reader(file_spec)

        return fits_args