afscgap.flat
Entrypoint for logic to use prejoined Avro flat files.
(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center for Data Science and the Environment at UC Berkeley.
This file is part of afscgap released under the BSD 3-Clause License. See LICENSE.md.
1""" 2Entrypoint for logic to use prejoined Avro flat files. 3 4(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center 5for Data Science and the Environment at UC Berkeley. 6 7This file is part of afscgap released under the BSD 3-Clause License. See 8LICENSE.md. 9""" 10import functools 11import itertools 12import warnings 13 14import afscgap.cursor 15import afscgap.flat_http 16import afscgap.flat_local_filter 17import afscgap.flat_model 18import afscgap.flat_cursor 19 20from afscgap.flat_model import HAUL_KEYS, PARAMS_DICT, RECORDS 21 22WARNING_THRESHOLD = 3000 23 24LARGE_WARNING = ' '.join([ 25 'Your query may return a very large amount of records.', 26 'Be sure to interact with results in a memory efficient way.' 27]) 28 29 30def get_hauls(params: PARAMS_DICT, meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS: 31 """Get the hauls matching a set of parameters. 32 33 Args: 34 params: Set of parameters to apply when determining which records should be included in the 35 results set. 36 meta: Set of configuration parameters to use when making requests for Avro flat files. 37 38 Returns: 39 Iterable over matching hauls. All relevant results will be included in this iterable but it 40 may also include irrelevant results, requiring further local filtering. 
41 """ 42 presence_only = meta.get_presence_only() 43 44 params_flat = params.items() 45 params_keyed = map(lambda x: afscgap.param.FieldParam(x[0], x[1]), params_flat) 46 params_required = filter(lambda x: not x.get_param().get_is_ignorable(), params_keyed) 47 index_filters_nest = map(lambda x: afscgap.flat_index_util.make_filters( 48 x.get_field(), 49 x.get_param(), 50 presence_only 51 ), params_required) 52 53 index_filters = itertools.chain(*index_filters_nest) 54 index_filters_realized = list(index_filters) 55 56 if len(index_filters_realized) == 0: 57 return afscgap.flat_http.get_all_hauls(meta) 58 59 haul_iterables = map( 60 lambda x: afscgap.flat_http.get_hauls_for_index_filter(meta, x), 61 index_filters_realized 62 ) 63 haul_sets = map(lambda x: set(x), haul_iterables) 64 return functools.reduce(lambda a, b: a.intersection(b), haul_sets) # type: ignore 65 66 67def check_warning(hauls: HAUL_KEYS, meta: afscgap.flat_model.ExecuteMetaParams): 68 """Check if a large payload warning should be emitted. 69 70 Check if a large payload warning should be emitted, creating that warning through the method 71 indicated by the provided meta parameter if meeting criteria specified by that same parameter. 72 73 Args: 74 hauls: Iterable over haul keys to be downloaded. 75 meta: Set of configuration parameters to use when making requests for Avro flat files. 76 """ 77 if meta.get_suppress_large_warning(): 78 return 79 80 num_hauls = sum(map(lambda x: 1, hauls)) 81 82 if num_hauls > WARNING_THRESHOLD: 83 warn_func = meta.get_warn_func() 84 warn_func_realized = warnings.warn if warn_func is None else warn_func 85 warn_func_realized(LARGE_WARNING) # type: ignore 86 87 88def execute(param_dict: PARAMS_DICT, 89 meta: afscgap.flat_model.ExecuteMetaParams) -> afscgap.cursor.Cursor: 90 """Execute a AFSC GAP request using prejoined flat Avro files. 91 92 Args: 93 param_dict: Dictionary describing the parameters with which to execute this request. 
94 meta: Set of configuration parameters to use when making requests for Avro flat files. 95 96 Returns: 97 Iterable over matching results. 98 """ 99 local_filter = afscgap.flat_local_filter.build_filter(param_dict) 100 hauls = get_hauls(param_dict, meta) 101 102 hauls_realized = list(hauls) 103 check_warning(hauls_realized, meta) 104 105 candidate_records_nested = map( 106 lambda x: afscgap.flat_http.get_records_for_haul(meta, x), 107 hauls_realized 108 ) 109 candidate_records: RECORDS = itertools.chain(*candidate_records_nested) 110 records: RECORDS = filter(lambda x: local_filter.matches(x), candidate_records) 111 112 limit = meta.get_limit() 113 raw_cursor = afscgap.flat_cursor.FlatCursor(records) 114 115 no_incomplete = meta.get_filter_incomplete() 116 filter_cursor = afscgap.flat_cursor.CompleteCursor(raw_cursor) if no_incomplete else raw_cursor 117 118 limit = meta.get_limit() 119 cursor = afscgap.flat_cursor.LimitCursor(filter_cursor, limit) if limit else filter_cursor 120 121 return cursor
def get_hauls(params: PARAMS_DICT, meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
    """Get the hauls matching a set of parameters.

    Each parameter that is not ignorable is turned into zero or more index filters. If no index
    filters result, every haul is returned. Otherwise the hauls matching each index filter are
    requested and intersected.

    Args:
        params: Set of parameters to apply when determining which records should be included in the
            results set.
        meta: Set of configuration parameters to use when making requests for Avro flat files.

    Returns:
        Iterable over matching hauls. All relevant results will be included in this iterable but it
        may also include irrelevant results, requiring further local filtering.
    """
    # Imported explicitly rather than relying on another module having already loaded them.
    import afscgap.flat_index_util
    import afscgap.param

    presence_only = meta.get_presence_only()

    field_params = (afscgap.param.FieldParam(name, value) for name, value in params.items())
    required_params = (x for x in field_params if not x.get_param().get_is_ignorable())
    index_filters = list(itertools.chain.from_iterable(
        afscgap.flat_index_util.make_filters(x.get_field(), x.get_param(), presence_only)
        for x in required_params
    ))

    if not index_filters:
        return afscgap.flat_http.get_all_hauls(meta)

    matching_hauls = set(afscgap.flat_http.get_hauls_for_index_filter(meta, index_filters[0]))
    for index_filter in index_filters[1:]:
        if not matching_hauls:
            # The intersection is already empty, so no further filter can add hauls; skip the
            # remaining requests.
            break
        matching_hauls.intersection_update(
            afscgap.flat_http.get_hauls_for_index_filter(meta, index_filter)
        )

    return matching_hauls
Get the hauls matching a set of parameters.
Arguments:
- params: Set of parameters to apply when determining which records should be included in the results set.
- meta: Set of configuration parameters to use when making requests for Avro flat files.
Returns:
Iterable over matching hauls. All relevant results will be included in this iterable but it may also include irrelevant results, requiring further local filtering.
def check_warning(hauls: HAUL_KEYS, meta: afscgap.flat_model.ExecuteMetaParams):
    """Emit a large payload warning when the haul count is above WARNING_THRESHOLD.

    Nothing happens if meta asks for the large warning to be suppressed. Otherwise the hauls are
    counted by iterating over them, which exhausts a one-shot iterator, so pass a re-iterable
    collection. If the count exceeds WARNING_THRESHOLD, LARGE_WARNING is passed to the warn function
    supplied by meta, or to warnings.warn when meta supplies none.

    Args:
        hauls: Iterable over haul keys to be downloaded.
        meta: Set of configuration parameters to use when making requests for Avro flat files.
    """
    if not meta.get_suppress_large_warning():
        haul_count = sum(1 for _ in hauls)
        if haul_count > WARNING_THRESHOLD:
            custom_warn = meta.get_warn_func()
            emit = custom_warn if custom_warn is not None else warnings.warn
            emit(LARGE_WARNING)  # type: ignore
Check if a large payload warning should be emitted.
Check if a large payload warning should be emitted, creating that warning through the method indicated by the provided meta parameter if meeting criteria specified by that same parameter.
Arguments:
- hauls: Iterable over haul keys to be downloaded.
- meta: Set of configuration parameters to use when making requests for Avro flat files.
def execute(param_dict: PARAMS_DICT,
            meta: afscgap.flat_model.ExecuteMetaParams) -> afscgap.cursor.Cursor:
    """Execute an AFSC GAP request using prejoined flat Avro files.

    Args:
        param_dict: Dictionary describing the parameters with which to execute this request.
        meta: Set of configuration parameters to use when making requests for Avro flat files.

    Returns:
        Cursor over matching results.
    """
    local_filter = afscgap.flat_local_filter.build_filter(param_dict)

    # Realized as a list because it is iterated twice: once to count for the warning and once to
    # fetch records.
    hauls_realized = list(get_hauls(param_dict, meta))
    check_warning(hauls_realized, meta)

    # chain.from_iterable pulls one haul's records at a time. Unpacking with chain(*...) would
    # force the map and call get_records_for_haul for every haul before yielding any record.
    candidate_records_nested = map(
        lambda haul: afscgap.flat_http.get_records_for_haul(meta, haul),
        hauls_realized
    )
    candidate_records: RECORDS = itertools.chain.from_iterable(candidate_records_nested)
    records: RECORDS = filter(local_filter.matches, candidate_records)

    raw_cursor = afscgap.flat_cursor.FlatCursor(records)

    no_incomplete = meta.get_filter_incomplete()
    filter_cursor = afscgap.flat_cursor.CompleteCursor(raw_cursor) if no_incomplete else raw_cursor

    # A falsy limit (e.g. None or 0) applies no LimitCursor.
    limit = meta.get_limit()
    return afscgap.flat_cursor.LimitCursor(filter_cursor, limit) if limit else filter_cursor
Execute an AFSC GAP request using prejoined flat Avro files.
Arguments:
- param_dict: Dictionary describing the parameters with which to execute this request.
- meta: Set of configuration parameters to use when making requests for Avro flat files.
Returns:
Cursor over matching results.