a
The situation that led to this is: I have an API that cannot filter, but it can sort, so the tap requests that the API sort descending. The strategy is to short-circuit and end processing when the replication key value matches or is lower than the stored state. In this case, is_sorted does not work for me, and it looks like progress_tracking will not work either, because "This is used to track the max value seen for the replication_key during the current sync."
But with descending sort order being the only option the API provides, any suggestions?
t
Was the original context of this thread deleted?
a
@adam_roderick - This should be possible - but it is a different paradigm than the main use case.
a
@taylor I conflated two different questions. One was a Python syntax issue that didn't add anything, so I deleted it.
@aaronsteers I'm looking into overriding the logic that applies the maximum value to the replication key value. I don't have my mind wrapped around my approach just yet but will respond here when I do. Any guidance you might have in the meantime is definitely welcome.
a
@adam_roderick - After thinking over this more, I have a hunch that perhaps leaving is_sorted=false might just meet your requirements as long as you have a pagination strategy that still loops through the records in the descending manner that respects your desired query pattern. When is_sorted=false, this forces the incremental key tracking to be non-resumable "progress markers" until the successful end of the stream.
If you needed resumability after interruption, this would be much harder, since you've then got a descending tracker for each run but an ascending tracker for the load jobs overall. In theory it would be possible, but that resumability is what creates a lot more complexity. If you can tolerate that failed syncs are not resumable and only a successful sync will save the state bookmark, I think that makes this much easier to design for.
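A toy illustration of that trade-off, in plain Python rather than the SDK's own internals. The function name, the `fail_after` switch, and the flat state layout are all made up for this sketch; the only idea taken from the message above is that the highest value seen is held as a progress marker and saved as the bookmark only when the sync finishes.

```python
from typing import Dict, Iterable, Optional


def sync_without_resume(
    records: Iterable[Dict[str, str]],
    state: Dict[str, str],
    replication_key: str = "updated_at",
    fail_after: Optional[int] = None,
) -> Dict[str, str]:
    """Toy model: track a progress marker and commit it only on success."""
    marker: Optional[str] = None
    for index, record in enumerate(records):
        if fail_after is not None and index >= fail_after:
            # Hypothetical switch used only to simulate an interrupted sync.
            raise RuntimeError("simulated interruption")
        value = record[replication_key]
        # Same-format ISO 8601 strings compare correctly as plain strings.
        if marker is None or value > marker:
            marker = value
    # Reached only when every record was processed: promote the marker.
    if marker is not None and marker > state.get("replication_key_value", ""):
        state["replication_key_value"] = marker
    return state


descending = [
    {"updated_at": "2021-11-11T22:42:50.181Z"},
    {"updated_at": "2021-11-11T22:40:09.532Z"},
]
state = {"replication_key_value": "2020-11-30T21:44:28.839Z"}

try:
    sync_without_resume(descending, state, fail_after=1)
except RuntimeError:
    pass
after_failure = dict(state)  # bookmark is unchanged after the failed run

after_success = sync_without_resume(descending, state)
```

Because the marker is only written at the end, the order in which records arrive (ascending or descending) does not matter for the saved bookmark.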
a
yes that is tolerable
The remaining piece is to stop the pagination when the replication key value reaches a certain timestamp in the past.
Do you have a suggestion for the best place to override to achieve that?
a
I think the simplest way may be to compare the latest pagination pointer (descending) with the stop point provided from get_starting_timestamp(). And rather than calling that method on each pagination loop, you can optionally store both values in a dict that defines the pagination token and its end point:
next_page_token = { "stop_point": self.get_starting_timestamp(), "current_page": 1 }
If the result of the request gives data past the stop point, return None from get_next_page_token() and that'll signal to stop the loop.
If the final page also includes records you don't want to emit, then you may also need to use post_process() or parse_response() to filter those out based on the same comparison with the starting timestamp lower bound.
Does this help?
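A minimal sketch of that approach, written as standalone Python so it runs without the SDK. The helper names `parse_ts`, `next_token`, and `keep_record` are hypothetical; in a tap, the same comparisons would presumably live inside get_next_page_token() and post_process(). It assumes the replication key is an ISO 8601 string like the `updated_at` values in this thread, and it treats a value equal to the stop point as already synced, following the "matches or is lower" rule described earlier.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2021-11-11T22:42:50.181Z'."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def next_token(
    records: List[Dict[str, Any]],
    previous_token: Optional[Dict[str, Any]],
    stop_point: Optional[datetime],
    replication_key: str = "updated_at",
) -> Optional[Dict[str, Any]]:
    """Return the next page token, or None to end pagination."""
    if not records:
        return None  # empty page: nothing left to fetch
    oldest = min(parse_ts(r[replication_key]) for r in records)
    if stop_point is not None and oldest <= stop_point:
        return None  # this page already reached the stored bookmark
    current_page = previous_token["current_page"] if previous_token else 1
    return {"stop_point": stop_point, "current_page": current_page + 1}


def keep_record(
    record: Dict[str, Any],
    stop_point: Optional[datetime],
    replication_key: str = "updated_at",
) -> Optional[Dict[str, Any]]:
    """Return the record if it is newer than the stop point, else None."""
    if stop_point is None:
        return record
    return record if parse_ts(record[replication_key]) > stop_point else None


page = [
    {"updated_at": "2021-11-11T22:42:50.181Z"},
    {"updated_at": "2021-11-11T22:40:09.532Z"},
]
stop = parse_ts("2021-11-11T22:41:00.000Z")
```

With this page and stop point, `next_token` ends pagination because the oldest record is older than the stop point, and `keep_record` drops only the second record from the final page.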
a
Yes! Some of those functions I hadn't seen before so this definitely points me in the right direction
Thank you
a
Fantastic! Good luck and keep us posted. 😄 Also - have a good weekend 🌅
a
Think I'm getting close here, but getting a circular reference error that I can't pinpoint
From the output below, you can see the last couple of iterations and then the error. Each page returns 2 records, then after page 12, I return None from get_next_page_token and the next thing that happens is an info message, then the error:
```
--------- end of 'get_next_page_token'. Returning next_page_token: {'stop_point': None, 'current_page': 11}
--------- end of 'get_url_params', returning params: {'page[size]': 2, 'sort': '-updated_at', 'page[number]': 11}
time=2021-12-04 07:47:10 name=tap-krow level=INFO message=INFO METRIC: {'type': 'timer', 'metric': 'http_request_duration', 'value': 0.10503, 'tags': {'endpoint': '/organizations', 'http_status_code': 200, 'status': 'succeeded'}}
{"type": "RECORD", "stream": "organizations", "record": {"id": "2434b5b1-dcb3-400a-a874-bbdb789af2f0", "name": "the Emoji Movie Cafe Gift Shop The Game", "created_at": "2021-10-13T22:03:40.256Z", "updated_at": "2021-11-11T22:42:50.181Z"}, "time_extracted": "2021-12-04T14:47:10.584584Z"}
{"type": "RECORD", "stream": "organizations", "record": {"id": "128cb7f0-afbc-46fc-8459-5d22a32ab614", "name": "Treebeard's Tap House", "created_at": "2021-10-12T19:03:26.888Z", "updated_at": "2021-11-11T22:42:50.167Z"}, "time_extracted": "2021-12-04T14:47:10.585137Z"}
--------- end of 'get_next_page_token'. Returning next_page_token: {'stop_point': None, 'current_page': 12}
--------- end of 'get_url_params', returning params: {'page[size]': 2, 'sort': '-updated_at', 'page[number]': 12}
time=2021-12-04 07:47:10 name=tap-krow level=INFO message=INFO METRIC: {'type': 'timer', 'metric': 'http_request_duration', 'value': 0.119964, 'tags': {'endpoint': '/organizations', 'http_status_code': 200, 'status': 'succeeded'}}
{"type": "RECORD", "stream": "organizations", "record": {"id": "10415ac7-058d-4811-bbb1-43a8488ad440", "name": "311 has grassroots, true", "created_at": "2021-10-26T20:10:19.638Z", "updated_at": "2021-11-11T22:42:50.157Z"}, "time_extracted": "2021-12-04T14:47:10.712124Z"}
{"type": "RECORD", "stream": "organizations", "record": {"id": "71ba16a3-c4c6-4c16-9ea1-8b63bacee9ba", "name": "kobe", "created_at": "2021-06-24T19:37:47.738Z", "updated_at": "2021-11-11T22:40:09.532Z"}, "time_extracted": "2021-12-04T14:47:10.712615Z"}
--------- end of 'get_next_page_token'. Returning next_page_token: None
time=2021-12-04 07:47:10 name=tap-krow level=INFO message=INFO METRIC: {'type': 'counter', 'metric': 'record_count', 'value': 24, 'tags': {'stream': 'organizations'}}
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 474, in cli
    tap.sync_all()
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 343, in sync_all
    stream.sync()
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 984, in sync
    self._sync_records(context)
  File "/home/adam/.cache/pypoetry/virtualenvs/tap-krow-YeTW9i77-py3.8/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 958, in _sync_records
    self._write_state_message()
  File "/home/adam/.cache/pypoetry/virtualenvs/t…
```
Any ideas why I might be seeing a circular reference?
If I comment out the line
previous_token = {"stop_point": self.get_starting_timestamp(self.stream_state), "current_page": 1}
then I do not see the circular reference error. Digging into the get_starting_timestamp function now.
There is definitely a circular reference in the state object used to print the final state message. I can extend this indefinitely with nested context/partitions:
print(o["value"]["bookmarks"]["organizations"]["partitions"][0]["context"]["partitions"][0]["context"]["partitions"][0]["context"]["partitions"])
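For reference, a small standalone reproduction of this kind of structure, using only the standard library. This is a guess at the shape of the problem, not a confirmed diagnosis of what the SDK does: it assumes the stream's own state dict ends up stored as the "context" of one of its own partitions, which would explain both the endless nesting and a serialization failure when the state message is written.

```python
import json

# Hypothetical state layout that mirrors the keys seen in the thread.
state = {"value": {"bookmarks": {"organizations": {}}}}
stream_state = state["value"]["bookmarks"]["organizations"]

# Assumption: the stream state is (mis)used as a partition context,
# so the dict now contains a reference to itself.
stream_state["partitions"] = [{"context": stream_state}]

# The nesting can be followed indefinitely because it is the same object.
nested = stream_state["partitions"][0]["context"]["partitions"][0]["context"]
same_object = nested is stream_state

try:
    json.dumps(state)
    error = None
except ValueError as exc:
    # json.dumps refuses to serialize self-referential containers.
    error = str(exc)
```

Here `same_object` is true and `error` holds the circular reference message from `json.dumps`.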
If I make a call to self.get_starting_timestamp(self.get_context_state(None)) it looks like the SDK is trying to enable partitions. If I exclude that call, then the final state message looks as I expect (and there is no circular reference error):
{
  "type": "STATE",
  "value": {
    "bookmarks": {
      "organizations": {
        "replication_key": "updated_at",
        "replication_key_value": "2020-11-30T21:44:28.839Z"
      }
    }
  }
}
Calling self.get_starting_timestamp(None) does not show the same circular reference behavior, treats state the way I expect (without partitions), and pulls in the replication key value from the state file.