
Fetch Policy

Introduction

The fetch policy interceptor checks fetch requests against the specified configuration. Requests that do not meet it are either logged (action INFO) or blocked (action BLOCK).

What happens when sending an invalid request

Any request that doesn't match the interceptor's configuration is handled according to the configured action. With BLOCK, the request is rejected and the corresponding error message is returned.

For example: you send a fetch request with isolationLevel=read_uncommitted, but the interceptor is configured with isolationLevel=read_committed.

When you send that request to the cluster, the consumer retries the request and the gateway logs the following error:

org.apache.kafka.common.errors.PolicyViolationException: 
Request parameters do not satisfy the configured policy. Topic 'topicName' with isolationLevel is READ_UNCOMMITTED, must be READ_COMMITTED

Configuration

| key | type | default | description |
|---|---|---|---|
| topic | String | .* | Topics that match this regex will have the interceptor applied. If no value is set, it will be applied to all topics. |
| isolationLevel | IsolationLevel | | Configuration for isolation level |
| rackIdRequired | Boolean | | Configuration of rackId usage |
| fetchMaxBytes | SafeguardIntegerConfig | | Configuration for maxBytes |
| fetchMinBytes | SafeguardIntegerConfig | | Configuration for minBytes |
| maxWaitMs | SafeguardIntegerConfig | | Configuration for maxWaitMs |
| version | Version | | Configuration for fetch version |

Isolation Level

| key | type | default | description |
|---|---|---|---|
| value | Isolation | | Isolation level for fetch request |
| action | Action | BLOCK | Action to take if the request's isolation level does not match the configured value. |

Boolean

| key | type | default | description |
|---|---|---|---|
| value | Boolean | | Value for the configuration |
| action | Action | BLOCK | Action to take if the request does not match the configured value. |

Version

| key | type | default | description |
|---|---|---|---|
| min | int | | Minimum value of fetch version |
| max | int | | Maximum value of fetch version |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

SafeguardIntegerConfig

| key | type | default | description |
|---|---|---|---|
| min | int | | Minimum value of property |
| max | int | | Maximum value of property |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
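
To make the min/max/action semantics concrete, here is a minimal Python sketch of a range check. It is not the gateway's implementation: the function name `check_range` is hypothetical, and it assumes that bounds are inclusive, that a missing bound means "unbounded", and that a missing action falls back to the documented default of BLOCK.

```python
from typing import Optional


def check_range(value: int, config: dict) -> Optional[str]:
    """Return None when value is inside [min, max], else the configured action.

    Assumptions (not confirmed by the documentation): bounds are inclusive
    and a missing bound is treated as unbounded.
    """
    low = config.get("min")
    high = config.get("max")
    too_low = low is not None and value < low
    too_high = high is not None and value > high
    if too_low or too_high:
        # BLOCK is the documented default action.
        return config.get("action", "BLOCK")
    return None


fetch_max_bytes = {"min": 1000, "max": 3000, "action": "INFO"}
print(check_range(2000, fetch_max_bytes))  # inside the range
print(check_range(5000, fetch_max_bytes))  # above max
```

The same check would apply to Version, since it carries the same three keys.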

Isolation

  • read_uncommitted
  • read_committed

Action

  • BLOCK → when the check fails, save the violation in the audit and return an error.
  • INFO → execute the API call with the non-compliant value and save the violation in the audit.
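
The difference between the two actions can be sketched in a few lines of Python. This is an illustration only: `PolicyViolation`, `apply_action`, the in-memory audit list and the message strings are all hypothetical, and the gateway's real audit format is not shown here.

```python
class PolicyViolation(Exception):
    """Hypothetical stand-in for the error returned to the client."""


def apply_action(action: str, message: str, audit: list) -> None:
    if action not in ("BLOCK", "INFO"):
        raise ValueError(f"unknown action: {action}")
    # Both actions save the violation in the audit.
    audit.append({"action": action, "message": message})
    if action == "BLOCK":
        # BLOCK: the request is rejected with an error.
        raise PolicyViolation(message)
    # INFO: fall through, the request is executed as sent.


audit_log = []
apply_action("INFO", "fetchMaxBytes out of range (illustrative)", audit_log)
try:
    apply_action("BLOCK", "version out of range (illustrative)", audit_log)
except PolicyViolation as err:
    print("blocked:", err)
print(len(audit_log), "audit entries")
```

In both cases the violation ends up in the audit; only BLOCK prevents the request from being executed.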

Example

{
  "name": "myFetchPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.FetchPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "isolationLevel": {
      "value": "read_uncommitted",
      "action": "BLOCK"
    },
    "rackIdRequired": {
      "value": true,
      "action": "BLOCK"
    },
    "fetchMaxBytes": {
      "min": 1000,
      "max": 3000,
      "action": "INFO"
    },
    "fetchMinBytes": {
      "min": 1,
      "max": 500,
      "action": "INFO"
    },
    "maxWaitMs": {
      "min": 10000,
      "max": 20000,
      "action": "INFO"
    },
    "version": {
      "min": 1,
      "max": 3,
      "action": "BLOCK"
    }
  }
}
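
On the client side, the fields of a fetch request are driven by standard Kafka consumer properties. The Python sketch below checks a set of consumer properties against the integer ranges of the example policy. The mapping from policy keys to consumer properties (`fetch.max.bytes`, `fetch.min.bytes`, `fetch.max.wait.ms`) is an assumption about how the interceptor reads the request, and the helper name `out_of_range` and the rack name `rack-1` are hypothetical. The fetch version is normally negotiated by the client library rather than set through a property, so it is left out.

```python
# Integer ranges copied from the example policy.
policy = {
    "fetchMaxBytes": {"min": 1000, "max": 3000},
    "fetchMinBytes": {"min": 1, "max": 500},
    "maxWaitMs": {"min": 10000, "max": 20000},
}

# Assumed mapping from policy key to Kafka consumer property.
field_to_property = {
    "fetchMaxBytes": "fetch.max.bytes",
    "fetchMinBytes": "fetch.min.bytes",
    "maxWaitMs": "fetch.max.wait.ms",
}

# Consumer properties chosen to fit the example policy.
consumer_props = {
    "isolation.level": "read_uncommitted",
    "client.rack": "rack-1",  # hypothetical rack name
    "fetch.max.bytes": 2048,
    "fetch.min.bytes": 1,
    "fetch.max.wait.ms": 15000,
}


def out_of_range(props: dict) -> list:
    """Return the policy keys whose consumer property is outside [min, max]."""
    violations = []
    for field, bounds in policy.items():
        value = props[field_to_property[field]]
        if not bounds["min"] <= value <= bounds["max"]:
            violations.append(field)
    return violations


print(out_of_range(consumer_props))
```

Kafka's default `fetch.max.wait.ms` of 500 falls below the example's minimum of 10000, so a consumer using defaults would trigger the INFO action for maxWaitMs under this policy.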