afscgap.flat_http
Interfaces for cursor objects which iterate over records from prejoined flat records.
Interfaces for cursor objects which iterate over records from prejoined flat records, either those appearing in the underlying unjoined dataset or zero catch inferred records.
(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center for Data Science and the Environment at UC Berkeley.
This file is part of afscgap released under the BSD 3-Clause License. See LICENSE.md.
"""
Interfaces for cursor objects which iterate over records from prejoined flat records.

Interfaces for cursor objects which iterate over records from prejoined flat records, either those
appearing in the underlying unjoined dataset or zero catch inferred records.

(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center
for Data Science and the Environment at UC Berkeley.

This file is part of afscgap released under the BSD 3-Clause License. See
LICENSE.md.
"""
import itertools
import typing

import fastavro

import afscgap.flat_index_util
import afscgap.flat_model
import afscgap.http_util

from afscgap.flat_model import HAUL_KEYS, RECORDS
from afscgap.typesdef import REQUESTOR

MAIN_INDEX_PATH = '/index/main.avro'
OPT_FILTER = typing.Optional[afscgap.flat_index_util.IndexFilter]


def build_haul_from_avro(target: dict) -> afscgap.flat_model.HaulKey:
    """Build a haul record from a dictionary parsed from avro.

    Args:
        target: The single record parsed from binary avro to be converted to a HaulKey.

    Returns:
        Parsed HaulKey record.
    """
    year = target['year']
    survey = target['survey']
    haul = target['haul']
    return afscgap.flat_model.HaulKey(year, survey, haul)


def build_requestor() -> REQUESTOR:
    """Build a requests-compatible requestor object.

    Returns:
        Create a new requestor object which is set up for streaming and other configuration as
        required for flat file iteration.
    """
    return afscgap.http_util.build_requestor(stream=True)


def get_index_urls(meta: afscgap.flat_model.ExecuteMetaParams,
        index_filter: OPT_FILTER = None) -> typing.Iterable[str]:
    """Get the URLs at which the required indices can be found.

    Args:
        meta: Configuration object which indicates how the all hauls index should be requested. This
            can, for example, be used to configure the server from which data are streamed.
        index_filter: Information about the filter to be applied against a precomputed index. If
            None, the URL for the all hauls index is returned. Defaults to None.

    Returns:
        List of string URLs. This holds exactly one URL (the all hauls index) when no filter is
        given and otherwise one URL per index name reported by the filter.
    """
    base_url = meta.get_base_url()

    if index_filter is None:
        return [base_url + MAIN_INDEX_PATH]

    # A list is returned in both branches so that the result can be iterated more than once.
    return [base_url + '/index/%s.avro' % name for name in index_filter.get_index_names()]


def determine_matching_hauls_from_index(options: typing.Iterable[dict],
        index_filter: afscgap.flat_index_util.IndexFilter) -> typing.Iterable[dict]:
    """Determine which haul keys match an index filter.

    Args:
        options: Index entries where each entry has a 'value' and the 'keys' of the hauls having
            that value.
        index_filter: The index filter to apply to the available hauls.

    Returns:
        Iterable of haul keys matching the filter.
    """
    matching_keys = (
        option['keys'] for option in options if index_filter.get_matches(option['value'])
    )

    # from_iterable flattens lazily instead of unpacking every key group into call arguments.
    return itertools.chain.from_iterable(matching_keys)


def get_complete(iterator, url: str) -> typing.List[dict]:
    """Get the complete payload from an Avro iterator.

    Get the complete payload from an Avro iterator to avoid issues with streaming interruption on
    weaker connections.

    Args:
        iterator: The iterator over Avro records read from the stream.
        url: The URL at which the Avro payload was found. Only used for error reporting.

    Returns:
        List of the parsed Avro records.

    Raises:
        RuntimeError: If reading from the iterator fails. The original exception is chained as the
            cause.
    """
    try:
        return list(iterator)
    except Exception as e:
        raise RuntimeError('Failed on %s (%s).' % (url, str(e))) from e


def _read_avro_records(meta: afscgap.flat_model.ExecuteMetaParams,
        url: str) -> typing.List[dict]:
    """Request a URL and read all Avro records found in the response.

    Args:
        meta: Configuration object whose requestor is used if one is provided. If it does not
            provide one, a new requestor is made through build_requestor.
        url: The URL of the Avro payload to read.

    Returns:
        List of the parsed Avro records read from the raw response stream.
    """
    requestor_maybe = meta.get_requestor()
    requestor = requestor_maybe if requestor_maybe else build_requestor()
    response = requestor(url)

    afscgap.http_util.check_result(response)

    stream = response.raw
    return get_complete(fastavro.reader(stream), url)  # type: ignore


def get_all_hauls(meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
    """Get information about all hauls currently available.

    Args:
        meta: Configuration object which indicates how the all hauls index should be requested. This
            can, for example, be used to configure the server from which data are streamed.

    Returns:
        Iterator over haul information as requested from the remote server.
    """
    urls = list(get_index_urls(meta))
    assert len(urls) == 1
    url = urls[0]

    dict_stream = _read_avro_records(meta, url)
    obj_stream = map(build_haul_from_avro, dict_stream)  # type: ignore
    return obj_stream


def get_hauls_for_index_filter(meta: afscgap.flat_model.ExecuteMetaParams,
        index_filter: afscgap.flat_index_util.IndexFilter) -> HAUL_KEYS:
    """Get hauls which may match a filter using precomputed indices.

    Get all hauls which may match a filter using pre-computed Avro haul indices which may prevent
    the query from requiring all catch data to be downloaded.

    Args:
        meta: Configuration object which indicates how the all hauls index should be requested. This
            can, for example, be used to configure the server from which data are streamed.
        index_filter: Information about the filter to be applied against a precomputed index.

    Returns:
        Iterable over haul keys which may match the specified filter.
    """
    def get_for_url(url):
        all_with_value = _read_avro_records(meta, url)
        dict_stream = determine_matching_hauls_from_index(all_with_value, index_filter)
        return map(build_haul_from_avro, dict_stream)

    # Every index is requested before returning so that request failures surface at call time.
    per_index = [get_for_url(url) for url in get_index_urls(meta, index_filter)]
    return itertools.chain.from_iterable(per_index)


def get_records_for_haul(meta: afscgap.flat_model.ExecuteMetaParams,
        haul: afscgap.flat_model.HaulKey) -> RECORDS:
    """Get the joined records from the hauls provided.

    Args:
        meta: Configuration object which indicates how the all hauls index should be requested. This
            can, for example, be used to configure the server from which data are streamed.
        haul: The haul for which records should be returned.

    Returns:
        All joined records for the given haul.
    """
    path = haul.get_path()
    url = meta.get_base_url() + path

    dict_stream = _read_avro_records(meta, url)
    obj_stream = map(lambda x: afscgap.flat_model.FlatRecord(x), dict_stream)
    return obj_stream
def build_haul_from_avro(target: dict) -> afscgap.flat_model.HaulKey:
    """Convert one record decoded from avro into a HaulKey.

    Args:
        target: Dictionary decoded from binary avro which holds the 'year', 'survey', and 'haul'
            entries for a single haul.

    Returns:
        HaulKey constructed from the year, survey, and haul values of the record.
    """
    # Read in the positional order taken by the HaulKey constructor.
    key_parts = [target[field] for field in ('year', 'survey', 'haul')]
    return afscgap.flat_model.HaulKey(*key_parts)
Build a haul record from a dictionary parsed from avro.
Arguments:
- target: The single record parsed from binary avro to be converted to a HaulKey.
Returns:
Parsed HaulKey record.
def build_requestor() -> REQUESTOR:
    """Create the default requestor for flat file access.

    Returns:
        New requests-compatible requestor object made by afscgap.http_util with streaming turned
        on, as used for flat file iteration.
    """
    streaming_requestor = afscgap.http_util.build_requestor(stream=True)
    return streaming_requestor
Build a requests-compatible requestor object.
Returns:
Create a new requestor object which is set up for streaming and other configuration as required for flat file iteration.
def get_index_urls(meta: afscgap.flat_model.ExecuteMetaParams,
        index_filter: OPT_FILTER = None) -> typing.Iterable[str]:
    """Get the URLs at which the required indices can be found.

    Args:
        meta: Configuration object which indicates how the all hauls index should be requested. This
            can, for example, be used to configure the server from which data are streamed.
        index_filter: Information about the filter to be applied against a precomputed index. If
            None, the URL for the all hauls index is returned. Defaults to None.

    Returns:
        List of string URLs. This holds exactly one URL (the all hauls index) when no filter is
        given and otherwise one URL per index name reported by the filter.
    """
    base_url = meta.get_base_url()

    if index_filter is None:
        return [base_url + MAIN_INDEX_PATH]

    # A list is returned in both branches so that the result can be iterated more than once.
    return [base_url + '/index/%s.avro' % name for name in index_filter.get_index_names()]
Get the URL at which an index can be found.
Arguments:
- meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
- index_filter: Information about the filter to be applied against a precomputed index. If None, the URL for the all hauls index is returned. Defaults to None.
Returns:
Iterable of string URLs at which the indices can be found (a single URL for the all hauls index if no filter is given).
def determine_matching_hauls_from_index(options: typing.Iterable[dict],
        index_filter: afscgap.flat_index_util.IndexFilter) -> typing.Iterable[dict]:
    """Determine which haul keys match an index filter.

    Args:
        options: Index entries where each entry has a 'value' and the 'keys' of the hauls having
            that value.
        index_filter: The index filter to apply to the available hauls.

    Returns:
        Iterable of haul keys matching the filter, in the order in which they appear in options.
    """
    matching_keys = (
        option['keys'] for option in options if index_filter.get_matches(option['value'])
    )

    # from_iterable flattens lazily instead of unpacking every key group into call arguments.
    return itertools.chain.from_iterable(matching_keys)
Determine which haul keys match an index filter.
Arguments:
- options: The haul keys matching different values.
- index_filter: The index filter to apply to the available hauls.
Returns:
Iterable of haul keys matching the filter.
def get_complete(iterator, url: str) -> typing.List[dict]:
    """Get the complete payload from an Avro iterator.

    Get the complete payload from an Avro iterator to avoid issues with streaming interruption on
    weaker connections.

    Args:
        iterator: The iterator over Avro records read from the stream.
        url: The URL at which the Avro payload was found. Only used for error reporting.

    Returns:
        List of the parsed Avro records.

    Raises:
        RuntimeError: If reading from the iterator fails. The original exception is chained as the
            cause.
    """
    try:
        return list(iterator)
    except Exception as e:
        # Chain explicitly so that the underlying failure stays attached as __cause__.
        raise RuntimeError('Failed on %s (%s).' % (url, str(e))) from e
Get the complete payload from an Avro iterator.
Get the complete payload from an Avro iterator to avoid issues with streaming interruption on weaker connections.
Arguments:
- iterator: The iterator over Avro files read from the stream.
- url: The URL at which the Avro payload was found.
Returns:
Iterable over the parsed Avro records.
def get_all_hauls(meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
    """List every haul reported by the all hauls index.

    Args:
        meta: Configuration object used to find the all hauls index and which may provide the
            requestor with which it is downloaded. If it does not provide one, a new requestor is
            made through build_requestor.

    Returns:
        Iterator over the haul keys built from the records of the all hauls index.
    """
    candidate_urls = list(get_index_urls(meta))
    assert len(candidate_urls) == 1  # No filter was given so only the main index is expected.
    index_url = candidate_urls[0]

    requestor = meta.get_requestor() or build_requestor()
    response = requestor(index_url)
    afscgap.http_util.check_result(response)

    # The payload is read in full before any haul key is built.
    records = get_complete(fastavro.reader(response.raw), index_url)  # type: ignore
    return map(build_haul_from_avro, records)  # type: ignore
Get information about all hauls currently available.
Arguments:
- meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
Returns:
Iterator over haul information as requested from the remote server.
def get_hauls_for_index_filter(meta: afscgap.flat_model.ExecuteMetaParams,
        index_filter: afscgap.flat_index_util.IndexFilter) -> HAUL_KEYS:
    """Find the hauls which may satisfy a filter by reading precomputed indices.

    Reads the pre-computed Avro haul indices named by the filter and keeps the haul keys whose
    index value matches, which may prevent the query from requiring all catch data to be
    downloaded.

    Args:
        meta: Configuration object used to find the indices and which may provide the requestor
            with which they are downloaded. If it does not provide one, a new requestor is made
            through build_requestor.
        index_filter: Information about the filter to be applied against a precomputed index.

    Returns:
        Iterable over haul keys which may match the specified filter.
    """
    def fetch_matching(index_url):
        requestor = meta.get_requestor() or build_requestor()
        response = requestor(index_url)
        afscgap.http_util.check_result(response)

        entries = get_complete(fastavro.reader(response.raw), index_url)
        matching = determine_matching_hauls_from_index(entries, index_filter)
        return map(build_haul_from_avro, matching)

    # Every index is requested here, before returning, and only then are the results joined.
    per_index = [fetch_matching(index_url) for index_url in get_index_urls(meta, index_filter)]
    return itertools.chain(*per_index)
Get hauls which may match a filter using precomputed indices.
Get all hauls which may match a filter using pre-computed Avro haul indices which may prevent the query from requiring all catch data to be downloaded.
Arguments:
- meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
- index_filter: Information about the filter to be applied against a precomputed index.
Returns:
Iterable over haul keys which may match the specified filter.
def get_records_for_haul(meta: afscgap.flat_model.ExecuteMetaParams,
        haul: afscgap.flat_model.HaulKey) -> RECORDS:
    """Download the joined records belonging to a single haul.

    Args:
        meta: Configuration object which gives the base URL and which may provide the requestor
            with which the data are downloaded. If it does not provide one, a new requestor is
            made through build_requestor.
        haul: The haul for which records should be returned.

    Returns:
        Iterator over the FlatRecord objects built from the records of the given haul.
    """
    haul_path = haul.get_path()
    haul_url = meta.get_base_url() + haul_path

    requestor = meta.get_requestor() or build_requestor()
    response = requestor(haul_url)
    afscgap.http_util.check_result(response)

    # The payload is read in full before any record is wrapped.
    raw_records = get_complete(fastavro.reader(response.raw), haul_url)
    return map(afscgap.flat_model.FlatRecord, raw_records)
Get the joined records from the hauls provided.
Arguments:
- meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
- haul: The haul for which records should be returned.
Returns:
All joined records for the given haul.