afscgap.flat_cursor

Interfaces for cursor objects which iterate over records from prejoined flat records.

Interfaces for cursor objects which iterate over records from prejoined flat records, either those appearing in the underlying unjoined dataset or zero catch inferred records.

(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center for Data Science and the Environment at UC Berkeley.

This file is part of afscgap released under the BSD 3-Clause License. See LICENSE.md.

  1"""
  2Interfaces for cursor objects which iterate over records from prejoined flat records.
  3
  4Interfaces for cursor objects which iterate over records from prejoined flat records, either those
  5appearing in the underlying unjoined dataset or zero catch inferred records.
  6
  7(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center
  8for Data Science and the Environment at UC Berkeley.
  9
 10This file is part of afscgap released under the BSD 3-Clause License. See
 11LICENSE.md.
 12"""
 13import queue
 14import typing
 15
 16import afscgap.cursor
 17import afscgap.model
 18
 19from afscgap.flat_model import RECORDS
 20from afscgap.typesdef import OPT_INT
 21
 22
 23class FlatCursor(afscgap.cursor.Cursor):
 24    """Cursor over flat avro "prejoined" results."""
 25
 26    def __init__(self, records: RECORDS):
 27        """Create a new prejoined cursor.
 28
 29        Create a new cursor over prejoined flat files which have already been parsed or which are
 30        emitted prejoined.
 31
 32        Args:
 33            records: The inner iterator that should be decorated by this cursor.
 34        """
 35        self._records = records
 36        self._records_iter = iter(self._records)
 37
 38    def get_limit(self) -> OPT_INT:
 39        """Get the overall limit.
 40
 41        Returns:
 42            The maximum number of records to return.
 43        """
 44        return None
 45
 46    def get_filtering_incomplete(self) -> bool:
 47        """Determine if this cursor is silently filtering incomplete records.
 48
 49        Returns:
 50            Flag indicating if incomplete records should be silently filtered.
 51            If true, they will not be returned during iteration and placed in
 52            the queue at get_invalid(). If false, they will be returned and
 53            those incomplete records' get_complete() will return false.
 54        """
 55        return False
 56
 57    def get_invalid(self) -> 'queue.Queue[dict]':
 58        """Get a queue of invalid / incomplete records found so far.
 59
 60        Returns:
 61            Queue with dictionaries containing the raw data returned from the
 62            remote that did not have valid values for all required fields. Note
 63            that this will include incomplete records as well if
 64            get_filtering_incomplete() is true and will not contain incomplete
 65            records otherwise.
 66        """
 67        return queue.Queue()
 68
 69    def to_dicts(self) -> typing.Iterable[dict]:
 70        """Create an iterator which converts Records to dicts.
 71
 72        Returns:
 73            Iterator which returns dictionaries instead of Record objects but
 74            has otherwise the same beahavior as iterating in this Cursor
 75            directly.
 76        """
 77        return map(lambda x: x.to_dict(), self)
 78
 79    def get_next(self) -> typing.Optional[afscgap.model.Record]:
 80        """Get the next value for this Cursor.
 81
 82        Returns:
 83            The next value waiting if cached in the cursor's results queue or
 84            as just retrieved from a new page gathered by HTTP request. Will
 85            return None if no remain.
 86        """
 87        try:
 88            return next(self._records_iter)
 89        except StopIteration:
 90            return None
 91
 92
 93class CompleteCursor(afscgap.cursor.Cursor):
 94    """Cursor decorator which only yields complete records."""
 95
 96    def __init__(self, inner: afscgap.cursor.Cursor):
 97        """Create a new decorator for another cursor which filters for complete records.
 98
 99        Args:
100            inner: The cursor to decorate.
101        """
102        self._inner = inner
103        self._invalid: queue.Queue[dict] = queue.Queue()
104
105    def get_limit(self) -> OPT_INT:
106        """Get the overall limit.
107
108        Returns:
109            The maximum number of records to return.
110        """
111        return self._inner.get_limit()
112
113    def get_filtering_incomplete(self) -> bool:
114        """Determine if this cursor is silently filtering incomplete records.
115
116        Returns:
117            Flag indicating if incomplete records should be silently filtered.
118            If true, they will not be returned during iteration and placed in
119            the queue at get_invalid(). If false, they will be returned and
120            those incomplete records' get_complete() will return false.
121        """
122        return True
123
124    def to_dicts(self) -> typing.Iterable[dict]:
125        """Create an iterator which converts Records to dicts.
126
127        Returns:
128            Iterator which returns dictionaries instead of Record objects but
129            has otherwise the same beahavior as iterating in this Cursor
130            directly.
131        """
132        return map(lambda x: x.to_dict(), self)
133
134    def get_next(self) -> typing.Optional[afscgap.model.Record]:
135        """Get the next value for this Cursor.
136
137        Returns:
138            The next value waiting if cached in the cursor's results queue or
139            as just retrieved from a new page gathered by HTTP request. Will
140            return None if no remain.
141        """
142        done = False
143
144        next_candidate = None
145
146        while not done:
147            next_candidate = self._inner.get_next()
148
149            if next_candidate is None:
150                return None
151
152            is_complete = next_candidate.is_complete()
153
154            if not is_complete:
155                next_candidate_cast: afscgap.flat_model.FlatRecord = next_candidate  # type: ignore
156                self._invalid.put(next_candidate_cast.get_inner())
157
158            done = is_complete
159
160        return next_candidate
161
162
class LimitCursor(afscgap.cursor.Cursor):
    """Create a decorator which limits cursor iteration to a certain count of returned records."""

    def __init__(self, inner: afscgap.cursor.Cursor, limit: int):
        """Create a new limit decorator around an existing cursor.

        Create a new limit decorator around an existing cursor which terminates iteration either
        after a limit is reached or no additional records are available.

        Args:
            inner: The cursor to decorate.
            limit: The integer count of records after which iteration will be terminated.
        """
        self._inner = inner
        self._limit = limit
        # Count of records which may still be returned before the limit is reached.
        self._remaining = limit

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return as provided at construction.
        """
        return self._limit

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            The flag reported by the decorated cursor as this decorator does not itself
            filter for completeness.
        """
        return self._inner.get_filtering_incomplete()

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            The queue reported by the decorated cursor as this decorator does not itself
            set aside any records.
        """
        return self._inner.get_invalid()

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries instead of Record objects but has otherwise
            the same behavior as iterating in this Cursor directly.
        """
        return map(lambda x: x.to_dict(), self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Returns:
            The next record from the decorated cursor or None if either the limit has been
            reached or the decorated cursor has no records remaining.
        """
        # NOTE(review): only an exact zero stops iteration so a negative limit is never
        # enforced -- confirm whether callers rely on that.
        if self._remaining == 0:
            return None

        record = self._inner.get_next()

        # Only records actually returned count against the limit.
        if record is not None:
            self._remaining -= 1

        return record
class FlatCursor(typing.Iterable[afscgap.model.Record]):
24class FlatCursor(afscgap.cursor.Cursor):
25    """Cursor over flat avro "prejoined" results."""
26
27    def __init__(self, records: RECORDS):
28        """Create a new prejoined cursor.
29
30        Create a new cursor over prejoined flat files which have already been parsed or which are
31        emitted prejoined.
32
33        Args:
34            records: The inner iterator that should be decorated by this cursor.
35        """
36        self._records = records
37        self._records_iter = iter(self._records)
38
39    def get_limit(self) -> OPT_INT:
40        """Get the overall limit.
41
42        Returns:
43            The maximum number of records to return.
44        """
45        return None
46
47    def get_filtering_incomplete(self) -> bool:
48        """Determine if this cursor is silently filtering incomplete records.
49
50        Returns:
51            Flag indicating if incomplete records should be silently filtered.
52            If true, they will not be returned during iteration and placed in
53            the queue at get_invalid(). If false, they will be returned and
54            those incomplete records' get_complete() will return false.
55        """
56        return False
57
58    def get_invalid(self) -> 'queue.Queue[dict]':
59        """Get a queue of invalid / incomplete records found so far.
60
61        Returns:
62            Queue with dictionaries containing the raw data returned from the
63            remote that did not have valid values for all required fields. Note
64            that this will include incomplete records as well if
65            get_filtering_incomplete() is true and will not contain incomplete
66            records otherwise.
67        """
68        return queue.Queue()
69
70    def to_dicts(self) -> typing.Iterable[dict]:
71        """Create an iterator which converts Records to dicts.
72
73        Returns:
74            Iterator which returns dictionaries instead of Record objects but
75            has otherwise the same beahavior as iterating in this Cursor
76            directly.
77        """
78        return map(lambda x: x.to_dict(), self)
79
80    def get_next(self) -> typing.Optional[afscgap.model.Record]:
81        """Get the next value for this Cursor.
82
83        Returns:
84            The next value waiting if cached in the cursor's results queue or
85            as just retrieved from a new page gathered by HTTP request. Will
86            return None if no remain.
87        """
88        try:
89            return next(self._records_iter)
90        except StopIteration:
91            return None

Cursor over flat avro "prejoined" results.

FlatCursor(records: Iterable[afscgap.model.Record])
27    def __init__(self, records: RECORDS):
28        """Create a new prejoined cursor.
29
30        Create a new cursor over prejoined flat files which have already been parsed or which are
31        emitted prejoined.
32
33        Args:
34            records: The inner iterator that should be decorated by this cursor.
35        """
36        self._records = records
37        self._records_iter = iter(self._records)

Create a new prejoined cursor.

Create a new cursor over prejoined flat files which have already been parsed or which are emitted prejoined.

Arguments:
  • records: The inner iterator that should be decorated by this cursor.
def get_limit(self) -> Optional[int]:
39    def get_limit(self) -> OPT_INT:
40        """Get the overall limit.
41
42        Returns:
43            The maximum number of records to return.
44        """
45        return None

Get the overall limit.

Returns:

The maximum number of records to return.

def get_filtering_incomplete(self) -> bool:
47    def get_filtering_incomplete(self) -> bool:
48        """Determine if this cursor is silently filtering incomplete records.
49
50        Returns:
51            Flag indicating if incomplete records should be silently filtered.
52            If true, they will not be returned during iteration and placed in
53            the queue at get_invalid(). If false, they will be returned and
54            those incomplete records' get_complete() will return false.
55        """
56        return False

Determine if this cursor is silently filtering incomplete records.

Returns:

Flag indicating if incomplete records should be silently filtered. If true, they will not be returned during iteration and will instead be placed in the queue at get_invalid(). If false, they will be returned and those incomplete records' is_complete() will return false.

def get_invalid(self) -> queue.Queue[dict]:
58    def get_invalid(self) -> 'queue.Queue[dict]':
59        """Get a queue of invalid / incomplete records found so far.
60
61        Returns:
62            Queue with dictionaries containing the raw data returned from the
63            remote that did not have valid values for all required fields. Note
64            that this will include incomplete records as well if
65            get_filtering_incomplete() is true and will not contain incomplete
66            records otherwise.
67        """
68        return queue.Queue()

Get a queue of invalid / incomplete records found so far.

Returns:

Queue with dictionaries containing the raw data returned from the remote that did not have valid values for all required fields. Note that this will include incomplete records as well if get_filtering_incomplete() is true and will not contain incomplete records otherwise.

def to_dicts(self) -> Iterable[dict]:
70    def to_dicts(self) -> typing.Iterable[dict]:
71        """Create an iterator which converts Records to dicts.
72
73        Returns:
74            Iterator which returns dictionaries instead of Record objects but
75            has otherwise the same beahavior as iterating in this Cursor
76            directly.
77        """
78        return map(lambda x: x.to_dict(), self)

Create an iterator which converts Records to dicts.

Returns:

Iterator which returns dictionaries instead of Record objects but has otherwise the same behavior as iterating in this Cursor directly.

def get_next(self) -> Optional[afscgap.model.Record]:
80    def get_next(self) -> typing.Optional[afscgap.model.Record]:
81        """Get the next value for this Cursor.
82
83        Returns:
84            The next value waiting if cached in the cursor's results queue or
85            as just retrieved from a new page gathered by HTTP request. Will
86            return None if no remain.
87        """
88        try:
89            return next(self._records_iter)
90        except StopIteration:
91            return None

Get the next value for this Cursor.

Returns:

The next record from the iterator of prejoined records given to this cursor. Will return None if no records remain.

class CompleteCursor(typing.Iterable[afscgap.model.Record]):
 94class CompleteCursor(afscgap.cursor.Cursor):
 95    """Cursor decorator which only yields complete records."""
 96
 97    def __init__(self, inner: afscgap.cursor.Cursor):
 98        """Create a new decorator for another cursor which filters for complete records.
 99
100        Args:
101            inner: The cursor to decorate.
102        """
103        self._inner = inner
104        self._invalid: queue.Queue[dict] = queue.Queue()
105
106    def get_limit(self) -> OPT_INT:
107        """Get the overall limit.
108
109        Returns:
110            The maximum number of records to return.
111        """
112        return self._inner.get_limit()
113
114    def get_filtering_incomplete(self) -> bool:
115        """Determine if this cursor is silently filtering incomplete records.
116
117        Returns:
118            Flag indicating if incomplete records should be silently filtered.
119            If true, they will not be returned during iteration and placed in
120            the queue at get_invalid(). If false, they will be returned and
121            those incomplete records' get_complete() will return false.
122        """
123        return True
124
125    def to_dicts(self) -> typing.Iterable[dict]:
126        """Create an iterator which converts Records to dicts.
127
128        Returns:
129            Iterator which returns dictionaries instead of Record objects but
130            has otherwise the same beahavior as iterating in this Cursor
131            directly.
132        """
133        return map(lambda x: x.to_dict(), self)
134
135    def get_next(self) -> typing.Optional[afscgap.model.Record]:
136        """Get the next value for this Cursor.
137
138        Returns:
139            The next value waiting if cached in the cursor's results queue or
140            as just retrieved from a new page gathered by HTTP request. Will
141            return None if no remain.
142        """
143        done = False
144
145        next_candidate = None
146
147        while not done:
148            next_candidate = self._inner.get_next()
149
150            if next_candidate is None:
151                return None
152
153            is_complete = next_candidate.is_complete()
154
155            if not is_complete:
156                next_candidate_cast: afscgap.flat_model.FlatRecord = next_candidate  # type: ignore
157                self._invalid.put(next_candidate_cast.get_inner())
158
159            done = is_complete
160
161        return next_candidate

Cursor decorator which only yields complete records.

CompleteCursor(inner: afscgap.cursor.Cursor)
 97    def __init__(self, inner: afscgap.cursor.Cursor):
 98        """Create a new decorator for another cursor which filters for complete records.
 99
100        Args:
101            inner: The cursor to decorate.
102        """
103        self._inner = inner
104        self._invalid: queue.Queue[dict] = queue.Queue()

Create a new decorator for another cursor which filters for complete records.

Arguments:
  • inner: The cursor to decorate.
def get_limit(self) -> Optional[int]:
106    def get_limit(self) -> OPT_INT:
107        """Get the overall limit.
108
109        Returns:
110            The maximum number of records to return.
111        """
112        return self._inner.get_limit()

Get the overall limit.

Returns:

The maximum number of records to return.

def get_filtering_incomplete(self) -> bool:
114    def get_filtering_incomplete(self) -> bool:
115        """Determine if this cursor is silently filtering incomplete records.
116
117        Returns:
118            Flag indicating if incomplete records should be silently filtered.
119            If true, they will not be returned during iteration and placed in
120            the queue at get_invalid(). If false, they will be returned and
121            those incomplete records' get_complete() will return false.
122        """
123        return True

Determine if this cursor is silently filtering incomplete records.

Returns:

Flag indicating if incomplete records should be silently filtered. If true, they will not be returned during iteration and will instead be placed in the queue at get_invalid(). If false, they will be returned and those incomplete records' is_complete() will return false.

def to_dicts(self) -> Iterable[dict]:
125    def to_dicts(self) -> typing.Iterable[dict]:
126        """Create an iterator which converts Records to dicts.
127
128        Returns:
129            Iterator which returns dictionaries instead of Record objects but
130            has otherwise the same beahavior as iterating in this Cursor
131            directly.
132        """
133        return map(lambda x: x.to_dict(), self)

Create an iterator which converts Records to dicts.

Returns:

Iterator which returns dictionaries instead of Record objects but has otherwise the same behavior as iterating in this Cursor directly.

def get_next(self) -> Optional[afscgap.model.Record]:
135    def get_next(self) -> typing.Optional[afscgap.model.Record]:
136        """Get the next value for this Cursor.
137
138        Returns:
139            The next value waiting if cached in the cursor's results queue or
140            as just retrieved from a new page gathered by HTTP request. Will
141            return None if no remain.
142        """
143        done = False
144
145        next_candidate = None
146
147        while not done:
148            next_candidate = self._inner.get_next()
149
150            if next_candidate is None:
151                return None
152
153            is_complete = next_candidate.is_complete()
154
155            if not is_complete:
156                next_candidate_cast: afscgap.flat_model.FlatRecord = next_candidate  # type: ignore
157                self._invalid.put(next_candidate_cast.get_inner())
158
159            done = is_complete
160
161        return next_candidate

Get the next value for this Cursor.

Returns:

The next complete record available from the decorated cursor, with any incomplete records encountered before it being skipped. Will return None if no records remain.

class LimitCursor(typing.Iterable[afscgap.model.Record]):
164class LimitCursor(afscgap.cursor.Cursor):
165    """Create a decorator which limits cursor iteration to a certain count of returned records."""
166
167    def __init__(self, inner: afscgap.cursor.Cursor, limit: int):
168        """Create a new limit decorator around an existing cursor.
169
170        Create a new limit decorator around an existing cursor which terminates iteration either
171        after a limit is reached or no additional records are available.
172
173        Args:
174            inner: The cursor to decorate.
175            limit: The integer count of records after which iteration will be terminated.
176        """
177        self._inner = inner
178        self._limit = limit
179        self._remaining = limit
180
181    def get_limit(self) -> OPT_INT:
182        """Get the overall limit.
183
184        Returns:
185            The maximum number of records to return.
186        """
187        return self._limit
188
189    def get_filtering_incomplete(self) -> bool:
190        """Determine if this cursor is silently filtering incomplete records.
191
192        Returns:
193            Flag indicating if incomplete records should be silently filtered.
194            If true, they will not be returned during iteration and placed in
195            the queue at get_invalid(). If false, they will be returned and
196            those incomplete records' get_complete() will return false.
197        """
198        return self._inner.get_filtering_incomplete()
199
200    def to_dicts(self) -> typing.Iterable[dict]:
201        """Create an iterator which converts Records to dicts.
202
203        Returns:
204            Iterator which returns dictionaries instead of Record objects but
205            has otherwise the same beahavior as iterating in this Cursor
206            directly.
207        """
208        return map(lambda x: x.to_dict(), self)
209
210    def get_next(self) -> typing.Optional[afscgap.model.Record]:
211        """Get the next value for this Cursor.
212
213        Returns:
214            The next value waiting if cached in the cursor's results queue or
215            as just retrieved from a new page gathered by HTTP request. Will
216            return None if no remain.
217        """
218        if self._remaining == 0:
219            return None
220
221        next_candidate = self._inner.get_next()
222
223        if next_candidate is None:
224            return None
225        else:
226            self._remaining -= 1
227            return next_candidate

Create a decorator which limits cursor iteration to a certain count of returned records.

LimitCursor(inner: afscgap.cursor.Cursor, limit: int)
167    def __init__(self, inner: afscgap.cursor.Cursor, limit: int):
168        """Create a new limit decorator around an existing cursor.
169
170        Create a new limit decorator around an existing cursor which terminates iteration either
171        after a limit is reached or no additional records are available.
172
173        Args:
174            inner: The cursor to decorate.
175            limit: The integer count of records after which iteration will be terminated.
176        """
177        self._inner = inner
178        self._limit = limit
179        self._remaining = limit

Create a new limit decorator around an existing cursor.

Create a new limit decorator around an existing cursor which terminates iteration either after a limit is reached or no additional records are available.

Arguments:
  • inner: The cursor to decorate.
  • limit: The integer count of records after which iteration will be terminated.
def get_limit(self) -> Optional[int]:
181    def get_limit(self) -> OPT_INT:
182        """Get the overall limit.
183
184        Returns:
185            The maximum number of records to return.
186        """
187        return self._limit

Get the overall limit.

Returns:

The maximum number of records to return.

def get_filtering_incomplete(self) -> bool:
189    def get_filtering_incomplete(self) -> bool:
190        """Determine if this cursor is silently filtering incomplete records.
191
192        Returns:
193            Flag indicating if incomplete records should be silently filtered.
194            If true, they will not be returned during iteration and placed in
195            the queue at get_invalid(). If false, they will be returned and
196            those incomplete records' get_complete() will return false.
197        """
198        return self._inner.get_filtering_incomplete()

Determine if this cursor is silently filtering incomplete records.

Returns:

Flag indicating if incomplete records should be silently filtered. If true, they will not be returned during iteration and will instead be placed in the queue at get_invalid(). If false, they will be returned and those incomplete records' is_complete() will return false.

def to_dicts(self) -> Iterable[dict]:
200    def to_dicts(self) -> typing.Iterable[dict]:
201        """Create an iterator which converts Records to dicts.
202
203        Returns:
204            Iterator which returns dictionaries instead of Record objects but
205            has otherwise the same beahavior as iterating in this Cursor
206            directly.
207        """
208        return map(lambda x: x.to_dict(), self)

Create an iterator which converts Records to dicts.

Returns:

Iterator which returns dictionaries instead of Record objects but has otherwise the same behavior as iterating in this Cursor directly.

def get_next(self) -> Optional[afscgap.model.Record]:
210    def get_next(self) -> typing.Optional[afscgap.model.Record]:
211        """Get the next value for this Cursor.
212
213        Returns:
214            The next value waiting if cached in the cursor's results queue or
215            as just retrieved from a new page gathered by HTTP request. Will
216            return None if no remain.
217        """
218        if self._remaining == 0:
219            return None
220
221        next_candidate = self._inner.get_next()
222
223        if next_candidate is None:
224            return None
225        else:
226            self._remaining -= 1
227            return next_candidate

Get the next value for this Cursor.

Returns:

The next record from the decorated cursor. Will return None if the limit has been reached or if no records remain.