Tanner Wilcox
08/11/2025, 10:04 PM
Steven Searcy
@dbt_assets(manifest=dbt_project.manifest_path, select="+keystone")
def keystone_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    """Build the keystone dbt models plus their upstream dependencies.

    The ``+keystone`` selector includes every model upstream of the
    ``keystone`` selection, so staging models are rebuilt as needed.
    ``dbt build`` events are streamed back so Dagster reports each model
    materialization as it completes.

    Args:
        context: Dagster execution context for the dbt run.
        dbt: Configured dbt CLI resource.
    """
    yield from dbt.cli(["build"], context=context).stream()


# BUG FIX: the job below selects `keystone_dbt_assets`, but this function was
# originally named `walmart_dbt_assets`, which would raise NameError at import
# time. Renamed to match; alias kept for any existing callers.
walmart_dbt_assets = keystone_dbt_assets
import subprocess
import os
from dagster import multi_asset, AssetOut, OpExecutionContext, ConfigurableResource
from dagster_ndo.resources.dbt import DBT_PROJECT_PATH, DBT_TARGET, DBT_PROFILE
# Meltano extractor plugin name for the Keystone CSV files.
TAP_NAME = "tap-csv--keystone"
# Dagster asset-key prefix under which the raw Keystone output is registered.
ASSET_KEY_PREFIX = "keystone_files"
@multi_asset(
    name="csv_keystone",
    compute_kind="meltano",
    group_name="keystone_extraction",
    description="Drops existing tables, downloads fresh Keystone files from SFTP, and extracts 1 data stream to database",
    outs={
        "raw_keystone_data": AssetOut(key_prefix=[ASSET_KEY_PREFIX]),
    },
)
def extract_keystone_data(context: OpExecutionContext, meltano: ConfigurableResource):
    """Drop raw Keystone tables, download fresh SFTP files, and load them.

    Three sequential steps:
      1. ``dbt run-operation drop_keystone_raw_tables`` so the extract
         starts from a clean slate.
      2. A Meltano utility command that downloads the Keystone files
         (presumably from SFTP, per the asset description — TODO confirm).
      3. ``meltano run`` of the CSV tap into the Postgres target.

    Args:
        context: Dagster execution context, used for structured logging.
        meltano: Resource exposing ``execute_command(cmd, env, log)``.

    Returns:
        None for the single declared output ``raw_keystone_data`` — this
        asset is a side-effecting pipeline step, not a data computation.

    Raises:
        subprocess.CalledProcessError: if the dbt run-operation exits non-zero.
    """
    # Step 1: Drop existing tables
    context.log.info("Dropping existing keystone raw tables via dbt...")
    env = os.environ.copy()
    # List-form argv (shell=False) — no shell-injection surface.
    dbt_command = [
        "dbt", "run-operation", "drop_keystone_raw_tables",
        "--project-dir", DBT_PROJECT_PATH,
        "--profiles-dir", DBT_PROJECT_PATH,
        "--target", DBT_TARGET,
        "--profile", DBT_PROFILE,
    ]
    try:
        # check=True turns a non-zero dbt exit into CalledProcessError below.
        result = subprocess.run(
            dbt_command,
            cwd=DBT_PROJECT_PATH,
            env=env,
            capture_output=True,
            text=True,
            check=True,
        )
        context.log.info(f"dbt run-operation output: {result.stdout}")
        if result.stderr:
            context.log.warning(f"dbt run-operation stderr: {result.stderr}")
    except subprocess.CalledProcessError as e:
        # Surface dbt's own output before re-raising so the failure is
        # diagnosable from the Dagster event log.
        context.log.error(f"dbt run-operation failed: {e}")
        context.log.error(f"stdout: {e.stdout}")
        context.log.error(f"stderr: {e.stderr}")
        raise

    # Step 2: Download files
    context.log.info("Downloading fresh keystone files...")
    download_command = "invoke download-keystone-files:run_script"
    meltano.execute_command(download_command, dict(), context.log)

    # Step 3: Extract data
    context.log.info("Extracting Keystone data...")
    extract_command = f"run {TAP_NAME} target-postgres--ndo"
    meltano.execute_command(extract_command, dict(), context.log)

    # BUG FIX: the original returned a 6-tuple, but `outs` declares exactly
    # one output ("raw_keystone_data"); Dagster rejects a mismatched output
    # count at runtime. Return a single value for the single output.
    return None
# Extraction assets for this pipeline — presumably collected into the
# repository/Definitions object elsewhere; confirm against the caller.
keystone_extraction_assets = [extract_keystone_data]
# Job running the complete pipeline: the meltano extraction assets (group
# "keystone_extraction") unioned with the dbt-derived assets.
# NOTE(review): `define_asset_job` and `AssetSelection` are not imported in
# this snippet, and `keystone_dbt_assets` is defined above under the name
# `walmart_dbt_assets` — confirm both resolve in the real module or this
# raises NameError at import time.
keystone_complete_pipeline = define_asset_job(
"keystone_complete_pipeline",
AssetSelection.groups("keystone_extraction") |
AssetSelection.assets(keystone_dbt_assets)
)
Dagster's @dbt_assets decorator reads your dbt manifest and automatically understands that marts depend on staging models. When I run a mart, it intelligently rebuilds upstream dependencies as needed.
This is working for me, but there could be a flaw here that I have not discovered yet.
Tanner Wilcox
08/11/2025, 10:36 PM
Steven Searcy
08/11/2025, 10:45 PM
extract_command = f"run {TAP_NAME} target-postgres--ndo"
The first block is creating Dagster assets from DBT models. (stg and mart tables)
The third block is a job that runs the first and second block if that makes sense. It can live anywhere really.