Steven Searcy
07/22/2025, 5:02 PM
… `dagster-meltano`. Currently, I am using the `dbt-postgres` plugin within Meltano, and I can run a Meltano job fairly easily, but I have no data lineage or dependency tracking.
from dagster import Definitions, job
from dagster_meltano import meltano_resource, meltano_run_op
# Dagster job that runs the Meltano job "ndo_pipeline" through the op built by
# dagster_meltano's meltano_run_op, using the "meltano" resource.
@job(resource_defs={"meltano": meltano_resource})
def run_ndo_pipeline():
    ndo_pipeline_op = meltano_run_op("ndo_pipeline")
    ndo_pipeline_op()
# Register the job with Dagster's Definitions so this code location exposes it.
defs = Definitions(jobs=[run_ndo_pipeline])
# Meltano job definition (presumably in meltano.yml) that the Dagster job above
# runs via meltano_run_op("ndo_pipeline").
# dbt currently runs inside Meltano through the dbt-postgres plugin entries below.
jobs:
  - name: ndo_pipeline  # must match the name passed to meltano_run_op
    tasks:
      - download-files:run_script
      - dbt-postgres:drop-raw-tables
      - tap-csv
      - target-postgres
      - dbt-postgres:select-ndo
I am looking to move towards running dbt outside of Meltano, but I am not finding a straightforward path to doing this.
Note: I am using Dagster+ Hybrid and not using the Dagster Meltano utility.
Andy Carter
07/23/2025, 7:15 AM
… `meltano run` …
from dataclasses import dataclass
import enum
import os
from typing import List
import warnings
from yaml import safe_load
from pathlib import Path
from dagster import MAX_RUNTIME_SECONDS_TAG, AssetOut, AssetSelection, ConfigurableResource, DefaultScheduleStatus, OpExecutionContext, ScheduleDefinition, define_asset_job, multi_asset, ExperimentalWarning
from dagster_meltano import meltano_resource
# Suppress Dagster's ExperimentalWarning process-wide (warning filters are global).
warnings.simplefilter("ignore", category=ExperimentalWarning)
class MeltanoEnv(enum.Enum):
    """Enumerates the ``dev`` and ``prod`` Meltano environments.

    Values are 1 and 2, the same numbers ``enum.auto()`` would assign.
    Not referenced elsewhere in this snippet.
    """

    dev = 1
    prod = 2
# Meltano project root: $MELTANO_PROJECT_ROOT when set, otherwise the current
# working directory at import time. Not referenced elsewhere in this snippet.
MELTANO_PROJECT_DIR = os.environ.get("MELTANO_PROJECT_ROOT", os.getcwd())
@dataclass
class MeltanoSpec:
    """Run and schedule settings for one Meltano tap/job.

    Instances are built from the ``taps`` entries of ``meltano_assets.yaml``
    by ``get_meltano_specs``.
    """

    job_name: str  # Meltano job name; also the Dagster asset and group name
    streams: list[str]  # tap streams, one Dagster asset output each
    cron_schedule: str  # cron expression or alias such as '@daily'
    max_retries: int = 2
    timeout_mins: int = 15  # minutes the job may run before it counts as timed out
    enabled: bool = True  # disabled specs are filtered out before asset creation

    @property
    def timeout_seconds(self) -> int:
        """Return ``timeout_mins`` converted to seconds."""
        seconds_per_minute = 60
        return seconds_per_minute * self.timeout_mins
