afscgap.flat

Entrypoint for logic to use prejoined Avro flat files.

(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center for Data Science and the Environment at UC Berkeley.

This file is part of afscgap released under the BSD 3-Clause License. See LICENSE.md.

  1"""
  2Entrypoint for logic to use prejoined Avro flat files.
  3
  4(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center
  5for Data Science and the Environment at UC Berkeley.
  6
  7This file is part of afscgap released under the BSD 3-Clause License. See
  8LICENSE.md.
  9"""
 10import functools
 11import itertools
 12import warnings
 13
 14import afscgap.cursor
 15import afscgap.flat_http
 16import afscgap.flat_local_filter
 17import afscgap.flat_model
 18import afscgap.flat_cursor
 19
 20from afscgap.flat_model import HAUL_KEYS, PARAMS_DICT, RECORDS
 21
 22WARNING_THRESHOLD = 3000
 23
 24LARGE_WARNING = ' '.join([
 25    'Your query may return a very large amount of records.',
 26    'Be sure to interact with results in a memory efficient way.'
 27])
 28
 29
 30def get_hauls(params: PARAMS_DICT, meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
 31    """Get the hauls matching a set of parameters.
 32
 33    Args:
 34        params: Set of parameters to apply when determining which records should be included in the
 35            results set.
 36        meta: Set of configuration parameters to use when making requests for Avro flat files.
 37
 38    Returns:
 39        Iterable over matching hauls. All relevant results will be included in this iterable but it
 40        may also include irrelevant results, requiring further local filtering.
 41    """
 42    presence_only = meta.get_presence_only()
 43
 44    params_flat = params.items()
 45    params_keyed = map(lambda x: afscgap.param.FieldParam(x[0], x[1]), params_flat)
 46    params_required = filter(lambda x: not x.get_param().get_is_ignorable(), params_keyed)
 47    index_filters_nest = map(lambda x: afscgap.flat_index_util.make_filters(
 48        x.get_field(),
 49        x.get_param(),
 50        presence_only
 51    ), params_required)
 52
 53    index_filters = itertools.chain(*index_filters_nest)
 54    index_filters_realized = list(index_filters)
 55
 56    if len(index_filters_realized) == 0:
 57        return afscgap.flat_http.get_all_hauls(meta)
 58
 59    haul_iterables = map(
 60        lambda x: afscgap.flat_http.get_hauls_for_index_filter(meta, x),
 61        index_filters_realized
 62    )
 63    haul_sets = map(lambda x: set(x), haul_iterables)
 64    return functools.reduce(lambda a, b: a.intersection(b), haul_sets)  # type: ignore
 65
 66
 67def check_warning(hauls: HAUL_KEYS, meta: afscgap.flat_model.ExecuteMetaParams):
 68    """Check if a large payload warning should be emitted.
 69
 70    Check if a large payload warning should be emitted, creating that warning through the method
 71    indicated by the provided meta parameter if meeting criteria specified by that same parameter.
 72
 73    Args:
 74        hauls: Iterable over haul keys to be downloaded.
 75        meta: Set of configuration parameters to use when making requests for Avro flat files.
 76    """
 77    if meta.get_suppress_large_warning():
 78        return
 79
 80    num_hauls = sum(map(lambda x: 1, hauls))
 81
 82    if num_hauls > WARNING_THRESHOLD:
 83        warn_func = meta.get_warn_func()
 84        warn_func_realized = warnings.warn if warn_func is None else warn_func
 85        warn_func_realized(LARGE_WARNING)  # type: ignore
 86
 87
 88def execute(param_dict: PARAMS_DICT,
 89    meta: afscgap.flat_model.ExecuteMetaParams) -> afscgap.cursor.Cursor:
 90    """Execute a AFSC GAP request using prejoined flat Avro files.
 91
 92    Args:
 93        param_dict: Dictionary describing the parameters with which to execute this request.
 94        meta: Set of configuration parameters to use when making requests for Avro flat files.
 95
 96    Returns:
 97        Iterable over matching results.
 98    """
 99    local_filter = afscgap.flat_local_filter.build_filter(param_dict)
100    hauls = get_hauls(param_dict, meta)
101
102    hauls_realized = list(hauls)
103    check_warning(hauls_realized, meta)
104
105    candidate_records_nested = map(
106        lambda x: afscgap.flat_http.get_records_for_haul(meta, x),
107        hauls_realized
108    )
109    candidate_records: RECORDS = itertools.chain(*candidate_records_nested)
110    records: RECORDS = filter(lambda x: local_filter.matches(x), candidate_records)
111
112    limit = meta.get_limit()
113    raw_cursor = afscgap.flat_cursor.FlatCursor(records)
114
115    no_incomplete = meta.get_filter_incomplete()
116    filter_cursor = afscgap.flat_cursor.CompleteCursor(raw_cursor) if no_incomplete else raw_cursor
117
118    limit = meta.get_limit()
119    cursor = afscgap.flat_cursor.LimitCursor(filter_cursor, limit) if limit else filter_cursor
120
121    return cursor
# Number of hauls above which a large payload warning is emitted.
WARNING_THRESHOLD = 3000

# Message emitted when a query exceeds WARNING_THRESHOLD hauls.
LARGE_WARNING = (
    'Your query may return a very large amount of records. '
    'Be sure to interact with results in a memory efficient way.'
)
def get_hauls( params: Dict[str, afscgap.param.Param], meta: afscgap.flat_model.ExecuteMetaParams) -> Iterable[afscgap.flat_model.HaulKey]:
def get_hauls(params: PARAMS_DICT, meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
    """Find the hauls which may contain records satisfying a query.

    Builds index filters for every parameter which is not ignorable and keeps only those hauls
    reported for every one of those filters. If no index filters result, all hauls are returned.

    Args:
        params: Mapping from field name to the parameter constraining that field.
        meta: Set of configuration parameters to use when making requests for Avro flat files.

    Returns:
        Iterable over candidate hauls. Every relevant haul is included but irrelevant hauls may
        also be present so results still require further local filtering.
    """
    presence_only = meta.get_presence_only()

    # Collect the index filters contributed by each parameter that cannot be ignored.
    collected_filters = []
    for field_name, param in params.items():
        field_param = afscgap.param.FieldParam(field_name, param)
        if field_param.get_param().get_is_ignorable():
            continue

        new_filters = afscgap.flat_index_util.make_filters(
            field_param.get_field(),
            field_param.get_param(),
            presence_only
        )
        collected_filters.extend(new_filters)

    # Without any index filters there is nothing to narrow by so every haul is a candidate.
    if not collected_filters:
        return afscgap.flat_http.get_all_hauls(meta)

    # A haul is only kept if every index filter reports it.
    matching = None
    for index_filter in collected_filters:
        found = set(afscgap.flat_http.get_hauls_for_index_filter(meta, index_filter))
        matching = found if matching is None else matching.intersection(found)

    return matching  # type: ignore

Get the hauls matching a set of parameters.

Arguments:
  • params: Set of parameters to apply when determining which records should be included in the results set.
  • meta: Set of configuration parameters to use when making requests for Avro flat files.
Returns:

Iterable over matching hauls. All relevant results will be included in this iterable but it may also include irrelevant results, requiring further local filtering.

def check_warning( hauls: Iterable[afscgap.flat_model.HaulKey], meta: afscgap.flat_model.ExecuteMetaParams):
def check_warning(hauls: HAUL_KEYS, meta: afscgap.flat_model.ExecuteMetaParams):
    """Emit a large payload warning if the number of hauls exceeds the threshold.

    Counts the provided hauls and, if there are more than WARNING_THRESHOLD, sends LARGE_WARNING
    to the warning function given by meta or to warnings.warn if meta does not provide one.
    Nothing happens if meta indicates that the warning is suppressed.

    Args:
        hauls: Iterable over haul keys to be downloaded. This is iterated in full to count it.
        meta: Set of configuration parameters to use when making requests for Avro flat files.
    """
    if meta.get_suppress_large_warning():
        return

    haul_count = 0
    for _ in hauls:
        haul_count += 1

    if haul_count <= WARNING_THRESHOLD:
        return

    emit = meta.get_warn_func()
    if emit is None:
        emit = warnings.warn

    emit(LARGE_WARNING)  # type: ignore

Check if a large payload warning should be emitted.

Check if a large payload warning should be emitted, creating that warning through the method indicated by the provided meta parameter if meeting criteria specified by that same parameter.

Arguments:
  • hauls: Iterable over haul keys to be downloaded.
  • meta: Set of configuration parameters to use when making requests for Avro flat files.
def execute( param_dict: Dict[str, afscgap.param.Param], meta: afscgap.flat_model.ExecuteMetaParams) -> afscgap.cursor.Cursor:
 89def execute(param_dict: PARAMS_DICT,
 90    meta: afscgap.flat_model.ExecuteMetaParams) -> afscgap.cursor.Cursor:
 91    """Execute a AFSC GAP request using prejoined flat Avro files.
 92
 93    Args:
 94        param_dict: Dictionary describing the parameters with which to execute this request.
 95        meta: Set of configuration parameters to use when making requests for Avro flat files.
 96
 97    Returns:
 98        Iterable over matching results.
 99    """
100    local_filter = afscgap.flat_local_filter.build_filter(param_dict)
101    hauls = get_hauls(param_dict, meta)
102
103    hauls_realized = list(hauls)
104    check_warning(hauls_realized, meta)
105
106    candidate_records_nested = map(
107        lambda x: afscgap.flat_http.get_records_for_haul(meta, x),
108        hauls_realized
109    )
110    candidate_records: RECORDS = itertools.chain(*candidate_records_nested)
111    records: RECORDS = filter(lambda x: local_filter.matches(x), candidate_records)
112
113    limit = meta.get_limit()
114    raw_cursor = afscgap.flat_cursor.FlatCursor(records)
115
116    no_incomplete = meta.get_filter_incomplete()
117    filter_cursor = afscgap.flat_cursor.CompleteCursor(raw_cursor) if no_incomplete else raw_cursor
118
119    limit = meta.get_limit()
120    cursor = afscgap.flat_cursor.LimitCursor(filter_cursor, limit) if limit else filter_cursor
121
122    return cursor

Execute an AFSC GAP request using prejoined flat Avro files.

Arguments:
  • param_dict: Dictionary describing the parameters with which to execute this request.
  • meta: Set of configuration parameters to use when making requests for Avro flat files.
Returns:

Iterable over matching results.