- `id` is a valid UUID
- `amount_cents` is a positive integer and not too large
- `currency` is one of your accepted currencies
- `order_date` is in ISO 8601 format

`SELECT [ignored!] FROM [topic name] WHERE [field filter criteria]`
Only one topic can be specified in the FROM clause (joins will be ignored), and the topic name is matched explicitly (no regexp support). If a record does not match the WHERE clause, it will be rejected. How a rejected record is handled depends on the configured action, described in the actions below. Fields are assumed to be from the value of the record. The Interceptor currently supports values in JSON, AVRO and Protobuf formats.
Topic names with `-` characters in them must be double quoted, as the dash is not a valid character for a SQL name. E.g. for a topic `our-orders` you would need to use: `SELECT * FROM "our-orders" WHERE ...`
Nested fields can be accessed as expected with dot notation in the WHERE clause, e.g.: `address.street = 'Electric Avenue'`
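To illustrate what dot notation means for a JSON value, here is a minimal Python sketch that resolves a dotted path against a decoded record. The helper name `resolve_path` and the sample record are hypothetical, and this is not a description of how the Interceptor is implemented.

```python
import json
from typing import Any


def resolve_path(value: dict, path: str) -> Any:
    """Walk a dotted path such as 'address.street' through nested dicts.

    Returns None if any segment of the path is missing.
    """
    current: Any = value
    for segment in path.split("."):
        if not isinstance(current, dict) or segment not in current:
            return None
        current = current[segment]
    return current


# Hypothetical record value, decoded from JSON.
record_value = json.loads('{"id": 1, "address": {"street": "Electric Avenue"}}')

# Equivalent in spirit to: WHERE address.street = 'Electric Avenue'
matches = resolve_path(record_value, "address.street") == "Electric Avenue"
```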
- `=`, `>`, `>=`, `<`, `<=`, `<>` and `REGEXP` (RegExp MySQL Operator)
- `AND`
- The `IN` clause is not supported, but can be approximated with a RegExp

.. WHERE record.key = 'some thing'
.. WHERE record.key.someValue.someChildValue = 'some thing'
.. WHERE record.partition = 1
.. WHERE record.timestamp = 98717823712
.. WHERE record.header.someHeaderKey = 'some thing'
.. WHERE record.offset = 1
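As a sketch of how an `IN` list might be approximated with `REGEXP`, the Python below builds an anchored alternation and places it in a statement. The list of currencies and the helper name `in_as_regexp` are assumptions for illustration. Python's `re` module stands in for the Interceptor's regex engine here, and the two dialects may differ.

```python
import re

# Hypothetical list of accepted currencies.
ACCEPTED = ["EUR", "USD", "GBP"]


def in_as_regexp(values):
    """Build an anchored alternation that behaves like IN (...)."""
    return "^(" + "|".join(re.escape(v) for v in values) + ")$"


pattern = in_as_regexp(ACCEPTED)

# Conditions are combined with AND, the only supported combinator.
statement = (
    'SELECT * FROM "our-orders" '
    f"WHERE amount_cents > 0 AND currency REGEXP '{pattern}'"
)


def currency_ok(currency):
    # re.search is an unanchored match; the ^...$ anchors make it exact.
    return re.search(pattern, currency) is not None
```

Without the `^` and `$` anchors, a value such as `USDT` would also match, because the pattern would be found inside the longer string.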
Action | Description |
---|---|
BLOCK_WHOLE_BATCH | If any records in the batch are invalid, then block the whole batch. The produce request will fail for the client in this case. |
BLOCK_ONLY_INVALID_RECORDS | Any records that are invalid are removed from the batch, and all the valid records in the batch are saved in Kafka. If any records are written, a success response for that is returned to the client. When every message in the batch is invalid, an error is returned to the client. |
AUDIT_LOG_ONLY | For any records in the produce request which are invalid, record this in the audit log only. All records are still saved in Kafka. |
THROTTLE | If any records in the produce request are invalid, throttle the producer for a certain amount of time (throttleTimeMs). All records are still saved in Kafka. |
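The table above can be read as a small decision function. The following Python sketch models the outcome of each action for one batch. The `Outcome` type and `apply_action` function are hypothetical names used only for illustration, not part of Gateway.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Outcome:
    written: List[dict]   # records that are saved in Kafka
    error: bool = False   # True if the client receives an error
    throttle_ms: int = 0  # throttle applied to the producer


def apply_action(action: str, batch: List[dict],
                 is_valid: Callable[[dict], bool],
                 throttle_time_ms: int = 100) -> Outcome:
    valid = [r for r in batch if is_valid(r)]
    if len(valid) == len(batch):
        return Outcome(written=list(batch))  # nothing invalid, nothing to do
    if action == "BLOCK_WHOLE_BATCH":
        return Outcome(written=[], error=True)
    if action == "BLOCK_ONLY_INVALID_RECORDS":
        # An error is returned only when every record was invalid.
        return Outcome(written=valid, error=not valid)
    if action == "AUDIT_LOG_ONLY":
        return Outcome(written=list(batch))
    if action == "THROTTLE":
        return Outcome(written=list(batch), throttle_ms=throttle_time_ms)
    raise ValueError(f"unknown action: {action}")
```

For a batch with one valid and one invalid record, `BLOCK_WHOLE_BATCH` writes nothing and fails the request, while `BLOCK_ONLY_INVALID_RECORDS` writes the valid record and reports success.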
Records that fail the check are also written to the dead letter topic in `AUDIT_LOG_ONLY` mode, even though the records in this mode are still written to the “real” topic.
Header | Message |
---|---|
X-ERROR-MSG | Message does not match the statement [ …] |
X-TOPIC | The topic that the message was intended to be written to |
X-PARTITION | The partition of the topic the message was intended to be written to |
These headers are controlled by the `addErrorHeader` configuration parameter (defaults to `true`).
If no `deadLetterTopic` is configured for the policy, no messages will be written out in this manner.
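The following Python sketch shows how the dead letter behaviour described above fits together. The function name `dead_letter_record` is hypothetical, header values are shown as strings for readability (Kafka headers are bytes), and it assumes that `addErrorHeader = false` omits all three headers, which the text does not state explicitly.

```python
from typing import Dict, Optional, Tuple


def dead_letter_record(
    value: bytes,
    topic: str,
    partition: int,
    error_message: str,
    dead_letter_topic: Optional[str],
    add_error_header: bool = True,
) -> Optional[Tuple[str, bytes, Dict[str, str]]]:
    """Return (target topic, value, headers), or None if no dead letter topic is set."""
    if not dead_letter_topic:
        return None  # nothing is written out when deadLetterTopic is not configured
    headers: Dict[str, str] = {}
    if add_error_header:
        # Assumption: the flag controls all three headers together.
        headers = {
            "X-ERROR-MSG": error_message,
            "X-TOPIC": topic,
            "X-PARTITION": str(partition),
        }
    return dead_letter_topic, value, headers
```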
Key | Type | Description |
---|---|---|
statement | String | SQL statement |
schemaRegistryConfig | Schema Registry | Schema registry config |
action | Action | Data quality producer action |
deadLetterTopic | String | Dead letter topic |
addErrorHeader | boolean (default true) | Adds the error information headers to the dead letter topic |
throttleTimeMs | int (default: 100) | Value to throttle with (only applicable when action is set to THROTTLE). |
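As a sketch of how these keys fit together, the Python below assembles a configuration as a dictionary and runs a few sanity checks before it is serialised to JSON. The topic names, registry URL and the `check_config` helper are hypothetical. How the configuration is wrapped and submitted to Gateway is not covered here.

```python
import json

ALLOWED_ACTIONS = {
    "BLOCK_WHOLE_BATCH",
    "BLOCK_ONLY_INVALID_RECORDS",
    "AUDIT_LOG_ONLY",
    "THROTTLE",
}

# Hypothetical values; only the key names come from the table above.
config = {
    "statement": "SELECT * FROM orders WHERE amount_cents > 0",
    "schemaRegistryConfig": {"host": "http://schema-registry:8081"},
    "action": "BLOCK_ONLY_INVALID_RECORDS",
    "deadLetterTopic": "orders-dlq",
    "addErrorHeader": True,
    "throttleTimeMs": 100,
}


def check_config(cfg):
    """Return a list of problems found in the configuration (empty if none)."""
    problems = []
    if not isinstance(cfg.get("statement"), str) or not cfg["statement"].strip():
        problems.append("statement must be a non-empty string")
    if cfg.get("action") not in ALLOWED_ACTIONS:
        problems.append("action must be one of " + ", ".join(sorted(ALLOWED_ACTIONS)))
    if not isinstance(cfg.get("throttleTimeMs", 100), int):
        problems.append("throttleTimeMs must be an int")
    return problems


config_json = json.dumps(config, indent=2)
```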
Key | Type | Default | Description |
---|---|---|---|
type | string | CONFLUENT | The type of schema registry to use: choose CONFLUENT (for Confluent-like schema registries including OSS Kafka) or AWS for AWS Glue schema registries. |
additionalConfigs | map | | Additional properties to map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets. |
Confluent Like | | | Configuration for Confluent-like schema registries |
host | string | | URL of your schema registry. |
cacheSize | string | 50 | Number of schemas that can be cached locally by this Interceptor so that it doesn’t have to query the schema registry every time. |
AWS Glue | | | Configuration for AWS Glue schema registries |
region | string | | The AWS region for the schema registry, e.g. us-east-1 |
registryName | string | | The name of the schema registry in AWS (leave blank for the AWS default of default-registry) |
basicCredentials | string | | Access credentials for AWS (see below section for structure) |
AWS credentials | | | AWS credentials configuration |
accessKey | string | | The access key for the connection to the schema registry. |
secretKey | string | | The secret key for the connection to the schema registry. |
validateCredentials | bool | true | true / false flag to determine whether the credentials provided should be validated when set. |
accountId | string | | The ID for the AWS account to use. |
If you do not supply a `basicCredentials` section for the AWS Glue schema registry, the client we use to connect will instead attempt to find the connection information it needs from the environment, and the credentials required can be passed this way to Gateway as part of its core configuration. Find out more from AWS documentation.
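The two registry flavours can be sketched as plain dictionaries. In the Python below, the registry URL and the `aws_registry` helper are hypothetical. Reading the keys from the standard AWS environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` is one possible way to keep secrets out of the file, shown here as an assumption rather than Gateway's own mechanism.

```python
import os

# Confluent-like registry: only the key names come from the table above.
confluent_registry = {
    "type": "CONFLUENT",
    "host": "http://schema-registry:8081",  # hypothetical URL
}


def aws_registry(region, registry_name=None, use_basic_credentials=False):
    """Build an AWS Glue registry config.

    Without basicCredentials, the client is expected to look up
    credentials from its environment instead.
    """
    cfg = {"type": "AWS", "region": region}
    if registry_name:
        cfg["registryName"] = registry_name  # omitted -> AWS default registry
    if use_basic_credentials:
        cfg["basicCredentials"] = {
            "accessKey": os.environ.get("AWS_ACCESS_KEY_ID", ""),
            "secretKey": os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
            "validateCredentials": True,
        }
    return cfg
```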
Read our blog about schema registry.
Action | Description |
---|---|
BLOCK_WHOLE_BATCH | If one message is invalid, block the whole batch |
BLOCK_ONLY_INVALID_RECORDS | If one message is invalid, block only the invalid message (all other messages in the batch are saved in Kafka) |
AUDIT_LOG_ONLY | If messages are invalid, audit log only (all messages are still saved in Kafka) |
THROTTLE | If messages are invalid, throttle the producer for a certain amount of time (throttleTimeMs) |
- `topic`: the topic(s) to apply the rule to
- `schemaIdRequired`: whether records must/must not have a schema assigned to them
- `validateSchema`: whether the policy should check if the data for the record matches the schema found for the record
- `action`: what to do if a problem is found

Setup | Effect |
---|---|
schemaIdRequired = false | Ensures that no records have a schema! |
schemaIdRequired = true, validateSchema = false | Ensures that records have a valid schema set, and that schema exists in the Schema Registry. Does not check whether the value actually matches the schema though. |
schemaIdRequired = true, validateSchema = true | Ensures that records have a valid schema set, that schema exists in the Schema Registry and that the value in the record matches the schema. This includes any data validation rules in the schema (see below) as well as a structural check. |
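The three setups in the table can be sketched as one decision function. The Python below assumes the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema ID), which is a heuristic and does not cover AWS Glue. The callbacks `schema_exists` and `matches_schema` are hypothetical stand-ins for the registry lookup and payload validation.

```python
import struct
from typing import Callable, Optional


def schema_id_of(value: bytes) -> Optional[int]:
    """Return the schema ID if the value looks like the Confluent wire format."""
    if len(value) >= 5 and value[0] == 0:
        return struct.unpack(">I", value[1:5])[0]
    return None


def passes_policy(
    value: bytes,
    schema_id_required: bool,
    validate_schema: bool,
    schema_exists: Callable[[int], bool],
    matches_schema: Callable[[int, bytes], bool],
) -> bool:
    schema_id = schema_id_of(value)
    if not schema_id_required:
        # First row of the table: records must not carry a schema.
        return schema_id is None
    if schema_id is None or not schema_exists(schema_id):
        return False
    if validate_schema:
        # Third row: the payload after the 5-byte header must match the schema.
        return matches_schema(schema_id, value[5:])
    return True
```

The combination `schemaIdRequired = false` with `validateSchema = true` is not listed in the table, so the sketch simply ignores `validate_schema` in that case.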
`action` can be set to one of:

- `BLOCK` → If any records in the batch fail the policy checks, record the problems in the audit log and then return an error to the client, failing the entire batch. No records are written to Kafka at all if at least one of the records in the batch is considered invalid.
- `INFO` → In this mode the data is always written to Kafka whether it passes the checks or not, but any problems found are recorded in the audit log.
- `THROTTLE` → If any records in the batch fail the policy checks, the data is still written to Kafka but the request will be throttled with time = `throttleTimeMs`, forcing the client to back off. Any problems found are recorded in the audit log.

Records that fail the checks are written to the dead letter topic in `AUDIT_LOG_ONLY` mode also, even though the records in this mode are still written to the “real” topic.
Header | Message |
---|---|
X-ERROR-MSG | Description of the reason for the policy failure |
X-TOPIC | The topic the message was intended to be written to |
X-PARTITION | The partition of that topic the message was intended to be written to |
These headers are controlled by the `addErrorHeader` configuration parameter (defaults to `true`).
If no `deadLetterTopic` is configured for the policy, then no messages will be written out in this manner.
Name | Type | Default | Description |
---|---|---|---|
topic | String | .* | Topics that match this regex will have the Interceptor applied |
schemaIdRequired | Boolean | false | Records must/must not have schemaId |
validateSchema | Boolean | false | If true, deserialize the record, validate the record structure and fields within the data itself. |
action | BLOCK, INFO, THROTTLE | BLOCK | Action to take if a record fails the policy checks. |
schemaRegistryConfig | Schema registry | N/A | Schema registry Config |
celCacheSize | int | 100 | In-memory cache size for CEL expressions. A larger cache trades memory for speed. |
deadLetterTopic | String | | Dead letter topic. Not used if this parameter is not set. |
addErrorHeader | Boolean | true | Whether to add the error information headers to the dead letter topic |
throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |
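To make the defaults in the table concrete, this Python sketch merges a partial configuration with them and checks whether a topic is covered by the `topic` regex. The helper names are hypothetical, and treating the regex as a full match against the topic name is an assumption.

```python
import re

# Defaults taken from the table above.
DEFAULTS = {
    "topic": ".*",
    "schemaIdRequired": False,
    "validateSchema": False,
    "action": "BLOCK",
    "celCacheSize": 100,
    "addErrorHeader": True,
    "throttleTimeMs": 100,
}
# Keys without a default value in the table.
OPTIONAL_KEYS = {"schemaRegistryConfig", "deadLetterTopic"}


def effective_config(user_config):
    """Overlay user settings on the defaults, rejecting unknown keys."""
    unknown = set(user_config) - set(DEFAULTS) - OPTIONAL_KEYS
    if unknown:
        raise ValueError(f"unknown keys: {sorted(unknown)}")
    return {**DEFAULTS, **user_config}


def applies_to(cfg, topic_name):
    # Assumption: the regex must match the whole topic name.
    return re.fullmatch(cfg["topic"], topic_name) is not None
```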
Key | Type | Default | Description |
---|---|---|---|
type | string | CONFLUENT | The type of schema registry to use: choose CONFLUENT (for Confluent-like schema registries including OSS Kafka) or AWS for AWS Glue schema registries. |
additionalConfigs | map | | Additional properties to map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets. |
Confluent Like | | | Configuration for Confluent-like schema registries |
host | string | | URL of your schema registry. |
cacheSize | string | 50 | Number of schemas that can be cached locally by this Interceptor so that it doesn’t have to query the schema registry every time. |
AWS Glue | | | Configuration for AWS Glue schema registries |
region | string | | The AWS region for the schema registry, e.g. us-east-1 |
registryName | string | | The name of the schema registry in AWS (leave blank for the AWS default of default-registry) |
basicCredentials | string | | Access credentials for AWS (see below section for structure) |
AWS Credentials | | | AWS Credentials Configuration |
accessKey | string | | The access key for the connection to the schema registry. |
secretKey | string | | The secret key for the connection to the schema registry. |
validateCredentials | bool | true | true / false flag to determine whether the credentials provided should be validated when set. |
accountId | string | | The ID for the AWS account to use. |
If you do not supply a `basicCredentials` section for the AWS Glue schema registry, the client we use to connect will instead attempt to find the connection information it needs from the environment, and the credentials required can be passed this way to Gateway as part of its core configuration. More information on the setup for this is found in the AWS documentation.
Read our blog about schema registry.
- `minimum`, `maximum`, `exclusiveMinimum`, `exclusiveMaximum`, `multipleOf`
- `minLength`, `maxLength`, `pattern`, `format`
- `maxItems`, `minItems`

`format` values: `byte`, `date`, `time`, `date-time`, `duration`, `uri`, `uri-reference`, `uri-template`, `email`, `hostname`, `ipv4`, `ipv6`, `regex`, `uuid`, `json-pointer`, `json-pointer-uri-fragment`, `relative-json-pointer`
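To give a feel for what these keywords check, the Python sketch below evaluates a subset of them against a single value, following JSON Schema semantics (numeric `exclusiveMinimum`/`exclusiveMaximum`, unanchored `pattern`). The function `check_constraints` is hypothetical, it skips `format`, and it is not the Interceptor's validator.

```python
import re


def check_constraints(value, schema):
    """Return the list of keywords that the value violates."""
    violations = []
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        if "minimum" in schema and value < schema["minimum"]:
            violations.append("minimum")
        if "maximum" in schema and value > schema["maximum"]:
            violations.append("maximum")
        if "exclusiveMinimum" in schema and value <= schema["exclusiveMinimum"]:
            violations.append("exclusiveMinimum")
        if "exclusiveMaximum" in schema and value >= schema["exclusiveMaximum"]:
            violations.append("exclusiveMaximum")
        # The modulo check is only reliable for integers.
        if "multipleOf" in schema and value % schema["multipleOf"] != 0:
            violations.append("multipleOf")
    elif isinstance(value, str):
        if "minLength" in schema and len(value) < schema["minLength"]:
            violations.append("minLength")
        if "maxLength" in schema and len(value) > schema["maxLength"]:
            violations.append("maxLength")
        if "pattern" in schema and re.search(schema["pattern"], value) is None:
            violations.append("pattern")
    elif isinstance(value, list):
        if "minItems" in schema and len(value) < schema["minItems"]:
            violations.append("minItems")
        if "maxItems" in schema and len(value) > schema["maxItems"]:
            violations.append("maxItems")
    return violations
```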
Key | Type | Description |
---|---|---|
name | string | Rule name |
expression | string | CEL expression for validation, must return BOOLEAN |
message | string | Error message returned if the payload does not match the expression. The `message.` namespace represents the produced message. |
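A rule built from these three keys might look like the Python structure below. The rule content and the field `message.name` are hypothetical, and where the rules are attached to a schema is not shown. The helper only checks the shape of a rule and does not evaluate the CEL expression.

```python
# Hypothetical rule: the expression is CEL and must evaluate to a boolean.
rules = [
    {
        "name": "name-length",
        "expression": "size(message.name) >= 3",
        "message": "name must be at least 3 characters",
    },
]

REQUIRED_KEYS = ("name", "expression", "message")


def missing_keys(rule):
    """Return the required keys that are absent or not strings."""
    return [k for k in REQUIRED_KEYS if not isinstance(rule.get(k), str)]
```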