Ian OLeary
04/26/2024, 5:48 PMAndy Carter
04/29/2024, 8:36 AMAndy Carter
04/29/2024, 8:37 AMIan OLeary
04/29/2024, 1:36 PM
AttributeError: 'str' object has no attribute 'items'
File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
next_output = next(iterator)
File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/asset_defs.py", line 290, in _dbt_op
dbt_resource.remove_run_results_json()
File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/cli/resources.py", line 462, in remove_run_results_json
project_dir = kwargs.get("project_dir", self.default_flags["project-dir"])
File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/cli/resources.py", line 176, in default_flags
return self._format_params(self._default_flags, replace_underscores=True)
File "/project/.meltano/utilities/dagster/venv/lib/python3.10/site-packages/dagster_dbt/dbt_resource.py", line 33, in _format_params
flags = {k.replace("_", "-"): v for k, v in flags.items() if v is not None}
I ran dbt build/run locally and it completed successfully, but I still get this error when running a materialization in Dagster.
Ian OLeary
04/29/2024, 1:39 PMAndy Carter
04/29/2024, 2:10 PMIan OLeary
04/29/2024, 2:16 PMIan OLeary
04/29/2024, 2:16 PMAndy Carter
04/29/2024, 3:48 PM
integration_dbt channel in the dagster slack and see what someone else says
Andy Carter
04/29/2024, 3:51 PMIan OLeary
04/30/2024, 2:56 PM
import os
from pathlib import Path
from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
import enum
from dagster_meltano import meltano_resource
from dagster_dbt import load_assets_from_dbt_project, DbtCliResource
# Directory reached by walking three ``parent`` hops up from this file; the
# dbt paths below are all resolved relative to it.
_REPO_ROOT = Path(__file__).parent.parent.parent

# Filesystem locations of the dbt project, its Snowflake profiles directory
# and its target directory, stored as plain strings.
DBT_PROJECT_PATH = str(_REPO_ROOT / "transform")
DBT_PROFILE_PATH = str(_REPO_ROOT / "transform/profiles/snowflake")
DBT_TARGET_PATH = str(_REPO_ROOT / "transform/target")

# Values taken from the environment; each is None when the variable is unset.
DBT_PROFILE = os.environ.get('DBT_PROFILE')
# NOTE(review): DBT_TARGET is read from the DBT_TARGET_PATH variable, not from
# a DBT_TARGET variable -- confirm that this is the intended source.
DBT_TARGET = os.environ.get('DBT_TARGET_PATH')
class MeltanoEnv(enum.Enum):
    """Enumeration with two members, ``dev`` and ``prod``."""

    # Explicit values; these are the same integers enum.auto() assigns.
    dev = 1
    prod = 2
# Meltano project directory: MELTANO_PROJECT_ROOT when set, otherwise the
# working directory at import time.
MELTANO_PROJECT_DIR = os.environ.get("MELTANO_PROJECT_ROOT", os.getcwd())

# Meltano executable: MELTANO_BIN when set, otherwise "meltano".
MELTANO_BIN = os.environ.get("MELTANO_BIN", "meltano")
# Resources handed to ``Definitions``.
#
# The "dbt" entry is built with keyword arguments. When the values were passed
# positionally, the profiles path (a str) ended up as ``default_flags``; the
# installed dagster_dbt calls ``.items()`` on that value and looks up
# ``"project-dir"`` in it, which raised
# "AttributeError: 'str' object has no attribute 'items'" on materialization.
#
# NOTE(review): the keyword names match the dagster_dbt release whose
# DbtCliResource accepts ``default_flags``. Newer releases use a different
# constructor -- confirm against the installed version before upgrading.
resources = {
    "dbt": DbtCliResource(
        executable="dbt",
        # dagster_dbt turns underscores into hyphens and drops None values, so
        # an unset DBT_PROFILE / DBT_TARGET is simply omitted from the flags.
        # NOTE(review): DBT_TARGET comes from the DBT_TARGET_PATH environment
        # variable -- confirm it holds a dbt target name, not a directory.
        default_flags={
            "project_dir": DBT_PROJECT_PATH,
            "profiles_dir": DBT_PROFILE_PATH,
            "profile": DBT_PROFILE,
            "target": DBT_TARGET,
        },
        warn_error=False,
        ignore_handled_error=False,
        target_path=DBT_TARGET_PATH,
    ),
    # "meltano": meltano_resource,
}
# Stream names for the "jobdiva" tap.
_JOBDIVA_STREAMS = [
    "NewUpdatedCandidateRecordsStream",
    "NewUpdatedSubmittalInterviewHireActivityRecordsStream",
    "NewUpdatedJobRecordsStream",
    "NewUpdatedContactRecordsStream",
    # "NewUpdatedCompanyRecordsStream",  # currently disabled
    "NewUpdatedTerminationsStream",
    "UsersListStream",
]

