clement
01/05/2024, 3:09 PM
state = { query.contract_address: max(query.to_block, state[query.contract_address]) for query in queries }
and on subsequent runs, restart querying from the most recently imported contract_address block.
The trick is, sometimes there is no data for a block range but I still want to remember that I already fetched this block range so I can't derive state from the record stream.
I don't understand how to do that with meltano's sdk at the moment, it's most likely related to incremental replication but I don't understand how to set or get the state
matt_elgazar
01/06/2024, 5:16 AM
clement
01/06/2024, 10:09 AM
clement
01/06/2024, 10:19 AM
clement
01/06/2024, 10:20 AM
clement
01/06/2024, 10:21 AM
Edgar Ramírez (Arch.dev)
01/08/2024, 4:00 PM
clement
01/09/2024, 1:42 PM
clement
01/09/2024, 1:50 PM
Edgar Ramírez (Arch.dev)
01/09/2024, 6:29 PM
clement
01/10/2024, 10:25 AM
I'm looking at singer_sdk's SchemaMessage but there is a property I don't understand and that's not in the singer 0.3 spec: bookmark_properties. Another structure I don't understand is ActivateVersionMessage.
matt_elgazar
01/19/2024, 11:02 PM
• You can configure state_backend in meltano.yml
• Below I’ve parallelized the tasks by each --select CLI value. I have not parallelized the individual selected taps.
from flask import Flask, make_response
import os
import subprocess
from datetime import datetime
import logging
import time
import multiprocessing as mp
import logging
from waitress import serve
from apscheduler.schedulers.background import BackgroundScheduler
import json
import yaml
# Application setup.
# BUG FIX: the original created the Flask app, set strict_slashes and called
# logging.basicConfig twice in a row (verbatim duplicate); the second
# Flask(__name__) silently replaced the first, so a single setup is
# behavior-identical.
app = Flask(__name__)
app.url_map.strict_slashes = False
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

### GLOBALS ###
# Deployment environment name (e.g. 'dev'/'prod'); used in the route path and
# meltano state IDs. NOTE(review): None if the env var is unset — the route
# would then be '/my-tap-None'; confirm the variable is always provided.
ENVIRONMENT = os.getenv('ENVIRONMENT')
### routes ###
@app.route(f'/my-tap-{ENVIRONMENT}', methods=['GET'])
def my_tap(task_chunks=None):
    """Run the `meltano el` pipeline for my-tap, optionally fanned out over processes.

    Args:
        task_chunks: None to run a single serial ELT job; otherwise a list of
            chunks, each chunk being a list of '--select <pattern>' strings that
            one worker process runs (presumably built by the chunking logic in
            __main__ — confirm against caller).

    Returns:
        A 200 Flask response describing what was run.
    """
    with app.app_context():
        # NOTE(review): MELTANO_TARGET, project_dir and cur_timestamp are not
        # defined in this file chunk — assumed module-level globals; verify.
        base_run_command = f'meltano --environment={ENVIRONMENT} el my-tap target-{MELTANO_TARGET}'
        if task_chunks is None:
            logging.info('Running meltano ELT without multiprocessing.')
            run_command = f"{base_run_command} --state-id my_tap_{ENVIRONMENT}_{MELTANO_TARGET}"
            logging.info(f"Running command {run_command}")
            # SECURITY: shell=True on an interpolated string — safe only while
            # ENVIRONMENT/MELTANO_TARGET come from trusted config, not requests.
            subprocess.run(run_command, shell=True, cwd=os.path.join(app.root_path, project_dir))
        else:
            logging.info(f"Running meltano ELT using multiprocessing. Number of processes set to {os.getenv('NUM_WORKERS')}.")
            processes = []
            for chunk in task_chunks:
                # Real exception instead of `assert` (asserts are stripped under -O).
                if not isinstance(chunk, list):
                    raise TypeError("Invalid datatype task_chunks. Must be list when running multiprocessing.")
                # Derive a stable per-chunk state ID from the selected patterns.
                state_id = ' '.join(chunk).replace('--select ', '').replace(' ', '__')
                select_param = ' '.join(chunk)
                run_command = \
                    f"{base_run_command} " \
                    f"--state-id MELTANO_TARGET_{MELTANO_TARGET}_{ENVIRONMENT}_{state_id} {select_param}"
                process = \
                    mp.Process(
                        target=subprocess.run,
                        kwargs={'args': run_command, 'shell': True,
                                'cwd': os.path.join(app.root_path, project_dir)}
                    )
                process.daemon = True
                logging.info(f"Running command {run_command}")
                process.start()
                time.sleep(3)  # stagger worker start-up
                # Keep the chunk paired with its process for accurate completion logs.
                processes.append((process, chunk))
            # BUG FIX: the original logged the loop-leftover `process`/`chunk`
            # (always the last one started) on every join; log the process that
            # actually finished, with its own chunk.
            for proc, proc_chunk in processes:
                proc.join()
                logging.info(f'*** Completed process {proc} --- run_commands: {proc_chunk}')
    return make_response(f'Last ran project my-tap-{ENVIRONMENT} target {MELTANO_TARGET} at {cur_timestamp()}.', 200)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info(f'\n*** Running environment {ENVIRONMENT}. ***\n')
scheduler = BackgroundScheduler(job_defaults={'max_instances': 3})
###### my-tap ######
num_tasks = int(os.getenv('NUM_WORKERS'))
my_tap_cron = json.loads(os.getenv('CRON'))
if num_tasks == 1:
scheduler.add_job(my_tap, trigger='cron', **my_tap_cron, jitter=120)
else:
assert isinstance(num_tasks, int) and num_tasks > 1, \
f"ENV variable NUM_WORKERS must be >= 1. It is currently set to {num_tasks} with datatype {type(num_tasks)}"
with open("meltano.yml", "r") as meltano_cfg:
cfg = yaml.safe_load(meltano_cfg)
tasks = cfg.get('plugins').get('extractors')[0].get('select')
tasks = [f'--select {i}' for i in tasks]
tasks_per_chunk = len(tasks) // num_tasks
remainder = len(tasks) % num_tasks
task_chunks = []
start_index = 0
for i in range(num_tasks):
chunk_size = tasks_per_chunk + (1 if i < remainder else 0)
task_chunks.append(tasks[start_index: start_index + chunk_size])
start_index += chunk_size
scheduler.add_job(my_tap, kwargs={'task_chunks': task_chunks}, trigger='cron', **my_tap_cron, jitter=120)
###### host ######
HOST = '0.0.0.0'
PORT = 5000
logging.info(f'Server is listening on port {PORT}')
logging.info(f'Hosting environment {ENVIRONMENT}')
scheduler.start()
serve(app, host=HOST, port=PORT, threads=2) # waitress wsgi production server
clement
01/20/2024, 12:23 AM
"and fought with the SDK a ton." Highly relatable 😄 Thank you for your answer sir, I definitely need to fight some more before fully removing the sdk from my most custom tap.