joshua_janicas
01/05/2024, 4:46 PM

joshua_janicas
01/05/2024, 4:48 PM
plugins:
  utilities:
  - name: dagster
    variant: quantile-development
    pip_url: dagster-ext pendulum<3
    # Run custom commands (meltano invoke dagster:(COMMAND))
    commands:
      start_local: # Run command if not in Docker
        args: dev -f $REPOSITORY_DIR/repository.py --dagit-host 127.0.0.1 -d $REPOSITORY_DIR
        executable: dagster_invoker
      start_docker: # Run command if in Docker
        args: dev -f $REPOSITORY_DIR/repository.py --dagit-host 0.0.0.0 -d $REPOSITORY_DIR
        executable: dagster_invoker

plugins:
  utilities:
  - name: dbt-snowflake
    variant: dbt-labs
    # https://docs.getdbt.com/docs/dbt-versions/core  # dbt-core~=1.5.9 dbt-snowflake~=1.5.9
    pip_url: dbt-core~=1.7.0 dbt-snowflake~=1.7.0 git+https://github.com/meltano/dbt-ext.git@main
joshua_janicas
01/05/2024, 4:50 PM
dagster_dbt.errors.DagsterDbtCliFatalRuntimeError: Fatal error in the dbt CLI (return code 2): Running with dbt=1.5.9 Error importing adapter: No module named 'dbt.adapters.snowflake' Encountered an error:
I have tried changing dbt-core back to 1.5.9 in my transformer utility, but it's still giving the same error.
The current repository code I have is below, which is the solution from that GitHub issue, with continued modifications to try to dynamically retrieve my taps and streams. That portion is currently commented out because I'm having other problems with Dagster parsing my tap names/resources (but that's another issue entirely).
# https://github.com/quantile-development/dagster-meltano/issues/28
import os
import enum
import yaml
from pathlib import Path
from os import path
from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
from dagster_meltano import meltano_resource
from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent / "transform")
DBT_PROFILE_PATH = str(Path(__file__).parent.parent.parent / "transform/profiles/snowflake")
DBT_TARGET_PATH = str(Path(__file__).parent.parent.parent / ".meltano/transformers/dbt/target")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET_PATH')


class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()


MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

# Find the dbt resource
resources = {
    "dbt": DbtCliResource(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILE_PATH, target=DBT_TARGET, profile=DBT_PROFILE),
    # "meltano": meltano_resource
}


def meltano_asset_factory() -> tuple:
    multi_assets = []
    jobs = []
    schedules = []

    # Retrieve the file path of our extractor yml where our taps live
    startingPath = path.dirname(__file__)
    # ./orchestrate/dagster/repository.py => ./extract/extractors.meltano.yml
    ymlFilePath = path.abspath(path.join(startingPath, "..", "..", "extract", "extractors.meltano.yml"))

    # We want to ignore any non-inherited taps
    excludes = ['tap-mssql']

    # Establish empty dictionary
    all_tap_streams = {}

    # Get a list of all taps and their respective data pulls
    with open(ymlFilePath, 'r') as reader:
        yamlOutput = yaml.safe_load(reader)
        for extractor in yamlOutput['plugins']['extractors']:
            if extractor['name'] not in excludes:
                all_tap_streams[extractor['name'].split("-")[-1]] = [item.split(".")[0] for item in extractor['select']]

    # for tap_name, tap_streams in all_tap_streams.items():
    #     @multi_asset(
    #         name=tap_name,
    #         resource_defs={'meltano': meltano_resource},
    #         compute_kind="meltano",
    #         group_name=tap_name,
    #         outs={
    #             stream: AssetOut(key_prefix=[f'{tap_name}'])
    #             for stream in tap_streams
    #         }
    #     )
    #     def compute(context: OpExecutionContext, meltano: ConfigurableResource):
    #         command = f"run tap-{context.op.name} target-snowflake"
    #         meltano.execute_command(f"{command}", dict(), context.log)
    #         return tuple([None for _ in context.selected_output_names])
    #
    #     multi_assets.append(compute)
    #     asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))
    #     basic_schedule = ScheduleDefinition(
    #         job=asset_job,
    #         cron_schedule="@hourly",
    #         default_status=DefaultScheduleStatus.RUNNING
    #     )
    #     jobs.append(asset_job)
    #     schedules.append(basic_schedule)

    return multi_assets, jobs, schedules


meltano_assets, jobs, schedules = meltano_asset_factory()
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILE_PATH)

defs = Definitions(
    assets=(dbt_assets + meltano_assets),
    resources=resources,
    jobs=jobs,
    schedules=schedules,
)
joshua_janicas
01/05/2024, 4:54 PM
With load_assets_from_dbt_project commented out, it does successfully load in Dagster, and I can see the dbt resource from DbtCliResource(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILE_PATH, target=DBT_TARGET, profile=DBT_PROFILE).

joshua_janicas
01/05/2024, 4:55 PM
With load_assets_from_dbt_project included, it crashes, saying it cannot find the adapter (and that it's using a different version of dbt).

joshua_janicas
01/05/2024, 5:02 PM

Denis I.
01/05/2024, 5:06 PM

joshua_janicas
01/05/2024, 5:12 PM
Should I also be adding dbt-snowflake in my pip_url?

Denis I.
01/05/2024, 5:14 PM

joshua_janicas
01/05/2024, 5:16 PM

Denis I.
01/05/2024, 5:17 PM

Denis I.
01/05/2024, 5:20 PM
env:
  GLOBAL_DBT_CORE_VERSION: ~=1.7.4
  GLOBAL_DBT_POSTGRES_VERSION: ~=1.7.4
for both plugins:
pip_url: dbt-core${GLOBAL_DBT_CORE_VERSION} dbt-postgres${GLOBAL_DBT_POSTGRES_VERSION}
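(Adapted to the Snowflake setup in this thread, that pattern would presumably look like the sketch below. GLOBAL_DBT_SNOWFLAKE_VERSION is an illustrative name, not something from the thread; the idea is just that both venvs install the same dbt-core and adapter.)

env:
  GLOBAL_DBT_CORE_VERSION: ~=1.7.0
  GLOBAL_DBT_SNOWFLAKE_VERSION: ~=1.7.0

plugins:
  utilities:
  # Hypothetical: pin the same dbt version into the Dagster plugin's venv so
  # dagster-dbt cannot resolve its own, older dbt-core without the Snowflake adapter.
  - name: dagster
    variant: quantile-development
    pip_url: dagster-ext pendulum<3 dbt-core${GLOBAL_DBT_CORE_VERSION} dbt-snowflake${GLOBAL_DBT_SNOWFLAKE_VERSION}
  - name: dbt-snowflake
    variant: dbt-labs
    pip_url: dbt-core${GLOBAL_DBT_CORE_VERSION} dbt-snowflake${GLOBAL_DBT_SNOWFLAKE_VERSION} git+https://github.com/meltano/dbt-ext.git@main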
joshua_janicas
01/05/2024, 5:21 PM

joshua_janicas
01/05/2024, 9:03 PM
With my inherited taps (which inherit from tap-mssql), my streams (the actual tables I am pulling the data from) are coming in the same format as I have them in my YAML file (e.g. Content-TransactionLocationUpdate).
Meanwhile dbt currently has the same usage of the lineage (with {{ source() }}) as an asset, but with Content\TransactionLocationUpdate.
Example:
- name: tap-mssql-content
  inherit_from: tap-mssql
  select:
  - Content-ResourceTransactionLocationReservability.*
  - Content-TransactionLocationUpdate.*
  - Content-TransactionLocationLocalization.*
Right now this is the repository file I have been utilizing:
# https://github.com/quantile-development/dagster-meltano/issues/28
import os
import enum
import yaml
from pathlib import Path
from os import path
from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
from dagster_meltano import meltano_resource
from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent / "transform")
DBT_PROFILE_PATH = str(Path(__file__).parent.parent.parent / "transform/profiles/snowflake")
DBT_TARGET_PATH = str(Path(__file__).parent.parent.parent / ".meltano/transformers/dbt/target")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET_PATH')


class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()


MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

# Find the dbt resource
# https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DbtCliResource
resources = {
    "dbt": DbtCliResource(
        DBT_PROJECT_PATH,
        DBT_PROFILE_PATH,
        [],
        DBT_TARGET,
        DBT_PROFILE),
    # "meltano": meltano_resource
}


def meltano_asset_factory() -> tuple:
    multi_assets = []
    jobs = []
    schedules = []

    # Retrieve the file path of our extractor yml where our taps live
    startingPath = path.dirname(__file__)
    # ./orchestrate/dagster/repository.py => ./extract/extractors.meltano.yml
    ymlFilePath = path.abspath(path.join(startingPath, "..", "..", "extract", "extractors.meltano.yml"))

    # We want to ignore any non-inherited taps
    excludes = ['tap-mssql']

    # Establish empty dictionary
    all_tap_streams = {}

    # Get a list of all taps and their respective data pulls
    with open(ymlFilePath, 'r') as reader:
        yamlOutput = yaml.safe_load(reader)
        for extractor in yamlOutput['plugins']['extractors']:
            if extractor['name'] not in excludes:
                all_tap_streams[extractor['name'].split("-")[-1]] = [item.split(".")[0] for item in extractor['select']]

    for tap_name, tap_streams in all_tap_streams.items():
        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={
                stream: AssetOut(key_prefix=[f'{tap_name}'])
                for stream in tap_streams
            }
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            command = f"run tap-mssql-{context.op.name} target-snowflake"
            meltano.execute_command(f"{command}", dict(), context.log)
            return tuple([None for _ in context.selected_output_names])

        multi_assets.append(compute)
        asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))
        basic_schedule = ScheduleDefinition(
            job=asset_job,
            cron_schedule="@hourly",
            default_status=DefaultScheduleStatus.RUNNING
        )
        jobs.append(asset_job)
        schedules.append(basic_schedule)

    return multi_assets, jobs, schedules


meltano_assets, jobs, schedules = meltano_asset_factory()
# https://docs.dagster.io/_apidocs/libraries/dagster-dbt#assets-dbt-core
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILE_PATH, target_dir=DBT_TARGET_PATH, use_build_command=True)

defs = Definitions(
    assets=(dbt_assets + meltano_assets),
    resources=resources,
    jobs=jobs,
    schedules=schedules,
)
joshua_janicas
01/05/2024, 9:09 PM

joshua_janicas
01/05/2024, 9:11 PM

joshua_janicas
01/05/2024, 9:16 PM
I've been looking at load_assets_from_dbt_project, but I don't know if that's the answer here.

joshua_janicas
01/05/2024, 9:52 PM
I changed
all_tap_streams[extractor['name'].split("-")[-1]] = [item.split(".")[0] for item in extractor['select']]
to
all_tap_streams[extractor['name'].split("-")[-1]] = [item.split("-")[1].split(".")[0] for item in extractor['select']]
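(To make the difference concrete, here is a quick illustration using one of the select entries from the YAML above; the values are from this thread, the snippet itself is editorial.)

# One select entry from the tap-mssql-content example above
item = "Content-TransactionLocationUpdate.*"

# Old parsing: only strips the ".*" suffix, keeping the group prefix
print(item.split(".")[0])                # Content-TransactionLocationUpdate

# New parsing: drops the group prefix, then strips the ".*" suffix
print(item.split("-")[1].split(".")[0])  # TransactionLocationUpdate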
joshua_janicas
01/05/2024, 10:17 PM
That gets the stream names to match what dbt has in sources.yml. However, when I try to make the names the same I get the following error:
2024-01-05 17:02:28 -0500 - dagster.daemon.SensorDaemon - WARNING - Could not load location repository.py to check for sensors due to the following error: dagster._core.errors.DagsterInvalidDefinitionError: Assets can only depend on themselves if they are time-partitioned and each partition depends on earlier partitions
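(An editorial reading of that error: the Meltano multi_asset output and the dbt source asset resolve to the identical AssetKey, so the dbt model's dependency on its source looks like an asset depending on itself. A common way out is to keep the keys aligned but namespaced, so dbt's sources point at the Meltano assets rather than colliding with them. A minimal sketch, assuming the legacy load_assets_from_dbt_project API's source_key_prefix argument and that the dbt source name matches tap_name; tap_name, tap_streams, and the DBT_* constants are as defined in repository.py above.)

# Sketch only, not from the thread. Inside meltano_asset_factory(), namespace the
# Meltano outputs, e.g. AssetKey(["meltano", "content", "TransactionLocationUpdate"]):
outs = {
    stream: AssetOut(key_prefix=["meltano", tap_name])
    for stream in tap_streams
}

# Then have dagster-dbt resolve {{ source(...) }} references under the same prefix,
# so dbt models depend on the Meltano assets instead of defining duplicate keys:
dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILE_PATH,
    target_dir=DBT_TARGET_PATH,
    use_build_command=True,
    source_key_prefix="meltano",  # assumption: source keys become ["meltano", <source>, <table>]
)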
joshua_janicas
01/05/2024, 10:17 PM

Denis I.
01/05/2024, 11:07 PM