# infra-deployment
s
Happy Monday everyone! I had a question regarding deployment with Airflow. I'm trying to set up a way to include flags in my production pipeline (basically to run a full refresh) if I need to reset state. How do you guys manage `full-refresh` calls to a specific stream in production?
d
We run a separate DAG that can reset the state, and use Airflow's "Trigger DAG w/ config" option to run it.
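Besides the "Trigger DAG w/ config" button in the UI, a run with config can also be started programmatically. A minimal sketch, assuming Airflow 2's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns`) is enabled. The base URL and the `message` value are hypothetical placeholders, and the DAG id is the `meltano_master_dag` used later in this thread. It only builds the request; sending it (for example with `urllib.request.urlopen`) also needs whatever auth your deployment uses.

```python
import json
import urllib.parse
import urllib.request


def build_trigger_request(base_url: str, dag_id: str, conf: dict) -> urllib.request.Request:
    """Build (but do not send) a request that creates a DAG run carrying `conf`.

    Assumes Airflow 2's stable REST API; auth headers are left out because
    they depend on the configured auth backend.
    """
    url = f"{base_url.rstrip('/')}/api/v1/dags/{urllib.parse.quote(dag_id, safe='')}/dagRuns"
    body = json.dumps({"conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and command, for illustration only.
req = build_trigger_request(
    "http://localhost:8080",
    "meltano_master_dag",
    {"message": "elt tap-foo target-bar --full-refresh"},
)
print(req.get_method(), req.full_url)
```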
s
Thanks @dan_ladd! So this is a separate, custom DAG? Like a master DAG of sorts?
d
Yeah, like an unscheduled DAG that lets you run certain operations. In this case, moving the state back.
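For the full-refresh case in the question, the operation is just a Meltano CLI invocation passed in as config. A small sketch that builds such a command string with shell quoting; the plugin names and job id are hypothetical, and it assumes the `meltano elt` flags `--job_id` and `--full-refresh` (newer Meltano releases use `--state-id` in place of `--job_id`, so check `meltano elt --help` for your version).

```python
import shlex


def full_refresh_message(tap: str, target: str, job_id: str) -> str:
    """Build a command for a full-refresh run of one extractor/loader pair.

    Assumes `meltano elt` accepts `--job_id` and `--full-refresh`
    (newer Meltano versions name the state flag `--state-id`).
    """
    parts = ["elt", tap, target, f"--job_id={job_id}", "--full-refresh"]
    # Quote each part so unusual characters in names cannot break the shell command.
    return " ".join(shlex.quote(p) for p in parts)


# Hypothetical plugin names and job id.
message = full_refresh_message("tap-foo", "target-bar", "foo-to-bar")
print(message)  # elt tap-foo target-bar --job_id=foo-to-bar --full-refresh
```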
s
OK, perfect! I'll have to set that up.
Update: Based on @dan_ladd’s suggestion (thanks again), here is the DAG I created as a "master DAG" for inputting commands:
```python
# Custom "master" DAG for running ad-hoc Meltano commands.
# Meant to be saved as its own file under orchestrate/dags/ (as the generated
# meltano.py recommends for custom DAGs), so Airflow picks it up automatically
# and `meltano upgrade` does not overwrite it.

import logging
import os
import shlex
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG

try:
    # Airflow 2.x import path
    from airflow.operators.bash import BashOperator
except ImportError:
    # Airflow 1.10.x import path
    from airflow.operators.bash_operator import BashOperator


logger = logging.getLogger(__name__)


DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "catchup": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "concurrency": 1,
}

DEFAULT_TAGS = ["meltano"]

project_root = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())

# Symlinked 'meltano' executable, relative to the project root.
meltano_bin = ".meltano/run/bin"

if not Path(project_root).joinpath(meltano_bin).exists():
    logger.warning(
        f"A symlink to the 'meltano' executable could not be found at '{meltano_bin}'. "
        "Falling back on expecting it to be in the PATH instead."
    )
    meltano_bin = "meltano"

dag_id = "meltano_master_dag"
args = DEFAULT_ARGS.copy()
args["start_date"] = datetime(2020, 1, 1)
# DEFAULT_TAGS already contains "meltano"; only add the DAG-specific tag.
tags = DEFAULT_TAGS + ["master_dag"]


dag = DAG(
    dag_id,
    tags=tags,
    catchup=False,
    default_args=args,
    schedule_interval=None,  # unscheduled: runs only when triggered
    max_active_runs=1,
)

# Runs `meltano <message>`, where <message> comes from the run's config,
# e.g. {"message": "elt tap-foo target-bar --full-refresh"} (hypothetical plugin names).
# Without a config it falls back to `meltano --help`.
# The message reaches the shell unescaped, so only let trusted users trigger this DAG.
invoke = BashOperator(
    task_id="invoke",
    bash_command=f"cd {shlex.quote(project_root)} && {meltano_bin}"
    ' {{ dag_run.conf["message"] if dag_run.conf else "--help" }} ',
    dag=dag,
)

# Register the DAG so Airflow discovers it.
globals()[dag_id] = dag

logger.info("Created Meltano master DAG")
```
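To see what the `invoke` task ends up passing to Meltano for a given run config, here is a plain-Python approximation of the templated part of `bash_command`. It is an illustration only: Airflow renders the real string with Jinja, and `meltano_argv` is a hypothetical helper. The split uses `shlex.split`, which handles quoting like a POSIX shell but not variables, globs, or operators such as `;`. A config without a `"message"` key raises `KeyError` here; how the real template behaves in that case depends on the DAG's Jinja undefined-variable setting.

```python
import shlex
from typing import List, Optional


def meltano_argv(conf: Optional[dict]) -> List[str]:
    """Approximate the arguments `meltano` receives for a given run config.

    Mirrors {{ dag_run.conf["message"] if dag_run.conf else "--help" }}:
    an empty or missing config is falsy, so it falls back to `--help`.
    """
    message = conf["message"] if conf else "--help"
    return ["meltano", *shlex.split(message)]


print(meltano_argv(None))
print(meltano_argv({"message": "elt tap-foo target-bar --full-refresh"}))
```

Keeping the command in a single `message` string keeps the DAG generic (any Meltano subcommand works), at the cost of no validation of what gets run.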