# troubleshooting
g
Hi all. I'm configuring the Airbyte variant of `tap-kafka`. Unfortunately, all record values are empty (see below) even though the number of records seems correct. It is the case for both JSON and AVRO topics. Are you aware of an issue with the airbyte wrapper and the Airbyte kafka connector, or would you bet on a configuration issue on my side?
{"type":"RECORD","stream":"my-topic","record":{},"time_extracted":"2023-04-12T14:05:57.915891+00:00"}
This is my config:
airbyte_config:
        protocol:
          sasl_jaas_config: org.apache.kafka.common.security.plain.PlainLoginModule
            required username="<redacted>" password="<redacted>";
          security_protocol: SASL_SSL
          sasl_mechanism: PLAIN
        MessageFormat:
          schema_registry_url: https://<redacted>.eu-central-1.aws.confluent.cloud
          schema_registry_username: <redacted>
          schema_registry_password: <redacted>
          deserialization_type: AVRO
          deserialization_strategy: TopicNameStrategy
        bootstrap_servers: <redacted>.eu-west-1.aws.confluent.cloud:9092
        subscription:
          topic_pattern: my-avro-topic
          subscription_type: subscribe
        auto_offset_reset: earliest        
        client_dns_lookup: use_all_dns_ips
        enable_auto_commit: false        
        receive_buffer_bytes: -1
        polling_time: 2000
a
What do the SCHEMA messages look like? Are you invoking the tap and letting it write to stdout? The schema messages should be emitted first.
I think you can also introspect the generated catalog, which is a product of the Airbyte `discover` protocol, minimally translated to Singer.
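For illustration, a minimal sketch of introspecting a discovered catalog once you've located the JSON file. The inline sample below mimics the shape of a Singer catalog; the stream name and file location are assumptions, not taken from this thread:

```python
import json

# Inline sample standing in for the discovered catalog JSON; the real file
# typically lives somewhere under .meltano/run/ (exact path varies per project).
catalog_text = """
{
  "streams": [
    {
      "tap_stream_id": "my-avro-topic",
      "schema": {
        "type": "object",
        "properties": {"value": {"type": "string"}}
      }
    }
  ]
}
"""

catalog = json.loads(catalog_text)
for stream in catalog["streams"]:
    # An almost-empty property list is a strong hint the discovered schema is bad.
    props = list(stream["schema"].get("properties", {}))
    print(stream["tap_stream_id"], props)  # my-avro-topic ['value']
```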
l
That's what I was thinking we should do but I can't find how to introspect it
g
Hey Alex. I’m invoking the tap yes.
a
That would be in your .meltano/run dir probably
SCHEMA messages are emitted at the top of stdout during the tap's invocation. They are a product of the catalog.
If you are getting too many records to see it, either `>` the output to a file or pipe it to `head`.
Last thing. You can also run discovery yourself.
meltano invoke tap-… --discover
which does the Airbyte discovery and minimal translation, outputting to stdout.
g
We have the following:
Properties ... were present in the 'my-avro-topic' stream but not found in catalog schema. Ignoring.
a
These are sort of the typical troubleshooting paths, if that helps. It should feel like a Singer tap in how you interface with it from the CLI.
Ok so check the SCHEMA messages and/or the catalog. We will probably see an empty-ish schema
g
The schema seems empty indeed
"schema": {
        "properties": {
          "value": {
            "type": "string"
          }
        },
        "type": "object"
      },
l
It looks like the value property of a Kafka message. Could it be that we are not using the schema from the schema registry? 😕
a
It looks like it is expecting all the data to show up under a key called `value`?
l
yes, it looks to me that it could be the `value` of the KafkaMessage
a
So I wager your actual output doesn’t look like that maybe. There is a hacky way to test I think
https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.TYPE_CONFORMANCE_LEVEL Open the tap-airbyte venv folder, find the class that inherits from Stream, and override this property so it is set to 0. It's easy to do and should stop the SDK from pruning your messages.
Ultimately it's a call to a method called `pop_deselected_record_properties` or something like that, which we are trying to avoid in order to introspect the actual structure of the messages without the SDK mutating them. Changing that type conformance value, IIRC, is a proxy for disabling that. If not, you can override that method/function directly. Again, this is just for the purposes of introspection :)
l
It took a while to find it but I think we just did it
shall we do a normal run after that change?
a
Yes, see if non-mutated messages are pushed to stdout and you can physically verify the structure.
They should not be pruned by the SDK now
l
we can't see any non-mutated messages
oh, we changed the wrong class actually
a
We have the following:
Properties ... were present in the 'my-avro-topic' stream but not found in catalog schema. Ignoring.
For context: I am helping you prevent this message, which itself indicates the SDK is mutating/pruning your RECORD messages of all properties except `value`. This is so we can see the actual records and their keys/values as replicated by Airbyte BEFORE Meltano does its conformance.
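A toy illustration (not singer-sdk's actual code) of the pruning being described: with a catalog schema that only declares `value`, every real message field gets dropped, which matches the empty `"record":{}` output above. The message fields here are hypothetical:

```python
# Toy version of the SDK's pruning: keep only properties the schema declares.
def prune_record(record: dict, schema: dict) -> dict:
    allowed = set(schema.get("properties", {}))
    return {k: v for k, v in record.items() if k in allowed}

schema = {"type": "object", "properties": {"value": {"type": "string"}}}
message = {"id": 1, "name": "alice"}  # hypothetical fields from the Kafka topic

print(prune_record(message, schema))  # {} -- every field pruned away
```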
The end result ultimately is that the schema/catalog is wrong. It is either an issue in the configuration/schema registry OR an issue with the Airbyte source and how it uses the schema registry during `discover`.
I don't know enough about the internals of that source to help, but I can point you in that direction at least. Also, if removing the pruning actually helps, we can make that configurable in tap-airbyte itself as a sort of user override. I disable it myself on my own taps, since I often use schemaless destinations (no schema needed up front) and don't need to waste cycles on pointless checks.
oh, we changed the wrong class actually
The class should be in `tap.py`. I think the whole wrapper fits in one .py file, so you will see the class that inherits from Stream, probably near the bottom.
l
yes, we changed it there now and it had no impact. We are making sure we changed it in the right env
in any case, I think you might be right about the schema registry and issues with the discovery.
ok, we did modify the right one and it seems it has no impact
a
Yeah, if setting that to `TypeConformanceLevel.NONE` does not fix it, you may need to figure out where that pop-deselected function is called. That's really the culprit. I was hoping the type conformance would disable that call, but it seems not 😢 If you're Python savvy and you are in this deep, it would not be much more work to find the method where that function is called and override it, removing the call. Up to you though.
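A minimal sketch of the "remove the call" idea. The class layout and method names below are illustrative stand-ins, not singer-sdk's real internals; the point is the pattern of overriding the caller so records pass through untouched:

```python
# Illustrative stand-ins for the SDK's stream class and pruning helper.
class Stream:
    def pop_deselected_record_properties(self, record: dict) -> None:
        record.clear()  # stand-in for the real pruning helper

    def _emit_record(self, record: dict) -> dict:  # hypothetical caller
        self.pop_deselected_record_properties(record)
        return record

class PassthroughStream(Stream):
    def _emit_record(self, record: dict) -> dict:
        # Override: skip the pruning call entirely, purely for introspection.
        return record

print(Stream()._emit_record({"id": 1}))             # {}
print(PassthroughStream()._emit_record({"id": 1}))  # {'id': 1}
```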
l
We will try to do that, thanks for the help!
a
It's named this: `pop_deselected_record_properties`, and called in the stream class. Yeah NP, and good luck
l
We were overriding in the wrong place. I managed to skip the conformance checks, but it's not based on TYPE_CONFORMANCE_LEVEL anymore. Now we are facing a different issue with the s3 target. I think the schema is applied there too when trying to fix the types of the properties.
So we need to figure out how to make this work with the schema registry.
we might provide a schema manually just to see if everything works. Thanks for your pointers, it really helped us to navigate the code
It's failing in the `_validate_and_parse` method of the sink
a
NP. Yes, so you can use Meltano "extras" to overlay an update to the catalog schema: https://docs.meltano.com/concepts/plugins#schema-extra This might solve it on both the tap and target side until you get a better resolution for the schema registry, whether it's an issue with the Airbyte source or the config.
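For example, a sketch of what that `schema` extra could look like in meltano.yml. The stream name matches the topic in this thread, but the property names and types here are placeholders; the real shape is documented at the link above:

```yaml
plugins:
  extractors:
  - name: tap-kafka
    schema:
      # stream id -> property overrides applied on top of the discovered schema
      my-avro-topic:
        id:                          # placeholder property
          type: ["integer", "null"]
        name:                        # placeholder property
          type: ["string", "null"]
```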
l
thanks @alexander_butler that will definitely help. We found that the Airbyte source is the problem. It doesn't use the information from the schema registry to create a schema; it returns a random one. The schema extras will work for us until we patch the source
e
hi, I am facing the same issue. @luis_vicente I see that you have succeeded in using "schema extras". Can you confirm that you haven't provided the catalog manually (in meltano.yml)? https://docs.meltano.com/concepts/plugins/#catalog-extra