# singer-tap-development
**tomk:**
Hi everyone. Is there anything else, besides overriding the `get_records` method and declaring the `state` capability, needed to make my custom tap handle incremental replication? If I want to create one based on a REST API (path, jsonpath), what else should I consider "important"? Is there any similar tap already implemented? I have seen a few, but none had explicit information on whether they handle incremental replication and how exactly.
**aaronsteers:**
> Hi everyone.

Hi! 👋 🙂

> Is there anything else, besides overriding the `get_records` method and declaring the `state` capability, needed to make my custom tap handle incremental replication?

Nope. Just declare `replication_key` to be the column you want as the incremental key in the `RESTStream` class. The SDK base methods will watch for the max value there and handle bookmarking/state automatically.

For the most efficient handling of interrupts, you can optionally declare `is_sorted = True` if you trust your API to provide records in pre-sorted order.
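A minimal sketch of what this looks like, assuming the Singer SDK's `RESTStream` base class (the stream name, endpoint, and fields below are illustrative, not from the thread):

```python
from singer_sdk import typing as th
from singer_sdk.streams import RESTStream


class ProjectsStream(RESTStream):
    """Hypothetical incremental stream; names and endpoint are assumptions."""

    name = "projects"
    path = "/projects"
    url_base = "https://api.example.com"  # assumed base URL
    records_jsonpath = "$.data[*]"  # assumed response envelope
    primary_keys = ["id"]

    # Declaring a replication key switches the stream to INCREMENTAL mode;
    # the SDK base methods track the max value seen and emit STATE bookmarks.
    replication_key = "updated_at"

    # Optional: declare only if the API truly returns rows sorted by the
    # replication key, so interrupted runs can resume from the bookmark.
    is_sorted = True

    schema = th.PropertiesList(
        th.Property("id", th.IntegerType),
        th.Property("updated_at", th.DateTimeType),
    ).to_dict()
```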
**tomk:**
Hi @aaronsteers, thanks, but how does it work exactly then? Does it fetch all the data from the API endpoint and then filter it (in memory?) based on, for example, `replication_key = "updated_at"`? Or, if it tries to fetch only data greater than `last_max_replication_key_value`, how do the `RESTStream` SDK base methods know how to build the API request so it fetches data "truly" incrementally? What conditions does the API endpoint itself have to meet for incremental replication to work as expected, with the filtering done on the endpoint side? I'm asking because I used the SDK base methods, set `replication_key` to an `updated_at` timestamp, and the logs show that the first ELT run inserted 50 rows into the target table (Redshift). I then ran it a second time (without any changes on the source side), and it updated the same 50 rows again, when I would expect zero rows inserted or updated, because nothing changed on the source side, especially not `updated_at`. So why is it fetching 50 rows again instead of 0?
**aaronsteers:**
Thanks, @tomk, for the additional background. I think I understand the problem now. While the state messages and bookmarks will increment themselves automatically based on whatever column you specify as `replication_key`, you'll also have to leverage that value in a call to `get_url_params()` or similar. For example, if your API expects a `since` URL param, you can use `get_starting_timestamp()` to retrieve the latest bookmark (or the `start_date` config value) and then pass that along to your REST API. If your API needs the start value in a different place (like an HTTP header), you can modify that part of the request in a similar way.
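As a rough sketch, the override might look like this (assuming a hypothetical `since` query parameter; adjust to whatever your API actually expects):

```python
from typing import Any, Dict, Optional

# Inside the stream class:
def get_url_params(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    params: Dict[str, Any] = {}
    # get_starting_timestamp() resolves to the stream's latest bookmark if
    # one exists, else the `start_date` config value, else None (first run).
    start = self.get_starting_timestamp(context)
    if start:
        params["since"] = start.isoformat()  # assumed API filter param
    return params
```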
The tap-github implementation is a bit more complex than most use cases, but the code sample may be helpful: https://github.com/MeltanoLabs/tap-github/blob/5a3ec399feec9253865e467b74b65a4991af7279/tap_github/client.py#L162-L169
**tomk:**
@aaronsteers Thanks a lot for the explanation. I have done that and overridden `get_url_params()` to add the param to the request, but I have hit one issue: `self.get_starting_timestamp(context)` returns `None` for each stream. Any idea what might be wrong? Am I missing something in the stream class definition?
**aaronsteers:**
Thanks for the update, @tomk. If `get_starting_timestamp()` is returning null, that would indicate either that `replication_key` is unset, or else that `replication_key` is not known to be a `datetime`-like data type.

1. If the column is a datetime/timestamp type, can you check that you've typed it as such in the class's `schema` declaration?
2. If the column is not a datetime/timestamp type, can you try the sibling method `get_starting_replication_key_value()`?
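For illustration, a sketch of both checks (the field names are assumptions):

```python
from singer_sdk import typing as th

# 1. A datetime replication key must be typed as DateTimeType in the
#    stream's schema declaration so the SDK can parse the bookmark:
schema = th.PropertiesList(
    th.Property("id", th.IntegerType),
    th.Property("updated_at", th.DateTimeType),  # not th.StringType
).to_dict()

# 2. For a non-datetime key (e.g. an integer cursor), read the raw
#    bookmark value instead (inside the stream class):
#
#     start = self.get_starting_replication_key_value(context)
```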
**tomk:**
Thanks @aaronsteers. I have my `updated_at` date declared as `th.Property("updated_at", th.DateTimeType)`, so I assume that part is good. What I have noticed is that my `context` is `None`, which is weird. I'm still checking, but for now I don't know why. These are child streams; I have my `get_child_context()` method overridden as well, returning a dict built from the record.
**aaronsteers:**
> What I have noticed is that my `context` is `None`, which is weird.

This shouldn't be the case for child streams. Child streams should always have a non-null context, AFAIK.

If you're still blocked here, can you send a link or snippet showing `get_child_context()`? That method has a variable return type, and your code may be triggering the wrong case.
**tomk:**
Hi @aaronsteers, actually I am still blocked. Here's the snippet:
```python
from typing import Optional

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
    """Return a context dictionary for child streams."""
    return {
        "property_id": record["id"],
        "updated_at": record.get("updated_at"),
        "results": record.get("results", []),
        # .get() already returns None when the key is missing
        "company_id": self.config.get("company_id"),
    }
```
**aaronsteers:**
This looks fine to me. No idea why child streams wouldn't start off with this context 🤔

Random things to check: can you verify that the child streams have their parent stream classes noted in `parent_stream_type`, and that there aren't any loops in the dependencies of the parent-child relationships? https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.parent_stream_type
**tomk:**
All of them have one parent noted, and there are no loops, just one "fork". Regarding the class/parent design (sketched below):
- 1 `ClientClass` inheriting from the `RESTStream` class
- 1 `StreamParentClass` inheriting from `ClientClass` (`StreamParentClass(ClientClass)`)
- 1 `ResultStream` class inheriting from `ClientClass` (`ResultStream(ClientClass)`), but with `parent_stream_type = StreamParentClass`
- 5 `Child2TypeClass` classes inheriting from the `ResultStream` class (`Child2TypeClass(ResultStream)`), but with `parent_stream_type = StreamParentClass`
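A bare sketch of that hierarchy as described (class names are from the thread; the attributes shown are illustrative placeholders):

```python
from singer_sdk.streams import RESTStream


class ClientClass(RESTStream):
    url_base = "https://api.example.com"  # placeholder


class StreamParentClass(ClientClass):
    name = "parent_stream"  # placeholder


class ResultStream(ClientClass):
    name = "results"  # placeholder
    # The "fork": subclasses ClientClass, but syncs as a child of
    # StreamParentClass.
    parent_stream_type = StreamParentClass


class Child2TypeClass(ResultStream):
    name = "child_type_1"  # one of five such classes
    parent_stream_type = StreamParentClass
```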
**edgar_ramirez_mondragon:**
Trying to guess what could be wrong here. If the child stream's context is null, perhaps you defined the `partitions` property to return an empty list?
**tomk:**
@edgar_ramirez_mondragon I haven't defined `partitions` at all.
**edgar_ramirez_mondragon:**
Ok, thanks for confirming. So, which methods are you overriding in the parent and child stream classes? It could be that you're not passing `context` to a `super()` call or elsewhere, and so it's defaulting to `None`.
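For example, this is the kind of override being described, where a `super()` call silently drops the context (a hypothetical sketch, not code from the thread):

```python
from typing import Any, Dict, Iterable, Optional


class SomeChildStream(ResultStream):  # ResultStream as defined in the thread
    def get_records(
        self, context: Optional[dict] = None
    ) -> Iterable[Dict[str, Any]]:
        # BUG: the incoming context is discarded, so downstream methods
        # (get_url_params, get_starting_timestamp, ...) all receive None.
        return super().get_records(None)
        # Correct: return super().get_records(context)
```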
**tomk:**
Good point. The only method involving `super()` is my `get_records()` in `ResultStream`, like this:
```python
from typing import Any, Dict, Iterable, Optional

def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any]]:
    """Return a generator of row-type dictionary objects.

    Each row emitted should be a dictionary of property names to their values.
    """
    # Default to [] so a context without a "results" key doesn't raise
    # a TypeError on the membership check.
    if context and self.name not in context.get("results", []):
        self.logger.debug(f"No results detected. Skipping '{self.name}' sync.")
        return []
    return super().get_records(context)
```