I’ve been working on a tap where a top-level strea...
# singer-tap-development
a
I’ve been working on a tap where a top-level stream result contains several “sub-streams” (arrays of objects). I would like to specify the top-level stream and each sub-stream as a
Stream
entity within the SDK, but I’m unsure about how best to sync the sub-stream records from the parent. Example: Issue endpoint that returns an array of comments on that issue:
Copy code
{
  "issue_id": "i001",
  "comments": [
    {
      "comment_id": "c1",
      "text": ":+1:"
    },
    {
      "comment_id": "c2",
      "text": ":-1:"
    }
  ]
}
I was able to get it working by instantiating and syncing the 
CommentStream
 from 
IssueStream.post_process
 like so:
Copy code
class CommentStream(Stream):
    def __init__(self, *args, records: Optional[List] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.records = records or []

    def get_records(self, context: Optional[dict]) -> Iterable[Union[dict, Tuple[dict, dict]]]:
        for row in self.records:
            yield row

class IssueStream(RESTStream):
    def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
        issue_comments = row.pop('comments', None)
        if issue_comments:
            CommentStream(tap=self._tap, records=issue_comments).sync()
But, this results in the 
SCHEMA
 record and logging being sent for every 
sync
 call.
I considered using the parent-child pattern, but passing the entire row via 
get_child_context
 didn’t seem like the right option either.
Thoughts?
a
@alex_levene I think the parent child stream mechanism should work for this. When sending the parent row in "get_child_context" the dictionary record already exists so it doesn't consomme extra resources per se to pass it along. Another option is to return a tuple from the parent's "get_records" method, with the second item in the tuple being the child context.
Does this work?
a
Thanks for the feedback. If I implement the parent-child pattern, does that mean the context (the record) will automatically be stored under the Comment state output?
I did just try the parent-child method, and it’s throwing errors resolving the state message for a
context
dictionary value (the whole parent record) for the context. I’ve only ever provided primitive data types in
get_child_context
, not nested values.
Copy code
class IssuesStream:
    ...
    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        return {"record": record}
Copy code
Traceback (most recent call last):
  File "tap_dir/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "tap_dir/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "tap_dir/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 332, in cli
    tap.sync_all()
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 263, in sync_all
    stream.sync()
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 758, in sync
    self._sync_records(context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 710, in _sync_records
    self._sync_children(child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 763, in _sync_children
    child_stream.sync(context=child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 758, in sync
    self._sync_records(context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 710, in _sync_records
    self._sync_children(child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 763, in _sync_children
    child_stream.sync(context=child_context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 758, in sync
    self._sync_records(context)
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 741, in _sync_records
    self._write_state_message()
  File "tap_dir/lib/python3.8/site-packages/singer_sdk/streams/core.py", line 531, in _write_state_message
    singer.write_message(singer.StateMessage(value=self.tap_state))
  File "tap_dir/lib/python3.8/site-packages/singer/messages.py", line 280, in write_message
    sys.stdout.write(format_message(message) + '\n')
  File "tap_dir/lib/python3.8/site-packages/singer/messages.py", line 276, in format_message
    return json.dumps(message.asdict(), use_decimal=True)
  File "tap_dir/lib/python3.8/site-packages/simplejson/__init__.py", line 380, in dumps
    return _default_encoder.encode(obj)
  File "tap_dir/lib/python3.8/site-packages/simplejson/encoder.py", line 291, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "tap_dir/lib/python3.8/site-packages/simplejson/encoder.py", line 373, in iterencode
    return _iterencode(o, 0)
ValueError: Circular reference detected