# troubleshooting
s
hi, i am trying to use transferwise snowflake tap https://github.com/transferwise/pipelinewise-tap-snowflake
```yaml
- name: pipelinewise-tap-snowflake
  # variant: transferwise
  namespace: pipelinewise_tap_snowflake
  pip_url: pipelinewise-tap-snowflake
  executable: tap-snowflake
  config: ....
```
couple of questions. 1. wondering why `variant: transferwise` doesn't work for the snowflake tap like it does for the loader. 2. when i run `meltano elt pipelinewise-tap-snowflake target-postgres` it doesn't do anything, saying there is no properties file. i generated properties.json but am not sure how to give that to meltano.
log:
```
meltano                    | Running extract & load...
pipelinewise-tap-snowflake | /Users//code/meltano//.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/pandas/compat/__init__.py:120: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 08:58:45 name=tap_snowflake level=INFO message=No properties were selected
meltano                    | Extract & load complete!
meltano                    | Transformation skipped.
```
t
> wondering why `variant: transferwise` doesn’t work for snowflake tap like it does for loader

`tap-snowflake` isn’t discoverable out of the box like many other taps, so you need to treat it like a custom extractor.
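(For reference, a minimal sketch of what a custom extractor entry can look like in meltano.yml; the capabilities match this tap, but the `settings` names below are illustrative rather than the tap's authoritative list:)
```yaml
extractors:
  - name: pipelinewise-tap-snowflake
    namespace: pipelinewise_tap_snowflake
    pip_url: pipelinewise-tap-snowflake
    executable: tap-snowflake
    capabilities:
      - properties
      - discover
    settings:
      - name: account      # illustrative setting names; check the tap's README for the real list
      - name: user
      - name: password
        kind: password
      - name: dbname
```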
s
ah yea. I guess my question was why some taps etc. are supported officially vs. not.
does it mean discoverable plugins are vetted by the meltano team?
t
Discoverable means Meltano is aware of all the tap settings available for configuration and knows how to map them to things like schema, username, etc. It implies a deeper level of curation and integration: https://meltano.com/docs/plugins.html#discoverable-plugins
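(By contrast with a custom extractor, a discoverable plugin only needs the `variant` shorthand, because Meltano already knows its settings; a minimal sketch:)
```yaml
loaders:
  - name: target-postgres
    variant: transferwise
```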
s
ah that's awesome. Thank you!
now i just need to figure out how to pass properties.json to the `meltano elt` run for the snowflake tap
t
you should be able to do `--properties properties.json`. `properties` is the old method for taps; `--catalog` is the newer way, but some taps still support the old way. looking at that tap, it looks like it’s `--properties`
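(Invoked directly, outside Meltano, the two styles look like this; the file names are illustrative and `tap-example` is a hypothetical tap used for contrast:)
```sh
tap-snowflake --config config.json --properties properties.json   # older style, used by this tap
tap-example   --config config.json --catalog catalog.json         # newer style from the Singer spec
```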
s
oh yea i tried that
```
meltano elt pipelinewise-tap-snowflake target-postgres --properties properties.json
Error: no such option: --properties
```
t
try `meltano elt pipelinewise-tap-snowflake --properties properties.json target-postgres`. b/c you had it after the `target-postgres`, it’s trying to pass it to the target and not the tap
s
```
meltano elt pipelinewise-tap-snowflake --properties properties.json target-postgres
Usage: meltano elt [OPTIONS] EXTRACTOR LOADER
Try 'meltano elt --help' for help.

Error: no such option: --properties
```
```yaml
capabilities:
  - properties
  - state
catalog: properties.json
```
trying this in my config now
t
are you able to run `meltano invoke pipelinewise-tap-snowflake`?
s
yes
```
meltano invoke pipelinewise-tap-snowflake
[2021-05-20 10:08:19,111] [39978|MainThread|meltano.core.plugin.singer.tap] [INFO] Found catalog in /Users/code/meltano/properties.json
/Users/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/pandas/compat/__init__.py:120: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
  warnings.warn(msg)
time=2021-05-20 10:08:20 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
{"type": "STATE", "value": {"currently_syncing": null}}
```
t
can you try with the `--properties` flag too?
s
sure thing
```
meltano invoke pipelinewise-tap-snowflake --properties properties.json
[2021-05-20 10:11:08,324] [40078|MainThread|meltano.core.plugin.singer.tap] [INFO] Found catalog in /Users/code/meltano/attentive/properties.json
{"type": "STATE", "value": {"currently_syncing": null}}
```
t
so that seems happy 🤔
I may need to loop in @douwe_maan or @aaronsteers as backup
s
yea actually can now run
```
meltano elt pipelinewise-tap-snowflake target-postgres
meltano                    | Running extract & load...
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 10:12:16 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
meltano                    | Incremental state has been updated at 2021-05-20 15:12:18.213661.
meltano                    | Extract & load complete!
meltano                    | Transformation skipped.
```
t
👀 what changed?
s
but nothing shows up on the postgres side because this tap seems to expect a source-to-target mapping: https://transferwise.github.io/pipelinewise/connectors/taps/snowflake.html
re: the change, i added this to meltano.yml
```yaml
capabilities:
  - properties
catalog: properties.json
```
for that tap
t
ah nice
s
not quite sure how to pass in `schemas` to that tap via meltano
t
s
hmm.. yea but i think that's the schema for the catalog
```json
"streams": [
    {
      "tap_stream_id": "SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER",
      "table_name": "CALL_CENTER",
      "schema": {
        "properties": {
          "CC_CALL_CENTER_SK": {
            "inclusion": "available",
            "type": [
              "null",
              "number"
            ]
          },
```
i already have that in my properties file
maybe i'll be better off writing my own tap for snowflake via meltano sdk
a
Hi, @surya_g. Sorry I'm late to this thread but I'd love to help if I can. I'm surprised that the existing Snowflake tap is not discovering the schema for you. What happens if you add "discover" and "catalog" to the capabilities list and try to let Snowflake tap do the discovery for you?
s
oh let me try that
```yaml
- name: pipelinewise-tap-snowflake
  namespace: pipelinewise_tap_snowflake
  pip_url: pipelinewise-tap-snowflake
  executable: tap-snowflake
  capabilities:
    - properties
    - catalog
    - discover
  # catalog: properties.json
  select:
    - SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-*
  config:
    "account": "mn55477.us-east-2.aws" ....
```
i have this in my yml. i am running this:
```
meltano elt pipelinewise-tap-snowflake target-postgres
meltano                    | Running extract & load...
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 10:43:47 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
pipelinewise-tap-snowflake | time=2021-05-20 10:43:49 name=tap_snowflake level=WARNING message=There are no columns selected for stream SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER, skipping it.
meltano                    | Incremental state has been updated at 2021-05-20 15:43:49.275806.
meltano                    | Extract & load complete!
meltano                    | Transformation skipped.
```
doesn't seem to sync anything
sorry, i am totally new to singer/meltano. Lmk if i am missing something totally obvious
pipelinewise wants this in their yaml, so i am guessing something equivalent needs to be passed in to meltano (see the sketch after this block)
```yaml
schemas:

  - source_schema: "SCHEMA_1"          # Source schema (aka. database) in Snowflake with tables
    target_schema: "REPL_SCHEMA_1"     # Target schema in the destination Data Warehouse

    # List of tables to replicate from Snowflake to a destination
    #
    # Please check the Replication Strategies section in the documentation to understand the differences.
    tables:
      - table_name: "TABLE_ONE"
        replication_method: "INCREMENTAL"   # One of INCREMENTAL or FULL_TABLE
        replication_key: "last_update"      # Important: Incremental load always needs replication key

        # OPTIONAL: Load time transformations
        #transformations:
        #  - column: "last_name"            # Column to transform
        #    type: "SET-NULL"               # Transformation type

      # You can add as many tables as you need...
      - table_name: "TABLE_TWO"
        replication_method: "FULL_TABLE"
```
a
I was looking at that also, but those look like table-level overrides. The fact that they aren’t declaring columns (only optional overrides) makes me think it actually does have catalog discovery.
```
level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
pipelinewise-tap-snowflake | time=2021-05-20 10:43:49 name=tap_snowflake level=WARNING message=There are no columns selected for stream SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER, skipping it.
```
Interesting that it finds the table but not ‘selected’ columns. Can you try this update:
```yaml
select:
  - SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER.*
```
s
yes for sure
```
meltano elt pipelinewise-tap-snowflake target-postgres
meltano                    | Running extract & load...
pipelinewise-tap-snowflake | /Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/pandas/compat/__init__.py:120: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 10:56:37 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
pipelinewise-tap-snowflake | time=2021-05-20 10:56:38 name=tap_snowflake level=INFO message=Beginning to sync SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CALL_CENTER
pipelinewise-tap-snowflake | time=2021-05-20 10:56:38 name=singer level=INFO message=METRIC: {"type": "timer", "metric": "job_duration", "value": 0.00010180473327636719, "tags": {"job_type": "sync_table", "database": "SNOWFLAKE_SAMPLE_DATA", "table": "CALL_CENTER", "status": "failed"}}
pipelinewise-tap-snowflake | time=2021-05-20 10:56:38 name=tap_snowflake level=CRITICAL message=Only INCREMENTAL and FULL TABLE replication methods are supported
pipelinewise-tap-snowflake | Traceback (most recent call last):
pipelinewise-tap-snowflake |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/bin/tap-snowflake", line 8, in <module>
pipelinewise-tap-snowflake |     sys.exit(main())
pipelinewise-tap-snowflake |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/tap_snowflake/__init__.py", line 500, in main
pipelinewise-tap-snowflake |     raise exc
pipelinewise-tap-snowflake |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/tap_snowflake/__init__.py", line 497, in main
pipelinewise-tap-snowflake |     main_impl()
pipelinewise-tap-snowflake |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/tap_snowflake/__init__.py", line 486, in main_impl
pipelinewise-tap-snowflake |     do_sync(snowflake_conn, args.config, args.catalog, state)
pipelinewise-tap-snowflake |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/tap_snowflake/__init__.py", line 474, in do_sync
pipelinewise-tap-snowflake |     sync_streams(snowflake_conn, catalog, state)
pipelinewise-tap-snowflake |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/tap_snowflake/__init__.py", line 466, in sync_streams
pipelinewise-tap-snowflake |     raise Exception('Only INCREMENTAL and FULL TABLE replication methods are supported')
pipelinewise-tap-snowflake | Exception: Only INCREMENTAL and FULL TABLE replication methods are supported
meltano                    | Incremental state has been updated at 2021-05-20 15:56:38.879309.
meltano                    | Extraction failed (1): Exception: Only INCREMENTAL and FULL TABLE replication methods are supported
meltano                    | ELT could not be completed: Extractor failed
ELT could not be completed: Extractor failed
```
a
That might be progress. Let's see if we can explicitly set replication method to full table.
d
(I haven't read the whole thread, but a note on `--properties` vs `--catalog`: if the tap only supports `--properties`, it needs only the `properties` capability, but `meltano elt` will still take the catalog in `--catalog` and pass it to the tap using the appropriate flag.)
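(In other words, under that behavior the invocation would presumably be something like the following, with Meltano translating the flag to whatever the tap supports:)
```sh
meltano elt pipelinewise-tap-snowflake target-postgres --catalog properties.json
```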
a
Thanks, @douwe_maan. I think we just need to override the replication-method at this point for the specific table. @surya_g - Can you try setting your replication-method for this table to “FULL_TABLE” using this sample: https://meltano.com/docs/plugins.html#extractors
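(For this stream, that would presumably look like the sketch below; note the metadata key has to match the discovered tap_stream_id, dashes included:)
```yaml
metadata:
  SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER:
    replication-method: FULL_TABLE
```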
@surya_g - One more thing which might be helpful (for debugging, or for future reference): you can run `meltano invoke --dump=catalog <plugin>` as shown here: https://meltano.com/docs/command-line-interface.html#how-to-use-7 This will give you the discovered catalog file text, which you can optionally modify manually and/or pass back in if you are not able to configure or customize using meltano.yml.
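(For example, to write the discovered catalog to a file you can edit and pass back in:)
```sh
meltano invoke --dump=catalog pipelinewise-tap-snowflake > properties.json
```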
s
```json
{
  "streams": [
    {
      "tap_stream_id": "SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER",
      "table_name": "CALL_CENTER",
      "schema": {
        "properties": {
          "CC_CALL_CENTER_SK": {
            "inclusion": "available",
            "type": [
              "null",
              "number"
            ]
          },
```
the catalog gives me this
a
Yep! That looks right. And I can see it’s discovering columns.
s
trying metadata addition now
i have this in my yaml
```yaml
metadata:
  SNOWFLAKE_SAMPLE_DATA_TPCDS_SF10TCL_CALL_CENTER:
    replication-method: FULL_TABLE
capabilities:
  - properties
  - catalog
  - discover
select:
  - SNOWFLAKE_SAMPLE_DATA_TPCDS_SF10TCL_*
```
```
meltano elt pipelinewise-tap-snowflake target-postgres
meltano                    | Running extract & load...
pipelinewise-tap-snowflake | /Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/pandas/compat/__init__.py:120: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 11:23:14 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
meltano                    | Incremental state has been updated at 2021-05-20 16:23:16.270232.
meltano                    | Extract & load complete!
meltano                    | Transformation skipped.
```
didn't sync anything
d
Can you try changing your `select` to `SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-*.*`?
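(The trailing `.*` matters: Meltano `select` rules are `<entity>.<attribute>` patterns, so the first half picks streams and the second half picks columns. A sketch, with a hypothetical exclusion for illustration:)
```yaml
select:
  - SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-*.*               # every column of every matching stream
  - "!SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-*.CC_MKT_DESC"  # hypothetical: exclude one column
```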
s
ooh that worked @douwe_maan @aaronsteers
```
meltano                    | Running extract & load...
pipelinewise-tap-snowflake | /Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/pandas/compat/__init__.py:120: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 11:31:02 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
pipelinewise-tap-snowflake | time=2021-05-20 11:31:04 name=tap_snowflake level=INFO message=Beginning to sync SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CALL_CENTER
pipelinewise-tap-snowflake | time=2021-05-20 11:31:04 name=tap_snowflake level=INFO message=Stream SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER is using full table replication
target-postgres            | INFO Table 'SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER' does not exist. Creating... CREATE TABLE pipelinewise_tap_snowflake.SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER ("cc_call_center_id" character varying, "cc_call_center_sk" numeric, "cc_city" character varying, "cc_class" character varying, "cc_closed_date_sk" numeric, "cc_company" numeric, "cc_company_name" character varying, "cc_country" character varying, "cc_county" character varying, "cc_division" numeric, "cc_division_name" character varying, "cc_employees" numeric, "cc_gmt_offset" numeric, "cc_hours" character varying, "cc_manager" character varying, "cc_market_manager" character varying, "cc_mkt_class" character varying, "cc_mkt_desc" character varying, "cc_mkt_id" numeric, "cc_name" character varying, "cc_open_date_sk" numeric, "cc_rec_end_date" timestamp without time zone, "cc_rec_start_date" timestamp without time zone, "cc_sq_ft" numeric, "cc_state" character varying, "cc_street_name" character varying, "cc_street_number" character varying, "cc_street_type" character varying, "cc_suite_number" character varying, "cc_tax_percentage" numeric, "cc_zip" character varying)
target-postgres            | Traceback (most recent call last):
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/bin/target-postgres", line 33, in <module>
target-postgres            |     sys.exit(load_entry_point('target-postgres===meltano.1.1.11', 'console_scripts', 'target-postgres')())
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/target_postgres/__init__.py", line 191, in main
target-postgres            |     state = persist_lines(config, input)
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/target_postgres/__init__.py", line 155, in persist_lines
target-postgres            |     stream_to_sync[stream].sync_table()
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/target_postgres/db_sync.py", line 373, in sync_table
target-postgres            |     self.query(query)
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/target_postgres/db_sync.py", line 145, in query
target-postgres            |     cur.execute(
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/psycopg2/extras.py", line 143, in execute
target-postgres            |     return super(DictCursor, self).execute(query, vars)
target-postgres            | psycopg2.errors.SyntaxError: syntax error at or near "-"
target-postgres            | LINE 1: ...E pipelinewise_tap_snowflake.SNOWFLAKE_SAMPLE_DATA-TPCDS_SF1...
target-postg…
```
i think i can fix that
a
Fantastic, @surya_g! Yes, that one is because the default tap_stream_id (and therefore the default stream name) has a dash in it. If you add an explicit stream name or otherwise override that behavior, you should be good.
```json
"tap_stream_id": "SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER",
      "table_name": "CALL_CENTER",
```
The tap does this so all stream names are unique, but not all targets can gracefully handle it. I don’t recall the easiest way to override the stream name but I can dig deeper into this if needed. (Also - it’s not intuitive, but the `table_name`
s
@aaronsteers sorry looking around to see how i can do this
> add an explicit stream name or otherwise override that behavior
a
Okay, thanks, @douwe_maan! @surya_g - per the above from Douwe (and per my comment a little bit back around different target’s handling of dashes), it looks like the transferwise variant of your target should better support that syntax.
If you wanted to override the catalog file, you can add a `"stream": "call_center",` entry just below the `tap_stream_id` declaration in your catalog file (as quoted in this message), but the downside is that then you are maintaining that custom catalog. Douwe’s suggestion of switching to the other variant would be easier and probably the better way to go.
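(A sketch of that catalog edit, with `call_center` as the hypothetical downstream stream name and the rest of the entry elided:)
```json
{
  "tap_stream_id": "SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER",
  "stream": "call_center",
  "table_name": "CALL_CENTER"
}
```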
s
ah yea, i'll probably maintain the catalog file on my own
```
(.venv) ip-192-168-86-137:attentive sgaddipati$ meltano elt pipelinewise-tap-snowflake target-postgres
meltano                    | Running extract & load...
pipelinewise-tap-snowflake | /Users/sgaddipati/code/meltano/attentive/.meltano/extractors/pipelinewise-tap-snowflake/venv/lib/python3.8/site-packages/pandas/compat/__init__.py:120: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
pipelinewise-tap-snowflake |   warnings.warn(msg)
pipelinewise-tap-snowflake | time=2021-05-20 11:59:12 name=tap_snowflake level=INFO message=Getting column information for snowflake_sample_data.tpcds_sf10tcl.CALL_CENTER...
pipelinewise-tap-snowflake | time=2021-05-20 11:59:13 name=tap_snowflake level=INFO message=Beginning to sync SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CALL_CENTER
pipelinewise-tap-snowflake | time=2021-05-20 11:59:13 name=tap_snowflake level=INFO message=Stream SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER is using full table replication
target-postgres            | time=2021-05-20 11:59:13 name=target_postgres level=CRITICAL message=Primary key is set to mandatory but not defined in the [SNOWFLAKE_SAMPLE_DATA-TPCDS_SF10TCL-CALL_CENTER] stream
target-postgres            | Traceback (most recent call last):
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/bin/target-postgres", line 8, in <module>
target-postgres            |     sys.exit(main())
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/target_postgres/__init__.py", line 373, in main
target-postgres            |     persist_lines(config, singer_messages)
target-postgres            |   File "/Users/sgaddipati/code/meltano/attentive/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/target_postgres/__init__.py", line 209, in persist_lines
target-postgres            |     raise Exception("key_properties field is required")
target-postgres            | Exception: key_properties field is required
meltano                    | Loading failed (1): Exception: key_properties field is required
meltano                    | ELT could not be completed: Loader failed
ELT could not be completed: Loader failed
```
a
@surya_g - Does this table have a primary key - or else a unique key? If it has one but it’s just not discovered, you can provide it and that would cause this error to go away. You can also silence the error with a setting on that target (sorry, I forgot the name of the setting).
s
sounds good. Just trying to get a simple example going.
i'll try to find the setting for silencing it for now
a
Totally fair. For a simple example, you can just turn off the `primary_key_required` (sic) setting. It only exists to prevent users from getting shocked by duplicates in the downstream, since without a primary key set, the target just appends records each time.
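(Assuming the transferwise variant of target-postgres, turning that off would look roughly like:)
```yaml
loaders:
  - name: target-postgres
    variant: transferwise
    config:
      primary_key_required: false
```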
s
@aaronsteers @douwe_maan got the whole thing working from end to end
thank you so much. Hopefully i can contribute something back to the project soon!
a
@surya_g - So glad to hear you got it working. The first one you set up is always the hardest. You’ll be a pro in no time 🙂