Source code for gaiaxpy.input_reader.query_reader

import re

from gaiaxpy.core.server import data_release, gaia_server
from gaiaxpy.core.version import __version__
from .archive_reader import ArchiveReader
from .dataframe_reader import DataFrameReader
from ..core.custom_errors import SelectorNotImplementedError

# Names of the functions that cannot take an ADQL query as input. QueryReader.read compares the
# __name__ of the requesting function against this list and raises a ValueError on a match.
not_supported_functions = ['apply_colour_equation']


class QueryReader(ArchiveReader):
    """
    Reader that launches an ADQL query on the Gaia Archive and loads the XP continuous data of the
    sources listed in the query result.
    """

    def __init__(self, content, function, truncation, user=None, password=None, additional_columns=None,
                 selector=None, disable_info=False):
        """
        Args:
            content (str): ADQL query to be launched by `read`.
            function (callable): Function the data is being read for. Its `__name__` is checked
                against `not_supported_functions` in `read`.
            truncation: Forwarded to `ArchiveReader.__init__`.
            user (str): Forwarded to `ArchiveReader.__init__` (presumably the Archive user name).
            password (str): Forwarded to `ArchiveReader.__init__` (presumably the Archive password).
            additional_columns (dict): Forwarded to `ArchiveReader.__init__`. Defaults to an empty dict.
            selector: Not supported for queries. Must be None.
            disable_info (bool): Forwarded to `ArchiveReader.__init__`. `read` skips its info messages
                when the resulting `self.disable_info` is true.

        Raises:
            SelectorNotImplementedError: If `selector` is not None.
        """
        if additional_columns is None:
            additional_columns = {}
        if selector is not None:
            raise SelectorNotImplementedError('Query')
        self.content = content
        super().__init__(function, truncation, user, password, additional_columns=additional_columns,
                         disable_info=disable_info)

    @staticmethod
    def get_srcids(_table):
        """
        Get the source ID column from an Astropy Table.

        Column names are compared in lower case and with '-', '_' and spaces removed. The first
        column whose name then reads 'sourceid' or 'srcid' is selected.

        Args:
            _table (Table): A table containing columns.

        Returns:
            The source ID column of the table, i.e. `_table[<name of the source ID column>]`.

        Raises:
            ValueError: If the source ID column is not found in the table.
            ValueError: If the name of the source ID column is not a string.
        """
        accepted_names = ('sourceid', 'srcid')
        try:
            sid_col = next(name for name in _table.columns.keys()
                           if re.sub(r'[-_ ]', '', name.lower()) in accepted_names)
        except StopIteration:
            # The exhausted-iterator traceback adds nothing for the user, so it is suppressed.
            raise ValueError('Source ID column not found in query result.') from None
        if not isinstance(sid_col, str):
            raise ValueError(
                f'Index of source ID column should be a string, but is {type(sid_col).__name__}: {sid_col}.')
        return _table[sid_col]

    @staticmethod
    def _add_marker(query, comment):
        """
        Remove the comments of an ADQL query and mark each SELECT keyword with a comment.

        Args:
            query (str): ADQL query.
            comment (str): Text inserted as a '--' comment right after every SELECT keyword. If it is
                empty or None, no marker is added and only the original comments are removed.

        Returns:
            str: The query without its original comments and, if `comment` is given, with every
                SELECT keyword replaced by 'select --<comment> ' followed by a line break.
        """
        # NOTE: comment removal is purely textual, so a '--' or '/* */' sequence inside a string
        # literal of the query is removed as well.
        query = re.sub(r'--.*$', '', query, flags=re.MULTILINE)  # Remove single-line comments
        query = re.sub(r'/\*.*?\*/', '', query, flags=re.DOTALL)  # Remove multi-line comments
        if comment:
            marker = f'select --{comment} \n'
            # The word boundaries keep identifiers that merely contain 'select' (e.g. 'selection_flag')
            # from being split by the marker. A function is used as the replacement so that
            # backslashes in the comment are inserted literally instead of being parsed as a template.
            query = re.sub(r'\bselect\b', lambda _match: marker, query, flags=re.IGNORECASE)
        return query

    def read(self, _data_release=data_release,
             _comment=f'This query was launched from within GaiaXPy {__version__}'):
        """
        Launch the query, download the XP continuous data of the resulting sources and read it.

        Args:
            _data_release (str): Value passed as `data_release` when loading the data.
            _comment (str): Marker added to the query as a comment, see `_add_marker`.

        Returns:
            The output of `DataFrameReader.read` for the downloaded data.

        Raises:
            ValueError: If the function the data is read for does not accept ADQL queries.
            ValueError: If the query result has no source ID column.
            ValueError: If no continuous data is found in the downloaded result.
        """
        # Import only when read is called, as this module is dependent on the Archive's availability
        from astroquery.gaia import GaiaClass
        query = self._add_marker(self.content, _comment)
        function_name = self.function.__name__
        if function_name in not_supported_functions:
            raise ValueError(f'Function {function_name} does not accept ADQL queries.')
        # Connect to the server configured in gaiaxpy.core.server
        gaia = GaiaClass(gaia_tap_server=gaia_server, gaia_data_server=gaia_server)
        self._login(gaia)
        # ADQL query
        if not self.disable_info:
            self.show_info_msg()
        job = gaia.launch_job_async(query, dump_to_file=False)
        query_result = job.get_results()
        result = gaia.load_data(ids=self.get_srcids(query_result), format='csv', data_release=_data_release,
                                data_structure='raw', retrieval_type='XP_CONTINUOUS',
                                avoid_datatype_check=True)
        continuous_keys = [key for key in result.keys() if 'continuous' in key.lower()]
        try:
            # IndexError covers both "no key mentions 'continuous'" and "the entry holds no table".
            data = result[continuous_keys[0]][0].to_pandas()
        except (IndexError, KeyError) as err:
            raise ValueError('No continuous BP/RP data found for the requested query.') from err
        if not self.disable_info:
            self.show_info_msg(done=True)
        # The info messages have already been handled here, so they are disabled in the inner reader.
        return DataFrameReader(data, function_name, self.truncation, additional_columns=self.additional_columns,
                               disable_info=True).read()