# singer-taps
d
Is there something that would prevent me from running multiple instances of Meltano in parallel? When I do, I'm getting this error, which does not happen when running a single instance at a time. Example commands (the TAP_MSSQL__SELECT env variable is set per instance to filter streams):
```
meltano run tap-mssql target-snowflake --state-id-suffix dbo-Test1
meltano run tap-mssql target-snowflake --state-id-suffix dbo-Test2
```
Error:
```
msgspec.DecodeError: JSON is malformed: trailing characters (byte 10964751) cmd_type=elb consumer=False job_name=default:tap-mssql-to-target-snowflake:dbo-Test2 name=tap-mssql producer=True run_id=85d87b05-afd5-4ff0-9df4-20ca4e585a40 stdio=stderr string_id=tap-mssql
```
v
Generally nothing, but there are some race conditions where your `catalog.json` file could be written and deleted at the same time another one is being generated. I'd need to see the stack trace to have a better guess; it'd also be helpful to see how you're orchestrating it.
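For context, the failure mode v describes is a torn read/write: one process reads (or truncates and rewrites) catalog.json while another is mid-write, so the reader sees partial or concatenated JSON, which is exactly what "trailing characters" means. A minimal sketch of the usual mitigation, write-to-temp-then-atomic-rename (an illustration of the pattern, not what Meltano currently does internally):

```python
import json
import os
import tempfile


def write_json_atomically(path, payload):
    """Write JSON so readers never observe a half-written file.

    The temp file lives in the same directory so os.replace() is an
    atomic rename; concurrent readers see either the old complete file
    or the new complete file, never a partial one.
    """
    dir_name = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fp:
            json.dump(payload, fp)
            fp.flush()
            os.fsync(fp.fileno())
        os.replace(tmp_path, path)  # atomic swap into place
    except BaseException:
        os.remove(tmp_path)
        raise
```

Note this only protects readers from torn writes on one file; it does nothing about two processes deciding to regenerate the catalog at the same time.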
d
What is `catalog.json`, and can it be a custom path? Using Dagster to orchestrate: just selecting multiple assets and running, and each asset runs that command. Here is the stack trace:
```
time=2024-09-11 17:34:08 name=singer level=CRITICAL message=JSON is malformed: trailing characters (byte 10964751)
Traceback (most recent call last):
  File "/opt/dagster/app/src/elt_projects/meltano/.meltano/extractors/tap-mssql/venv/bin/tap-mssql", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/opt/dagster/app/src/elt_projects/meltano/.meltano/extractors/tap-mssql/venv/lib/python3.12/site-packages/tap_mssql/__init__.py", line 805, in main
    raise exc
  File "/opt/dagster/app/src/elt_projects/meltano/.meltano/extractors/tap-mssql/venv/lib/python3.12/site-packages/tap_mssql/__init__.py", line 802, in main
    main_impl()
  File "/opt/dagster/app/src/elt_projects/meltano/.meltano/extractors/tap-mssql/venv/lib/python3.12/site-packages/tap_mssql/__init__.py", line 780, in main_impl
    args = utils.parse_args(REQUIRED_CONFIG_KEYS)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/dagster/app/src/elt_projects/meltano/.meltano/extractors/tap-mssql/venv/lib/python3.12/site-packages/singer/utils.py", line 183, in parse_args
    args.catalog = Catalog.load(args.catalog)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/dagster/app/src/elt_projects/meltano/.meltano/extractors/tap-mssql/venv/lib/python3.12/site-packages/singer/catalog.py", line 118, in load
    return Catalog.from_dict(msgspec.json.decode(fp.read()))
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
msgspec.DecodeError: JSON is malformed: trailing characters (byte 10964751)
```
I'm running using Popen
v
> Each asset just runs that command
It's nuanced to do this stuff properly 😕 Now that I understand better what you're doing, I'd recommend a Docker container. Since you're running with Dagster, why not use a container for each individual run? You wouldn't hit this issue and you could scale up much more easily. To do this on one machine, I'd either:
1. Have each run be in its own directory (you're rebuilding parts of Docker anyway, so it's probably easier to just use Docker)
2. Use inherits_from and only run one "tap-mssql" at a time, so you'd have tap-mssql-1, tap-mssql-2, etc. for the number that you want to run in parallel. Originally I made https://gitlab.com/meltano/meltano/-/merge_requests/2247 so this was feasible.
`catalog.json` might not be a bad one to add a uuid to, so we wouldn't have to jump through as many hoops, but it's harder than the config.json uuid implementation.
The other way is to just provide a catalog.json file, so Meltano doesn't generate a new one every run.
d
We already run this in a Docker container, but the problem is materializing multiple tables, each with its own `meltano run`, within a single Dagster run. How would you generate a catalog.json and reuse it? I thought it already does that behind the scenes unless --refresh-catalog is passed.
Inherits could also potentially work, it just doesn't feel like a clean solution.
When I say run in this context, I'm referring to a Dagster run. Were you talking about meltano run?
e
I see a `msgspec.DecodeError`, so maybe @BuzzCutNorman knows what's going on?
b
@Edgar Ramírez (Arch.dev) I have no idea 😅. This is realit-singer-python from pipelinewise. Looking at the line, maybe it needs to use `mode="rb"` so it reads directly as a binary string into the decoder 🤷.
😅 1
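As a sketch of that suggestion, mirroring the shape of Catalog.load but opening the file in binary mode (the stdlib json stands in for msgspec here, since both accept raw bytes; note this would only fix text-mode decoding quirks, not a genuinely corrupted file):

```python
import json  # stand-in for msgspec.json in this sketch


def load_catalog(path: str) -> dict:
    """Read the catalog as raw bytes and decode.

    With msgspec this would be msgspec.json.decode(raw); opening with
    mode="rb" hands the decoder the exact bytes on disk, avoiding any
    text-mode newline/encoding translation.
    """
    with open(path, "rb") as fp:
        raw = fp.read()
    return json.loads(raw)
```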
d
Where is this catalog located on disk?
v
> I thought it already does that behind the scenes unless --refresh-catalog is passed.
It does, but if you're in an ephemeral container it has to generate one, and if you run them both at the same time you can have issues (as you're having).
> How would you generate a catalog.json and reuse it?
`meltano invoke --dump=catalog tap-name > catalog.json`
> When I say run in this context, I'm referring to a dagster run. Were you talking about meltano run?
I'm talking about running two meltano run processes on the same machine.
> Where is this catalog located on disk?
Run Meltano in debug mode and step through this stuff, it shows everything.
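In script form, that one-time discovery dump might look like the following (a sketch: it assumes `meltano` is on PATH and is run from inside the project; the `runner` parameter is hypothetical, only there so the plumbing can be exercised without a real Meltano project):

```python
import subprocess


def dump_catalog(tap_name: str, out_path: str, runner: str = "meltano") -> None:
    """Run discovery once and persist the catalog to a file, so later
    runs can reuse it instead of regenerating (and racing on) it.

    Equivalent to: meltano invoke --dump=catalog <tap> > <out_path>
    """
    with open(out_path, "w") as fp:
        subprocess.run(
            [runner, "invoke", "--dump=catalog", tap_name],
            stdout=fp,
            check=True,
        )
```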
d
How does that generated catalog then get used by `meltano run`?
v
d
I think I'll have to set up a more localized proof-of-concept Meltano project. My main one has so many streams that debug takes forever, since it prints all streams even if you don't have them selected.
v
Or have each meltano run be in a separate container
d
Ah, I was looking at run docs instead of extras
Apparently running nested containers is not recommended practice
v
🤷 I don't think so, DinD (Docker-in-Docker) is a thing
d
Hmm, I'll give the catalog option a try first before going too deep in this rabbit hole
v
You're already pretty deep 🙂
😅 1
Good luck!
d
Thanks. I'm surprised more people haven't been having problems with this. I didn't think this would be an uncommon scenario
I think the solution for now is to limit the concurrency and maybe work towards being able to spin up containers or have some kind of k8s setup. Even if the catalog works, there's still state being managed by the tap that doesn't really lend itself to running simultaneously.
```
Cannot start plugin tap-mssql: [WinError 32] The process cannot access the file because it is being used by another process: '.meltano\\run\\tap-mssql\\tap.properties.json'
```
I think what would need to happen to support this out of the box is for the uuid provided by --run-id to be propagated, and a separate folder for that uuid to be used instead. The tap.config.json seems to take it into account and has it as part of its file name, but there's nothing that enforces separation. The catalog dump is also not fool-proof, as it can still hit a race condition somehow when generating the catalogs and end up with an invalid file. Here's an example, where `default":` is repeated:
```
"metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected-by-default": default": false,
```
v
> Thanks. I'm surprised more people haven't been having problems with this. I didn't think this would be an uncommon scenario
It's certainly not on the "happy path" for Meltano. A good number of folks do it though and it works great.
> Cannot start plugin tap-mssql: [WinError 32] The process cannot access the file because it is being used by another process: '.meltano\\run\\tap-mssql\\tap.properties.json'
```
2024-09-12T12:47:15.485745Z [debug    ] Found plugin parent            parent=tap-smoke-test plugin=tap-smoke-test source=LOCKFILE
2024-09-12T12:47:15.563708Z [debug    ] Skipped installing extractor 'tap-smoke-test'
2024-09-12T12:47:15.563933Z [debug    ] Skipped installing 1/1 plugins
2024-09-12T12:47:15.713970Z [debug    ] Created configuration at /home/visch/git/meltano-projects/getting_started/.meltano/run/tap-smoke-test/tap.ceb61f67-c362-4a72-86b7-787eef00aa2a.config.json
2024-09-12T12:47:15.714332Z [debug    ] Could not find tap.properties.json in /home/visch/git/meltano-projects/getting_started/.meltano/extractors/tap-smoke-test/tap.properties.json, skipping.
2024-09-12T12:47:15.714573Z [debug    ] Could not find tap.properties.cache_key in /home/visch/git/meltano-projects/getting_started/.meltano/extractors/tap-smoke-test/tap.properties.cache_key, skipping.
2024-09-12T12:47:15.714807Z [debug    ] Could not find state.json in /home/visch/git/meltano-projects/getting_started/.meltano/extractors/tap-smoke-test/state.json, skipping.
2024-09-12T12:47:15.715151Z [debug    ] Cached catalog is outdated, running discovery...
2024-09-12T12:47:15.715464Z [info     ] Found catalog in /home/visch/git/meltano-projects/getting_started/catalog.json
2024-09-12T12:47:15.717292Z [debug    ] Invoking: ['/home/visch/git/meltano-projects/getting_started/.meltano/extractors/tap-smoke-test/venv/bin/tap-smoke-test', '--config', '/home/visch/git/meltano-projects/getting_started/.meltano/run/tap-smoke-test/tap.ceb61f67-c362-4a72-86b7-787eef00aa2a.config.json', '--catalog', '/home/visch/git/meltano-projects/getting_started/.meltano/run/tap-smoke-test/tap.properties.json']
```
I assumed for the last few years that putting `catalog` in as an extra meant that Meltano wouldn't make a `tap.properties.json` file and would just point --catalog directly at the file provided. This seems like a decently easy change for Meltano; I put in an issue for that here: https://github.com/meltano/meltano/issues/8763
> The catalog dump is also not fool-proof, as it can still enter a race condition somehow when generating the catalogs and end up being invalid. Here's an example, `default":` is repeated.
I haven't actually attempted to implement all of this. That sounds really bad, these things must be getting called at precisely the same time (it's still a bug in my opinion; we should get an issue in with reproduction steps if possible). I haven't had this exact issue, but I'll share what I have done and it may help. You didn't mention that you are on Windows as well; that changes things a bit, as I actually have the code to do all of this. Could you call this before your tap/target runs? It's a bit of a kludge, but it handles a few things for you:
1. You can get rid of the extra for refreshing catalogs, as this does it for you (this way is wonky for that)
2. It serves as a way to get around parallelization issues by having the tap back off for a minute if it runs at the same time as something else
There are better ways to do this now that refreshing catalogs has an extra, but I'm sure you can run with this!
```python
import glob
import logging
import os
import time


def remove_cache(name_of_tap_or_target):
    logging.info(f"Removing cache for {name_of_tap_or_target}.")
    run_dir = f"..\\meltano\\.meltano\\run\\{name_of_tap_or_target}"
    # Retry this 5 times before failing; other jobs may need the catalog file
    max_retries = 5
    time_between_retries = 60  # seconds
    try_number = 0
    complete = False
    while not complete:
        try_number += 1
        try:
            filelist = glob.glob(os.path.join(run_dir, "tap.properties.*"))
            for f in filelist:
                os.remove(f)
            complete = True
        except Exception as e:
            if try_number > max_retries:
                logging.info(f"Tried {try_number} times. Throwing exception, stopping.")
                raise
            logging.info(
                f"Caught exception, trying again in {time_between_retries} seconds. "
                f"Try number: {try_number}. Exception was: {e}."
            )
            time.sleep(time_between_retries)
```
Other pieces that are nice, which I've implemented with runners for this scenario and which may spur some ideas:
1. `#fail_if_no_streams_selected` — if no streams are selected because you fat-fingered something, Meltano doesn't fail by default.
2. A "single stream interface": basically an input that accepts tap-name, target-name, dbt-model and runs a single model at a time, which gives you an interface to do what you're after here.
The other big difference in our implementation is that you're using `run` (as you should be) and I am using `meltano invoke` and piping data myself. You absolutely don't want to go down this path; it came from a bunch of Windows issues before we worked through them and made Meltano work with Windows.
d
Hey, appreciate the details, will check it out. The reason I didn't mention it is because the same thing happens on Ubuntu, which is what our deployed application runs on. But it is possible I missed some iterations of testing on both. For now, though, I do want the solution to work on Windows as well while we work on getting other team members transitioned to WSL.
I'm thinking that what I will end up with is spinning up Docker for each step on deployed instances, and just going with linear execution for local dev. This seems like a reasonable compromise that doesn't get into all the potential issues when we stray off the beaten path.
I think there are some other details if we really get into it. For example, what I've noticed was that I had to dump the catalog with my `TAP_MSSQL__SELECT` already set. If not, the extractor, when passed both `TAP_MSSQL__SELECT` and `TAP_MSSQL__CATALOG`, seems to do more than just the select. I suppose this is because the catalog sets metadata:selected for the streams in meltano.yml and prioritizes that over any additional parameters. But that isn't really a big deal, since if you know, then it's fine.
Is the run folder supposed to be completely bypassed if a catalog is provided, though? I noticed it also had some state file that indicated what was in progress. Would that not be created regardless of whether or not the catalog extra is used? It seems like meltano run always has a uuid associated with it, regardless of whether `--run-id` is used or not. Is it at all feasible to isolate a different run folder per uuid, or would that be major surgery? Not sure how the plugins determine which folder to use.
➕ 1
v
> Is the run folder supposed to be completely bypassed if a catalog is provided, though? I noticed it also had some state file that indicated what was in progress. Would that not be created regardless of whether or not the catalog extra is used?
> It seems like the meltano run always has a uuid associated with it, regardless if --run-id is used or not. Is it at all feasible to isolate a different run folder per uuid, or would that be major surgery? Not sure how the plugins determine which folder to use.
I'm not sure; I'd run `meltano --log-level=debug` to know for sure, or dive into the code base.
> For example, what I've noticed was that I had to dump the catalog with my TAP_MSSQL__SELECT set already. If not, the extractor, when passed with both TAP_MSSQL__SELECT and TAP_MSSQL__CATALOG
Yeah, when I said generate the catalog I was assuming you'd generate it with the select already set, but now that we know https://github.com/meltano/meltano/issues/8763 is an issue, that approach doesn't work anymore, so the other approach is the way to go.
e
Gonna take a look at that!
dancingpenguin 2
d
Thanks, I think having separate folders per run-id might be needed. Even with the attempt to remove cache files prior to each run, it doesn't work reliably, since multiple executions are kicked off at once, causing them all to succeed in the cache removal check, only to run Meltano and end up with the same issue. It can be staggered by waiting a random amount of time, but the downside is that there's still no guarantee. If the run-id isolation is feasible, that would be best, but in the meantime I'll just go for the compromise of sequential local runs and figure out Docker/k8s once deployed. Our current infra has a Docker container for the Dagster run, but not for the individual Meltano runs that are part of the Dagster run.
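If runs of the same tap must share one machine and one project, one stopgap for the "everyone passes the cache-removal check at once" race is a cross-process lock around the cleanup-plus-run critical section, rather than a random stagger. A best-effort sketch using an O_EXCL lock file (paths hypothetical; no stale-lock recovery if a holder crashes):

```python
import os
import time


class TapLock:
    """Best-effort cross-process mutex via an O_CREAT|O_EXCL lock file.

    Only one process can create the file; everyone else polls until it
    is removed. Wrapping cache cleanup + `meltano run` in this
    serializes runs of the same tap on one machine.
    """

    def __init__(self, path: str, timeout: float = 300.0, poll: float = 1.0):
        self.path, self.timeout, self.poll = path, timeout, poll

    def __enter__(self):
        deadline = time.monotonic() + self.timeout
        while True:
            try:
                fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                os.close(fd)  # creation succeeded: lock acquired
                return self
            except FileExistsError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"could not acquire {self.path}")
                time.sleep(self.poll)

    def __exit__(self, *exc):
        os.remove(self.path)
        return False
```

Usage would be `with TapLock(".locks/tap-mssql.lock"): remove_cache(...); run_meltano(...)`, trading parallelism of same-tap runs for correctness.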
e
I'm encountering a similar issue as @Daniel Luo, where I'm trying to run multiple `meltano run` commands concurrently, but using a Python thread pool on the same machine. Specifically, I'm seeing errors related to catalog.json, and in addition, it seems that the loader (I'm using `target-parquet`) only outputs data from the last thread, rather than from all the threads. When run sequentially, everything works fine; issues only arise with concurrent execution on the same machine. Based on the conversation above, I wonder if the best course of action from here is to run every `meltano run` as a Docker container, and have an orchestration service that runs the containers concurrently and in isolation?
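For what it's worth, the thread-pool symptom (only the last thread's output surviving) is consistent with all runs sharing one working directory and output path. The isolation idea can be sketched without containers by giving each submitted run its own working directory; stand-in commands replace `meltano run` here, and every name is hypothetical:

```python
import concurrent.futures
import subprocess
import sys
import tempfile
from pathlib import Path


def run_isolated(cmd: list, workdir: Path) -> int:
    """Run one pipeline with its own working directory so parallel runs
    never collide on shared on-disk state (stand-in for one `meltano run`)."""
    workdir.mkdir(parents=True, exist_ok=True)
    return subprocess.run(cmd, cwd=workdir, check=True).returncode


root = Path(tempfile.mkdtemp())
# Stand-in commands: each "stream" writes its own out.txt in its own dir.
cmds = {
    f"stream-{i}": [sys.executable, "-c", "open('out.txt', 'w').write('done')"]
    for i in range(3)
}
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = [pool.submit(run_isolated, cmd, root / name) for name, cmd in cmds.items()]
    results = [f.result() for f in futures]
```

Because each subprocess gets a distinct `cwd`, every run keeps its own output instead of the last one clobbering the rest; a container per run is the same idea with stronger isolation.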
v
> Even with the attempt to remove cache files prior to each run, it doesn't work reliably since multiple executions are kicked off at once, causing them to all succeed in the cache removal check, only to run meltano and end up with the same issue
I can't believe I haven't hit this.
> I wonder if the best course of action from here is to run every meltano run as Docker container, and have an orchestration service that runs docker containers concurrently and in isolation?
Would definitely work; that's what I do as much as I can in production. But I don't have this issue for some reason. Honestly, I had more issues with running dbt at the same time as other executions.