Hello all. Currently having problems trying to use...
# troubleshooting
j
Hello all. I'm currently having problems trying to use Dagster/DBT/Meltano together and I would appreciate some insight into what I am doing wrong. I have been able to run Meltano (`tap-mssql`, `target-snowflake`) and DBT successfully against Snowflake. I've taken inspiration from https://github.com/quantile-development/dagster-meltano/issues/28 in trying to register my taps/streams from Meltano as assets, as well as DBT. Questions in 🧵
My first issue is that although I am able to see the DBT resource, Dagster fails when it tries to load the assets (the models themselves), even though I know I can run DBT on its own (see screenshot). These are the plugin definitions, including the dbt version, in my Meltano YAML files right now:
plugins:
  utilities:
  - name: dagster
    variant: quantile-development
    pip_url: dagster-ext pendulum<3
    # Run custom commands (meltano invoke dagster:(COMMAND))
    commands:
      start_local: # Run command if not in Docker
        args: dev -f $REPOSITORY_DIR/repository.py --dagit-host 127.0.0.1 -d $REPOSITORY_DIR
        executable: dagster_invoker
      start_docker: # Run command if in Docker
        args: dev -f $REPOSITORY_DIR/repository.py --dagit-host 0.0.0.0 -d $REPOSITORY_DIR
        executable: dagster_invoker
plugins:
  utilities:
  - name: dbt-snowflake
    variant: dbt-labs
    # https://docs.getdbt.com/docs/dbt-versions/core #dbt-core~=1.5.9 dbt-snowflake~=1.5.9
    pip_url: dbt-core~=1.7.0 dbt-snowflake~=1.7.0 git+https://github.com/meltano/dbt-ext.git@main
When I try to work off the code from Andy Carter's suggestion, adapting it to point to my project and profiles, I get the following error:
dagster_dbt.errors.DagsterDbtCliFatalRuntimeError: Fatal error in the dbt CLI (return code 2): Running with dbt=1.5.9 Error importing adapter: No module named 'dbt.adapters.snowflake' Encountered an error:
I have tried changing dbt-core back to 1.5.9 in my transformer utility, but it still gives the same error. The current repository code I have is below; it is the solution from that GitHub issue, with ongoing modifications to try to dynamically retrieve my taps and streams. That portion is currently commented out because I'm having other problems with Dagster parsing my tap names/resources (but that's another issue entirely).
# https://github.com/quantile-development/dagster-meltano/issues/28
import os
import enum
import yaml
from pathlib import Path
from os import path

from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
from dagster_meltano import meltano_resource
from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent / "transform")
DBT_PROFILE_PATH = str(Path(__file__).parent.parent.parent / "transform/profiles/snowflake")
DBT_TARGET_PATH = str(Path(__file__).parent.parent.parent / ".meltano/transformers/dbt/target")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET_PATH')

class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()

MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

# Find the DBT resource
resources= {
    "dbt": DbtCliResource(project_dir=DBT_PROJECT_PATH, profiles_dir = DBT_PROFILE_PATH, target=DBT_TARGET, profile=DBT_PROFILE),
    #"meltano": meltano_resource
}

def meltano_asset_factory() -> tuple:
    multi_assets = []
    jobs = []
    schedules = []

    # Retrieve the file path of our extractor yml where our taps live
    startingPath = path.dirname(__file__)
    # ./orchestrate/dagster/repository.py => ./extract/extractors.meltano.yml
    ymlFilePath = path.abspath(path.join(startingPath, "..", "..", "extract", "extractors.meltano.yml"))
    # We want to ignore any non-inherited taps
    excludes = ['tap-mssql']
    # Establish Empty Dictionary
    all_tap_streams = {}

    # Get a list of all of taps and their respective data pulls
    with open(ymlFilePath, 'r') as reader:
        yamlOutput = yaml.safe_load(reader)
        for extractor in yamlOutput['plugins']['extractors']:
            if extractor['name'] not in excludes:
                all_tap_streams[extractor['name'].split("-")[-1]] = [item.split(".")[0] for item in extractor['select']]

    # for tap_name, tap_streams in all_tap_streams.items():
    #     @multi_asset(
    #         name=tap_name,
    #         resource_defs={'meltano': meltano_resource},
    #         compute_kind="meltano",
    #         group_name=tap_name,
    #         outs={
    #         stream: AssetOut(key_prefix=[f'{tap_name}'])
    #         for stream
    #         in tap_streams
    #         }
    #     )
    #     def compute(context: OpExecutionContext, meltano: ConfigurableResource):
    #         command = f"run tap-{context.op.name} target-snowflake"
    #         meltano.execute_command(f"{command}", dict(), context.log)
    #         return tuple([None for _ in context.selected_output_names])
        
    #     multi_assets.append(compute)

    #     asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))

    #     basic_schedule = ScheduleDefinition(
    #         job=asset_job, 
    #         cron_schedule="@hourly", 
    #         default_status=DefaultScheduleStatus.RUNNING
    #     )

    #     jobs.append(asset_job)
    #     schedules.append(basic_schedule)

    return multi_assets, jobs, schedules

meltano_assets, jobs, schedules = meltano_asset_factory()

dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILE_PATH)

defs = Definitions(
    assets= (dbt_assets + meltano_assets),
    resources= resources,
    jobs=jobs,
    schedules=schedules,
)
If I comment out the `load_assets_from_dbt_project` call, it does load successfully in Dagster, and I can see the dbt resource from `DbtCliResource(project_dir=DBT_PROJECT_PATH, profiles_dir = DBT_PROFILE_PATH, target=DBT_TARGET, profile=DBT_PROFILE)`. I can't understand why that works, but when I use the same project and profile directories for `load_assets_from_dbt_project` it crashes, saying it cannot find the adapter (and that it's using a different version of dbt).
@Andy Carter @jan_soubusta I apologize for pinging you both but I've seen you both here talk about using dagster and meltano together with the extensions so I'm hoping you could see what I'm doing wrong here for trying to load dbt assets
d
@joshua_janicas dagster and dbt-snowflake are installed as independent plugins in Meltano and each has its own dedicated env, so when you invoke dbt from dagster it doesn’t have the snowflake adapter in its env by default
j
Hi Denis, thanks for the insight there. That means I have to include `dbt-snowflake` in my pip_url?
d
yep, include dbt-snowflake in dagster plugin’s pip_url
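One way to confirm the diagnosis is to check whether the adapter module can be imported from inside the Dagster plugin's environment. A minimal sketch, assuming it is run with the same Python interpreter that Dagster uses (where that interpreter lives depends on your Meltano project); the helper name `adapter_available` is made up for illustration, and the module name is taken from the error message above:

```python
import importlib.util


def adapter_available(module_name: str) -> bool:
    """Return True if `module_name` can be located in the current environment."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. `dbt` itself) is missing.
        return False


if __name__ == "__main__":
    # Module name copied from the "Error importing adapter" message.
    name = "dbt.adapters.snowflake"
    print(f"{name} importable: {adapter_available(name)}")
```

If this prints `False` in the Dagster env but `True` in the dbt-snowflake env, the two plugins are indeed isolated from each other as described.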
j
I am guessing I should also tell it to use dbt-core 1.7? I am concerned that dagster is trying to use dbt=1.5.9 when I am actively using dbt>1.7 :P
d
I tried to wrap Meltano’s `invoke dbt` command inside dagster to keep the dependencies clean, but dagster’s dbt plugin doesn’t recognise it as a dbt binary. I ended up forcing the same dbt version for both the meltano and dagster envs
meltano.yml:
env:
  GLOBAL_DBT_CORE_VERSION: ~=1.7.4
  GLOBAL_DBT_POSTGRES_VERSION: ~=1.7.4
for both plugins:
pip_url: dbt-core${GLOBAL_DBT_CORE_VERSION} dbt-postgres${GLOBAL_DBT_POSTGRES_VERSION}
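To verify that both envs really ended up on matching dbt versions, something like the following could be run once in each plugin's environment. This is only a sketch: `installed_version` and `same_minor` are hypothetical helper names, and the package list is an assumption that mixes the packages mentioned in this thread (`~=1.7.4` allows any 1.7.x release at or above 1.7.4, so comparing major and minor is enough).

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def same_minor(a: str, b: str) -> bool:
    """True when two version strings share major and minor (1.7.0 vs 1.7.4)."""
    return a.split(".")[:2] == b.split(".")[:2]


if __name__ == "__main__":
    # Assumed package names; keep only the adapter you actually use.
    for pkg in ("dbt-core", "dbt-snowflake", "dbt-postgres"):
        print(pkg, installed_version(pkg))
```

Comparing the printed `dbt-core` version from the two envs with `same_minor` shows quickly whether Dagster is still picking up an older dbt.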
j
Thank you. I know it's late in your time zone on a Friday night so I 100% appreciate this
So some good news: I have been successful in loading the lineage from both Meltano (dynamically!) and DBT, which is great! I've also been able to run a Meltano E+L command within Dagster. A follow-up question: it seems that my seeds as well as my sources are not displaying lineage properly. Tap: because my tap reads from a SQL database (`tap-mssql`), my streams are the actual tables I am pulling, and they come through in the same format as I have them in my YAML file (e.g. `Content-TransactionLocationUpdate`). Meanwhile DBT has the same table in its lineage (via `{{source}}`) as an asset, but named `Content\TransactionLocationUpdate`. Example:
  - name: tap-mssql-content
    inherit_from: tap-mssql
    select:
    - Content-ResourceTransactionLocationReservability.*
    - Content-TransactionLocationUpdate.*
    - Content-TransactionLocationLocalization.*
This is the repository file I am using right now:
# https://github.com/quantile-development/dagster-meltano/issues/28
import os
import enum
import yaml
from pathlib import Path
from os import path

from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
from dagster_meltano import meltano_resource
from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent / "transform")
DBT_PROFILE_PATH = str(Path(__file__).parent.parent.parent / "transform/profiles/snowflake")
DBT_TARGET_PATH = str(Path(__file__).parent.parent.parent / ".meltano/transformers/dbt/target")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET_PATH')

class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()

MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

# Find the DBT resource
# https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DbtCliResource
resources= {
    "dbt": DbtCliResource(
        project_dir=DBT_PROJECT_PATH,
        profiles_dir=DBT_PROFILE_PATH,
        global_config_flags=[],
        target=DBT_TARGET,
        profile=DBT_PROFILE),
    #"meltano": meltano_resource
}

def meltano_asset_factory() -> tuple:
    multi_assets = []
    jobs = []
    schedules = []

    # Retrieve the file path of our extractor yml where our taps live
    startingPath = path.dirname(__file__)
    # ./orchestrate/dagster/repository.py => ./extract/extractors.meltano.yml
    ymlFilePath = path.abspath(path.join(startingPath, "..", "..", "extract", "extractors.meltano.yml"))
    # We want to ignore any non-inherited taps
    excludes = ['tap-mssql']
    # Establish Empty Dictionary
    all_tap_streams = {}

    # Get a list of all of taps and their respective data pulls
    with open(ymlFilePath, 'r') as reader:
        yamlOutput = yaml.safe_load(reader)
        for extractor in yamlOutput['plugins']['extractors']:
            if extractor['name'] not in excludes:
                all_tap_streams[extractor['name'].split("-")[-1]] = [item.split(".")[0] for item in extractor['select']]

    for tap_name, tap_streams in all_tap_streams.items():
        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={
            stream: AssetOut(key_prefix=[f'{tap_name}'])
            for stream
            in tap_streams
            }
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            command = f"run tap-mssql-{context.op.name} target-snowflake"
            meltano.execute_command(f"{command}", dict(), context.log)
            return tuple([None for _ in context.selected_output_names])
        
        multi_assets.append(compute)

        asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))

        basic_schedule = ScheduleDefinition(
            job=asset_job, 
            cron_schedule="@hourly", 
            default_status=DefaultScheduleStatus.RUNNING
        )

        jobs.append(asset_job)
        schedules.append(basic_schedule)

    return multi_assets, jobs, schedules

meltano_assets, jobs, schedules = meltano_asset_factory()

# https://docs.dagster.io/_apidocs/libraries/dagster-dbt#assets-dbt-core
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILE_PATH, target_dir=DBT_TARGET_PATH, use_build_command=True)

defs = Definitions(
    assets= (dbt_assets + meltano_assets),
    resources= resources,
    jobs=jobs,
    schedules=schedules,
)
Image of the asset from Dagster as DBT sees it (?)
Meanwhile the seed (red line) has the exact same name as what the model depends on, but it's not showing the lineage.
I have a feeling I need to update the script so that the Meltano asset names and what dbt sees are equivalent from Dagster's point of view? Not sure how to fix the seed. I've seen `@dbt_assets` https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.dbt_assets as an alternative to `load_assets_from_dbt_project` but I don't know if that's the answer here.
Update: figured out the Meltano side. I had to do some additional splitting for my dynamic stream names, changing
all_tap_streams[extractor['name'].split("-")[-1]] = [item.split(".")[0] for item in extractor['select']]
to
all_tap_streams[extractor['name'].split("-")[-1]] = [item.split("-")[1].split(".")[0] for item in extractor['select']]
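The effect of that change can be seen on the `select` entries from the example above. The sketch below is a slightly more defensive variant, not the exact one-liner: it uses `str.partition` so that an entry without a `schema-` prefix does not raise `IndexError`, and a table name that itself contains a hyphen is kept whole. The helper name `stream_name` is made up for illustration.

```python
def stream_name(select_entry: str) -> str:
    """Turn a select pattern like 'Content-Foo.*' into the table name 'Foo'."""
    stream = select_entry.split(".")[0]  # drop the '.*' column selector
    _schema, sep, table = stream.partition("-")
    # Fall back to the whole stream name when there is no 'schema-' prefix.
    return table if sep else stream


select = [
    "Content-ResourceTransactionLocationReservability.*",
    "Content-TransactionLocationUpdate.*",
    "Content-TransactionLocationLocalization.*",
]

# Same tap-name handling as in the repository file: keep the last segment.
tap_key = "tap-mssql-content".split("-")[-1]
all_tap_streams = {tap_key: [stream_name(item) for item in select]}
print(all_tap_streams)
```

With this, the Meltano assets are keyed by tap prefix plus bare table name, which is what needs to line up with the keys dbt produces for its sources.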
As for the seeds, the entries are doubling up because of the name I gave my seeds in `sources.yml`. However, when I try to make the names the same I get the following error:
2024-01-05 17:02:28 -0500 - dagster.daemon.SensorDaemon - WARNING - Could not load location repository.py to check for sensors due to the following error: dagster._core.errors.DagsterInvalidDefinitionError: Assets can only depend on themselves if they are time-partitioned and each partition depends on earlier partitions
Running dbt:seed and dbt:run separately does work fine, though. 😢 Any thoughts welcome.
d
In general, to connect assets to each other you have to define proper upstream/downstream asset keys. Dagster and dbt might use different asset key schemas, but you can fix that with DagsterDbtTranslator. There is more further down in the article about upstream/downstream dependencies. Btw, there is a nice AI in Dagster’s Slack channel (#ask-ai) which covers both the dagster and dbt integrations and is really helpful for navigating the docs and solving advanced challenges. And it doesn’t sleep 😁
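A rough sketch of what such a translator might look like for this thread's setup. Everything project-specific here is an assumption: the `SOURCE_TO_PREFIX` mapping (dbt source name `Content` to Meltano key prefix `content`) is a guess based on the names quoted above, and `MeltanoAwareTranslator` and `source_key_path` are made-up names. The key-building logic is kept in a plain function so it can be checked without Dagster installed.

```python
from typing import Any, List, Mapping, Optional

# Hypothetical mapping: dbt source name (sources.yml) -> Meltano asset key prefix.
SOURCE_TO_PREFIX = {"Content": "content"}


def source_key_path(props: Mapping[str, Any]) -> Optional[List[str]]:
    """Return key components for a dbt source, or None for any other node."""
    if props.get("resource_type") != "source":
        return None
    source = props["source_name"]
    return [SOURCE_TO_PREFIX.get(source, source), props["name"]]


try:
    from dagster import AssetKey
    from dagster_dbt import DagsterDbtTranslator

    class MeltanoAwareTranslator(DagsterDbtTranslator):
        """Map dbt sources onto the keys used by the Meltano assets."""

        def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
            path = source_key_path(dbt_resource_props)
            if path is None:
                # Models, seeds, etc. keep dagster-dbt's default keys.
                return super().get_asset_key(dbt_resource_props)
            return AssetKey(path)

except ImportError:
    # dagster / dagster-dbt not installed: only the pure helper is usable.
    MeltanoAwareTranslator = None
```

An instance would then be passed as `dagster_dbt_translator=MeltanoAwareTranslator()` to `load_assets_from_dbt_project` or `@dbt_assets`, assuming the installed dagster-dbt version accepts that parameter. This only addresses the source-to-tap lineage; it does not by itself resolve the seed whose key collides with a source.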