# best-practices
t
I would like one DAG per staging/intermediate/mart file in my project. I would like the mart DAG to know that it depends on intermediate models x, y, and z, that those intermediates depend on staging files a, b, and c, and to rerun all of them to regenerate the mart. From what I've looked into there isn't an existing solution for this, so I'm planning on writing my own generator. Before I head down that route, I was wondering if there's already some solution for this.
s
Hi Tanner, I am doing something like the following:
```python
from dagster import OpExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=dbt_project.manifest_path, select="+keystone")
def walmart_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```

```python
import subprocess
import os
from dagster import multi_asset, AssetOut, OpExecutionContext, ConfigurableResource
from dagster_ndo.resources.dbt import DBT_PROJECT_PATH, DBT_TARGET, DBT_PROFILE

TAP_NAME = "tap-csv--keystone"
ASSET_KEY_PREFIX = "keystone_files"

@multi_asset(
    name="csv_keystone",
    compute_kind="meltano",
    group_name="keystone_extraction",
    description="Drops existing tables, downloads fresh Keystone files from SFTP, and extracts 1 data stream to database",
    outs={
        "raw_keystone_data": AssetOut(key_prefix=[ASSET_KEY_PREFIX]),
    }
)
def extract_keystone_data(context: OpExecutionContext, meltano: ConfigurableResource):
    # Step 1: Drop existing tables
    <http://context.log.info|context.log.info>("Dropping existing keystone raw tables via dbt...")
    
    env = os.environ.copy()
    dbt_command = [
        "dbt", "run-operation", "drop_keystone_raw_tables",
        "--project-dir", DBT_PROJECT_PATH,
        "--profiles-dir", DBT_PROJECT_PATH,
        "--target", DBT_TARGET,
        "--profile", DBT_PROFILE
    ]
    
    try:
        result = subprocess.run(
            dbt_command,
            cwd=DBT_PROJECT_PATH,
            env=env,
            capture_output=True,
            text=True,
            check=True
        )
        <http://context.log.info|context.log.info>(f"dbt run-operation output: {result.stdout}")
        if result.stderr:
            context.log.warning(f"dbt run-operation stderr: {result.stderr}")
    except subprocess.CalledProcessError as e:
        context.log.error(f"dbt run-operation failed: {e}")
        context.log.error(f"stdout: {e.stdout}")
        context.log.error(f"stderr: {e.stderr}")
        raise

    # Step 2: Download files
    <http://context.log.info|context.log.info>("Downloading fresh keystone files...")
    download_command = "invoke download-keystone-files:run_script"
    meltano.execute_command(download_command, dict(), context.log)
    
    # Step 3: Extract data
    <http://context.log.info|context.log.info>("Extracting Keystone data...")
    extract_command = f"run {TAP_NAME} target-postgres--ndo"
    meltano.execute_command(extract_command, dict(), context.log)
    
    # Only one output ("raw_keystone_data") is declared; the table itself is
    # created by Meltano, so there is no in-memory value to return.
    return None

keystone_extraction_assets = [extract_keystone_data]
```

```python
from dagster import AssetSelection, define_asset_job

# keystone_dbt_assets is the @dbt_assets definition from the first block
keystone_complete_pipeline = define_asset_job(
    "keystone_complete_pipeline",
    AssetSelection.groups("keystone_extraction") |
    AssetSelection.assets(keystone_dbt_assets)
)
```
Dagster's @dbt_assets decorator reads your dbt manifest and automatically understands that marts depend on their intermediate and staging models. When I run a mart, the upstream dependencies get rebuilt as needed. This is working for me, but there could be a flaw here that I have not discovered yet.
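To spell out why the upstream rebuild happens: the `+` prefix in `select="+keystone"` is dbt selector syntax for "the keystone model plus all of its ancestors", so the staging and intermediate models are part of the same asset definition. You could also drive the same behavior from the Dagster side with an upstream asset selection. A minimal sketch, assuming a mart asset key of `"keystone"` (that key is a placeholder, not something from my project):

```python
from dagster import AssetSelection, define_asset_job

# Sketch only: "keystone" is a placeholder asset key for the mart model.
# .upstream() walks the lineage that @dbt_assets derived from the dbt manifest,
# so this job materializes the staging and intermediate models before the mart.
rebuild_keystone_mart = define_asset_job(
    "rebuild_keystone_mart",
    selection=AssetSelection.keys("keystone").upstream(),
)
```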
t
I'm assuming your second code block is your dag, but what about the first and third blocks? Where do those go?
s
Not really. The second block is just one node, or one asset. It's responsible for creating the raw table via Meltano.
`extract_command = f"run {TAP_NAME} target-postgres--ndo"`
The first block is creating Dagster assets from the dbt models (the stg and mart tables). The third block is a job that runs the first and second blocks, if that makes sense. It can live anywhere, really.
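For completeness, a rough sketch of how the three blocks could hang together in a Definitions object — the project path and the Meltano resource here are placeholders, not my actual setup:

```python
from dagster import Definitions
from dagster_dbt import DbtCliResource

# Sketch only: walmart_dbt_assets, extract_keystone_data, and
# keystone_complete_pipeline are the three blocks above; project_dir
# and the meltano resource are placeholders.
defs = Definitions(
    assets=[walmart_dbt_assets, extract_keystone_data],
    jobs=[keystone_complete_pipeline],
    resources={
        "dbt": DbtCliResource(project_dir="path/to/dbt_project"),
        # "meltano": <your ConfigurableResource that wraps the Meltano CLI>,
    },
)
```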