# best-practices
t
I'm confused about how to add a replication key to my tables with tap-snowflake. Code snippet in thread
plugins:
  extractors:
  - name: sonar
    inherit_from: tap-snowflake
    variant: meltanolabs
    config:
      account: MXPKNZE-AH71233
      user: ext_3c255d82059647efba26c0996cec90f6
      password: ${SONAR_PASS}
      warehouse: ETL_WAREHOUSE
      database: TENANT_DATA_SNOWPIPE
      schema: PUBLIC
    select:
      - public-accounts_ext_3c255d82059647efba26c0996cec90f6.*
      - public-account_statuses_ext_3c255d82059647efba26c0996cec90f6.*
      - public-account_service_ext_3c255d82059647efba26c0996cec90f6.*
      - public-services_ext_3c255d82059647efba26c0996cec90f6.*
      - public-data_service_details_ext_3c255d82059647efba26c0996cec90f6.*
      - public-addresses_ext_3c255d82059647efba26c0996cec90f6.*
    load_schema: raw_sonar
I'd love to be pointed to documentation about this as well. Is this a lower-level Singer SDK feature that all taps should support?
e
You can declare the replication keys with metadata.
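For reference, replication settings go in the extractor's `metadata` extra in `meltano.yml`, keyed by the stream ID the tap discovers (for tap-snowflake this is the `<schema>-<table>` name). A sketch with a placeholder stream and column name:

```yaml
plugins:
  extractors:
  - name: sonar
    metadata:
      # "some_stream_id" in the docs = the discovered stream name,
      # e.g. "public-my_table" for tap-snowflake (placeholder here)
      public-my_table:
        replication-method: INCREMENTAL
        replication-key: updated_at
```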
t
Thanks @Edgar Ramírez (Arch.dev). I was looking at this page earlier, actually. What should I use for "some_stream_id"? The name of the table?
Like this?
plugins:
  extractors:
  - name: sonar
    inherit_from: tap-snowflake
    variant: meltanolabs
    config:
      account: MXPKNZE-AH71233
      user: ext_3c255d82059647efba26c0996cec90f6
      password: ${SONAR_PASS}
      warehouse: ETL_WAREHOUSE
      database: TENANT_DATA_SNOWPIPE
      schema: PUBLIC
    metadata:
      public-addresses_ext_3c255d82059647efba26c0996cec90f6:
        replication-method: INCREMENTAL
        replication-key: UPDATED_AT
        UPDATED_AT:
          is-replication-key: true
    select:
      - public-accounts_ext_3c255d82059647efba26c0996cec90f6.*
      - public-account_statuses_ext_3c255d82059647efba26c0996cec90f6.*
      - public-account_service_ext_3c255d82059647efba26c0996cec90f6.*
      - public-services_ext_3c255d82059647efba26c0996cec90f6.*
      - public-data_service_details_ext_3c255d82059647efba26c0996cec90f6.*
      - public-addresses_ext_3c255d82059647efba26c0996cec90f6.*
    load_schema: raw_sonar
This worked:
plugins:
  extractors:
  - name: sonar
    inherit_from: tap-snowflake
    variant: meltanolabs
    config:
      account: MXPKNZE-AH71233
      user: ext_3c255d82059647efba26c0996cec90f6
      password: ${SONAR_PASS}
      warehouse: ETL_WAREHOUSE
      database: TENANT_DATA_SNOWPIPE
      schema: PUBLIC
    metadata:
      public-addresses_ext_3c255d82059647efba26c0996cec90f6:
        replication-method: INCREMENTAL
        replication-key: updated_at
        updated_at:
          is-replication-key: true
    select:
      - public-accounts_ext_3c255d82059647efba26c0996cec90f6.*
      - public-account_statuses_ext_3c255d82059647efba26c0996cec90f6.*
      - public-account_service_ext_3c255d82059647efba26c0996cec90f6.*
      - public-services_ext_3c255d82059647efba26c0996cec90f6.*
      - public-data_service_details_ext_3c255d82059647efba26c0996cec90f6.*
      - public-addresses_ext_3c255d82059647efba26c0996cec90f6.*
    load_schema: raw_sonar
Never mind, that's only syncing one record
e
Does it replicate all of your records if you run with --full-refresh?
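For reference, that flag is passed on the pipeline invocation itself; a sketch, where the loader name is hypothetical:

```shell
# Ignore saved state and re-sync everything (loader name is a placeholder)
meltano run --full-refresh sonar target-snowflake
```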
t
I'm dropping the schema before testing each time. It only populates it with one record on a clean DB.
Looks like the metadata isn't stored in that schema.
Okay, running it with --full-refresh did insert more records, but subsequent runs are still inserting addresses that are already in the table.
e
Meltano stores the value of the replication key in its state. It doesn't matter if you delete records from the destination, because Meltano doesn't look at the destination when determining the value of the replication key.
t
Right. So after doing a --full-refresh it did get all records, which is good. However, if I run it again after that, it will grab everything again. I want it to use the replication key to recognize that it doesn't have to do that. I could be confused; maybe it's processing each record and seeing that it doesn't need to insert it. This particular pipeline processes about a million addresses, and I'm trying to speed that up by skipping the addresses I already have in my DB.
e
Does the state file show the correct value for the replication key?
t
Sorry, what file are we talking about?
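For reference, the replication-key bookmark lives in Meltano's state backend rather than a file you'd normally edit by hand; it can be inspected with the `meltano state` CLI. A sketch (the state ID below is hypothetical and depends on your environment and pipeline names):

```shell
# Show the state IDs Meltano has stored
meltano state list

# Inspect the bookmark for one pipeline (state ID is a placeholder)
meltano state get dev:sonar-to-target-snowflake

# Clear it to force the next run to start from scratch
meltano state clear dev:sonar-to-target-snowflake
```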
t
I tested it today and it's no longer grabbing the addresses again. I would imagine it's because the replication field is a date rather than a timestamp, so with it being a new day, it didn't need to grab them again. I'll read over that for future reference.
e
This might also be relevant to your question: https://sdk.meltano.com/en/v0.47.4/implementation/at_least_once.html.
t
This is something that confuses me about Meltano. I see in the docs on incremental replication that there's an is_sorted flag on the class. Is that just something I can put in my yml? Where in my yml does that need to go? How do I tell the Snowflake tap to sort the records when it retrieves them?
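For reference, `is_sorted` is declared on the stream class in a tap's Python source code (it's a Singer SDK construct), not a `meltano.yml` setting, so changing it means modifying or forking the tap. A schematic, plain-Python sketch of the idea (a real tap would subclass `singer_sdk.Stream` or `SQLStream`; the class here is illustrative only):

```python
# Schematic only: mimics how a Singer SDK stream declares sortedness.
# In a real tap this would subclass singer_sdk.Stream / SQLStream.
class AddressesStream:
    name = "addresses"
    replication_key = "updated_at"
    # Tells the SDK that records arrive in replication-key order, so it
    # can checkpoint state mid-sync instead of only at the end.
    is_sorted = True

stream = AddressesStream()
print(stream.is_sorted)  # → True
```

Whether the records actually arrive sorted is up to the tap's own query generation (e.g. an ORDER BY on the replication key), which a given tap-snowflake variant may or may not do.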