"""
calibrator.py
====================================
Module for the calibrator functionality.
"""
import numpy as np
import pandas as pd
from configparser import ConfigParser
from pathlib import Path
from os.path import join
from .external_instrument_model import ExternalInstrumentModel
from gaiaxpy.config import config_path
from gaiaxpy.core.satellite import BANDS
from gaiaxpy.core import _get_spectra_type, _load_xpmerge_from_csv, \
_load_xpsampling_from_csv, _progress_tracker, \
_validate_arguments, _validate_wl_sampling, satellite
from gaiaxpy.input_reader import InputReader
from gaiaxpy.output import SampledSpectraData
from gaiaxpy.spectrum import _get_covariance_matrix, AbsoluteSampledSpectrum, \
SampledBasisFunctions, XpContinuousSpectrum
config_parser = ConfigParser()
config_parser.read(join(config_path, 'config.ini'))
[docs]def calibrate(
input_object,
sampling=None,
truncation=False,
output_path='.',
output_file='output_spectra',
output_format=None,
save_file=True,
username=None,
password=None):
"""
Calibration utility: calibrates the input internally-calibrated
continuously-represented mean spectra to the absolute system. An absolute
spectrum sampled on a user-defined or default wavelength grid is created
for each set of BP and RP input spectra. If either band is missing, the
output spectrum will only cover the range covered by the available data.
Args:
input_object (object): Path to the file containing the mean spectra
as downloaded from the archive in their continuous representation,
a list of sources ids (string or long), or a pandas DataFrame.
sampling (ndarray): 1D array containing the desired sampling in
absolute wavelengths [nm].
truncation (bool): Toggle truncation of the set of bases. The level
of truncation to be applied is defined by the recommended value in
the input files.
output_path (str): Path where to save the output data.
output_file (str): Name of the output file.
output_format (str): Format to be used for the output file. If no format
is given, then the output file will be in the same format as the
input file.
save_file (bool): Whether to save the output in a file. If false, output_format
and output_file are ignored.
username (str): Cosmos username, only suggested when input_object is a list or ADQL query.
password (str): Cosmos password, only suggested when input_object is a list or ADQL query.
Returns:
(tuple): tuple containing:
DataFrame: The values for all sampled absolute spectra.
ndarray: The sampling used to calibrate the input spectra (user-provided or default).
"""
# Call internal method
return _calibrate(
input_object,
sampling,
truncation,
output_path,
output_file,
output_format,
save_file,
username=username,
password=password)
def _calibrate(
input_object,
sampling=None,
truncation=False,
output_path='.',
output_file='output_spectra',
output_format=None,
save_file=True,
bp_model='v375wi',
rp_model='v142r',
username=None,
password=None):
"""
Internal method of the calibration utility. Refer to "calibrate".
Args:
bp_model (str): BP model to use.
rp_model (str): RP model to use.
Returns:
DataFrame: A list of all sampled absolute spectra.
ndarray: The sampling used to calibrate the spectra.
Raises:
ValueError: If the sampling is out of the expected boundaries.
"""
_validate_wl_sampling(sampling)
_validate_arguments(_calibrate.__defaults__[3], output_file, save_file)
parsed_input_data, extension = InputReader(input_object, _calibrate, username, password)._read()
label = 'calibrator'
xp_design_matrices, xp_merge = _generate_xp_matrices_and_merge(label, sampling, bp_model, rp_model)
# Create sampled basis functions
spectra_list = _create_spectra(parsed_input_data, truncation, xp_design_matrices, xp_merge)
# Generate output
spectra_df = pd.DataFrame.from_records([spectrum._spectrum_to_dict() for spectrum in spectra_list])
spectra_type = _get_spectra_type(spectra_list)
spectra_df.attrs['data_type'] = spectra_type
positions = spectra_list[0]._get_positions()
output_data = SampledSpectraData(spectra_df, positions)
# Save output
Path(output_path).mkdir(parents=True, exist_ok=True)
output_data.save(save_file, output_path, output_file, output_format, extension)
return spectra_df, positions
def _create_merge(xp, sampling):
"""
Create the weight information on the input sampling grid.
Args:
xp (str): Band (either BP or RP).
sampling (ndarray): 1D array containing the sampling grid.
Returns:
dict: A dictionary containing a BP and an RP array with weights.
"""
wl_high = satellite.BP_WL.high
wl_low = satellite.RP_WL.low
if xp == BANDS.bp:
weight = np.array([1.0 if wl < wl_low else 0.0 if wl > wl_high else (
1.0 - (wl - wl_low) / (wl_high - wl_low)) for wl in sampling])
elif xp == BANDS.rp:
weight = np.array([0.0 if wl < wl_low else 1.0 if wl > wl_high else (
wl - wl_low) / (wl_high - wl_low) for wl in sampling])
else:
raise ValueError(f'Given band is {xp}, but should be either bp or rp.')
return weight
def _generate_xp_matrices_and_merge(label, sampling, bp_model, rp_model):
"""
Get xp_design_matrices and xp_merge.
"""
def _get_file_for_xp(xp, key, bp_model=bp_model, rp_model=rp_model):
file_name = config_parser.get(label, key)
if xp == BANDS.bp:
model = bp_model
elif xp == BANDS.rp:
model = rp_model
return join(config_path, f"{file_name.replace('xp', xp).replace('model', model)}".format(key))
xp_design_matrices = {}
if sampling is None:
xp_sampling_grid, xp_merge = _load_xpmerge_from_csv(label, bp_model=bp_model)
xp_design_matrices = _load_xpsampling_from_csv(label, bp_model=bp_model)
for xp in BANDS:
xp_design_matrices[xp] = SampledBasisFunctions.from_design_matrix(
xp_sampling_grid, xp_design_matrices[xp])
else:
xp_merge = {}
for xp in BANDS:
instr_model = ExternalInstrumentModel.from_config_csv(
_get_file_for_xp(
xp, 'dispersion'), _get_file_for_xp(
xp, 'response'), _get_file_for_xp(
xp, 'bases'))
xp_merge[xp] = _create_merge(xp, sampling)
xp_design_matrices[xp] = SampledBasisFunctions.from_external_instrument_model(
sampling, xp_merge[xp], instr_model)
return xp_design_matrices, xp_merge
def _create_spectra(parsed_spectrum_file, truncation, design_matrices, merge):
"""
Internal wrapper function. Allows _create_spectrum to use the generic
progress tracker.
"""
spectra_list = []
nrows = len(parsed_spectrum_file)
@_progress_tracker
def create_spectrum(row, *args):
truncation, design_matrices, merge = args[:3]
spectrum = _create_spectrum(
row, truncation, design_matrices, merge)
spectra_list.append(spectrum)
for index, row in parsed_spectrum_file.iterrows():
create_spectrum(row, truncation, design_matrices, merge, index, nrows)
return spectra_list
def _create_spectrum(row, truncation, design_matrix, merge):
"""
Create a single sampled absolute spectrum from the input continuously-represented
mean spectrum and design matrix.
Args:
row (DataFrame): Single row in a DataFrame containing the entry
for one source in the mean spectra file. This will include columns for
both bands (although one could be missing).
truncation (bool): Toggle truncation of the set of bases.
design_matrix (ndarray): 2D array containing the basis functions
sampled on the pseudo-wavelength grid (either user-defined or default).
merge (dict): Dictionary containing an array of weights per BP and one for RP.
These have one value per sample and define the contributions from BP and RP
to the joined absolute spectrum.
Returns:
AbsoluteSampledSpectrum: The sampled absolute spectrum.
"""
source_id = row['source_id']
cont_dict = {}
# Split both bands
source_id = row['source_id']
cont_dict = {}
# Split both bands
for band in BANDS:
try:
covariance_matrix = _get_covariance_matrix(row, band)
if covariance_matrix is not None:
continuous_object = XpContinuousSpectrum(
source_id,
band,
row[f'{band}_coefficients'],
covariance_matrix,
row[f'{band}_standard_deviation'])
cont_dict[band] = continuous_object
if truncation:
recommended_truncation = row[f'{band}_n_relevant_bases']
else:
recommended_truncation = -1
except Exception:
# If the band is not present, ignore it
continue
return AbsoluteSampledSpectrum(
source_id, cont_dict, design_matrix, merge,
truncation=recommended_truncation)