# announcements
I hope you will be able to help with my use case: I have 100 accounts, and each account can have one or more supported services (e.g. serviceA, serviceB, serviceC). For each supported service, an account can have several entities (e.g. id1, id2, id3). I want to create a DAG for each account's service, and within each service's DAG, a task for each entity. That means: dag_account_1_serviceA -> task_id1 -> task_id2, dag_account_1_serviceB -> task_id1, and so on. This is an example of the data structure:
```json
{
  "account_1": {
    "serviceA": {"entities": ["id1", "id2"]},
    "serviceB": {"entities": ["id1"]}
  },
  "account_2": {
    "serviceB": {"entities": ["id1", "id5"]},
    "serviceC": {"entities": ["id4"]}
  }
}
```
Each service has its own Docker image that contains the Meltano tap to run. This is the code I am using:
```python
import re
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 7, 16),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

account_services_data = {
    "account_1": {
        "serviceA": {"entities": ["id1", "id2"]},
        "serviceB": {"entities": ["id1"]}
    },
    "account_2": {
        "serviceB": {"entities": ["id1", "id5"]},
        "serviceC": {"entities": ["id4"]}
    }
}

service_name_to_tap_name = {
    'serviceA': 'tap_serviceA',
    'serviceB': 'tap_serviceB',
    'serviceC': 'tap_serviceC'
}
target_list_for_tap = {
    'serviceA': 'target_jsonl',
    'serviceB': 'target_jsonl',
    'serviceC': 'target_jsonl'
}
latest_images_for_services = {
    'serviceA': 'serviceA:latest',
    'serviceB': 'serviceB:latest',
    'serviceC': 'serviceC:latest'
}

account_services = account_services_data
for account in account_services.keys():
    for service in account_services[account].keys():
        dag_name = '{}_{}_job'.format(account, service)
        # The main dag
        dag = DAG(dag_name, default_args=default_args, schedule_interval=None)
        entities = account_services[account][service]['entities']
        for entity in entities:
            now = datetime.now()
            timestamp = datetime.timestamp(now)
            name = re.sub('[^0-9a-zA-Z]+', '_', "{}_{}_{}_{}".format(service, account, entity, timestamp))
            dag_id = "meltano_{}".format(name)
            tap_name = service_name_to_tap_name[service]
            target_name = target_list_for_tap[service]
            task_name = "{}_{}_".format(service, entity)
            image_path = latest_images_for_services[service]
            docker_dag = DAG(
                dag_id,
                default_args=default_args,
                description=dag_id,
                schedule_interval='@hourly',
            )
            with docker_dag:
                t2 = DockerOperator(
                    task_id='meltano_elt_{}'.format(name),
                    # Need to get the latest version of the image from the registry
                    image=image_path,
                    api_version='auto',
                    auto_remove=True,
                    command="meltano elt {} {} --task-id={}".format(tap_name, target_name, task_name),
                    docker_url="unix://var/run/docker.sock",
                    network_mode="bridge",
                    environment={
                        "account_id": account,
                        "entity_id": entity,
                    },
                )
```
@meir.shamay I see a couple of things you may want to look at:
• Airflow will only pick up DAGs that appear in `globals()`, which means they all need to have a unique variable name at the global level: https://airflow.apache.org/docs/stable/concepts.html#scope You can use this trick to add them directly to `globals()` with a key matching the DAG ID: https://gitlab.com/meltano/files-airflow/-/blob/master/bundle/orchestrate/dags/meltano.py#L74 (see also https://medium.com/@flavio.mtps/making-use-of-python-globals-to-dynamically-create-airflow-dags-124e556b704e)
• DAG IDs are supposed to be stable, so you'll want to drop the timestamp from the name.
• I'm not sure why you're creating a main DAG but not adding any tasks to it. I think the entity-specific DAGs you're creating are probably sufficient.
• The `meltano elt` argument you'll want to set is `--job_id`, not `--task-id` 🙂
• You'll want to set the `MELTANO_DATABASE_URI` env var or pass the `--database-uri=...` flag so that `meltano elt` will use an external system database for metadata: https://meltano.com/docs/production.html#storing-metadata
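Putting the `globals()` and stable-ID points together, the registration pattern might look like this minimal sketch. `create_dag` is a stand-in for the real `DAG(...)` + `DockerOperator` construction, and the dict it returns is just a placeholder for an actual `airflow.DAG` object:

```python
# Sketch: register dynamically created DAGs at module level under stable IDs.

def create_dag(dag_id):
    # Stand-in for building an airflow.DAG; a dict is used as a placeholder.
    return {"dag_id": dag_id}

account_services_data = {
    "account_1": {"serviceA": {"entities": ["id1", "id2"]}},
}

dags = {}
for account, services in account_services_data.items():
    for service, config in services.items():
        for entity in config["entities"]:
            # Stable ID: no timestamp, so Airflow sees the same DAG on every parse.
            dag_id = "meltano_{}_{}_{}".format(account, service, entity)
            dags[dag_id] = create_dag(dag_id)

# Airflow only discovers DAGs reachable from module-level names, so expose
# each one in globals() under its own unique key.
globals().update(dags)
```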
Also, the environment variables you're passing (`account_id` and `entity_id`) won't be picked up by Meltano with those names. To figure out the exact environment variable names to use, run `meltano config <plugin> list`, which will print something like:
```
api_token  [env: TAP_COVID_19_API_TOKEN]  current value: None (from default)
user_agent [env: TAP_COVID_19_USER_AGENT] current value: None (from default)
start_date [env: TAP_COVID_19_START_DATE] current value: None (from default)
```
The env var names to use are those all-caps ones starting with the tap's namespace.
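Assuming the tap's namespace supplies that all-caps prefix, the `environment` dict for the `DockerOperator` might be built like this sketch. The `TAP_SERVICEA_*` names here are hypothetical; substitute whatever names `meltano config <plugin> list` actually reports for your tap:

```python
# Sketch: derive Meltano-style env var names from a tap's namespace.
# The setting names (account_id, entity_id) are assumptions from the thread.

def build_env(tap_namespace, account, entity):
    prefix = tap_namespace.upper()
    return {
        "{}_ACCOUNT_ID".format(prefix): account,
        "{}_ENTITY_ID".format(prefix): entity,
    }

# Would be passed as environment=... to the DockerOperator.
env = build_env("tap_serviceA", "account_1", "id1")
```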
You may also need to change the `command` option to drop the `meltano` command name and just start with `elt`, since the `meltano` command is already the entrypoint of the Docker image: https://meltano.com/docs/production.html#containerized-meltano-project-6
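Combined with the `--job_id` correction above, the `command` string for the `DockerOperator` might then be built like this sketch (the tap, target, and job ID values are illustrative):

```python
# Sketch: `meltano` is the image entrypoint, so the command starts with `elt`,
# and the run is identified with --job_id rather than --task-id.
tap_name = "tap_serviceA"
target_name = "target_jsonl"
job_id = "account_1_serviceA_id1"

command = "elt {} {} --job_id={}".format(tap_name, target_name, job_id)
```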