tomk
01/10/2023, 8:12 PM

aaronsteers
01/13/2023, 3:27 PM
> Hi everyone.
Hi! 👋 🙂
> Is there anything else, besides overriding the get_records method and declaring the state capability, that my custom tap needs in order to handle incremental replication?
Nope. Just declare replication_key in your RESTStream class to be the column you want to use as the incremental key. The SDK base methods will watch for max values in that column and will handle bookmarking/state automatically.

aaronsteers
01/13/2023, 3:28 PM
is_sorted = True if you trust your API to provide records in pre-sorted order.

tomk
01/16/2023, 7:08 AM
replication_key = "updated_at") or, if it tries to fetch only data greater than last_max_replication_key_value, how do the RESTStream SDK base methods know how to build an API request that fetches data truly incrementally? What conditions does the API endpoint itself have to meet for incremental replication to work as expected, with the data filtered on the endpoint side? I'm asking because I used the SDK base methods and set replication_key to the updated_at timestamp. The logs tell me that the first time I ran elt it inserted 50 rows into the target table (Redshift), but when I ran it a second time (without any changes on the source side) it updated 50 rows again. I would expect 0 (zero) rows to be inserted or updated, because nothing changed on the source side, in particular updated_at. So why is it fetching 50 rows again instead of 0?

aaronsteers
01/17/2023, 6:33 PM
replication_key, you'll also have to use that value in a call to get_url_params() or similar. For example, if your API expects a since URL param, you can use get_starting_timestamp() to retrieve the latest bookmark or the start_date config and then pass that along to your REST API. If your API needs the start value somewhere else (like the HTTP headers), you can modify that part of the request in a similar way.

aaronsteers
01/17/2023, 6:37 PM

tomk
01/20/2023, 1:33 PM
self.get_starting_timestamp(context) returns None for every stream. Any idea what might be wrong? Am I missing something in the stream class definition?

aaronsteers
01/20/2023, 5:59 PM
get_starting_timestamp() is returning None, that would indicate either that replication_key is unset, or that replication_key is not known to be a datetime-like data type.

aaronsteers
01/20/2023, 6:01 PM
schema declaration?
2. If the column is not a datetime/timestamp type, can you try the sibling method get_starting_replication_key_value()?

tomk
01/23/2023, 9:51 AM

aaronsteers
01/25/2023, 12:50 AM
> What I have noticed is that my context is None, which is weird.
This shouldn't be the case for child streams. Child streams should always have a non-null context, AFAIK.
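In the SDK the context is passed into get_starting_timestamp(context), so a None context plausibly affects which bookmark is looked up. The sketch below is a simplified, hypothetical model (not the SDK's code) of bookmarks stored per parent context with a start_date fallback, feeding the result into a since URL parameter as aaronsteers suggested on 01/17. The state layout, the helper names starting_value and url_params, and the since parameter name are all assumptions for illustration.

```python
from typing import Any, Dict, List, Optional


def starting_value(
    state: Dict[str, Any], context: Optional[dict], start_date: Optional[str]
) -> Optional[str]:
    """Hypothetical lookup: per-context bookmark first, then start_date."""
    # Assumed layout: one bookmark entry per parent context ("partition").
    partitions: List[Dict[str, Any]] = state.get("partitions", [])
    for partition in partitions:
        if partition.get("context") == context:
            value = partition.get("replication_key_value")
            if value is not None:
                return value
    # No bookmark matched this context (e.g. context is None): use start_date.
    return start_date


def url_params(
    state: Dict[str, Any], context: Optional[dict], start_date: Optional[str]
) -> Dict[str, Any]:
    """Hypothetical params builder so the API, not the tap, filters old rows."""
    params: Dict[str, Any] = {}
    since = starting_value(state, context, start_date)
    if since is not None:
        params["since"] = since  # assumed parameter name; use your API's
    return params
```

In this model a None context never matches a per-parent bookmark, so the lookup falls through to start_date, or to None when no start_date is configured.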
aaronsteers
01/25/2023, 12:51 AM
get_child_context()? That one has a variable return type, and it could be that your code is triggering the wrong case.

tomk
01/25/2023, 6:47 PM
```python
from typing import Optional


def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
    """Return a context dictionary for child streams."""
    return {
        "property_id": record["id"],
        "updated_at": record.get("updated_at"),
        "results": record.get("results", []),
        # dict.get() already returns None when "company_id" is not configured.
        "company_id": self.config.get("company_id"),
    }
```
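To see exactly what a child stream would receive, the same mapping can be pulled out into a plain function and run against a sample record without starting the tap. build_child_context is a hypothetical name for this test-only refactor, not an SDK method. Every key has a default except record["id"], so a parent record without an id raises KeyError instead of producing a context.

```python
from typing import Any, Dict


def build_child_context(record: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: same mapping as get_child_context(), minus the stream."""
    return {
        "property_id": record["id"],  # KeyError if the parent record has no id
        "updated_at": record.get("updated_at"),
        "results": record.get("results", []),
        "company_id": config.get("company_id"),
    }
```

As written, the method always returns a dict and never None, so this override on its own does not appear to be what hands a child stream a None context.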
aaronsteers
01/25/2023, 7:08 PM

aaronsteers
01/25/2023, 7:08 PM
parent_stream_type, and that there aren't any loops in the dependencies of the parent-child relationships?
https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.parent_stream_type

tomk
01/26/2023, 1:04 AM
ClientClass inheriting from the RESTStream class, then 1 StreamParentClass inheriting from ClientClass (StreamParentClass(ClientClass)), then 1 ResultStream class inheriting from ClientClass (ResultStream(ClientClass)) but with parent_stream_type = StreamParentClass, then 5 Child2TypeClass classes inheriting from ResultStream (class Child2TypeClass(ResultStream)) but with parent_stream_type = StreamParentClass.
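The hierarchy just described can be mirrored with plain stub classes (no singer_sdk import; in the real tap ClientClass would subclass RESTStream) to make one point concrete: Python inheritance and parent_stream_type are independent. Child2TypeClass inherits ResultStream's code, including any overridden methods, but its parent stream is StreamParentClass. The parent_chain helper is a hypothetical check for the dependency loops aaronsteers asked about; it follows parent_stream_type links upward and raises on a cycle.

```python
from typing import List, Optional, Set


class ClientClass:
    # Stub; in the real tap this subclasses singer_sdk's RESTStream.
    parent_stream_type: Optional[type] = None


class StreamParentClass(ClientClass):
    pass


class ResultStream(ClientClass):
    parent_stream_type = StreamParentClass


class Child2TypeClass(ResultStream):
    # Inherits ResultStream's methods; its parent stream is StreamParentClass.
    parent_stream_type = StreamParentClass


def parent_chain(stream_cls: type) -> List[str]:
    """Hypothetical check: follow parent_stream_type links, raising on a loop."""
    chain: List[str] = []
    seen: Set[type] = set()
    current: Optional[type] = stream_cls
    while current is not None:
        if current in seen:
            names = " -> ".join(chain + [current.__name__])
            raise ValueError(f"parent_stream_type loop: {names}")
        seen.add(current)
        chain.append(current.__name__)
        current = getattr(current, "parent_stream_type", None)
    return chain
```

Here parent_chain(Child2TypeClass) gives ["Child2TypeClass", "StreamParentClass"], while Child2TypeClass.__bases__ is (ResultStream,).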
edgar_ramirez_mondragon
01/26/2023, 4:56 PM
partitions property to return an empty list?

tomk
01/26/2023, 6:09 PM

edgar_ramirez_mondragon
01/26/2023, 6:13 PM
context to a super() call or elsewhere, and so it’s defaulting to None.
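edgar's hypothesis is easy to reproduce outside the SDK: if an override calls the parent method without forwarding context, the parameter's default of None is used instead. The classes below are hypothetical stand-ins, not singer_sdk classes; BaseStream.get_records() simply reports the context it received.

```python
from typing import Any, Dict, Iterator, Optional


class BaseStream:
    """Hypothetical stand-in base class; get_records() echoes its context."""

    def get_records(self, context: Optional[dict] = None) -> Iterator[Dict[str, Any]]:
        yield {"seen_context": context}


class DropsContext(BaseStream):
    def get_records(self, context: Optional[dict] = None) -> Iterator[Dict[str, Any]]:
        # Bug: context is not passed on, so the base method sees None.
        return super().get_records()


class ForwardsContext(BaseStream):
    def get_records(self, context: Optional[dict] = None) -> Iterator[Dict[str, Any]]:
        # Fix: pass the same context through to the parent implementation.
        return super().get_records(context)
```

The ResultStream override that follows does pass context through, so the same check would need to be applied to any other overridden method in the chain.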
tomk
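01/26/2023, 7:16 PM
ResultStream, like this:
```python
from typing import Any, Dict, Iterable, Optional


class ResultStream(ClientClass):  # ClientClass: the tap's RESTStream subclass
    def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any]]:
        """Return a generator of row-type dictionary objects.

        Each row emitted should be a dictionary of property names to their values.
        """
        # `or []` avoids a TypeError when the context has no "results" list.
        if context and self.name not in (context.get("results") or []):
            self.logger.debug(f"No results detected. Skipping '{self.name}' sync.")
            return []
        return super().get_records(context)
```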
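The skip condition in that override can be tested on its own. This is a minimal sketch assuming "results" holds a list of child stream names (the thread does not say what it contains); should_sync is a hypothetical helper, not an SDK method. A falsy context bypasses the filter entirely, and a context without a results list skips the stream rather than raising the TypeError that checking membership in None would cause.

```python
from typing import Optional


def should_sync(stream_name: str, context: Optional[dict]) -> bool:
    """Hypothetical helper: should a child stream sync for this parent context?"""
    if not context:
        # Same as the override: no context means no filtering at all.
        return True
    # `or []` also guards against a missing key or an explicit None.
    return stream_name in (context.get("results") or [])
```

Note the first branch: when context is None, the stream syncs unconditionally, so a missing context silently disables this filter instead of failing loudly.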