Siddu Hussain
04/11/2025, 10:28 AM
{"key1": "value1" }, {"key2" : "value2"}
◦ Random race-condition data sample: the second record is emitted and written to the target before the first record has finished writing.
▪︎ {"key1": {"key2" : "value2"}
◦ I assumed this was happening because of batching in the tap and the data being written to JSONL.
◦ It still happens even after removing the batching in the tap.
• I tried multiprocessing outside the tap, calling meltano el as a subprocess for each chunk of data, as below. This works without the race condition.
import os
import shlex
import subprocess


def run(time_range):
    """Run one `meltano el` job for a single (start, end) time chunk."""
    try:
        # Default to "" so an unset variable doesn't become the string "None".
        is_backfill = shlex.quote(os.environ.get("TAP_ZOOM_IS_BACKFILL", ""))
        start_time, end_time = time_range
        # Quote the dates so spaces and colons survive the shell.
        start_time = shlex.quote(start_time)
        end_time = shlex.quote(end_time)
        cmd = (
            # Each shell statement needs its own "; " separator.
            "export MELTANO_STATE_BACKEND_URI='s3://iflow-prod/state/backfill/zoom/'; "
            f"export TAP_ZOOM_FROM_DATE={start_time} TAP_ZOOM_TO_DATE={end_time} "
            f"TAP_ZOOM_IS_BACKFILL={is_backfill}; "
            # "." is the POSIX form of "source", which /bin/sh (e.g. dash) may lack.
            ". .venv/bin/activate; "
            "meltano el tap-zoom target-s3-zoom --force"
        )
        subprocess.run(cmd, shell=True, check=True)
        return {"time_range": time_range, "status": "success"}
    except Exception as e:
        return {"time_range": time_range, "status": "error", "error": str(e)}
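The snippet shows only the per-chunk worker. Below is a minimal sketch of a driver for it, assuming day-sized chunks, a "%Y-%m-%d %H:%M:%S" format for TAP_ZOOM_FROM_DATE/TAP_ZOOM_TO_DATE, and four concurrent jobs (all assumptions, not taken from the original setup); make_time_ranges and run_all are hypothetical names. Threads are enough here because each job only waits on its own meltano el process, and those processes already run in parallel.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

# Assumed format for TAP_ZOOM_FROM_DATE / TAP_ZOOM_TO_DATE; adjust to what the tap expects.
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def make_time_ranges(start, end, step=timedelta(days=1)):
    """Split [start, end) into consecutive (from, to) string pairs no longer than `step`."""
    ranges = []
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)
        ranges.append((cursor.strftime(DATE_FORMAT), upper.strftime(DATE_FORMAT)))
        cursor = upper
    return ranges


def run_all(worker, time_ranges, max_workers=4):
    """Call worker(time_range) for every chunk concurrently; results keep chunk order."""
    # Threads suffice: each call only blocks while waiting on its own child process.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(worker, time_ranges))


if __name__ == "__main__":
    for time_range in make_time_ranges(datetime(2025, 1, 1), datetime(2025, 1, 4)):
        print(time_range)
```

With the run function above, run_all(run, make_time_ranges(start, end)) would return one status dict per chunk, in chunk order.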
I was wondering: if both approaches spin up an individual stdout pipe for each process, why does case 1 run into a race condition while case 2 doesn't?
My understanding is that Meltano sends data to stdout, as per the target emit code.
• Might be a silly question, but how does Meltano differentiate emitted logs from emitted Singer records?
• When I spin up a separate process, its stdout should be different from the main process's stdout, right? Or is it the same stdout pipe?
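On the pipe question, a likely explanation based on standard POSIX process behaviour (not anything Meltano-specific): a child process inherits its parent's file descriptors, so every process started with multiprocessing, and every subprocess.run call without a stdout= argument, writes to the same stdout as its parent. In case 1 that is the one pipe meltano el reads the tap's output from, and POSIX only guarantees that a single write of at most PIPE_BUF bytes (4096 on Linux) to a pipe stays contiguous, so larger or buffered records from different processes can be split and interleaved. In case 2 each meltano el creates its own tap-to-target pipe, so no two chunks share one. The sketch below shows the isolated case with the standard library only: passing stdout=subprocess.PIPE gives each child a private pipe, so outputs cannot mix. CHILD and run_isolated are illustrative names.

```python
import subprocess
import sys

# Tiny child program: prints one line tagged with its first argument.
CHILD = "import sys; print('record from child', sys.argv[1])"


def run_isolated(tags):
    """Start one child per tag, each with a private stdout pipe, and collect outputs."""
    procs = [
        subprocess.Popen(
            [sys.executable, "-c", CHILD, tag],
            stdout=subprocess.PIPE,  # a new pipe per child, not the parent's stdout
            text=True,
        )
        for tag in tags
    ]
    # communicate() reads each child's own pipe to EOF and waits for it to exit.
    return {tag: p.communicate()[0] for tag, p in zip(tags, procs)}


if __name__ == "__main__":
    for tag, out in run_isolated(["a", "b", "c"]).items():
        print(tag, repr(out))
```

Dropping the stdout= argument would make all three children write straight to the parent's stdout instead, which is the shared-pipe situation of case 1.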
Thanks for taking the time to read through this; any help is much appreciated. Have a great day!
Siddu Hussain
04/11/2025, 2:30 PM
Edgar Ramírez (Arch.dev)
04/11/2025, 4:33 PM
> • might be a silly question but. How is Meltano differentiating logs that are emitted and singer records emitted?
Not silly at all. Meltano assumes the Singer stream is on stdout, while logs and other output are on stderr. It would be useful to dump the Singer stream into a file and inspect the sequence of messages.
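A minimal sketch of that inspection step: capture only the tap's stdout (for example with meltano invoke tap-zoom > tap_output.jsonl, where the file name is just an example and Meltano's own logs should stay on stderr), then scan the file line by line. Each line should be one complete JSON object carrying a Singer "type" field; a line like the {"key1": {"key2" : "value2"} sample earlier in the thread fails to parse. check_singer_lines is a hypothetical helper, not part of Meltano.

```python
import json


def check_singer_lines(lines):
    """Return (line_number, problem) for each line that isn't a whole Singer message."""
    problems = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # blank lines are ignored
        try:
            message = json.loads(text)
        except json.JSONDecodeError as exc:
            problems.append((number, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(message, dict) or "type" not in message:
            problems.append((number, "missing Singer 'type' field"))
    return problems


if __name__ == "__main__":
    sample = [
        '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
        '{"key1": {"key2" : "value2"}',  # corrupted line like the one reported above
    ]
    print(check_singer_lines(sample))
```

For a real capture, pass the open file object (with open("tap_output.jsonl") as f: check_singer_lines(f)); iterating a file yields one line at a time, so large dumps are not loaded into memory at once.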
Siddu Hussain
04/11/2025, 7:51 PM
Siddu Hussain
04/11/2025, 7:52 PM
Siddu Hussain
04/11/2025, 7:53 PM
Edgar Ramírez (Arch.dev)
04/14/2025, 7:57 PM
> do you have a sample tap that is using parallel calls like calling all streams passing chunks of dates
I don't have one handy, unfortunately.
> I believe meltano is creating a stdout pipe for "meltano el" and using the same pipe even after creating multiple processes and calling tap.stream("child").sync() in each process
If all the processes in the tap are writing to the same stream (stdout), then that's a problem. You probably need a "multiplex" solution, similar to what is proposed in https://github.com/meltano/meltano/issues/3303.
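A minimal sketch of one single-writer approach, using only the standard library and assuming workers can hand back finished messages instead of writing them (a swapped-in technique, not necessarily what the linked issue proposes): worker processes never touch stdout; they put messages on a multiprocessing.Queue, and the parent is the only process that writes to stdout, one complete line at a time. worker, multiplex, and the dummy records are hypothetical; in a real tap the workers would build Singer messages from their API calls rather than call tap.stream("child").sync(), which in the Singer SDK writes to stdout itself.

```python
import json
import multiprocessing as mp
import sys

DONE = None  # sentinel a worker sends when it has no more messages


def worker(chunk_id, queue):
    """Hypothetical worker: build messages for one chunk and hand them to the parent."""
    for i in range(3):
        record = {"chunk": chunk_id, "n": i}
        queue.put({"type": "RECORD", "stream": "child", "record": record})
    queue.put(DONE)


def multiplex(chunk_ids, out=None):
    """Run one worker per chunk; only this (parent) process writes to `out`."""
    out = sys.stdout if out is None else out
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(c, queue)) for c in chunk_ids]
    for p in procs:
        p.start()
    remaining = len(procs)
    # Drain the queue before joining, so no worker blocks on unflushed queue data.
    while remaining:
        message = queue.get()
        if message is DONE:
            remaining -= 1
            continue
        out.write(json.dumps(message) + "\n")  # whole line, single writer
    out.flush()
    for p in procs:
        p.join()


if __name__ == "__main__":
    multiplex(["2025-01-01", "2025-01-02"])
```

Messages from different chunks still arrive in an arbitrary order, but each one reaches stdout as a complete line because only the parent writes.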