# singer-tap-development
k
I’m working on a custom tap that will discover dynamic streams for tables/collections in a database. Most of the examples have simple streams with fixed information. I’m having trouble finding the appropriate way with the Meltano SDK to set metadata for each stream to represent what database and table the stream should use. Any examples available for something like this or a pointer in the right direction perhaps? Thanks!
v
Which database are you trying to pull from?
k
I’m writing a custom tap to pull data from a series of MongoDB databases. I’ve completed the discover phase but need to store details from discovery of the source data for when the sync is going to run. Hopefully that makes sense. For example, the regular tap-mongodb does this without the singer SDK by writing metadata for the stream. https://github.com/singer-io/tap-mongodb/blob/49878ec2c05dac560335c4d8fa9d4b2cd77178f1/tap_mongodb/__init__.py#L128 I’m trying to do something similar to return metadata with my streams returned from discovery that persists to the catalog file.
v
https://gitlab.com/meltano/sdk/-/blob/74-database-type-streams/singer_sdk/streams/database.py#L167 is how @aaronsteers did it with SQLAlchemy; see https://github.com/MeltanoLabs/tap-athena/tree/master and https://github.com/MeltanoLabs/tap-sqlalchemy. Mongo is a different beast, though. I believe the point of a document DB is to not have a schema? I know little about Mongo.
Well, there are still schemas: https://docs.mongodb.com/realm/mongodb/document-schemas/ I'm outside of my zone here, so this Mongo stuff is just me spitballing.
https://docs.mongodb.com/realm/mongodb/enforce-a-document-schema/#generate-a-schema looks like there's an idea here around generating a schema, so I'd guess there's some tooling around this in the Mongo world. I'd guess you'd find something better than https://github.com/wolverdude/GenSON; if you do, let me know!!
k
It’s not really about the schema as much as how to include metadata for the catalog entry.
I was wondering if there was a common construct for adding metadata in the Singer SDK. Maybe I have to create the Singer CatalogEntry myself, like that SQLStream does.
v
k
Thanks! I’ll look into it some more.
a
@kevin_mullins - @visch pointed you in the right direction with the link to `get_standard_metadata()`. This gives a valid default metadata construct, given your declared schema. Can you say more about your use case here? Are you wanting to override metadata such as `automatic` and `unsupported` fields?
k
@aaronsteers - I’m trying to add metadata to the catalog for the source MongoDB database and collection names. That way, when the catalog is used for a sync, I know which database and collection to connect to. Much like your Athena use case discovers schemas and tables, mine will discover databases and collections to sync.
a
I see. So, I think you want `database-name`, `table-name`, `schema-name`, etc. for identifying the upstream links. Yes?
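For illustration, here is a minimal sketch of what such a catalog entry could look like when built as plain dictionaries in the Singer catalog format. It does not use any SDK API. The helper name `build_catalog_entry`, the stream naming scheme, the loose schema, and the sample database and collection names are all assumptions made for this example; only the `database-name` and `table-name` keys come from the discussion above.

```python
import json


def build_catalog_entry(database, collection):
    """Build a Singer catalog stream entry for one MongoDB collection.

    Hypothetical helper: plain dicts only, no SDK classes involved.
    """
    stream_id = f"{database}-{collection}"
    return {
        "tap_stream_id": stream_id,
        "stream": stream_id,
        # Loose schema: documents are not guaranteed to share a fixed shape.
        "schema": {"type": "object", "additionalProperties": True},
        "metadata": [
            {
                # An empty breadcrumb marks stream-level (not field-level) metadata.
                "breadcrumb": [],
                "metadata": {
                    "database-name": database,
                    "table-name": collection,
                    "table-key-properties": ["_id"],
                },
            }
        ],
    }


# Hypothetical discovery result: (database, collection) pairs.
discovered = [("sales", "orders"), ("sales", "customers")]
catalog = {"streams": [build_catalog_entry(db, coll) for db, coll in discovered]}
print(json.dumps(catalog, indent=2))
```

Keeping the upstream names in stream-level metadata, rather than parsing them back out of the stream name, avoids ambiguity when a database or collection name itself contains the separator.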
k
Yup. Did I miss something that already exists for this?
a
We don't have a built-in way to do that yet, but I imagine it should be pretty easy to override if we find the right place to do so.
k
I’m actually seeing some more things you accomplished in the Athena tap that I would be interested in. Most specifically the round tripping of the catalog entry to prevent having to connect and run the discovery code when syncing.
The structure you provided will solve both problems for now: it lets me make sure the catalog entry has the info, and it avoids querying everything again on a sync.
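As a rough sketch of that round trip, the snippet below reads the upstream database and collection names back out of an already-written catalog, so that a sync would not need to rerun discovery. The function names `stream_metadata` and `sync_targets` and the inline `example_catalog` are hypothetical; in practice the catalog would be loaded from the catalog file with `json.load`, and the metadata keys are assumed to be the `database-name` and `table-name` keys mentioned earlier.

```python
def stream_metadata(entry):
    """Return the stream-level metadata dict (empty breadcrumb) of a catalog entry."""
    for item in entry.get("metadata", []):
        if not item.get("breadcrumb"):
            return item.get("metadata", {})
    return {}


def sync_targets(catalog):
    """Map each tap_stream_id to the (database, collection) it should read from."""
    targets = {}
    for entry in catalog.get("streams", []):
        md = stream_metadata(entry)
        targets[entry["tap_stream_id"]] = (md["database-name"], md["table-name"])
    return targets


# Hypothetical catalog content, standing in for a parsed catalog file.
example_catalog = {
    "streams": [
        {
            "tap_stream_id": "sales-orders",
            "schema": {"type": "object"},
            "metadata": [
                {"breadcrumb": ["properties", "_id"],
                 "metadata": {"inclusion": "automatic"}},
                {"breadcrumb": [],
                 "metadata": {"database-name": "sales", "table-name": "orders"}},
            ],
        }
    ]
}

for stream_id, (database, collection) in sync_targets(example_catalog).items():
    print(f"{stream_id}: read {database}.{collection}")
```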
a
Fantastic!
If you don't mind sharing your lessons learned back with us, that would be much appreciated. We've got a database-stream-type MR open and have been piloting a few taps with that prerelease branch, but those would not fully solve the MongoDB use case, since I don't think MongoDB has SQLAlchemy support, being a key-value store and not a traditional relational model.
This spec is fairly settled at this point, may have small changes but (hopefully) nothing too huge between now and release. We tuned first for SQLAlchemy, since most traditional db types have SQLAlchemy support, but perhaps there are patterns here that you could leverage as well.
k
No problem! I am sure I will end up with some more questions (e.g., state handling). I’m essentially writing a more advanced MongoDB tap that supports multiple databases and change streams.
After this round I’ll have to figure out how to extract some data from Azure SQL Databases; that SQLStream work may intersect there.
a
Your Azure SQL stream will be a cakewalk after Mongo 🙂
k
@aaronsteers - FYI, I don’t know if your Athena tap would work with the current main branch. I ran into a little snag. It seems that after this change (https://gitlab.com/meltano/sdk/-/commit/167e6c580f34a34acf4bc52d844bca47cb54e4b0), during stream registration, the SDK expects catalog_entry.metadata to be an instance of the MetadataMapping class from _singer.py instead of the raw list that singer.metadata.get_standard_metadata returns. https://gitlab.com/meltano/sdk/-/blob/main/singer_sdk/mapper.py#L502
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/click/core.py", line 1137, in __call__
    return self.main(*args, **kwargs)
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/click/core.py", line 1062, in main
    rv = self.invoke(ctx)
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/click/core.py", line 763, in invoke
    return __callback(*args, **kwargs)
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 359, in cli
    tap = cls(  # type: ignore  # Ignore 'type not callable'
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/singer_sdk/tap_base.py", line 62, in __init__
    self.mapper.register_raw_streams_from_catalog(
  File "/Users/kevin/source/pocs/fb-tap-mongodb/.venv/lib/python3.8/site-packages/singer_sdk/mapper.py", line 372, in register_raw_streams_from_catalog
    catalog_entry.metadata.resolve_selection(),
AttributeError: 'list' object has no attribute 'resolve_selection'
a
Thanks, @kevin_mullins, for calling this out. I hadn't seen that fail on my side yet but I'll definitely take a look and see if we can patch that.
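To illustrate the mismatch behind that `AttributeError`, the sketch below converts the raw list form of Singer metadata (a list of breadcrumb/metadata pairs) into a mapping keyed by breadcrumb, and back again. This is plain Python written for illustration only: it is not the SDK's `MetadataMapping` class, the function names are hypothetical, and the SDK may ship its own conversion helper that should be preferred if it exists.

```python
def metadata_list_to_mapping(raw):
    """Convert [{"breadcrumb": [...], "metadata": {...}}, ...] to {breadcrumb_tuple: {...}}."""
    mapping = {}
    for item in raw:
        # Lists are not hashable, so the breadcrumb becomes a tuple key.
        mapping[tuple(item["breadcrumb"])] = dict(item["metadata"])
    return mapping


def mapping_to_metadata_list(mapping):
    """Convert {breadcrumb_tuple: {...}} back to the raw list form used in catalog files."""
    return [
        {"breadcrumb": list(breadcrumb), "metadata": dict(metadata)}
        for breadcrumb, metadata in mapping.items()
    ]


# Sample raw metadata in the list form discussed in the thread.
raw_metadata = [
    {"breadcrumb": [], "metadata": {"database-name": "sales", "table-name": "orders"}},
    {"breadcrumb": ["properties", "_id"], "metadata": {"inclusion": "automatic"}},
]

mapping = metadata_list_to_mapping(raw_metadata)
print(mapping[()]["database-name"])
print(mapping_to_metadata_list(mapping) == raw_metadata)
```

A list has no methods for looking up or resolving entries by breadcrumb, which is why code written against a mapping-style object fails when handed the raw list.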