# singer-target-development
b
I’ve been working on a new feature for an SDK-based SQL target and wanted to share it. I have a stream that takes a long time to reach the drain limit and then a long time to preprocess the records, causing the target host connection to time out. If I lower `MAX_SIZE_DEFAULT` to `100` the stream works great, but the rest of my streams load better at the normal drain limit of `10000`. The feature I added to solve this is dynamic batch sizing. You set how often, in seconds, you want the target to run bulk inserts, say every second, and it dynamically sets `MAX_SIZE_DEFAULT` and monitors how long inserts take during the run to ensure that happens. The feature uses `time.perf_counter` and some basic calculations. I placed more info in a reply to this message if you are interested in taking a look or giving it a try in your own SDK-based SQL target.
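To make the idea concrete, here is a rough sketch of what a `time.perf_counter`-based batch size tuner could look like. This is illustrative only, not the actual `perftimer.py` from the PR; the class and method names (`BatchPerfTimer`, `counter_based_max_size`, `sink_max_size`) mirror the ones used later in the thread, but the scaling math is a guess at the "basic calculations" mentioned above.

```python
import time


class BatchPerfTimer:
    """Hypothetical sketch of a perf-counter-based batch size tuner.

    Tries to keep each bulk insert close to ``target_seconds`` by
    scaling the batch size after every timed insert.
    """

    def __init__(self, max_size, target_seconds):
        self.sink_max_size = max_size
        self.target_seconds = target_seconds
        self.start_time = None
        self.last_run_seconds = 0.0

    def start(self):
        # Mark the start of the interval between bulk inserts.
        self.start_time = time.perf_counter()

    def stop(self):
        # Record how long the interval took and reset the start mark.
        self.last_run_seconds = time.perf_counter() - self.start_time
        self.start_time = None

    def counter_based_max_size(self):
        # Scale the batch size toward the per-insert time target:
        # if the last run was slower than the target, shrink the batch;
        # if it was faster, grow it.
        if self.last_run_seconds > 0:
            correction = self.target_seconds / self.last_run_seconds
            self.sink_max_size = max(1, int(self.sink_max_size * correction))
        return self.sink_max_size
```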
Like I mentioned, I have a working PR to a personal project, target-postgres. Here is the link to the PR file changes: feat: Dynamic batch sizing. Give it a look and let me know your thoughts via comments to this thread. If you would like to give it a try on your own SDK-based SQL target, follow these instructions. They may work, I hope. Download the `perftimer.py` file and place it into the folder that contains your target’s `sinks.py` file. A copy and paste of the contents into a new file named `perftimer.py` will also work. Open the file that contains your target’s sink class, which is usually `sinks.py`, and add an import line for the `BatchPerfTimer` class. You will need to add a couple of items to your target's sink class. First, let's set a really low bulk insert batch size by adding `MAX_SIZE_DEFAULT = 100`. Second, we need a new `BatchPerfTimer`, telling it the starting bulk insert batch size and how long in seconds between inserts: `_sink_timer: BatchPerfTimer = BatchPerfTimer(MAX_SIZE_DEFAULT, 1)`. Third, let’s access this new timer via a property:
```python
@property
def sink_timer(self):
    return self._sink_timer
```
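Putting the three additions together, the top of the sink class would look roughly like this. This is a sketch: in a real target the class inherits from the SDK's `SQLSink`, and `BatchPerfTimer` is imported from `perftimer.py`; both are stubbed here only so the snippet is self-contained.

```python
class BatchPerfTimer:
    """Minimal stand-in for the BatchPerfTimer in perftimer.py (illustrative only)."""

    def __init__(self, max_size, interval_seconds):
        self.sink_max_size = max_size
        self.interval_seconds = interval_seconds
        self.start_time = None


class MySink:  # in a real target: class MySink(SQLSink)
    # Step 1: start with a deliberately low bulk insert batch size.
    MAX_SIZE_DEFAULT = 100

    # Step 2: a timer seeded with that size and a 1-second insert interval.
    _sink_timer: BatchPerfTimer = BatchPerfTimer(MAX_SIZE_DEFAULT, 1)

    # Step 3: expose the timer via a property.
    @property
    def sink_timer(self):
        return self._sink_timer
```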
With all those additions made, we're ready for the final step: adding the code to start and stop the `sink_timer`. I placed it in the sink’s `bulk_insert_records`; since I had already overloaded that method, it was present in my target's sink class. If you have it present, you will want to place the following code just after your insert code and before the return:
```python
# Finish line for max_size perf counter
if self.sink_timer.start_time is not None:
    self.sink_timer.stop()
    self.MAX_SIZE_DEFAULT = self.sink_timer.counter_based_max_size()

self.logger.info(f"MAX_SIZE_DEFAULT: {self.max_size}")

# Starting line for max_size perf counter
self.sink_timer.start()
```
If you haven’t needed to overload the bulk insert method then I think you can just add this code block to the end of your target’s sink class.
```python
def bulk_insert_records(
    self,
    full_table_name: str,
    schema: dict,
    records: t.Iterable[dict[str, t.Any]],
) -> int | None:
    count = super().bulk_insert_records(
        full_table_name=full_table_name,
        schema=schema,
        records=records,
    )

    # Finish line for max_size perf counter
    if self.sink_timer.start_time is not None:
        self.sink_timer.stop()
        self.MAX_SIZE_DEFAULT = self.sink_timer.counter_based_max_size()

    self.logger.info(f"MAX_SIZE_DEFAULT: {self.max_size}")

    # Starting line for max_size perf counter
    self.sink_timer.start()

    return count
```
It should be working, and when you run it you will see the bulk insert batch size changing.
v
If you use connection pooling with SQLAlchemy, I think it will refresh the pool automatically and bypass this, right?
b
Thanks for that tip, I will have to look at that.
v
I have it implemented with the latest PR on `target-postgres`, the MeltanoLabs version. I think it'll work, but I haven't tested it with your use case; it'd be interesting to see if it solves your issue or not.
b
I started with the current version of the MeltanoLabs `target-postgres` to see if it would stop the same way I saw my target stop. It did stop with the same `psycopg2.OperationalError: server closed the connection unexpectedly` message. I did a clean install of the MeltanoLabs `target-postgres` pointing to the branch `uppercase_alter_failure`. It was able to hang on longer but also stopped with the same message.
```text
2023-06-29T19:11:56.741784Z [info     ] Traceback (most recent call last): cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.741784Z [info     ]   File "C:\development\projects\melatnolabs-picture-test\.meltano\loaders\target-postgres\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1885, in _execute_context cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_i
2023-06-29T19:11:56.804286Z [info     ]     self.dialect.do_executemany( cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.804286Z [info     ]   File "C:\development\projects\melatnolabs-picture-test\.meltano\loaders\target-postgres\venv\lib\site-packages\sqlalchemy\dialects\postgresql\psycopg2.py", line 982, in do_executemany cmd_type=elb consumer=True name=target-postgres producer=False stdio=s
2023-06-29T19:11:56.835536Z [info     ]     context._psycopg2_fetched_rows = xtras.execute_values( cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.835536Z [info     ]   File "C:\development\projects\melatnolabs-picture-test\.meltano\loaders\target-postgres\venv\lib\site-packages\psycopg2\extras.py", line 1299, in execute_values cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-
2023-06-29T19:11:56.851160Z [info     ]     cur.execute(b''.join(parts)) cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.851160Z [info     ] psycopg2.OperationalError: server closed the connection unexpectedly cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.851160Z [info     ]         This probably means the server terminated abnormally cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.866788Z [info     ]         before or while processing the request. cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
```
v
Super helpful. Maybe I missed migrating every call properly, hmm. Do you have the 10-20 lines above that in the stack trace? I think I see where I missed a few calls.
I could probably write a test for this too, hmmmm, interesting stuff.
b
I found the elt.log of the run and removed the listing of not-selected tables and some data from the parameters listing.
I wonder if this might be the issue? https://github.com/psycopg/psycopg2/issues/829. I tried the `select 1` fix by adding the engine argument `pool_pre_ping=True` (https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic), but it still stopped with that. I also tried a transaction isolation level of `AUTOCOMMIT` (https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-transaction-isolation-levels-dbapi-autocommit), and that got further but still stopped.
Just tried the `pg8000` driver, mentioned as a possible solution in `psycopg2` issue #829, and the same load completed without stopping. I still prefer `psycopg2` since it has `executemany_mode`.
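For reference, the three workarounds above can be wired up in SQLAlchemy roughly like this. A sketch, not the target's actual engine setup: the URL is a placeholder, `pool_pre_ping` and `isolation_level="AUTOCOMMIT"` are standard SQLAlchemy engine options, and the driver swap is just a change of dialect in the URL.

```python
from sqlalchemy import create_engine

# Placeholder URL: swap credentials/host for your target's settings.
# Changing the dialect to "postgresql+pg8000://..." selects the pg8000
# driver instead of psycopg2, which is the swap that let the load finish.
url = "postgresql+psycopg2://user:password@localhost:5432/warehouse"

engine = create_engine(
    url,
    # Pessimistic disconnect handling: ping the connection on checkout
    # and transparently recycle it if the server has dropped it.
    pool_pre_ping=True,
)

# DBAPI-level autocommit, the other workaround tried above.
autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")
```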
e
Hey @BuzzCutNorman! I proposed `batch_wait_limit_seconds` in https://github.com/meltano/sdk/issues/1626 and it seems that your proposal would solve that?
v
@BuzzCutNorman Can you give that branch a pull and another shot?
b
@visch I just did a clean install from the branch, started a run, and it stopped with the same message: `sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly`.
v
😕 Shucks. If I want to replicate your error, I need to run something that takes a while to run, right? Like >30 min?
b
The EL does take a long time, but I think it is more that there are multiple drains inserting at the same time. So: the initial limit drain, then the 5-minute drain; the insert for the first limit drain occurs, then the second limit drain; the 5-minute drain tries to insert, and then when the second limit insert tries, everything stops. That is the best I can piece together so far.
Hey @edgar_ramirez_mondragon! Thanks for pointing out that this might be a way to implement the `batch_wait_limit_seconds` feature you outlined. Would you like me to put this into a PR and connect it to that issue?
e
@BuzzCutNorman by all means! Contributions are more than welcome 😁