# troubleshooting
s
Hello all, I am curious if there is a solid way to define Dagster assets for Meltano jobs using `dagster-meltano`. Currently, I am using the `dbt-postgres` plugin within Meltano, and I can fairly easily run a Meltano job, but I have no data lineage or dependency tracking.
```python
from dagster import Definitions, job
from dagster_meltano import meltano_resource, meltano_run_op

@job(resource_defs={"meltano": meltano_resource})
def run_ndo_pipeline():
    meltano_run_op("ndo_pipeline")()

defs = Definitions(
    jobs=[run_ndo_pipeline]
)
```

The corresponding job in `meltano.yml`:

```yaml
jobs:
- name: ndo_pipeline
  tasks:
  - download-files:run_script
  - dbt-postgres:drop-raw-tables
  - tap-csv
  - target-postgres
  - dbt-postgres:select-ndo
```

I am looking to move towards running dbt outside of Meltano, but I am not finding a straightforward path to doing this. Note: I am using Dagster+ hybrid and not using the dagster-meltano utility.
a
Happy to share my code here. I have a YAML file containing the stream names organised by tap, and a Python class to convert those into Dagster assets. Ultimately you can't execute any of the streams as individual assets; they are all materialised by a single call to `meltano run`.
```python
from dataclasses import dataclass
import enum
import os
from typing import List
import warnings

from pathlib import Path
from yaml import safe_load

from dagster import (
    MAX_RUNTIME_SECONDS_TAG,
    AssetOut,
    AssetSelection,
    ConfigurableResource,
    DefaultScheduleStatus,
    ExperimentalWarning,
    OpExecutionContext,
    ScheduleDefinition,
    define_asset_job,
    multi_asset,
)
from dagster_meltano import meltano_resource

warnings.filterwarnings("ignore", category=ExperimentalWarning)


class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()


MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())


@dataclass
class MeltanoSpec:
    job_name: str
    streams: list[str]
    cron_schedule: str
    max_retries: int = 2
    timeout_mins: int = 15   # number of mins the job can run for before being considered timed out.
    enabled: bool = True

    @property
    def timeout_seconds(self) -> int:
        return self.timeout_mins * 60
    
def meltano_asset_factory(all_tap_streams: list[MeltanoSpec]) -> list:
    multi_assets = []
    jobs = []
    schedules = []

    for spec in all_tap_streams:
        @multi_asset(
            name=spec.job_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=spec.job_name,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{spec.job_name}'])
                for stream in spec.streams
            },
            op_tags={
                'dagster/max_retries': spec.max_retries,
                MAX_RUNTIME_SECONDS_TAG: spec.timeout_seconds,  # runtime limit in seconds
            },
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            # The op is named after the Meltano job, so this runs `meltano run <job_name>`
            command = f"run {context.op.name}"
            meltano.execute_command(command, dict(), context.log)
            return tuple(None for _ in context.selected_output_names)

        multi_assets.append(compute)

        asset_job = define_asset_job(
            f"{spec.job_name}_assets",
            AssetSelection.groups(spec.job_name),
            tags={
                'dagster/max_retries': spec.max_retries,
                MAX_RUNTIME_SECONDS_TAG: spec.timeout_seconds,
            },
        )
        
        jobs.append(asset_job)

        basic_schedule = ScheduleDefinition(
            job=asset_job,
            cron_schedule=spec.cron_schedule,
            default_status=DefaultScheduleStatus.RUNNING,
            execution_timezone="Europe/London"
        )

        
        schedules.append(basic_schedule)

    return multi_assets, jobs, schedules

def get_meltano_specs(meltano_yaml) -> List[MeltanoSpec]:
    meltano_specs = []
    no_value = object()  # sentinel so dataclass defaults apply when a key is absent

    for tap in meltano_yaml['taps']:
        kwargs = {
            'max_retries': tap.get('max_retries', no_value),
            'timeout_mins': tap.get('timeout_mins', no_value),
            'enabled': tap.get('enabled', no_value),
        }
        kwargs = {k: v for k, v in kwargs.items() if v is not no_value}
        spec = MeltanoSpec(
            tap['name'],
            tap['streams'],
            tap['schedule'],
            **kwargs,
        )
        meltano_specs.append(spec)
    return meltano_specs

with open(Path(__file__).parent / 'meltano_assets.yaml', 'r') as f:
    meltano_yaml = safe_load(f)
    all_tap_streams = get_meltano_specs(meltano_yaml)

meltano_assets, meltano_jobs, meltano_schedules = meltano_asset_factory(
    [t for t in all_tap_streams if t.enabled]
)
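
# Not in the original message: a minimal sketch of how the factory output
# could be wired into a Definitions object so Dagster loads everything.
from dagster import Definitions

defs = Definitions(
    assets=meltano_assets,
    jobs=meltano_jobs,
    schedules=meltano_schedules,
)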
```

Yaml example:

```yaml
taps:
  - name: freshdesk
    schedule: '0 2 * * *'
    streams:
      - conversations
      - ticket_fields
      - tickets_detail
      - email_configs
      - agents
      - contacts
      - sla_policies
      - groups
  - name: mailchimp
    schedule: '@daily'
    timeout_mins: 180
    streams:
      - campaigns
      - lists
      - lists_members
      - reports_email_activity
      - reports_sent_to
      - reports_unsubscribes
```

I also define Meltano jobs matching each tap name:

```yaml
jobs:
- name: freshdesk
  tasks:
  - tap-freshdesk target-postgres  # still basic postgres until <https://github.com/MeltanoLabs/target-postgres/issues/124> resolved
- name: instagram
  tasks:
  - tap-instagram target-postgres  # still basic postgres until <https://github.com/MeltanoLabs/target-postgres/issues/86> resolved
```

No experience with Dagster+ so not sure if this ports across, but hopefully of some help?
If this is helpful I can show you how to structure your dbt sources; these need to match your streams and target schemas 1:1, but it's not too tricky once set up. You can ask Dagster to refresh a dbt model, and it will also trigger an upstream run of the Meltano sources. A rough sketch of such a sources file is below.
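For illustration only (not from the thread): assuming the freshdesk streams land in a `raw_freshdesk` schema, matching the `key_prefix` in the factory above, a dbt `sources.yml` might look like the following. The file path, source name, and table list are assumptions, and they must mirror the Dagster asset keys exactly for the lineage to connect.

```yaml
# models/staging/freshdesk/sources.yml (hypothetical path)
version: 2

sources:
  - name: raw_freshdesk       # assumed: matches the Dagster key_prefix 'raw_<job_name>'
    schema: raw_freshdesk     # assumed: the schema target-postgres loads into
    tables:
      - name: tickets_detail  # one entry per stream, named exactly like the asset
      - name: agents
      - name: contacts
```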
s
@Andy Carter Awesome, thanks Andy! Definitely helpful.