# troubleshooting
m
Hey, how do you typically set JSON fields in the schema? I would like to return one column as a JSON structure. I’ve tried `th.ObjectType()` with various parameters, `th.StringType`, and some others. Not sure what the best practice is. I noticed that when I manually remove the `properties` key, the full JSON `platforms` field gets returned:
```python
SCHEMA = th.PropertiesList(
    th.Property("id", th.StringType),
    th.Property("platforms", th.ObjectType(additional_properties=True))
).to_dict()
SCHEMA['properties']['platforms'].pop('properties')
```
e
Hey @matt_elgazar! I think you're running into this bit of type-conforming code: https://github.com/meltano/sdk/blob/c4b5c39c092cb1cb26f08c66056560accf3ec8f7/singer_sdk/helpers/_typing.py#L453. You could try setting `TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT` as an attribute on your stream class. See https://github.com/meltano/sdk/blob/c4b5c39c092cb1cb26f08c66056560accf3ec8f7/singer_sdk/streams/core.py#L78. That said, I think an object field that uses `additional_properties` should not have its properties dropped, so I opened https://github.com/meltano/sdk/issues/2300.
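A minimal sketch of that attribute override, assuming a typical SDK REST stream (the class and stream name here are hypothetical):
```python
from singer_sdk import RESTStream
from singer_sdk.helpers._typing import TypeConformanceLevel

class PlatformsStream(RESTStream):
    name = "platforms_stream"  # hypothetical stream name

    # Only conform types at the root of each record; leave nested
    # object properties untouched so they aren't dropped.
    TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT
```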
m
Hey, thanks @Edgar Ramírez (Arch.dev). Is this correct? To set a specific field in a schema to be of type “json”, I shouldn’t necessarily set it to a JSON type in the schema (e.g. `th.Property("platforms", th.ObjectType(additional_properties=True))`), but I need to set `TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT` in the stream class? What should the type of the JSON field be? Should it still be set as `th.Property("platforms", th.ObjectType(additional_properties=True))`? The `platforms` field can come in either like `[{"key1": "value1"}, {"key2": "value2"}]` or `{"key1": "value1", "key2": "value2"}`.
e
So, the issue is that JSON Schema doesn't recognize a generic "json" type (or rather, that would truly be any of number, integer, string, etc.). For your needs, you probably want either an empty schema for that field (literally `"my_field": {}`) or `anyOf`.
m
Ah ok. How do you create an empty schema? Sorry for the newbie question 😅 Do you mean `th.AnyType`?
e
Yeah, exactly: `th.AnyType`!
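For reference, a sketch of what that would look like in the schema from earlier in the thread:
```python
SCHEMA = th.PropertiesList(
    th.Property("id", th.StringType),
    # AnyType emits a schema that places no constraints on the value.
    th.Property("platforms", th.AnyType),
).to_dict()
```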
m
Awesome yep that did it!
e
Cool! (tbh I'd forgotten I added `AnyType` a while back 😅)
m
Ah wait, it errors out in this case:
```
Could not append type because the JSON schema for the dictionary `{}` appears to be invalid
```
```
raise EmptySchemaTypeError cmd_type=extractor name=tap-coingecko run_id=cece1780-f758-4742-a61d-1b302a2af77f state_id=2024-03-08T210658--tap-coingecko--target-jsonl stdio=stderr
2024-03-08T21:06:58.948952Z [info     ] singer_sdk.helpers._typing.EmptySchemaTypeError: Could not detect type from empty type_dict. Did you forget to define a property in the stream schema?
```
Hmm @Edgar Ramírez (Arch.dev), do you know if there’s a parameter in `th.AnyType()` that allows empty data / nulls?
e
> Could not append type because the JSON schema for the dictionary `{}` appears to be invalid

This is a harmless (if annoying) warning. Ah, yeah, some type checks require a non-empty schema to avoid ambiguity 🤦🏻‍♂️. I got confused there, because config JSON schemas can get away with it, but not records... I think you're better off using `anyOf` then:
```python
th.Property("platforms", th.CustomType({"anyOf": [{"type": "object"}, {"type": "array"}]}))
```
m
That did it! Thanks again!
Hey @Edgar Ramírez (Arch.dev), actually noticing something weird. This schema runs perfectly fine with a jsonl target but fails with target-bigquery. I tried different target-bigquery settings and still no luck. Anything you can think of, or anyone you can tag who would probably know the answer? Schema:
```python
th.Property(
    "thumbnail",
    th.CustomType({"anyOf": [{"type": "object"}, {"type": "array"}, {}]}),
),
```
Data example:
```python
{'resolutions': [{'url': 'https://s.yimg.com/uu/api/res/1.2/aAwfzsF_OIDSoMQL8Ie5cQ--~B/aD0zMTYyO3c9NTYxNjthcHBpZD15dGFjaHlvbg--/https://s.yimg.com/os/creatr-uploaded-images/2024-03/45bf03b0-e17a-11ee-bf9d-ece364da95b9', 'width': 5616, 'height': 3162, 'tag': 'original'}, {'url': 'https://s.yimg.com/uu/api/res/1.2/m2wJePuwFe.MWvWtps517w--~B/Zmk9ZmlsbDtoPTE0MDtweW9mZj0wO3c9MTQwO2FwcGlkPXl0YWNoeW9u/https://s.yimg.com/os/creatr-uploaded-images/2024-03/45bf03b0-e17a-11ee-bf9d-ece364da95b9', 'width': 140, 'height': 140, 'tag': '140x140'}]}
```
The error is:
```json
{
  "cmd_type": "loader",
  "event": "google.api_core.exceptions.BadRequest: 400 POST: Field thumbnail is type RECORD but has no schema",
  "level": "info",
  "name": "target-bigquery",
  "run_id": "d9f2a5a8-3667-4be0-8f67-1971d3b46093",
  "state_id": "tmp",
  "stdio": "stderr",
  "timestamp": "2024-03-14T01:17:22.572535Z"
}
```
e
Hey @matt_elgazar. Which target-bigquery variant is this?
m
```yaml
loaders:
    - name: target-jsonl
      variant: andyh1203
      pip_url: target-jsonl

    - name: target-bigquery
      variant: z3z1ma
      pip_url: git+https://github.com/z3z1ma/target-bigquery.git@2d59eae0aa4a5468ed8ba5d04e8d239e05e373ee
      config:
        method: gcs_stage
        project: ${GCP_PROJECT_ID}
        credentials_path: ${GOOGLE_APPLICATION_CREDENTIALS}
        dataset: ${GCP_DATASET}
        batch_size: 100000
        denormalized: true
        bucket: ${GCP_BUCKET_NAME}
```
I checked, and it successfully loaded into Google Cloud Storage as jsonl. Looks like the load failed copying from GCS -> BigQuery.
e
Thanks. So we can try adapting the schema to get the field type we want in BQ. Looking at https://github.com/z3z1ma/target-bigquery/blob/5ee673f5dc33ccb0cce332aa9a2f6040114d12c4/target_bigquery/core.py#L799, it seems we want to steer it to land in `SchemaField(name, "JSON", "NULLABLE")`. It's already picking the first element of `anyOf`, so adding `patternProperties` to the `object` might do it.
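An untested sketch of that suggestion (the `.*` pattern and the open value schema are assumptions, not something the target documents):
```python
th.Property(
    "thumbnail",
    th.CustomType({
        "anyOf": [
            # patternProperties on the object branch, with the value schema
            # left open, to steer the target toward a schemaless JSON column.
            {"type": "object", "patternProperties": {".*": {}}},
            {"type": "array"},
        ]
    }),
)
```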
m
```python
schema = th.PropertiesList(
    th.Property("timestamp_extracted", th.DateTimeType, required=True),
    th.Property("ticker", th.StringType),
    th.Property("link", th.StringType),
    th.Property("provider_publish_time", th.DateTimeType),
    th.Property("publisher", th.StringType),
    th.Property("related_tickers", th.ArrayType(th.StringType)),
    th.Property(
        "thumbnail",
        th.CustomType({"anyOf": [{"type": "object"}, {"type": "array"}, {}]}),
    ),
    th.Property("title", th.StringType),
    th.Property("type", th.StringType),
    th.Property("uuid", th.StringType),
).to_dict()

schema_property = schema.get('properties').get('thumbnail')
schema_property
Out[12]: {'anyOf': [{'type': 'object'}, {'type': 'array'}, {}, 'null']}

"anyOf" in schema_property and len(schema_property["anyOf"]) > 0
Out[13]: True

# we go down the first case at line 765

property_type = schema_property["anyOf"][0].get("type", "string")
property_format = schema_property["anyOf"][0].get("format", None)

property_type
Out[15]: 'object'

property_format

"array" in property_type
Out[17]: False

"object" in property_type
Out[18]: True

# we go down the condition at line 789

(
    "properties" not in schema_property
    or len(schema_property["properties"]) == 0
    or "patternProperties" in schema_property
)
Out[19]: True
```
It returns `SchemaField(name, "JSON", "NULLABLE")`.
e
Ah gotcha. Then I'm curious where `Field thumbnail is type RECORD but has no schema` is coming from 🤔
m
Are you able to load this JSON into target-bigquery?
```python
{'resolutions': [{'url': 'https://s.yimg.com/uu/api/res/1.2/aAwfzsF_OIDSoMQL8Ie5cQ--~B/aD0zMTYyO3c9NTYxNjthcHBpZD15dGFjaHlvbg--/https://s.yimg.com/os/creatr-uploaded-images/2024-03/45bf03b0-e17a-11ee-bf9d-ece364da95b9', 'width': 5616, 'height': 3162, 'tag': 'original'}, {'url': 'https://s.yimg.com/uu/api/res/1.2/m2wJePuwFe.MWvWtps517w--~B/Zmk9ZmlsbDtoPTE0MDtweW9mZj0wO3c9MTQwO2FwcGlkPXl0YWNoeW9u/https://s.yimg.com/os/creatr-uploaded-images/2024-03/45bf03b0-e17a-11ee-bf9d-ece364da95b9', 'width': 140, 'height': 140, 'tag': '140x140'}]}
```
@Edgar Ramírez (Arch.dev) Similarly, for `target-snowflake` I get the same issue. Here’s another JSON example:
```json
{
  "id": "zoodao",
  "symbol": "zoo",
  "name": "ZooDAO",
  "platforms": {
    "ethereum": "0x09f098b155d561fc9f7bccc97038b7e3d20baf74",
    "fantom": "0x1ac0c9592e2480649e9471c1548f60564b37a46b",
    "moonbeam": "0x7cd3e6e1a69409def0d78d17a492e8e143f40ec5",
    "arbitrum-one": "0x1689a6e1f09658ff37d0bb131514e701045876da"
  }
}
```
All I want to do is return the raw JSON. Is there no simple functionality in a Meltano schema to do this? Is it loader-specific?
Hey @Edgar Ramírez (Arch.dev), just checking in to see if you have any ideas or can tag someone who may be an expert with the Snowflake / BigQuery loaders.
e
@matt_elgazar is it perhaps the same issue being described in https://meltano.slack.com/archives/C069CQNHDNF/p1711472708208839?
m
Hmm, yeah, sounds like it. I still haven’t figured out a clear solution for defining a JSON schema that works across both the Snowflake and BigQuery loaders. Actually, I haven’t even been able to get it to work in either warehouse.
Hey @Edgar Ramírez (Arch.dev), just an update on this. I was able to get the snowflake and jsonl loaders working for basic JSON records with the snippet below, but it still fails for target-bigquery.
```python
SCHEMA = <schema dictionary>
json_id_fields = ["json_field1", "json_field2"]

CUSTOM_JSON_SCHEMA = {'additionalProperties': True, 'description': 'Custom JSON typing.', 'type': ['object', 'null']}

for field in json_id_fields:
    SCHEMA["properties"][field] = CUSTOM_JSON_SCHEMA
```
This works for simple JSON structures such as `{'hi': 'bye', 'no': 2}`, but it failed for an array of JSONs. In that case I was able to get it working for jsonl and snowflake using this approach:
```python
th.ArrayType(th.CustomType({"anyOf": [{"type": "object"}, {"type": "array"}, {"type": "null"}, {}, [{}]]}))
```
^^ However, while this works in jsonl for a complex array of JSONs like the one below, it fails for snowflake and bigquery as well:
```json
"tickers": [{"base": "FTM", "target": "USDT", "market": {"name": "DigiFinex", "identifier": "digifinex", "open": 123}}]
```
Another concern is that it will fail for that field if one record returns an array of JSONs and another record returns a simple JSON. Is there a standardized approach to defining JSON schema types that works across loaders? Both methods above fail for target-bigquery. I’ve opened an issue on their GitHub but no response.
Also, side topic, but do you know if there is a staging option in the MeltanoLabs target-snowflake repo? Not 100% sure, but I believe it’s best practice (and cost-efficient) to load data into a bucket and then call `PUT` when dealing with Snowflake, like they do in the pipelinewise variant. Is this a feature?
e
> Is there a standardized approach to defining JSON schema types that works across loaders?

For the specific case of variant types that can be any valid JSON value (string, integer, object, etc.), I don't think there is. The SDK decided early on to disallow empty field schemas, and it's unclear that most targets can even process such values. If there's a way the SDK could make this easier for you, e.g. by allowing such field schemas, do log an issue. I suspect it may be fine to allow developers to (very rarely) shoot themselves in the foot and give them a bit more flexibility when they know their targets accept the schemas in question.

> do you know if there is a staging option in the MeltanoLabs target-snowflake repo?

I think this path uses a stage to load data, but I could be wrong: https://github.com/MeltanoLabs/target-snowflake/blob/6672878d3aa193cdcaec61d8de9a2493590cdc32/target_snowflake/connector.py#L408-L416
m
Hmm… what’s your recommended approach to setting the schema for a JSON field (or an array of JSONs)? It’s definitely making this EL process overly time-consuming, since the loaders are constantly failing on JSON fields.
e
If I control the tap, I would look at normalizing the field. For example, if the values are either a single JSON object or an array of JSON objects, I would use the SDK's `post_process` method to conform them all to arrays of objects, and I would declare the schema as `{"my_field": {"type": "array", "items": {"type": "object"}}}`.
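A minimal sketch of that normalization (`my_field` is a placeholder name):
```python
def post_process(self, row: dict, context: dict | None = None) -> dict | None:
    value = row.get("my_field")
    # Wrap a lone object in a list so every record conforms to
    # {"type": "array", "items": {"type": "object"}}.
    if isinstance(value, dict):
        row["my_field"] = [value]
    return row
```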
m
When you say normalize, do you mean flatten the JSON record? If there are over 1000 keys, or keys that show up inconsistently, I don’t know if flattening it is feasible. Or do you mean do something like this?
```python
def post_process(self, row: dict, context: dict | None = None) -> dict | None:
    row["json_field"] = str(row["json_field"])
    return row
```
Or even
```python
row["json_field"] = [row["json_field"]]
```
with the schema set to `{"json_field": {"type": "array", "items": {"type": "object"}}}`?
I’ve looked through a few of your taps as well. I notice you’re often not defining `get_records` or `request_records` and not yielding a JSON response. Noob question, but how does the data get passed without returning or yielding a record?
e
It's a good question. Most of my taps are REST sources, and they can get away with relying on the behavior inherited from `RESTStream`, so they use the default `request_records` and `get_records`. One goal of the SDK's API is to reduce the amount of code developers need to implement. Ideally, you only need to declare tap configuration, stream names, schemas, and some metadata (primary keys, replication). If you ever need to override methods like those two, it means there's either a gap in the SDK or your source is really different and truly needs an ad-hoc implementation.
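To make that concrete, here's a sketch of a fully declarative REST stream (the endpoint and fields are made up):
```python
from singer_sdk import RESTStream
from singer_sdk import typing as th

class WidgetsStream(RESTStream):
    """Hypothetical stream: no get_records/request_records override needed."""

    name = "widgets"
    url_base = "https://api.example.com"  # assumed endpoint
    path = "/widgets"
    primary_keys = ["id"]
    replication_key = None  # full-table replication
    records_jsonpath = "$[*]"  # where records live in the response body

    schema = th.PropertiesList(
        th.Property("id", th.StringType),
        th.Property("name", th.StringType),
    ).to_dict()
```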
m
Interesting! I’ll have to play around with that idea. I think I’m doing things too manually. Definitely open to your suggestions on optimizing the tap I’m working on. If you have a minute, would you be able to take a look and suggest how to handle JSON schemas, and best practices for avoiding doing things too manually? For every single stream I’m getting `Could not append type because the JSON schema for the dictionary {} appears to be invalid.`
Here’s the tap I’ve been working on: https://github.com/melgazar9/tap-coingecko
e
For starters, I think you could do something like this instead of a schema property and setter:
```python
class CoinListStream(CoingeckoStream):
    """Coingecko Coin-List Stream of Tickers."""

    name = "coin_list"
    path = "/coins/list"
    replication_key = None

    schema = th.PropertiesList(
        th.Property("id", th.StringType, description="Coingecko ticker ID"),
        th.Property(
            "symbol", th.StringType, description="Coingecko symbol / ticker"
        ),
        th.Property("name", th.StringType, description="Coingecko product name"),
        th.Property("platforms", th.CustomType(CUSTOM_JSON_SCHEMA), description="Coingecko platforms"),
    ).to_dict()
```