Consumer Group Policy

Introduction

The consumer group policy interceptor is designed to enhance the reliability and efficiency of Kafka consumer group operations.

By enforcing specific configuration policies, it ensures that consumer groups adhere to predefined rules, thereby preventing potential issues.

What happens when sending an invalid request

For example, suppose a consumer is configured with the groupId invalid_group_id, while the interceptor is configured with groupId=conduktor_group_id.*.

on BLOCK

Any request that doesn't match the interceptor's configuration will be blocked and return the corresponding error message.

When a consumer sends that configuration to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException:
Request parameters do not satisfy the configured policy. GroupId 'invalid_group_id' is invalid.

on INFO

The request with invalid_group_id is still accepted, and an audit record is written with the following error: Request parameters do not satisfy the configured policy. GroupId 'invalid_group_id' is invalid.
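The difference between the two actions can be sketched in a few lines of Python. This is an illustration only, not the interceptor's implementation: the function name check_group_id and the PolicyViolation class are hypothetical, and the sketch assumes the whole groupId must match the regex.

```python
import re


class PolicyViolation(Exception):
    """Hypothetical stand-in for Kafka's PolicyViolationException."""


def check_group_id(group_id, pattern, action="BLOCK"):
    """Return an audit message, or None when the groupId satisfies the policy.

    Assumption: the regex must match the entire groupId (full match).
    """
    if re.fullmatch(pattern, group_id):
        return None
    message = (
        "Request parameters do not satisfy the configured policy. "
        f"GroupId '{group_id}' is invalid."
    )
    if action == "BLOCK":
        # BLOCK: the request is rejected and the client receives the error.
        raise PolicyViolation(message)
    # INFO: the request goes through; only the audit record carries the message.
    return message
```

With the pattern conduktor_group_id.*, calling check_group_id("invalid_group_id", "conduktor_group_id.*", action="INFO") returns the audit message, while the same call with action="BLOCK" raises the exception.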

Configuration

key | type | description
groupId | Regex | Configuration for groupId.
sessionTimeoutMs | Integer | Configuration for session timeout.
rebalanceTimeoutMs | Integer | Configuration for rebalance timeout.
memberId | Regex | Configuration for memberId.
groupInstanceId | Regex | Configuration for groupInstanceId.

Regex

key | type | default | description
value | String | | Regex that the request value must match; values that do not match trigger the configured action.
action | Action | BLOCK | Action to take if the value does not match the regex.

Integer

key | type | default | description
min | int | | Minimum value for the configuration.
max | int | | Maximum value for the configuration.
action | Action | BLOCK | Action to take if the value is outside the specified range.
overrideValue | int | | Value to override with (only applicable when action is set to OVERRIDE).

Action

  • BLOCK → on a violation, save an audit record and return an error.
  • INFO → execute the API call with the invalid value and save an audit record.
  • OVERRIDE → execute the API call with overrideValue instead, and save an audit record noting both the invalid value and the value that replaced it.
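As a rough sketch of how the three actions could apply to an Integer rule, the Python below returns the value the request would run with and the audit text. The names apply_integer_rule, Outcome, and PolicyViolation are hypothetical, the audit wording is invented for illustration, and the bounds are assumed to be inclusive.

```python
from typing import NamedTuple, Optional


class Outcome(NamedTuple):
    value: int            # value the request is executed with
    audit: Optional[str]  # audit text, None when the value was in range


class PolicyViolation(Exception):
    """Hypothetical stand-in for the error returned on BLOCK."""


def apply_integer_rule(name, value, action="BLOCK",
                       min_value=None, max_value=None, override_value=None):
    # Assumption: min and max are inclusive; a missing bound is not checked.
    in_range = ((min_value is None or value >= min_value)
                and (max_value is None or value <= max_value))
    if in_range:
        return Outcome(value, None)
    if action == "BLOCK":
        raise PolicyViolation(f"{name}={value} is outside the allowed range")
    if action == "INFO":
        # The original value is kept; only the audit record notes the problem.
        return Outcome(value, f"{name}={value} is outside the allowed range")
    if action == "OVERRIDE":
        # The audit text keeps both the rejected value and its replacement.
        return Outcome(override_value,
                       f"{name}: replaced {value} with {override_value}")
    raise ValueError(f"unknown action: {action}")
```

For instance, apply_integer_rule("rebalanceTimeoutMs", 10000, "OVERRIDE", min_value=30000, override_value=40000) yields a value of 40000 together with an audit text that mentions both 10000 and 40000.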

Example

{
  "name": "myConsumerGroupPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
  "priority": 100,
  "config": {
    "groupId": {
      "value": "group.*",
      "action": "BLOCK"
    },
    "sessionTimeoutMs": {
      "max": 60000,
      "action": "INFO"
    },
    "rebalanceTimeoutMs": {
      "min": 30000,
      "action": "OVERRIDE",
      "overrideValue": 40000
    },
    "memberId": {
      "value": "member.*",
      "action": "INFO"
    },
    "groupInstanceId": {
      "value": "groupInstance.*",
      "action": "BLOCK"
    }
  }
}
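To see which rules of this example a given request would trip, the following Python sketch walks a sample request through the config section. It is a reading aid under stated assumptions, not the gateway's logic: the violations function and the sample request are hypothetical, regexes are assumed to need a full match, bounds are assumed inclusive, and fields missing from the request are simply skipped.

```python
import json
import re

# The "config" section of the example above.
CONFIG = json.loads("""
{
  "groupId": {"value": "group.*", "action": "BLOCK"},
  "sessionTimeoutMs": {"max": 60000, "action": "INFO"},
  "rebalanceTimeoutMs": {"min": 30000, "action": "OVERRIDE", "overrideValue": 40000},
  "memberId": {"value": "member.*", "action": "INFO"},
  "groupInstanceId": {"value": "groupInstance.*", "action": "BLOCK"}
}
""")


def violations(request, config=CONFIG):
    """List (key, action) for each request field that breaks its rule."""
    found = []
    for key, rule in config.items():
        if key not in request:
            continue  # assumption: absent fields are not checked
        value = request[key]
        if "value" in rule:
            # Regex rule: the whole value must match.
            ok = re.fullmatch(rule["value"], value) is not None
        else:
            # Integer rule: a missing bound defaults to the value itself.
            ok = rule.get("min", value) <= value <= rule.get("max", value)
        if not ok:
            found.append((key, rule.get("action", "BLOCK")))
    return found


# Hypothetical request values, chosen to trip two of the five rules.
request = {
    "groupId": "group-orders",
    "sessionTimeoutMs": 90000,
    "rebalanceTimeoutMs": 10000,
    "memberId": "member-1",
    "groupInstanceId": "groupInstance-1",
}
print(violations(request))
# [('sessionTimeoutMs', 'INFO'), ('rebalanceTimeoutMs', 'OVERRIDE')]
```

Here the session timeout exceeds max and would only be audited, while the rebalance timeout falls below min and would be replaced by 40000.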