niclas_roos
02/05/2023, 10:14 AM
row = {
    'invoice_id': 1234,
    'customer_id': 5678,
    'invoice_lines': [
        {'invoice_line_id': 1, 'value': 100},
        {'invoice_line_id': 2, 'value': 300}]}
And I want the parent stream to output the following to the target:
invoice_stream_row = {
    'invoice_id': 1234,
    'customer_id': 5678}
Next, I want to send row['invoice_id'] and row['invoice_lines'] to get_child_context(), pick them up from the context as if they were a list of records to be handled by the stream (just as any API response would be handled), and then yield the following rows as output of the child stream:
invoice_line_stream_rows = [
    {'invoice_id': 1234, 'invoice_line_id': 1, 'value': 100},
    {'invoice_id': 1234, 'invoice_line_id': 2, 'value': 300}]
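For illustration, the desired split can be sketched in plain Python (no SDK involved; names taken from the example above):

```python
# The raw record as it comes back from the source.
row = {
    'invoice_id': 1234,
    'customer_id': 5678,
    'invoice_lines': [
        {'invoice_line_id': 1, 'value': 100},
        {'invoice_line_id': 2, 'value': 300}]}

# Parent stream output: everything except the nested list.
invoice_stream_row = {k: v for k, v in row.items() if k != 'invoice_lines'}

# Child stream output: each line item, stamped with the parent key.
invoice_line_stream_rows = [
    {'invoice_id': row['invoice_id'], **line}
    for line in row['invoice_lines']]
```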
Any thoughts?
visch
02/05/2023, 9:03 PM
Meltanonic (or whatever the equivalent of "pythonic" is for Meltano) ❤️ this phrasing. The most meltanoic way of doing this (this is opinionated, of course), especially if you're using the Meltano SDK, would be to leave all this data in the parent stream and not write any additional code, by using the `flattening_enabled` config. Docs are a bit lacking in the SDK; https://github.com/meltano/sdk/issues/717 is the best write-up right now, I think 😄. There are also some targets, I believe, that will parse a JSON schema list and "flatten" it into a downstream table for you, which is probably the easiest way here. I don't think `flattening_enabled` does this today (I could be wrong, I haven't looked at the code).
But on a second look, this may not work with the structure of your data.
Leaving the data alone and doing the parsing in dbt is probably the easiest way forward. I think it's also the most "Singer" way to do this, since we try to match the source system as closely as possible and leave any transformation of the data up to something else.
Finally, here's how to do this in the SDK if you really want to (probably not the most meltanoic thing, as I went over above, but it's good to have an answer here). I think I'd pass `invoice_lines` to `get_child_context()`. Then I'd probably make my own stream class, like `class ListToStream(Stream)`, and do this with `get_records`:
def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    """Return a generator of record-type dictionary objects.

    Each record emitted should be a dictionary of property names to their values.

    Args:
        context: Stream partition or context dictionary.

    Yields:
        One item per (possibly processed) record in the API.
    """
    for record in context['invoice_lines']:
        record['invoice_id'] = context['invoice_id']
        yield record
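The parent side of that idea might look something like this (a hypothetical sketch, written as a plain function standing in for the stream method; the signature mirrors the SDK's `get_child_context`):

```python
from typing import Optional

# Hypothetical stand-in for the parent stream's get_child_context():
# pass the parent key plus the nested list down to the child stream.
def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
    return {
        'invoice_id': record['invoice_id'],
        'invoice_lines': record['invoice_lines'],
    }
```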
If you wanted, you could use a `yield from extract_jsonpath(self.records_jsonpath, input=response.json())` like https://github.com/meltano/sdk/blob/984761e6db5e6352e2fd33cc7352e44943ecb9f9/singer_sdk/streams/rest.py#L560 instead, but that's a bit overkill IMO.
niclas_roos
02/05/2023, 10:13 PM
Flattening would take {"name": {"first_name": "Jane", "last_name": "Doe"}} and turn it into {"name__first_name": "Jane", "name__last_name": "Doe"},
which is not what I'm trying to do. My current approach is to use dbt, but I have the schemas for all the sub-objects as well and it would be really neat to make the tap return as much as possible, without the need for another dependency.
The example that you describe is pretty close to what I had in mind, but I'm still not sure how to "turn off" the request.
visch
02/05/2023, 11:32 PM
`Stream` doesn't care how data is pulled; overriding `RESTStream` may still be easier.
niclas_roos
02/06/2023, 1:40 PM
RecursionError: maximum recursion depth exceeded while calling a Python object
for everything else:
from typing import Any, Iterable, Optional

import stringcase

class ListToStream(Stream):
    parent_id = None
    parent_id_key = None
    list_id = None

    def get_records(self, context: Optional[dict]) -> Iterable[dict[str, Any]]:
        """Return a generator of record-type dictionary objects.

        Args:
            context: Stream partition or context dictionary.

        Yields:
            One item per (possibly processed) record in the API.
        """
        parent_record = context.get('parent_record')
        for record in parent_record.get(self.list_id):
            record[self.parent_id_key] = parent_record.get(self.parent_id)
            yield record

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        """As needed, append or transform raw data to match expected structure."""
        row = {stringcase.snakecase(key) if isinstance(key, str) else key: value
               for key, value in row.items()}
        row['legal_entity'] = self.config.get('legal_entity')
        if self.replication_key:
            replication_key_value = self.get_starting_replication_key_value(context)
            if replication_key_value == row['id']:
                row = None
        return row

class AccountsStream(youniumStream):
    """Define custom stream."""
    name = "accounts"
    path = "/Accounts"
    primary_keys = ["id"]
    replication_key = None
    schema = convert_schema('AccountResponse')

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        """Return the parent record for use by child streams."""
        return {'parent_record': record}

class AccountAddressesStream(ListToStream):
    name = "account_addresses"
    primary_keys = ['id']
    parent_stream_type = AccountsStream
    parent_id = 'id'
    parent_id_key = 'account_id'
    list_id = 'addresses'
    schema = convert_schema(
        resource_name='AccountResponse',
        child_key='addresses',
        parent_id_key=parent_id_key)
visch
02/06/2023, 1:43 PM
aaronsteers
02/06/2023, 6:26 PM
`get_records()` actually supports two yield options: you can yield records, or you can yield a tuple of `(record, child_context)`.
https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.get_records
Parent streams can optionally return a tuple, in which case the second item in the tuple being a child_context dictionary for the stream's context.
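A minimal end-to-end sketch of that tuple-yield shape (plain Python with hypothetical data, no SDK classes; the tuple's second element is what later arrives as the child's `context`):

```python
# Hypothetical parent record; in a real tap this would come from the API.
PARENT_RECORDS = [
    {'invoice_id': 1234, 'customer_id': 5678,
     'invoice_lines': [{'invoice_line_id': 1, 'value': 100},
                       {'invoice_line_id': 2, 'value': 300}]},
]

def parent_get_records():
    """Yield (record, child_context) tuples instead of bare records."""
    for record in PARENT_RECORDS:
        child_context = {'invoice_id': record['invoice_id'],
                         'invoice_lines': record['invoice_lines']}
        yield record, child_context

def child_get_records(context):
    """The child stream receives the child_context as its context argument."""
    for line in context['invoice_lines']:
        yield {'invoice_id': context['invoice_id'], **line}

# Wire the two together the way the SDK would.
child_rows = [row
              for _record, ctx in parent_get_records()
              for row in child_get_records(ctx)]
```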
aaronsteers
02/06/2023, 6:28 PM
An example of this lives in `tap-tableau` and can be found here.
In that case, we're iterating over Tableau workbook objects, which are inherently large and would be difficult and expensive to recreate or iterate over again in a fresh call to `get_child_context()`. This could work for your use case as well, passing some arbitrary fields from the parent object to the child class.
niclas_roos
02/08/2023, 1:16 PM
ken_payne
02/08/2023, 6:10 PM
I yield the `record` and pass the nested entities as context to the child stream. Full example here 🙂
ken_payne
02/08/2023, 6:13 PM
The `child_context` yielded in the tuple by the parent's `get_records()` is passed as the `context` argument of the child's `get_records()` method, where you can access the data you want directly. Hope that makes sense 😅
ken_payne
02/08/2023, 6:14 PM
niclas_roos
02/09/2023, 10:19 PM
state_partitioning_keys = []
Once I added that, it works perfectly!
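For readers landing here later, a stubbed-out sketch of that final fix (the SDK classes are replaced with stand-ins; `state_partitioning_keys` is the real singer-sdk `Stream` attribute being set):

```python
# Stand-in for singer_sdk.Stream; in the real tap, import it from the SDK.
class Stream:
    # singer-sdk partitions state by context keys unless told otherwise.
    state_partitioning_keys = None

class ListToStream(Stream):
    """Child-stream base class from earlier in the thread (stubbed)."""

class AccountAddressesStream(ListToStream):
    name = "account_addresses"
    # An empty list disables per-parent state partitioning, which is what
    # resolved the RecursionError reported earlier in the thread.
    state_partitioning_keys: list = []
```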