
Schema Payload Validation

Introduction

Ensuring that all records sent through your Kafka system conform to an agreed structure enables your Kafka consumers to confidently and independently access data. Records with missing or invalid schemas can cause application outages, as consumers may be unable to process the unexpected record format.

However, schemas can broadly only assert the structural correctness of data and a level of compatibility between those structures. For the values in a record, often all you can assert is a basic data type (integer, string, double, etc.). So while a valid schema addresses some data-quality concerns, others still have to be handled in a bespoke or distributed manner across every client of a given data type.

Finally, while correct structure can be enforced at the client level in a Kafka ecosystem, each client must know and follow the expectations for the data. Nothing prevents one client from correctly writing Avro to a topic while another writes plain JSON to the same topic. If the second client doesn't know the rules, it can't follow them.

Centralized Policy Enforcement

This is where the Schema Validation interceptor comes in. You configure it once in your Kafka system, on the source of the data (a topic), to ensure that:

  • All records produced to Kafka have a schema set
  • The record contents adhere to that schema
  • The fields (values) in any given record comply with the business validation rules you have set in the schema

This policy centrally enforces these rules at the point of write to Kafka. Clients cannot bypass or ignore this enforcement, so it provides a strong guarantee that the data actually written into Kafka matches your rule set.

Learn more about the Schema Registry and Schemas here

How does the Policy Work?

The policy operates on Produce Requests made to Kafka and inspects the entire batch of records in each request. Depending on its configuration, it performs various checks and then takes an action if it finds any problems.

First, note that the policy does nothing if no Audit Log is configured for the Gateway, because it does not want to silently reject any data. For the policy to work at all, you must configure the Audit Log.

Second, the policy only checks the value of a Kafka record; it does not currently support checking a record's key or headers.
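To make "a record with a schema set" concrete: a widely used convention is the Confluent wire format, where the value begins with a magic byte `0` followed by a 4-byte big-endian schema ID. This page does not say which framing the Gateway expects, so the following is only a minimal sketch, assuming that convention, of how a schema ID could be read from the value alone (keys and headers are ignored, as in the policy). The function name `extract_schema_id` is hypothetical.

```python
import struct
from typing import Optional

MAGIC_BYTE = 0  # Confluent wire-format marker (assumed framing, not confirmed by this page)


def extract_schema_id(value: Optional[bytes]) -> Optional[int]:
    """Return the schema ID embedded in a record value, or None if there is none.

    Assumes 1 magic byte (0) followed by a 4-byte big-endian schema ID.
    Only the record value is inspected; keys and headers are not checked.
    """
    if value is None or len(value) < 5 or value[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", value[1:5])
    return schema_id
```

A plain JSON value such as `b'{"a": 1}'` starts with `{`, not the magic byte, so under this assumption it would be treated as having no schema set.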

The core config values for the policy itself are:

  • topic: the topic(s) to apply the rule to
  • schemaIdRequired: whether records must or must not have a schema assigned to them
  • validateSchema: whether the policy should check that the record's data matches the schema found for the record
  • action: what to do if a problem is found

There are three levels of check you can apply:

| Setup | Effect |
|---|---|
| schemaIdRequired = false | Ensures that no records have a schema set. |
| schemaIdRequired = true, validateSchema = false | Ensures that records have a valid schema set and that the schema exists in the Schema Registry. Does not check whether the value actually matches the schema. |
| schemaIdRequired = true, validateSchema = true | Ensures that records have a valid schema set, that the schema exists in the Schema Registry, and that the value in the record matches the schema. This includes a structural check as well as any data validation rules in the schema (see below). |
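The three levels can be read as a small decision procedure per record. The sketch below is illustrative only: `check_levels`, the set of registry IDs, and the `value_matches` callback are hypothetical stand-ins for the Gateway's real Schema Registry lookup and deserialization. The table does not describe `schemaIdRequired = false` combined with `validateSchema = true`; this sketch simply ignores `validateSchema` in that case, which is an assumption.

```python
from typing import Callable, List, Optional, Set


def check_levels(schema_id: Optional[int],
                 registry_ids: Set[int],
                 value_matches: Callable[[int], bool],
                 schema_id_required: bool = False,
                 validate_schema: bool = False) -> List[str]:
    """Return the problems found for one record (an empty list means valid)."""
    problems: List[str] = []
    if not schema_id_required:
        # Level 1: records must NOT carry a schema.
        if schema_id is not None:
            problems.append("record has a schema but none is allowed")
        return problems
    # Level 2: a schema ID must be present and known to the registry.
    if schema_id is None:
        problems.append("record has no schema ID")
        return problems
    if schema_id not in registry_ids:
        problems.append(f"schema ID {schema_id} not found in Schema Registry")
        return problems
    # Level 3: optionally check the value itself against the schema.
    if validate_schema and not value_matches(schema_id):
        problems.append(f"value does not match schema {schema_id}")
    return problems
```

Note that a value which does not match its schema passes at level 2 and is only caught once `validate_schema` is enabled.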

Action

If any problems are found, the policy will take an action as configured. The action can be set to one of:

  • BLOCK → If any records in the batch fail the policy checks, the problems are recorded in the audit log and an error is returned to the client, failing the entire batch. No records are written to Kafka at all if at least one record in the batch is considered invalid.
  • INFO → The data is always written to Kafka, whether it passes the checks or not, but any problems found are recorded in the audit log.
  • THROTTLE → If any records in the batch fail the policy checks, the data is still written to Kafka, but the request is throttled for throttleTimeMs milliseconds, forcing the client to back off. Any problems found are recorded in the audit log.
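The three actions differ only in what happens to the batch as a whole once problems are known. A minimal sketch of that batch-level decision follows; `Outcome` and `apply_action` are hypothetical names, and the real Gateway's audit log and throttling mechanics are represented only by plain fields.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Outcome:
    write_to_kafka: bool    # whether the batch reaches the real topic
    error_returned: bool    # whether the client receives an error
    throttle_time_ms: int   # throttle time returned to the client (0 = none)
    audit_entries: List[str] = field(default_factory=list)


def apply_action(action: str, problems: List[str], throttle_time_ms: int = 100) -> Outcome:
    """Decide what happens to a whole batch given the problems found in it."""
    audit = list(problems)  # every problem found is recorded in the audit log
    if not problems:
        return Outcome(True, False, 0, audit)
    if action == "BLOCK":
        # One invalid record fails the entire batch; nothing is written.
        return Outcome(False, True, 0, audit)
    if action == "INFO":
        return Outcome(True, False, 0, audit)
    if action == "THROTTLE":
        return Outcome(True, False, throttle_time_ms, audit)
    raise ValueError(f"unknown action: {action}")
```

For example, `apply_action("THROTTLE", ["bad value"], 250)` still writes the batch but asks the client to back off for 250 ms.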

Dead Letter Topic

If a dead letter topic service is configured for the Gateway, you can optionally supply a topic name for this policy to use for any records that are considered invalid. This topic is created with the default configuration of your Kafka setup.

Any record that the policy considers invalid is written to the dead letter topic, with some headers added for audit purposes. Note that this also happens in INFO mode, even though in that mode the records are still written to the "real" topic.

| Header | Value |
|---|---|
| X-ERROR-MSG | Description of the reason for the policy failure |
| X-TOPIC | The topic the message was intended to be written to |
| X-PARTITION | The partition of that topic the message was intended to be written to |

The generation of these headers can be disabled if required, through the addErrorHeader configuration parameter (defaults to true).

If no deadLetterTopic is configured for the policy, then no messages will be written out in this manner.
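Taken together, the dead letter rules are: nothing is written without a `deadLetterTopic`, and the three audit headers are added unless `addErrorHeader` is false. The sketch below illustrates that logic with a plain dictionary standing in for a producer record; `dead_letter_record` is a hypothetical helper, and encoding header values as UTF-8 strings (including the partition number) is an assumption, not documented Gateway behavior.

```python
from typing import Dict, List, Optional, Tuple

Header = Tuple[str, bytes]


def dead_letter_record(value: bytes, error_msg: str, topic: str, partition: int,
                       dead_letter_topic: Optional[str],
                       add_error_header: bool = True) -> Optional[Dict]:
    """Build the record to send to the dead letter topic, or None if none is configured."""
    if not dead_letter_topic:
        return None  # no deadLetterTopic configured: nothing is written
    headers: List[Header] = []
    if add_error_header:
        # Header values encoded as UTF-8 text (an assumption for this sketch).
        headers = [
            ("X-ERROR-MSG", error_msg.encode("utf-8")),
            ("X-TOPIC", topic.encode("utf-8")),
            ("X-PARTITION", str(partition).encode("utf-8")),
        ]
    # The original value is forwarded unchanged.
    return {"topic": dead_letter_topic, "value": value, "headers": headers}
```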

Configuration

The full set of configuration options for the policy is listed below.

| Name | Type | Default | Description |
|---|---|---|---|
| topic | String | .* | Topics that match this regex have the interceptor applied |
| schemaIdRequired | Boolean | false | Whether records must (true) or must not (false) have a schema ID |
| validateSchema | Boolean | false | If true, deserialize the record and validate the record structure and the fields within the data itself (see Schema Payload Validations) |
| action | BLOCK, INFO, THROTTLE | BLOCK | Action to take if a record fails the policy checks |
| schemaRegistryConfig | Schema Registry | N/A | Schema Registry configuration (see Schema Registry) |
| celCacheSize | int | 100 | Size of the in-memory cache for CEL expressions, balancing speed against resource use |
| deadLetterTopic | String | | Dead letter topic. Not used if this parameter is not set |
| addErrorHeader | Boolean | true | Whether to add the error information headers to records written to the dead letter topic |
| throttleTimeMs | int | 100 | Time in milliseconds to throttle for (only applicable when action is THROTTLE) |
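The defaults in the configuration table can be mirrored in a small typed object, which is also a convenient place to show how the `topic` regex selects topics. This is a hypothetical sketch, not the Gateway's configuration class: field names deliberately copy the JSON keys, `schemaRegistryConfig` is omitted, and requiring the regex to match the *whole* topic name (`re.fullmatch`) is an assumption; with a partial match, `topic_1.*` would also select `other_topic_1`.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class PolicyConfig:
    # Defaults mirror the configuration table (schemaRegistryConfig omitted).
    topic: str = ".*"
    schemaIdRequired: bool = False
    validateSchema: bool = False
    action: str = "BLOCK"
    celCacheSize: int = 100
    deadLetterTopic: Optional[str] = None
    addErrorHeader: bool = True
    throttleTimeMs: int = 100

    def applies_to(self, topic_name: str) -> bool:
        # Assumption: the regex must match the entire topic name.
        return re.fullmatch(self.topic, topic_name) is not None

    def validate(self) -> None:
        if self.action not in ("BLOCK", "INFO", "THROTTLE"):
            raise ValueError(f"invalid action: {self.action}")
        if self.throttleTimeMs < 0:
            raise ValueError("throttleTimeMs must be non-negative")
```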

Schema Registry

| Key | Type | Default | Description |
|---|---|---|---|
| host | String | | URL of the Schema Registry |
| cacheSize | String | 50 | Size of the local schema cache, so that the interceptor doesn't have to query the Schema Registry every time |
| additionalConfigs | map | | Additional properties that map to specific security-related parameters. For enhanced security, you can use the template ${MY_ENV_VAR} in map values and then define their actual values as environment variables of the Gateway (e.g. -e MY_ENV_VAR=someValue) |
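The `${MY_ENV_VAR}` template means a secret never has to appear in the interceptor JSON itself; it is looked up from the Gateway's environment. A minimal sketch of that substitution, assuming `${NAME}` placeholders may appear anywhere in a map value and that an unset variable is an error (the Gateway's actual behavior for unset variables is not documented here); `resolve_templates` is a hypothetical name.

```python
import os
import re
from typing import Dict, Mapping

# Matches ${NAME} where NAME is a typical environment variable identifier.
_TEMPLATE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_templates(configs: Mapping[str, str],
                      env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Replace ${VAR} placeholders in map values with environment variable values."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return {key: _TEMPLATE.sub(substitute, value) for key, value in configs.items()}
```

With the Gateway started as `-e SR_BASIC_AUTH_USER_INFO=user:pass`, a value of `"${SR_BASIC_AUTH_USER_INFO}"` would resolve to `"user:pass"` under this sketch.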

See more about schema registry here

Example

```json
{
  "name": "mySchemaIdValidationInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic_1.*",
    "schemaIdRequired": true,
    "validateSchema": true,
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081"
    },
    "action": "BLOCK",
    "celCacheSize": 100
  }
}
```

Schema Registry with secured template

```json
{
  "name": "mySchemaIdValidationInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic_1.*",
    "schemaIdRequired": true,
    "validateSchema": true,
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081",
      "additionalConfigs": {
        "schema.registry.url": "${SR_URL}",
        "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
        "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
      }
    },
    "action": "BLOCK",
    "celCacheSize": 100
  }
}
```

Schema Payload Validations

When configured to do so, the Schema Validation interceptor supports validating the value in a Kafka record against a set of custom constraints for Avro schema records, similar to the validations provided by JSON Schema.

For fields in an Avro schema, you can specify constraints on what is considered a correct value. These rules operate on each field's own value only. The supported constraints are:

  • INT, LONG, FLOAT, DOUBLE: minimum, maximum, exclusiveMinimum, exclusiveMaximum, multipleOf
  • STRING: minLength, maxLength, pattern, format
  • ARRAY: maxItems, minItems
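Each of these constraints is a simple check on a single value. The sketch below shows one plausible reading of them, following JSON Schema semantics; it is not the interceptor's code. Assumptions: `pattern` is an unanchored search (as in JSON Schema) using Python's regex dialect, which differs in places from the ECMA-262 dialect JSON Schema specifies; `multipleOf` on floats is subject to binary floating-point error; `format` is handled separately. `check_field` is a hypothetical name.

```python
import re
from typing import Any, Dict, List


def check_field(value: Any, rules: Dict[str, Any]) -> List[str]:
    """Return the constraint violations for one field value (empty list = valid)."""
    errors: List[str] = []
    if isinstance(value, bool):
        return errors  # bool is a subclass of int in Python; no rules apply to it
    if isinstance(value, (int, float)):  # INT, LONG, FLOAT, DOUBLE
        if "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{value} < minimum {rules['minimum']}")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append(f"{value} > maximum {rules['maximum']}")
        if "exclusiveMinimum" in rules and value <= rules["exclusiveMinimum"]:
            errors.append(f"{value} <= exclusiveMinimum {rules['exclusiveMinimum']}")
        if "exclusiveMaximum" in rules and value >= rules["exclusiveMaximum"]:
            errors.append(f"{value} >= exclusiveMaximum {rules['exclusiveMaximum']}")
        if "multipleOf" in rules and value % rules["multipleOf"] != 0:
            errors.append(f"{value} is not a multiple of {rules['multipleOf']}")
    elif isinstance(value, str):  # STRING
        if "minLength" in rules and len(value) < rules["minLength"]:
            errors.append(f"length {len(value)} < minLength {rules['minLength']}")
        if "maxLength" in rules and len(value) > rules["maxLength"]:
            errors.append(f"length {len(value)} > maxLength {rules['maxLength']}")
        if "pattern" in rules and re.search(rules["pattern"], value) is None:
            errors.append(f"does not match pattern {rules['pattern']}")
    elif isinstance(value, list):  # ARRAY
        if "minItems" in rules and len(value) < rules["minItems"]:
            errors.append(f"item count {len(value)} < minItems {rules['minItems']}")
        if "maxItems" in rules and len(value) > rules["maxItems"]:
            errors.append(f"item count {len(value)} > maxItems {rules['maxItems']}")
    return errors
```

For instance, `check_field(150, {"minimum": 0, "maximum": 120})` reports only the `maximum` violation.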

Currently supported string format values:

  • byte, date, time, date-time, duration, uri, uri-reference, uri-template, email, hostname, ipv4, ipv6, regex, uuid, json-pointer, json-pointer-uri-fragment, relative-json-pointer
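To give a feel for what a `format` check does, here is a stdlib-only approximation for a handful of the listed formats. It is a sketch, not the interceptor's validator: the Python parsers used are in places more lenient than the JSON Schema definitions (for example, `uuid.UUID` also accepts braces and un-hyphenated forms, and `regex` is checked against Python's dialect rather than ECMA-262). `FORMAT_CHECKS` and `check_format` are hypothetical names.

```python
import ipaddress
import re
import uuid
from datetime import date
from typing import Callable, Dict


def _parses(parser: Callable[[str], object]) -> Callable[[str], bool]:
    """Turn a parser that raises on bad input into a boolean check."""
    def check(value: str) -> bool:
        try:
            parser(value)
            return True
        except (ValueError, re.error):
            return False
    return check


# Approximate checks for a subset of the supported formats.
FORMAT_CHECKS: Dict[str, Callable[[str], bool]] = {
    "ipv4": _parses(ipaddress.IPv4Address),
    "ipv6": _parses(ipaddress.IPv6Address),
    "uuid": _parses(uuid.UUID),
    "date": _parses(date.fromisoformat),
    "regex": _parses(re.compile),
}


def check_format(fmt: str, value: str) -> bool:
    checker = FORMAT_CHECKS.get(fmt)
    if checker is None:
        raise ValueError(f"format '{fmt}' is not covered by this sketch")
    return checker(value)
```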

This interceptor also supports validating elements of the whole payload against custom constraints, called Metadata Rules, using expressions written in CEL (Common Expression Language). This provides a means to define more advanced rules that depend on multiple values in a record.

Metadata rule

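| Key | Type | Description |
|---|---|---|
| name | string | Rule name |
| expression | string | CEL expression used for validation; must return a boolean |
| message | string | Error message returned if the payload does not match the expression. Within the expression, the message namespace refers to the produced message (e.g. message.age) |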
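Conceptually, each metadata rule is evaluated against the whole message, and every rule whose expression returns false contributes its error message. Python has no built-in CEL evaluator, so the sketch below substitutes a Python callable for each CEL expression; `MetadataRule` and `evaluate_rules` are hypothetical names, and treating a missing field as a failed rule is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


@dataclass
class MetadataRule:
    name: str
    predicate: Callable[[Message], bool]  # stand-in for the CEL expression
    message: str


def evaluate_rules(message: Message, rules: List[MetadataRule]) -> List[str]:
    """Return the error messages of every rule whose expression is false."""
    failures: List[str] = []
    for rule in rules:
        try:
            passed = bool(rule.predicate(message))
        except (KeyError, TypeError):
            passed = False  # assumption: a missing or mistyped field fails the rule
        if not passed:
            failures.append(f"{rule.name}: {rule.message}")
    return failures
```

For example, the CEL rule `message.age >= 18` corresponds here to `MetadataRule("checkAge", lambda m: m["age"] >= 18, "age must be greater than or equal to 18")`.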

JSON Schema Example

In JSON Schema, constraints and rules are defined directly in the schema. Here's an example that includes various validations:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "minLength": 3,
      "maxLength": 50,
      "expression": "size(name) >= 3"
    },
    "age": {
      "type": "integer",
      "minimum": 0,
      "maximum": 120,
      "expression": "age >= 0 && age <= 120"
    },
    "email": {
      "type": "string",
      "format": "email",
      "expression": "email.contains('foo')"
    },
    "address": {
      "type": "object",
      "properties": {
        "street": {
          "type": "string",
          "minLength": 5,
          "maxLength": 10,
          "expression": "size(street) >= 5 && size(street) <= 10"
        },
        "city": {
          "type": "string",
          "minLength": 2,
          "maxLength": 50
        }
      },
      "expression": "size(address.street) > 1 && address.street.contains('paris') || address.city == 'paris'"
    },
    "hobbies": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "minItems": 3,
      "expression": "size(hobbies) >= 3"
    }
  },
  "metadata": {
    "rules": [
      {
        "name": "check hobbies size and name",
        "expression": "size(message.hobbies) == 3 && size(message.name) > 3",
        "message": "hobbies must have 3 items and name must be longer than 3 characters"
      },
      {
        "name": "checkAge",
        "expression": "message.age >= 18",
        "message": "age must be greater than or equal to 18"
      },
      {
        "name": "check email",
        "expression": "message.email.endsWith('yahoo.com')",
        "message": "email should end with 'yahoo.com'"
      },
      {
        "name": "check street",
        "expression": "size(message.address.street) >= 3",
        "message": "address.street length must be greater than or equal to 3"
      }
    ]
  }
}
```

Avro Schema Example

In Avro, constraints and rules are defined directly in the schema. Here's an example that includes various validations:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string", "minLength": 3, "maxLength": 50, "expression": "size(name) >= 3 && size(name) <= 50"},
    {"name": "age", "type": "int", "minimum": 0, "maximum": 120, "expression": "age >= 0 && age <= 120"},
    {"name": "email", "type": "string", "format": "email", "expression": "email.contains('foo')"},
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "AddressRecord",
        "fields": [
          {"name": "street", "type": "string", "minLength": 5, "maxLength": 100, "expression": "size(street) >= 5 && size(street) <= 10"},
          {"name": "city", "type": "string", "minLength": 2, "maxLength": 50}
        ]
      },
      "expression": "size(address.street) >= 5 && address.street.contains('paris') || address.city == 'paris'"
    },
    {"name": "hobbies", "type": {"type": "array", "items": "string"}, "minItems": 3, "expression": "size(hobbies) >= 3"},
    {
      "name": "friends",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Friend",
          "fields": [
            {"name": "name", "type": "string", "expression": "size(name) < 3"},
            {"name": "age", "type": "int", "minimum": 2, "maximum": 10}
          ]
        }
      }
    }
  ],
  "metadata": {
    "rules": [
      {
        "name": "check hobbies size and name",
        "expression": "size(message.hobbies) == 3 && size(message.name) > 3",
        "message": "hobbies must have 3 items and name must be longer than 3 characters"
      },
      {
        "name": "checkAge",
        "expression": "message.age >= 18",
        "message": "age must be greater than or equal to 18"
      },
      {
        "name": "check email",
        "expression": "message.email.endsWith('yahoo.com')",
        "message": "email should end with 'yahoo.com'"
      },
      {
        "name": "check street",
        "expression": "size(message.address.street) >= 3",
        "message": "address.street length must be greater than or equal to 3"
      }
    ]
  }
}
```
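The Avro specification permits attributes it does not define to appear as metadata, so standard Avro tooling ignores annotations such as `minLength` or `expression`; a validator has to find them itself. The sketch below shows one way such annotations could be collected per field path, recursing into nested records and arrays of records. It is a hypothetical illustration (the key set, the `collect_constraints` name and the `[]` path notation are assumptions) and does not handle unions or maps.

```python
import json
from typing import Any, Dict, List, Tuple

# Annotation keys this sketch treats as constraints (taken from the lists above).
CONSTRAINT_KEYS = {"minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum",
                   "multipleOf", "minLength", "maxLength", "pattern", "format",
                   "minItems", "maxItems", "expression"}


def collect_constraints(schema: Dict[str, Any],
                        prefix: str = "") -> List[Tuple[str, Dict[str, Any]]]:
    """Walk an Avro record schema and return (field path, constraints) pairs."""
    found: List[Tuple[str, Dict[str, Any]]] = []
    for fld in schema.get("fields", []):
        path = f"{prefix}{fld['name']}"
        rules = {k: v for k, v in fld.items() if k in CONSTRAINT_KEYS}
        if rules:
            found.append((path, rules))
        ftype = fld["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "record":
            found.extend(collect_constraints(ftype, path + "."))
        elif (isinstance(ftype, dict) and ftype.get("type") == "array"
              and isinstance(ftype.get("items"), dict)
              and ftype["items"].get("type") == "record"):
            found.extend(collect_constraints(ftype["items"], path + "[]."))
    return found


# A trimmed-down version of the User schema above.
SCHEMA = json.loads("""
{
  "type": "record", "name": "User",
  "fields": [
    {"name": "name", "type": "string", "minLength": 3, "maxLength": 50},
    {"name": "address",
     "type": {"type": "record", "name": "AddressRecord", "fields": [
       {"name": "street", "type": "string", "minLength": 5},
       {"name": "city", "type": "string"}]},
     "expression": "address.city == 'paris'"},
    {"name": "friends",
     "type": {"type": "array", "items": {"type": "record", "name": "Friend",
       "fields": [{"name": "age", "type": "int", "minimum": 2, "maximum": 10}]}}}
  ]
}
""")

for path, rules in collect_constraints(SCHEMA):
    print(path, rules)
```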