# troubleshooting
g
I am having this issue where a certain pipeline shows as running, but has no logs and isn't doing anything. It shows as running in both the Meltano UI and the Airflow UI. I have tried deleting the DAG, the pipeline, and any associated logs; however, when I recreate the scheduled pipeline, both UIs just repopulate what was there before and continue to show it as running. I looked through the issue board and it doesn't seem like there is currently a way to cancel or end a pipeline's run.
n
I just ran into this and got around it by manually deleting the job record in the meltano DB. More context here: https://meltano.slack.com/archives/C01TCRBBJD7/p1620654893103500?thread_ts=1620221119.064500&cid=C01TCRBBJD7
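For anyone landing here later, the workaround above boils down to removing the stuck `RUNNING` rows from the `job` table in the system database. A minimal self-contained sketch (in-memory SQLite with a mocked-up row; against a real project you would connect to the actual system database, which by default is a SQLite file under the project's `.meltano/` directory):

```python
import sqlite3

# Mocked-up stand-in for the system database; the `job` table and its
# `state` column match what is shown later in this thread.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, job_id TEXT, state TEXT)")
conn.execute("INSERT INTO job (job_id, state) VALUES ('my-pipeline', 'RUNNING')")

# Delete the stuck 'RUNNING' rows so the UIs stop reporting a phantom run.
conn.execute("DELETE FROM job WHERE state = 'RUNNING'")
conn.commit()

remaining = conn.execute(
    "SELECT COUNT(*) FROM job WHERE state = 'RUNNING'"
).fetchone()[0]
print(remaining)  # 0
```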
d
Stale "running" jobs in Meltano (rows in the `job` table in the system database) should automatically be marked as "failed" after 5 minutes: https://gitlab.com/meltano/meltano/-/merge_requests/2000
If that's not happening, we may be looking at a bug! @gunnar Can you share the contents of the `job` table row for this stuck pipeline? That will help me figure out why it's not being detected as stale.
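The staleness rule described above can be sketched as a small predicate. The constants are assumptions, not Meltano's actual code: the 5-minute heartbeat window comes from the message above, and the ~24-hour window for heartbeat-less jobs is inferred from the repl output later in this thread.

```python
from datetime import datetime, timedelta

# Assumed constants (see lead-in): heartbeat goes stale after 5 minutes,
# heartbeat-less jobs after roughly 24 hours.
HEARTBEAT_VALID_MINUTES = 5
HEARTBEATLESS_JOB_VALID_HOURS = 24

def is_stale(started_at, last_heartbeat_at, now=None):
    """Return True if a RUNNING job should be marked as failed."""
    now = now or datetime.utcnow()
    if last_heartbeat_at is not None:
        return last_heartbeat_at < now - timedelta(minutes=HEARTBEAT_VALID_MINUTES)
    return started_at < now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)

now = datetime(2021, 6, 18, 2, 10)
print(is_stale(datetime(2021, 6, 18, 1, 50), datetime(2021, 6, 18, 1, 58), now))  # True: heartbeat 12 min old
print(is_stale(datetime(2021, 6, 18, 2, 0), datetime(2021, 6, 18, 2, 8), now))    # False: heartbeat 2 min old
```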
g
It wasn't ending after 5 minutes (had been over a day), but @nick_hamlin’s solution seemed to fix the issue! Thank you! The only other thing I was curious about was if Meltano supports multiple pipelines / dags to be run at the same time. I assumed yes and that this would be more related to Airflow. However, I wanted to quickly ask just because I don't want to mess around with anything and break any integrations if it is not yet supported.
d
> It wasn't ending after 5 minutes (had been over a day), but @nick_hamlin’s solution seemed to fix the issue!

I would've loved to see the `job` record before you deleted it so that I could figure out why it wasn't getting marked stale after 5 min 😄
> The only other thing I was curious about was if Meltano supports multiple pipelines / dags to be run at the same time.

Yep, definitely, you can have multiple distinct pipelines running at the same time.
n
but they must have unique `job_id` values (this was part of my problem)
and if it’s helpful, I can confirm that the rows I deleted in my fix had been hanging around longer than 5m. if @gunnar isn’t able to, let me know and I bet I can reproduce it locally and get you the full entry
d
@nick_hamlin That'd be great, please do.
n
well, I may have spoken too soon - I’m following the same steps I was over the weekend, but (at least for now), nothing seems to be getting stuck anymore!
Not sure what changed (clearly something has), but I’ll save the entry for you if I’m able to get it to happen again
g
Ah sorry! I think I still have the printout in my console, I can check in a second. However, I think the issue was because of multiple changes I made to the project and re-creating pipelines with the same setup and naming convention.
Would running multiple pipelines best be done in the Meltano UI (/Meltano CLI) or the Airflow UI? I noticed that Airflow seems to have set up a queue for the DAGs, which is why I am asking. (Some DAGs show up as either queued or scheduled.) Far right (slightly brown) is scheduled; third from the right, in grey, is queued.
I tried triggering via the command line using `meltano invoke airflow dags trigger ___`; I will let you know if it works. So far in Airflow: externally triggered = true. However, the Meltano UI has not updated.
c
I'm running into the same issue using 1.76. Here's the list of running jobs from the `job` table:
```
meltano=> select id, run_id, state, started_at, ended_at, trigger, last_heartbeat_at from job where state ='RUNNING';
 id |                run_id                |  state  |         started_at         | ended_at | trigger |     last_heartbeat_at      
----+--------------------------------------+---------+----------------------------+----------+---------+----------------------------
 21 | 6476cd38-f490-4df5-b3e8-e45f1ba324da | RUNNING | 2021-06-18 01:50:24.108557 |          | ui      | 2021-06-18 01:58:49.481304
 20 | 017c90f8-696e-4b6c-8c75-063c0b1e34dc | RUNNING | 2021-06-18 01:50:23.99533  |          | ui      | 2021-06-18 01:58:50.118105
 22 | b3b2d854-acaa-40e5-9888-7c3115126321 | RUNNING | 2021-06-18 01:50:24.231653 |          | ui      | 2021-06-18 01:58:50.224987
```
d
@casey If you run `meltano schedule list`, do you see anything logged about stale jobs being marked as such?
c
Here's the output:
```
root@ledp-vm:/project# meltano schedule list
[@once] ip-hash: tap-postgres--telemetry → pipelinewise-target-bigquery--telemetry → transforms
[@once] kdp-el: tap-postgres--kdp → pipelinewise-target-bigquery--kdp → transforms
[@once] studio-el: tap-postgres--studio → pipelinewise-target-bigquery--studio → transforms
```
d
@casey OK, let’s debug this a little. Can you run `meltano repl` and then:
```python
from meltano.core.job.job_finder import JobFinder
JobFinder.all_stale()
```
c
Hmm, it can't find that module.
```
root@ledp-vm:/project# meltano repl
Python 3.7.10 (default, May 12 2021, 16:05:48) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.24.1 -- An enhanced Interactive Python. Type '?' for help.

Booting import Meltano REPL


In [1]: from meltano.core.job.job_finder import JobFinder
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
<ipython-input-1-d78d2d414e8f> in <module>
----> 1 from meltano.core.job.job_finder import JobFinder

ModuleNotFoundError: No module named 'meltano.core.job.job_finder'
```
From the way the archive is packaged, it looks like it might be `meltano.core.job.finder`
d
Ah yes, my bad
c
`all_stale()` takes a `session` parameter... what should I provide it with?
d
That’s what I get for suggesting some code to run without testing it 🙂
There’s a local `session` variable you can use already
c
hahaha... no worries
it returns a reference to a SQLAlchemy Query object... sorry, my SQLAlchemy knowledge is weak, so I'm not sure what to do with this
Here's the pretty-printed form:
```sql
SELECT job.id AS job_id_1, job.job_id AS job_job_id, job.run_id AS job_run_id, job.state AS job_state, job.started_at AS job_started_at, job.last_heartbeat_at AS job_last_heartbeat_at, job.ended_at AS job_ended_at, job.payload AS job_payload, job.payload_flags AS job_payload_flags, job."trigger" AS job_trigger 
FROM job 
WHERE job.state = ? AND (job.last_heartbeat_at IS NOT NULL AND job.last_heartbeat_at < ? OR job.last_heartbeat_at IS NULL AND job.started_at < ?)
```
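That generated WHERE clause can be exercised by hand against a mocked-up `job` table. A self-contained sketch (in-memory SQLite, columns trimmed to the ones the filter uses, cutoff windows assumed to be the 5-minute / 24-hour values discussed in this thread):

```python
import sqlite3
from datetime import datetime, timedelta

# Mocked-up `job` table with one stale RUNNING row (timestamps stored as
# ISO-format text, as SQLite does by default).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id INTEGER PRIMARY KEY, state TEXT,"
    " started_at TEXT, last_heartbeat_at TEXT)"
)
conn.execute(
    "INSERT INTO job (state, started_at, last_heartbeat_at)"
    " VALUES ('RUNNING', '2021-06-18 01:50:24', '2021-06-18 01:58:50')"
)

now = datetime(2021, 6, 21, 16, 55)
last_valid_heartbeat_at = str(now - timedelta(minutes=5))  # assumed heartbeat window
last_valid_started_at = str(now - timedelta(hours=24))     # assumed heartbeat-less window

# Same two-branch condition as the generated SQL, with bound parameters.
stale = conn.execute(
    """
    SELECT id FROM job
    WHERE state = ?
      AND (last_heartbeat_at IS NOT NULL AND last_heartbeat_at < ?
        OR last_heartbeat_at IS NULL AND started_at < ?)
    """,
    ("RUNNING", last_valid_heartbeat_at, last_valid_started_at),
).fetchall()
print(stale)  # [(1,)] — the mocked stale row matches
```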
d
Ok, that’s what I’d expect. Let’s see if that query returns any rows: `list(JobFinder.all_stale(session))`
c
Nope...
```
In [10]: list(JobFinder.all_stale(session))
Out[10]: []

In [11]:
```
d
All right, now that is odd!
@casey Can you run the query directly against the DB?
```sql
SELECT job.id AS job_id_1, job.job_id AS job_job_id, job.run_id AS job_run_id, job.state AS job_state, job.started_at AS job_started_at, job.last_heartbeat_at AS job_last_heartbeat_at, job.ended_at AS job_ended_at, job.payload AS job_payload, job.payload_flags AS job_payload_flags, job."trigger" AS job_trigger 
FROM job 
WHERE job.state = 'RUNNING' AND (job.last_heartbeat_at IS NOT NULL AND job.last_heartbeat_at < '2021-06-20' OR job.last_heartbeat_at IS NULL AND job.started_at < '2021-06-20')
```
c
for reasons I don't understand, Slack won't let me send the results in a message. Let me try and put them in a snippet.
Sorry about the formatting
d
Hmm, so now it can find the stale jobs, but it couldn’t from `JobFinder`
The only difference I see is that I hard-coded the dates instead of using Python to determine those
In `meltano repl`, can you run:
```python
from datetime import datetime, timedelta
now = datetime.utcnow()
last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES)
last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)
```
c
do I need to import a module that declares/defines `HEARTBEAT_VALID_MINUTES`?
nm, I think it's in `job.py`
```
In [15]: print(last_valid_heartbeat_at)
2021-06-21 16:50:07.654660

In [16]: print(last_valid_started_at)
2021-06-20 16:55:07.654660
```
the heartbeat value differs substantially from what's in the DB
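For readers hitting the same "SQLAlchemy finds 0 rows, raw SQL finds 3" symptom: one classic (purely illustrative, not a confirmed diagnosis of this particular bug) cause is mixing naive and timezone-aware datetimes, or computing the cutoff with local-time `now()` instead of `utcnow()`:

```python
from datetime import datetime, timezone

naive = datetime.utcnow()           # naive datetime, implicitly UTC
aware = datetime.now(timezone.utc)  # timezone-aware datetime

# Ordering comparisons between naive and aware datetimes raise TypeError,
# and a cutoff computed in local time shifts the staleness window by the
# UTC offset — either way, rows can be silently excluded on the Python side.
try:
    naive < aware
    comparable = True
except TypeError:
    comparable = False

print(comparable)  # False
```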
d
If you enter those values into the query in https://meltano.slack.com/archives/C01TCRBBJD7/p1624292529338800?thread_ts=1620661818.107900&cid=C01TCRBBJD7, do you get 0 records or more?
c
This query:
```sql
SELECT job.id AS job_id_1, job.job_id AS job_job_id, job.run_id AS job_run_id, job.state AS job_state, job.started_at AS job_started_at, job.last_heartbeat_at AS job_last_heartbeat_at, job.ended_at AS job_ended_at, job.payload AS job_payload, job.payload_flags AS job_payload_flags, job.trigger AS job_trigger
FROM job
WHERE job.state = 'RUNNING'
  AND (job.last_heartbeat_at IS NOT NULL AND job.last_heartbeat_at < '2021-06-21 16:50:07.654660'
    OR job.last_heartbeat_at IS NULL AND job.started_at < '2021-06-20 16:55:07.654660')
```
returns 3 records
d
Ok. But when run from Python, SQLAlchemy found 0 😕
Can you please file an issue for this with everything we’ve found so far? This’ll require some deeper debugging
c
Sure
I hope I described/characterized things correctly
d
@casey Thanks a lot, I’ve called in @aaronsteers to help debug this. For debugging purposes, it would help if you left the DB in the current (broken) state, but to solve the issue you can manually change the state on these jobs from `RUNNING` to `FAIL`.
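That manual fix can be sketched as a single UPDATE. A self-contained example (in-memory SQLite standing in for the system database; the `SUCCESS` row is mocked up for contrast):

```python
import sqlite3

# Flip stuck jobs from RUNNING to FAIL instead of deleting them, so the
# job history is preserved for later debugging.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, state TEXT)")
conn.executemany(
    "INSERT INTO job (state) VALUES (?)",
    [("RUNNING",), ("RUNNING",), ("SUCCESS",)],
)

conn.execute("UPDATE job SET state = 'FAIL' WHERE state = 'RUNNING'")
conn.commit()

states = [row[0] for row in conn.execute("SELECT state FROM job ORDER BY id")]
print(states)  # ['FAIL', 'FAIL', 'SUCCESS']
```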
c
I'm sorry man. I just deleted the jobs entirely. I did, however, run a SQL export of the database before doing so. I'll attach it to the ticket.
d
No worries, thanks!