Schema Validation with Pyflink and Kafka’s Schema Registry


I was able to register my expected schema for records arriving on the Kafka topic using the command below:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"UserData\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"transaction_timestamp_millis\",\"type\":\"double\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"},{\"name\":\"counterpart_id\",\"type\":\"int\"}]}"}' \
  http://localhost:8081/subjects/user-data/versions
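Because the schema travels as a JSON string embedded inside a JSON document, the quoting is easy to get wrong by hand. A stdlib-only Python sketch that builds the request body with `json.dumps` (the field list mirrors the `UserData` schema registered above):

```python
import json

# Avro schema for the UserData record, mirroring the one registered above.
user_data_schema = {
    "type": "record",
    "name": "UserData",
    "namespace": "io.confluent.examples.clients.basicavro",
    "fields": [
        {"name": "user_id", "type": "int"},
        {"name": "transaction_timestamp_millis", "type": "double"},
        {"name": "amount", "type": "double"},
        {"name": "currency", "type": "string"},
        {"name": "counterpart_id", "type": "int"},
    ],
}

# Schema Registry expects {"schema": "<escaped JSON string>"}, so the inner
# schema is serialized first and then embedded as a string value; the inner
# json.dumps produces all the \" escaping automatically.
payload = json.dumps({"schema": json.dumps(user_data_schema)})
print(payload)
```

The resulting `payload` can then be POSTed with `curl --data "$payload" …` or with `urllib` instead of hand-escaping the `--data` argument.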

Now, I am trying to validate the incoming record values with the already registered schema. So, I tried this:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"UserData\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"user_id\",\"type\":\"float\"},{\"name\":\"transaction_timestamp_millis\",\"type\":\"double\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"},{\"name\":\"counterpart_id\",\"type\":\"int\"}]}"}' \
  http://localhost:8081/compatibility/subjects/user-data/versions/latest

But it gives me the below response:

{"is_compatible":true}  

I was expecting false, since the user_id field in the new request has type float while the registered schema declares it as int.
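To rerun this check quickly against different candidate schemas, the same compatibility call can be scripted. A minimal stdlib sketch (the `build_compat_request` helper name is mine; the endpoint and registry URL are the ones from the curl above), to be executed with `urllib.request.urlopen` once the registry is running:

```python
import json
import urllib.request

def build_compat_request(subject, schema_dict, registry="http://localhost:8081"):
    """Build the POST request that checks schema_dict against the latest
    registered version of the subject (same endpoint as the curl command)."""
    url = f"{registry}/compatibility/subjects/{subject}/versions/latest"
    body = json.dumps({"schema": json.dumps(schema_dict)}).encode()
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )

# Example: check a candidate schema where user_id became a float.
candidate = {
    "type": "record",
    "name": "UserData",
    "fields": [{"name": "user_id", "type": "float"}],
}
req = build_compat_request("user-data", candidate)
# With the registry up: json.load(urllib.request.urlopen(req))["is_compatible"]
```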

Since I still want to validate incoming records against the expected schema, I tried a second approach, referring to this link:

docker exec -it 5a7990c6f769 kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 \
  --topic lets-tests \
  --config confluent.value.schema.validation=true \
  --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.user-data

But it gives me the below error:

Error while executing topic command : The server experienced an unexpected error when processing the request.
[2024-04-27 06:51:34,766] ERROR org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
 (kafka.admin.TopicCommand$)

I have ensured that all services are up and running, except ksqlDB and its related services.

Also, I don’t want to terminate my producer session when an incoming record doesn’t match the expected schema. Since I am using PyFlink, I was planning to set such records aside and send them to another topic that can be evaluated later.
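To make that plan concrete, here is a stdlib-only sketch of the routing predicate I have in mind (the `validate_record` helper and the dead-letter topic name are hypothetical; in PyFlink the same predicate could drive a side output or a second Kafka sink):

```python
# Expected Python types per field, mirroring the registered UserData Avro schema.
EXPECTED_FIELDS = {
    "user_id": int,
    "transaction_timestamp_millis": float,
    "amount": float,
    "currency": str,
    "counterpart_id": int,
}

def validate_record(record):
    """Return True if every expected field is present with a usable type.
    Ints are accepted where Avro expects a double."""
    for name, py_type in EXPECTED_FIELDS.items():
        if name not in record:
            return False
        value = record[name]
        if isinstance(value, bool):  # bool is a subclass of int in Python
            return False
        if py_type is float:
            if not isinstance(value, (int, float)):
                return False
        elif not isinstance(value, py_type):
            return False
    return True

def route(record):
    """Pick a destination topic instead of failing the producer."""
    return "user-data" if validate_record(record) else "user-data-dead-letter"

good = {"user_id": 1, "transaction_timestamp_millis": 1714194694766.0,
        "amount": 9.99, "currency": "EUR", "counterpart_id": 2}
bad = {**good, "user_id": "not-an-int"}
```

Records that fail validation keep flowing to the dead-letter topic, so the producer session never has to be terminated.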

So where am I going wrong? What could be the best way here? Any help is much appreciated. TIA
