Hayden Ness
07/24/2024, 6:37 AM
visch
07/24/2024, 12:21 PM
visch
07/24/2024, 12:21 PM
Hayden Ness
07/24/2024, 1:08 PM
visch
07/24/2024, 1:15 PM
Hayden Ness
07/24/2024, 10:58 PM
Hayden Ness
07/24/2024, 11:26 PM
2024-07-24T23:22:48.992886Z [debug ] Deleted configuration at /Users/hayden.ness/meltano_test/.meltano/run/tap-mssql-worker-0/tap.e01ac800-cd43-4b24-b80c-d60af08c4edd.config.json
2024-07-24T23:22:48.993199Z [debug ] Deleted configuration at /Users/hayden.ness/meltano_test/.meltano/run/meltano-map-transformer/mapper.1e66edde-2472-4c59-ab7a-5e54f01255ec.config.json
2024-07-24T23:22:48.993430Z [debug ] Deleted configuration at /Users/hayden.ness/meltano_test/.meltano/run/target-s3/target.1f468ed7-32fa-40af-9864-6c0383d7eaeb.config.json
2024-07-24T23:22:49.206355Z [error ] [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/logging/output_logger.py", line 208, in redirect_logging
    yield
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 488, in run
    await self.run_with_job()
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 520, in run_with_job
    await self.execute()
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 480, in execute
    await manager.run()
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 687, in run
    await self._wait_for_process_completion(self.elb.head)
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/block/extract_load.py", line 760, in _wait_for_process_completion
    raise output_futures_failed.exception()  # noqa: RSE102
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/logging/utils.py", line 230, in capture_subprocess_output
    if not await _write_line_writer(writer, line):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hayden.ness/meltano_test/venv/lib/python3.11/site-packages/meltano/core/logging/utils.py", line 198, in _write_line_writer
    await writer.wait_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/streams.py", line 364, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/unix_events.py", line 717, in _write_ready
    n = os.write(self._fileno, self._buffer)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
2024-07-24T23:22:49.313510Z [warning ] Loop <_UnixSelectorEventLoop running=False closed=True debug=False> that handles pid 68974 is closed
Hayden Ness
07/25/2024, 3:32 AM
visch
07/25/2024, 11:01 AM
> I am finding now, even with a single worker with only 5 tables selected (of about 1300 overall), it takes Meltano about 5 minutes to parse through the tap.properties.json (the visiting/skipping nodes etc.)
1. Are you sure discovery isn't running? It sounds more like discovery is running and takes 5 minutes to complete.
2. I would not modify the meltano.yml for each run. Instead, use environment variables, as described in https://docs.meltano.com/concepts/plugins/#select_filter-extra (a sketch follows below).
3. I would not copy around the cache files. Let Meltano handle that; if you want to pass a pre-generated catalog file, see https://docs.meltano.com/concepts/plugins#catalog-extra
4. Sharing your code would be more helpful, but I think the above really should get redone if you want this to work. Parallelization is something you need to think about very carefully; it's hard to do right, and there are a lot of ways to skin this cat.
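A minimal sketch of point 2, for anyone reading along: Meltano lets the select_filter extra be set through an environment variable (the plugin name uppercased, non-alphanumerics replaced with underscores, followed by __SELECT_FILTER), so each worker can receive its table subset without rewriting meltano.yml. The table names below are illustrative assumptions, not from the thread's actual project; the worker plugin name matches the tap-mssql-worker-0 seen in the logs above.

import json
import os
import shlex
import subprocess

# Hypothetical mapping of inherited worker plugins to their table subsets.
worker_tables = {
    "tap-mssql-worker-0": ["dbo-customers", "dbo-orders"],
    "tap-mssql-worker-1": ["dbo-invoices"],
}

for worker, tables in worker_tables.items():
    env = os.environ.copy()
    # The select_filter extra accepts a JSON array of stream names, e.g.
    # TAP_MSSQL_WORKER_0__SELECT_FILTER='["dbo-customers", "dbo-orders"]'.
    env[f"{worker.upper().replace('-', '_')}__SELECT_FILTER"] = json.dumps(tables)
    subprocess.Popen(shlex.split(f"meltano run {worker} mappy target-s3"), env=env)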
visch
07/25/2024, 11:01 AM
Hayden Ness
07/25/2024, 10:27 PM
visch
07/26/2024, 1:02 AM
Hayden Ness
07/26/2024, 3:53 AM
import shlex
import subprocess
import time
from typing import List

processes: List[subprocess.Popen] = []
stderr_files = []
stdout_files = []
for worker in all_workers:
    command = f"meltano run{' --full-refresh' if full_refresh else ''} {worker} mappy target-s3"
    # Capture each worker's output in its own files so runs can be inspected later.
    stderr_file = open(f"{worker}_stderr.txt", "w")
    stdout_file = open(f"{worker}_stdout.txt", "w")
    stderr_files.append(stderr_file)
    stdout_files.append(stdout_file)
    print("starting command: " + command)
    processes.append(subprocess.Popen(shlex.split(command), stdout=stdout_file, stderr=stderr_file))
global_processes = processes  # Quick fix for killing everything on ctrl-c while developing

processes_done: List[bool] = [False] * len(processes)  # one done-flag per worker process
while False in processes_done:
    for i, process in enumerate(processes):
        retcode = process.poll()
        if retcode is not None:
            if not processes_done[i]:
                print(f"process {i} done.")
            processes_done[i] = True
    time.sleep(0.1)
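For the ctrl-c "quick fix" flagged in the comment above, a minimal cleanup handler might look like the sketch below. The handler name and exit code are illustrative only (and children in the same process group may already receive SIGINT on their own):

import signal
import sys

def _terminate_workers(signum, frame):
    # Ask any still-running meltano subprocesses to exit before we do.
    for process in processes:
        if process.poll() is None:
            process.terminate()
    sys.exit(1)

signal.signal(signal.SIGINT, _terminate_workers)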
As for generating the workers, I am currently inserting them into meltano.yml, each with an independent subset of the full catalog select list. I'll try separate .yml files that I import shortly.
I also found that using the catalog extra provides a substantial performance improvement, so that's great!
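The catalog extra mentioned above can also be supplied per worker through an environment variable (same naming convention as select_filter, ending in __CATALOG) rather than by copying cache files around. A hedged sketch, assuming that convention; the catalog path is a placeholder, not from the thread:

import os
import shlex
import subprocess

env = os.environ.copy()
# Point Meltano at a pre-generated catalog file so discovery is skipped.
env["TAP_MSSQL_WORKER_0__CATALOG"] = "catalogs/tap-mssql-worker-0.json"
subprocess.Popen(shlex.split("meltano run tap-mssql-worker-0 mappy target-s3"), env=env)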