# getting-started
c
Hello! Are there any recommended approaches to monitoring Meltano? I’m thinking about error tracking in particular, but any metrics at all would be interesting. For errors, ideally I could route them to Slack, or perhaps to something like Sentry
d
Are you using Meltano with Airflow as the orchestrator? I’d suggest looking into Airflow’s error notification functionality, since this is more the responsibility of the orchestrator than of Meltano, which ties it all together
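For reference, a minimal sketch of what that orchestrator-level routing can look like: an `on_failure_callback` that posts to a Slack incoming webhook. The webhook URL, DAG id, and project path are placeholders, not anything from this thread.
```python
import requests
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Placeholder: use your real Slack incoming-webhook URL
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def notify_slack_on_failure(context):
    # Airflow passes the task context to this callback whenever a task fails
    ti = context["task_instance"]
    requests.post(
        SLACK_WEBHOOK_URL,
        json={"text": f":no_entry: {ti.dag_id}.{ti.task_id} failed: {context.get('exception')}"},
    )

dag = DAG(
    dag_id="meltano_elt",
    schedule_interval="@daily",
    start_date=days_ago(1),
    default_args={"on_failure_callback": notify_slack_on_failure},
)

# BashOperator marks the task failed whenever the command exits non-zero,
# which `meltano elt` does on error, so the callback above fires automatically
elt = BashOperator(
    task_id="elt",
    bash_command="cd /path/to/project && meltano elt tap-hubspot target-bigquery",
    dag=dag,
)
```
Because `default_args` propagates to every task, any task failure in the DAG triggers the same callback.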
j
Hello @charles_mcmillan I’m doing that with Airflow, happy to share with you my experience
c
Oh great points! Yes we have an airflow --> slack notifier for failures. As long as Meltano returns an appropriate exit code, I imagine our current notification setup can handle the rest
Has that been your experience @juan_sebastian_suarez_valencia (that it returns an appropriate exit code on any failure)? I’d love to see how you set it up in general in Airflow, as well…do you have it set up like this? https://gitlab.com/meltano/files-airflow/-/blob/master/bundle/orchestrate/dags/meltano.py
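On the exit-code point: `meltano elt` exits non-zero when a run fails, which is what lets Airflow (or anything wrapping it) mark the task failed. A quick way to check from Python, with placeholder tap/target names and project path:
```python
import subprocess

# Run the pipeline and inspect the exit status; path and plugin names are placeholders
result = subprocess.run(
    ["meltano", "elt", "tap-hubspot", "target-bigquery", "--job_id=hubspot-to-bq"],
    cwd="/path/to/meltano/project",
)
print(result.returncode)  # 0 on success, non-zero when the run failed
```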
j
You tagged the wrong Juan @charles_mcmillan
No I have a different configuration where I launch specific pipelines
and I capture the exit codes
But I plan to put some testing on top, also orchestrated by Airflow
c
Woops! Was not being careful there with the tag, sorry about that!
Very interesting…when you say
I have a different configuration where I launch specific pipelines
Is this a specific re-usable pattern I can look into? Is it just launching ‘pods’ somewhere and reporting back or something like that?
j
Correct
I don’t have Kubernetes, I use a VM for now
Here’s the code for the DAG
```python
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator 
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
import base64

dag = DAG(
    dag_id='hubspot',
    default_args={'owner': 'airflow'},
    schedule_interval='0 1 * * *',
    start_date=days_ago(2),
    tags=['meltano'],
)

variables = {"channel":"data"}

pipeline_start = SSHOperator(
    ssh_conn_id='meltano_ssh',
    task_id='pipeline_start',
    command='curl --header "Content-Type: application/json" --request POST --data \'{"username":"AirflowBot", "emoji":":robot_face:","password":"NotificationBot","text":"Hubspot sync started :checkered_flag:","channel":"{{params.channel}}"}\' https://hook.integromat.com/0wphttm6uqgyzsvp91899pk99x982623 2> error.txt',
    params=variables,
    dag=dag,
)

# The trailing `echo $?` appends meltano's exit status to the output that do_xcom_push sends to XCom
pipeline = SSHOperator(
    ssh_conn_id='meltano_ssh',
    task_id='pipeline',
    command='cd ../juan_valencia;source .venv/bin/activate;cd hubspot-meister;meltano elt tap-hubspot-meister target-bigquery --job_id=hubspot-to-bq --force; echo $?',
    do_xcom_push=True,
    dag=dag,
)

def branch_operator(ti):
    try:
        # The XCom value is the base64-encoded stdout of the 'pipeline' task; because of the
        # trailing `echo $?`, its last line is meltano's exit status (https://bit.ly/3hZSx3P)
        x_com_text = base64.b64decode(ti.xcom_pull(task_ids='pipeline')).decode('utf-8').rstrip()
        print(x_com_text)
        # Take the last line rather than the last character so multi-digit exit codes also work
        x_com_exit_status = x_com_text.splitlines()[-1]
        print(x_com_exit_status)
        xcom_value = int(x_com_exit_status)
    except Exception:
        # If the output can't be parsed, fall back to treating the run as successful
        xcom_value = 0

    if xcom_value == 0:
        return 'save_state'
    else:
        return 'notification_failed'

branch_op = BranchPythonOperator(
    task_id='branch_task',
    python_callable=branch_operator,
    dag=dag)

save_state = SSHOperator(
    ssh_conn_id='meltano_ssh',
    task_id='save_state',
    command='cd ../juan_valencia;source .venv/bin/activate;cd hubspot-meister;meltano elt tap-hubspot-meister target-bigquery --job_id=hubspot-to-bq --dump=state > latest_state.json;',
    dag=dag,
)

notification_success = SSHOperator(
    ssh_conn_id='meltano_ssh',
    task_id='notification_success',
    command='curl --header "Content-Type: application/json" --request POST --data \'{"username":"AirflowBot","emoji":":robot_face:","password":"NotificationBot","text":"Hubspot sync finished :white_check_mark:","channel":"{{params.channel}}"}\' https://hook.integromat.com/0wphttm6uqgyzsvp91899pk99x982623',
    params=variables,
    dag=dag,
)

notification_failed = SSHOperator(
    ssh_conn_id='meltano_ssh',
    task_id='notification_failed',
    command='curl --header "Content-Type: application/json" --request POST --data \'{"username":"AirflowBot","emoji":":robot_face:","password":"NotificationBot","text":"Hubspot sync had an error :no_entry:","channel":"{{params.channel}}"}\' https://hook.integromat.com/0wphttm6uqgyzsvp91899pk99x982623',
    params=variables,
    dag=dag,
)

pipeline_start >> pipeline >> branch_op >> [save_state, notification_failed]
save_state >> notification_success
```
c
Wow, this is so amazing…thank you!! cc @jose_ribeiro
So I think provisioning meltano on VMs has been our tentative strategy as of now as well (in order not to do too much heavy lifting on the airflow instance)…this is super great
One thing I’ve been dreading is dealing with the programmatic auto-provisioning of instances (we are on Google Cloud, so this would be Google Compute Engine)
It looks like you have one big instance to handle it all, if I’m reading it right, is that correct?
Worst case, we can come up with an idempotent operator at the beginning of the DAG to provision a machine if it’s not there, I’m thinking
(This is where Kubernetes excels, but I’m not advanced enough to get it yet)
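One possible shape for that head-of-DAG step, assuming the VM already exists and is just kept stopped between runs: the Google provider ships Compute Engine start/stop operators, and starting an already-running instance is effectively a no-op. Project, zone, and instance names below are placeholders; creating the instance from scratch would need a different (or custom) operator.
```python
from airflow import DAG
from airflow.providers.google.cloud.operators.compute import (
    ComputeEngineStartInstanceOperator,
    ComputeEngineStopInstanceOperator,
)
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="hubspot_with_vm_lifecycle",
    schedule_interval="0 1 * * *",
    start_date=days_ago(2),
)

# Start the (already-created) VM before the pipeline; starting a running
# instance is a no-op, so re-running the task is safe
start_meltano_vm = ComputeEngineStartInstanceOperator(
    task_id="start_meltano_vm",
    project_id="my-gcp-project",      # placeholder
    zone="europe-west1-b",            # placeholder
    resource_id="meltano-worker",     # placeholder instance name
    dag=dag,
)

# ...the meltano pipeline tasks would sit between these two...

# Stop the VM again once the run is done, so it isn't billed between runs
stop_meltano_vm = ComputeEngineStopInstanceOperator(
    task_id="stop_meltano_vm",
    project_id="my-gcp-project",
    zone="europe-west1-b",
    resource_id="meltano-worker",
    dag=dag,
)
```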
j
Yes @charles_mcmillan One VM can handle a few pipelines
But you are correct, this is where Kubernetes excels
The plan is to put this in a Kubernetes cluster, but for now I’m testing with a VM
The thing with Kubernetes is that Meltano doesn’t like to be shut down, and the state is lost if the db is erased with the instance
c
oh gotcha…yes that makes sense. Independently, we were thinking of putting the ‘state’ into a separate database, so that we could delete/recreate the machine someday without any problems
I’m not sure how hard that is going to be
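One option here: Meltano’s system database (where job and state records live) can be pointed at an external Postgres via the `database_uri` setting / `MELTANO_DATABASE_URI` environment variable, so the VM can be torn down without losing state. A hypothetical variant of the `pipeline` task from the DAG above, with placeholder connection details and paths:
```python
from airflow.providers.ssh.operators.ssh import SSHOperator

# Hypothetical sketch: export MELTANO_DATABASE_URI so Meltano stores its job/state
# records in an external Postgres instead of the local SQLite file. Host, credentials,
# and project path are placeholders; this would replace the 'pipeline' task above.
pipeline = SSHOperator(
    ssh_conn_id='meltano_ssh',
    task_id='pipeline',
    command=(
        'export MELTANO_DATABASE_URI="postgresql://meltano:secret@db-host:5432/meltano"; '
        'cd /path/to/project; source .venv/bin/activate; '
        'meltano elt tap-hubspot-meister target-bigquery --job_id=hubspot-to-bq; echo $?'
    ),
    do_xcom_push=True,
    dag=dag,
)
```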
j
🤷‍♂️