Skip to main content
Quick navigation

Produce Policy

Introduction

The produce policy interceptor will impose limits on incoming messages to kafka to ensure that messages going to kafka adhere to the configured specification.

What happens when sending an invalid request

Any request that doesn't match the interceptor's configuration will be blocked and return the corresponding error message.

For example: you want to send record without header, but the interceptor is being configured recordHeaderRequired=true.

When you send that request to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException: 
Request parameters do not satisfy the configured policy. Headers are required

Configuration

keytypedefaultdescription
topicString.*Topics that match this regex will have the interceptor applied. If no value is set, it will be applied to all topics.
acksAcksConfiguration for acks modes
recordHeaderRequiredBooleanConfiguration of header usage
compressionsCompression TypeConfiguration for compression types
idempotenceRequiredBooleanConfiguration for idempotency usage
transactionRequiredBooleanConfiguration for transaction usage
versionVersionConfiguration for produce version

Acks

keytypedefaultdescription
valueArray[integer]Only these acks modes are allowed, allowed values: -1, 0, 1
actionActionBLOCKAction to take if the value is outside the specified range.

Boolean

keytypedefaultdescription
valueBooleanValue for the configuration. If action is OVERRIDE, will use this value for override value
actionActionBLOCKAction to take if the value is outside the specified range.

Version

keytypedefaultdescription
minintMinimum value of produce version
maxintMaximum value of produce version
actionActionBLOCKAction to take if the value is outside the specified range.

Compression Type

keytypedefaultdescription
valuesSet[Compression]Set of string contains compression types.
actionActionBLOCKAction to take if the value is outside the specified range. `
overrideValueCompressionValue to override with (only applicable when action is set to OVERRIDE).

Compression

  • NONE
  • GZIP
  • SNAPPY
  • LZ4
  • ZSTD

Action

  • BLOCK → when fail, save in audit and return error.
  • INFO → execute API with wrong value, save in audit.

Example

{
"name": "myProducePolicyPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"acks": {
"value": [
-1,
0
],
"action": "BLOCK"
},
"recordHeaderRequired": {
"value": true,
"action": "BLOCK"
},
"compressions": {
"value": [
"NONE",
"GZIP"
],
"action": "INFO"
},
"idempotenceRequired": {
"value": true,
"action": "INFO"
},
"transactionRequired": {
"value": true
},
"version": {
"min": 1,
"max": 3,
"action": "BLOCK"
}
}
}