# meltano-plugin-development
z
Hi all, I’m working on a custom Meltano target using Singer SDK v0.44.3, and I’ve hit an issue where the finalize method in my RecordSink subclass isn’t being called. I’m buffering records and flushing them to S3, expecting finalize to handle the final flush and log a summary, but it never triggers.

Here’s the setup:
• My sink subclasses singer_sdk.sinks.RecordSink.
• process_record buffers records and flushes at a threshold (e.g., 1000 records).
• finalize sets force_flush = True, calls flush_buffer, and logs a summary.
• Logging is at DEBUG level, but I don’t see “Entering finalize method” in the logs.

From the docs, I understand finalize should be called by the Target class after all records are processed, but it’s not happening. Has anyone run into this? Could it be related to:
• The tap not signaling stream completion?
• An exception killing the process early? (No obvious errors in logs, though.)
• A misstep in how I’ve set up the target or sink?

I’ve fixed a bug in my buffer logic (was comparing len(buffer) to a boolean), but that didn’t resolve it. I’d appreciate any pointers or examples of working finalize implementations! Happy to share more code/logs if needed. Thanks!
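The buffer bug mentioned in this message (comparing len(buffer) to a boolean) is easy to reproduce outside the SDK. Here is a minimal sketch in plain Python of threshold-based flushing with a forced final flush. `RecordBuffer`, `threshold`, `force`, and the `flushed` list (a stand-in for the S3 upload) are hypothetical names for illustration; none of them come from the Singer SDK or from the poster's code.

```python
class RecordBuffer:
    """Hypothetical buffer: collects records and flushes at a size threshold."""

    def __init__(self, threshold: int = 1000) -> None:
        self.threshold = threshold
        self.records: list = []
        self.flushed: list = []  # stand-in for batches uploaded to S3

    def should_flush(self, force: bool = False) -> bool:
        # A check like `len(self.records) >= force` would compare against a
        # bool. Since True == 1 and False == 0, it ignores the threshold.
        # Compare against the integer threshold and treat `force` separately.
        if not self.records:
            return False
        return force or len(self.records) >= self.threshold

    def flush(self, force: bool = False) -> None:
        if not self.should_flush(force):
            return
        self.flushed.append(self.records)
        self.records = []

    def add(self, record: dict) -> None:
        self.records.append(record)
        self.flush()  # flushes only once the threshold is reached


buf = RecordBuffer(threshold=3)
for i in range(7):
    buf.add({"id": i})
buf.flush(force=True)  # the final flush picks up the partial batch
```

The forced flush at the end is the step that has to be triggered by something. If nothing calls it, the last partial batch stays in memory.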
e
Hi @Zahir Alward! None of the `Sink` classes have a `finalize` method.
> From the docs, I understand finalize should be called by the Target class after all records are processed, but it’s not happening. Has anyone run into this? Could it be related to:
I'm curious where in the docs you're seeing this. If you're buffering the records, you probably need to subclass `BatchSink` instead and implement `process_batch`.
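To illustrate the batching pattern suggested here, the following is a simplified stand-in written in plain Python. It does not import singer_sdk. `MiniBatchSink`, `drain`, `run`, and `uploads` are hypothetical names. Only `process_record`, `process_batch`, `max_size`, and `is_full` are meant to echo names on the SDK's `BatchSink`, and the signatures are simplified. In a real target the Target class, not a hand-written loop, decides when a sink is drained.

```python
import json


class MiniBatchSink:
    """Plain-Python stand-in for the batching pattern; not the Singer SDK class."""

    max_size = 1000  # assumed batch size, matching the threshold in the question

    def __init__(self) -> None:
        self.context: dict = {"records": []}
        self.uploads: list = []  # stand-in for objects written to S3

    @property
    def is_full(self) -> bool:
        return len(self.context["records"]) >= self.max_size

    def process_record(self, record: dict) -> None:
        # Only buffer here; the actual write happens in process_batch.
        self.context["records"].append(record)

    def process_batch(self, context: dict) -> None:
        # Serialize the whole batch as JSON Lines and "upload" it.
        body = "\n".join(json.dumps(r) for r in context["records"])
        self.uploads.append(body.encode("utf-8"))

    def drain(self) -> None:
        # Hypothetical helper playing the role of the target's drain step.
        if self.context["records"]:
            self.process_batch(self.context)
            self.context = {"records": []}


def run(sink: MiniBatchSink, records) -> None:
    for record in records:
        sink.process_record(record)
        if sink.is_full:
            sink.drain()
    sink.drain()  # final drain at end of input, so no custom finalize hook


sink = MiniBatchSink()
sink.max_size = 2
run(sink, [{"id": i} for i in range(5)])
```

With this split, the sink only describes how to write one batch. The partial last batch is handled by the same drain step as every full batch.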
z
Hi @Edgar Ramírez (Arch.dev)! If you're using a `RecordSink` class with a source of indefinite length, the sink won't commit changes to the destination. I might be implementing it in a non-standard way. I worked around this by using `BatchSink` instead. Is `RecordSink` the right approach for handling indefinite streams?
e
> Is `RecordSink` the right approach for handling indefinite streams?
It could be, but you might need to implement `start_drain`. I honestly haven't seen a solid use case for `RecordSink` as far as I recall.