tomk
12/23/2022, 9:16 AM

from singer_sdk import typing as th
...
primary_keys = ["id"]
replication_key = "updated_at"
replication_key_value = '1990-01-01T00:00:00Z'
schema = th.PropertiesList(
    th.Property("id", th.StringType),
    th.Property("updated_at", th.DateTimeType),
).to_dict()
I use meltano elt to run a full load, and proper state is generated (visible in meltano state list ...). In the logs I can see that a number of rows, say 10, is inserted into the target tables, which is OK.

Then I run meltano elt --state-id to use the previous state (state.json), and from the logs it seems to be picked up properly (there is no "No state was found, complete import" warning). The logs again show 10 rows processed, but this time they are updated (or 0 inserted and 0 updated when I use skip-updates for target-redshift). However, every time all the rows from the tap are processed instead of only the updated ones.

What else can I check, or what do I need to implement, to make this work as expected, i.e. to grab only the updated records from the source (tap)? Also, all rows are updated on every run even if nothing changed in the source, so each run is effectively a full load even though Meltano detects incremental mode (updates instead of inserts).
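For context on what I expect to happen: my understanding is that the SDK only records the replication_key bookmark in state, and the stream itself still has to use that bookmark to limit what it extracts (e.g. via self.get_starting_timestamp(context) in get_url_params() for a REST source, or an equivalent WHERE clause for SQL). A plain-Python sketch of the filtering I assumed would happen automatically (incremental_records and the sample rows here are hypothetical, not SDK code):

```python
from datetime import datetime

def incremental_records(rows, bookmark):
    """Yield only rows whose updated_at is strictly newer than the bookmark."""
    # fromisoformat() in older Pythons does not accept a trailing "Z",
    # so normalize it to an explicit UTC offset first.
    start = datetime.fromisoformat(bookmark.replace("Z", "+00:00"))
    for row in rows:
        ts = datetime.fromisoformat(row["updated_at"].replace("Z", "+00:00"))
        if ts > start:
            yield row

# Sample data: only id "2" was modified after the bookmark.
rows = [
    {"id": "1", "updated_at": "2022-01-01T00:00:00Z"},
    {"id": "2", "updated_at": "2022-12-01T00:00:00Z"},
]
print([r["id"] for r in incremental_records(rows, "2022-06-01T00:00:00Z")])
# prints ['2']
```

Is it correct that without the tap applying this kind of filter itself, the target will always receive every row, which would explain the behavior I am seeing?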