Stéphane Burwash
10/24/2022, 2:30 PMfull-refresh
calls to a specific stream in production?dan_ladd
10/24/2022, 2:57 PMStéphane Burwash
10/24/2022, 2:58 PMdan_ladd
10/24/2022, 2:59 PMStéphane Burwash
10/24/2022, 3:01 PMStéphane Burwash
10/27/2022, 8:47 PM# This file is managed by the 'airflow' file bundle and updated automatically when `meltano upgrade` is run.
# To prevent any manual changes from being overwritten, remove the file bundle from `meltano.yml` or disable automatic updates:
# meltano config --plugin-type=files airflow set _update orchestrate/dags/meltano.py false
# If you want to define a custom DAG, create
# a new file under orchestrate/dags/ and Airflow
# will pick it up automatically.
import os
import logging
import subprocess
import json
from datetime import datetime
from airflow import DAG
try:
from airflow.operators.bash_operator import BashOperator
except ImportError:
from airflow.operators.bash import BashOperator
from datetime import timedelta
from pathlib import Path
logger = logging.getLogger(__name__)
DEFAULT_ARGS = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"concurrency": 1,
}
DEFAULT_TAGS = ["meltano"]
project_root = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
meltano_bin = ".meltano/run/bin"
if not Path(project_root).joinpath(meltano_bin).exists():
logger.warning(
f"A symlink to the 'meltano' executable could not be found at '{meltano_bin}'. Falling back on expecting it to be in the PATH instead."
)
meltano_bin = "meltano"
dag_id = "meltano_master_dag"
args = DEFAULT_ARGS.copy()
args["start_date"] = datetime(2020, 1, 1)
tags = DEFAULT_TAGS.copy()
tags.extend([
"meltano",
"master_dag"
])
dag = DAG(
dag_id,
tags=tags,
catchup=False,
default_args=args,
schedule_interval=None,
max_active_runs=1,
)
invoke = BashOperator(
task_id="invoke",
bash_command=f'cd {project_root}; {meltano_bin}'
' {{ dag_run.conf["message"] if dag_run.conf else "--help" }} ',
dag=dag,
)
# register the dag
globals()[dag_id] = dag
<http://logger.info|logger.info>(f"Create Meltano Master Dag")