# troubleshooting
p
Hi, I'm continuing to evaluate Meltano for moving data from a Snowflake table to CSVs. I'm using incremental loading and setting an initial state so the load doesn't go back more than a few days. I'm also using inline stream maps to trim whitespace from column values. I'm running into issues with both. 1. State does not update after running with
meltano run
. Using
meltano state set ...
to set a start date value works great, but the state stays stuck at that timestamp afterwards. Note below that the stream state isn't updated after the run.
$ meltano --log-level=debug run tap-snowflake-invoices target-csv
...
$ meltano state get dev:tap-snowflake-invoice-to-target-csv
{"singer_state": {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNTS-INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}}}
2. Can't seem to use stream maps. I'm using the transferwise variant of
tap-snowflake
and
meltano-map-transformer
. Code snippet below.
plugins:
  extractors:
  - name: tap-snowflake-invoice
    inherit_from: tap-snowflake
    config:
      tables: ANALYTICS.ACCOUNTS.INVOICE
    select:
    - '*INVOICE.ORD_NUM'
  mappers:
  - name: map-invoice
    inherit_from: meltano-map-transformer
    mappings:
    - name: trim-invoice-columns
      config:
        stream_maps:
          "*INVOICE_V":
            ORD_NUM: ORD_NUM.strip()
Would appreciate any guidance on making stream mapping work. Thanks
Ok, I figured that out: the stream name I was using was wrong. For the transferwise variant of the plugin, the stream ID includes the database and schema name, so I updated the stream ID. Now I get a problem with
target-csv
and there is no helpful log message. Only:
Run invocation could not be completed as block failed: Loader failed
plugins:
  mappers:
  - name: map-invoice
    inherit_from: meltano-map-transformer
    mappings:
    - name: trim-invoice-columns
      config:
        stream_maps:
          "ANALYTICS-ACCOUNT-INVOICE":
            ORD_NUM: "ORD_NUM.upper()"
->
$ meltano --log-level=debug run tap-snowflake-invoice trim-invoice-columns target-jsonl
Run invocation could not be completed as block failed: Loader failed
No idea.
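For reference, the stream naming fix described above can be sketched in Python. This only illustrates the convention as it appears in this thread (database, schema, and table joined with hyphens); `stream_id_from_table` is a hypothetical helper, not part of Meltano or the tap.

```python
def stream_id_from_table(fq_table: str) -> str:
    """Turn 'DB.SCHEMA.TABLE' into 'DB-SCHEMA-TABLE'.

    Assumption: this mirrors the stream IDs seen in this thread for the
    transferwise tap-snowflake variant; it is not the tap's own code.
    """
    parts = fq_table.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected DB.SCHEMA.TABLE, got {fq_table!r}")
    return "-".join(parts)


# The value configured under `tables:` in the extractor config.
stream_id = stream_id_from_table("ANALYTICS.ACCOUNTS.INVOICE")
print(stream_id)
```

The resulting string is what the `stream_maps` key would need to match exactly.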
It's not a deal breaker that stream mapping isn't working, because I can try to work around it with dbt, although that's a lot of extra steps. State not updating after an incremental load is a lot more worrying.
Tail of logs after a run:
2022-12-08T19:17:14.009166Z [info     ] {"type": "RECORD", "stream": "ANALYTICS-ACCOUNT-INVOICE", "record": {...}, "version": 1670361200499, "time_extracted": "2022-12-08T19:16:33.856778Z"} cmd_type=elb consumer=False name=tap-snowflake-invoice producer=True stdio=stdout string_id=tap-snowflake-invoice
2022-12-08T19:17:14.009214Z [info     ] {"type": "STATE", "value": {"currently_syncing": "ANALYTICS-ACCOUNT-INVOICE", "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}}} cmd_type=elb consumer=False name=tap-snowflake-invoice producer=True stdio=stdout string_id=tap-snowflake-invoice
2022-12-08T19:17:14.009261Z [info     ] {"type": "STATE", "value": {"currently_syncing": "ANALYTICS-ACCOUNT-INVOICE", "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}}} cmd_type=elb consumer=False name=tap-snowflake-invoice producer=True stdio=stdout string_id=tap-snowflake-invoice
2022-12-08T19:17:14.009307Z [info     ] {"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}}} cmd_type=elb consumer=False name=tap-snowflake-invoice producer=True stdio=stdout string_id=tap-snowflake-invoice
2022-12-08T19:17:14.015005Z [debug    ] tail consumer is next block, wrapping up
2022-12-08T19:17:14.022326Z [debug    ] Added to state dev:tap-snowflake-invoice-to-target-jsonl state payload {'singer_state': {'currently_syncing': None, 'bookmarks': {'ANALYTICS-ACCOUNT-INVOICE': {'replication_key': 'ORD_DATE', 'version': 1670361200499, 'replication_key_value': '2022-12-07T00:00:00+00:00'}}}}
2022-12-08T19:17:14.026708Z [info     ] Incremental state has been updated at 2022-12-08 19:17:14.026640.
2022-12-08T19:17:14.026887Z [debug    ] Incremental state: {'currently_syncing': None, 'bookmarks': {'ANALYTICS-ACCOUNT-INVOICE': {'replication_key': 'ORD_DATE', 'version': 1670361200499, 'replication_key_value': '2022-12-07T00:00:00+00:00'}}}
2022-12-08T19:17:14.026964Z [info     ] {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}} cmd_type=elb consumer=True name=target-jsonl producer=False stdio=stdout string_id=target-jsonl
2022-12-08T19:17:14.027974Z [debug    ] Deleted configuration at /Users/pzp/Projects/etl/.meltano/run/tap-snowflake-invoice/tap.f3ef7fb2-2140-44a0-94dc-63e692adb644.config.json
2022-12-08T19:17:14.028516Z [debug    ] Deleted configuration at /Users/pzp/Projects/etl/.meltano/run/target-jsonl/target.c939b4f6-61f2-4f4b-a3f6-d2b370d7b4c6.config.json
2022-12-08T19:17:14.031757Z [info     ] Block run completed.           block_type=ExtractLoadBlocks err=None set_number=0 success=True
Tried again with
target-csv
meltanolabs variant, running tap and target only (no mapper):
2022-12-08T19:58:22.406231Z [debug    ] tail consumer is next block, wrapping up
2022-12-08T19:58:22.409479Z [info     ] time=2022-12-08 13:58:22 name=target-csv level=INFO message=Target 'target-csv' completed reading 202096 lines of input (201889 records, 205 state messages). cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T19:58:22.436529Z [info     ] time=2022-12-08 13:58:22 name=target-csv level=INFO message=Writing to destination file '/Users/pzp/Projects/-etl/ANALYTICS-ACCOUNT_INVOICE.csv'... cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T19:58:22.436725Z [info     ] time=2022-12-08 13:58:22 name=target-csv level=INFO message=Writing 201889 records to file... cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T19:58:23.658061Z [info     ] time=2022-12-08 13:58:23 name=target-csv level=INFO message=Emitting completed target state {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT_INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}} cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T19:58:23.665759Z [debug    ] Added to state dev:tap-snowflake-sap-sales-invoice-to-target-csv state payload {'singer_state': {'currently_syncing': None, 'bookmarks': {'ANALYTICS-ACCOUNT_INVOICE': {'replication_key': 'ORD_DATE', 'version': 1670361200499, 'replication_key_value': '2022-12-07T00:00:00+00:00'}}}}
2022-12-08T19:58:23.675579Z [info     ] Incremental state has been updated at 2022-12-08 19:58:23.675478.
2022-12-08T19:58:23.675681Z [debug    ] Incremental state: {'currently_syncing': None, 'bookmarks': {'ANALYTICS-ACCOUNT_INVOICE': {'replication_key': 'ORD_DATE', 'version': 1670361200499, 'replication_key_value': '2022-12-07T00:00:00+00:00'}}}
2022-12-08T19:58:23.675737Z [info     ] {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT_INVOICE": {"replication_key": "ORD_DATE", "version": 1670361200499, "replication_key_value": "2022-12-07T00:00:00+00:00"}}} cmd_type=elb consumer=True name=target-csv producer=False stdio=stdout string_id=target-csv
2022-12-08T19:58:23.694980Z [debug    ] Deleted configuration at /Users/pzp/Projects/etl/.meltano/run/tap-snowflake-invoice/tap.8c975671-47b8-41f0-ac14-b3f7c303e5e4.config.json
2022-12-08T19:58:23.695427Z [debug    ] Deleted configuration at /Users/pzp/Projects/etl/.meltano/run/target-csv/target.e23f590f-fa3b-4053-81a6-cfea7fa544b7.config.json
2022-12-08T19:58:23.702665Z [info     ] Block run completed.           block_type=ExtractLoadBlocks err=None set_number=0 success=True
State isn't updating. It's set to the same value that I set using
meltano state set
.
I don't understand. Am I supposed to maintain the state myself? Does setting state with
meltano state set
override everything? Is this an expected result or a bug? Is incremental loading generally poorly supported? Is there a recommended way for loading a table daily/hourly? Could I use
FULL_TABLE
with partial loading? Please advise. Thanks.
p
What is the max value of the replication key that you're using
ORD_DATE
? The singer community uses a convention of selecting
>=
the replication key to ensure at-least-once delivery (i.e. duplicates are preferred over missed records). I'm wondering if your replication key is a date and the max is
2022-12-07
then that's the true replication key value. Just the first thing that came to mind 🤔
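A small Python sketch of why the bookmark can look stuck under this convention. The rows are made-up sample data and both helpers are hypothetical; the point is only that a `>=` filter on a date-valued key re-selects the latest day and leaves the bookmark where it was.

```python
from datetime import datetime, timezone


def incremental_select(rows, bookmark):
    """Select rows whose replication key is >= the bookmark (at-least-once)."""
    return [r for r in rows if r["ORD_DATE"] >= bookmark]


def next_bookmark(selected, bookmark):
    """The new bookmark is the max replication key value seen in this run."""
    return max((r["ORD_DATE"] for r in selected), default=bookmark)


day = datetime(2022, 12, 7, tzinfo=timezone.utc)
rows = [  # made-up sample rows; the newest ORD_DATE equals the bookmark
    {"ORD_NUM": "A1", "ORD_DATE": datetime(2022, 12, 6, tzinfo=timezone.utc)},
    {"ORD_NUM": "A2", "ORD_DATE": day},
    {"ORD_NUM": "A3", "ORD_DATE": day},
]

first_run = incremental_select(rows, day)
bookmark_after = next_bookmark(first_run, day)
second_run = incremental_select(rows, bookmark_after)

print(bookmark_after == day)    # the bookmark did not move
print(first_run == second_run)  # the same rows are selected again
```

If no rows newer than the bookmark exist, every run re-emits the rows at the bookmark value, so an unchanged state is consistent with the tap working as designed.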
p
Ah.
"2022-12-07T00:00:00+00:00"
so the selection is overlapping.
The table doesn't seem to have a primary key (or at least not one that's in any order), so I'm using ORD_DATE.
I have another timestamp column that's more precise, but meltano/singer didn't like it as a bookmark for some reason
""2022-12-08T072827.030000+0000+0000"
p
Ah yeah, that's it then. Hmm, what does it say when you use that timestamp?
and just to answer some of your questions:
Am I supposed to maintain the state myself?
No, using
run
or
elt
will track the state for you. You might have a bit of an unusual case though, needing to set a state before ever running it. Most users let it do a full run, then incremental picks up from there, whereas you want to skip the full run.
Does setting state with
meltano state set
override everything?
Nope, it takes the most recent state. I think it's actually kind of smart, in that it will merge the last successful state with any partially updated state since then, in case your sync failed midway through but made progress.
p
Re-ran
meltano run tap-snowflake-invoice target-csv
using that updated_timestamp:
2022-12-08T20:44:05.731862Z [info     ] {"type": "STATE", "value": {"currently_syncing": "ANALYTICS-ACCOUNT-INVOICE", "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "UPDATED_TIMESTAMP", "version": 1670361200499, "replication_key_value": "2022-12-08T07:28:27.030000+00:00+00:00"}}}} cmd_type=elb consumer=False name=tap-snowflake-invoice producer=True stdio=stdout string_id=tap-snowflake-invoice
2022-12-08T20:44:05.731914Z [info     ] {"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "UPDATED_TIMESTAMP", "version": 1670361200499, "replication_key_value": "2022-12-08T07:28:27.030000+00:00+00:00"}}}} cmd_type=elb consumer=False name=tap-snowflake-invoice producer=True stdio=stdout string_id=tap-snowflake-invoice
2022-12-08T20:44:05.736833Z [debug    ] tail consumer is next block, wrapping up
2022-12-08T20:44:05.740259Z [info     ] time=2022-12-08 14:44:05 name=target-csv level=INFO message=Target 'target-csv' completed reading 401428 lines of input (401021 records, 405 state messages). cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T20:44:05.752723Z [info     ] time=2022-12-08 14:44:05 name=target-csv level=INFO message=Writing to destination file '/Users/pzp/Projects/etl/ANALYTICS-ACCOUNT-INVOICE.csv'... cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T20:44:05.752858Z [info     ] time=2022-12-08 14:44:05 name=target-csv level=INFO message=Writing 401021 records to file... cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T20:44:08.521548Z [info     ] time=2022-12-08 14:44:08 name=target-csv level=INFO message=Emitting completed target state {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "UPDATED_TIMESTAMP", "version": 1670361200499, "replication_key_value": "2022-12-08T07:28:27.030000+00:00+00:00"}}} cmd_type=elb consumer=True name=target-csv producer=False stdio=stderr string_id=target-csv
2022-12-08T20:44:08.529435Z [debug    ] Added to state dev:tap-snowflake-invoice-to-target-csv state payload {'singer_state': {'currently_syncing': None, 'bookmarks': {'ANALYTICS-ACCOUNT-INVOICE': {'replication_key': 'UPDATED_TIMESTAMP', 'version': 1670361200499, 'replication_key_value': '2022-12-08T07:28:27.030000+00:00+00:00'}}}}
2022-12-08T20:44:08.537638Z [info     ] Incremental state has been updated at 2022-12-08 20:44:08.537539.
2022-12-08T20:44:08.537737Z [debug    ] Incremental state: {'currently_syncing': None, 'bookmarks': {'ANALYTICS-ACCOUNT-INVOICE': {'replication_key': 'UPDATED_TIMESTAMP', 'version': 1670361200499, 'replication_key_value': '2022-12-08T07:28:27.030000+00:00+00:00'}}}
2022-12-08T20:44:08.537787Z [info     ] {"currently_syncing": null, "bookmarks": {"ANALYTICS-ACCOUNT-INVOICE": {"replication_key": "UPDATED_TIMESTAMP", "version": 1670361200499, "replication_key_value": "2022-12-08T07:28:27.030000+00:00+00:00"}}} cmd_type=elb consumer=True name=target-csv producer=False stdio=stdout string_id=target-csv
2022-12-08T20:44:08.552771Z [debug    ] Deleted configuration at /Users/pzp/Projects/etl/.meltano/run/tap-snowflake-invoice/tap.f2e78b59-b679-4564-9632-a3bc33a1570a.config.json
2022-12-08T20:44:08.553373Z [debug    ] Deleted configuration at /Users/pzp/Projects/etl/.meltano/run/target-csv/target.c9ecb416-ac05-44df-8484-368eaa082742.config.json
2022-12-08T20:44:08.559250Z [info     ] Block run completed.           block_type=ExtractLoadBlocks err=None set_number=0 success=True
So at the end, it finds a new timestamp value
p
So all good, right?
p
On the next run the query looks right:
WHERE "UPDATED_TIMESTAMP" >= '2022-12-08T07:28:27.030000+00:00' ORDER BY "UPDATED_TIMESTAMP" ASC
that's a batch datetime, so it doesn't work well as a bookmark.
p
Ok, so if I understand correctly: using your date replication key works fine, but it seemed like it wasn't working because of the
>=
logic, you could use that and get duplicates for overlapping dates. Using the precise
UPDATED_TIMESTAMP
also works but is less ideal because the actual time value isn't reliable. Do I have that right?
Then it's just a trade-off of what you think is best for your use case
p
Basically, the
>=
logic is tripping this up, and I need to introduce a dedupe process.
does singer/meltano support compound primary keys?
p
I actually opened an issue in the SDK around this: https://github.com/meltano/sdk/issues/1200. There are a few other native dedup options being considered too
does singer/meltano support compound primary keys?
I'm not sure, but my thought is no, at least not in any consistent way. @aaronsteers @edgar_ramirez_mondragon does either of you know?
p
how do i set primary keys?
key_properties
?
table_key_properties
?
view_key_properties
? The meltano docs don't discuss primary keys. The meltano hub spec does
do i set it on config? metadata?
e
Both the Singer spec and Meltano's Singer SDK support compound PKs
do i set it on config? metadata?
metadata is where you want to set it
should be
table-key-properties
p
extractors:
  - name: tap-snowflake-invoice
    inherit_from: tap-snowflake
    metadata:
      table-key-properties: ["BILL_NUM", "BILL_ITEM", "BILL_DATE"]
So this should work? But it's up to the target to respect that and dedupe.
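Since deduplication is left to the target or a downstream step, here is a minimal Python sketch of deduping on a compound key. The key names come from the snippet above; the rows are made-up sample data and `dedupe` is a hypothetical helper, not something Meltano or a target provides.

```python
def dedupe(records, key_properties):
    """Keep the last record seen for each compound key, in first-seen order."""
    latest = {}
    for record in records:
        key = tuple(record[name] for name in key_properties)
        latest[key] = record  # a later record overwrites an earlier duplicate
    return list(latest.values())


KEYS = ["BILL_NUM", "BILL_ITEM", "BILL_DATE"]
rows = [  # made-up sample rows; the third repeats the first (an overlapping re-sync)
    {"BILL_NUM": "9001", "BILL_ITEM": 10, "BILL_DATE": "2022-12-07", "ORD_NUM": "A1"},
    {"BILL_NUM": "9001", "BILL_ITEM": 20, "BILL_DATE": "2022-12-07", "ORD_NUM": "A1"},
    {"BILL_NUM": "9001", "BILL_ITEM": 10, "BILL_DATE": "2022-12-07", "ORD_NUM": "A1"},
]

unique = dedupe(rows, KEYS)
print(len(rows), "->", len(unique))
```

Keeping the last record per key means a re-emitted row replaces the earlier copy, which fits the at-least-once behaviour discussed earlier in the thread.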
It's actually a view
metadata:
  view-key-properties: ["BILL_NUM", "BILL_ITEM", "BILL_DATE"]
  "*":
    replication-method: INCREMENTAL
    replication-key: ORD_DATE
    ORD_DATE:
      is-replication-key: true
thanks y'all for help w/ state. any advice on stream maps? what is the right way to configure?
plugins:
  extractors:
  - name: tap-snowflake-invoice
    config:
      dbname: analytics
      schema: accounts
      tables: ANALYTICS.ACCOUNTS.INVOICE_V
    select:
    - '*INVOICE_V.ORD_NUM'
  mappers:
  - name: map-invoice
    inherit_from: meltano-map-transformer
    mappings:
    - name: trim-invoice-columns
      config:
        stream_maps:
          "ANALYTICS-ACCOUNTS-INVOICE_V":
            ORD_NUM: "ORD_NUM.strip()"
I get this error:
singer_sdk.helpers._simpleeval.AttributeDoesNotExist: ('strip', "ORD_NUM.strip()") 
singer_sdk.exceptions.MapExpressionError: Failed to evaluate simpleeval expressions ORD_NUM.strip(
Nvm, figured it out.
str(ORD_NUM).strip()
. PEBKAC
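A plain-Python illustration of why wrapping the value in `str()` helps. It assumes the column value reached the mapper as a non-string (an integer here), which the thread does not confirm, and it uses ordinary Python rather than the SDK's expression evaluator; `trim` is a hypothetical helper.

```python
def trim(value):
    """Coerce to str first, then strip surrounding whitespace."""
    return str(value).strip()


# Assumed case: the value is not a string, so it has no .strip() method.
raw_number = 1001
try:
    raw_number.strip()
    failed = False
except AttributeError:
    failed = True

print(failed)              # calling .strip() on an int fails
print(trim(raw_number))    # coercing first works
print(trim("  A-1001  "))  # strings are trimmed as expected
```

One caveat with this approach: `str(None)` yields the text `None`, so nullable columns may need an explicit null check in the expression.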