alex_levene
07/26/2021, 7:42 PM
Stream entity within the SDK, but I’m unsure about how best to sync the sub-stream records from the parent.
Example: an Issue endpoint that returns an array of comments on that issue:
{
  "issue_id": "i001",
  "comments": [
    {
      "comment_id": "c1",
      "text": ":+1:"
    },
    {
      "comment_id": "c2",
      "text": ":-1:"
    }
  ]
}
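A minimal sketch of the data shape involved, assuming the payload above. Each nested comment becomes its own flat record, and the parent's issue_id is copied in as a foreign key so comments can still be joined back to their issue downstream. `extract_comments` is a hypothetical helper for illustration, not part of the SDK.

```python
from typing import Any, Dict, List


def extract_comments(issue: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: turn an issue's nested comments into flat child records.

    Each comment gets the parent's issue_id prepended so it can be joined back later.
    """
    return [
        {"issue_id": issue["issue_id"], **comment}
        for comment in issue.get("comments", [])
    ]


issue = {
    "issue_id": "i001",
    "comments": [
        {"comment_id": "c1", "text": ":+1:"},
        {"comment_id": "c2", "text": ":-1:"},
    ],
}

for row in extract_comments(issue):
    print(row)
# {'issue_id': 'i001', 'comment_id': 'c1', 'text': ':+1:'}
# {'issue_id': 'i001', 'comment_id': 'c2', 'text': ':-1:'}
```

An issue without a comments key simply yields no child records.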
alex_levene
07/26/2021, 7:42 PM
CommentStream from IssueStream.post_process like so:
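from typing import Iterable, List, Optional, Tuple, Union

from singer_sdk.streams import RESTStream, Stream


class CommentStream(Stream):
    # (name and schema definitions omitted)
    def __init__(self, *args, records: Optional[List[dict]] = None, **kwargs):
        super().__init__(*args, **kwargs)
        # Comment records handed over by the parent IssueStream
        self.records = records or []

    def get_records(self, context: Optional[dict]) -> Iterable[Union[dict, Tuple[dict, dict]]]:
        yield from self.records


class IssueStream(RESTStream):
    def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
        # Strip the nested comments off the issue record...
        issue_comments = row.pop("comments", None)
        if issue_comments:
            # ...and sync them through a fresh child stream instance
            CommentStream(tap=self._tap, records=issue_comments).sync()
        # Return the row; returning None would make the SDK skip the issue record
        return row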
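The SCHEMA repetition reported next follows from creating and syncing a fresh CommentStream per issue. The toy model below is not singer_sdk code: `sync` is a hypothetical stand-in that, like a Singer stream sync, writes one SCHEMA message before its RECORD messages. It compares one sync per issue against buffering all comments and syncing once.

```python
from typing import Any, Dict, Iterable, List, Tuple

Message = Tuple[str, ...]


def sync(stream: str, records: Iterable[Dict[str, Any]], out: List[Message]) -> None:
    """Toy stand-in for a stream sync: one SCHEMA message, then one RECORD per row."""
    out.append(("SCHEMA", stream))
    for record in records:
        out.append(("RECORD", stream, record["comment_id"]))


def schema_count(messages: List[Message]) -> int:
    """Count how many SCHEMA messages a run emitted."""
    return sum(1 for message in messages if message[0] == "SCHEMA")


issues = [
    {"issue_id": "i001", "comments": [{"comment_id": "c1"}, {"comment_id": "c2"}]},
    {"issue_id": "i002", "comments": [{"comment_id": "c3"}]},
]

# Pattern 1: a new child sync per parent row, as in post_process above.
per_row: List[Message] = []
for issue in issues:
    sync("comments", issue["comments"], per_row)

# Pattern 2: buffer every issue's comments first, then run a single sync.
buffered = [comment for issue in issues for comment in issue["comments"]]
single: List[Message] = []
sync("comments", buffered, single)

print(schema_count(per_row), schema_count(single))  # 2 1
```

Both runs emit the same three RECORD messages; the trade-off is that buffering holds every comment in memory until the single sync runs.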
alex_levene
07/26/2021, 7:42 PM
SCHEMA record and logging being sent for every sync call.
alex_levene
07/26/2021, 7:43 PM
get_child_context didn’t seem like the right option either.
alex_levene
07/26/2021, 7:43 PM
aaronsteers
07/26/2021, 8:02 PM
aaronsteers
07/26/2021, 8:02 PM
alex_levene
07/26/2021, 8:10 PM
alex_levene
07/26/2021, 8:58 PM
context dictionary value (the whole parent record) for the context. I’ve only ever provided primitive data types in get_child_context, not nested values.
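from typing import Optional
from singer_sdk.streams import RESTStream

class IssuesStream(RESTStream):
    ...
    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        # Pass the entire parent record (nested lists included) as the child context
        return {"record": record}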
Traceback (most recent call last):
  File "tap_dir/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "tap_dir/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "tap_dir/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 332, in cli
    tap.sync_all()
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 263, in sync_all
    stream.sync()
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 758, in sync
    self._sync_records(context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 710, in _sync_records
    self._sync_children(child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 763, in _sync_children
    child_stream.sync(context=child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 758, in sync
    self._sync_records(context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 710, in _sync_records
    self._sync_children(child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 763, in _sync_children
    child_stream.sync(context=child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 758, in sync
    self._sync_records(context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 741, in _sync_records
    self._write_state_message()
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 531, in _write_state_message
    singer.write_message(singer.StateMessage(value=self.tap_state))
  File "tap_dir/lib/python3.8/site-packages/singer/messages.py", line 280, in write_message
    sys.stdout.write(format_message(message) + '\n')
  File "tap_dir/lib/python3.8/site-packages/singer/messages.py", line 276, in format_message
    return json.dumps(message.asdict(), use_decimal=True)
  File "tap_dir/lib/python3.8/site-packages/simplejson/__init__.py", line 380, in dumps
    return _default_encoder.encode(obj)
  File "tap_dir/lib/python3.8/site-packages/simplejson/encoder.py", line 291, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "tap_dir/lib/python3.8/site-packages/simplejson/encoder.py", line 373, in iterencode
    return _iterencode(o, 0)
ValueError: Circular reference detected
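The last frames show the failure happening while JSON-encoding the STATE message (_write_state_message into json.dumps), which suggests the child context ends up inside the tap state. The sketch below uses the standard library's json module, which raises the same ValueError for any self-referencing structure. How the cycle forms inside the SDK isn't visible in the traceback, so the cycle here is built by hand purely for illustration. `make_child_context` is a hypothetical helper showing the primitive-only alternative: copy just the key (here issue_id) into the context.

```python
import json
from typing import Any, Dict, Tuple

# Value types that json.dumps can always encode without nesting.
PRIMITIVES = (str, int, float, bool, type(None))


def make_child_context(
    record: Dict[str, Any], keys: Tuple[str, ...] = ("issue_id",)
) -> Dict[str, Any]:
    """Hypothetical helper: build a child context from primitive parent fields only."""
    context = {key: record[key] for key in keys}
    nested = [key for key, value in context.items() if not isinstance(value, PRIMITIVES)]
    if nested:
        raise TypeError(f"context values must be primitive; nested values for: {nested}")
    return context


record = {"issue_id": "i001", "comments": [{"comment_id": "c1", "text": ":+1:"}]}

# A primitive-only context serializes cleanly.
print(json.dumps(make_child_context(record)))  # {"issue_id": "i001"}

# Hand-built cycle (illustration only): a state dict holds the record,
# and the record also points back at the state.
state = {"context": {"record": record}}
record["state"] = state
try:
    json.dumps(state)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
print(error_message)  # Circular reference detected
```

Keeping the context to primitives also keeps any state that records it small; the child stream can then look up or re-fetch the comments by issue_id.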