Hi there, what would be the best way to have a "co...
# best-practices
c
Hi there, what would be the best way to have a "complex" state? In my case I'm indexing blockchain contract events and I want to track, for each contract, the last indexed block. My idea was to put something like this in the state:
state = { query.contract_address: max(query.to_block, state.get(query.contract_address, 0)) for query in queries }
and on subsequent runs, restart querying from the most recently imported contract_address block. The trick is, sometimes there is no data for a block range but I still want to remember that I already fetched this block range so I can't derive state from the record stream. I don't understand how to do that with meltano's sdk at the moment, it's most likely related to incremental replication but I don't understand how to set or get the state
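A minimal sketch of the bookkeeping described above, independent of the SDK: the state is advanced from the block ranges that were queried rather than from the records that came back, so an empty range still moves the bookmark. `BlockRangeQuery` and `advance_state` are hypothetical names, and the field names simply follow the pseudocode in the message.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BlockRangeQuery:
    """Hypothetical description of one fetched block range."""
    contract_address: str
    to_block: int


def advance_state(state: dict[str, int], queries: list[BlockRangeQuery]) -> dict[str, int]:
    """Return a new state holding the highest fetched block per contract.

    The update depends only on the queries that were run, not on the
    records they returned, so empty block ranges are remembered too.
    """
    new_state = dict(state)
    for query in queries:
        previous = new_state.get(query.contract_address, -1)
        new_state[query.contract_address] = max(previous, query.to_block)
    return new_state


state = advance_state({"0xabc": 100}, [
    BlockRangeQuery("0xabc", 200),  # range that contained events
    BlockRangeQuery("0xdef", 150),  # range that returned no events
    BlockRangeQuery("0xabc", 180),  # older range, must not move the bookmark back
])
print(state)
```

On the next run, each contract would resume from `state[contract_address] + 1`.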
m
partitioning?
c
Investigating partitioning, thx
Looks like I'd need a mix of incremental replication and partitioning. Looks very convoluted just to update a dict
Also partitioning isn't multithreaded by default
but to get finer control, I probably need to roll out my own tap without the meltano sdk
e
Hi Clement! What would state look like in each of these cases:
1. When there's data in the block range
2. When there's no data for a block range
If they're different replication keys, it's a whole lot more complex, but if it's the same replication key just with different logic to retrieve the values, then you might be able to accomplish what you need with a few overrides in the SDK classes.
c
Hi Edgar, I think I'll fully deviate from the SDK anyway because I want different events to land in a different table. So I would want a process that can yield multiple streams and maintain a single state. That looks like a stretch for the current SDK's assumptions so overriding part of the Stream class doesn't look like a good idea. This and the fact that I want a state to be saved when there is no data for a block range, I might be better off sending SCHEMA, RECORD and STATE messages manually just to keep things simple.
Now I'm just looking at how I can fully control the schema, record and state messages while still getting the meltano sdk features I'm interested in (logging, state management, configuration)
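For reference, a minimal sketch of what sending the three Singer message types by hand could look like, with several streams sharing one state. It uses only the standard library, not the SDK. The stream names, schemas and the flat `{contract: block}` layout of the state value are assumptions made for illustration; the Singer spec leaves the content of a STATE message's `value` up to the tap.

```python
import io
import json
import sys

# Hypothetical streams: one per event type, each landing in its own table.
SCHEMAS = {
    "transfer_events": {
        "type": "object",
        "properties": {"block": {"type": "integer"}, "tx": {"type": "string"}},
    },
    "approval_events": {
        "type": "object",
        "properties": {"block": {"type": "integer"}, "tx": {"type": "string"}},
    },
}


def write_message(message, out):
    """Write one Singer message as a single JSON line."""
    out.write(json.dumps(message) + "\n")


def sync_range(contract, to_block, events, state, out=sys.stdout):
    """Emit the events of one block range, then a STATE message.

    `events` is a list of (stream_name, record) pairs and may be empty;
    the STATE message is written either way.
    """
    for stream, record in events:
        write_message({"type": "RECORD", "stream": stream, "record": record}, out)
    state[contract] = max(state.get(contract, -1), to_block)
    write_message({"type": "STATE", "value": dict(state)}, out)


out = io.StringIO()  # stands in for stdout so the output can be inspected
for stream, schema in SCHEMAS.items():
    write_message(
        {"type": "SCHEMA", "stream": stream, "schema": schema, "key_properties": ["tx"]},
        out,
    )

state = {}
sync_range("0xabc", 200, [("transfer_events", {"block": 150, "tx": "0x01"})], state, out)
sync_range("0xdef", 200, [], state, out)  # empty range still produces a STATE message

messages = [json.loads(line) for line in out.getvalue().splitlines()]
print([m["type"] for m in messages])
```

In a real tap the messages would go to stdout, and the initial `state` would be loaded from the state file passed in by the runner.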
e
Great. Feel free to log any issues for things in the SDK that are not modular enough for your needs. I would hope to make the SDK both a high-level framework and a low-level set of components that can be used to build any sort of singer tap or target.
❣️ 1
c
At the moment I'm struggling a bit to understand how to get things done. Ngl, an example tap with low-level control on singer messages would be very welcome. I tried to reuse the _singer_sdk's SchemaMessage but there is a property I don't understand and that's not in the singer 0.3 spec: bookmark_properties. Another structure I don't understand is ActivateVersionMessage
m
hey @clement just seeing this thread. I would say I had a similar struggle and I do understand it can be a steep learning curve at times. Seems like you’re interested in multiple things:
• Keep track of state by partition
  ◦ I see you want to keep track of each state by individual targets as well (meltano does this automatically, but I’ll give a solution below)
• Run meltano in parallel
  ◦ I have created a feature request for meltano to add parallelization on their github, but this is not a straightforward task. I will also give a solution to this below.
I admit I’m pretty new to meltano as well and fought with the SDK a ton. I also had similar feature requests for my own project, and rather than waiting for meltano to build it I did it myself. That being said, I still think it’s worth it to learn the SDK. I would suggest building your tap with the SDK (it will be single-threaded), and then in your application add state-id tracking and parallelism. See examples below:
Keep track of dynamic tap/target state by creating a simple python script that hosts meltano and runs on schedule and in parallel (configurable).
• I know we can probably do this with airflow, but that’s yet another technology / learning curve. I’ll use the simple python APScheduler for this example.
• For consistent state-id tracking, you’ll want to set up state_backend in meltano.yml
• Below I’ve parallelized the tasks by each --select CLI value. I have not parallelized the individual selected taps.
from flask import Flask, make_response
import os
import subprocess
from datetime import datetime
import logging
import time
import multiprocessing as mp
from waitress import serve
from apscheduler.schedulers.background import BackgroundScheduler
import json
import yaml

app = Flask(__name__)
app.url_map.strict_slashes = False

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


### GLOBALS ###

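ENVIRONMENT = os.getenv('ENVIRONMENT')

# Assumed definitions: the snippet uses these names without defining them.
# The environment variable names below are placeholders.
MELTANO_TARGET = os.getenv('MELTANO_TARGET')  # suffix of the target plugin name
project_dir = os.getenv('PROJECT_DIR', '.')  # Meltano project path, relative to app.root_path


def cur_timestamp():
    """Return the current local time as an ISO 8601 string (assumed helper)."""
    return datetime.now().isoformat()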

### routes ###

@app.route(f'/my-tap-{ENVIRONMENT}', methods=['GET'])
def my_tap(task_chunks=None):
    with app.app_context():
        base_run_command = f'meltano --environment={ENVIRONMENT} el my-tap target-{MELTANO_TARGET}'

        if task_chunks is None:
            logging.info('Running meltano ELT without multiprocessing.')
            run_command = f"{base_run_command} --state-id my_tap_{ENVIRONMENT}_{MELTANO_TARGET}"
            logging.info(f"Running command {run_command}")
            subprocess.run(run_command, shell=True, cwd=os.path.join(app.root_path, project_dir))
        else:
            logging.info(f"Running meltano ELT using multiprocessing. Number of processes set to {os.getenv('NUM_WORKERS')}.")

            processes = []

            for chunk in task_chunks:
                assert isinstance(chunk, list), "Invalid datatype task_chunks. Must be list when running multiprocessing."
                state_id = ' '.join(chunk).replace('--select ', '').replace(' ', '__')
                select_param = ' '.join(chunk)
                run_command = \
                    f"{base_run_command} " \
                    f"--state-id MELTANO_TARGET_{MELTANO_TARGET}_{ENVIRONMENT}_{state_id} {select_param}"

                process = \
                    mp.Process(
                        target=subprocess.run,
                        kwargs={'args': run_command, 'shell': True,
                                'cwd': os.path.join(app.root_path, project_dir)}
                    )

                process.daemon = True
                logging.info(f"Running command {run_command}")
                process.start()
                time.sleep(3)
                processes.append(process)

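            # processes were appended in the same order as task_chunks,
            # so each process can be logged with the chunk it ran
            for p, chunk in zip(processes, task_chunks):
                p.join()
                logging.info(f'*** Completed process {p} --- run_commands: {chunk}')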

        return make_response(f'Last ran project my-tap-{ENVIRONMENT} target {MELTANO_TARGET} at {cur_timestamp()}.', 200)



if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info(f'\n*** Running environment {ENVIRONMENT}. ***\n')

    scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})

    ###### my-tap ######

    num_tasks = int(os.getenv('NUM_WORKERS'))
    my_tap_cron = json.loads(os.getenv('CRON'))

    if num_tasks == 1:
        scheduler.add_job(my_tap, trigger='cron', **my_tap_cron, jitter=120)
    else:
        assert isinstance(num_tasks, int) and num_tasks > 1, \
            f"ENV variable NUM_WORKERS must be >= 1. It is currently set to {num_tasks} with datatype {type(num_tasks)}"

        with open("meltano.yml", "r") as meltano_cfg:
            cfg = yaml.safe_load(meltano_cfg)

        tasks = cfg.get('plugins').get('extractors')[0].get('select')
        tasks = [f'--select {i}' for i in tasks]
        tasks_per_chunk = len(tasks) // num_tasks
        remainder = len(tasks) % num_tasks

        task_chunks = []
        start_index = 0
        for i in range(num_tasks):
            chunk_size = tasks_per_chunk + (1 if i < remainder else 0)
            task_chunks.append(tasks[start_index: start_index + chunk_size])
            start_index += chunk_size

        scheduler.add_job(my_tap, kwargs={'task_chunks': task_chunks}, trigger='cron', **my_tap_cron, jitter=120)

    ###### host ######

    HOST = '0.0.0.0'
    PORT = 5000
    logging.info(f'Server is listening on port {PORT}')
    logging.info(f'Hosting environment {ENVIRONMENT}')

    scheduler.start()

    serve(app, host=HOST, port=PORT, threads=2)  # waitress wsgi production server
🙌 1
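The chunking and state-id logic in the script above can be pulled out into two small functions that are easy to check on their own. This is a sketch under assumptions: `chunk_selects` and `state_id_for` are hypothetical names, the select patterns are made up, and dropping empty chunks when there are more workers than selects is an addition the script does not have.

```python
def chunk_selects(selects: list[str], num_workers: int) -> list[list[str]]:
    """Split select patterns into at most num_workers contiguous chunks.

    The first `remainder` chunks get one extra item, as in the script above.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be >= 1")
    base, remainder = divmod(len(selects), num_workers)
    chunks, start = [], 0
    for i in range(num_workers):
        size = base + (1 if i < remainder else 0)
        chunks.append(selects[start:start + size])
        start += size
    # Addition to the original logic: skip chunks that ended up empty.
    return [chunk for chunk in chunks if chunk]


def state_id_for(chunk: list[str], target: str, environment: str) -> str:
    """Build one state ID per chunk so parallel runs do not share state."""
    return f"MELTANO_TARGET_{target}_{environment}_" + "__".join(chunk)


selects = ["transfers.*", "approvals.*", "mints.*", "burns.*", "swaps.*"]
chunks = chunk_selects(selects, 2)
for chunk in chunks:
    select_args = " ".join(f"--select {pattern}" for pattern in chunk)
    print(state_id_for(chunk, "postgres", "dev"), select_args)
```

Each printed line gives the state ID and the `--select` arguments that one worker process would append to the base `meltano el` command.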
c
and fought with the SDK a ton.
Highly relatable 😄 Thank you for your answer sir, I definitely need to fight some more before fully removing the sdk from my most custom tap
👍 1