# troubleshooting
d
Hi all, I hope this message finds you well. I'm reaching out for guidance on implementing incremental data loading from a custom tap API into a PostgreSQL database using Meltano. I'm relatively new to Meltano and have run into an issue: although the state file is saved to the local filesystem successfully, subsequent runs of `meltano run` or `meltano el` reload all the data, which leads to duplicates in the database. My stream data is not sorted (`is_sorted = False`), but I want to load data incrementally per batch (from the previous batch's highest watermark). Here's a snippet of the state file content:
```json
{
  "completed": {
    "singer_state": {
      "bookmarks": {
        "book": {
          "replication_key": "id",
          "replication_key_value": 112233
        }
      }
    }
  },
  "partial": {}
}
```
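For anyone following along, here's a minimal sketch (plain Python, no Singer SDK) of how a tap could pull the starting watermark out of a saved state payload like the one above; the `starting_value` helper is made up for illustration:

```python
import json

# State payload copied from the thread above.
STATE = """
{
  "completed": {
    "singer_state": {
      "bookmarks": {
        "book": {
          "replication_key": "id",
          "replication_key_value": 112233
        }
      }
    }
  },
  "partial": {}
}
"""

def starting_value(state_json: str, stream: str):
    """Return the saved replication-key value for a stream, or None."""
    bookmarks = json.loads(state_json)["completed"]["singer_state"]["bookmarks"]
    return bookmarks.get(stream, {}).get("replication_key_value")

print(starting_value(STATE, "book"))  # → 112233
```

On the next run, that value (112233) is what ends up in the `after` query param.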
Below is the relevant code section:
```python
def get_url_params(self, context, next_page_token):
    params = {}

    starting_id: int = self.get_starting_replication_key_value(context=context)
    if starting_id:
        params["after"] = starting_id

    self.logger.info("QUERY PARAMS: %s", params)

    return params
```
I also set:

```python
replication_key = "id"
is_sorted = False
```
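As a side note on `is_sorted = False`: since records can arrive in any order, the bookmark has to be the *maximum* id seen, and it can only be trusted once the whole batch finishes. A toy sketch of that bookkeeping (plain Python, not the SDK's internals):

```python
# Because the stream is unsorted, the watermark must track the max
# replication-key value across the batch, not the last record seen.
def advance_watermark(records, current_watermark=None):
    """Return the highest replication-key value after processing a batch."""
    watermark = current_watermark
    for record in records:
        value = record["id"]
        if watermark is None or value > watermark:
            watermark = value
    return watermark

batch = [{"id": 14101}, {"id": 14104}, {"id": 14099}]  # unsorted batch
print(advance_watermark(batch))  # → 14104
```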
I would greatly appreciate any assistance or guidance in resolving this issue. If you have reference code or any insights on how to properly implement incremental data loading with Meltano, it would be immensely helpful.
e
Hey @Debashis Adak! What does your `meltano.yml` look like, specifically for your custom tap?
d
@Edgar Ramírez (Arch.dev) Please find the custom tap section of my `meltano.yml` below:

```yaml
extractors:
  - name: tap-efservice
    namespace: tap_efservice
    pip_url: -e tap-efservice/
    executable: tap-efservice
    metadata:
      '*':
        replication-method: INCREMENTAL
        replication-key: id
```
e
ok, so may wanna add `capabilities`:
```yaml
extractors:
  - name: tap-efservice
    namespace: tap_efservice
    pip_url: -e tap-efservice/
    executable: tap-efservice
    capabilities:  # <- this
    - state
    - catalog
    - discover
    - about
    - stream-maps
    metadata:
      '*':
        replication-method: INCREMENTAL
        replication-key: id
```
d
@Edgar Ramírez (Arch.dev) thanks for the suggestion. Now I can see it's reading state from the local filesystem and the query param is passed, but it's still generating duplicate data. Not sure why. 😞
```
Reading state from Local Filesystem
QUERY PARAMS: {'after': 14104}
```
e
Did you declare `primary_keys` in your stream class?
d
```python
primary_key = None
```
@Edgar Ramírez (Arch.dev)
e
Oh, you need to set that to a field name so the target can upsert incoming records.
Otherwise it'll just append new records.
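A toy illustration of the difference (made-up records, not Meltano internals): without a primary key the loader can only append, so re-delivered records become duplicates; with one it can replace rows by key.

```python
# Append semantics: every incoming record is added, duplicates and all.
def append_records(table, records):
    table.extend(records)
    return table

# Upsert semantics: incoming records replace existing rows with the same key.
def upsert_records(table, records, key="id"):
    by_key = {row[key]: row for row in table}
    for record in records:
        by_key[record[key]] = record
    return list(by_key.values())

first = [{"id": 1, "title": "a"}, {"id": 2, "title": "b"}]
rerun = [{"id": 2, "title": "b"}, {"id": 3, "title": "c"}]  # id 2 re-delivered

print(len(append_records(list(first), rerun)))  # → 4 (id 2 duplicated)
print(len(upsert_records(list(first), rerun)))  # → 3 (id 2 replaced)
```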
d
@Edgar Ramírez (Arch.dev) Thanks for your input. After declaring `primary_keys`, I was able to solve the problem. 🙂 One quick question about upsert: if my target is an S3 file (e.g. Parquet/CSV), how is upsert handled in that scenario? Can we still load incremental data?
e
Different targets might handle it differently, but for files there are usually two options, since upserting isn't really feasible: 1. append to the same file, or 2. write a new file containing only the new data, adding a timestamp to the filename. The latter is usually safer.
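A quick sketch of option 2, generating a fresh timestamped filename per incremental run (the prefix and extension are example names, not anything a target prescribes):

```python
import datetime

def incremental_filename(prefix="book", ext="parquet", now=None):
    """Build a unique, sortable filename for one incremental batch."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    return f"{prefix}-{stamp}.{ext}"

fixed = datetime.datetime(2024, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)
print(incremental_filename(now=fixed))  # → book-20240102T030405Z.parquet
```

Each run then writes only the new records to its own file, and the files sort chronologically by name.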