# troubleshooting
j
Hi all, first off I want to say thanks for getting target-snowflake up to date with the Meltano SDK. I have included the new `batch_size_rows` setting in my meltano file, and I can see it when I run `meltano invoke target-snowflake --about`. However, it seems that things aren't working quite as I expected: I have a (small) table of 940K rows and it is still chunking the `.json` files into 10,000-entry increments rather than the amount I specified. 🧵 I am wondering if there is anything else configuration-wise that I should be doing? I am running `target-snowflake` (Meltano variant) in conjunction with `tap-mssql` (Buzzcut Norman variant), which is also on the Meltano SDK.
👀 1
One of the JSON files still chunking into 10K rows
Interesting, maybe I have incorrect assumptions about the expected end result of the data being pulled. Re-running, I see that the json.gz files follow a naming pattern (target-snowflake-table_name-GUID-#.json.gz) where the number goes up to 15 and then starts over. I thought that the JSON files would be chunked into increments of however many rows I tell it to use, so each JSON file would have had 150K rows.
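As a quick sanity check on the arithmetic (assuming each batch file is filled to the configured size, with any remainder in the last file), the expected file counts for the two batch sizes work out like this:

```python
import math

rows = 940_000  # size of the table from the start of the thread

# Expected file counts at the default 10K chunking vs. the configured size.
for batch_size in (10_000, 150_000):
    n_files = math.ceil(rows / batch_size)
    print(f"{batch_size:>7,} rows per batch -> {n_files} files")
```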
b
You are correct. Looking at the code for `target-snowflake`, it is using `JSONLinesBatcher.get_batches`. I think you can give it a batch config to change that, maybe?
Copy code
batch_config:
  batch_size: 150000
j
As such?
Copy code
plugins:
  loaders:
  - name: target-snowflake
    variant: meltanolabs
    pip_url: meltanolabs-target-snowflake
    batch_config:
      batch_size: 150000
    config:
      add_record_metadata: false # Can enable if we want more metadata
      #account: See .ENV
      #database: TS See .ENV 
      #user: See .ENV
      #role: See .ENV
      #warehouse: See .ENV
      #password: See .ENV
      default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
      hard_delete: false
      batch_size_rows: 150000
👀 1
that didn't work, maybe I need to embed it under `config`
b
I think like this.
Copy code
plugins:
  loaders:
  - name: target-snowflake
    variant: meltanolabs
    pip_url: meltanolabs-target-snowflake
    config:
      add_record_metadata: false # Can enable if we want more metadata
      #account: See .ENV
      #database: TS See .ENV 
      #user: See .ENV
      #role: See .ENV
      #warehouse: See .ENV
      #password: See .ENV
      default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
      hard_delete: false
      batch_size_rows: 150000
      batch_config:
        batch_size: 150000
โค๏ธ 1
is it working?
j
One second; it's now having a bad time with that `batch_config` being embedded, something about a pipe closing error. When I removed the `batch_config` setting it started working again.
😞 1
Copy code
2024-03-27T20:04:53.442684Z [info     ] 2024-03-27 16:04:53,442 | INFO     | target-snowflake     | Target sink for 'Static-RegionCode' is full. Current size is '150000'. Draining... cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.443684Z [info     ] Traceback (most recent call last): cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.444867Z [info     ]   File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 196, in _run_module_as_main cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.444867Z [info     ]     return _run_code(code, main_globals, None, cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.445867Z [info     ]   File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\runpy.py", line 86, in _run_code cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.445867Z [info     ]     exec(code, run_globals)    cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.446868Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\Scripts\target-snowflake.exe\__main__.py", line 7, in <module> cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.446868Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 1157, in __call__ cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.447867Z [info     ]     return self.main(*args, **kwargs) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.447867Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 1078, in main cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.448870Z [info     ]     rv = self.invoke(ctx)      cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.448870Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\plugin_base.py", line 80, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.449868Z [info     ]     return super().invoke(ctx) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.449868Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 1434, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.450869Z [info     ]     return ctx.invoke(self.callback, **ctx.params) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.450869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\click\core.py", line 783, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.451869Z [info     ]     return __callback(*args, **kwargs) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.451869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 567, in invoke cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.451869Z [info     ]     target.listen(file_input)  cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.452869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\io_base.py", line 35, in listen cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.452869Z [info     ]     self._process_lines(file_input) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.453869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 307, in _process_lines cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.453869Z [info     ]     counter = super()._process_lines(file_input) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.453869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\io_base.py", line 94, in _process_lines cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.454869Z [info     ]     self._process_record_message(line_dict) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.454869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 371, in _process_record_message cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.454869Z [info     ]     self.drain_one(sink)       cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.455868Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\target_base.py", line 512, in drain_one cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.455868Z [info     ]     sink.process_batch(draining_status) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.456868Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\sinks\sql.py", line 262, in process_batch cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.456868Z [info     ]     self.bulk_insert_records(  cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.456868Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\target_snowflake\sinks.py", line 147, in bulk_insert_records cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.457873Z [info     ]     batch_config=self.batch_config, cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.457873Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\target_snowflake\sinks.py", line 168, in batch_config cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.458869Z [info     ]     return BatchConfig.from_dict(raw) cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.458869Z [info     ]   File "C:\Git\elt-pipeline\.meltano\loaders\target-snowflake\venv\lib\site-packages\singer_sdk\helpers\_batch.py", line 255, in from_dict cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.458869Z [info     ]     return cls(**data)         cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.459869Z [info     ] TypeError: BatchConfig.__init__() missing 2 required positional arguments: 'encoding' and 'storage' cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.498527Z [info     ] 2024-03-27 16:04:53,498 | INFO     | snowflake.connector.connection | closed cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.499528Z [info     ] 2024-03-27 16:04:53,498 | INFO     | snowflake.connector.connection | No async queries seem to be running, deleting session cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
2024-03-27T20:04:53.672422Z [error    ] [WinError 109] The pipe has been ended
Copy code
Traceback (most recent call last):
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\logging\output_logger.py", line 207, in redirect_logging
    yield
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 462, in run
    await self.run_with_job()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 494, in run_with_job
    await self.execute()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 454, in execute
    await manager.run()
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 659, in run
    await self._wait_for_process_completion(self.elb.head)
Copy code
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\block\extract_load.py", line 732, in _wait_for_process_completion
    raise output_futures_failed.exception()  # noqa: RSE102
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\logging\utils.py", line 233, in capture_subprocess_output
    if not await _write_line_writer(writer, line):
  File "C:\Users\joshua.janicas\AppData\Local\pipx\pipx\venvs\meltano\lib\site-packages\meltano\core\logging\utils.py", line 201, in _write_line_writer
    await writer.wait_closed()
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\streams.py", line 343, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 385, in _loop_writing
    f.result()
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 826, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 552, in finish_send
    return ov.getresult()
BrokenPipeError: [WinError 109] The pipe has been ended

Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x0000022A33789240>
Traceback (most recent call last):
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 125, in __del__
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 80, in __repr__
    info.append(f'stderr={stderr.pipe}')
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
    info.append(f'fd={self._sock.fileno()}')
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
    raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x0000022A3378AD40>
Traceback (most recent call last):
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in __del__
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
    info.append(f'fd={self._sock.fileno()}')
  File "C:\Users\joshua.janicas\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
    raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
b
BatchConfig.__init__() missing 2 required positional arguments: 'encoding' and 'storage'
🧙‍♂️ 1
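That TypeError is easy to reproduce with a minimal stand-in for `BatchConfig` (a hypothetical sketch, not the real singer_sdk class, shown only to illustrate why `encoding` and `storage` are mandatory once `batch_config` is supplied):

```python
from dataclasses import dataclass

# Hypothetical stand-in for singer_sdk's BatchConfig, just to show the shape
# of the failure: fields without defaults must be supplied to from_dict().
@dataclass
class BatchConfig:
    encoding: dict
    storage: dict
    batch_size: int = 10_000

    @classmethod
    def from_dict(cls, data: dict) -> "BatchConfig":
        return cls(**data)

# Passing only batch_size, as in the first batch_config attempt, fails the
# same way as the log above:
try:
    BatchConfig.from_dict({"batch_size": 150_000})
except TypeError as exc:
    print(exc)  # missing 2 required positional arguments: 'encoding' and 'storage'

# Merging in jsonl defaults for the two missing keys lets it construct cleanly:
defaults = {
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
}
cfg = BatchConfig.from_dict({**defaults, "batch_size": 150_000})
```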
j
I'm guessing we can also include those in the `batch_config`?
I suppose whatever values the default 10K provides... but I've got to find that.
b
There is a default batch config that they append:
Copy code
DEFAULT_BATCH_CONFIG = {
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
}
Give this a try
Copy code
plugins:
  loaders:
  - name: target-snowflake
    variant: meltanolabs
    pip_url: meltanolabs-target-snowflake
    config:
      add_record_metadata: false # Can enable if we want more metadata
      #account: See .ENV
      #database: TS See .ENV 
      #user: See .ENV
      #role: See .ENV
      #warehouse: See .ENV
      #password: See .ENV
      default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
      hard_delete: false
      batch_size_rows: 150000
      batch_config:
        batch_size: 150000
        encoding:
          format: jsonl
          compression: gzip
        storage:
          root: "file://"
j
Copy code
batch_config:
        batch_size: 150000
        encoding: {"format": "jsonl", "compression": "gzip"}
        storage: {"root": "file://"}
b
I think that is a good sign?
j
yesss
b
ok cool. Then YAY!!!!
j
aw it didn't copy the gif, but it's the high-five one haha
thanks so much
b
You are welcome. Glad I could help.
j
It went from taking 6 minutes to run 1 million rows to 1 minute
🎉 1
b
Very cool. I think you can remove `batch_size_rows` and it will still work. I hate to say it, but I think it will.
j
a reasonable assumption, let me see
nope, we still need it. If I remove it, it reverts back to 10K chunks, because I bet you that's what it defaults to drawing from the source
b
Hmm, that shocks me to be honest.
@Edgar Ramírez (Arch.dev) we have been working with `target_snowflake` and found we need to add in the `batch_config` and `batch_size_rows` to change how many rows get written to a batch file. Here is an example of the config below. I was wondering if this would be an SDK change or a target change to make this simpler for the user?
Copy code
plugins:
  loaders:
  - name: target-snowflake
    variant: meltanolabs
    pip_url: meltanolabs-target-snowflake
    config:
      add_record_metadata: false # Can enable if we want more metadata
      #account: See .ENV
      #database: TS See .ENV 
      #user: See .ENV
      #role: See .ENV
      #warehouse: See .ENV
      #password: See .ENV
      default_target_schema: Raw # ${MELTANO_EXTRACT__LOAD_SCHEMA} # Meltano chooses the schema based on the `name` of the extractor
      hard_delete: false
      batch_size_rows: 150000
      batch_config:
        batch_size: 150000
        encoding:
          format: jsonl
          compression: gzip
        storage:
          root: "file://"
I'm wondering if it is as easy as changing the `DEFAULT_BATCH_CONFIG` for `target-snowflake` to look like this and moving it inside the `SnowflakeSink` class?
Copy code
DEFAULT_BATCH_CONFIG = {
    "batch_size": self.max_size,  # note: self only works once this lives inside the class
    "encoding": {"format": "jsonl", "compression": "gzip"},
    "storage": {"root": "file://"},
}
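Since `self` only exists inside a class body, moving the dict inside `SnowflakeSink` would presumably mean something like a property. A rough sketch (the class skeleton is hypothetical; only the `SnowflakeSink` and `max_size` names come from the thread):

```python
# Rough sketch of the idea above: build the default batch config per sink so
# batch_size tracks the sink's max_size (i.e. batch_size_rows). This is a
# stripped-down skeleton, not the real target-snowflake SnowflakeSink.
class SnowflakeSink:
    def __init__(self, max_size: int = 10_000):
        self.max_size = max_size  # driven by batch_size_rows in config

    @property
    def default_batch_config(self) -> dict:
        return {
            "batch_size": self.max_size,
            "encoding": {"format": "jsonl", "compression": "gzip"},
            "storage": {"root": "file://"},
        }

sink = SnowflakeSink(max_size=150_000)
print(sink.default_batch_config["batch_size"])  # 150000
```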
e
Yeah, that probably makes sense. I think some confusion seeped in the target-snowflake implementation 😅, it shouldn't be using the batch config to decide how many rows to put in the temporary batch files it uses to load data. I don't think users need or want to control how target-snowflake loads its data beyond the batch size 🤷‍♂️.
b
Oh, I wonder if it takes input from the `batch_config` in the `meltano.yml` because it passes `self.batch_config` when it creates the `batcher`. I know there is a property for `batch_config` in the `SnowflakeSink` class, but I don't think the property is used if `batch_config` is present in the `meltano.yml`?
Copy code
# serialize to batch files and upload
# TODO: support other batchers
batcher = JSONLinesBatcher(
    tap_name=self.target.name,
    stream_name=self.stream_name,
    batch_config=self.batch_config,
)
e
Yeah,
JSONLinesBatcher
should be getting a batch config object built on the fly with the value of
batch_size_rows
and the other jsonl default values.
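A minimal sketch of what that on-the-fly construction could look like (the helper name and wiring are hypothetical; only `JSONLinesBatcher`, `batch_size_rows`, and the jsonl defaults come from the thread):

```python
# Assemble a batch config on the fly from batch_size_rows plus the fixed
# jsonl defaults, instead of reading batch_config from meltano.yml.
def make_batch_config(batch_size_rows: int) -> dict:
    return {
        "batch_size": batch_size_rows,
        "encoding": {"format": "jsonl", "compression": "gzip"},
        "storage": {"root": "file://"},
    }

# Hypothetical use at the call site quoted above:
# batcher = JSONLinesBatcher(
#     tap_name=self.target.name,
#     stream_name=self.stream_name,
#     batch_config=BatchConfig.from_dict(
#         make_batch_config(self.config["batch_size_rows"])
#     ),
# )
print(make_batch_config(150_000))
```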