Conner Panarella
07/01/2024, 7:20 PM

Andy Carter
07/02/2024, 8:17 AM

Andy Carter
07/03/2024, 7:19 AM
meltano commands, or with an orchestrator like Airflow or Dagster?

Conner Panarella
07/03/2024, 1:15 PM
meltano add files files-docker
The only addition I've made is adding an additional meltano invoke dagster:start as a CMD in the Dockerfile.

Conner Panarella
07/03/2024, 1:16 PM

Andy Carter
07/03/2024, 1:19 PM

Conner Panarella
07/03/2024, 1:19 PM

Andy Carter
07/03/2024, 1:21 PM
dagster-ext extension and getting that up and running with your meltano streams.
Code example here for repository.py:
https://github.com/quantile-development/dagster-meltano/issues/28#issuecomment-1655283987

Conner Panarella
07/03/2024, 1:26 PM

Conner Panarella
07/03/2024, 1:59 PM

Andy Carter
07/03/2024, 2:15 PM
# registry.gitlab.com/meltano/meltano:latest is also available in GitLab Registry
ARG MELTANO_IMAGE=meltano/meltano:v2.20.0-python3.10
FROM $MELTANO_IMAGE
ARG ado_token
ENV ADO_TOKEN $ado_token
WORKDIR /project
# Install any additional requirements
COPY ./requirements.txt .
RUN pip install -r requirements.txt
COPY meltano.yml logging.yaml ./
ADD meltano-yml meltano-yml
ADD plugins plugins
# Copy over Meltano project directory
# COPY . .
RUN meltano install
COPY ga4_reports.json ./
# then copy dbt models in orchestrate folder
ADD orchestrate orchestrate
# Installs dbt's required dependencies and extra packages
RUN meltano invoke dagster:dbt_deps
# overwrite dagster.yaml with contents of dagster_azure.yaml
RUN rm -rf ./orchestrate/dagster/dagster.yaml
COPY ./orchestrate/dagster/dagster_azure.yaml ./orchestrate/dagster/dagster.yaml
# Don't allow changes to containerized project files
ENV MELTANO_PROJECT_READONLY 1
# Expose default port used by `meltano ui`
EXPOSE 5000
# Expose port used for postgres connection
EXPOSE 5432
# Expose default port used by the Dagster web UI (dagit)
EXPOSE 3000
ENTRYPOINT ["meltano"]
Andy Carter
07/03/2024, 2:19 PM
The dagster.yaml lines are because I want different behaviour in the cloud vs locally (writing logs etc), and there's no native way to do this with a single dagster.yaml file.

Conner Panarella
07/03/2024, 3:20 PM

Andy Carter
07/03/2024, 9:35 PM
meltano invoke dagster:dev
utilities:
- name: dagster
variant: quantile-development
pip_url: git+https://github.com/quantile-development/dagster-ext@v0.1.1 dagster-postgres dagster-dbt dbt-postgres<1.8.0 dagster-azure dagster_msteams elementary-data
settings:
- name: dagster_home
env: DAGSTER_HOME
value: $MELTANO_PROJECT_ROOT/orchestrate/dagster
- name: dbt_load
env: DAGSTER_DBT_PARSE_PROJECT_ON_LOAD
value: 1
commands:
dev:
args: dev --workspace $REPOSITORY_DIR/workspace.yml --dagit-host 0.0.0.0
executable: dagster_invoker
Andy Carter
07/03/2024, 9:37 PM
dev shouldn't really be used for production, but it works for me 🙂

Andy Carter
07/04/2024, 7:08 AM
resource appService 'Microsoft.Web/sites@2022-09-01' = {
name: '${env}-dagster-${uniqueString(resourceGroup().id)}'
location: location
properties: {
serverFarmId: appServicePlan.id
siteConfig: {
alwaysOn: true
linuxFxVersion: 'DOCKER|${containerRegistryName}.azurecr.io/myimage:latest'
appSettings: appSettings
appCommandLine: 'invoke dagster:dev'
ipSecurityRestrictions: securityRestrictions
ipSecurityRestrictionsDefaultAction: 'Deny'
publicNetworkAccess: null
}
virtualNetworkSubnetId: subnetID
}
}
And you need SERVER_PORT=3000 in the env variables for dagit.

Conner Panarella
07/05/2024, 4:21 PM
dagster-meltano
here: meltano_jobs = load_jobs_from_meltano_project(MELTANO_PROJECT_DIR)
so I'm not sure if I can further customize them with tags to limit concurrency.

Andy Carter
07/06/2024, 12:48 PM

Andy Carter
07/06/2024, 12:50 PM

Andy Carter
07/06/2024, 12:50 PM

Conner Panarella
07/08/2024, 3:43 PM

Andy Carter
07/09/2024, 9:55 AM
In dagster.yaml you can set your global concurrency limits, and limits for specific tags:
run_queue:
max_concurrent_runs: 15
tag_concurrency_limits:
- key: "database"
value: "redshift" # applies when the `database` tag has a value of `redshift`
limit: 4
- key: "dagster/backfill" # applies when the `dagster/backfill` tag is present, regardless of value
limit: 10
Are you talking about job-level concurrency?

Conner Panarella
07/09/2024, 1:12 PM
meltano.yaml and update the run_queue as you showed above. That way the repository.py could just use load_jobs_from_meltano_project. However, I don't think this is possible, so I will need to define each job and schedule in dagster instead of meltano.

Andy Carter
07/09/2024, 1:35 PM
load_jobs.. doesn't give a full view of the individual streams, no. If you care about that I think you need to define them using the multi_asset approach.
07/23/2024, 2:21 PM

Conner Panarella
07/23/2024, 2:22 PM

Andy Carter
07/23/2024, 2:30 PM

Andy Carter
07/23/2024, 2:35 PM

Conner Panarella
07/23/2024, 2:40 PM
execute_shell_command, because of the significant performance hit described here: https://discuss.dagster.io/t/18765172/u0667dnc02y-i-have-a-meltano-job-that-takes-5-mins-when-i-ru
Here's the function that I created to help construct the jobs:
from dagster import OpExecutionContext, job, op
from dagster_shell import execute_shell_command


def make_meltano_job(tap_name: str):
    cleaned_name = tap_name.replace("-", "_")

    @op(name=f"{cleaned_name}_tap_op")
    def meltano_shell_op(context: OpExecutionContext):
        execute_shell_command(
            f"meltano run {tap_name} target-postgres --force",
            output_logging="STREAM",
            log=context.log,
        )

    @job(name=f"{cleaned_name}_tap_job")
    def meltano_run_job():
        meltano_shell_op()

    return meltano_run_job
From there I create jobs like this:
all_shopify_taps = [
    'tap-shopify--store1',
    'tap-shopify--store2',
]
shopify_jobs = [make_meltano_job(shopify_tap) for shopify_tap in all_shopify_taps]
Andy Carter
07/23/2024, 2:53 PM

Conner Panarella
07/23/2024, 2:56 PM