gunnar
05/10/2021, 3:50 PM
gunnar
05/10/2021, 3:52 PM
nick_hamlin
05/10/2021, 3:57 PM
douwe_maan
05/10/2021, 4:11 PM
job table in the system database) should automatically be marked as "failed" after 5 minutes: https://gitlab.com/meltano/meltano/-/merge_requests/2000
douwe_maan
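The staleness rule discussed in this thread can be written as a small predicate. This is a minimal sketch, not Meltano's implementation. `is_stale` is a hypothetical helper, and the values 5 and 24 for `HEARTBEAT_VALID_MINUTES` and `HEARTBEATLESS_JOB_VALID_HOURS` are inferred from the cutoffs printed later in the thread rather than copied from Meltano's source. A RUNNING job counts as stale when its last heartbeat is too old. If it never sent a heartbeat, it counts as stale when it started too long ago.

```python
from datetime import datetime, timedelta
from typing import Optional

# Assumed values, inferred from the REPL output later in this thread.
HEARTBEAT_VALID_MINUTES = 5
HEARTBEATLESS_JOB_VALID_HOURS = 24


def is_stale(
    state: str,
    started_at: datetime,
    last_heartbeat_at: Optional[datetime],
    now: datetime,
) -> bool:
    """Hypothetical helper: True if a job should be treated as stale.

    Follows the same logic as the WHERE clause of the query shown in
    the thread.
    """
    if state != "RUNNING":
        # Only running jobs can go stale.
        return False
    if last_heartbeat_at is not None:
        # Jobs that send heartbeats: the last one must be recent enough.
        return last_heartbeat_at < now - timedelta(minutes=HEARTBEAT_VALID_MINUTES)
    # Jobs that never sent a heartbeat: fall back to the start time.
    return started_at < now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)
```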
05/10/2021, 4:12 PM
job table row for this stuck pipeline? That will help me figure out why it's not being detected as being stale.
gunnar
05/10/2021, 4:45 PM
douwe_maan
05/10/2021, 4:45 PM
> It wasn't ending after 5 minutes (had been over a day), but @nick_hamlin’s solution seemed to fix the issue!
I would've loved to see the job record before you deleted it so that I could figure out why it wasn't getting marked stale after 5 min 😄
douwe_maan
05/10/2021, 4:46 PM
> The only other thing I was curious about was if Meltano supports multiple pipelines / dags to be run at the same time.
Yep, definitely, you can have multiple distinct pipelines running at the same time.
nick_hamlin
05/10/2021, 4:46 PM
job_id values (this was part of my problem)
nick_hamlin
05/10/2021, 4:48 PM
douwe_maan
05/10/2021, 4:48 PM
nick_hamlin
05/10/2021, 5:01 PM
nick_hamlin
05/10/2021, 5:02 PM
gunnar
05/10/2021, 5:14 PM
gunnar
05/10/2021, 5:17 PM
gunnar
05/10/2021, 5:48 PM
casey
06/21/2021, 3:44 PM
job table:
meltano=> select id, run_id, state, started_at, ended_at, trigger, last_heartbeat_at from job where state ='RUNNING';
id | run_id | state | started_at | ended_at | trigger | last_heartbeat_at
----+--------------------------------------+---------+----------------------------+----------+---------+----------------------------
21 | 6476cd38-f490-4df5-b3e8-e45f1ba324da | RUNNING | 2021-06-18 01:50:24.108557 | | ui | 2021-06-18 01:58:49.481304
20 | 017c90f8-696e-4b6c-8c75-063c0b1e34dc | RUNNING | 2021-06-18 01:50:23.99533 | | ui | 2021-06-18 01:58:50.118105
22 | b3b2d854-acaa-40e5-9888-7c3115126321 | RUNNING | 2021-06-18 01:50:24.231653 | | ui | 2021-06-18 01:58:50.224987
douwe_maan
06/21/2021, 3:49 PM
meltano schedule list, do you see anything logged about stale jobs being marked as such?
casey
06/21/2021, 3:50 PM
root@ledp-vm:/project# meltano schedule list
[@once] ip-hash: tap-postgres--telemetry → pipelinewise-target-bigquery--telemetry → transforms
[@once] kdp-el: tap-postgres--kdp → pipelinewise-target-bigquery--kdp → transforms
[@once] studio-el: tap-postgres--studio → pipelinewise-target-bigquery--studio → transforms
douwe_maan
06/21/2021, 3:53 PM
meltano repl and then:
from meltano.core.job.job_finder import JobFinder
JobFinder.all_stale()
casey
06/21/2021, 3:54 PM
root@ledp-vm:/project# meltano repl
Python 3.7.10 (default, May 12 2021, 16:05:48)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.24.1 -- An enhanced Interactive Python. Type '?' for help.
Booting import Meltano REPL
In [1]: from meltano.core.job.job_finder import JobFinder
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
<ipython-input-1-d78d2d414e8f> in <module>
----> 1 from meltano.core.job.job_finder import JobFinder
ModuleNotFoundError: No module named 'meltano.core.job.job_finder'
casey
06/21/2021, 3:57 PM
meltano.core.job.finder
douwe_maan
06/21/2021, 3:58 PM
casey
06/21/2021, 4:00 PM
all_stale() takes a session parameter... what should I provide it with?
douwe_maan
06/21/2021, 4:00 PM
douwe_maan
06/21/2021, 4:00 PM
session variable you can use already
casey
06/21/2021, 4:00 PM
casey
06/21/2021, 4:03 PM
casey
06/21/2021, 4:04 PM
SELECT job.id AS job_id_1, job.job_id AS job_job_id, job.run_id AS job_run_id, job.state AS job_state, job.started_at AS job_started_at, job.last_heartbeat_at AS job_last_heartbeat_at, job.ended_at AS job_ended_at, job.payload AS job_payload, job.payload_flags AS job_payload_flags, job."trigger" AS job_trigger
FROM job
WHERE job.state = ? AND (job.last_heartbeat_at IS NOT NULL AND job.last_heartbeat_at < ? OR job.last_heartbeat_at IS NULL AND job.started_at < ?)
douwe_maan
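The `?` marks in the printed query are bound-parameter placeholders. The actual values are sent separately, so the printed SQL can't be pasted into `psql` as-is. Below is a self-contained sketch of the same filter with its parameters bound explicitly. It uses the standard-library `sqlite3` module as a stand-in for the system database (the thread's database is PostgreSQL) and a reduced `job` table with only the columns the filter needs. The table is loaded with the three rows from the `psql` output earlier in the thread, and the cutoff values are the ones printed in the REPL session later on.

```python
import sqlite3

# Reduced stand-in for the `job` table: only the columns the filter uses.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id INTEGER PRIMARY KEY, state TEXT, "
    "started_at TEXT, last_heartbeat_at TEXT)"
)
# The three RUNNING rows from the psql output. Timestamps are stored as text;
# these ISO-style strings compare chronologically here because their dates differ.
conn.executemany(
    "INSERT INTO job VALUES (?, ?, ?, ?)",
    [
        (21, "RUNNING", "2021-06-18 01:50:24.108557", "2021-06-18 01:58:49.481304"),
        (20, "RUNNING", "2021-06-18 01:50:23.99533", "2021-06-18 01:58:50.118105"),
        (22, "RUNNING", "2021-06-18 01:50:24.231653", "2021-06-18 01:58:50.224987"),
    ],
)

# Same WHERE clause as the printed query; AND binds tighter than OR.
STALE_QUERY = """
SELECT id FROM job
WHERE state = ?
  AND (last_heartbeat_at IS NOT NULL AND last_heartbeat_at < ?
       OR last_heartbeat_at IS NULL AND started_at < ?)
ORDER BY id
"""

# Cutoffs as printed in the REPL session (UTC), bound in placeholder order.
params = ("RUNNING", "2021-06-21 16:50:07.654660", "2021-06-20 16:55:07.654660")
stale_ids = [row[0] for row in conn.execute(STALE_QUERY, params)]
print(stale_ids)  # [20, 21, 22]
```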
06/21/2021, 4:05 PM
list(JobFinder.all_stale(session))
casey
06/21/2021, 4:07 PM
In [10]: list(JobFinder.all_stale(session))
Out[10]: []
In [11]:
douwe_maan
06/21/2021, 4:20 PM
douwe_maan
06/21/2021, 4:22 PM
SELECT job.id AS job_id_1, job.job_id AS job_job_id, job.run_id AS job_run_id, job.state AS job_state, job.started_at AS job_started_at, job.last_heartbeat_at AS job_last_heartbeat_at, job.ended_at AS job_ended_at, job.payload AS job_payload, job.payload_flags AS job_payload_flags, job."trigger" AS job_trigger
FROM job
WHERE job.state = "RUNNING" AND (job.last_heartbeat_at IS NOT NULL AND job.last_heartbeat_at < "2021-06-20" OR job.last_heartbeat_at IS NULL AND job.started_at < "2021-06-20")
casey
06/21/2021, 4:30 PM
casey
06/21/2021, 4:30 PM
douwe_maan
06/21/2021, 4:32 PM
JobFinder
douwe_maan
06/21/2021, 4:32 PM
douwe_maan
06/21/2021, 4:33 PM
meltano repo, can you run:
from datetime import datetime, timedelta
now = datetime.utcnow()
last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES)
last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)
casey
06/21/2021, 4:56 PM
HEARTBEAT_VALID_MINUTES ?
casey
06/21/2021, 4:58 PM
job.py
casey
06/21/2021, 5:13 PM
In [15]: print(last_valid_heartbeat_at)
2021-06-21 16:50:07.654660
In [16]: print(last_valid_started_at)
2021-06-20 16:55:07.654660
casey
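The two printed cutoffs are exactly 23 hours 55 minutes apart. That gap is what you get from a 5-minute heartbeat window and a 24-hour window for heartbeat-less jobs, both measured from the same `now`. Below is a quick check of that arithmetic. It assumes those values for the two constants, since the thread points to `job.py` for them but does not quote them. The check also compares the three heartbeats from the `psql` output against the cutoff.

```python
from datetime import datetime, timedelta

# Assumed values; the thread locates these constants in job.py without quoting them.
HEARTBEAT_VALID_MINUTES = 5
HEARTBEATLESS_JOB_VALID_HOURS = 24

# Cutoffs copied from the REPL output.
last_valid_heartbeat_at = datetime(2021, 6, 21, 16, 50, 7, 654660)
last_valid_started_at = datetime(2021, 6, 20, 16, 55, 7, 654660)

# Recover the `now` that produced the heartbeat cutoff...
now = last_valid_heartbeat_at + timedelta(minutes=HEARTBEAT_VALID_MINUTES)
# ...and confirm the started_at cutoff was derived from the same instant.
assert now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS) == last_valid_started_at
print(now)  # 2021-06-21 16:55:07.654660

# Every RUNNING row in the psql output has a heartbeat older than the cutoff,
# so all three rows satisfy the heartbeat branch of the stale filter.
heartbeats = [
    datetime(2021, 6, 18, 1, 58, 49, 481304),
    datetime(2021, 6, 18, 1, 58, 50, 118105),
    datetime(2021, 6, 18, 1, 58, 50, 224987),
]
assert all(hb < last_valid_heartbeat_at for hb in heartbeats)
```

This matches casey's raw query returning 3 records. The cutoff arithmetic alone does not explain why `JobFinder.all_stale(session)` came back empty.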
06/21/2021, 5:14 PM
douwe_maan
06/21/2021, 5:52 PM
casey
06/21/2021, 5:57 PM
SELECT job.id AS job_id_1, job.job_id AS job_job_id, job.run_id AS job_run_id, job.state AS job_state, job.started_at AS job_started_at, job.last_heartbeat_at AS job_last_heartbeat_at, job.ended_at AS job_ended_at, job.payload AS job_payload, job.payload_flags AS job_payload_flags, job.trigger AS job_trigger
FROM job
WHERE job.state = 'RUNNING' AND (job.last_heartbeat_at IS NOT NULL AND job.last_heartbeat_at < '2021-06-21 16:50:07.654660' OR job.last_heartbeat_at IS NULL AND job.started_at < '2021-06-20 16:55:07.654660')
returns 3 records
douwe_maan
06/21/2021, 5:57 PM
douwe_maan
06/21/2021, 5:57 PM
casey
06/21/2021, 5:58 PM
casey
06/21/2021, 9:44 PM
casey
06/21/2021, 9:44 PM
douwe_maan
06/21/2021, 9:54 PM
RUNNING to FAIL.
casey
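The manual fix moves the stuck rows from `RUNNING` to `FAIL`. It can be expressed as a single `UPDATE` that reuses the staleness filter, so only rows matching it are touched. This sketch runs against an in-memory `sqlite3` database standing in for the PostgreSQL system database used in the thread, so adapt the placeholder style to your driver. The three stuck rows come from the `psql` output, row 23 is a hypothetical healthy job, and the cutoffs are the ones printed in the REPL session.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id INTEGER PRIMARY KEY, state TEXT, "
    "started_at TEXT, last_heartbeat_at TEXT)"
)
conn.executemany(
    "INSERT INTO job VALUES (?, ?, ?, ?)",
    [
        (20, "RUNNING", "2021-06-18 01:50:23.99533", "2021-06-18 01:58:50.118105"),
        (21, "RUNNING", "2021-06-18 01:50:24.108557", "2021-06-18 01:58:49.481304"),
        (22, "RUNNING", "2021-06-18 01:50:24.231653", "2021-06-18 01:58:50.224987"),
        # Hypothetical healthy job with a recent heartbeat; it must stay RUNNING.
        (23, "RUNNING", "2021-06-21 16:50:00.000000", "2021-06-21 16:54:00.000000"),
    ],
)

with conn:  # commits on success, rolls back if the statement raises
    cur = conn.execute(
        """
        UPDATE job SET state = 'FAIL'
        WHERE state = 'RUNNING'
          AND (last_heartbeat_at IS NOT NULL AND last_heartbeat_at < ?
               OR last_heartbeat_at IS NULL AND started_at < ?)
        """,
        ("2021-06-21 16:50:07.654660", "2021-06-20 16:55:07.654660"),
    )

print(cur.rowcount)  # 3
states = dict(conn.execute("SELECT id, state FROM job"))
```

Running the `SELECT` form of the filter first, as casey did, and checking that `rowcount` matches its result is a cheap guard before committing.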
06/21/2021, 9:59 PM
douwe_maan
06/21/2021, 9:59 PM