joshua_janicas
03/27/2024, 7:15 PM
I added the batch_size_rows setting in my meltano file, and I can see the setting when I run meltano invoke target-snowflake --about. However, it seems that things aren't working quite as I expect: I have a (small) table of 940K rows and it is still chunking the .json files into 10,000-entry increments rather than the amount I specified. 🧵
I am wondering if I am missing anything else configuration-wise that I should be doing. I am running target-snowflake (MeltanoLabs variant) in conjunction with tap-mssql (BuzzCutNorman variant), which is also on the Meltano SDK.
BuzzCutNorman
03/27/2024, 7:51 PM
For target-snowflake it is using JSONLinesBatcher.get_batches. I think you can give it a batch config to change that, maybe?
batch_config:
  batch_size: 150000
joshua_janicas
03/27/2024, 7:54 PM
plugins:
  loaders:
    - name: target-snowflake
      variant: meltanolabs
      pip_url: meltanolabs-target-snowflake
      batch_config:
        batch_size: 150000
      config:
        add_record_metadata: false # Can enable if we want more metadata
        #account: See .ENV
        #database: TS See .ENV
        #user: See .ENV
        #role: See .ENV
        #warehouse: See .ENV
        #password: See .ENV
        default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
        hard_delete: false
        batch_size_rows: 150000
joshua_janicas
03/27/2024, 7:57 PM
Like this? Or does batch_config go under config?
BuzzCutNorman
03/27/2024, 7:57 PM
plugins:
  loaders:
    - name: target-snowflake
      variant: meltanolabs
      pip_url: meltanolabs-target-snowflake
      config:
        add_record_metadata: false # Can enable if we want more metadata
        #account: See .ENV
        #database: TS See .ENV
        #user: See .ENV
        #role: See .ENV
        #warehouse: See .ENV
        #password: See .ENV
        default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
        hard_delete: false
        batch_size_rows: 150000
        batch_config:
          batch_size: 150000
joshua_janicas
03/27/2024, 8:04 PM
It errored out with the batch_config being embedded, something about a pipe closing error. When I removed the batch_config setting it started working again.
joshua_janicas
03/27/2024, 8:05 PM
2024-03-27T20:04:53.442684Z [info ] 2024-03-27 16:04:53,442 | INFO | target-snowflake | Target sink for 'Static-RegionCode' is full. Current size is '150000'. Draining... cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.443684Z [info ] Traceback (most recent call last): cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.444867Z [info ] File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.444867Z [info ] return _run_code(code, main_globals, None, cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.445867Z [info ] File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.445867Z [info ] exec(code, run_globals) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.446868Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\Scripts\target-snowflake.exe\__main__.py", line 7, in <module> cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.446868Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 1157, in __call__ cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.447867Z [info ] return self.main(*args, **kwargs) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.447867Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 1078, in main cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.448870Z [info ] rv = self.invoke(ctx) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.448870Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\plugin_base.py", line 80, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.449868Z [info ] return super().invoke(ctx) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.449868Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 1434, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.450869Z [info ] return ctx.invoke(self.callback, **ctx.params) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.450869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 783, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.451869Z [info ] return __callback(*args, **kwargs) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.451869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 567, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.451869Z [info ] target.listen(file_input) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.452869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\io_base.py", line 35, in listen cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.452869Z [info ] self._process_lines(file_input) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.453869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 307, in _process_lines cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.453869Z [info ] counter = super()._process_lines(file_input) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.453869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\io_base.py", line 94, in _process_lines cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.454869Z [info ] self._process_record_message(line_dict) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.454869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 371, in _process_record_message cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.454869Z [info ] self.drain_one(sink) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.455868Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 512, in drain_one cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.455868Z [info ] sink.process_batch(draining_status) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.456868Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\sinks\sql.py", line 262, in process_batch cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.456868Z [info ] self.bulk_insert_records( cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.456868Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\target_snowflake\sinks.py", line 147, in bulk_insert_records cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.457873Z [info ] batch_config=self.batch_config, cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.457873Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\target_snowflake\sinks.py", line 168, in batch_config cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.458869Z [info ] return BatchConfig.from_dict(raw) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.458869Z [info ] File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\helpers\_batch.py", line 255, in from_dict cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.458869Z [info ] return cls(**data) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.459869Z [info ] TypeError: BatchConfig.__init__() missing 2 required positional arguments: 'encoding' and 'storage' cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.498527Z [info ] 2024-03-27 16:04:53,498 | INFO | snowflake.connector.connection | closed cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.499528Z [info ] 2024-03-27 16:04:53,498 | INFO | snowflake.connector.connection | No async queries seem to be running, deleting session cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.672422Z [error ] [WinError 109] The pipe has been ended
joshua_janicas
03/27/2024, 8:06 PM
2024-03-27T20:04:53.672422Z [error ] [WinError 109] The pipe has been ended
Traceback (most recent call last):
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\logging\output_logger.py", line 207, in redirect_logging
    yield
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 462, in run
    await self.run_with_job()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 494, in run_with_job
    await self.execute()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 454, in execute
    await manager.run()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 659, in run
    await self._wait_for_process_completion(self.elb.head)
joshua_janicas
03/27/2024, 8:06 PM
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 732, in _wait_for_process_completion
    raise output_futures_failed.exception()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\logging\utils.py", line 233, in capture_subprocess_output
    if not await _write_line_writer(writer, line):
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\logging\utils.py", line 201, in _write_line_writer
    await writer.wait_closed()
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\streams.py", line 343, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 385, in _loop_writing
    f.result()
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 826, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 552, in finish_send
    return ov.getresult()
BrokenPipeError: [WinError 109] The pipe has been ended
Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x0000022A33789240>
Traceback (most recent call last):
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 125, in __del__
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 80, in __repr__
info.append(f'stderr={stderr.pipe}')
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
info.append(f'fd={self._sock.fileno()}')
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x0000022A3378AD40>
Traceback (most recent call last):
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in __del__
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
info.append(f'fd={self._sock.fileno()}')
File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
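The TypeError buried in the target's log above can be reproduced in isolation. The sketch below uses a simplified dataclass as a stand-in for singer_sdk's BatchConfig (the real class lives in singer_sdk.helpers._batch and has more fields); it shows why a batch_config containing only batch_size blows up:

```python
from dataclasses import dataclass

@dataclass
class BatchConfig:
    # Simplified stand-in for singer_sdk.helpers._batch.BatchConfig:
    # encoding and storage are required, batch_size has a default.
    encoding: dict
    storage: dict
    batch_size: int = 10000

    @classmethod
    def from_dict(cls, data: dict) -> "BatchConfig":
        return cls(**data)

# A batch_config with only batch_size, as in the meltano.yml above:
try:
    BatchConfig.from_dict({"batch_size": 150000})
except TypeError as exc:
    error_message = str(exc)
print(error_message)
# → ...missing 2 required positional arguments: 'encoding' and 'storage'

# Supplying encoding and storage as well succeeds:
cfg = BatchConfig.from_dict({
    "batch_size": 150000,
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
})
```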
BuzzCutNorman
03/27/2024, 8:08 PM
DEFAULT_BATCH_CONFIG = {
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
}
BuzzCutNorman
03/27/2024, 8:10 PM
plugins:
  loaders:
    - name: target-snowflake
      variant: meltanolabs
      pip_url: meltanolabs-target-snowflake
      config:
        add_record_metadata: false # Can enable if we want more metadata
        #account: See .ENV
        #database: TS See .ENV
        #user: See .ENV
        #role: See .ENV
        #warehouse: See .ENV
        #password: See .ENV
        default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
        hard_delete: false
        batch_size_rows: 150000
        batch_config:
          batch_size: 150000
          encoding:
            format: jsonl
            compression: gzip
          storage:
            root: "file://"
joshua_janicas
03/27/2024, 8:10 PM
batch_config:
  batch_size: 150000
  encoding: {"format": "jsonl", "compression": "gzip"}
  storage: {"root": "file://"}
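One way a target could make a partial batch_config like this ergonomic is to overlay the user-supplied keys onto the defaults. This is a hypothetical sketch (resolve_batch_config is not a real target-snowflake function), just to illustrate the merge:

```python
DEFAULT_BATCH_CONFIG = {
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
}

def resolve_batch_config(user_config=None):
    """Overlay user-supplied batch_config keys onto the defaults,
    so a bare {"batch_size": 150000} would be enough."""
    return {**DEFAULT_BATCH_CONFIG, **(user_config or {})}

resolved = resolve_batch_config({"batch_size": 150000})
print(resolved["batch_size"])          # 150000
print(resolved["encoding"]["format"])  # jsonl
```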
BuzzCutNorman
03/27/2024, 8:15 PM
You can probably take out batch_size_rows and it will still work. I hate to say that, but I think it will.
03/27/2024, 8:15 PMjoshua_janicas
03/27/2024, 8:20 PMBuzzCutNorman
03/27/2024, 8:20 PMBuzzCutNorman
03/27/2024, 8:25 PMtarget_snowflake
and found we need to add in the batch_config
and batch_size_rows
to change how many rows get written to a batch file. Here is an example of the config below. I was wondering if this would be and SDK change or Target change to make this simpler for the user?
plugins:
loaders:
- name: target-snowflake
variant: meltanolabs
pip_url: meltanolabs-target-snowflake
config:
add_record_metadata: false # Can enable if we want more metadata
#account: See .ENV
#database: TS See .ENV
#user: See .ENV
#role: See .ENV
#warehouse: See .ENV
#password: See .ENV
default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
hard_delete: false
batch_size_rows: 150000
batch_config:
batch_size: 150000
encoding:
format: jsonl
compression: gzip
storage:
root: "file://"
BuzzCutNorman
03/27/2024, 8:34 PM
Would it be easier to change the DEFAULT_BATCH_CONFIG for target-snowflake to look like this and move it inside the SnowflakeSink class?
DEFAULT_BATCH_CONFIG = {
    "batch_size": self.max_size,
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
}
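That idea could be sketched as a property on the sink, so the default batch size tracks max_size (which batch_size_rows sets) instead of a hard-coded 10,000. This is an illustrative stub, not the actual target_snowflake code:

```python
class SnowflakeSink:
    # Illustrative stub; the real SnowflakeSink comes from
    # target_snowflake.sinks and inherits max_size from the SDK.
    def __init__(self, max_size: int = 10000) -> None:
        self.max_size = max_size

    @property
    def default_batch_config(self) -> dict:
        # Built on access, so batch_size always tracks max_size.
        return {
            "batch_size": self.max_size,
            "encoding": {"format": "jsonl", "compression": "gzip"},
            "storage": {"root": "file://"},
        }

sink = SnowflakeSink(max_size=150000)
print(sink.default_batch_config["batch_size"])  # 150000
```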
BuzzCutNorman
03/27/2024, 9:28 PM
Right now you need the batch_config in the meltano.yml because it passes self.batch_config when it creates the batcher. I know there is a property for batch_config in the SnowflakeSink class, but I don't think the property is used if the batch_config is present in the meltano.yml?
# serialize to batch files and upload
# TODO: support other batchers
batcher = JSONLinesBatcher(
    tap_name=self.target.name,
    stream_name=self.stream_name,
    batch_config=self.batch_config,
)
Edgar Ramírez (Arch.dev)
03/28/2024, 1:13 AM
I think the JSONLinesBatcher should be getting a batch config object built on the fly with the value of batch_size_rows and the other JSONL default values.
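What Edgar describes could look roughly like this, with a simplified dataclass standing in for the SDK's BatchConfig and a hypothetical helper (batch_config_from_settings is not a real singer_sdk function) that derives the config from the target settings:

```python
from dataclasses import dataclass

@dataclass
class BatchConfig:
    # Simplified stand-in for singer_sdk.helpers._batch.BatchConfig.
    encoding: dict
    storage: dict
    batch_size: int = 10000

def batch_config_from_settings(config: dict) -> BatchConfig:
    # Hypothetical helper: build the config on the fly from
    # batch_size_rows plus the JSONL defaults, so users never
    # have to spell out a full batch_config in meltano.yml.
    return BatchConfig(
        encoding={"format": "jsonl", "compression": "gzip"},
        storage={"root": "file://"},
        batch_size=config.get("batch_size_rows", 10000),
    )

cfg = batch_config_from_settings({"batch_size_rows": 150000})
print(cfg.batch_size)  # 150000
```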