afscgap.flat_http

Interfaces for cursor objects which iterate over records from prejoined flat records.

Interfaces for cursor objects which iterate over records from prejoined flat records, either those appearing in the underlying unjoined dataset or zero catch inferred records.

(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center for Data Science and the Environment at UC Berkeley.

This file is part of afscgap released under the BSD 3-Clause License. See LICENSE.md.

  1"""
  2Interfaces for cursor objects which iterate over records from prejoined flat records.
  3
  4Interfaces for cursor objects which iterate over records from prejoined flat records, either those
  5appearing in the underlying unjoined dataset or zero catch inferred records.
  6
  7(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center
  8for Data Science and the Environment at UC Berkeley.
  9
 10This file is part of afscgap released under the BSD 3-Clause License. See
 11LICENSE.md.
 12"""
 13import itertools
 14import typing
 15
 16import fastavro
 17
 18import afscgap.flat_index_util
 19import afscgap.flat_model
 20import afscgap.http_util
 21
 22from afscgap.flat_model import HAUL_KEYS, RECORDS
 23from afscgap.typesdef import REQUESTOR
 24
 25MAIN_INDEX_PATH = '/index/main.avro'
 26OPT_FILTER = typing.Optional[afscgap.flat_index_util.IndexFilter]
 27
 28
 29def build_haul_from_avro(target: dict) -> afscgap.flat_model.HaulKey:
 30    """Build a haul record from a dictionary parsed from avro.
 31
 32    Args:
 33        target: The single record parsed from binary avro to be converted to a HaulKey.
 34
 35    Returns:
 36        Parsed HaulKey record.
 37    """
 38    year = target['year']
 39    survey = target['survey']
 40    haul = target['haul']
 41    return afscgap.flat_model.HaulKey(year, survey, haul)
 42
 43
 44def build_requestor() -> REQUESTOR:
 45    """Build a requests-compatible requestor object.
 46
 47    Returns:
 48        Create a new requestor object which is set up for streaming and other configuration as
 49        required for flat file iteration.
 50    """
 51    return afscgap.http_util.build_requestor(stream=True)
 52
 53
 54def get_index_urls(meta: afscgap.flat_model.ExecuteMetaParams,
 55    index_filter: OPT_FILTER = None) -> typing.Iterable[str]:
 56    """Get the URL at which an index can be found.
 57
 58    Args:
 59        meta: Configuration object which indicates how the all hauls index should be requested. This
 60            can, for example, be used to configure the server from which data are streamed.
 61        index_filter: Information about the filter to be applied against a precomputed index. If
 62            None, the URL for the all hauls index is returned. Defaults to NOne.
 63
 64    Returns:
 65        String URL at which the index can be found.
 66    """
 67    if index_filter is None:
 68        return [meta.get_base_url() + MAIN_INDEX_PATH]
 69    else:
 70        paths = map(lambda x: '/index/%s.avro' % x, index_filter.get_index_names())
 71        return map(lambda x: meta.get_base_url() + x, paths)
 72
 73
 74def determine_matching_hauls_from_index(options: typing.Iterable[dict],
 75    index_filter: afscgap.flat_index_util.IndexFilter) -> typing.Iterable[dict]:
 76    """Determine which haul keys match an index filter.
 77
 78    Args:
 79        options: The haul keys matching different values.
 80        index_filter: The index filter to apply to the available hauls.
 81
 82    Returns:
 83        Iterable of haul keys matching the filter.
 84    """
 85    dict_stream_with_value = filter(lambda x: index_filter.get_matches(x['value']), options)
 86    dict_stream_nested = map(lambda x: x['keys'], dict_stream_with_value)
 87    dict_stream = itertools.chain(*dict_stream_nested)
 88    return dict_stream
 89
 90
 91def get_complete(iterator, url: str) -> typing.List[dict]:
 92    """Get the complete payload from an Avro iterator.
 93
 94    Get the complete payload from an Avro iterator to avoid issues with streaming interruption on
 95    weaker connections.
 96
 97    Args:
 98        iterator: The iterator over Avro files read from the stream.
 99        url: The URL at which the Avro payload was found.
100
101    Returns:
102        Iterable over the parsed Avro records.
103    """
104    try:
105        return list(iterator)
106    except Exception as e:
107        raise RuntimeError('Failed on %s (%s).' % (url, str(e)))
108
109
def get_all_hauls(meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
    """Retrieve keys for every haul listed in the main index.

    Args:
        meta: Configuration object describing how the main index should be requested. For example,
            it supplies the server base URL and optionally a custom requestor.

    Returns:
        Iterator of HaulKey objects built from the records of the main index.
    """
    index_urls = list(get_index_urls(meta))
    # Without an index filter exactly one URL (the main index) is expected.
    assert len(index_urls) == 1
    main_url = index_urls[0]

    custom_requestor = meta.get_requestor()
    if custom_requestor:
        requestor = custom_requestor
    else:
        requestor = build_requestor()

    response = requestor(main_url)
    afscgap.http_util.check_result(response)

    raw_records = get_complete(fastavro.reader(response.raw), main_url)  # type: ignore
    return map(build_haul_from_avro, raw_records)  # type: ignore
134
135
def get_hauls_for_index_filter(meta: afscgap.flat_model.ExecuteMetaParams,
    index_filter: afscgap.flat_index_util.IndexFilter) -> HAUL_KEYS:
    """Get hauls which may match a filter using precomputed indices.

    Consults precomputed Avro haul indices so that a query may avoid downloading all catch data.

    Args:
        meta: Configuration object describing how the indices should be requested. For example, it
            supplies the server base URL and optionally a custom requestor.
        index_filter: Information about the filter to be applied against the precomputed indices.

    Returns:
        Iterable over haul keys which may match the specified filter.
    """
    def fetch_matching_hauls(index_url):
        # The requestor is looked up again for every index URL.
        custom_requestor = meta.get_requestor()
        requestor = custom_requestor or build_requestor()

        response = requestor(index_url)
        afscgap.http_util.check_result(response)

        index_records = get_complete(fastavro.reader(response.raw), index_url)
        matching_dicts = determine_matching_hauls_from_index(index_records, index_filter)
        return map(build_haul_from_avro, matching_dicts)

    # The list is built eagerly, so every index file is downloaded before the results are chained.
    per_index_hauls = [fetch_matching_hauls(url) for url in get_index_urls(meta, index_filter)]
    return itertools.chain(*per_index_hauls)
168
169
def get_records_for_haul(meta: afscgap.flat_model.ExecuteMetaParams,
    haul: afscgap.flat_model.HaulKey) -> RECORDS:
    """Download the joined flat records stored for a single haul.

    Args:
        meta: Configuration object describing how the haul file should be requested. For example,
            it supplies the server base URL and optionally a custom requestor.
        haul: The haul whose records are requested. Its get_path() locates the file on the server.

    Returns:
        Iterator of FlatRecord objects, one per Avro record in the haul's file.
    """
    haul_path = haul.get_path()
    haul_url = meta.get_base_url() + haul_path

    requestor = meta.get_requestor() or build_requestor()
    response = requestor(haul_url)
    afscgap.http_util.check_result(response)

    raw_records = get_complete(fastavro.reader(response.raw), haul_url)
    return map(afscgap.flat_model.FlatRecord, raw_records)
# Path, relative to the server base URL, of the avro index which lists every available haul.
MAIN_INDEX_PATH: str = '/index/main.avro'

# Type alias for an IndexFilter which may be omitted (None).
OPT_FILTER = typing.Union[afscgap.flat_index_util.IndexFilter, None]
def build_haul_from_avro(target: dict) -> afscgap.flat_model.HaulKey:
def build_haul_from_avro(target: dict) -> afscgap.flat_model.HaulKey:
    """Convert one dictionary deserialized from avro into a HaulKey.

    Args:
        target: Single record decoded from binary avro. Its 'year', 'survey', and 'haul' entries
            are read.

    Returns:
        HaulKey constructed from the year, survey, and haul values (in that order).
    """
    return afscgap.flat_model.HaulKey(
        target['year'],
        target['survey'],
        target['haul'],
    )

Build a haul record from a dictionary parsed from avro.

Arguments:
  • target: The single record parsed from binary avro to be converted to a HaulKey.
Returns:

Parsed HaulKey record.

def build_requestor() -> Callable[[str], requests.models.Response]:
def build_requestor() -> REQUESTOR:
    """Create a fresh requests-compatible requestor for flat file access.

    Returns:
        Requestor produced by afscgap.http_util.build_requestor with streaming enabled, as
        required when iterating over flat files.
    """
    streaming_requestor = afscgap.http_util.build_requestor(stream=True)
    return streaming_requestor

Build a requests-compatible requestor object.

Returns:

A new requestor object which is set up for streaming and other configuration as required for flat file iteration.

def get_index_urls( meta: afscgap.flat_model.ExecuteMetaParams, index_filter: Optional[afscgap.flat_index_util.IndexFilter] = None) -> Iterable[str]:
def get_index_urls(meta: afscgap.flat_model.ExecuteMetaParams,
    index_filter: OPT_FILTER = None) -> typing.Iterable[str]:
    """Get the URLs at which the relevant avro index files can be found.

    Args:
        meta: Configuration object which indicates how indices should be requested. This can, for
            example, be used to configure the server from which data are streamed.
        index_filter: Information about the filter to be applied against a precomputed index. If
            None, only the URL for the all hauls index is returned. Defaults to None.

    Returns:
        List of string URLs at which the index files can be found. A concrete list is returned in
        both cases so that callers may iterate the result more than once.
    """
    base_url = meta.get_base_url()

    if index_filter is None:
        return [base_url + MAIN_INDEX_PATH]

    # One avro index file per index name reported by the filter.
    return [base_url + ('/index/%s.avro' % name) for name in index_filter.get_index_names()]

Get the URL at which an index can be found.

Arguments:
  • meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
  • index_filter: Information about the filter to be applied against a precomputed index. If None, the URL for the all hauls index is returned. Defaults to None.
Returns:

Iterable of string URLs at which the relevant index files can be found.

def determine_matching_hauls_from_index( options: Iterable[dict], index_filter: afscgap.flat_index_util.IndexFilter) -> Iterable[dict]:
def determine_matching_hauls_from_index(options: typing.Iterable[dict],
    index_filter: afscgap.flat_index_util.IndexFilter) -> typing.Iterable[dict]:
    """Determine which haul keys match an index filter.

    Args:
        options: Index records. Each has a 'value' entry, which is tested against the filter, and
            a 'keys' entry holding the haul key dictionaries associated with that value.
        index_filter: The index filter to apply to the available hauls.

    Returns:
        Iterable of haul key dictionaries from every option whose value matches the filter.
    """
    matching_options = filter(lambda x: index_filter.get_matches(x['value']), options)
    nested_keys = map(lambda x: x['keys'], matching_options)
    # chain.from_iterable consumes the groups lazily. chain(*...) would force the whole
    # filter/map pipeline to run up front just to build the argument tuple.
    return itertools.chain.from_iterable(nested_keys)

Determine which haul keys match an index filter.

Arguments:
  • options: The haul keys matching different values.
  • index_filter: The index filter to apply to the available hauls.
Returns:

Iterable of haul keys matching the filter.

def get_complete(iterator, url: str) -> List[dict]:
 92def get_complete(iterator, url: str) -> typing.List[dict]:
 93    """Get the complete payload from an Avro iterator.
 94
 95    Get the complete payload from an Avro iterator to avoid issues with streaming interruption on
 96    weaker connections.
 97
 98    Args:
 99        iterator: The iterator over Avro files read from the stream.
100        url: The URL at which the Avro payload was found.
101
102    Returns:
103        Iterable over the parsed Avro records.
104    """
105    try:
106        return list(iterator)
107    except Exception as e:
108        raise RuntimeError('Failed on %s (%s).' % (url, str(e)))

Get the complete payload from an Avro iterator.

Get the complete payload from an Avro iterator to avoid issues with streaming interruption on weaker connections.

Arguments:
  • iterator: The iterator over Avro records read from the stream.
  • url: The URL at which the Avro payload was found.
Returns:

List of the parsed Avro records.

def get_all_hauls( meta: afscgap.flat_model.ExecuteMetaParams) -> Iterable[afscgap.flat_model.HaulKey]:
def get_all_hauls(meta: afscgap.flat_model.ExecuteMetaParams) -> HAUL_KEYS:
    """Retrieve keys for every haul listed in the main index.

    Args:
        meta: Configuration object describing how the main index should be requested. For example,
            it supplies the server base URL and optionally a custom requestor.

    Returns:
        Iterator of HaulKey objects built from the records of the main index.
    """
    index_urls = list(get_index_urls(meta))
    # Without an index filter exactly one URL (the main index) is expected.
    assert len(index_urls) == 1
    main_url = index_urls[0]

    custom_requestor = meta.get_requestor()
    if custom_requestor:
        requestor = custom_requestor
    else:
        requestor = build_requestor()

    response = requestor(main_url)
    afscgap.http_util.check_result(response)

    raw_records = get_complete(fastavro.reader(response.raw), main_url)  # type: ignore
    return map(build_haul_from_avro, raw_records)  # type: ignore

Get information about all hauls currently available.

Arguments:
  • meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
Returns:

Iterator over haul information as requested from the remote server.

def get_hauls_for_index_filter( meta: afscgap.flat_model.ExecuteMetaParams, index_filter: afscgap.flat_index_util.IndexFilter) -> Iterable[afscgap.flat_model.HaulKey]:
def get_hauls_for_index_filter(meta: afscgap.flat_model.ExecuteMetaParams,
    index_filter: afscgap.flat_index_util.IndexFilter) -> HAUL_KEYS:
    """Get hauls which may match a filter using precomputed indices.

    Consults precomputed Avro haul indices so that a query may avoid downloading all catch data.

    Args:
        meta: Configuration object describing how the indices should be requested. For example, it
            supplies the server base URL and optionally a custom requestor.
        index_filter: Information about the filter to be applied against the precomputed indices.

    Returns:
        Iterable over haul keys which may match the specified filter.
    """
    def fetch_matching_hauls(index_url):
        # The requestor is looked up again for every index URL.
        custom_requestor = meta.get_requestor()
        requestor = custom_requestor or build_requestor()

        response = requestor(index_url)
        afscgap.http_util.check_result(response)

        index_records = get_complete(fastavro.reader(response.raw), index_url)
        matching_dicts = determine_matching_hauls_from_index(index_records, index_filter)
        return map(build_haul_from_avro, matching_dicts)

    # The list is built eagerly, so every index file is downloaded before the results are chained.
    per_index_hauls = [fetch_matching_hauls(url) for url in get_index_urls(meta, index_filter)]
    return itertools.chain(*per_index_hauls)

Get hauls which may match a filter using precomputed indices.

Get all hauls which may match a filter using pre-computed Avro haul indices which may prevent the query from requiring all catch data to be downloaded.

Arguments:
  • meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
  • index_filter: Information about the filter to be applied against a precomputed index.
Returns:

Iterable over haul keys which may match the specified filter.

def get_records_for_haul( meta: afscgap.flat_model.ExecuteMetaParams, haul: afscgap.flat_model.HaulKey) -> Iterable[afscgap.model.Record]:
def get_records_for_haul(meta: afscgap.flat_model.ExecuteMetaParams,
    haul: afscgap.flat_model.HaulKey) -> RECORDS:
    """Download the joined flat records stored for a single haul.

    Args:
        meta: Configuration object describing how the haul file should be requested. For example,
            it supplies the server base URL and optionally a custom requestor.
        haul: The haul whose records are requested. Its get_path() locates the file on the server.

    Returns:
        Iterator of FlatRecord objects, one per Avro record in the haul's file.
    """
    haul_path = haul.get_path()
    haul_url = meta.get_base_url() + haul_path

    requestor = meta.get_requestor() or build_requestor()
    response = requestor(haul_url)
    afscgap.http_util.check_result(response)

    raw_records = get_complete(fastavro.reader(response.raw), haul_url)
    return map(afscgap.flat_model.FlatRecord, raw_records)

Get the joined records for the haul provided.

Arguments:
  • meta: Configuration object which indicates how the all hauls index should be requested. This can, for example, be used to configure the server from which data are streamed.
  • haul: The haul for which records should be returned.
Returns:

All joined records for the given haul.