charles_mcmillan
05/27/2021, 8:05 PMdouwe_maan
05/27/2021, 8:47 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:19 PMcharles_mcmillan
05/27/2021, 9:20 PMcharles_mcmillan
05/27/2021, 9:21 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:22 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:22 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:23 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:23 PMcharles_mcmillan
05/27/2021, 9:31 PMcharles_mcmillan
05/27/2021, 9:33 PMI have a different configuration where I launch specific pipelinesIs this a specific re-usable pattern I can look into? Is it just launching ‘pods’ somewhere and reporting back or something like that?
juan_sebastian_suarez_valencia
05/27/2021, 9:33 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:34 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:35 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:35 PMfrom airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
import base64
dag = DAG(
dag_id='hubspot',
default_args={'owner': 'airflow'},
schedule_interval='0 1 * * *',
start_date=days_ago(2),
tags=['meltano'],
)
variables = {"channel":"data"}
pipeline_start = SSHOperator(
ssh_conn_id='meltano_ssh',
task_id='pipeline_start',
command='curl --header "Content-Type: application/json" --request POST --data \'{"username":"AirflowBot", "emoji":":robot_face:","password":"NotificationBot","text":"Hubpost sync started :checkered_flag:","channel":"{{params.channel}}"}\' <https://hook.integromat.com/0wphttm6uqgyzsvp91899pk99x982623> 2> error.txt',
params = variables,
dag=dag,
)
pipeline = SSHOperator(
ssh_conn_id='meltano_ssh',
task_id='pipeline',
command='cd ../juan_valencia;source .venv/bin/activate;cd hubspot-meister;meltano elt tap-hubspot-meister target-bigquery --job_id=hubspot-to-bq --force; echo $?',
do_xcom_push=True,
dag=dag,
)
def branch_operator(ti):
try :
#For some reason XCOM recovers the meltano output on top of the echo (<https://bit.ly/3hZSx3P>)
x_com_text = base64.b64decode(ti.xcom_pull(task_ids='pipeline')).decode('utf-8').rstrip()
print(x_com_text)
x_com_exit_status = x_com_text[-1:]
print(x_com_exit_status)
xcom_value = int(x_com_exit_status)
except :
xcom_value = 0
if xcom_value == 0:
return 'save_state'
else:
return 'notification_failed'
branch_op = BranchPythonOperator(
task_id='branch_task',
python_callable=branch_operator,
dag=dag)
save_state = SSHOperator(
ssh_conn_id='meltano_ssh',
task_id='save_state',
command='cd ../juan_valencia;source .venv/bin/activate;cd hubspot-meister;meltano elt tap-hubspot-meister target-bigquery --job_id=hubspot-to-bq --dump=state > latest_state.json;',
dag=dag,
)
notification_success = SSHOperator(
ssh_conn_id='meltano_ssh',
task_id='notification_success',
command='curl --header "Content-Type: application/json" --request POST --data \'{"username":"AirflowBot","emoji":":robot_face:","password":"NotificationBot","text":"Hubpost sync finished :white_check_mark:","channel":"{{params.channel}}"}\' <https://hook.integromat.com/0wphttm6uqgyzsvp91899pk99x982623>',
params = variables,
dag=dag,
)
notification_failed = SSHOperator(
ssh_conn_id='meltano_ssh',
task_id='notification_failed',
command='curl --header "Content-Type: application/json" --request POST --data \'{"username":"AirflowBot","emoji":":robot_face:","password":"NotificationBot","text":"Hubpost sync had an error :no_entry:","channel":"{{params.channel}}"}\' <https://hook.integromat.com/0wphttm6uqgyzsvp91899pk99x982623>',
params = variables,
dag=dag,
)
pipeline_start >> pipeline >> branch_op >> [save_state, notification_failed]
save_state >> notification_success
charles_mcmillan
05/27/2021, 9:37 PMcharles_mcmillan
05/27/2021, 9:38 PMcharles_mcmillan
05/27/2021, 9:38 PMcharles_mcmillan
05/27/2021, 9:39 PMcharles_mcmillan
05/27/2021, 9:39 PMcharles_mcmillan
05/27/2021, 9:40 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:41 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:42 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:42 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:43 PMcharles_mcmillan
05/27/2021, 9:44 PMcharles_mcmillan
05/27/2021, 9:44 PMjuan_sebastian_suarez_valencia
05/27/2021, 9:44 PM