# singer-tap-development
**andreas_zaidan:**
Hello! First, thank you for your great `singer-sdk` 🙂 I am developing a `target` and I'm writing batches into a temporary table in my database. I would like to overwrite the final table with the newly created temporary table at the end of all batches from the `sink`. I didn't find a function called at the end of all batches in the SDK, maybe I overlooked something? Why: if we copy the temporary data into the target table for each batch, we end up with a partially filled target table in production. Thank you 😉
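A minimal sketch of the temp-table pattern described in the question, using `sqlite3` purely for illustration; the table names, schema, and batch contents are made up, and a real target would run its own dialect's SQL:

```python
import sqlite3

# Illustration only (not SDK code): batches land in a staging table and the
# production table is replaced exactly once, at the end of all batches.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")       # production table
conn.execute("CREATE TABLE users_tmp (id INTEGER, name TEXT)")   # staging table

for batch in ([(1, "ada")], [(2, "bob")]):                        # stand-in batches
    conn.executemany("INSERT INTO users_tmp VALUES (?, ?)", batch)

# End of all batches: swap the fully-loaded staging table into place.
conn.execute("DROP TABLE users")
conn.execute("ALTER TABLE users_tmp RENAME TO users")
conn.commit()
print(conn.execute("SELECT * FROM users").fetchall())  # [(1, 'ada'), (2, 'bob')]
```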
**visch:**
Take everything in this repo with a grain of salt, but I used `clean_up`: https://gitlab.com/autoidm/autoidm-target-mssql/-/blob/main/target_mssql/streams.py#L323
The latest SDK release may use something different; note that this repo isn't really using the SDK, it's a primitive version. I'd guess there's some kind of `clean_up` method somewhere.
I think most people would say that instead of the DROP you'd do an UPSERT, then go delete any records that didn't get updated (`ACTIVATE_VERSION` is the method people point to for deleting old records).
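A hedged sketch of that UPSERT-then-prune idea, again using `sqlite3` (requires SQLite 3.24+ for `ON CONFLICT`); the `_version` column and table names are illustrative stand-ins, and MSSQL or other warehouses would use `MERGE` or their own dialect:

```python
import sqlite3  # ON CONFLICT upserts need SQLite 3.24+

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, _version INTEGER)")
conn.execute("INSERT INTO users VALUES (1, 'old ada', 1), (9, 'stale row', 1)")

new_version = 2
rows = [(1, "ada", new_version), (2, "bob", new_version)]
conn.executemany(
    "INSERT INTO users (id, name, _version) VALUES (?, ?, ?) "
    "ON CONFLICT(id) DO UPDATE SET name = excluded.name, _version = excluded._version",
    rows,
)
# ACTIVATE_VERSION-style step: remove rows the current sync never touched.
conn.execute("DELETE FROM users WHERE _version < ?", (new_version,))
conn.commit()
print(conn.execute("SELECT * FROM users ORDER BY id").fetchall())
# -> [(1, 'ada', 2), (2, 'bob', 2)]
```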
**andreas_zaidan:**
Thanks for your answer @visch! It's exactly what I would need, but I'm trying to keep using the `sdk` 😉
This line looks to drain all of the sinks for you at the end, so you should be good to go if you're implementing `process_batch`.
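For context, a minimal sketch of what implementing `process_batch` on a `BatchSink` can look like; the explicit `process_record` override buffers records in the drain context (roughly what the SDK's default `BatchSink` does), and the commented-out bulk-insert helper is a hypothetical placeholder, not an SDK method:

```python
from singer_sdk.sinks import BatchSink


class TempTableSink(BatchSink):
    """Illustrative sink: each drained batch is appended to a staging table."""

    max_size = 10000  # records buffered before the SDK drains this sink

    def process_record(self, record: dict, context: dict) -> None:
        # Buffer records for the current batch in the drain context.
        context.setdefault("records", []).append(record)

    def process_batch(self, context: dict) -> None:
        # Called each time this sink is drained: write the buffered batch
        # into the temporary/staging table here.
        records = context.get("records", [])
        self.logger.info("Writing %d records to the temp table", len(records))
        # self._bulk_insert_into_temp_table(records)  # hypothetical helper, not SDK API
```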
**andreas_zaidan:**
yeah that’s what I started doing thanks 😉
**visch:**
`drain_all` is called more than just there, so this may not be the answer you're looking for: https://sourcegraph.com/gitlab.com/meltano/sdk@main/-/blob/singer_sdk/target_base.py?L303:14#tab=references If your schema changes it'll get called as well.
Instead of overriding it, I think the recommendation is to use `process_batch`. Might be a good idea to get a `clean_up` function added? Not clear to me.
**andreas_zaidan:**
Yes, I've seen that, but in our case the schema is not changing, so I think it should be ok to use it for that 🤔
Yeah, but `process_batch` runs for each batch, and I only want to clean up at the end of all batches.
**pat_nadolny:**
@andreas_zaidan this is a good question. @aaronsteers and I discussed it in this issue a while ago. In Singer there's no mechanism right now for targets to know when a stream is completed. Pre-SDK I built some targets with the assumption that a schema change means a stream is complete, but that assumes your tap is sending streams serially, which isn't always the case. Also, state messages are written out after a batch is drained, so the data should be in its final state at the end of `process_batch`, or there's a chance that the state bookmarks could be incorrect (i.e. state says records were written but they never got transferred from the temp table, so the next sync skips those records and the target table is out of sync).
I'd love to see a new message type added to the spec that announces a stream as completed, or even just a convention as part of the state messages like the `currently_syncing` state entry. I think this could make draining more efficient too.
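To make the `currently_syncing` convention mentioned here concrete, this is roughly what such a STATE message looks like on the wire; the bookmark payload is illustrative, and the "stream completed" variant is exactly the kind of thing being proposed, not something in the Singer spec today:

```python
import json

# The existing `currently_syncing` convention: a STATE message whose payload
# says which stream the tap is in the middle of (payload shape is illustrative).
state_message = {
    "type": "STATE",
    "value": {
        "currently_syncing": "users",
        "bookmarks": {"users": {"replication_key_value": "2021-10-01T00:00:00Z"}},
    },
}
print(json.dumps(state_message))

# A "stream completed" signal, as proposed above, could piggyback on the same
# mechanism, e.g. a new message type or a dedicated state entry. (Hypothetical;
# not part of the Singer spec today.)
```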
**aaronsteers:**
@andreas_zaidan - I think as of now, the recommendation would be to write and clean up batches as they are completed. For full table sync operations, you might receive an `ACTIVATE_VERSION` message if the tap supports it. Are you primarily thinking of incremental or full table sync operations? For incremental operations, you might literally never finish the stream in cases like the proposed --tail mode, so it's important to be saving and cleaning up batches as you go.
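For reference, a small illustration of an `ACTIVATE_VERSION` message and a typical full-table reaction to it; the `_sdc_table_version` column and the DELETE statement show what some targets do and are illustrative, not prescribed by the SDK:

```python
import json

# Shape of an ACTIVATE_VERSION message; the version is typically a timestamp
# chosen by the tap at the start of a full table sync.
message = json.loads(
    '{"type": "ACTIVATE_VERSION", "stream": "users", "version": 1634000000000}'
)

def stale_row_cleanup_sql(stream: str, version: int) -> str:
    # Typical target reaction once the version is activated: rows loaded under
    # older versions are deleted (or soft-deleted). The `_sdc_table_version`
    # column name is what some targets use; treat it as illustrative here.
    return f"DELETE FROM {stream} WHERE _sdc_table_version < {version}"

print(stale_row_cleanup_sql(message["stream"], message["version"]))
```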
**andreas_zaidan:**
Thanks for your answer @pat_nadolny! In my use case, I want to overwrite the target table with the temporary one in the case of full table sync operations, so I guess the state should not be important.
**aaronsteers:**
This sounds like an opportunity to plug into activate-version. ☝️
**andreas_zaidan:**
Alright @aaronsteers, I'll look into that feature then, thank you so much.
**pat_nadolny:**
@andreas_zaidan here's some info on `ACTIVATE_VERSION` if you're not familiar - https://hub.meltano.com/singer/docs#activate-version
**aaronsteers:**
@andreas_zaidan one thing that will still make this tricky for you is that I don't know if there are any signals earlier in the stream that would tell you when to expect an ACTIVATE_VERSION message at its end. I'll add a comment on the working group issue here: https://github.com/MeltanoLabs/Singer-Working-Group/issues/9#issue-991554782
Comment added.
@andreas_zaidan - still mulling this over, but for your use case, would it be reasonable to wait until the entire sync operation is complete before finalizing any stream? If so, there's a new method coming to the SDK which might help your use case. The tentative name would be `process_endofpipe()`. The only downside of that approach is that it leaves a lot of unfinalized WIP in case of a fatal error, whereas generally we try to prioritize resumability on interrupt.
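A purely speculative sketch of how a temp-table target might use such a hook if it lands as described; `process_endofpipe` is only the tentative name from this thread, its signature is a guess, and `self.connection` plus the SQL are hypothetical placeholders rather than SDK API:

```python
from singer_sdk.sinks import BatchSink


class EndOfPipeTempTableSink(BatchSink):
    """Hypothetical sketch only: assumes the tentative hook lands as described."""

    def process_batch(self, context: dict) -> None:
        ...  # keep appending drained batches to the staging table, as before

    def process_endofpipe(self) -> None:
        # Tentative hook name from this thread; called once when the pipeline ends.
        # `self.connection` is a hypothetical attribute, and the SQL is illustrative.
        self.connection.execute("DROP TABLE IF EXISTS users")
        self.connection.execute("ALTER TABLE users_tmp RENAME TO users")
```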
**andreas_zaidan:**
Yeah, that would be great! So `process_endofpipe` is called before a `drain_all`?
**aaronsteers:**
The `process_endofpipe()` method invokes the last `drain_all()`, but in theory it could also be used for final cleanup.
@andreas_zaidan - if you have the time, I would love your thoughts on this thread: https://gitlab.com/meltano/sdk/-/merge_requests/194#note_716195061
**andreas_zaidan:**
I totally agree with you @aaronsteers, `cleanup_target` should be aware of the ending state of the operation 😉 For example:
- if it failed, I would like to clean up my temp tables
- if it succeeded, I would like to commit the table to production
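A tiny sketch of that success/failure-aware behavior; the `cleanup_target(succeeded=...)` signature follows the spirit of the proposal and is hypothetical, not a released SDK API, and the SQL strings are illustrative:

```python
# Hypothetical hook: the name `cleanup_target` comes from the discussion above;
# the `succeeded` argument and the return value are assumptions for illustration.
def cleanup_target(succeeded: bool) -> list[str]:
    if succeeded:
        # Commit: promote the fully-loaded temp table to production.
        return ["DROP TABLE IF EXISTS users", "ALTER TABLE users_tmp RENAME TO users"]
    # Failure: discard partial work so nothing half-written reaches production.
    return ["DROP TABLE IF EXISTS users_tmp"]


print(cleanup_target(succeeded=True))
print(cleanup_target(succeeded=False))
```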
**aaronsteers:**
Thanks. Feel free to comment directly on the issue as well if you have time. That will allow us to ping you in GitLab within the thread.