# singer-targets
v
I'm a noob to SQLAlchemy. I know I could switch to using native SQLAlchemy objects here, but I'm kind of on a mission at the moment to figure this out (I should probably give up, but I'm curious) 🧵
Where does ::JSON[] come from in this insert?
2022-10-28 11:22:22,789 INFO sqlalchemy.engine.Engine INSERT INTO data (data) VALUES (%(data)s::JSON[]) RETURNING data.id
Code
from sqlalchemy import Table, Column, MetaData, Integer, Computed, create_engine, Identity, String, ARRAY, JSON

url = "postgresql://postgres:postgres@localhost:5432/postgres"
engine = create_engine(url, echo=True)
metadata = MetaData()
data = Table("data",
    metadata,
    Column(
        'id', Integer, Identity(start=42, cycle=True), primary_key=True
    ),
    Column('data', ARRAY(JSON))
)
with engine.connect() as conn:
    #data.create(conn)
    stmt = data.insert().values(data=[{"hi":"Hello"}, {"bye": "Goodbye"}])
    conn.execute(stmt)
I'm trying to make this same thing work without the SQLAlchemy Table / Column types:
    def bulk_insert_records(
        self,
        full_table_name: str,
        schema: dict,
        records: Iterable[Dict[str, Any]],
    ) -> Optional[int]:
        """Bulk insert records into the target table.

        Args:
            full_table_name: the target table name.
            schema: the JSON schema for the new table, to be used when inferring column
                names.
            records: the input records.

        Returns:
            True if table exists, False if not, None if unsure or undetectable.
        """
        insert_sql = self.generate_insert_statement(
            full_table_name,
            schema,
        )
        insert_sql = sqlalchemy.text(insert_sql)
        breakpoint()
        if records[0].get("languages") is not None:
            insert_sql.bindparams(bindparam("languages", type_=ARRAY(JSON)))
        insert_sql.columns()

        self.logger.info("Inserting with SQL: %s", insert_sql)
        columns = self.column_representation(schema)

        jsonb_columns = []
        array_columns = []
        for column in columns:
            if type(column.type) == sqlalchemy.dialects.postgresql.JSONB:
                jsonb_columns.append(column)
        for column in columns:
            if type(column.type) == sqlalchemy.dialects.postgresql.ARRAY:
                array_columns.append(column)
        for record in records:
            for jsonb_column in jsonb_columns:
                record[jsonb_column.name]=psycopg2.extras.Json(record[jsonb_column.name])
            for array_column in array_columns:
                try:
                    record[array_column.name]=sqlalchemy.dialects.postgresql.array([psycopg2.extras.Json(record[array_column.name][0])])
                except IndexError:
                    record[array_column.name]=sqlalchemy.dialects.postgresql.array([psycopg2.extras.Json({"a":"a"})])

        self.connector.connection.execute(insert_sql, records)
        return True
I know it'd be easier and better to just convert to SQLAlchemy constructs now, so I'm going to go that route, but in my quest to understand SQLAlchemy better an answer to this would be helpful to me.
I think my use of `bindparams` is just wrong, but I can't find what would be right. Is the %s ::JSON[] cast happening in the DBAPI, based on the type of the records passed in here?
self.connector.connection.execute(insert_sql, records)
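One thing that may matter for the `bindparams` attempt: `TextClause.bindparams()` returns a new statement rather than modifying the one it is called on, so the result has to be assigned back. A minimal sketch of that, assuming a made-up `INSERT` with a single `:data` parameter (the names `raw`, `typed` and `pg` are only for illustration, and I have not checked what SQL each SQLAlchemy version renders for the typed statement):

```python
from sqlalchemy import JSON, bindparam, text
from sqlalchemy.dialects import postgresql

# Hypothetical statement; in the thread the SQL comes from generate_insert_statement().
raw = text("INSERT INTO data (data) VALUES (:data)")

# bindparams() is generative: it returns a new TextClause and leaves `raw` untouched,
# so calling it without keeping the return value has no effect.
typed = raw.bindparams(bindparam("data", type_=postgresql.ARRAY(JSON)))

# Compile both against the PostgreSQL dialect (no database connection needed)
# to compare how the untyped and typed parameters are rendered.
pg = postgresql.dialect()
print(raw.compile(dialect=pg))
print(typed.compile(dialect=pg))
```

If the second statement renders with a cast and the first does not, that would suggest the cast is added by SQLAlchemy at compile time from the parameter's type, not by the DBAPI from the record values.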
c
Defo not a sqlalchemy expert, but at a glance, isn't this the behavior you'd want? The `data` column is a JSON array and sqlalchemy is just doing an explicit type cast to that type on the inserted data.
v
Yes, the behavior is exactly right; sorry I wasn't clear. I'm trying to replicate that behavior in my `bulk_insert_records` code block without using the SQLAlchemy `Table` / `Column` constructs.
I'd assume the Column type definition from
Column('data', ARRAY(JSON))
Somehow maps to
%(data)s::JSON[]
But grepping for ``JSON[]`` in the sqlalchemy and psycopg2 code bases isn't leading me anywhere 😕
b
Maybe try the following to create your insert statement. It might pick up the JSON column types from the table and shape the insert accordingly:
insert_sql = sqlalchemy.dialects.postgresql.insert(self.connector.get_table(full_table_name))
v
Yeah, that's much better. I'm just going off the deep end trying to understand SQLAlchemy internals.
b
Completely get it, there is a ton to learn. The docs are great but a little broken up between base Core and dialect-specific Core, and now there are the 1.x to 2.x changes to contend with. Meltano and the SDK align with Core, not the ORM, as far as I can tell.
v
I was curious enough I dove into this on lunch today. http://aosabook.org/en/sqlalchemy.html is a good architecture overview of SQLAlchemy. During compilation of the insert, we hit
sqlalchemy/dialects/postgresql/base.py(3062)visit_ARRAY()
3055        def visit_ARRAY(self, type_, **kw):
3056
3057            inner = self.process(type_.item_type, **kw)
3058            breakpoint()
3059            return re.sub(
3060                r"((?: COLLATE.*)?)$",
3061                (
3062 ->                 r"%s\1"
3063                    % (
3064                        "[]"
3065                        * (type_.dimensions if type_.dimensions is not None else 1)
3066                    )
3067                ),
3068                inner,
3069                count=1,
3070            )
This gives us the
JSON[]
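To see what that `re.sub` is doing on its own, here is a small standalone sketch using only the standard library. The function name `array_type_sql` is mine, not SQLAlchemy's; it only mirrors the regex from the listing above: append one `[]` per dimension, but keep any trailing `COLLATE` clause at the end.

```python
import re
from typing import Optional


def array_type_sql(inner: str, dimensions: Optional[int] = None) -> str:
    """Append one "[]" per dimension, keeping a trailing COLLATE clause last."""
    suffix = "[]" * (dimensions if dimensions is not None else 1)
    # Group 1 captures an optional " COLLATE ..." tail anchored at the end of the
    # string; the suffix is inserted just before it (or at the very end if absent).
    return re.sub(
        r"((?: COLLATE.*)?)$",
        lambda match: suffix + match.group(1),
        inner,
        count=1,
    )


print(array_type_sql("JSON"))                         # JSON[]
print(array_type_sql("INTEGER", 2))                   # INTEGER[][]
print(array_type_sql('VARCHAR(30) COLLATE "en_US"'))  # VARCHAR(30)[] COLLATE "en_US"
```

So the `JSON` part comes from compiling the item type, and the `[]` is added by the array visitor.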
Next in the chain we hit
> /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/base.py(2221)visit_colon_cast()->'%(data)s::JSON[]'
Stack trace here:
(Pdb) where
  /home/visch/git/playgrounds/sqlalchemyplay/play.py(16)<module>()
-> conn.execute(stmt)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py(1380)execute()
-> return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py(333)_execute_on_connection()
-> return connection._execute_clauseelement(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py(1564)_execute_clauseelement()
-> compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py(531)_compile_w_cache()
-> compiled_sql = self._compiler(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py(566)_compiler()
-> return dialect.statement_compiler(dialect, self, **kw)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/compiler.py(804)__init__()
-> Compiled.__init__(self, dialect, statement, **kwargs)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/compiler.py(463)__init__()
-> self.string = self.process(self.statement, **compile_kwargs)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/compiler.py(498)process()
-> return obj._compiler_dispatch(self, **kwargs)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py(82)_compiler_dispatch()
-> return meth(self, **kw)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/compiler.py(3889)visit_insert()
-> crud_params = crud._get_crud_params(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/crud.py(149)_get_crud_params()
-> _scan_cols(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/crud.py(401)_scan_cols()
-> _append_param_parameter(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/crud.py(511)_append_param_parameter()
-> value = _handle_values_anonymous_param(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/crud.py(252)_handle_values_anonymous_param()
-> return value._compiler_dispatch(compiler, **kw)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py(82)_compiler_dispatch()
-> return meth(self, **kw)
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8/site-packages/sqlalchemy/sql/compiler.py(2437)visit_bindparam()
-> wrapped = self.process(
  /home/visch/git/playgrounds/sqlalchemyplay/.venv/lib/python3.8…
b
Wow, that is a lot of great info. Thanks for sharing it.