# singer-target-development
b
I wanted to pass this along in case other SDK-based SQL Target developers have run into this. I was seeing a database session per Sink, and the sessions would stay open until the `meltano run` was complete. I found a fix. First, add a `target_connector` property to your Target's class in `target.py`:
    default_connector_class = mssqlConnector
    _target_connector: SQLConnector = None

    @property
    def target_connector(self) -> SQLConnector:
        """The connector object.

        Returns:
            The connector object.
        """
        if self._target_connector is None:
            self._target_connector = self.default_connector_class(dict(self.config))
        return self._target_connector
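To see the lazy property in isolation, here is a minimal, standard-library-only sketch. `DemoConnector` and `DemoTarget` are hypothetical stand-ins, not SDK classes, and the config key is made up for illustration. The sketch only shows that the connector is built on first access and reused afterwards.

```python
from typing import Optional


class DemoConnector:
    """Hypothetical stand-in for a SQLConnector subclass; counts constructions."""

    instances_created = 0

    def __init__(self, config: dict) -> None:
        DemoConnector.instances_created += 1
        self.config = config


class DemoTarget:
    """Hypothetical stand-in for an SDK-based SQL target."""

    default_connector_class = DemoConnector

    def __init__(self, config: dict) -> None:
        self.config = config
        self._target_connector: Optional[DemoConnector] = None

    @property
    def target_connector(self) -> DemoConnector:
        # Build the connector on first access only, then hand back the same object.
        if self._target_connector is None:
            self._target_connector = self.default_connector_class(dict(self.config))
        return self._target_connector


target = DemoTarget({"sqlalchemy_url": "sqlite://"})  # illustrative config only
first = target.target_connector
second = target.target_connector
print(first is second)                  # True
print(DemoConnector.instances_created)  # 1
```

Nothing is created until something asks for `target_connector`, so a target that never builds a sink never opens a connector.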
Then copy the `get_sink` and `add_sink` methods over from the `Target` class in `target_base.py`. Since `add_sink` is decorated as `@final`, it can't be overridden, so you need to rename your copy to `add_sqlsink`. Edit `add_sqlsink` to pass `self.target_connector` to the sink class being initialized, then change `get_sink` to call `add_sqlsink` instead of `add_sink`. I will put examples of these in a reply.

This allows Sinks to share one SQLAlchemy connection pool. When doing test loads of databases with 80 tables or more, I only see a handful of sessions (5 at most). I just merged this change into a couple of personal-use target projects, target-postgres--buzzcutnorman and target-mssql--buzzcutnorman, if you want to see a complete example. If you give this a try in your own SDK-based SQL Target, please let me know what the result was, good or bad.
Here are the examples of the changes to `add_sqlsink` and `get_sink` I mentioned:
    def add_sqlsink(

        # ... comments and code removed to show the change better

        sink = sink_class(
            target=self,
            stream_name=stream_name,
            schema=schema,
            key_properties=key_properties,
            connector=self.target_connector,
        )

        # ... some code removed to assist with clarity

        return sink

    def get_sink(

        # ... comments and code removed to show the change better

        if not existing_sink:
            return self.add_sqlsink(stream_name, schema, key_properties)

        # ... some code removed to assist with clarity

        return existing_sink
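For anyone who wants to try the idea without an SDK install, the flow above can be sketched with the standard library alone. Everything here (`DemoConnector`, `DemoSink`, `DemoTarget`, the config key) is a hypothetical stand-in rather than SDK code, and the parts of the real methods that the excerpts elide are left out. It assumes a sink builds its own connector when none is passed in, which is the behavior that leads to one session per Sink.

```python
from typing import Dict, List, Optional


class DemoConnector:
    """Hypothetical stand-in for a SQLConnector; counts constructions."""

    instances_created = 0

    def __init__(self, config: dict) -> None:
        DemoConnector.instances_created += 1
        self.config = config


class DemoSink:
    """Hypothetical stand-in for a SQL sink."""

    def __init__(self, target, stream_name, schema, key_properties, connector=None):
        self.stream_name = stream_name
        self.schema = schema
        self.key_properties = key_properties
        # Assumption: with no connector supplied, the sink creates its own.
        if connector is None:
            connector = DemoConnector(dict(target.config))
        self.connector = connector


class DemoTarget:
    """Hypothetical stand-in for the target with the renamed add_sqlsink."""

    def __init__(self, config: dict) -> None:
        self.config = config
        self._target_connector: Optional[DemoConnector] = None
        self._sinks_active: Dict[str, DemoSink] = {}

    @property
    def target_connector(self) -> DemoConnector:
        # Created once, then shared by every sink.
        if self._target_connector is None:
            self._target_connector = DemoConnector(dict(self.config))
        return self._target_connector

    def add_sqlsink(self, stream_name, schema, key_properties=None) -> DemoSink:
        sink = DemoSink(
            target=self,
            stream_name=stream_name,
            schema=schema,
            key_properties=key_properties,
            connector=self.target_connector,  # the one-line change
        )
        self._sinks_active[stream_name] = sink
        return sink

    def get_sink(self, stream_name, schema, key_properties=None) -> DemoSink:
        existing_sink = self._sinks_active.get(stream_name)
        if not existing_sink:
            return self.add_sqlsink(stream_name, schema, key_properties)
        return existing_sink


target = DemoTarget({"sqlalchemy_url": "sqlite://"})  # illustrative config only
sinks: List[DemoSink] = [target.get_sink(f"table_{i}", {}) for i in range(80)]

print(len({id(s.connector) for s in sinks}))      # 1
print(DemoConnector.instances_created)            # 1
print(target.get_sink("table_0", {}) is sinks[0]) # True
```

With 80 streams the sketch still builds a single connector, because every sink receives `self.target_connector` instead of creating its own.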
p
@BuzzCutNorman this is great, do you know if there's an open issue in the SDK repo to discuss this? I think it would be a good thing to consider adding to the SDK, or at least adding a similar implementation to limit the number of open connections. cc @edgar_ramirez_mondragon @ken_payne
b
@pat_nadolny Thanks 🙏. I don't think there is an open issue.
p
Would you mind opening one? Any thoughts you have around how to best include it in the SDK, and your experience from debugging your use case, would be helpful.
b
I don't mind opening an issue.
Here is the link to the issue I created. https://github.com/meltano/sdk/issues/1772