# best-practices
Hi all, I'm working on a pipeline that pulls news data in from an API, stores it in Postgres, scrubs it with dbt, and then does some sentiment analysis by calling an LLM endpoint for each new row. Currently I have the pipeline scheduled with the Airflow orchestrator, but it stops before the sentiment analysis/LLM endpoint step. Right now that step lives in a separate project, and I was going to call its Docker image from inside the Airflow orchestrator, but it would be nicer to have everything inside Meltano. Is there a best practice for where to put the Python scripts that call the LLM endpoint? And is there an easy way to trigger them automatically after the previous steps run, or do I just create a new DAG for the Airflow orchestrator?
After going back and reading the docs again, the answer was right in front of my face (ofc). I ended up creating a custom utility for running generic LLM-to-Postgres transforms:
```yaml
utilities:
  - name: airflow
    variant: apache
    pip_url: ...
  - name: llm-postgres
    namespace: llm-postgres
    commands:
      run:
        executable: python
        args: llm/main.py
```
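With the utility registered, you can sanity-check it on its own with `meltano invoke llm-postgres:run` before wiring it into a job.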
Then, in the `llm` folder at the root of the Meltano project (`llm/configs`), you specify the requirements for the LLM call. Each config YAML defines the required LLM arguments, such as the model details and prompt template. The AI-generated JSON results are then saved to the specified destination table.
```yaml
config:
  model: "deepseek-r1-distill-qwen-32b"
  api_key_env: "GROQ_API_KEY"  # Name of environment variable with actual key
  base_url: "https://api.groq.com/openai/v1"
  messages:
    - role: "system"
      content: "You are an expert at crafting jokes given a subject's name."
    - role: "user"
      content: |
        Create a joke about {author}.
  parameters:
    temperature: 0.7  # Range: 0-2, default 1.0
    top_p: 0.9        # Range: 0-1, default 1.0

sources:
  - name: "authors"
    schema: "public"
    tables:
      - name: "authors"
        primary_key: "authors"
        input_mapping:
          - column: "authors"
            alias: "author"

destination:
  schema: "public"
  table: "jokes"
  columns: ["content", "explanation", "rating"]
  load_strategy: "upsert"  # append-only|upsert|overwrite
```
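For anyone curious what `llm/main.py` could look like, here's a rough, simplified sketch rather than the exact script: it assumes the `openai`, `psycopg2`, and `PyYAML` packages, a `DATABASE_URL` environment variable for the Postgres connection, a prompt that makes the model return a JSON object keyed by the destination columns, and a naive insert instead of full `load_strategy` handling. The function names are illustrative.

```python
# llm/main.py -- illustrative sketch, not the exact script.
# Assumes: openai>=1.x (OpenAI-compatible endpoint), psycopg2, PyYAML,
# and a DATABASE_URL environment variable for the Postgres connection.
import json
import os
from pathlib import Path

import psycopg2
import yaml
from openai import OpenAI

CONFIG_DIR = Path(__file__).parent / "configs"


def run_config(cfg: dict, conn) -> None:
    llm = cfg["config"]
    client = OpenAI(api_key=os.environ[llm["api_key_env"]], base_url=llm["base_url"])

    dest = cfg["destination"]
    dest_cols = dest["columns"]

    for source in cfg["sources"]:
        for table in source["tables"]:
            mapping = table["input_mapping"]
            cols = [m["column"] for m in mapping]
            # Identifiers come straight from the config here; a real script
            # should validate/quote them.
            with conn.cursor() as cur:
                cur.execute(
                    f'SELECT {", ".join(cols)} FROM {source["schema"]}.{table["name"]}'
                )
                rows = cur.fetchall()

            for row in rows:
                # Render the prompt template with the aliased input columns.
                params = {m["alias"]: value for m, value in zip(mapping, row)}
                messages = [
                    {"role": m["role"], "content": m["content"].format(**params)}
                    for m in llm["messages"]
                ]
                response = client.chat.completions.create(
                    model=llm["model"],
                    messages=messages,
                    **llm.get("parameters", {}),
                )
                # Assumes the prompt instructs the model to reply with a JSON
                # object whose keys match the destination columns.
                result = json.loads(response.choices[0].message.content)

                # Naive insert; a real load_strategy would branch on
                # append-only / upsert / overwrite.
                with conn.cursor() as cur:
                    cur.execute(
                        f'INSERT INTO {dest["schema"]}.{dest["table"]} '
                        f'({", ".join(dest_cols)}) '
                        f'VALUES ({", ".join(["%s"] * len(dest_cols))})',
                        [result.get(c) for c in dest_cols],
                    )
            conn.commit()


def main() -> None:
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        for path in sorted(CONFIG_DIR.glob("*.yml")):
            run_config(yaml.safe_load(path.read_text()), conn)
    finally:
        conn.close()


if __name__ == "__main__":
    main()
```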
Now I can add the utility to the job and it will be run as part of the pipeline as desired:
```yaml
jobs:
- name: demo-pipeline
  tasks:
  - tap-github target-postgres dbt-postgres:run llm-postgres:run
```
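And to answer my own scheduling question: pointing a schedule at the job lets the Airflow orchestrator pick it up and run everything, including the LLM step, automatically. Something like the following (the schedule name and interval here are just examples):

```yaml
schedules:
- name: demo-pipeline-daily
  interval: "@daily"
  job: demo-pipeline
```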
Lets me keep data transformations and orchestration inside Meltano, which is great.