# singer-tap-development
d
Has anyone gone through the exercise of converting a non-SDK tap to SDK? What kind of effort is involved? Mainly wondering what it would take to add support for batch messages. Or how much of an effort it would be to add batch to non-SDK tap?
v
It all depends on what exactly you're going after batch for. Is it just to get the `BATCH` messages so the target can handle them? Meaning you're OK with the speed of the tap right now, but the target is your issue.
d
I think the target might have batch support already. The issue is speed. Not sure what expected performance is, but it seems incredibly slow. For something with around 4m records, it takes hours.
Actually, is there a way to know for sure if batch is supported on the target? Would it be listed as a capability? The reason I thought it was supported is that I needed to set `batch_config.batch_size` in addition to `batch_size_rows` in order for the batch size to actually change. And `batch_config.storage.root` seems to be working as well.
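For reference, a hedged sketch of where those keys sit in meltano.yml. The values here are invented, and whether this particular target variant honors every key is an assumption worth verifying against its docs (the `encoding`/`storage` structure follows the Singer SDK's batch config):

```yaml
loaders:
  - name: target-snowflake
    config:
      batch_size_rows: 100000      # invented value
      batch_config:
        encoding:
          format: jsonl
          compression: gzip
        storage:
          root: file://batches/    # invented path
```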
v
If it's about performance, I would start here: https://github.com/meltano/meltano/issues/6613#issuecomment-1215074973. Figure out exactly where the problem is: the tap or the target. If the tap is what's slow, then we need to figure out what exactly is making it slow and which endpoints are slow. Is there a batch endpoint available (batch messages aren't really needed to improve this) that would 100x your speed? Then convert to that, and so on.
d
What do you mean by batch endpoint?
v
First follow the steps in that issue as it'll save us a bunch of time!
d
Already on it!
Testing out some variations since I have a mapper and some other customizations for sanitizing data as well
v
"Batch endpoint" in this context means: is there some way to get the data faster than we're currently doing it? Maybe there's an export endpoint you can call to get a file that bypasses the need for 10k API requests, etc. What is the tap? Which stream is taking a while?
@Daniel Luo just running `time meltano invoke tap-name` should really narrow this down!
d
Ah okay. It's tap-mssql
v
Now we're getting somewhere! 🙂
After all that if you could share your meltano.yml and the command you're using that's slow we can all help a lot easier 😄
d
Does it make much difference if I dump the output to a file to time the tap?
v
It shouldn't. I would do the steps in the issue:

```shell
time meltano invoke tap-name > out
```
d
For 500k records: 20m 26s total run, 3m 2s on the tap, 3m 28s on the mapper, 12m 44s on target-snowflake. After removing all settings on the target, it took even longer: 21m 28s.
v
OK, we're getting somewhere: we now know that the target is our bottleneck. That's helpful!
can you share your meltano.yml? It'd be nice to see the snowflake settings
Try setting `validate_records` to `false` in the target and see what the new results are.
d
Well, with all settings removed, it was even worse. I don't think it's the settings. I only had some batch size configuration
v
We should share it all as it'll help
d
I mean, with settings removed, there's literally nothing to share. It's the defaults
Or do you want what I had set for the faster run?
v
Anything helps; do that and try the `validate_records` thing.
Some other people may have messed with this; I didn't know target-snowflake was slow.
d
The default for `validate_records` is false.
v
That's not true. Could you please share your meltano.yml?
d
```yaml
loaders:
  - name: target-snowflake
    variant: meltanolabs
    pip_url: meltanolabs-target-snowflake
```
That's all for the Snowflake side.
Oh sorry, you're right. I was looking at the "required" column, not the default value.
Let me try that
Trying again with this
```yaml
loaders:
  - name: target-snowflake
    variant: meltanolabs
    pip_url: meltanolabs-target-snowflake
    config:
      validate_records: false
```
I think 3 min is still rather slow for extracting 500k records, though. I tested another tool, dlt, which was doing 7.5k records/s on the same hardware; I think that was around 1.3m records in 3 min.
v
Yes, it is pretty slow. The tap can be improved for sure, but if the target here is 4x slower, we should focus on that.
d
I don't think `validate_records` is doing much. So far it's at 5 min and 160k records, based on the number of times the target sink has been full and drained.
v
Can you share the logs from this run? Then it's a matter of looking at whether there's an easy win here.
d
I did notice that when loading data to Snowflake with dlt, specifying the Parquet file format took 26 min, while CSV took 2 min. I haven't tried JSONL yet.
Are you looking for debug logs, or is info enough? It's still not done yet.
v
Normal should be enough; the point here is that we need to look at what is taking a long time.
Is it the S3 upload, etc.?
From the target-developer side, I'd probably open up viztracer or something to profile what the slowest part is, then work through it. But a lot of the time there are easy gains that get you to what you need.
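v mentions viztracer; as an illustration of the same idea using only the standard library, here is a minimal cProfile sketch. `process_batch` is a made-up stand-in for whatever target code path you actually want to profile:

```python
import cProfile
import io
import pstats

def process_batch(records):
    # Made-up stand-in for the code path being profiled
    return [dict(r, loaded=True) for r in records]

profiler = cProfile.Profile()
profiler.enable()
process_batch([{"id": i} for i in range(100_000)])
profiler.disable()

# Show the ten most expensive calls by cumulative time
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(10)
print(stream.getvalue())
```

The same report points at hot spots regardless of whether the bottleneck is serialization, network waits, or per-record Python overhead.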
d
> meltano invoke target-snowflake --input map-output.json
2024-10-01T14:15:19.837133Z [info     ] Environment 'default' is active
/home/dluo/git/dagster-hybrid/src/elt_projects/meltano/.meltano/loaders/target-snowflake/venv/lib/python3.12/site-packages/snowflake/sqlalchemy/base.py:1068: SAWarning: The GenericFunction 'flatten' is already registered and is going to be overridden.
  functions.register_function("flatten", flatten)
2024-10-01 14:15:21,135 | INFO     | target-snowflake     | Target 'target-snowflake' is listening for input from tap.
2024-10-01 14:15:21,136 | INFO     | target-snowflake     | Initializing 'target-snowflake' target sink...
2024-10-01 14:15:21,136 | INFO     | target-snowflake.client-fh_test | Initializing target sink for stream 'client-fh_test'...
2024-10-01 14:15:21,150 | INFO     | snowflake.connector.connection | Snowflake Connector for Python Version: 3.12.2, Python Version: 3.12.5, Platform: Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35
2024-10-01 14:15:21,151 | INFO     | snowflake.connector.connection | Connecting to GLOBAL Snowflake domain
2024-10-01 14:15:21,151 | INFO     | snowflake.connector.connection | This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2024-10-01 14:15:22,656 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:23,693 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:23,965 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 6
2024-10-01 14:15:24,027 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:24,256 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:24,490 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 6
2024-10-01 14:15:24,568 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:24,732 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:25,162 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 96
2024-10-01 14:15:25,439 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:29,001 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:29,087 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:39,193 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 52
2024-10-01 14:15:41,657 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 468
2024-10-01 14:15:42,274 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:42,615 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:42,751 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 96
2024-10-01 14:15:42,831 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:48,131 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:48,220 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:49,538 | INFO     | target-snowflake     | Target sink for 'client-fh_test' is full. Current size is '10000'. Draining...
2024-10-01 14:15:52,195 | INFO     | target-snowflake.client-fh_test | Processing batch of files.
2024-10-01 14:15:53,116 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:53,808 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:55,794 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:55,848 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:15:56,082 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:04,851 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:12,625 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:12,697 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:12,858 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:25,207 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:25,331 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:25,393 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:25,438 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:25,460 | INFO     | singer_sdk.metrics   | METRIC: {"type": "timer", "metric": "batch_processing_time", "value": 35.92187786102295, "tags": {"stream": "client-fh_test", "pid": 19626, "status": "succeeded"}}
2024-10-01 14:16:25,499 | INFO     | singer_sdk.metrics   | METRIC: {"type": "counter", "metric": "record_count", "value": 10001, "tags": {"stream": "client-fh_test", "pid": 19626}}
2024-10-01 14:16:26,639 | INFO     | target-snowflake     | Target sink for 'client-fh_test' is full. Current size is '10000'. Draining...
2024-10-01 14:16:29,276 | INFO     | target-snowflake.client-fh_test | Processing batch of files.
2024-10-01 14:16:33,903 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:33,962 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:34,154 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:34,323 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:34,361 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:40,420 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:43,029 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:43,106 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:43,208 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:43,392 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:43,782 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:44,188 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:44,291 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:44,310 | INFO     | singer_sdk.metrics   | METRIC: {"type": "timer", "metric": "batch_processing_time", "value": 17.670918226242065, "tags": {"stream": "client-fh_test", "pid": 19626, "status": "succeeded"}}
2024-10-01 14:16:45,455 | INFO     | target-snowflake     | Target sink for 'client-fh_test' is full. Current size is '10000'. Draining...
2024-10-01 14:16:48,019 | INFO     | target-snowflake.client-fh_test | Processing batch of files.
2024-10-01 14:16:48,805 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:49,530 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:49,680 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:49,818 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:50,003 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:57,767 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:57,832 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:58,086 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:58,142 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:58,488 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:58,783 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:58,843 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:58,961 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:59,095 | INFO     | singer_sdk.metrics   | METRIC: {"type": "timer", "metric": "batch_processing_time", "value": 13.639158725738525, "tags": {"stream": "client-fh_test", "pid": 19626, "status": "succeeded"}}
2024-10-01 14:17:00,716 | INFO     | target-snowflake     | Target sink for 'client-fh_test' is full. Current size is '10000'. Draining...
2024-10-01 14:17:03,330 | INFO     | target-snowflake.client-fh_test | Processing batch of files.
2024-10-01 14:17:04,217 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:05,071 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:05,141 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:05,299 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:05,342 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:23,118 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:23,235 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:25,084 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:25,137 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:26,151 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:27,540 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:27,791 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:27,862 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:27,899 | INFO     | singer_sdk.metrics   | METRIC: {"type": "timer", "metric": "batch_processing_time", "value": 27.182664394378662, "tags": {"stream": "client-fh_test", "pid": 19626, "status": "succeeded"}}
2024-10-01 14:17:27,946 | INFO     | singer_sdk.metrics   | METRIC: {"type": "counter", "metric": "record_count", "value": 30000, "tags": {"stream": "client-fh_test", "pid": 19626}}
2024-10-01 14:17:29,024 | INFO     | target-snowflake     | Target sink for 'client-fh_test' is full. Current size is '10000'. Draining...
2024-10-01 14:17:31,604 | INFO     | target-snowflake.client-fh_test | Processing batch of files.
2024-10-01 14:17:31,910 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:32,123 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:32,200 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:17:32,421 | INFO     | snowflake.connector.cursor | Number of results in first chunk: 1
v
Anyone else had issues with target-snowflake being slow?
I wonder what the target is doing between these two log lines:
2024-10-01 14:16:50,003 | INFO | snowflake.connector.cursor | Number of results in first chunk: 1
2024-10-01 14:16:57,767 | INFO | snowflake.connector.cursor | Number of results in first chunk: 1
Is it pushing S3 chunks up? I haven't dug deeply into target-snowflake.
If I were diving in myself, I'd jump in to figure out what's happening there and why it's slow. I'd probably use viztracer, or just set some breakpoints in the Snowflake portions of the target.
d
Hmm, not completely sure what I'm looking at; first time using vizviewer. Most of it is just `process_lines`. Is it waiting on something?
Not using S3. The stage is in SF.
h
I'd be happy if https://github.com/singer-io/tap-marketo were converted to the SDK. I found the tap to be inefficient, and while at my old company I forked and optimized it for internal use (to my knowledge the fork is still being used). I hoped to contribute the optimizations back to the community, but unfortunately got zero engagement from the maintainers: https://github.com/singer-io/tap-marketo/pull/88. I made a short presentation of the efficiencies possible and am sharing it here in case anyone is curious. It would benefit the community if a more performant tap were available (maybe Airbyte has a faster one that we can wrap), and it would be incredible to have batch capabilities for this source for an even greater speed-up in the pipeline. I'd be happy to work on this, but unfortunately I don't have access to a Marketo instance at my current employment.
a
Not especially slow, but loading regular Singer RECORD messages is relatively slow (100x) when compared with batch messages or a native Snowflake S3 integration table copy. We have a use case from BigQuery that we're writing up. @Reuben (Matatika)
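For context, the speedup comes from the target receiving references to pre-written files instead of one RECORD message per row. A sketch of what a Singer SDK BATCH message looks like; the stream name and file path are invented, and the exact key names are an assumption to verify against the SDK docs:

```json
{
  "type": "BATCH",
  "stream": "users",
  "encoding": {"format": "jsonl", "compression": "gzip"},
  "manifest": ["file://batches/users-0001.jsonl.gz"]
}
```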
d
What's the benefit of using an external S3 stage over putting the file in SF's stage?
h
From what I understand, there is no real performance benefit between an S3 stage and SF's stage (that I've noticed). IIUC, SF's internal stage uses cloud storage (S3, Blob Storage, GCS) under the hood. Most folks are already on a cloud, and being able to pull from S3 or similar reduces unnecessary steps. I'm curious to hear other perspectives on native vs. cloud stages.
d
I suppose it helps if you already have the files there, but if you're uploading a file to staging, I feel like it doesn't make a difference and just adds a configuration step. Maybe to have full control over the lifecycle? Not sure.
h
> I suppose if you already have the files there, but if you're uploading a file to staging

Would you mind clarifying what is meant by "there" and which "staging" you are referring to?
a
(I was comparing the target with totally native Snowflake above; I doubt there's any benefit to S3-staging the file vs. SF staging.) From what I saw in the target, the non-batch implementation uses inserts? That's very slow compared to staging a file and copying it. I didn't look that closely at the implementation, but I can confirm hours went down to minutes for our use case once we made the tap emit batch messages. No doubt some of this was on the BigQuery side too.
d
> Would you mind clarifying what is meant by "there" and which "staging" you are referring to?

I was referring to having files in external storage, and to staging in general, as in: you have a local file that you need to upload somewhere, whether that be SF or S3.
> From what I saw in the target the non batch implementation uses inserts? That's very slow compared to the copy staging a file.

Based on the query history that I see in SF, it first does a PUT to place the results in a stage, then does a MERGE from the stage into the table.
Though that is for a CDC/incremental approach. I'd have to find a full-table replication and check.
Still seems to be a merge
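The PUT-then-MERGE pattern described above looks roughly like the following in Snowflake SQL. This is a hedged illustration, not the target's actual statements; the table, stage, file format, and column names are all invented:

```sql
-- 1. Upload the local batch file to an internal (SF-side) stage
PUT file:///tmp/batch-0001.jsonl @my_internal_stage;

-- 2. Merge the staged rows into the destination table on the key column
MERGE INTO my_table AS t
USING (
    SELECT $1:id::INT AS id, $1:name::STRING AS name
    FROM @my_internal_stage/batch-0001.jsonl (FILE_FORMAT => my_jsonl_format)
) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name);
```

A MERGE supports upserts for incremental/CDC runs; a plain COPY INTO would typically be faster for append-only full-table loads.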