# getting-started
Hi all — what do I actually need to implement myself to get incremental replication working? I have a custom tap-mongodb with a few streams in it, plus target-redshift. In meltano.yml I have declared the tap's capabilities (`state`, `catalog`, `discover`), along with `replication-method: INCREMENTAL` and `replication-key: updated_at`. In the Python stream classes I have declared `replication_key` and `replication_key_value` (a datetime), and the replication key is also declared in the schema:
```python
from singer_sdk import typing as th
...
primary_keys = ["id"]
replication_key = "updated_at"
replication_key_value = "1990-01-01T00:00:00Z"
schema = th.PropertiesList(
    th.Property("id", th.StringType),
    th.Property("updated_at", th.DateTimeType),
).to_dict()
```
I use `meltano elt` to run a full load, and a proper state is generated (visible in `meltano state list ...`). In the logs I can see that a number of rows is inserted into the target tables — say 10 — which is OK.

Then I run `meltano elt --state-id` to reuse the previous state (state.json), and from the logs it seems to be picked up correctly (no "No state was found, complete import" warning). But the logs show all 10 rows being processed again, this time as updates (or 0 inserted and 0 updated when I use skip-updates for target-redshift). Each run processes every row from the tap instead of only the updated ones.

What else can I check, or what do I need to implement, so that only the updated records are pulled from the source (tap)? Right now every row is rewritten even when nothing changed in the source, so each run is effectively a full load even though Meltano detects incremental mode (updates instead of inserts).
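For context, my current understanding (which may be wrong) is that declaring `replication_key` only makes the SDK track the bookmark in state — the stream's `get_records` still has to apply that bookmark when querying the source, e.g. via `self.get_starting_timestamp(context)` in the Singer SDK. Here is a plain-Python sketch of the filtering I think might be missing; `incremental_records` and the sample rows are hypothetical stand-ins, not real SDK code:

```python
from datetime import datetime, timezone

def incremental_records(all_rows, start):
    """Yield only rows whose replication key is newer than the bookmark.

    Hypothetical stand-in for a stream's get_records(): in a real
    singer_sdk stream, `start` would come from
    self.get_starting_timestamp(context), i.e. the bookmark from state.
    """
    for row in all_rows:
        if start is None or row["updated_at"] > start:
            yield row

rows = [
    {"id": "a", "updated_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"id": "b", "updated_at": datetime(2024, 6, 1, tzinfo=timezone.utc)},
]
bookmark = datetime(2024, 3, 1, tzinfo=timezone.utc)
print([r["id"] for r in incremental_records(rows, bookmark)])  # → ['b']
```

Without a filter like this pushed into the Mongo query itself, every run would re-emit all documents and the target would simply upsert them — which matches the behavior I'm seeing.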