# best-practices
**peter_s:**
I’m very pleased to have just found out about the new BATCH feature. Does anyone know if there’s a BATCH-capable S3 CSV extractor available yet?
**aaronsteers:**
@peter_s - Can you say more about your use case? Does it matter if the load format is CSV (native), or is JSONL batching still okay?
**peter_s:**
Hi @aaronsteers. I’m looking for a way to load CSV files from S3 into Snowflake, without the time-consuming overhead of converting to JSONL and then back to CSV for load via SQL COPY. I’m told that target-snowflake (meltanolabs variant) supports batch messages, so I was hoping to find an already existing S3 tap with batch support that I could pair it with. My fallback plan had been to write or modify a tap myself, using the SDK’s support for batching.

But your question about CSV vs. JSONL made me take a closer look at the Batch Messages spec, and now I see that it currently only supports JSONL. So it seems what would be needed is SDK support for CSV encoding, and an S3 tap and Snowflake target that use it.
**aaronsteers:**
Hi, Peter. Yes, that's correct. Parsing from CSV isn't difficult code-wise - the reason that CSV isn't in the batch spec (yet!) is mostly due to a lack of a unifying dialect that all CSV implementations can agree on, and the implications of one side understanding CSV, but not "that dialect" of CSV.
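To illustrate the dialect problem, here's a toy sketch using Python's standard `csv` module (nothing Singer-specific): the same raw bytes yield different rows depending on which dialect the reader assumes.

```python
import csv
import io

# The same raw bytes parse very differently depending on the assumed dialect.
# Here a semicolon-delimited export (common in some locales) is read with the
# default comma dialect vs. an explicit semicolon delimiter.
raw = 'id;name;score\n1;"Smith; Jane";3,14\n'

comma_rows = list(csv.reader(io.StringIO(raw)))                     # assumes ','
semicolon_rows = list(csv.reader(io.StringIO(raw), delimiter=';'))  # correct dialect

print(comma_rows[1])      # field boundaries and quoting are misread
print(semicolon_rows[1])  # -> ['1', 'Smith; Jane', '3,14']
```

A reader that "understands CSV" but assumes the wrong dialect doesn't fail loudly - it silently produces the wrong columns, which is exactly why the spec has to pin the dialect down.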
Curious: for the use case you name above, have you considered skipping EL altogether with a Snowflake external table or an S3 stage definition? You could also use native Snowpipe ingestion in theory. For all of these cases, you'd be baking the CSV dialect information into the Snowflake-side definition, and could read the data (or load the data, your choice) with native Snowflake SQL. If you are using dbt, there are also some clever ways to make this a fully automated "set and forget" process that just runs in the dbt flow.
To circle back to the traditional EL approach: to my knowledge, the most supported and commonly used flow would probably be `tap-spreadsheets-anywhere > target-snowflake`.
**peter_s:**
We actually started out loading from S3 stages using native Snowflake SQL, but have run into several problems with this approach:

- Managing schema changes in a flexible way has become important, both because schema inference sometimes doesn’t get it right the first time, and because our sources sometimes change their schemas without warning. We were managing schema changes using database migrations, but that was pretty clumsy. It’s been much smoother to instead just edit a Singer catalog and have the Snowflake target automatically alter the table, which is also easier for some of our developers who don’t have much database experience. Using strict JSON validation within the target has also helped us catch unexpected column additions or deletions, whereas the native COPY statement only catches type changes.
- Snowflake will lose track of which files it’s already loaded into a table if the COPY statement that we load with changes in any way. (Snowflake seems to associate load history with the COPY statement rather than with the table itself.) So when a schema change involves a change to the COPY statement, we end up with a whole bunch of reloaded files.
- We were using Snowflake tasks to execute the loading, but that was pretty arcane for most developers. I haven’t looked seriously into Snowpipe, but I assume it would also produce duplicate loads when a COPY statement changes.

We recently switched to the `tap-spreadsheets-anywhere > pipelinewise-target-snowflake` flow and have been happy with how much EL development work it’s been saving us, but its slowness on very large (20 GB) files has become a real problem. We do use dbt for transformations and tests. Can you say more about how dbt can be used for loading?
**aaronsteers:**
All that makes perfect sense! The challenge of schemas changing over time is definitely an issue I can relate to - especially since Snowflake doesn't honor or even read the file's header row.
The methods I'd suggest regarding dbt-based integration would just build on the same Snowflake strategies that don't deal well with schema volatility.
You could (perhaps?) mitigate those using dbt's snapshot feature, but I don't personally like relying on snapshots, and even then someone has to periodically and manually update the Snowflake mapping.
Do you have any option to have the data provided to you in a different format than CSV?
JSONL or Parquet would open up a couple more options, since those are at least self-describing from Snowflake's perspective.
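As a tiny illustration of what "self-describing" buys you (plain Python with hypothetical field names): every JSONL record carries its own column names, while a CSV row is meaningless without out-of-band header and dialect information.

```python
import json

# A CSV row carries no field names; a JSONL record embeds them, so a loader
# can infer (and evolve) the schema from the data itself.
csv_row = "42,acme,2023-01-05"  # which field is which? you can't tell
jsonl_row = '{"id": 42, "vendor": "acme", "received": "2023-01-05"}'

record = json.loads(jsonl_row)
print(sorted(record.keys()))  # -> ['id', 'received', 'vendor']
```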
**peter_s:**
Sadly no: Our sources are mostly scientific software and clinical sites, neither of which tends to use modern software practices! It’s often a victory just to get data in a format other than Excel. But the JSONL option is interesting. Since taps produce JSONL, I’m wondering if it would be quicker to have a tap just write JSONL to disk and have Snowflake upload the JSONL directly.

Which raises a question I’ve been wondering about: Do you happen to know why Singer EL is so slow? I’ve tried to time parts of the process, and the download from S3 by the tap and the COPY of a (new) CSV file into Snowflake are very fast. The writing of JSONL by the tap and the parsing by the target are slower, but still look like they consume maybe only a third of the total EL time. And turning JSON validation on or off doesn’t seem to make much difference in speed. So I’m wondering what the bulk of the EL time is devoted to.
**aaronsteers:**
Got it. So, Singer EL isn't itself slow (except when compared to native BATCH transfer, which is why we invested in adding that feature). But to answer your core question about why this particular pipeline is slow, I can give you some common root causes that can be independently tested for your use case.
1. The tap is slow. This can be for a number of reasons, but in your case the tap-spreadsheets-anywhere implementation prioritizes general all-purpose interop, and may not actually be the fastest for your use case. (It uses the smart-open library, if I recall correctly.)
2. The target is slow. It's possible for the issue to be here, but I don't think this is your issue, because the Snowflake target is already widely used for high-volume use cases.
3. The target is batching too often or in too-small increments. This is different from the target being slow - any fast target can be slowed down by loading one record at a time instead of 1 million.
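Point 3 can be pictured with a small chunking sketch (plain Python, not SDK code): each batch becomes one load on the target side, so a batch size of 1 degenerates into per-record loading.

```python
from itertools import islice

def batches(records, size):
    """Yield lists of up to `size` records.

    Each yielded batch would map to one flush/COPY on the target side, so
    larger batches amortize the fixed per-load overhead across more rows.
    """
    it = iter(records)
    while chunk := list(islice(it, size)):
        yield chunk

print([len(b) for b in batches(range(10), 4)])  # -> [4, 4, 2]
```

With `size=1` the same ten records would trigger ten separate loads, which is the degenerate case described above.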
Have you tried running the two steps in isolation? So, first invoke the tap (with no target) and save all output to a local file. Then after that has run, cat the file into the target. By timing these two processes separately, you should be able to identify whether the tap or target is your primary bottleneck.
Something like this:

```shell
meltano invoke tap-spreadsheets-anywhere > my-output.jsonl
```

Then something like this to test the target:

```shell
cat my-output.jsonl | meltano invoke target-snowflake
```

(Caveat: those commands might need some adaptation since I'm writing them by hand and from memory 😅)
What you might find is that the generic smart_open implementation in the tap is just slow for large sources. If so, you could consider tuning it in a fork of this tap, or trying out something like a native tap-s3-csv to see if it's any faster. Or even try first downloading the files locally and then run tap-spreadsheets-anywhere against the local path instead of the remote S3 location.
**peter_s:**
I’ve tried them both in isolation, including from local files, and separately they’re each fairly slow — maybe each 50-60% of the running time of the full EL process (so the full EL process seems to benefit only slightly from having the two running simultaneously). I’ve tried increasing the target batch size 10- or 100-fold, but only saw a 10% speedup. I’ve also tried a different S3 tap (I think it was from pipelinewise) and it was a little faster but not by a huge factor. So I figured the tap’s slowness was something inherent to the Singer spec or Singer libraries and didn’t explore it further. This EL pipeline takes literally 20 times longer than loading the same files using our original process that used Snowflake stages and COPY statements directly, so it’s interesting to hear that speed isn’t generally a problem for Singer users. I’m wondering if it has to do with the size of our files, which can be 20 GB or larger.
**aaronsteers:**
Thanks for posting the breakdown of your performance analysis.
> I’ve also tried a different S3 tap (I think it was from pipelinewise) and it was a little faster but not by a huge factor. So I figured the tap’s slowness was something inherent to the Singer spec or Singer libraries and didn’t explore it further.
Not Singer-specific per se, but any process that reads one record at a time and writes out one record at a time will necessarily be a lot slower than something that is operating at the batch-interface level: where the target system just reads the file directly.
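A toy way to see the difference (illustrative Python only - real taps and targets do far more work per record): on the RECORD path every row is serialized and re-parsed as it crosses the tap-to-target boundary, while a batch handoff passes only a file reference.

```python
import json
import tempfile

rows = [{"id": i, "value": i * 2} for i in range(1000)]

# RECORD-style: every row is serialized by the tap and re-parsed by the target.
received = [json.loads(json.dumps(row)) for row in rows]

# BATCH-style: the tap writes one file; only its path crosses the boundary,
# and the target can hand the file straight to the warehouse's bulk loader.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
    f.write("\n".join(json.dumps(row) for row in rows))
    manifest = [f.name]

assert received == rows
print(f"record path handled {len(received)} messages; batch path passed {len(manifest)} file")
```

The per-message parse/serialize overhead scales with row count, while the batch handoff scales with file count - which is the gap the BATCH feature closes.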
> This EL pipeline takes literally 20 times longer than loading the same files using our original process that used Snowflake stages and COPY statements directly
Not surprising to me at all. When compared with a process that doesn't need to pre-process the files at all because Snowflake is reading them directly (e.g. our Singer BATCH spec, Snowflake native COPY operations, Snowpipe, etc.), then yes, something like a 20x improvement is exactly what I'd expect.
You've mentioned the need for high-volume large files, and also a need to adapt to changing schemas dynamically, which I mentioned is difficult or impossible to perfectly optimize today with the CSV file format, due to Snowflake not caring about CSV headers.

By any chance, do you have a subset of CSV files that are the bulk of the volume, and a subset that are smaller in nature? Many applications and data platforms, for instance, may have 2-4 very large datasets that are the bulk of the processing time, with 20-40 smaller datasets that provide metadata lookups, etc. If your application looks like this too, then you could keep the dynamic auto-schema parsing for the large number of smaller files, while finding a throughput-optimized approach for the (hopefully) smaller number of large files.

The downside is that you'd have two processes running: one prioritizing throughput at the cost of flexibility, and one providing flexibility at the cost of throughput. Applying each strategy to the files that can best benefit from it might be a path forward for you - but again, this presumes that your case fits the pattern of a few large files and a larger number of small files, and I don't know if that's true for you.
**peter_s:**
Thanks, this is a very helpful analysis, and I’m glad to know there’s a clear explanation for the speed difference. If writing one record at a time is the culprit, then I understand why your suggestion of a JSONL-based batch flow should solve the problem. So if a JSONL-batch-enabled S3 tap becomes available, I’d be interested in checking it out. Our data sets do fit the pattern you describe, and we’re in the process of transitioning the small group of large data sets to a different EL process. We’ve unfortunately already seen unexpected schema changes in the large sets, but yes throughput may still need to outweigh flexibility for these cases.
**aaronsteers:**
@peter_s - Thanks very much for confirming these points. Short-term, the bifurcated approach for your largest datasets is probably your best bet. Long-term, I do think it's worth exploring a CSV batch approach, specifically between tap-csv and target-snowflake.

Back to the CSV dialect discussion, then: can you say what CSV dialect details your upstream publishers are currently using? Optionally, you could open an issue in the SDK repo with those specific details, and we could discuss there how we might configure the batch config for that case.

For these largest datasets, our goal would probably be for the CSV tap to detect column names during discovery, then otherwise just hand the files to the target along with the column list and the necessary dialect/parsing info. Aside from copying into the S3 bucket, our goal would be to hand off the files natively, without tap or target Python code having to read or write any particular record data. (The performance challenge then becomes "how quickly can Snowflake ingest a big file" rather than "how quickly can the tap and target process n million records.") Does that sound like a good path forward?
**peter_s:**
Our publishers generally produce tab-delimited files with a single header row. But I’m wondering if the BATCH message spec could include dialect information, so that the SDK wouldn’t necessarily have to know anything about specific dialects, and the dialect information could just be blindly passed to a CSV reader. That sounds like a good plan for how to handle large data sets. Loading into Snowflake (and most DBs) is pretty fast.

FYI, as a temporary solution I’ve now hacked my own copies of `tap-spreadsheets-anywhere` and `pipelinewise-target-snowflake`, adding to the tap an option to read just a few records from the file and pass them to the target for schema validation (which the target already supports), and adding to the target an option to retrieve the original S3 location from the passed records (which the tap already includes as metadata) and load from the original location. It’s definitely not clean design, but the loading is very fast, and I still get schema validation. So one variant you might consider is to have an option for the tap or target to read some of the data for validation purposes, though I don’t know whether it’s possible to do that cleanly enough to make it part of the spec or the SDK.
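For concreteness, here's a sketch of what a dialect-aware BATCH message might look like. This is hypothetical: the field names under `encoding` are illustrative only, and the Singer spec at this point supported only a `jsonl` encoding.

```python
import json

# Hypothetical sketch (not the actual Singer BATCH spec): an "encoding" block
# that carries the CSV dialect, so a target could pass it straight to its CSV
# reader without the SDK having to understand any particular dialect.
batch_message = {
    "type": "BATCH",
    "stream": "lab_results",                # example stream name
    "encoding": {
        "format": "csv",                    # assumed value; spec had only "jsonl"
        "delimiter": "\t",                  # tab-delimited, per the publishers above
        "header": True,                     # single header row
    },
    "manifest": ["s3://example-bucket/lab_results/part-0001.tsv"],
}

print(json.dumps(batch_message, indent=2))
```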
**jenna_jordan:**
I'm also very excited to learn about the new batch feature (I was just reading about it here: https://meltano.com/meltano-tricks/6x-more-speed-for-your-data-pipelines-with-batch-messages/) and I was wondering if there is a comprehensive list of plugins that have incorporated it? For example, I would really like to test it out with a SQL Server extractor + Postgres loader.
**aaronsteers:**
@jenna_jordan - On the hub, basically anything that shows it was built with the SDK will support this feature. This one does, for instance.
**peter_s:**
I’m curious what time savings people are seeing with the switch to BATCH. I’m seeing about a 2-fold speedup compared to using RECORD messages.
**aaronsteers:**
Sweet! Thanks, @peter_s, for sharing. I'm curious too if anyone else has insights. I can share from my own experience: I ran tap-jaffle-shop in batch mode when I was debugging a performance issue. It brought runtime down from 2 minutes to about 20 seconds - which is very near the runtime of the underlying API. Eventually I was able to tune the record-based performance to get that same ~20 second runtime, but it was helpful to run in batch mode to see what my hypothetical best performance would be.
**jenna_jordan:**
Thank you! That is good info - I did not realize that not all variants were built on the Meltano SDK.
**peter_s:**
@aaronsteers thanks for the comparison measurements. I used https://hub.meltano.com/loaders/target-snowflake--meltanolabs/ as my target, and for my tap I used a version of https://github.com/ets/tap-spreadsheets-anywhere in which I’d hacked in a copy of the SDK’s batching code, just for testing. I’m guessing that the `tap-spreadsheets-anywhere` overhead is what made my speed-up only 2x compared to your 6x: you’ve mentioned that `tap-spreadsheets-anywhere` may be slow due to `smart_open`, and I’ve found that https://github.com/transferwise/pipelinewise-tap-s3-csv is 2-3x faster than record-based `tap-spreadsheets-anywhere`, so that could account for the difference (though I realize this isn’t an apples-to-apples comparison to your case). For further comparison, I got a 20x speedup of an S3->Snowflake flow in a test in which (a hacked version of) the target loads the original CSV files located on S3. But I think it may be worth it to settle for something like 6x, to get the benefits of a JSON-based flow (i.e. abstraction away from the specifics of Snowflake, cleaner implementation of validation if desired, extensibility to non-CSV source files, etc.).
**aaronsteers:**
@peter_s - Good news! We've got a spec proposal emerging here for CSV support in BATCH messages: https://github.com/meltano/sdk/issues/1584#issuecomment-1499263096 If you want to make comments there, we could soon open this up as "Accepting Pull Requests". As noted, the implementation is actually not very difficult - deciding on the spec is the hard part, and we may be close to landing on one.
**peter_s:**
@aaronsteers This is great! And I’m appreciative of how responsive you guys are.
**aaronsteers:**
Thanks, @peter_s. That's very kind. 🙂