# troubleshooting
d
Sometimes my ELT jobs just hang during ingestion. I'm using meltanolabs-tap-postgres => ticketswap-target-redshift...
Logs from my ELT run:
Copy code
2024-09-09 11:55:59,824 | INFO | tap-postgres | Skipping deselected stream 'public-secure_collection_item_files'.
2024-09-09 11:55:59,825 | INFO | tap-postgres | Skipping deselected stream 'public-secure_collection_items'.
2024-09-09 11:55:59,825 | INFO | tap-postgres | Skipping deselected stream 'public-secure_collection_permissions'.
2024-09-09 11:55:59,823 | INFO | singer_sdk.metrics | METRIC: {"type": "timer", "metric": "sync_duration", "value": 0.22330546379089355, "tags": {"stream": "public-practitioners", "context": {}, "status": "succeeded"}}
2024-09-09 11:55:59,824 | INFO | singer_sdk.metrics | METRIC: {"type": "counter", "metric": "record_count", "value": 2890, "tags": {"stream": "public-practitioners", "context": {}}}
2024-09-09 11:55:59,824 | INFO | tap-postgres | Skipping deselected stream 'public-practitioners_services'.
2024-09-09 11:55:59,824 | INFO | tap-postgres | Skipping deselected stream 'public-practitioners_unavailabilities'.
2024-09-09 11:55:59,816 | INFO | target-redshift | Target 'target-redshift' is listening for input from tap.
2024-09-09 11:55:59,816 | INFO | target-redshift | Initializing 'target-redshift' target sink...
2024-09-09 11:55:59,817 | INFO | target-redshift.public-forms | Initializing target sink for stream 'public-forms'...
2024-09-09 11:55:59,600 | INFO | tap-postgres.public-practitioners | Beginning full_table sync of 'public-practitioners'...
2024-09-09 11:55:59,600 | INFO | tap-postgres.public-practitioners | Tap has custom mapper. Using 1 provided map(s).
2024-09-09 11:55:59,599 | INFO | tap-postgres | Skipping deselected stream 'public-forms_access'.
2024-09-09 11:55:59,599 | INFO | tap-postgres | Skipping deselected stream 'public-instances'.
2024-09-09 11:55:59,599 | INFO | tap-postgres | Skipping deselected stream 'public-keys'.
2024-09-09 11:55:59,599 | INFO | tap-postgres | Skipping deselected stream 'public-knex_migrations'.
2024-09-09 11:55:59,600 | INFO | tap-postgres | Skipping deselected stream 'public-knex_migrations_lock'.
2024-09-09 11:55:59,600 | INFO | tap-postgres | Skipping deselected stream 'public-notifications'.
2024-09-09 11:55:59,599 | INFO | singer_sdk.metrics | METRIC: {"type": "timer", "metric": "sync_duration", "value": 0.13203024864196777, "tags": {"stream": "public-forms", "context": {}, "status": "succeeded"}}
2024-09-09 11:55:59,599 | INFO | singer_sdk.metrics | METRIC: {"type": "counter", "metric": "record_count", "value": 1, "tags": {"stream": "public-forms", "context": {}}}
2024-09-09 11:55:59,466 | INFO | tap-postgres | Skipping deselected stream 'public-form_requests'.
2024-09-09 11:55:59,466 | INFO | tap-postgres.public-forms | Beginning incremental sync of 'public-forms'...
2024-09-09 11:55:59,466 | INFO | tap-postgres.public-forms | Tap has custom mapper. Using 1 provided map(s).
2024-09-09 11:55:59,465 | INFO | tap-postgres | Skipping deselected stream 'public-audit_collection_events'.
2024-09-09 11:55:59,466 | INFO | tap-postgres | Skipping deselected stream 'public-bookings'.
2024-09-09 11:55:59,466 | INFO | tap-postgres | Skipping deselected stream 'public-communications'.
2024-09-09 11:55:59,466 | INFO | tap-postgres | Skipping deselected stream 'public-form_request_forms'.
Environment 'au' is active
running tap [tap-agnodice-v3] with mappers: [mapper-agnodice] to target [target-warehouse-redshift]
Executing: pipenv run .meltano/run/bin --environment=au run --no-install tap-agnodice-v3 mapper-agnodice target-warehouse-redshift
> meltano-pipeline@1.0.0 el
> ts-node ./scripts/extractLoad.ts
> meltano-pipeline@1.0.0 elt
That's from newest message to oldest.
It just sits there until the job times out (1 hr timeout on AWS Batch).
It smells like it can't connect to a db and is just sitting there waiting. But I'm not sure. It's intermittent. Any ideas?
v
I can't follow those logs well enough to help you; there's an orchestration script running that's doing things that aren't standard Meltano stuff. If a tap/target was hanging when I ran
meltano run tap-postgres target-csv
what I would do is enable debug logging to see if I can find any clues. To get to the bottom of it most quickly, I'd try to get a stack trace from whatever is calling the application. Most of the time when I see this with folks running custom orchestration scripts, they've done something wrong with process handling and end up with a full stdout buffer, or there's something OS-related they missed when implementing the subprocess flow. Complete guess, but that's the most common thing I've seen.
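The full-stdout-buffer failure mode mentioned above can be sketched in Python (a hypothetical reproduction of the general pitfall, not the poster's actual script):

```python
import subprocess
import sys

# Child process that writes ~1 MB to stdout. The OS pipe buffer is only
# ~64 KB, so if the parent never drains the pipe, the child blocks on
# write() and the parent's wait() never returns -- a silent hang.
child = [sys.executable, "-c", "import sys; sys.stdout.write('x' * 1_000_000)"]

proc = subprocess.Popen(child, stdout=subprocess.PIPE)

# DEADLOCK-PRONE pattern (don't do this with stdout=PIPE):
#   proc.wait()   # nobody reads the pipe, so the child can never exit

# Safe pattern: communicate() drains stdout while waiting for exit.
out, _ = proc.communicate()
print(len(out))  # 1000000
```

The same hazard exists in Node's `child_process` when `stdio` pipes are created but never consumed; whichever runtime the wrapper uses, something must keep reading the child's output.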
d
The orchestration isn't anything crazy. It's just a TypeScript file that runs:
Copy code
pipenv run .meltano/run/bin --environment=au run --no-install tap-agnodice-v3 mapper-agnodice target-warehouse-redshift
I'll check the buffer handling. Thanks.
v
pipenv run .meltano/run/bin --environment=au run --no-install tap-agnodice-v3 mapper-agnodice target-warehouse-redshift
isn't
meltanolabs-tap-postgres => ticketswap-target-redshift
which is one part of my multiple layers of confusion
d
We've got some tap/target inheritance going on there.
v
Can you share your meltano.yml and then isolate the run command to get this to happen on its own?
d
🤔 Isolating it will be challenging, and it's intermittent. That may take some time.
v
Debugging is a key part of orchestration 🤷
Hope for the best. I haven't seen anything like this myself, and I run a bunch of tap-postgres pipelines in production, but with different configurations, so it's very possible you've hit an issue with the tap or the target. Hard to say.
d
Thanks
enable debugging logs to see if I can find any clues
Not sure exactly what you mean by this. Is this like setting a verbose debug level in meltano or the tap?
h
@dylan_just invoke meltano with log level set to debug, i.e
Copy code
pipenv run .meltano/run/bin --log-level=debug --environment=au ...
d
Thanks
I'm running with debug on and it's freezing here:
Copy code
2024-09-10T09:56:27.282+10:00 head producer completed first as expected
2024-09-10T09:56:27.211+10:00 head producer completed first as expected
2024-09-10T09:56:27.211+10:00 waiting for process completion or exception
2024-09-10T09:56:27.185+10:00 {"type":"STATE","value":{"bookmarks":{"public-forms":{"replication_key":"updated_at","replication_key_value":"2024-09-05T04:53:09.535000+00:00"},"public-practitioners":{}}}}
h
the last log line might indicate that data has been loaded and Meltano hangs while trying to persist the new state. how are you persisting the state (db, vs local/network file system, vs object store (s3, etc.))?
the issue may not be with any plugin, but rather with either Meltano itself or connectivity to the state store.
👍 1
d
Persisting the state to a Postgres db.
h
gotcha, and which version of Meltano are you using? Would you be willing to upgrade to the latest Meltano version (not the plugin, just Meltano itself)? If so, maybe we can insert some debug messages in the state-persistence code path and try to isolate the issue. Since you mention this is intermittent, my hunch is that Meltano is trying to connect to the database and failing for some reason. Where is the database hosted (RDS vs Aurora vs another cloud)? And how is Meltano being run when these errors are encountered?
d
It's hosted on RDS Postgres. Meltano 3.5.0. Happy to upgrade.
h
Here's another line of thought: is it possible that multiple batch jobs are running simultaneously and taking locks on the PostgreSQL state database? Those locks could then prevent some of the jobs from completing.
d
Only one job runs at a time. This is enforced by our Batch setup, and we've observed it, too. We have found that if we forcibly terminate a Meltano job, it can retain a lock. However, on the next run we get an error, and can clear it by passing --force. So I don't think that's what's happening here.
h
Interesting. IIUC, meltano will read the state at the start of the job, and write the state at its conclusion. Since only 1 job is running at a time, the lock could only have been taken while reading the state. That seems implausible as well. Anyway, if you think this thread (db-locks) is worth chasing down, maybe the next time meltano hangs, you can query the postgresql db for locks. I'll spend some time in the codebase and add debug logs to hopefully help us understand what's going on better.
d
Is there a connection timeout setting I can tweak for connecting to the Meltano state db? I don't know the Python ecosystem, but a popular Postgres library in node.js defaults to "no timeout", which would cause a connection hang.
Maybe on the connection URL?
h
This seems to suggest passing in connect_args; I'm not sure you can specify the timeout in the URL. https://stackoverflow.com/questions/35640726/how-to-set-connection-timeout-in-sqlalchemy
v
I have seen this come up 3-4 times where folks really want step-by-step instructions for how to debug a running Meltano process. A step-by-step guide for pulling traces from a running Python application would be nice for you here. It's definitely doable: you need to look at the 3 processes (meltano, tap process, target process) and see which one is waiting and what it's waiting on. I've had to do it a few times for folks, but haven't taken the extra step of writing up generalized instructions.
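For reference, Python's standard library offers a cheap way to get those stack traces out of a hanging process (a general-purpose sketch of the technique, not something from this thread; the file name is made up):

```python
import faulthandler
import signal

# After this, `kill -USR1 <pid>` makes the process dump every thread's
# stack to stderr without terminating it -- useful when a pipeline hangs.
faulthandler.register(signal.SIGUSR1)

# You can also dump all current thread stacks on demand, e.g. to a file:
with open("stacks.txt", "w") as f:
    faulthandler.dump_traceback(file=f)
```

This requires the hang-prone process to opt in ahead of time; external tools such as py-spy can attach to an already-running interpreter without any code changes.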
d
Yeah, something like that would be fantastic.
c
Regarding connect_timeout: you should be able to pass it in via the URL, something like this:
Copy code
postgresql+psycopg://url/database?connect_timeout=10
where 10 means 10 seconds. I use the same mechanism to set my application_name so connections can be identified.
🙏 2
e
I can't find any documentation on connect_timeout, neither in the sqlalchemy docs nor in the postgres docs, so I wonder if it actually works.
c
https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS this is libpq-specific; libpq is how the PostgreSQL client (psql and friends) and other libraries connect to PostgreSQL
ty 1
it looks like SQLAlchemy also uses libpq
e
Ah, gotcha!
h
nice, TIL 🙂
c
there are two ways SQLAlchemy can initialize its connection, so I don't know which one Meltano is using: one is very declarative, where a number of independent parameters are passed to a function (hostname, port, dbname, username, etc.), and the other is create_engine(url, **kwargs), where a connection string is passed wholesale
so I have been passing mine via a .env file (and by retrieving SSM parameters in an Airflow plugin) as a single string like this:
Copy code
MELTANO_DATABASE_URI=postgresql+psycopg://$DATA_WAREHOUSE_DB_USER:$DATA_WAREHOUSE_DB_PASSWORD@$DATA_WAREHOUSE_DB_HOST:5432/some_database_here?options=-csearch_path%3Ddata_warehouse_state
and this has worked just fine; I definitely have state in production and it's been running for a few months 🙂 I use application_name in some of the other connection strings in tap-postgres, but that's a different use case entirely. connect_timeout is not a server option, which just means it doesn't go inside the options=-c… stuff
I will disconnect from the VPN and try right now in fact
okay I did this:
Copy code
MELTANO_DATABASE_URI="postgresql+psycopg://$DATA_WAREHOUSE_DB_USER:$DATA_WAREHOUSE_DB_PASSWORD@$DATA_WAREHOUSE_DB_HOST:5432/some_database_here?connect_timeout=1&options=-csearch_path%3Ddata_warehouse_state"
(note the quotes around the value, since I added a & to the query string), then disconnected from the VPN so I could not resolve my staging data warehouse by host name, ran a tap->target, and immediately got:
Copy code
2024-09-12T19:11:36.571551Z [info     ] Environment 'stg' is active
2024-09-12T19:11:36.735105Z [info     ] DB connection failed. Will retry after 5s. Attempt 1/3
2024-09-12T19:11:41.751013Z [info     ] DB connection failed. Will retry after 5s. Attempt 2/3
2024-09-12T19:11:46.764098Z [info     ] DB connection failed. Will retry after 5s. Attempt 3/3
then I changed it to connect_timeout=10 and verified that each connection attempt took more or less 10 seconds, in between the 5s retries. Then I connected to the VPN and ran the tap->target successfully. So, at least via MELTANO_DATABASE_URI, this connection-string parameter does work.
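The `&`-quoting gotcha above can be sidestepped by building the query string programmatically. A small stdlib sketch (the helper name and example URI are made up for illustration) that appends connect_timeout without clobbering an existing options parameter:

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

def with_connect_timeout(uri: str, seconds: int = 10) -> str:
    """Append libpq's connect_timeout to a postgres URI, preserving any
    existing query parameters (e.g. options=-csearch_path...)."""
    parts = urlsplit(uri)
    query = parse_qsl(parts.query, keep_blank_values=True)
    query.append(("connect_timeout", str(seconds)))
    # urlencode re-escapes values, so the search_path '=' stays %3D-encoded
    return urlunsplit(parts._replace(query=urlencode(query)))

uri = "postgresql+psycopg://user:pw@host:5432/db?options=-csearch_path%3Ddw_state"
print(with_connect_timeout(uri, 1))
# postgresql+psycopg://user:pw@host:5432/db?options=-csearch_path%3Ddw_state&connect_timeout=1
```

Since the result is a single string, it can be exported as MELTANO_DATABASE_URI without worrying about shell quoting.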
apparently you can use just postgresql+psycopg2:// as your connection string and then do everything via the PG_ variables… if that was a thing you wanted to do
💡 1