# infra-deployment
s
hello folks, I am trying to orchestrate a custom tap we are using on Mage. I am using Python subprocesses to execute Meltano commands. I have created a task in Mage for each stream I want to sync, and I want to somehow track whether, at the end of the task run, that stream was synced or not. How do I programmatically check whether a sync for a stream was successfully attempted, e.g. via a log file which I could check in Python? P.S. I checked the logging section, and the log file is created in a run folder whose name is randomly generated, so it doesn't seem reliable for this purpose
v
the subprocess that you're calling Meltano with has a return code; if it's 0, it succeeded
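That return-code check can be sketched as follows (a minimal sketch; the wrapper name and the commented-out meltano command are illustrative, not from the thread):

```python
import subprocess

def run_meltano_sync(command: str) -> int:
    """Run a shell command (e.g. a `meltano run ...` invocation) and
    return its exit code. 0 means the whole pipeline succeeded."""
    result = subprocess.run(command, shell=True, text=True)
    return result.returncode

# Hypothetical usage from a Mage task:
# rc = run_meltano_sync("cd my-project; meltano run my-job")
# stream_synced = (rc == 0)
```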
s
thanks🙌
just a follow-up question: is it possible to extract the output or error message, like `Block run successful` or `block run failed: error`, from subprocesses? I tried stdout and stderr but both give a None output. My use case requires a Slack notification with the error message if the extractor/loader fails
v
yes, it's possible to parse the data from stderr in a subprocess, but I'd still highly recommend using the return code
👍 1
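For reference, `subprocess.run` only populates `stdout`/`stderr` when asked to capture them; otherwise they are `None`, which matches the behaviour described later in this thread. A minimal sketch (the function name is illustrative):

```python
import subprocess

def run_and_capture(command: str):
    """Run a command and capture its stderr for alerting.
    Without capture_output=True, result.stderr would be None."""
    result = subprocess.run(command, shell=True, text=True, capture_output=True)
    return result.returncode, result.stderr

# Hypothetical usage: if the return code is non-zero, forward the
# captured stderr to a Slack notification.
```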
e
There's also https://github.com/meltano/meltano/issues/3013, but the spec is not very clear
s
Hello Derek, have you gotten a Meltano error message in the stderr of the subprocess in Mage once it fails? I tried to manually fail the subprocess (by disconnecting Wi-Fi); Meltano failed (as visible in the logs printed in the UI), but `subprocess.stderr` returned `None`. I have a scheduled pipeline which runs these Meltano subprocess blocks; I see that they've been run successfully, but the state is not updated, so something is wrong. And the pipeline logs don't show me anything for debugging
v
@Shubham Kawade I don't use Mage; I've just used subprocess in Python a bunch to call Meltano taps/targets. If you share the code you're using for calling the Meltano process it'd be easier, but yes, I wouldn't rely on `subprocess.stderr` for telling whether a subprocess exits properly or not
> I see that they've been run successfully but the state is not updated so something is wrong
state not updating could be a number of things, so your meltano.yml would be helpful as well.
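One way to check whether state was written at all is Meltano's `meltano state list` command; a sketch wrapping it from Python (the helper names are illustrative, and it assumes the `meltano` CLI is on PATH in a Meltano project directory):

```python
import subprocess

def parse_state_ids(output):
    """Turn the line-per-ID output of `meltano state list` into a list."""
    return [line.strip() for line in output.splitlines() if line.strip()]

def list_state_ids(project_dir):
    """Run `meltano state list` and return the known state IDs."""
    result = subprocess.run(
        ["meltano", "state", "list"],
        cwd=project_dir, text=True, capture_output=True,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr)
    return parse_state_ids(result.stdout)
```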
s
Yes, sure. This is the custom Python block I use in Mage to define the function which calls the subprocess:
```python
import subprocess

stream_name = 'sale_order'

@custom
def transform_custom(*args, **kwargs):
    choose_stream_name = f"export TAP_ODOO__SELECT_FILTER='[\"{stream_name}\"]'; cd odoo_elt/tap-odoo; meltano run load-historical-data --merge-state"
    a = subprocess.run(choose_stream_name, shell=True, text=True)
    return {f"{stream_name}": a.returncode}
```
and here is the meltano.yml file for the relevant taps and targets:
```yaml
version: 1
send_anonymous_usage_stats: false
project_id: tap-odoo
default_environment: test
environments:
- name: test
plugins:
  extractors:
  - name: tap-odoo
    namespace: tap_odoo
    pip_url: -e .
    capabilities:
    - state
    - catalog
    - discover
    settings:
    - name: url
      value: TAP_ODOO_URL
    - name: db
      value: TAP_ODOO_DB
    - name: username
      value: TAP_ODOO_USERNAME
    - name: password
      value: TAP_ODOO_PASSWORD
    config:
      start_date: '2023-01-01T00:00:00Z'
    select:
    # - purchase_order_lines.*
    # - dispatch_line.*
    # - sale_orders.*
    - stock_move.*
    - stock_move_line.*
    - stock_lot.*
    - stock_valuation.*
    - stock_quant.*
    - stock_warehouse.*
    - stock_location.*
    - res_company.*
    - product_template.*
    - product_product.*
    - stock_quant_history.*
    # - stock_warehouse_orderpoint
    - product_category.*
    - purchase_order.*
    - purchase_order_line.*
    - mrp_production.*
    - sale_order.*

  loaders:
  - name: target-bigquery
    variant: z3z1ma
    pip_url: git+https://github.com/z3z1ma/target-bigquery.git
    config:
      append_only: 'true'
      credentials_path: some_path
      dataset: dataset
      denormalized: true
      project: project
      location: location
      method: batch_job
jobs:
- name: load-historical-data
  tasks:
  - tap-odoo target-bigquery
```
Here, tap-odoo is a custom tap I'm working on
v
the code for `transform_custom` looks pretty good to me; `a` shouldn't be able to be None. Are you sure that's what's happening? Maybe the @custom decorator is doing something here, or maybe it depends on how you're calling the function. I'd throw a few print()s in there to see if Mage is actually calling the process when you think it is
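A debug-friendly variant of that block, combining the suggestions in this thread: capture the subprocess output, print it so it lands in the task logs, and return the stderr tail for a Slack alert (the function and key names are illustrative, not a Mage or Meltano API):

```python
import subprocess

def run_stream_sync(stream_name: str, command: str) -> dict:
    """Run one stream's sync, surface its output in the task logs,
    and return the return code plus the last lines of stderr."""
    result = subprocess.run(command, shell=True, text=True, capture_output=True)
    print(f"[{stream_name}] return code: {result.returncode}")
    print(result.stdout)
    print(result.stderr)
    error_tail = "\n".join(result.stderr.splitlines()[-5:]) if result.stderr else ""
    return {"stream": stream_name, "returncode": result.returncode, "error": error_tail}
```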
👀 1
s
ok let me try that.