# best-practices
s
Hi all, I am trying to understand how Meltano's internals work. I can't figure out why one of the two cases below works but the other does not.
• Case 1: I am using multiprocessing inside the tap to sync a child stream with parent keys. The data sent to the target hits a race condition, and the posted data is sometimes broken:
  ◦ Expected sample (I have not written out the full Singer format, but consider these to be Singer messages of type RECORD):
    ▪︎ {"key1": "value1" }, {"key2" : "value2"}
  ◦ Sample with the race condition, where the second record is emitted and written to the target before the first record has been fully written:
    ▪︎ {"key1": {"key2" : "value2"}
  ◦ I assumed this was caused by batching in the tap and the data being written to JSONL.
  ◦ It still happens after removing batching from the tap.
• Case 2: I run the multiprocessing outside the tap and call meltano el as a subprocess for each chunk of data, as below. This works without a race condition.
import os
import shlex
import subprocess


def run(time_range):
    """Run one `meltano el` invocation for a (start_time, end_time) chunk."""
    try:
        # Default to an empty string so an unset variable is not passed on as "None".
        is_backfill = shlex.quote(os.environ.get("TAP_ZOOM_IS_BACKFILL", ""))
        start_time, end_time = time_range
        start_time = shlex.quote(start_time)
        # start = start_time.replace(" ", "").replace(":", "")
        end_time = shlex.quote(end_time)
        # Every command ends with "; " so the concatenated string stays valid shell.
        cmd = (
            "export MELTANO_STATE_BACKEND_URI='s3://iflow-prod/state/backfill/zoom/'; "
            f"export TAP_ZOOM_FROM_DATE={start_time} TAP_ZOOM_TO_DATE={end_time} TAP_ZOOM_IS_BACKFILL={is_backfill}; "
            "source .venv/bin/activate; "
            "meltano el tap-zoom target-s3-zoom --force"
        )
        # Each call starts its own shell and its own `meltano el` process.
        subprocess.run(cmd, shell=True, check=True)
        return {"time_range": time_range, "status": "success"}
    except Exception as e:
        return {"time_range": time_range, "status": "error", "error": str(e)}
I was wondering whether both approaches create a separate stdout pipe for each process they start, and if so, why case 1 hits a race condition and case 2 does not. My understanding is that Meltano sends data through stdout, as per the target emit code.
• Might be a silly question, but how does Meltano differentiate emitted logs from emitted Singer records?
• When I start a separate process, is its stdout different from the main process's stdout, or is it the same stdout pipe?
Thanks for taking the time to read this. Any help is much appreciated. Have a great day!
I can see Meltano creating separate pipes for each async invocation in the code below. I think this is why the subprocess approach runs without issue, while multiprocessing inside the tap uses the same stdout pipe. This is my assumption, as I am not able to visualize how the code works. Let me know if this is the right evaluation or if I am heading in the wrong direction. https://github.com/meltano/meltano/blob/ca3984fd62b2d999fb1dcc648fc713070aa6de99/src/meltano/core/runner/singer.py#L63
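If that assumption holds, processes started with multiprocessing inherit the tap's single stdout, so their writes can interleave. A minimal sketch of one possible mitigation is to serialize every write with a shared lock and flush while the lock is held. The names `emit`, `sync_chunk`, and `run_chunks` are hypothetical and are not part of Meltano or the Singer SDK.

```python
import json
import multiprocessing as mp
import sys


def emit(message, lock, out=None):
    """Write one message as a single line while holding the shared lock."""
    out = out if out is not None else sys.stdout
    line = json.dumps(message) + "\n"
    with lock:
        out.write(line)
        out.flush()  # flush inside the lock so buffered output cannot interleave later


def sync_chunk(chunk, lock, out=None):
    """Hypothetical stand-in for syncing the child stream for one chunk."""
    for n in range(3):
        record = {"chunk": chunk, "n": n}
        emit({"type": "RECORD", "stream": "child", "record": record}, lock, out)


def run_chunks(chunks):
    """Start one process per chunk; all of them share the same lock and stdout."""
    lock = mp.Lock()
    procs = [mp.Process(target=sync_chunk, args=(chunk, lock)) for chunk in chunks]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
```

Calling `run_chunks(["2024-01", "2024-02"])` from under an `if __name__ == "__main__":` guard should print one complete JSON line per record. The lock only protects writes that go through `emit`, so any code path that prints to stdout directly would still be able to break a line.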
e
• might be a silly question but. How is Meltano differentiating logs that are emitted and singer records emitted?
Not silly at all. Meltano assumes the Singer stream is on stdout, while logs and other output go to stderr. It would be useful to dump the Singer stream into a file and inspect the sequence of messages.
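A rough sketch of how such a dump could be inspected, assuming the tap's stdout has been redirected to a file with one Singer message per line. The function name `find_broken_lines` and the file name `tap_output.jsonl` are hypothetical.

```python
import json


def find_broken_lines(lines):
    """Return (line_number, text) for lines that are not one JSON object with a "type" key."""
    broken = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # ignore blank lines
        try:
            message = json.loads(text)
        except json.JSONDecodeError:
            broken.append((number, text))
            continue
        if not isinstance(message, dict) or "type" not in message:
            broken.append((number, text))
    return broken


def check_file(path):
    """Print every suspicious line found in the dumped stream."""
    with open(path, encoding="utf-8") as handle:
        for number, text in find_broken_lines(handle):
            print(f"line {number}: {text[:80]}")
```

For example, `check_file("tap_output.jsonl")` would list lines such as the truncated `{"key1": {"key2" : "value2"}` sample, which fails to parse because the first record was never closed.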
s
Got it. I believe Meltano creates one stdout pipe for "meltano el", and the tap keeps using that same pipe even after it creates multiple processes and calls tap.stream("child").sync() in each of them. When I instead run "meltano el" from multiple processes, each invocation gets its own stdout.
Do you have a sample tap that makes parallel calls, for example syncing all streams with chunks of dates passed in? I want to make sure I am doing the right thing and don't run into the race condition again.
The messages were sequenced fine; the stdout writes were mangled because too many processes were writing to the same stdout.
e
do you have a sample tap that is using parallel calls like calling all streams passing chunks of dates
I don't have one handy unfortunately.
I believe meltano is creating a stdout pipe for "meltano el" and using the same pipe even after creating multiple processes and calling tap.stream("child").sync() in each process
If all the processes in the tap are writing to the same stream (stdout), then that's a problem. You probably need a "multiplex" solution instead, similar to what is proposed in https://github.com/meltano/meltano/issues/3303.
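One way to picture a multiplexing approach inside a tap is a single-writer pattern: worker processes put finished lines on a queue, and only the parent process writes to stdout. This is a sketch under that assumption and is not taken from the linked issue. The names `worker`, `drain`, and `run_parallel` are hypothetical.

```python
import json
import multiprocessing as mp
import sys

SENTINEL = None  # a worker puts this on the queue when it has finished


def worker(chunk, queue):
    """Hypothetical stand-in for syncing the child stream for one chunk."""
    for n in range(3):
        record = {"chunk": chunk, "n": n}
        message = {"type": "RECORD", "stream": "child", "record": record}
        queue.put(json.dumps(message))
    queue.put(SENTINEL)


def drain(queue, n_workers, out):
    """Single writer: the only function that touches `out`."""
    finished = 0
    written = 0
    while finished < n_workers:
        line = queue.get()
        if line is SENTINEL:
            finished += 1
            continue
        out.write(line + "\n")
        written += 1
    out.flush()
    return written


def run_parallel(chunks, out=None):
    """Start one worker per chunk and write all of their lines from the parent."""
    out = out if out is not None else sys.stdout
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(chunk, queue)) for chunk in chunks]
    for proc in procs:
        proc.start()
    written = drain(queue, len(procs), out)  # drain before join to avoid blocking on a full queue
    for proc in procs:
        proc.join()
    return written
```

Calling `run_parallel(["2024-01", "2024-02"])` under an `if __name__ == "__main__":` guard should emit six complete lines. Records from different chunks may still arrive in any order, so this only keeps individual messages intact and does not preserve ordering across workers.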