# singer-tap-development
n
Hi! I'm trying to figure out if there's meltanonic (or whatever the equivalent to pythonic is for meltano) way to create a child stream that doesn't do another request, but simply parses a specified part of the parent stream row? Let's say this is one row from the parent stream:
```python
row = {
    'invoice_id': 1234,
    'customer_id': 5678,
    'invoice_lines': [
        {'invoice_line_id': 1, 'value': 100},
        {'invoice_line_id': 2, 'value': 300},
    ],
}
```
And I want the parent stream to output the following to the target:
```python
invoice_stream_row = {
    'invoice_id': 1234,
    'customer_id': 5678,
}
```
Next, I want to send `row['invoice_id']` and `row['invoice_lines']` to `get_child_context()` and pick them up from the context as if they were a list of records to be handled by the stream, just like any response would have been handled, and then yield the following rows as the output of the child stream:
```python
invoice_line_stream_rows = [
    {'invoice_id': 1234, 'invoice_line_id': 1, 'value': 100},
    {'invoice_id': 1234, 'invoice_line_id': 2, 'value': 300},
]
```
Any thoughts?
v
> meltanonic (or whatever the equivalent to pythonic is for meltano)
❤️ this phrasing. The most meltanoic way (this is opinionated, of course) of doing this, especially if you're using the Meltano SDK, would be to leave all this data in the parent stream and not write any additional code, by using the `flattening_enabled` config. Docs are a bit lacking in the SDK; https://github.com/meltano/sdk/issues/717 is the best write-up right now, I think 😄. There are also some targets, I believe, that will parse a JSON schema `list` and "flatten" it into a downstream table for you, which is probably the easiest way here. I don't think `flattening_enabled` does this today (I could be wrong, I haven't looked at the code).

That said, with the structure of your data this may not work after a second look. Leaving the data alone and doing the parsing in dbt is probably the easiest way forward. I think it's also the most "Singer" way to do this: we try to match the source system as closely as possible and leave any transformation of the data up to something else.

Finally, here's how to do this in the SDK if you really want to (probably not the most meltanoic thing, as I went over above, but it's good to have an answer). I'd pass `invoice_lines` to `get_child_context`, then make my own stream class like `class ListToStream(Stream)`, and do it with `get_records`:
```python
def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    """Return a generator of record-type dictionary objects.

    Each record emitted should be a dictionary of property names to their values.

    Args:
        context: Stream partition or context dictionary.

    Yields:
        One item per (possibly processed) record in the API.
    """
    for record in context['invoice_lines']:
        record['invoice_id'] = context['invoice_id']
        yield record
```
If you wanted, you could use a `yield from extract_jsonpath(self.records_jsonpath, input=response.json())` like https://github.com/meltano/sdk/blob/984761e6db5e6352e2fd33cc7352e44943ecb9f9/singer_sdk/streams/rest.py#L560 instead, but that's a bit overkill imo.
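To make that sketch concrete outside the SDK, here's the same list-to-records transformation as a plain function you can run standalone (the function name and sample data are illustrative, not part of the SDK):

```python
from typing import Any, Dict, Iterable


def records_from_parent_context(context: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
    """Yield one child record per item in the parent's nested list,
    stamping each with the parent key (mirrors the get_records above)."""
    for record in context["invoice_lines"]:
        yield {"invoice_id": context["invoice_id"], **record}


context = {
    "invoice_id": 1234,
    "invoice_lines": [
        {"invoice_line_id": 1, "value": 100},
        {"invoice_line_id": 2, "value": 300},
    ],
}
rows = list(records_from_parent_context(context))
# rows[0] == {"invoice_id": 1234, "invoice_line_id": 1, "value": 100}
```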
n
Thanks for the feedback! As far as I've been able to make out, flattening will handle something like:
```python
{"name": {"first_name": "Jane", "last_name": "Doe"}}
```
and turn it into
```python
{"name__first_name": "Jane", "name__last_name": "Doe"}
```
which is not what I'm trying to do. My current approach is to use dbt, but I have the schemas for all the sub-objects as well and it would be really neat to make the tap return as much as possible, without the need for another dependency. The example that you describe is pretty close to what I had in mind, but I'm still not sure how to "turn off" the request.
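For intuition, here's a toy version of that key-joining behavior; this is just a sketch to show why nested lists survive untouched, not the SDK's actual flattening implementation:

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], sep: str = "__") -> Dict[str, Any]:
    """Simplified view of schema flattening: nested dicts are joined into
    compound keys; lists are left alone (not exploded into rows)."""
    out: Dict[str, Any] = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Recurse into nested dicts and prefix their keys.
            for subkey, subvalue in flatten(value, sep).items():
                out[f"{key}{sep}{subkey}"] = subvalue
        else:
            out[key] = value
    return out


flatten({"name": {"first_name": "Jane", "last_name": "Doe"}})
# -> {"name__first_name": "Jane", "name__last_name": "Doe"}

flatten({"invoice_lines": [{"invoice_line_id": 1}]})
# -> {"invoice_lines": [{"invoice_line_id": 1}]}  (the list is not exploded)
```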
v
If you look at how I overrode `get_records` vs. the original function, that's how! Note that in the class declaration I also showed that you don't need to extend the REST class here: `Stream` doesn't care how data is pulled, though overriding `RESTStream` may still be easier.
n
ok, here's my attempt, which works for the first record of the parent stream but then fails with a max recursion error (`RecursionError: maximum recursion depth exceeded while calling a Python object`) for everything else:
```python
from typing import Any, Iterable, Optional

import stringcase


class ListToStream(Stream):
    parent_id = None
    parent_id_key = None
    list_id = None

    def get_records(self, context: Optional[dict]) -> Iterable[dict[str, Any]]:
        """Return a generator of record-type dictionary objects.

        Args:
            context: Stream partition or context dictionary.

        Yields:
            One item per (possibly processed) record in the API.
        """
        parent_record = context.get('parent_record')

        for record in parent_record.get(self.list_id):
            record[self.parent_id_key] = parent_record.get(self.parent_id)
            yield record

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        """As needed, append or transform raw data to match expected structure."""
        row = {
            stringcase.snakecase(key) if isinstance(key, str) else key: value
            for key, value in row.items()
        }
        row['legal_entity'] = self.config.get('legal_entity')

        if self.replication_key:
            replication_key_value = self.get_starting_replication_key_value(context)
            if replication_key_value == row['id']:
                row = None

        return row
```
```python
class AccountsStream(youniumStream):
    """Define custom stream."""

    name = "accounts"
    path = "/Accounts"
    primary_keys = ["id"]
    replication_key = None

    schema = convert_schema('AccountResponse')

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        """Return the full parent record for use by child streams."""
        return {'parent_record': record}


class AccountAddressesStream(ListToStream):
    name = "account_addresses"
    primary_keys = ['id']
    parent_stream_type = AccountsStream

    parent_id = 'id'
    parent_id_key = 'account_id'
    list_id = 'addresses'

    schema = convert_schema(
        resource_name='AccountResponse',
        child_key='addresses',
        parent_id_key=parent_id_key)
```
v
A stack trace would be very helpful; also, if you could share the entire tap code, that'd make it easier. It's hard to tell from this without reading all of your code.
a
Jumping in with another approach that is pretty niche, but was created with tap-tableau in mind... It's a very rarely used feature, but `get_records()` actually supports two yield options: you can yield records, or you can yield a tuple of `(record, child_context)`. https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.get_records

> Parent streams can optionally return a tuple, in which case the second item in the tuple is a child_context dictionary for the stream's context.

The first implementation of this was written by @ken_payne for tap-tableau and can be found here. In that case, we're iterating on Tableau workbook objects, which are inherently large and would be difficult and expensive to recreate or iterate on again in a fresh call to `get_child_context()`. It could work for your use case as well, passing some arbitrary fields from the parent object to the child class.
n
Thanks @aaronsteers, even though I'm not sure I follow. The weird thing is that my approach works for the first record but then fails for some reason
k
@niclas_roos this feature allows you to return the child data as context compiled by the parent, saving the child from needing to fetch that context itself. In the Tableau example, I fetch a workbook that has some metadata plus lots of nested child entities. I yield the metadata as the `record`, and I pass the nested entities as context to the child stream. Full example here 🙂

The `child_context` yielded in the tuple by the parent's `get_records()` is passed as the `context` argument of the child's `get_records()` method, where you can access the data you want directly. Hope that makes sense 😅

Happy to jump on Zoom if it would be helpful, as I am probably one of the few users of this great little SDK feature 👍
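In plain Python, the tuple-yield mechanism looks roughly like this (a sketch of the flow with made-up data, not the SDK internals):

```python
from typing import Any, Dict, Iterable, Tuple

Record = Dict[str, Any]


def parent_get_records() -> Iterable[Tuple[Record, Dict[str, Any]]]:
    """Yield (record, child_context) pairs: the record is emitted by the
    parent stream, the context is handed straight to the child stream."""
    invoice = {
        "invoice_id": 1234,
        "customer_id": 5678,
        "invoice_lines": [{"invoice_line_id": 1, "value": 100}],
    }
    record = {"invoice_id": invoice["invoice_id"], "customer_id": invoice["customer_id"]}
    child_context = {
        "invoice_id": invoice["invoice_id"],
        "invoice_lines": invoice["invoice_lines"],
    }
    yield record, child_context


for record, child_context in parent_get_records():
    # The SDK would emit `record`, then call the child stream's
    # get_records(child_context) with the second tuple element.
    pass
```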
n
I finally got it to work! Turns out I had missed setting `state_partitioning_keys = []`; once I added that, it works perfectly!
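For anyone landing here later, here's the shape of the fix in one place. This uses a stand-in `Stream` base class so the snippet runs standalone; in a real tap you'd extend `singer_sdk.streams.Stream`, and the comment about why the default partitioning misbehaved is my reading of this thread, not verified against SDK internals:

```python
from typing import Any, Dict, Iterable, List, Optional


class Stream:
    """Stand-in for singer_sdk.streams.Stream, just for this sketch."""
    state_partitioning_keys: Optional[List[str]] = None


class ListToStream(Stream):
    """Child stream fed entirely from the parent's context (see thread above)."""

    # The crucial line: with the default (None), the SDK derives state
    # partitions from the full child context -- here, an entire parent
    # record -- which appears to be what triggered the RecursionError above.
    state_partitioning_keys: List[str] = []

    parent_id = "id"
    parent_id_key = "account_id"
    list_id = "addresses"

    def get_records(self, context: Optional[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
        """Yield one record per item of the parent's nested list, no request made."""
        parent_record = context["parent_record"]
        for record in parent_record[self.list_id]:
            yield {self.parent_id_key: parent_record[self.parent_id], **record}
```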