# singer-target-development
r
Hey all, just diving into the world of Singer. So far I was able to create a tap with the SDK and minimal effort 🎉. Now I'm working with the target SDK and a bit stuck with what I'm trying to accomplish. It's a pretty simple target for writing to an AbstractStorage location (S3, SFTP, local disk, etc.). I'm trying to write the input stream to a temp file (in micro-batches of ~10k records), and then upload that file to the destination as the last step, after all the records are processed.
• I'm using the `BatchSink` and referencing how target-yaml sets `max_size = sys.maxsize`, but I'm trying to avoid loading all records into memory. Is there any way to get the batch records to arrive as a generator instead of just using the list in `context["records"]`? Then I could process a single batch, writing to the temp file, and finally do my upload, all within `process_batch`.
• If not, are there any other mechanisms to do something custom after the input stream is exhausted? That would let me process smaller batches and append to the same file each time, finally doing the upload at the end. The only obvious option seems to be overriding `_process_endofpipe`, but I'd rather avoid overriding a private method. Thanks in advance!
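For reference, the target-yaml pattern I mean looks roughly like this (a minimal sketch; `write_output` is a made-up stand-in for whatever actually serializes the batch):

```python
import sys

from singer_sdk.sinks import BatchSink


class WholeStreamSink(BatchSink):
    """Roughly the target-yaml pattern: one giant batch per stream."""

    # Never trigger an intermediate batch; every record for the stream
    # accumulates in memory until the input is exhausted.
    max_size = sys.maxsize

    def process_batch(self, context: dict) -> None:
        # By the time this runs, context["records"] holds *all* records
        # as a list -- the memory problem described above.
        records = context["records"]
        self.write_output(records)  # hypothetical serialize/upload helper
```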
p
@ryan_whitten Awesome to hear! If I remember correctly I've done something similar in the past. I overrode `start_batch` to create a temp file and add its path to the context for later, then `process_record` to append each record in the batch to that temp file (found by pulling its path from the context), then `process_batch` to just move the file to its destination once `max_size` is reached. It just appends to a file on disk rather than a list in memory. Does that sound like it would work for you?
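Roughly like this, from memory (a sketch; the `upload_file` call is a placeholder for whatever moves the file to your storage backend):

```python
import json
import os
import tempfile
from pathlib import Path

from singer_sdk.sinks import BatchSink


class TempFileSink(BatchSink):
    """Spool each batch to a temp file on disk instead of a list in memory."""

    max_size = 10_000  # records per micro-batch

    def start_batch(self, context: dict) -> None:
        # Create the batch's temp file and stash its path in the context.
        handle, path = tempfile.mkstemp(suffix=".jsonl")
        os.close(handle)
        context["file_path"] = path

    def process_record(self, record: dict, context: dict) -> None:
        # Append the record to the temp file found via the context.
        with open(context["file_path"], "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

    def process_batch(self, context: dict) -> None:
        # Batch is full (or the stream is done): move the file to its destination.
        self.upload_file(context["file_path"])  # placeholder for S3/SFTP/local copy
        Path(context["file_path"]).unlink()
```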
r
Thanks, Pat! Gave that a shot and it worked great
p
Awesome, glad to hear it! Is your target going to be in a public repo once you’re done with it? I’d love to check it out, sounds super useful
r
Unfortunately not, at least for now. I'm in finance and my company has yet to embrace open source 🙂
p
ah bummer, I've been there! I think there's an issue somewhere for building a generic target like this, let me try to dig it up, maybe you'll have some good insights to share there
r
Thanks, that would be great. I did come across an issue a while back related to common file "services" and I'm excited for the upcoming generic SQL taps/targets!
f
target-athena has something like this. I made some PRs/MRs for partitioning when uploading to S3, and for obvious reasons needed to break the records into different files depending on the partition key, then upload them. It's been a while, but everything is in the PR/MR if you want to check it out.
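Not the actual target-athena code, but the shape was roughly this (the partition key name and upload helper are made up here):

```python
import json

from singer_sdk.sinks import BatchSink


class PartitionedSink(BatchSink):
    """Split each batch into one file per partition-key value before upload."""

    partition_key = "event_date"  # hypothetical partition key name

    def start_batch(self, context: dict) -> None:
        context["files"] = {}  # partition value -> open file handle

    def process_record(self, record: dict, context: dict) -> None:
        # Route each record to the file for its partition value.
        partition = str(record.get(self.partition_key, "default"))
        files = context["files"]
        if partition not in files:
            files[partition] = open(
                f"{self.stream_name}-{partition}.jsonl", "a", encoding="utf-8"
            )
        files[partition].write(json.dumps(record) + "\n")

    def process_batch(self, context: dict) -> None:
        # Close each partition file, then upload it under a partitioned prefix,
        # e.g. s3://bucket/table/event_date=<partition>/ (hypothetical layout).
        for partition, fileobj in context["files"].items():
            fileobj.close()
            self.upload_partition(fileobj.name, partition)  # placeholder uploader
```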
r
Following up here, I was able to get this to work by using smart_open, which streams the incoming records directly to the destination file/object. I open the file at the start of the batch, store the fileobj as an instance variable, yield it within `process_record` (with a try/yield/except to always close the file in case of an error), and write the records to it. Not having a temp file means it's not constrained by disk space, and the only memory constraint is how many records I'm internally buffering before a write. Pretty cool way to send a huge amount of data and sink to S3!
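Roughly this shape, if it helps anyone later (the bucket name and file layout are made up, and the error handling is simplified from the try/yield/except version I described):

```python
import json

from singer_sdk.sinks import BatchSink
from smart_open import open as smart_open


class StreamingSink(BatchSink):
    """Stream records straight to the destination object via smart_open."""

    def start_batch(self, context: dict) -> None:
        # smart_open returns a file-like object that uploads as you write;
        # the URI scheme (s3://, sftp://, file://) selects the backend.
        uri = f"s3://{self.config['bucket']}/{self.stream_name}.jsonl"  # made-up layout
        self._fileobj = smart_open(uri, "w")

    def process_record(self, record: dict, context: dict) -> None:
        try:
            self._fileobj.write(json.dumps(record) + "\n")
        except Exception:
            # Always close the stream on error so the upload isn't left dangling.
            self._fileobj.close()
            raise

    def process_batch(self, context: dict) -> None:
        # Input is exhausted for this batch; closing finalizes the upload.
        self._fileobj.close()
```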