# getting-started
s
Is there any built-in Meltano support for scheduling ad hoc tasks/jobs (jobs other than ELT jobs)? My understanding is that the current integration with Airflow/orchestrators is a means of scheduling ELT jobs, and while it installs Airflow proper, there is no way for me to run `meltano schedule my-random-python-script`, right?
t
Not currently. A cool thing to do would be to specify an `executable` for a schedule and have it run that. Right now you would just create a DAG yourself and run it there. Want to open an issue on that? 🙂
s
ok, so just talking through this, but the theoretical example would be:
1. add a Python file at `./orchestrators/my_dag.py`
2. run `meltano schedule <executable_name> --executable 'airflow dags trigger my_dag'`

That would create a new scheduled task that would run the executable?
t
```
meltano schedule [SCHEDULE_NAME] [random_python_executable] [INTERVAL]
```
I think that would be the better way given the current model. Since we're dynamically generating the DAGs in https://gitlab.com/meltano/files-airflow/-/blob/master/bundle/orchestrate/dags/meltano.py, we could just have one that looks for an `executable` type (or some other name) and just runs that.
So you wouldn't have to build your own full DAG script in that scenario. I'm just saying that right now you can make your own DAG and have Airflow run it.
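The idea above (a generated DAG that notices an `executable`-type schedule and just runs it, falling back to ELT otherwise) could be sketched as a plain dispatch function. This is hypothetical: the schedule dictionaries, the `executable` key, and `build_command` are invented for illustration and are not how the generated `meltano.py` DAG actually works.

```python
import shlex
from typing import Any, Dict, List

# Hypothetical schedule entries; the "executable" key illustrates the proposal
# and is NOT an existing Meltano schedule field.
SCHEDULES: List[Dict[str, Any]] = [
    {"name": "gitlab-to-jsonl", "extractor": "tap-gitlab", "loader": "target-jsonl", "interval": "@daily"},
    {"name": "hello-script", "executable": "./test.sh --greeting hi", "interval": "@hourly"},
]


def build_command(schedule: Dict[str, Any]) -> List[str]:
    """Return the argv a generated DAG task could run for one schedule (hypothetical)."""
    if "executable" in schedule:
        # Arbitrary executable: split the string the way a POSIX shell would.
        return shlex.split(schedule["executable"])
    # Otherwise fall back to the existing extractor/loader behaviour.
    return ["meltano", "elt", schedule["extractor"], schedule["loader"]]


if __name__ == "__main__":
    for sched in SCHEDULES:
        print(sched["name"], sched["interval"], build_command(sched))
```

Keeping the dispatch in one function means the DAG generator would only need one extra branch, which matches the "you wouldn't have to build your own full DAG script" point.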
s
yep, that makes sense. I'm a little fuzzy on how dependency management would work in that scenario, but that would be the desired path, I think. I can create an issue.
This is how one would implement something like orchestrated testing, right? `meltano schedule great-expectations 'great-expectations --arg foo'` (just as an example of a useful thing to orchestrate alongside ELT)
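Orchestrating a check alongside ELT mostly comes down to running steps in order and skipping validation when the load fails. A minimal stdlib sketch of that sequencing; `run_steps` is a hypothetical helper, and the demo commands are throwaway `python -c` stand-ins for something like `meltano elt ...` followed by the `great-expectations --arg foo` placeholder above, not real invocations.

```python
import subprocess
import sys
from typing import List, Sequence

# Stand-in steps (hypothetical): one that succeeds, one that exits with status 1.
OK_STEP = [sys.executable, "-c", "print('step ran')"]
FAIL_STEP = [sys.executable, "-c", "raise SystemExit(1)"]


def run_steps(steps: Sequence[Sequence[str]]) -> List[int]:
    """Run each command in order, stopping at the first non-zero exit code.

    Returns the exit codes of the steps that actually ran.
    """
    codes: List[int] = []
    for argv in steps:
        result = subprocess.run(list(argv))
        codes.append(result.returncode)
        if result.returncode != 0:
            break  # don't validate data that the load step failed to produce
    return codes


if __name__ == "__main__":
    print(run_steps([OK_STEP, FAIL_STEP, OK_STEP]))  # [0, 1]
```

An orchestrator gives you the same thing as task dependencies (the validation task only runs if the load task succeeded), plus retries and a schedule on top.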
t
This is very much related to work that Florian is currently doing where we want to enable a more composable, arbitrary approach to pipelines https://gitlab.com/meltano/meltano/-/issues/2301
k
to expand on this, if I have a shell script, say `test.sh`, that has something simple like an echo command, and I add that to the `meltano.yml` like so:
```yaml
- name: meltano-sh
  namespace: meltano_sh
  executable: <PATH_TO_test.sh>
  capabilities: [[]]
  config:
    username: admin
    user_agent: Meltano shell script
```
and run `meltano schedule meltano2 meltano-sh target-jsonl "* * * * *"`, that should work? (for simplicity)
cc @taylor
got some weird logs in the UI:
```
2022-01-12T19:13:18.017354Z [info ] ascii passed initial chaos probing. Mean measured chaos is 0.300000 %
2022-01-12T19:13:18.018037Z [info ] ascii is most likely the one. Stopping the process.
2022-01-12T19:13:18.576787Z [info ] Running extract & load... job_id=meltano3 name=meltano run_id=47e14570-7609-4050-bc3e-9d5ca3a26943
2022-01-12T19:13:18.592584Z [info ] ELT could not be completed: Cannot start extractor: unhashable type: 'list' cmd_type=elt job_id=meltano3 name=meltano run_id=47e14570-7609-4050-bc3e-9d5ca3a26943 stdio=stderr
```
t
What was the command you ran? And can you retry with `--log-level=debug`? cc @florian.hines
k
```sh
#!/bin/sh

echo hello!
```
The shell script was initially a full script to install dependencies, but I wanted to test it with just a single command.
t
are you adding this to your meltano.yml as a utility plugin type? https://meltano.com/docs/plugins.html#utilities
k
hmm, interesting, haven't done that. I can give it a try. I just manually added the snippet
```yaml
- name: meltano-sh
  namespace: meltano_sh
  executable: <PATH_TO_test.sh>
  capabilities: [[]]
  config:
    username: admin
    user_agent: Meltano shell script
```
to the extractors portion of `meltano.yml`.
f
If you just invoke this via `meltano elt` and throw a `--log-level=debug` on it, you'll get a traceback. I was able to repro it:
```
2022-01-12T19:44:37.170099Z [debug    ] ELT could not be completed: Cannot start extractor: unhashable type: 'list'
Traceback (most recent call last):
  File "/Users/syn/projects/meltano/src/meltano/core/runner/singer.py", line 64, in invoke
    p_tap = await tap.invoke_async(
  File "/Users/syn/projects/meltano/src/meltano/core/plugin_invoker.py", line 277, in invoke_async
    async with self._invoke(*args, **kwargs) as (
  File "/Users/syn/.pyenv/versions/3.8.12/envs/melty-3.8/lib/python3.8/site-packages/async_generator/_util.py", line 34, in __aenter__
    return await self._agen.asend(None)
  File "/Users/syn/projects/meltano/src/meltano/core/plugin_invoker.py", line 262, in _invoke
    async with self.plugin.trigger_hooks("invoke", self, args):
  File "/Users/syn/.pyenv/versions/3.8.12/envs/melty-3.8/lib/python3.8/site-packages/async_generator/_util.py", line 34, in __aenter__
    return await self._agen.asend(None)
  File "/Users/syn/projects/meltano/src/meltano/core/behavior/hookable.py", line 87, in trigger_hooks
    await self.__class__.trigger(self, f"before_{hook_name}", *args, **kwargs)
  File "/Users/syn/projects/meltano/src/meltano/core/behavior/hookable.py", line 115, in trigger
    raise err
  File "/Users/syn/projects/meltano/src/meltano/core/behavior/hookable.py", line 107, in trigger
    await hook_func(target, *args, **kwargs)
  File "/Users/syn/projects/meltano/src/meltano/core/plugin/singer/tap.py", line 177, in look_up_state_hook
    await self.look_up_state(plugin_invoker)
  File "/Users/syn/projects/meltano/src/meltano/core/plugin/singer/tap.py", line 185, in look_up_state
    if "state" not in plugin_invoker.capabilities:
  File "/Users/syn/projects/meltano/src/meltano/core/plugin_invoker.py", line 133, in capabilities
    return frozenset(self.plugin.capabilities)
TypeError: unhashable type: 'list'
```
It's having issues with that empty nested capabilities list (`capabilities: [[]]`).
@kevin can you double-check and make sure that's also where the traceback starts on your end?
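The last frame of the traceback shows why: the `capabilities` property wraps the configured list in `frozenset()`, and every element of a set must be hashable. `capabilities: [[]]` loads as a list containing an empty list, so building the set fails. A minimal reproduction in plain Python, using `json.loads` as a stand-in for the YAML loader (both parse this flow-style value the same way); the capability names in `flat` are just illustrative values:

```python
import json

# `capabilities: [[]]` loads as a list whose only element is an empty list.
nested = json.loads("[[]]")
flat = json.loads('["catalog", "discover", "state"]')

# A flat list of strings works: strings are hashable, so they can go in a frozenset.
print(sorted(frozenset(flat)))  # ['catalog', 'discover', 'state']

# The same kind of call as plugin_invoker.capabilities fails on the nested list.
error_message = ""
try:
    frozenset(nested)
except TypeError as err:
    error_message = str(err)
print(error_message)  # message includes: unhashable type: 'list'

# An empty list (or omitting the key entirely) gives an empty, valid set.
print(frozenset(json.loads("[]")))  # frozenset()
```

So `capabilities: []`, or dropping the key as Kevin does next, should get past this particular error.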
k
yup, it is. I've removed the capabilities part entirely and ran `meltano schedule meltano2 meltano-sh target-jsonl "* * * * *"`, as I'd like it to run continuously, but it seems like it's throwing out:
```
2022-01-12T19:49:46.774813Z [info ] ascii passed initial chaos probing. Mean measured chaos is 0.300000 %
2022-01-12T19:49:46.775257Z [info ] ascii is most likely the one. Stopping the process.
2022-01-12T19:49:47.321201Z [info ] Running extract & load... job_id=meltano3 name=meltano run_id=6d18d55d-dd4c-4028-8834-07b192165b5e
2022-01-12T19:49:47.477192Z [info ] Traceback (most recent call last): cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.477567Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/bin/target-jsonl", line 8, in <module> cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.477658Z [info ] sys.exit(main()) cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.477731Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/lib/python3.8/site-packages/target_jsonl.py", line 94, in main cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.477812Z [info ] state = persist_messages(input_messages, config.get('destination_path', ''), config.get('do_timestamp_file', True)) cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.477883Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/lib/python3.8/site-packages/target_jsonl.py", line 47, in persist_messages cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.477950Z [info ] o = singer.parse_message(message).asdict() cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478021Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/lib/python3.8/site-packages/singer/messages.py", line 156, in parse_message cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478084Z [info ] obj = json.loads(msg) cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478148Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/lib/python3.8/site-packages/simplejson/__init__.py", line 516, in loads cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478211Z [info ] return _default_decoder.decode(s) cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478279Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/lib/python3.8/site-packages/simplejson/decoder.py", line 370, in decode cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478345Z [info ] obj, end = self.raw_decode(s) cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478410Z [info ] File "/Users/plotly/Desktop/lane-meltano/.meltano/loaders/target-jsonl/venv/lib/python3.8/site-packages/simplejson/decoder.py", line 400, in raw_decode cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-8834-07b192165b5e stdio=stderr
2022-01-12T19:49:47.478477Z [info ] return self.scan_once(s, idx=_w(s, idx).end()) cmd_type=loader job_id=meltano3 name=target-jsonl run_id=6d18d55d-dd4c-4028-88…
```
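This second failure is a different problem. A Singer target parses each line it receives as a JSON message (the frames above go through `singer.parse_message` into `json.loads`). The end of the traceback is cut off, but a script whose stdout is just `hello!` is not JSON, so a decode error is the likely cause. A small sketch with the standard-library `json` module (target-jsonl itself uses `simplejson`, per the traceback); `parse_singer_line` is a hypothetical helper, and the `users` stream and its record are made up:

```python
import json
from typing import Any, Dict, Optional


def parse_singer_line(line: str) -> Optional[Dict[str, Any]]:
    """Parse one line of tap output as JSON; return None where a real target would raise."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None


# What `echo hello!` in test.sh writes to stdout: not a Singer message.
print(parse_singer_line("hello!"))  # None

# A minimal Singer RECORD message (illustrative stream and record).
# A real tap would normally emit a SCHEMA message for the stream first.
record_line = json.dumps({"type": "RECORD", "stream": "users", "record": {"id": 1}})
print(parse_singer_line(record_line)["type"])  # RECORD
```

In other words, a script can only fill the extractor slot of `meltano elt` if it speaks the Singer protocol on stdout; for anything else, the utility route discussed next is the better fit.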
cool, I've added the shell script to the YAML as a utility and ran `meltano invoke yoyo`, and it seems to work. How would I be able to schedule this, say, to have it run every hour?
```yaml
utilities:
  - name: yoyo
    namespace: yoyo
    pip_url: yoyo-migrations
    executable: <PATH_TO_TEST.SH>
```
t
Currently you can't through the `meltano schedule` interface, because it's too tightly integrated with the `elt` command. We're working on that spec in https://gitlab.com/meltano/meltano/-/issues/2924 to make it very generic. That said, you can run it via Airflow by creating a DAG yourself. Not ideal, I realize, but we're working on making it a better experience!
Another option would be to use CI to schedule it. We did that on our squared repo for a while before moving to Airflow: https://gitlab.com/meltano/squared. GitLab CI has a pretty generous free tier for CI jobs.
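Whether the script ends up in an Airflow DAG or a scheduled CI pipeline, the cadence is usually a cron expression: `* * * * *` (used in the `meltano schedule` commands in this thread) fires every minute, while hourly is `0 * * * *` (Airflow also accepts the `@hourly` preset). A deliberately simplified matcher makes the difference concrete; `cron_matches` is a hypothetical helper, not part of Meltano or Airflow, and supports only `*` or a single integer per field.

```python
from datetime import datetime


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` matches a 5-field cron expression.

    Simplified sketch: each field must be "*" or a single integer (no ranges,
    steps, or lists), and all fields are ANDed (real cron ORs day-of-month and
    day-of-week when both are restricted).
    """
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0; Python's weekday() counts Monday as 0.
    cron_weekday = (when.weekday() + 1) % 7
    actual = (when.minute, when.hour, when.day, when.month, cron_weekday)
    return all(
        field == "*" or int(field) == value
        for field, value in zip((minute, hour, day, month, weekday), actual)
    )


top_of_hour = datetime(2022, 1, 12, 19, 0)
mid_hour = datetime(2022, 1, 12, 19, 49)
print(cron_matches("* * * * *", top_of_hour), cron_matches("* * * * *", mid_hour))  # True True
print(cron_matches("0 * * * *", top_of_hour), cron_matches("0 * * * *", mid_hour))  # True False
```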
k
oh cool, no problem! So just to be clear (so I don't keep going with this): at the moment, adding a local shell script as an extractor to my `meltano.yml` file like so:
```yaml
- name: meltano-sh
  namespace: meltano_sh
  executable: /Users/......./test.sh
  config:
    username: admin
    user_agent: Meltano shell script
```
and running a schedule command, `meltano schedule test meltano-sh target-jsonl "* * * * *"`, will not work.
t
Correct, as it is tied to the `meltano elt` command, which can only run extractors/loaders and dbt transformations: https://meltano.com/docs/command-line-interface.html#schedule. The new `meltano run` command, in preview now, will be the way to go in the future: https://meltano.com/docs/command-line-interface.html#run
k
cool, any beginner docs on creating a DAG / Airflow to run the script? (haven't done that before)
t
That's a good request! I don't know that we have one. @aaronsteers? In a second I'll drop a link to the DAG Meltano uses behind the scenes, which you could probably start from.
a
@taylor - Sorry, I don't know of samples or guides per se. The squared project is the first that comes to mind.
k
Update: I just created a DAG inside of the `orchestrate` folder and ran `meltano invoke airflow scheduler`.
t
Nice! 😄