# Tap name -> list of stream names; this mapping is what gets passed to
# meltano_asset_factory.
ALL_TAP_STREAMS = {"jobdiva": _JOBDIVA_STREAMS}
def meltano_asset_factory(
    all_tap_streams: dict[str, list[str]],
) -> tuple[list, list, list]:
    """Build one multi-asset, one asset job and one schedule per tap.

    Args:
        all_tap_streams: Mapping of tap name to the stream names of that tap.
            Every stream becomes one output of the tap's multi-asset.

    Returns:
        A ``(multi_assets, jobs, schedules)`` tuple of three lists. Each list
        has one entry per tap, in the iteration order of ``all_tap_streams``;
        all three are empty when the mapping is empty.
    """
    multi_assets = []
    jobs = []
    schedules = []

    for tap_name, tap_streams in all_tap_streams.items():

        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={stream: AssetOut(tap_name) for stream in tap_streams},
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            # The tap is taken from the op name rather than from the loop
            # variable, so the function does not close over ``tap_name``
            # (the multi-asset is created with name=tap_name above).
            command = f"run tap-{context.op.name} target-snowflake"
            meltano.execute_command(command, {}, context.log)
            # One None per selected output: no value is returned to Dagster
            # for any of the streams.
            return tuple(None for _ in context.selected_output_names)

        multi_assets.append(compute)

        # The job selects the tap's assets through the group they were put in.
        asset_job = define_asset_job(
            f"{tap_name}_assets",
            AssetSelection.groups(tap_name),
        )
        # Daily schedule, created in the STOPPED state.
        basic_schedule = ScheduleDefinition(
            job=asset_job,
            cron_schedule="@daily",
            default_status=DefaultScheduleStatus.STOPPED,
        )
        jobs.append(asset_job)
        schedules.append(basic_schedule)

    return multi_assets, jobs, schedules
# Meltano side: assets, jobs and schedules for every tap in ALL_TAP_STREAMS.
meltano_assets, jobs, schedules = meltano_asset_factory(ALL_TAP_STREAMS)

# dbt side: assets loaded from the dbt project directory.
# NOTE(review): profiles_dir is given DBT_PROJECT_PATH here, not
# DBT_PROFILE_PATH -- confirm that this is intended.
dbt_assets = load_assets_from_dbt_project(
    DBT_PROJECT_PATH,
    profiles_dir=DBT_PROJECT_PATH,
)

# dbt assets first, then the Meltano assets.
_all_assets = dbt_assets + meltano_assets

# Definitions object bundling the assets, resources, jobs and schedules.
defs = Definitions(
    assets=_all_assets,
    resources=resources,
    jobs=jobs,
    schedules=schedules,
)
I'll take it to the dagster slack as well, but here's my repository.py - I based it on yours originally.
Andy Carter
04/30/2024, 8:25 PMIan OLeary
05/01/2024, 12:22 AMIan OLeary
05/01/2024, 12:23 AMIan OLeary
05/02/2024, 6:51 PM