:wave: Hey y’all, I am working on one of our custo...
# singer-tap-development
j
👋 Hey y’all, I am working on one of our custom connectors and running into a problem with incremental replication. I have set
replication_key
in our config and meltano does pick up on the last state and when it hits the stream for incremental replication, it does log
Beginning incremental sync of <stream_name>
, but then it does a full replication. Is that expected behavior?
âś… 1
e
Hi @john_mizerany! Are you using Stream.get_starting_timestamp or Stream.get_starting_replication_key_value in your custom connector to make a request that filters the records?
j
The API we are using does not have filtering, so I was following this: https://sdk.meltano.com/en/latest/incremental_replication.html
I could be wrong, but reading through some old slack messages (I can’t find them anymore so not able to link them), but it was saying that if you set
replication_key
meltano will automatically try to sync based off that key you provided, but will it stop the sync if it matches the last in the state?
Or do we have to develop this logic in the tap @Edgar RamĂ­rez (Arch.dev)
e
Yeah, the code sample in https://sdk.meltano.com/en/latest/incremental_replication.html uses the example of an API that accepts an
after
query parameter to filter the response. Unfortunately, the singer sdk does not support incremental replication for APIs that don't support native filtering at the moment, see https://github.com/meltano/sdk/issues/226.
j
Ahhhh okay gotcha that definitely makes sense
I think what I may do is compare the records in the request with the last state captured, do y’all have an example of implementing that in a tap with the cookie cutter based template?
e
Something like that was implemented in https://github.com/edgarrmondragon/tap-google-play/blob/ec95f6fea5c92e4f60bbed84f210686237d9166f/tap_google_play/streams.py#L58-L65. You may be able to keep things simple by doing something like:
Copy code
class MyStream(RESTStream):
  def get_records(self, context):
    start_date = self.get_starting_timestamp(context)
    for record in super().get_records():
      if <your condition>:
        break
      yield record
j
Ahhh that’s perfect thank you! I was also looking at https://github.com/MeltanoLabs/tap-dbt/blob/main/tap_dbt/streams.py#L118-L154
🙌 1