Andy Carter
04/17/2023, 8:55 AM
`since` timestamp parameter, it just returns all appropriate records, but I can override `get_records` and only yield rows where `timestamp` is after the last time I ran.
How can I get the relevant state in `get_records` if this is a child stream?
def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    for record in self.request_records(context):
        transformed_record = self.post_process(record, context)
        if self.stream_state:  # do something here?
            yield record
Denis I.
04/17/2023, 10:49 AM
`replication_key` and `get_starting_timestamp` should help you to skip unnecessary records
Andy Carter
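04/17/2023, 11:35 AM
# requires: from dateutil.parser import isoparse
replication_key = 'timestamp'
is_sorted = False

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    # get_starting_timestamp() returns None when there is no bookmark or start date yet
    start = self.get_starting_timestamp(context)
    for record in self.request_records(context):
        transformed_record = self.post_process(record, context)
        if transformed_record is None:
            continue  # record was filtered out by post_process()
        if start is None or isoparse(record['timestamp']) >= start:
            yield transformed_record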
I was curious as to whether the state would save the maximum timestamp seen in the data, or the timestamp of the run. It looks like the maximum timestamp of the data.
Andy Carter
04/17/2023, 11:35 AM
`>=` in the comparison, not `>`, to keep with the expected semantics?
Denis I.
04/17/2023, 11:49 AM
`is_sorted` / `check_sorted` values.
https://github.com/meltano/sdk/blob/2cb74b54f9694b4acc9df5d0e5892616301d2f39/singer_sdk/streams/core.py#L742
https://sdk.meltano.com/en/latest/implementation/state.html#the-impact-of-sorting-on-incremental-sync
Denis I.
04/17/2023, 11:52 AM
`>=` would be safer in case new records with the same timestamp appear in the source later.
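Note: the `>=` versus `>` trade-off from this thread can be illustrated with a small standalone simulation. This is only a sketch in plain Python, not SDK code: `incremental_sync` and the sample records are hypothetical, and the bookmark is assumed to be the maximum timestamp seen in the data, as observed above.

```python
from datetime import datetime, timezone


def incremental_sync(source, bookmark, inclusive):
    """Simulate one run; return (emitted records, new bookmark).

    Hypothetical helper for illustration, not part of the Meltano SDK.
    """
    emitted = []
    for record in source:
        ts = record["timestamp"]
        if bookmark is None or (ts >= bookmark if inclusive else ts > bookmark):
            emitted.append(record)
    # Assumption: the bookmark tracks the maximum timestamp seen in the data,
    # not the wall-clock time of the run.
    seen = [r["timestamp"] for r in emitted]
    if bookmark is not None:
        seen.append(bookmark)
    new_bookmark = max(seen) if seen else None
    return emitted, new_bookmark


t1 = datetime(2023, 4, 17, 8, 0, tzinfo=timezone.utc)
t2 = datetime(2023, 4, 17, 9, 0, tzinfo=timezone.utc)

first_run_source = [{"id": 1, "timestamp": t1}, {"id": 2, "timestamp": t2}]
# A record sharing the bookmark's timestamp appears after the first run.
second_run_source = first_run_source + [{"id": 3, "timestamp": t2}]

_, bookmark = incremental_sync(first_run_source, None, inclusive=True)

strict_ids = [
    r["id"] for r in incremental_sync(second_run_source, bookmark, inclusive=False)[0]
]
inclusive_ids = [
    r["id"] for r in incremental_sync(second_run_source, bookmark, inclusive=True)[0]
]

print(strict_ids)     # [] : record 3 is missed
print(inclusive_ids)  # [2, 3] : record 2 is re-emitted, record 3 is caught
```

With `>=`, record 2 is emitted a second time, so this approach presumably relies on the target deduplicating by primary key; with `>`, the late record 3 is never picked up.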