afscgap.flat_cursor
Interfaces for cursor objects which iterate over records from prejoined flat records.
Interfaces for cursor objects which iterate over records from prejoined flat records, either those appearing in the underlying unjoined dataset or zero catch inferred records.
(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center for Data Science and the Environment at UC Berkeley.
This file is part of afscgap released under the BSD 3-Clause License. See LICENSE.md.
"""
Interfaces for cursor objects which iterate over records from prejoined flat records.

Interfaces for cursor objects which iterate over records from prejoined flat records, either those
appearing in the underlying unjoined dataset or zero catch inferred records.

(c) 2025 Regents of University of California / The Eric and Wendy Schmidt Center
for Data Science and the Environment at UC Berkeley.

This file is part of afscgap released under the BSD 3-Clause License. See
LICENSE.md.
"""
import queue
import typing

import afscgap.cursor
import afscgap.model

from afscgap.flat_model import RECORDS
from afscgap.typesdef import OPT_INT


class FlatCursor(afscgap.cursor.Cursor):
    """Cursor over flat avro "prejoined" results.

    This cursor neither filters nor limits: it yields every record from the iterable it was given,
    complete or not.
    """

    def __init__(self, records: RECORDS):
        """Create a new prejoined cursor.

        Create a new cursor over prejoined flat files which have already been parsed or which are
        emitted prejoined.

        Args:
            records: The inner iterable of records that should be decorated by this cursor.
        """
        self._records = records
        self._records_iter = iter(self._records)

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return. Always None because this cursor does not
            impose a limit.
        """
        return None

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            Always False: incomplete records are returned during iteration (their is_complete()
            will return false) rather than being placed in the queue at get_invalid().
        """
        return False

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            A new, empty queue on every call because this cursor never filters records.
        """
        return queue.Queue()

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries (from each record's to_dict()) instead of Record
            objects but has otherwise the same behavior as iterating in this Cursor directly.
        """
        return (record.to_dict() for record in self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Returns:
            The next record from the wrapped iterable or None if no records remain.
        """
        # The default argument makes next() return None on exhaustion instead of raising.
        return next(self._records_iter, None)


class CompleteCursor(afscgap.cursor.Cursor):
    """Cursor decorator which only yields complete records.

    Incomplete records from the inner cursor are skipped and their raw data (from get_inner()) is
    placed in the queue returned by get_invalid().
    """

    def __init__(self, inner: afscgap.cursor.Cursor):
        """Create a new decorator for another cursor which filters for complete records.

        Args:
            inner: The cursor to decorate.
        """
        self._inner = inner
        self._invalid: queue.Queue[dict] = queue.Queue()

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return, as reported by the inner cursor.
        """
        return self._inner.get_limit()

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            Always True: incomplete records are not returned during iteration and are instead
            placed in the queue at get_invalid().
        """
        return True

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            Queue with the raw data (from each record's get_inner()) of the incomplete records
            skipped so far. The same queue instance is returned on every call, so it fills as
            iteration proceeds.
        """
        return self._invalid

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries (from each record's to_dict()) instead of Record
            objects but has otherwise the same behavior as iterating in this Cursor directly.
        """
        return (record.to_dict() for record in self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Pulls from the inner cursor until a complete record is found. Incomplete records seen along
        the way are skipped and their raw data is queued for get_invalid().

        Returns:
            The next complete record from the inner cursor or None if no records remain.
        """
        while True:
            candidate = self._inner.get_next()

            if candidate is None:
                return None

            if candidate.is_complete():
                return candidate

            # Incomplete records are assumed to be FlatRecords exposing their raw data.
            candidate_cast: afscgap.flat_model.FlatRecord = candidate  # type: ignore
            self._invalid.put(candidate_cast.get_inner())


class LimitCursor(afscgap.cursor.Cursor):
    """Create a decorator which limits cursor iteration to a certain count of returned records."""

    def __init__(self, inner: afscgap.cursor.Cursor, limit: int):
        """Create a new limit decorator around an existing cursor.

        Create a new limit decorator around an existing cursor which terminates iteration either
        after a limit is reached or no additional records are available.

        Args:
            inner: The cursor to decorate.
            limit: The integer count of records after which iteration will be terminated. A
                non-positive value yields no records.
        """
        self._inner = inner
        self._limit = limit
        self._remaining = limit

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return, as given at construction.
        """
        return self._limit

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            The inner cursor's answer, since this decorator does not filter on its own.
        """
        return self._inner.get_filtering_incomplete()

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            The inner cursor's invalid queue, since this decorator does not filter on its own.
        """
        return self._inner.get_invalid()

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries (from each record's to_dict()) instead of Record
            objects but has otherwise the same behavior as iterating in this Cursor directly.
        """
        return (record.to_dict() for record in self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Returns:
            The next record from the inner cursor or None if the limit has been reached or no
            records remain.
        """
        # <= rather than == so a negative limit does not silently disable the limit.
        if self._remaining <= 0:
            return None

        candidate = self._inner.get_next()

        if candidate is None:
            return None

        self._remaining -= 1
        return candidate
class FlatCursor(afscgap.cursor.Cursor):
    """Cursor over flat avro "prejoined" results.

    This cursor neither filters nor limits: it yields every record from the iterable it was given,
    complete or not.
    """

    def __init__(self, records: RECORDS):
        """Create a new prejoined cursor.

        Create a new cursor over prejoined flat files which have already been parsed or which are
        emitted prejoined.

        Args:
            records: The inner iterable of records that should be decorated by this cursor.
        """
        self._records = records
        self._records_iter = iter(self._records)

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return. Always None because this cursor does not
            impose a limit.
        """
        return None

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            Always False: incomplete records are returned during iteration (their is_complete()
            will return false) rather than being placed in the queue at get_invalid().
        """
        return False

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            A new, empty queue on every call because this cursor never filters records.
        """
        return queue.Queue()

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries (from each record's to_dict()) instead of Record
            objects but has otherwise the same behavior as iterating in this Cursor directly.
        """
        return (record.to_dict() for record in self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Returns:
            The next record from the wrapped iterable or None if no records remain.
        """
        # The default argument makes next() return None on exhaustion instead of raising.
        return next(self._records_iter, None)
Cursor over flat avro "prejoined" results.
def __init__(self, records: RECORDS):
    """Build a cursor over records that are already prejoined.

    The records may come from flat files that were parsed earlier or from a source that emits them
    prejoined. A single iterator over them is created here and consumed by get_next().

    Args:
        records: The inner iterator that should be decorated by this cursor.
    """
    self._records = records
    self._records_iter = iter(records)
Create a new prejoined cursor.
Create a new cursor over prejoined flat files which have already been parsed or which are emitted prejoined.
Arguments:
- records: The inner iterator that should be decorated by this cursor.
def get_limit(self) -> OPT_INT:
    """Report the maximum number of records this cursor will return.

    Returns:
        None, because this cursor places no cap on the number of records.
    """
    no_limit = None
    return no_limit
Get the overall limit.
Returns:
The maximum number of records to return.
def get_filtering_incomplete(self) -> bool:
    """Report whether this cursor silently drops incomplete records.

    Returns:
        False: every record is returned during iteration, including incomplete ones, and nothing
        is ever placed in the queue from get_invalid().
    """
    drops_incomplete = False
    return drops_incomplete
Determine if this cursor is silently filtering incomplete records.
Returns:
Flag indicating if incomplete records should be silently filtered. If true, they will not be returned during iteration and will instead be placed in the queue at get_invalid(). If false, they will be returned and those incomplete records' is_complete() will return false.
def get_invalid(self) -> 'queue.Queue[dict]':
    """Return the invalid / incomplete records collected so far.

    Returns:
        A freshly created, empty queue. This cursor never filters, so there is never anything to
        report.
    """
    empty_queue: 'queue.Queue[dict]' = queue.Queue()
    return empty_queue
Get a queue of invalid / incomplete records found so far.
Returns:
Queue with dictionaries containing the raw data returned from the remote that did not have valid values for all required fields. Note that this will include incomplete records as well if get_filtering_incomplete() is true and will not contain incomplete records otherwise.
def to_dicts(self) -> typing.Iterable[dict]:
    """Create an iterator which converts Records to dicts.

    Returns:
        Iterator which returns dictionaries (from each record's to_dict()) instead of Record
        objects but has otherwise the same behavior as iterating in this Cursor directly.
    """
    return (record.to_dict() for record in self)
Create an iterator which converts Records to dicts.
Returns:
Iterator which returns dictionaries instead of Record objects but has otherwise the same behavior as iterating in this Cursor directly.
def get_next(self) -> typing.Optional[afscgap.model.Record]:
    """Get the next value for this Cursor.

    Returns:
        The next record from the wrapped iterable or None if no records remain.
    """
    # The default argument makes next() return None on exhaustion instead of raising.
    return next(self._records_iter, None)
Get the next value for this Cursor.
Returns:
The next record from the underlying iterable, or None if no records remain.
class CompleteCursor(afscgap.cursor.Cursor):
    """Cursor decorator which only yields complete records.

    Incomplete records from the inner cursor are skipped and their raw data (from get_inner()) is
    placed in the queue returned by get_invalid().
    """

    def __init__(self, inner: afscgap.cursor.Cursor):
        """Create a new decorator for another cursor which filters for complete records.

        Args:
            inner: The cursor to decorate.
        """
        self._inner = inner
        self._invalid: queue.Queue[dict] = queue.Queue()

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return, as reported by the inner cursor.
        """
        return self._inner.get_limit()

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            Always True: incomplete records are not returned during iteration and are instead
            placed in the queue at get_invalid().
        """
        return True

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            Queue with the raw data (from each record's get_inner()) of the incomplete records
            skipped so far. The same queue instance is returned on every call, so it fills as
            iteration proceeds.
        """
        return self._invalid

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries (from each record's to_dict()) instead of Record
            objects but has otherwise the same behavior as iterating in this Cursor directly.
        """
        return (record.to_dict() for record in self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Pulls from the inner cursor until a complete record is found. Incomplete records seen along
        the way are skipped and their raw data is queued for get_invalid().

        Returns:
            The next complete record from the inner cursor or None if no records remain.
        """
        while True:
            candidate = self._inner.get_next()

            if candidate is None:
                return None

            if candidate.is_complete():
                return candidate

            # Incomplete records are assumed to be FlatRecords exposing their raw data.
            candidate_cast: afscgap.flat_model.FlatRecord = candidate  # type: ignore
            self._invalid.put(candidate_cast.get_inner())
Cursor decorator which only yields complete records.
def __init__(self, inner: afscgap.cursor.Cursor):
    """Wrap another cursor so that only its complete records are yielded.

    Args:
        inner: The cursor to decorate.
    """
    invalid_records: 'queue.Queue[dict]' = queue.Queue()
    self._inner = inner
    self._invalid = invalid_records
Create a new decorator for another cursor which filters for complete records.
Arguments:
- inner: The cursor to decorate.
def get_limit(self) -> OPT_INT:
    """Report the maximum number of records to return.

    Returns:
        Whatever limit the decorated inner cursor reports.
    """
    inner_limit = self._inner.get_limit()
    return inner_limit
Get the overall limit.
Returns:
The maximum number of records to return.
def get_filtering_incomplete(self) -> bool:
    """Report whether this cursor silently drops incomplete records.

    Returns:
        True: incomplete records are withheld from iteration and queued as invalid instead.
    """
    drops_incomplete = True
    return drops_incomplete
Determine if this cursor is silently filtering incomplete records.
Returns:
Flag indicating if incomplete records should be silently filtered. If true, they will not be returned during iteration and will instead be placed in the queue at get_invalid(). If false, they will be returned and those incomplete records' is_complete() will return false.
def to_dicts(self) -> typing.Iterable[dict]:
    """Create an iterator which converts Records to dicts.

    Returns:
        Iterator which returns dictionaries (from each record's to_dict()) instead of Record
        objects but has otherwise the same behavior as iterating in this Cursor directly.
    """
    return (record.to_dict() for record in self)
Create an iterator which converts Records to dicts.
Returns:
Iterator which returns dictionaries instead of Record objects but has otherwise the same behavior as iterating in this Cursor directly.
def get_next(self) -> typing.Optional[afscgap.model.Record]:
    """Get the next value for this Cursor.

    Pulls from the inner cursor until a complete record is found. For each incomplete record seen
    along the way, the value returned by its get_inner() is put on self._invalid.

    Returns:
        The next complete record from the inner cursor or None if no records remain.
    """
    while True:
        candidate = self._inner.get_next()

        if candidate is None:
            return None

        if candidate.is_complete():
            return candidate

        # Incomplete records are assumed to be FlatRecords exposing their raw data.
        candidate_cast: afscgap.flat_model.FlatRecord = candidate  # type: ignore
        self._invalid.put(candidate_cast.get_inner())
Get the next value for this Cursor.
Returns:
The next complete record from the inner cursor, or None if no records remain. Incomplete records encountered along the way are skipped and queued as invalid.
Inherited Members
class LimitCursor(afscgap.cursor.Cursor):
    """Create a decorator which limits cursor iteration to a certain count of returned records."""

    def __init__(self, inner: afscgap.cursor.Cursor, limit: int):
        """Create a new limit decorator around an existing cursor.

        Create a new limit decorator around an existing cursor which terminates iteration either
        after a limit is reached or no additional records are available.

        Args:
            inner: The cursor to decorate.
            limit: The integer count of records after which iteration will be terminated. A
                non-positive value yields no records.
        """
        self._inner = inner
        self._limit = limit
        self._remaining = limit

    def get_limit(self) -> OPT_INT:
        """Get the overall limit.

        Returns:
            The maximum number of records to return, as given at construction.
        """
        return self._limit

    def get_filtering_incomplete(self) -> bool:
        """Determine if this cursor is silently filtering incomplete records.

        Returns:
            The inner cursor's answer, since this decorator does not filter on its own.
        """
        return self._inner.get_filtering_incomplete()

    def get_invalid(self) -> 'queue.Queue[dict]':
        """Get a queue of invalid / incomplete records found so far.

        Returns:
            The inner cursor's invalid queue, since this decorator does not filter on its own.
        """
        return self._inner.get_invalid()

    def to_dicts(self) -> typing.Iterable[dict]:
        """Create an iterator which converts Records to dicts.

        Returns:
            Iterator which returns dictionaries (from each record's to_dict()) instead of Record
            objects but has otherwise the same behavior as iterating in this Cursor directly.
        """
        return (record.to_dict() for record in self)

    def get_next(self) -> typing.Optional[afscgap.model.Record]:
        """Get the next value for this Cursor.

        Returns:
            The next record from the inner cursor or None if the limit has been reached or no
            records remain.
        """
        # <= rather than == so a negative limit does not silently disable the limit.
        if self._remaining <= 0:
            return None

        candidate = self._inner.get_next()

        if candidate is None:
            return None

        self._remaining -= 1
        return candidate
Create a decorator which limits cursor iteration to a certain count of returned records.
def __init__(self, inner: afscgap.cursor.Cursor, limit: int):
    """Wrap an existing cursor so iteration stops after a fixed number of records.

    Iteration ends as soon as either the limit has been reached or the inner cursor has no more
    records to give.

    Args:
        inner: The cursor to decorate.
        limit: The integer count of records after which iteration will be terminated.
    """
    self._limit = limit
    self._remaining = limit
    self._inner = inner
Create a new limit decorator around an existing cursor.
Create a new limit decorator around an existing cursor which terminates iteration either after a limit is reached or no additional records are available.
Arguments:
- inner: The cursor to decorate.
- limit: The integer count of records after which iteration will be terminated.
def get_limit(self) -> OPT_INT:
    """Report the maximum number of records to return.

    Returns:
        The limit supplied when this decorator was constructed.
    """
    configured_limit = self._limit
    return configured_limit
Get the overall limit.
Returns:
The maximum number of records to return.
def get_filtering_incomplete(self) -> bool:
    """Report whether this cursor silently drops incomplete records.

    Returns:
        The decorated inner cursor's answer; limiting does not change filtering behavior.
    """
    inner_cursor = self._inner
    return inner_cursor.get_filtering_incomplete()
Determine if this cursor is silently filtering incomplete records.
Returns:
Flag indicating if incomplete records should be silently filtered. If true, they will not be returned during iteration and will instead be placed in the queue at get_invalid(). If false, they will be returned and those incomplete records' is_complete() will return false.
def to_dicts(self) -> typing.Iterable[dict]:
    """Create an iterator which converts Records to dicts.

    Returns:
        Iterator which returns dictionaries (from each record's to_dict()) instead of Record
        objects but has otherwise the same behavior as iterating in this Cursor directly.
    """
    return (record.to_dict() for record in self)
Create an iterator which converts Records to dicts.
Returns:
Iterator which returns dictionaries instead of Record objects but has otherwise the same behavior as iterating in this Cursor directly.
def get_next(self) -> typing.Optional[afscgap.model.Record]:
    """Get the next value for this Cursor.

    Returns:
        The next record from the inner cursor or None if the limit has been reached or no records
        remain.
    """
    # <= rather than == so a negative limit does not silently disable the limit.
    if self._remaining <= 0:
        return None

    candidate = self._inner.get_next()

    if candidate is None:
        return None

    self._remaining -= 1
    return candidate
Get the next value for this Cursor.
Returns:
The next record from the inner cursor, or None if the limit has been reached or no records remain.