def meltano_asset_factory(
    all_tap_streams: list[MeltanoSpec],
    *,
    execution_timezone: str = "Europe/London",
) -> tuple[list, list, list]:
    """Build Dagster multi-assets, asset jobs and schedules for Meltano taps.

    For each spec this creates:

    * a ``multi_asset`` named ``spec.job_name``, with one output per stream
      under the ``raw_<job_name>`` key prefix. Its compute runs
      ``meltano run <job_name>`` through the ``meltano`` resource.
    * an asset job ``<job_name>_assets`` selecting that asset group.
    * a schedule for that job on ``spec.cron_schedule``, running by default.

    Args:
        all_tap_streams: Specs to build definitions for. ``spec.enabled`` is
            not checked here; callers filter disabled specs beforehand.
        execution_timezone: Timezone in which every schedule's cron
            expression is evaluated.

    Returns:
        A ``(multi_assets, jobs, schedules)`` tuple of lists. Each list holds
        one entry per spec, in the order of ``all_tap_streams``.
    """
    multi_assets = []
    jobs = []
    schedules = []

    for spec in all_tap_streams:
        # MAX_RUNTIME_SECONDS_TAG is measured in seconds. The op-level tag
        # previously received timeout_mins, which disagreed with the run-level
        # tag. Both now share the same retry/timeout values.
        limit_tags = {
            "dagster/max_retries": spec.max_retries,
            MAX_RUNTIME_SECONDS_TAG: spec.timeout_seconds,
        }

        @multi_asset(
            name=spec.job_name,
            resource_defs={"meltano": meltano_resource},
            compute_kind="meltano",
            group_name=spec.job_name,
            outs={
                stream: AssetOut(key_prefix=[f"raw_{spec.job_name}"])
                for stream in spec.streams
            },
            op_tags=dict(limit_tags),
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            # The op name is spec.job_name (set via ``name=`` above), which is
            # also the Meltano job name. Reading it from the context avoids
            # capturing the loop variable ``spec`` in this closure.
            meltano.execute_command(f"run {context.op.name}", {}, context.log)
            # The Meltano run performs the load itself, so each selected
            # output just yields a None placeholder.
            return tuple(None for _ in context.selected_output_names)

        multi_assets.append(compute)

        asset_job = define_asset_job(
            f"{spec.job_name}_assets",
            AssetSelection.groups(spec.job_name),
            tags=dict(limit_tags),
        )
        jobs.append(asset_job)

        schedules.append(
            ScheduleDefinition(
                job=asset_job,
                cron_schedule=spec.cron_schedule,
                default_status=DefaultScheduleStatus.RUNNING,
                execution_timezone=execution_timezone,
            )
        )

    return multi_assets, jobs, schedules
def get_meltano_specs(meltano_yaml) -> List[MeltanoSpec]:
    """Build one ``MeltanoSpec`` per entry in ``meltano_yaml['taps']``.

    Every tap entry must provide ``name``, ``streams`` and ``schedule``.
    The optional keys are ``max_retries``, ``timeout_mins`` and ``enabled``.
    An absent optional key falls back to the dataclass default. A present
    key is passed through unchanged, even when its value is null/None.

    Raises:
        KeyError: if ``taps`` or a required per-tap key is missing.
    """
    absent = object()  # sentinel: distinguishes "key missing" from "value is None"
    optional_fields = ("max_retries", "timeout_mins", "enabled")
    specs = []
    for tap in meltano_yaml["taps"]:
        overrides = {}
        for field_name in optional_fields:
            value = tap.get(field_name, absent)
            if value is not absent:
                overrides[field_name] = value
        specs.append(
            MeltanoSpec(tap["name"], tap["streams"], tap["schedule"], **overrides)
        )
    return specs
# Load the tap definitions from meltano_assets.yaml, which sits next to this
# file, then build Dagster definitions for every enabled tap only.
# YAML is UTF-8, so decode it explicitly rather than with the platform's
# locale encoding. safe_load is used so no arbitrary Python objects are built.
with open(Path(__file__).parent / "meltano_assets.yaml", encoding="utf-8") as f:
    meltano_yaml = safe_load(f)
all_tap_streams = get_meltano_specs(meltano_yaml)
meltano_assets, meltano_jobs, meltano_schedules = meltano_asset_factory(
    [spec for spec in all_tap_streams if spec.enabled]
)
YAML example (meltano_assets.yaml):
# meltano_assets.yaml, read by get_meltano_specs(): one entry per tap.
# Required keys: name, streams, schedule.
# Optional keys: max_retries (default 2), timeout_mins (default 15), enabled (default true).
taps:
  - name: freshdesk  # must match a Meltano job name; the asset runs `meltano run <name>`
    schedule: '0 2 * * *'  # cron: 02:00 every day
    streams:  # each stream becomes an asset keyed raw_freshdesk/<stream>
      - conversations
      - ticket_fields
      - tickets_detail
      - email_configs
      - agents
      - contacts
      - sla_policies
      - groups
  - name: mailchimp
    schedule: '@daily'
    timeout_mins: 180  # overrides the 15-minute default
    streams:
      - campaigns
      - lists
      - lists_members
      - reports_email_activity
      - reports_sent_to
      - reports_unsubscribes
I also define Meltano jobs whose names match each tap name:
# Meltano jobs (meltano.yml) whose names match the tap names in meltano_assets.yaml.
# Each Dagster multi-asset runs `meltano run <name>`, so every enabled tap needs a job here.
# NOTE(review): this excerpt shows no `mailchimp` job although the taps example lists one.
jobs:
  - name: freshdesk
    tasks:
      - tap-freshdesk target-postgres  # still basic postgres until https://github.com/MeltanoLabs/target-postgres/issues/124 is resolved
  - name: instagram
    tasks:
      - tap-instagram target-postgres  # still basic postgres until https://github.com/MeltanoLabs/target-postgres/issues/86 is resolved
I have no experience with Dagster+, so I'm not sure whether this ports across, but hopefully it's of some help.
Andy Carter
07/23/2025, 7:42 AM
Steven Searcy
07/23/2025, 6:51 PM