Having trouble running multiple Meltano processes ...
# troubleshooting
h
Having trouble running multiple Meltano processes in parallel, each with a separate tap with non-overlapping selections. One major issue is that the processes often make no progress at all, although it's hard to get much insight as I need to wait for them to finish to get the output to my controlling application. Why might this be? Do they all read a common lock file and stop each other?
v
Meltano itself doesn't have anything that limits this (I have 20-30 in parallel all the time). I'd guess it's your target, but it's hard to know without more info. Could you share your meltano.yml, how you're running this, and your logs?
There's a feature that could cause locking if you're using it for state on S3 etc., but I'm guessing at this point. Seeing the file will help.
h
Hmm, well that's reassuring at least. I'm still using systemdb and manually syncing the state with S3, and only before + after the Meltano subprocess commands are called, so that should be fine. I'll give you something more concrete when I'm back at work. It's possible they're just spending too much time refreshing the tap.json. I've copied the tap.json file from the parent tap to each worker tap, but forgot about the cache key, which is probably required to avoid that. I'll also launch them all from the command line in order to see the output.
👀 1
v
Yeah, I've only hit a locking issue with the catalog.json file once that I remember. I think in the current implementation Meltano stores the discovered catalog in memory, dumps it to a file, and then reads the file back into memory for the non-discovery run. That means the lock issue would have to happen between the tap getting invoked for discovery and the "regular" catalog-provided run. I haven't hit it enough to care to go add a GUID for catalog generation in Meltano, but it is possible!
👍 1
h
Here is what I've been playing with at the moment. I am finding now, even with a single worker with only 5 tables selected (of about 1300 overall), it takes Meltano about 5 minutes to parse through the tap.properties.json (the visiting/skipping nodes etc). It uses ~100% CPU, ends in a broken pipe error, and no state is written. I guess I need to manufacture my own state.json so Meltano doesn't keep going through that process, and then only use incremental runs from there?
The broken pipe error for reference:
2024-07-24T23:22:48.992886Z [debug    ] Deleted configuration at /Users/hayden.ness/meltano_test/.meltano/run/tap-mssql-worker-0/tap.e01ac800-cd43-4b24-b80c-d60af08c4edd.config.json
2024-07-24T23:22:48.993199Z [debug    ] Deleted configuration at /Users/hayden.ness/meltano_test/.meltano/run/meltano-map-transformer/mapper.1e66edde-2472-4c59-ab7a-5e54f01255ec.config.json
2024-07-24T23:22:48.993430Z [debug    ] Deleted configuration at /Users/hayden.ness/meltano_test/.meltano/run/target-s3/target.1f468ed7-32fa-40af-9864-6c0383d7eaeb.config.json
2024-07-24T23:22:49.206355Z [error    ] [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/logging/output_logger.py", line 208, in redirect_logging
    yield
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 488, in run
    await self.run_with_job()
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 520, in run_with_job
    await self.execute()
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 480, in execute
    await manager.run()
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 687, in run
    await self._wait_for_process_completion(self.elb.head)
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 760, in _wait_for_process_completion
    raise output_futures_failed.exception()  # noqa: RSE102
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/logging/utils.py", line 230, in capture_subprocess_output
    if not await _write_line_writer(writer, line):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/logging/utils.py", line 198, in _write_line_writer
    await writer.wait_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/streams.py", line 364, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/unix_events.py", line 717, in _write_ready
    n = os.write(self._fileno, self._buffer)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
2024-07-24T23:22:49.313510Z [warning  ] Loop <_UnixSelectorEventLoop running=False closed=True debug=False> that handles pid 68974 is closed
I think I've found the main issue. If I just use Popen, then communicate(), things seem to work in the expected timeframes. If I use Popen, polling, then communicate(), the processes never seem to terminate. Presumably the pipe fills up and, with nothing consuming it, the child blocks on its writes. I will have to look at this problem more closely. Even with Popen->communicate(), later processes will still be blocked until the earlier processes in the loop have completed via their communicate() calls. I might try a file.
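A minimal sketch of the blocked-pipe behaviour described above, not the original controller code. It assumes a stand-in child (a hypothetical `CHILD_CODE` one-liner, not Meltano) that writes about 4 MiB to stdout, which is more than a typical OS pipe buffer holds. Polling without reading leaves the child stuck; draining the pipe with communicate() lets it exit.

```python
import subprocess
import sys
import time

# Hypothetical stand-in for a chatty child process: writes ~4 MiB to stdout.
OUTPUT_SIZE = 4 * 1024 * 1024
CHILD_CODE = f"import sys; sys.stdout.write('x' * {OUTPUT_SIZE})"


def poll_without_reading(seconds: float = 1.0):
    """Start the child with stdout=PIPE, poll without reading, then drain the pipe."""
    process = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
    )
    # Poll only; nothing reads the pipe during this loop.
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline and process.poll() is None:
        time.sleep(0.05)
    # The child cannot finish: its write blocks once the pipe buffer is full.
    still_running = process.poll() is None
    # communicate() reads the pipe to the end, which unblocks the child.
    output, _ = process.communicate()
    return still_running, process.returncode, len(output)


still_running, returncode, size = poll_without_reading()
print(still_running, returncode, size)
```

Redirecting stdout and stderr to files, or reading the pipes from a separate thread while polling, avoids the stall because the child never waits on a reader.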
v
I am finding now, even with a single worker with only 5 tables selected (of about 1300 overall), it takes Meltano about 5 minutes to parse through the tap.properties.json (the visiting/skipping nodes etc)
1. Are you sure discovery isn't running? It sounds more like discovery is running and takes 5 minutes.
2. I would not modify the meltano.yml for each run. Instead, use env variables like https://docs.meltano.com/concepts/plugins/#select_filter-extra
3. I would not copy the cache files around. Let Meltano handle that. If you want to pass a pre-generated catalog file, use something like https://docs.meltano.com/concepts/plugins#catalog-extra
4. Sharing your code would be more helpful, but I think the above really should get redone if you want this to work. Parallelization is something you need to think about very carefully. It's hard to do right and there are a lot of ways to skin this cat.
We are really in #C0699V48BPF as this isn't a meltano issue but happy to leave this thread going!
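A rough sketch of points 2 and 3 above: building a per-worker environment instead of editing meltano.yml. The variable names `TAP_MSSQL__SELECT_FILTER` and `TAP_MSSQL__CATALOG` are assumptions based on Meltano's `<PLUGIN_NAME>__<EXTRA>` convention for the linked `select_filter` and `catalog` extras, and should be checked against those docs. The helper `build_worker_env` and the stream names are hypothetical.

```python
import json
import os


def build_worker_env(streams, catalog_path=None, plugin_name="tap-mssql", base_env=None):
    """Return an environment dict that narrows one worker's stream selection."""
    env = dict(os.environ if base_env is None else base_env)
    # Assumed convention: plugin name upper-cased with dashes turned into underscores.
    prefix = plugin_name.upper().replace("-", "_")
    # Assumed value format: a JSON array of stream names.
    env[f"{prefix}__SELECT_FILTER"] = json.dumps(list(streams))
    if catalog_path is not None:
        # Assumed variable for the `catalog` extra (path to a pre-generated catalog).
        env[f"{prefix}__CATALOG"] = str(catalog_path)
    return env


# Hypothetical stream names; base_env={} keeps the printed result small.
env = build_worker_env(["dbo-orders", "dbo-customers"], catalog_path="catalog.json", base_env={})
print(env)
```

The resulting dict can be passed as `env=` to `subprocess.Popen` for each worker. Workers launched this way share one tap definition, so they would likely still need separate state handling, which this sketch does not cover.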
h
All good, I agree. I've resolved the issue now anyway, but I think I'll take some of that advice on board. Just need to get something working for initial deployment, then I'll have more time to give everything the required amount of thought and effort. Thanks!
v
Can you share? Someone will hit this at some point!
h
Sure, the key part is here. Writing to files prevents the filled pipe problem. I read the files in afterwards to print them out, and will handle errors from them in the future. It's still very ugly and primitive, but I'm focusing on getting the functionality confirmed first. I'm sure there are much better ways to accomplish this.
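import shlex
import subprocess
import time
from typing import List

global_processes: List[subprocess.Popen] = []


# Wrapper name and signature are assumed; the snippet was the body of a larger function.
def run_workers(all_workers: List[str], full_refresh: bool = False) -> None:
    global global_processes
    processes: List[subprocess.Popen] = []
    stderr_files = []
    stdout_files = []

    for worker in all_workers:
        command = f"meltano run{' --full-refresh' if full_refresh else ''} {worker} mappy target-s3"

        # Send output to files rather than pipes so a full pipe can never block the child.
        stderr_file = open(f"{worker}_stderr.txt", "w")
        stdout_file = open(f"{worker}_stdout.txt", "w")

        stderr_files.append(stderr_file)
        stdout_files.append(stdout_file)

        print("starting command: " + command)
        processes.append(subprocess.Popen(shlex.split(command), stdout=stdout_file, stderr=stderr_file))

    global_processes = processes  # Quick fix for killing everything on ctrl-c while developing
    processes_done: List[bool] = [False] * len(processes)

    # Poll until every process has exited, reporting each one once.
    while False in processes_done:
        for i, process in enumerate(processes):
            retcode = process.poll()
            if retcode is not None and not processes_done[i]:
                print(f"process {i} done.")
                processes_done[i] = True
        time.sleep(0.1)

    # Close the log files once all workers have finished.
    for log_file in stderr_files + stdout_files:
        log_file.close()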
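One possible shape for the error handling mentioned above, as a sketch only. It assumes hypothetical helpers `run_to_files` and `failed_workers`, and uses small stand-in Python commands instead of `meltano run` so it can be executed anywhere. Output goes to per-worker files, exit codes are collected, and failed workers can then be looked up in their stderr files.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_to_files(commands, log_dir):
    """Run each named command with output redirected to files; return exit codes."""
    log_dir = Path(log_dir)
    handles, processes = [], {}
    try:
        for name, argv in commands.items():
            out = open(log_dir / f"{name}_stdout.txt", "w")
            err = open(log_dir / f"{name}_stderr.txt", "w")
            handles += [out, err]
            processes[name] = subprocess.Popen(argv, stdout=out, stderr=err)
        # wait() cannot stall on a full pipe here because no pipes are used.
        return {name: process.wait() for name, process in processes.items()}
    finally:
        for handle in handles:
            handle.close()


def failed_workers(exit_codes):
    """Names of workers whose process exited with a non-zero code."""
    return sorted(name for name, code in exit_codes.items() if code != 0)


# Stand-in commands (hypothetical); a real run would use the meltano command lines.
commands = {
    "worker-0": [sys.executable, "-c", "print('ok')"],
    "worker-1": [sys.executable, "-c", "import sys; sys.stderr.write('boom'); sys.exit(3)"],
}
with tempfile.TemporaryDirectory() as tmp:
    exit_codes = run_to_files(commands, tmp)
    failures = failed_workers(exit_codes)
    worker_1_stderr = (Path(tmp) / "worker-1_stderr.txt").read_text()
print(exit_codes, failures)
```

All processes are started before any is waited on, so the workers still run in parallel.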
As for generating the workers, I am currently inserting them into meltano.yml, each with an independent subset of the full catalog select list. I'll shortly try separate .yml files that I import instead. I also found that using catalog-extra gives a substantial performance improvement, so that's great!