Hi, is there anyone using Mage to orchestrate Melt...
# best-practices
s
Hi, is there anyone using Mage to orchestrate Meltano commands and syncs? I am doing this, and I have created a pipeline which fetches different streams in different
blocks
in
mage
. I have noticed that If I cancel the block run in mage then the meltano process doesn't get cancelled. Is there a command where I can forcefully stop the meltano run ? P.S. Since I run the meltano command in a python subprocess, I can't directly close it.
v
If I'm running with an orchestration tool I'd call meltano with a subprocess synchronously, and pass through any signals you get from the parent process to the meltano process. Should solve your issues, sharing code would be a lot more helpful, and examples
s
Hey Derek, Here is a snippet from one
custom python
block in my
mage
project:
Copy code
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
import subprocess

stream_name = 'stock_move'
@custom
def transform_custom(*args, **kwargs):
    choose_stream_name = f"export TAP_ODOO__SELECT_FILTER='[\"{stream_name}\"]'; cd odoo_elt/tap-odoo; meltano run --force load-historical-data-lyra --merge-state"
    a = subprocess.run(choose_stream_name,  shell=True, text=True)
    print(a.stderr)
    return {"stock_move": a.returncode}
Here I am using a custom tap called
tap-odoo
. I have created custom blocks for each stream and they are run in a sequence. Here are the issues: 1. The block execution successfully completes even when the meltano command is still running (I can see the meltano state messages being populated in the block output). Not sure If I'm calling this subprocess correctly. How do I get the python block to wait for the meltano command to complete followed by the block execution completion? 2. Since the meltano process keeps running although the
block execution
has succeeded, the next meltano sync call is made via the next python block and so on. This might end up making multiple simultaneous calls and the
tap
server may not be able to handle the load
v
hmm I don't know mage but
Copy code
import subprocess
import os

def custom(func):
    def custom_func(*args, **kwargs):
        if not kwargs.get('from_notebook', False):
            df = None
            if len(args) >= 1:
                df = args[0]
            return func(df, **kwargs)
        return func(*args, **kwargs)
    custom_func.is_custom = True
    return custom_func

stream_name = 'stock_move'

@custom
def transform_custom(*args, **kwargs):
    env = os.environ.copy()
    env['TAP_ODOO__SELECT_FILTER'] = f'["{stream_name}"]'
    
    try:
        a = subprocess.run(
            "echo before timer; sleep 2; echo after timer",
            shell=True,
            text=True,
            env=env,
            capture_output=True
        )
        if a.stdout:
            print(f"Stdout: {a.stdout}")
        if a.stderr:
            print(f"Stderr: {a.stderr}")
            
        return {"stock_move": a.returncode}
    except Exception as e:
        print(f"Error executing subprocess: {str(e)}")
        raise

transform_custom()
This works fine, the python app waits to finish execution. I tried to find the annotation code for @custom https://github.com/mage-ai/mage-ai/blob/d0eaef32a40a771b9ac8485e14ed08da83ca2dc6/mage_ai/data_preparation/decorators.py#L4 But it looks like that's made at runtime so maybe you could peak at that?
s
Thanks Derek, I'll check this one out.