# random
i
Whenever I'm in dagster and I hit "materialize all" for my dbt assets, does that pass the command "meltano invoke dbt:run" to the container / CLI for the run? Or is there some other method dagster uses?
a
In dagster you can normally see the dbt command that was issued. Dagster talks to dbt directly through the dagster_dbt integration, not via meltano (unless you specifically configure it that way).
It will probably be `dbt build`, as this runs tests too.
i
AttributeError: 'str' object has no attribute 'items'
  File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
    next_output = next(iterator)
  File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/asset_defs.py", line 290, in _dbt_op
    dbt_resource.remove_run_results_json()
  File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/cli/resources.py", line 462, in remove_run_results_json
    project_dir = kwargs.get("project_dir", self.default_flags["project-dir"])
  File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/cli/resources.py", line 176, in default_flags
    return self._format_params(self._default_flags, replace_underscores=True)
  File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/dbt_resource.py", line 33, in _format_params
    flags = {k.replace("_", "-"): v for k, v in flags.items() if v is not None}
I ran dbt build/run locally and it completed successfully, but I still seem to be getting this error when running materialize in dagster
But it isn't printing the dbt command it's trying to run
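For reference, the last frame of the traceback above can be reproduced in isolation. This is only a minimal sketch: `format_params` is a hypothetical stand-in that copies the dict comprehension from the `_format_params` frame, and the paths are made-up examples. It shows that the comprehension works on a dict of flags and raises the same `AttributeError` when it is handed a plain string, which suggests a string ended up where the dbt resource expects its default flags.

```python
def format_params(flags):
    # Hypothetical stand-in: same comprehension as the last traceback frame.
    # It converts underscores to dashes and drops flags whose value is None.
    return {k.replace("_", "-"): v for k, v in flags.items() if v is not None}


# A dict of flags is formatted without error (example values, not real config).
ok = format_params({"project_dir": "/project/transform", "profiles_dir": None})

# A plain string has no .items(), so the same call raises AttributeError.
try:
    format_params("/project/transform/profiles/snowflake")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(ok)
print(error_message)
```

If that reading is right, the error is raised before any dbt command is built, which would also explain why no dbt command shows up in the run log.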
a
Is that error coming from dagster? Can you share any more context?
i
yeah it's from dagster whenever I hit "materialize all"
on my dbt assets
a
Is there any more of the error output? Might be worth taking it to the integration_dbt channel in the dagster slack to see what someone else says
Could you share your repository.py in full as well?
i
import os
from pathlib import Path

from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
import enum

from dagster_meltano import meltano_resource

from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent / "transform")
DBT_PROFILE_PATH = str(Path(__file__).parent.parent.parent / "transform/profiles/snowflake")
DBT_TARGET_PATH = str(Path(__file__).parent.parent.parent / "transform/target")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET_PATH')

class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()

MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

resources= {
    "dbt": DbtCliResource(
        DBT_PROJECT_PATH,
        DBT_PROFILE_PATH,
        [],
        DBT_TARGET,
        DBT_PROFILE),
    # "meltano": meltano_resource,
}

ALL_TAP_STREAMS = {
    "jobdiva": [
        "NewUpdatedCandidateRecordsStream",
        "NewUpdatedSubmittalInterviewHireActivityRecordsStream",
        "NewUpdatedJobRecordsStream",
        "NewUpdatedContactRecordsStream",
        #"NewUpdatedCompanyRecordsStream",
        "NewUpdatedTerminationsStream",
        "UsersListStream",
    ],
}

def meltano_asset_factory(all_tap_streams: dict[str, list[str]]) -> tuple[list, list, list]:
    multi_assets = []
    jobs = []
    schedules = []

    for tap_name, tap_streams in all_tap_streams.items():
        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={
            stream: AssetOut(tap_name)
            for stream
            in tap_streams
            }
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            command = f"run tap-{context.op.name} target-snowflake"
            meltano.execute_command(f"{command}", dict(), context.log)
            return tuple([None for _ in context.selected_output_names])
        
        multi_assets.append(compute)

        asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))

        basic_schedule = ScheduleDefinition(
            job=asset_job, 
            cron_schedule="@daily", 
            default_status=DefaultScheduleStatus.STOPPED
        )

        jobs.append(asset_job)
        schedules.append(basic_schedule)

    return multi_assets, jobs, schedules

meltano_assets, jobs, schedules = meltano_asset_factory(ALL_TAP_STREAMS)

dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_PATH, profiles_dir=DBT_PROJECT_PATH,)

defs = Definitions(
    assets= (dbt_assets + meltano_assets),
    resources= resources,
    jobs=jobs,
    schedules=schedules,
)
I'll take it to the dagster slack as well, but here's my repository.py - I went off of yours originally
a
Which versions of dbt/dagster are you on now?
i
pip_url: dagster-ext==0.1.0 dbt-core~=1.7.0 dbt-snowflake~=1.7.0 pendulum<3
dbt-core/snowflake versions are the same in my dbt utility as well
@Andy Carter which versions are you on?