meir.shamay
07/23/2020, 4:23 PM
{
  "account_1": {
    "serviceA": {
      "entities": ["id1", "id2"]
    },
    "serviceB": {
      "entities": ["id1"]
    }
  },
  "account_2": {
    "serviceB": {
      "entities": ["id1", "id5"]
    },
    "serviceC": {
      "entities": ["id4"]
    }
  }
}
Each service has its own Docker image that contains the Meltano tap to run.
This is the code I am using:
import re
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 7, 16),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
account_services_data = {
    "account_1": {
        "serviceA": {
            "entities": ["id1", "id2"]
        },
        "serviceB": {
            "entities": ["id1"]
        }
    },
    "account_2": {
        "serviceB": {
            "entities": ["id1", "id5"]
        },
        "serviceC": {
            "entities": ["id4"]
        }
    }
}
service_name_to_tap_name = {
    'serviceA': 'tap_serviceA',
    'serviceB': 'tap_serviceB',
    'serviceC': 'tap_serviceC'
}
target_list_for_tap = {
    'serviceA': 'target_jsonl',
    'serviceB': 'target_jsonl',
    'serviceC': 'target_jsonl'
}
latest_images_for_services = {'serviceA': 'serviceA:latest',
                              'serviceB': 'serviceB:latest',
                              'serviceC': 'serviceC:latest'}
account_services = account_services_data
account_ids = account_services.keys()
for account in account_ids:
    services = account_services[account].keys()
    for service in services:
        dag_name = '{}_{}_job'.format(account, service)
        # The main dag
        dag = DAG(dag_name, default_args=default_args, schedule_interval=None)
        entities = account_services[account][service]['entities']
        for entity in entities:
            now = datetime.now()
            timestamp = datetime.timestamp(now)
            name = re.sub('[^0-9a-zA-Z]+', '_', "{}_{}_{}_{}".format(service, account, entity, timestamp))
            dag_id = "meltano_{}".format(name)
            tap_name = service_name_to_tap_name[service]
            target_name = target_list_for_tap[service]
            task_name = "{}_{}_".format(service, entity)
            image_path = latest_images_for_services[service]
            docker_dag = DAG(
                dag_id,
                default_args=default_args,
                description=dag_id,
                schedule_interval='@hourly',
            )
            with docker_dag:
                t2 = DockerOperator(
                    task_id='meltano_elt_{}'.format(name),
                    # Need to get the latest version of the image from the registry
                    image=image_path,
                    api_version='auto',
                    auto_remove=True,
                    command="meltano elt {} {} --task-id={}".format(tap_name, target_name, task_name),
                    docker_url="unix://var/run/docker.sock",
                    networ…
douwe_maan
07/23/2020, 4:38 PM
globals(), which means they all need to have a unique variable name at the global level: https://airflow.apache.org/docs/stable/concepts.html#scope
You can use this trick to add them directly to globals() with a key matching the DAG ID: https://gitlab.com/meltano/files-airflow/-/blob/master/bundle/orchestrate/dags/meltano.py#L74 (see also https://medium.com/@flavio.mtps/making-use-of-python-globals-to-dynamically-create-airflow-dags-124e556b704e)
• DAG IDs are supposed to be stable, so you'll want to drop the timestamp from the name.
• I'm not sure why you're creating a main DAG but not adding any tasks to it. I think the entity-specific DAGs you're creating are probably sufficient.
• The meltano elt argument you'll want to set is --job_id, not --task-id 🙂
• You'll want to set the MELTANO_DATABASE_URI env var or pass the --database-uri=... flag so that meltano elt will use an external system database for metadata: https://meltano.com/docs/production.html#storing-metadata
douwe_maan
07/23/2020, 4:39 PM
catchup=False on the DAG, see https://gitlab.com/meltano/files-airflow/-/blob/master/bundle/orchestrate/dags/meltano.py#L51-57
douwe_maan
07/23/2020, 4:41 PM
account_id and entity_id) won't be picked up by Meltano with those names. To figure out the exact environment variable names to use, run meltano config <plugin> list, which will print something like:
api_token [env: TAP_COVID_19_API_TOKEN] current value: None (from default)
user_agent [env: TAP_COVID_19_USER_AGENT] current value: None (from default)
start_date [env: TAP_COVID_19_START_DATE] current value: None (from default)
The env var names to use are the all-caps ones starting with the tap's namespace.
douwe_maan
07/23/2020, 4:42 PM
command option to drop the meltano command name and just start with elt, since the meltano command is already the entrypoint of the Docker image: https://meltano.com/docs/production.html#containerized-meltano-project-6
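Pulling the suggestions in this thread together, a minimal sketch of the question's DAG-generation loop with them applied might look like the following. It drops the timestamp and the unused "main" DAG, sets `catchup=False`, passes `--job_id` and starts the command at `elt`, sets `MELTANO_DATABASE_URI`, and registers each DAG in `globals()` under its DAG ID. Assumptions: Airflow 1.10.x with `airflow.operators.docker_operator` available; the database URI is a hypothetical placeholder; `slug`, `env_var_name` and `build_specs` are hypothetical helpers; and the `TAP_..._ACCOUNT_ID` / `TAP_..._ENTITY_ID` names assume the tap exposes `account_id` and `entity_id` settings under its namespace, which should be checked with `meltano config <plugin> list`. The truncated `networ…` argument from the question is left out.

```python
import re
from datetime import datetime, timedelta

# Airflow is optional in this sketch so the naming/command logic can run on
# its own; DAGs are only built when both Airflow 1.10.x imports succeed.
try:
    from airflow import DAG
    from airflow.operators.docker_operator import DockerOperator
except ImportError:
    DAG = None

account_services_data = {
    "account_1": {"serviceA": {"entities": ["id1", "id2"]},
                  "serviceB": {"entities": ["id1"]}},
    "account_2": {"serviceB": {"entities": ["id1", "id5"]},
                  "serviceC": {"entities": ["id4"]}},
}
service_name_to_tap_name = {"serviceA": "tap_serviceA", "serviceB": "tap_serviceB",
                            "serviceC": "tap_serviceC"}
target_list_for_tap = {"serviceA": "target_jsonl", "serviceB": "target_jsonl",
                       "serviceC": "target_jsonl"}
latest_images_for_services = {"serviceA": "serviceA:latest", "serviceB": "serviceB:latest",
                              "serviceC": "serviceC:latest"}

# Hypothetical placeholder: replace with your real external system database.
MELTANO_DATABASE_URI = "postgresql://meltano:password@db-host:5432/meltano"

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 7, 16),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def slug(*parts):
    """Hypothetical helper: join parts, collapsing non-alphanumeric runs to '_'."""
    return re.sub("[^0-9a-zA-Z]+", "_", "_".join(parts))


def env_var_name(namespace, setting):
    """Assumed NAMESPACE_SETTING rule; confirm with `meltano config <plugin> list`."""
    return slug(namespace, setting).upper()


def build_specs(data):
    """One spec per (account, service, entity). No timestamp, so IDs stay stable."""
    specs = []
    for account, services in data.items():
        for service, config in services.items():
            tap = service_name_to_tap_name[service]
            target = target_list_for_tap[service]
            for entity in config["entities"]:
                name = slug(service, account, entity)
                specs.append({
                    "dag_id": "meltano_" + name,
                    "task_id": "meltano_elt_" + name,
                    "image": latest_images_for_services[service],
                    # The image's entrypoint is already `meltano`, so start at `elt`.
                    "command": "elt {} {} --job_id={}".format(tap, target, name),
                    "environment": {
                        "MELTANO_DATABASE_URI": MELTANO_DATABASE_URI,
                        # Hypothetical setting names, taken from the question.
                        env_var_name(tap, "account_id"): account,
                        env_var_name(tap, "entity_id"): entity,
                    },
                })
    return specs


if DAG is not None:
    for spec in build_specs(account_services_data):
        dag = DAG(spec["dag_id"], default_args=default_args,
                  schedule_interval="@hourly", catchup=False)
        with dag:
            DockerOperator(
                task_id=spec["task_id"],
                image=spec["image"],
                api_version="auto",
                auto_remove=True,
                command=spec["command"],
                environment=spec["environment"],
                docker_url="unix://var/run/docker.sock",
            )
        # A unique module-level name per DAG lets the scheduler discover each one.
        globals()[spec["dag_id"]] = dag
```

Building plain spec dicts first keeps the naming and command logic checkable without an Airflow install; only the final loop touches Airflow.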