What is a Schema Payload Validation Policy Interceptor?

Avoid outages from missing or badly formatted records by ensuring that all messages adhere to a schema.

This interceptor also supports validating payloads against specific constraints for Avro and Protobuf schemas.

This is similar to the validations provided by JSON Schema, such as the following (a short example sketch follows the list):

  • Number: minimum, maximum, exclusiveMinimum, exclusiveMaximum, multipleOf
  • String: minLength, maxLength, pattern, format
  • Collections: maxItems, minItems
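
For illustration, here is a hypothetical JSON Schema fragment (not one of the schemas shipped with this demo) combining a few of these constraints; piping it through jq simply pretty-prints it:

# Hypothetical example only -- not part of the demo files
echo '{
  "type": "object",
  "properties": {
    "name":    { "type": "string", "minLength": 3, "maxLength": 50 },
    "age":     { "type": "number", "minimum": 0, "maximum": 10 },
    "email":   { "type": "string", "format": "email" },
    "hobbies": { "type": "array", "minItems": 3 }
  }
}' | jq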

View the full demo in real time

You can either follow all the steps manually or watch the recording.

Review the docker compose environment

As can be seen from docker-compose.yaml, the demo environment consists of the following services:

  • gateway1
  • gateway2
  • kafka-client
  • kafka1
  • kafka2
  • kafka3
  • schema-registry
  • zookeeper
cat docker-compose.yaml
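
If you only need the service names rather than the full file, Docker Compose can list them directly:

docker compose config --services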

Starting the docker environment

Start all the Docker containers in the background and wait for them to be up and ready.

  • --wait: Wait for services to be running|healthy. Implies detached mode.
  • --detach: Detached mode: Run containers in the background
docker compose up --detach --wait
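
Optionally, check that every container reports a running or healthy state before continuing:

docker compose ps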

Creating virtual cluster teamA

Creating the virtual cluster teamA on gateway gateway1, then reviewing the configuration file used to access it

# Generate virtual cluster teamA with service account sa
token=$(curl \
--request POST "http://localhost:8888/admin/vclusters/v1/vcluster/teamA/username/sa" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data-raw '{"lifeTimeSeconds": 7776000}' | jq -r ".token")

# Create access file
echo """
bootstrap.servers=localhost:6969
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='sa' password='$token';
""" > teamA-sa.properties

# Review file
cat teamA-sa.properties
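
As an optional sanity check, you can confirm that the gateway accepts these credentials before creating any topics (no user topics exist yet, so the list should come back empty):

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--list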

Creating topics topic-json, topic-avro and topic-protobuf on teamA

Creating on teamA:

  • Topic topic-json with partitions:1 and replication-factor:1
  • Topic topic-avro with partitions:1 and replication-factor:1
  • Topic topic-protobuf with partitions:1 and replication-factor:1
kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic topic-json
kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic topic-avro
kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic topic-protobuf
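
To double-check the result, you can describe one of the new topics through the gateway:

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--describe \
--topic topic-json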

Adding interceptor guard-schema-payload-validate

Add Schema Payload Validation Policy Interceptor

Creating the interceptor named guard-schema-payload-validate for the plugin io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin, using the following payload:

{
  "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
  "priority" : 100,
  "config" : {
    "schemaRegistryConfig" : {
      "host" : "http://schema-registry:8081"
    },
    "topic" : "topic-.*",
    "schemaIdRequired" : true,
    "validateSchema" : true,
    "action" : "BLOCK"
  }
}
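
The create request below reads this payload from step-07-guard-schema-payload-validate.json, which should already be present in the demo directory. If you are reproducing the steps by hand, save the JSON above under that name first, for example:

cat > step-07-guard-schema-payload-validate.json << 'EOF'
{
  "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
  "priority" : 100,
  "config" : {
    "schemaRegistryConfig" : {
      "host" : "http://schema-registry:8081"
    },
    "topic" : "topic-.*",
    "schemaIdRequired" : true,
    "validateSchema" : true,
    "action" : "BLOCK"
  }
}
EOF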

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-schema-payload-validate" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-07-guard-schema-payload-validate.json | jq

Listing interceptors for teamA

Listing interceptors on gateway1 for virtual cluster teamA

curl \
--request GET 'http://localhost:8888/admin/interceptors/v1/vcluster/teamA' \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Review the example JSON schema

Review the JSON schema that will be registered under the topic-json subject:

cat user-schema.json

Review the example Avro schema

Review the Avro schema that will be registered under the topic-avro subject:

cat user-schema.avsc

Review the example Protobuf schema

Review the Protobuf schema that will be registered under the topic-protobuf subject:

cat user-schema.proto

Let's register these schemas with the Schema Registry

echo jsonSchemaId = $(curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schemaType\": \"JSON\", \"schema\": $(cat user-schema.json | jq tostring)}" \
http://localhost:8081/subjects/topic-json/versions)

echo avroSchemaId = $(curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schemaType\": \"AVRO\", \"schema\": $(cat user-schema.avsc | jq tostring)}" \
http://localhost:8081/subjects/topic-avro/versions)

echo protobufSchemaId = $(curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schemaType\": \"PROTOBUF\", \"schema\": $(cat user-schema.proto | jq -Rs .)}" \
http://localhost:8081/subjects/topic-protobuf/versions)
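
Each call prints the registry's response, which contains the id assigned to the schema. You can also fetch a schema back to confirm it was stored, for example the first version registered under the topic-json subject:

curl --silent http://localhost:8081/subjects/topic-json/versions/1 | jq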

Let's assert the number of registered schemas

echo nb schemas = $(curl --silent http://localhost:8081/subjects/ | jq 'length')

Let's produce an invalid payload for the JSON schema

echo '{"name":"Hi","age":7,"email":"john.doecom","address":{"street":"123 Main St","city":"a"},"hobbies":["reading","cycling"]}' | \
kafka-json-schema-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--topic topic-json \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=1
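
Depending on how the console producer validates input, this invalid record may be rejected client-side by the JSON Schema serializer or blocked at the gateway by the interceptor (action is BLOCK); either way it should not end up on the topic. As an optional check, consuming topic-json should return no records and simply time out:

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic topic-json \
--from-beginning \
--timeout-ms 3000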

Let's produce an invalid payload for the Avro schema

echo '{"name":"Hi","age":7,"email":"john.doe@example.com","address":{"street":"123 Main St","city":"Anytown"},"hobbies":["reading","cycling"],"friends":[{"name":"Friend1","age":17},{"name":"Friend2","age":18}]}' | \
kafka-avro-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--topic topic-avro \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=2

Check in the audit log that the message was denied

Check in the audit log that the message was denied in cluster kafka1

kafka-console-consumer \
--bootstrap-server localhost:19092,localhost:19093,localhost:19094 \
--topic _auditLogs \
--from-beginning \
--timeout-ms 3000 \
| jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin")'

returns

Processed a total of 13 messages
{
  "id": "42ff3bf1-9ec7-4a5e-9208-27bdf5f57809",
  "source": "krn://cluster=TtF1fia0TJ-2B2bjRog-gg",
  "type": "SAFEGUARD",
  "authenticationPrincipal": "teamA",
  "userName": "sa",
  "connection": {
    "localAddress": null,
    "remoteAddress": "/192.168.65.1:58866"
  },
  "specVersion": "0.1.0",
  "time": "2024-02-14T03:12:17.693833427Z",
  "eventData": {
    "level": "error",
    "plugin": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
    "message": "Request parameters do not satisfy the configured policy. Topic 'topic-avro' has invalid avro schema payload: name is too short (2 < 3), street is too long (11 > 10), hobbies has too few items (2 < 3), age is greater than 10, age is greater than 10"
  }
}

Let's produce an invalid payload for the Protobuf schema

echo '{"name":"Hi","age":7,"email":"john.doe@example.com","address":{"street":"123 Main St","city":"Anytown"},"hobbies":["reading","cycling"],"friends":[{"name":"Friend1","age":17},{"name":"Friend2","age":18}]}' | \
kafka-protobuf-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--topic topic-protobuf \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=3

Check in the audit log that the message was denied

Check in the audit log that the message was denied in cluster kafka1

kafka-console-consumer \
--bootstrap-server localhost:19092,localhost:19093,localhost:19094 \
--topic _auditLogs \
--from-beginning \
--timeout-ms 3000 \
| jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin")'

returns

Processed a total of 16 messages
{
  "id": "42ff3bf1-9ec7-4a5e-9208-27bdf5f57809",
  "source": "krn://cluster=TtF1fia0TJ-2B2bjRog-gg",
  "type": "SAFEGUARD",
  "authenticationPrincipal": "teamA",
  "userName": "sa",
  "connection": {
    "localAddress": null,
    "remoteAddress": "/192.168.65.1:58866"
  },
  "specVersion": "0.1.0",
  "time": "2024-02-14T03:12:17.693833427Z",
  "eventData": {
    "level": "error",
    "plugin": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
    "message": "Request parameters do not satisfy the configured policy. Topic 'topic-avro' has invalid avro schema payload: name is too short (2 < 3), street is too long (11 > 10), hobbies has too few items (2 < 3), age is greater than 10, age is greater than 10"
  }
}
{
  "id": "b6d3c6e8-c3f8-4f86-914c-046d793a0f30",
  "source": "krn://cluster=TtF1fia0TJ-2B2bjRog-gg",
  "type": "SAFEGUARD",
  "authenticationPrincipal": "teamA",
  "userName": "sa",
  "connection": {
    "localAddress": null,
    "remoteAddress": "/192.168.65.1:58872"
  },
  "specVersion": "0.1.0",
  "time": "2024-02-14T03:12:24.271040805Z",
  "eventData": {
    "level": "error",
    "plugin": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
    "message": "Request parameters do not satisfy the configured policy. Topic 'topic-protobuf' has invalid protobuf schema payload: Student.name is too short (2 < 3), Student.Address.street is too long (11 > 10), Student.Friend.age is greater than 10, Student.Friend.age is greater than 10"
  }
}

Tearing down the docker environment

Remove all the Docker containers and associated volumes.

  • --volumes: Remove named volumes declared in the "volumes" section of the Compose file and anonymous volumes attached to containers.
docker compose down --volumes

Conclusion

Safeguard is really a game changer!