BuzzCutNorman
06/29/2023, 5:42 PM
MAX_DEFAULT_SIZE to 100, the stream works great, but the rest of my streams load better at the normal drain limit of 10000. The feature I added to solve this is dynamic batch sizing. You set how often, in seconds, you want a target to run bulk inserts, say every second, and it will dynamically set the MAX_DEFAULT_SIZE and monitor how long inserts take during the run to ensure that happens. The feature uses time.perf_counter and some basic calculations. I placed more info in a reply to this message if you are interested in taking a look or giving it a try in your own SDK-based SQL target.
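The perftimer.py class itself is not shown in this thread. As a hedged illustration only, here is a minimal sketch of what a time.perf_counter-based BatchPerfTimer might look like; the class and method names follow the snippets later in the thread, but the grow/shrink rule and the size bounds are assumptions, not the author's actual implementation:

```python
from __future__ import annotations

import time


class BatchPerfTimer:
    """Hypothetical sketch of the perftimer.py class described above.

    Times each bulk insert with time.perf_counter and nudges the batch
    size so inserts land roughly every `perf_interval` seconds.
    """

    def __init__(self, max_size: int, perf_interval: float) -> None:
        self.sink_max_size = max_size       # current bulk insert batch size
        self.perf_interval = perf_interval  # target seconds between inserts
        self.start_time: float | None = None
        self.last_run: float | None = None

    def start(self) -> None:
        # Starting line: record a monotonic, high-resolution timestamp.
        self.start_time = time.perf_counter()

    def stop(self) -> None:
        # Finish line: how long did the last batch take?
        self.last_run = time.perf_counter() - self.start_time
        self.start_time = None

    def counter_based_max_size(self) -> int:
        # Assumed rule: grow the batch when inserts finish well under the
        # interval, shrink it when they overshoot (bounds are arbitrary).
        if self.last_run is None:
            return self.sink_max_size
        if self.last_run < self.perf_interval / 2:
            self.sink_max_size = min(self.sink_max_size * 2, 100_000)
        elif self.last_run > self.perf_interval:
            self.sink_max_size = max(self.sink_max_size // 2, 10)
        return self.sink_max_size
```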
BuzzCutNorman
06/29/2023, 5:43 PM
sinks.py file. A copy and paste of the contents into a new file named perftimer.py will also work. Open the file that contains your target's sink class (this is usually sinks.py) and add an import line for the BatchPerfTimer class. You will need to add a couple of items to your target's sink class. First, set a really low bulk insert batch size by adding MAX_SIZE_DEFAULT = 100. Second, we need a new BatchPerfTimer, telling it the starting bulk insert batch size and how long, in seconds, between inserts: _sink_timer: BatchPerfTimer = BatchPerfTimer(MAX_SIZE_DEFAULT, 1). Third, let's access this new timer via a property:
@property
def sink_timer(self):
    return self._sink_timer
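Put together, the three additions might look like the following sketch. BatchPerfTimer is stubbed here to keep the example self-contained, and MySQLSink stands in for your target's real SQLSink subclass:

```python
class BatchPerfTimer:
    """Stub standing in for the real perftimer.py class."""

    def __init__(self, max_size: int, perf_interval: float) -> None:
        self.sink_max_size = max_size
        self.perf_interval = perf_interval
        self.start_time = None


class MySQLSink:  # in practice this subclasses your target's SQLSink
    # 1. A deliberately low starting bulk insert batch size.
    MAX_SIZE_DEFAULT = 100

    # 2. A timer seeded with that size and a 1-second insert interval.
    _sink_timer: BatchPerfTimer = BatchPerfTimer(MAX_SIZE_DEFAULT, 1)

    # 3. Property access to the timer.
    @property
    def sink_timer(self):
        return self._sink_timer
```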
With all those additions made, we're ready for the final step: adding the code to start and stop the sink_timer. I placed it in the sink's bulk_insert_records; since I had already overloaded that method, it was present in my target's sink class. If you have it present, you will want to place the following code just after your insert code and before the return:
# Finish line for max_size perf counter
if self.sink_timer.start_time is not None:
    self.sink_timer.stop()
    self.MAX_SIZE_DEFAULT = self.sink_timer.counter_based_max_size()
    self.logger.info(f"MAX_SIZE_DEFAULT: {self.max_size}")
# Starting line for max_size perf counter
self.sink_timer.start()
If you haven't needed to overload the bulk insert method, then I think you can just add this code block to the end of your target's sink class:
# requires "import typing as t" at the top of sinks.py
def bulk_insert_records(
    self,
    full_table_name: str,
    schema: dict,
    records: t.Iterable[dict[str, t.Any]],
) -> int | None:
    count = super().bulk_insert_records(
        full_table_name=full_table_name,
        schema=schema,
        records=records,
    )
    # Finish line for max_size perf counter
    if self.sink_timer.start_time is not None:
        self.sink_timer.stop()
        self.MAX_SIZE_DEFAULT = self.sink_timer.counter_based_max_size()
        self.logger.info(f"MAX_SIZE_DEFAULT: {self.max_size}")
    # Starting line for max_size perf counter
    self.sink_timer.start()
    return count
It should be working, and when you run it you will see how the bulk insert batch size changes.
visch
06/29/2023, 6:01 PM
BuzzCutNorman
06/29/2023, 6:04 PM
visch
06/29/2023, 6:05 PM
target-postgres, the MeltanoLabs version
visch
06/29/2023, 6:05 PM
BuzzCutNorman
06/29/2023, 8:35 PM
target-postgres to see if it would stop the same way I saw my target stop. It did stop, with the same psycopg2.OperationalError: server closed the connection unexpectedly message. I did a clean install of the MeltanoLabs target-postgres pointing to the branch uppercase_alter_failure. It was able to hang on longer but also stopped with the same message.
2023-06-29T19:11:56.741784Z [info ] Traceback (most recent call last): cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.741784Z [info ] File "C:\development\projects\melatnolabs-picture-test\.meltano\loaders\target-postgres\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1885, in _execute_context cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_i
2023-06-29T19:11:56.804286Z [info ] self.dialect.do_executemany( cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.804286Z [info ] File "C:\development\projects\melatnolabs-picture-test\.meltano\loaders\target-postgres\venv\lib\site-packages\sqlalchemy\dialects\postgresql\psycopg2.py", line 982, in do_executemany cmd_type=elb consumer=True name=target-postgres producer=False stdio=s
2023-06-29T19:11:56.835536Z [info ] context._psycopg2_fetched_rows = xtras.execute_values( cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.835536Z [info ] File "C:\development\projects\melatnolabs-picture-test\.meltano\loaders\target-postgres\venv\lib\site-packages\psycopg2\extras.py", line 1299, in execute_values cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-
2023-06-29T19:11:56.851160Z [info ] cur.execute(b''.join(parts)) cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.851160Z [info ] psycopg2.OperationalError: server closed the connection unexpectedly cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.851160Z [info ] This probably means the server terminated abnormally cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
2023-06-29T19:11:56.866788Z [info ] before or while processing the request. cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
visch
06/29/2023, 9:13 PM
visch
06/29/2023, 9:13 PM
BuzzCutNorman
06/29/2023, 9:47 PM
BuzzCutNorman
06/29/2023, 11:25 PM
select 1 fix by adding the engine argument pool_pre_ping=True (https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic). I still had it stop with that. I also tried a transaction isolation level of AUTOCOMMIT, and that got further but still stopped. https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-transaction-isolation-levels-dbapi-autocommit
BuzzCutNorman
06/30/2023, 4:13 PM
pg8000 driver, as mentioned as a possible solution in psycopg2 issue #829, and the same load completed without stopping. I still prefer psycopg2 since it has executemany_mode.
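The driver swap above is just a change of URL scheme, while executemany_mode is specific to SQLAlchemy's psycopg2 dialect. A sketch with placeholder credentials:

```python
# Same database, different DBAPI driver: only the URL scheme changes.
psycopg2_url = "postgresql+psycopg2://user:password@localhost:5432/mydb"
pg8000_url = "postgresql+pg8000://user:password@localhost:5432/mydb"

# executemany_mode is a psycopg2-dialect-only create_engine argument;
# "values_only" rewrites executemany() INSERTs into multi-row VALUES
# statements, which is what makes psycopg2 attractive for bulk loads.
psycopg2_engine_kwargs = {"executemany_mode": "values_only"}
# usage: create_engine(psycopg2_url, **psycopg2_engine_kwargs)
```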
edgar_ramirez_mondragon
07/03/2023, 9:05 PM
batch_wait_limit_seconds in https://github.com/meltano/sdk/issues/1626, and it seems that your proposal would solve that?
07/05/2023, 9:34 PMBuzzCutNorman
07/07/2023, 8:55 PMsqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
.visch
07/07/2023, 9:01 PMBuzzCutNorman
07/07/2023, 9:28 PMBuzzCutNorman
07/07/2023, 9:38 PMbatch_wait_limit_seconds
feature you outlined. Would you like me to put this into a PR and connect it to that issue?edgar_ramirez_mondragon
07/07/2023, 9:46 PM