Shubham Kawade
11/08/2024, 5:13 AM
blocks in mage.
I have noticed that if I cancel the block run in mage, the meltano process doesn't get cancelled.
Is there a command I can use to forcefully stop the meltano run?
P.S. Since I run the meltano command in a python subprocess, I can't directly close it.
visch
11/08/2024, 3:13 PM
Shubham Kawade
11/12/2024, 4:42 AM
custom python block in my mage project:
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
import subprocess

stream_name = 'stock_move'

@custom
def transform_custom(*args, **kwargs):
    choose_stream_name = f"export TAP_ODOO__SELECT_FILTER='[\"{stream_name}\"]'; cd odoo_elt/tap-odoo; meltano run --force load-historical-data-lyra --merge-state"
    a = subprocess.run(choose_stream_name, shell=True, text=True)
    print(a.stderr)
    return {"stock_move": a.returncode}
Here I am using a custom tap called tap-odoo.
I have created custom blocks for each stream and they are run in a sequence.
Here are the issues:
1. The block execution completes successfully even while the meltano command is still running (I can see the meltano state messages being populated in the block output). Not sure if I'm calling this subprocess correctly. How do I get the python block to wait for the meltano command to finish before the block execution completes?
2. Since the meltano process keeps running although the block execution has succeeded, the next meltano sync call is made by the next python block, and so on. This might end up making multiple simultaneous calls, and the tap server may not be able to handle the load.
visch
11/12/2024, 4:54 PM
import subprocess
import os

def custom(func):
    def custom_func(*args, **kwargs):
        if not kwargs.get('from_notebook', False):
            df = None
            if len(args) >= 1:
                df = args[0]
            return func(df, **kwargs)
        return func(*args, **kwargs)
    custom_func.is_custom = True
    return custom_func

stream_name = 'stock_move'

@custom
def transform_custom(*args, **kwargs):
    env = os.environ.copy()
    env['TAP_ODOO__SELECT_FILTER'] = f'["{stream_name}"]'
    try:
        a = subprocess.run(
            "echo before timer; sleep 2; echo after timer",
            shell=True,
            text=True,
            env=env,
            capture_output=True
        )
        if a.stdout:
            print(f"Stdout: {a.stdout}")
        if a.stderr:
            print(f"Stderr: {a.stderr}")
        return {"stock_move": a.returncode}
    except Exception as e:
        print(f"Error executing subprocess: {str(e)}")
        raise

transform_custom()
This works fine: the python app waits for the subprocess to finish. I tried to find the annotation code for @custom:
https://github.com/mage-ai/mage-ai/blob/d0eaef32a40a771b9ac8485e14ed08da83ca2dc6/mage_ai/data_preparation/decorators.py#L4
But it looks like that's made at runtime, so maybe you could peek at that?
Shubham Kawade
11/20/2024, 7:46 AM
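For the two questions raised in this thread (waiting for the command, and stopping it when the run is cancelled), one possible approach is sketched below. This is a rough sketch under stated assumptions, not a confirmed fix. The helper name `run_in_own_group` and its `grace` parameter are made up for illustration. It assumes a POSIX host, and it assumes that a cancel reaches the python block as an exception or interrupt while it waits, which nothing in this thread confirms for mage. The idea is to start the shell command in its own process group, block until it exits, and signal the whole group if the wait is interrupted or times out.

```python
import os
import signal
import subprocess


def run_in_own_group(command, env=None, cwd=None, timeout=None, grace=10):
    """Run a shell command, wait for it, and stop its whole process group
    if the wait is interrupted (hypothetical helper, POSIX only)."""
    proc = subprocess.Popen(
        command,
        shell=True,
        text=True,
        env=env,
        cwd=cwd,
        start_new_session=True,  # child leads a new process group (pgid == proc.pid)
    )
    try:
        # Blocks until the command exits, or raises TimeoutExpired.
        return proc.wait(timeout=timeout)
    except BaseException:
        # Timeout, KeyboardInterrupt, or another interruption:
        # ask the shell and everything it started to terminate.
        try:
            os.killpg(proc.pid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # the group has already exited
        try:
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            # SIGTERM was ignored within the grace period, so force it.
            os.killpg(proc.pid, signal.SIGKILL)
            proc.wait()
        raise
```

Inside the block this could be called as `run_in_own_group("meltano run --force load-historical-data-lyra --merge-state", env=env, cwd="odoo_elt/tap-odoo")`, with `env` built from `os.environ.copy()` as in the snippet above. Passing `cwd` and `env` replaces the `cd` and `export` parts of the original shell string.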