What is a safeguard?
Enforce your rules where it matters
Safeguard ensures that your teams follow your rules and can't break convention.
Enable your teams, prevent common mistakes, protect your infra.
View the full demo in real time
Review the docker compose environment
As docker-compose.yaml shows, the demo environment consists of the following services:
- gateway1
- gateway2
- kafka-client
- kafka1
- kafka2
- kafka3
- schema-registry
cat docker-compose.yaml
services:
  kafka1:
    image: confluentinc/cp-server:7.5.0
    hostname: kafka1
    container_name: kafka1
    ports:
    - 9092:9092
    environment:
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:29093,2@kafka2:29093,3@kafka3:29093
      KAFKA_PROCESS_ROLES: broker,controller
    healthcheck:
      test: nc -zv kafka1 29092 || exit 1
      interval: 5s
      retries: 25
  kafka2:
    image: confluentinc/cp-server:7.5.0
    hostname: kafka2
    container_name: kafka2
    ports:
    - 9093:9093
    environment:
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:29093,2@kafka2:29093,3@kafka3:29093
      KAFKA_PROCESS_ROLES: broker,controller
    healthcheck:
      test: nc -zv kafka1 29092 || exit 1
      interval: 5s
      retries: 25
  kafka3:
    image: confluentinc/cp-server:7.5.0
    hostname: kafka3
    container_name: kafka3
    ports:
    - 9094:9094
    environment:
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:29093,2@kafka2:29093,3@kafka3:29093
      KAFKA_PROCESS_ROLES: broker,controller
    healthcheck:
      test: nc -zv kafka3 29092 || exit 1
      interval: 5s
      retries: 25
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    ports:
    - 8081:8081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29092,kafka3:29092
    volumes:
    - type: bind
      source: .
      target: /clientConfig
      read_only: true
    depends_on:
      kafka1:
        condition: service_healthy
      kafka2:
        condition: service_healthy
      kafka3:
        condition: service_healthy
    healthcheck:
      test: nc -zv schema-registry 8081 || exit 1
      interval: 5s
      retries: 25
  gateway1:
    image: harbor.cdkt.dev/conduktor/conduktor-gateway:3.5.0-SNAPSHOT
    hostname: gateway1
    container_name: gateway1
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29092,kafka3:29092
    depends_on:
      kafka1:
        condition: service_healthy
      kafka2:
        condition: service_healthy
      kafka3:
        condition: service_healthy
    ports:
    - 6969:6969
    - 6970:6970
    - 6971:6971
    - 6972:6972
    - 8888:8888
    healthcheck:
      test: curl localhost:8888/health
      interval: 5s
      retries: 25
  gateway2:
    image: harbor.cdkt.dev/conduktor/conduktor-gateway:3.5.0-SNAPSHOT
    hostname: gateway2
    container_name: gateway2
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29092,kafka3:29092
    depends_on:
      kafka1:
        condition: service_healthy
      kafka2:
        condition: service_healthy
      kafka3:
        condition: service_healthy
    ports:
    - 7969:6969
    - 7970:6970
    - 7971:6971
    - 7972:6972
    - 8889:8888
    healthcheck:
      test: curl localhost:8888/health
      interval: 5s
      retries: 25
  kafka-client:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-client
    container_name: kafka-client
    command: sleep infinity
    volumes:
    - type: bind
      source: .
      target: /clientConfig
      read_only: true
Starting the docker environment
Start all your Docker containers in the background and wait for them to be up and ready.
- --wait: Wait for services to be running|healthy. Implies detached mode.
- --detach: Detached mode: Run containers in the background
docker compose up --detach --wait
Network safeguard_default Creating
Network safeguard_default Created
Container kafka2 Creating
Container kafka3 Creating
Container kafka-client Creating
Container kafka1 Creating
Container kafka-client Created
Container kafka3 Created
Container kafka2 Created
Container kafka1 Created
Container schema-registry Creating
Container gateway2 Creating
Container gateway1 Creating
Container gateway2 Created
Container gateway1 Created
Container schema-registry Created
Container kafka-client Starting
Container kafka1 Starting
Container kafka3 Starting
Container kafka2 Starting
Container kafka2 Started
Container kafka1 Started
Container kafka-client Started
Container kafka3 Started
Container kafka1 Waiting
Container kafka3 Waiting
Container kafka1 Waiting
Container kafka3 Waiting
Container kafka1 Waiting
Container kafka2 Waiting
Container kafka2 Waiting
Container kafka2 Waiting
Container kafka3 Waiting
Container kafka3 Healthy
Container kafka2 Healthy
Container kafka1 Healthy
Container kafka3 Healthy
Container kafka1 Healthy
Container gateway2 Starting
Container kafka3 Healthy
Container kafka1 Healthy
Container kafka2 Healthy
Container schema-registry Starting
Container kafka2 Healthy
Container gateway1 Starting
Container schema-registry Started
Container gateway2 Started
Container gateway1 Started
Container gateway2 Waiting
Container kafka-client Waiting
Container kafka1 Waiting
Container kafka2 Waiting
Container kafka3 Waiting
Container schema-registry Waiting
Container gateway1 Waiting
Container kafka2 Healthy
Container kafka1 Healthy
Container kafka3 Healthy
Container kafka-client Healthy
Container gateway1 Healthy
Container gateway2 Healthy
Container schema-registry Healthy
Creating topic cars on gateway1
Creating on gateway1:
- Topic cars with partitions:1 and replication-factor:1
Producing 3 messages in cars
Produce 3 records to the cars topic.
Sending 3 events
{
  "type" : "Ferrari",
  "color" : "red",
  "price" : 10000
}
{
  "type" : "RollsRoyce",
  "color" : "black",
  "price" : 9000
}
{
  "type" : "Mercedes",
  "color" : "black",
  "price" : 6000
}
echo '{"type":"Ferrari","color":"red","price":10000}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic cars
echo '{"type":"RollsRoyce","color":"black","price":9000}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic cars
echo '{"type":"Mercedes","color":"black","price":6000}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic cars
Consume the cars topic
Let's confirm the 3 cars are there by consuming from the cars topic.
kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic cars \
--from-beginning \
--max-messages 4 \
--timeout-ms 3000 | jq
[2024-11-25 21:14:18,186] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
Processed a total of 3 messages
{
  "type": "Ferrari",
  "color": "red",
  "price": 10000
}
{
  "type": "RollsRoyce",
  "color": "black",
  "price": 9000
}
{
  "type": "Mercedes",
  "color": "black",
  "price": 6000
}
Describing topic cars
Replication factor is 1?
This is bad: we can lose data!
Adding interceptor guard-on-create-topic
Let's make sure this problem never repeats itself and add a topic creation safeguard.
... and while we're at it, let's make sure we don't abuse partitions either
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "guard-on-create-topic"
  },
  "spec" : {
    "comment" : "Adding interceptor: guard-on-create-topic",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
    "priority" : 100,
    "config" : {
      "replicationFactor" : {
        "min" : 2,
        "max" : 2
      },
      "numPartition" : {
        "min" : 1,
        "max" : 3
      }
    }
  }
}
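Every interceptor payload in this walkthrough uses the same envelope: kind, apiVersion, metadata.name and a spec holding pluginClass, priority and config. As a rough sketch only, that envelope could be generated rather than hand-written. The helper name `build_interceptor` is hypothetical and not part of any Gateway tooling; Python is used purely for illustration.

```python
import json


def build_interceptor(name, plugin_class, config, priority=100):
    """Return a dict shaped like the Interceptor payloads shown in this demo."""
    return {
        "kind": "Interceptor",
        "apiVersion": "gateway/v2",
        "metadata": {"name": name},
        "spec": {
            "comment": f"Adding interceptor: {name}",
            "pluginClass": plugin_class,
            "priority": priority,
            "config": config,
        },
    }


payload = build_interceptor(
    "guard-on-create-topic",
    "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
    {
        "replicationFactor": {"min": 2, "max": 2},
        "numPartition": {"min": 1, "max": 3},
    },
)

# Serialize to JSON, ready to be saved to a file and sent with curl --data @file.
print(json.dumps(payload, indent=2))
```

Generating the payload keeps the name and the comment in sync and leaves only the plugin class and its config to vary between steps.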
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-09-guard-on-create-topic-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "guard-on-create-topic",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: guard-on-create-topic",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
      "priority": 100,
      "config": {
        "replicationFactor": {
          "min": 2,
          "max": 2
        },
        "numPartition": {
          "min": 1,
          "max": 3
        }
      }
    }
  },
  "upsertResult": "CREATED"
}
Listing interceptors
Listing interceptors on gateway1
curl \
--silent \
--request GET "http://localhost:8888/gateway/v2/interceptor" \
--user "admin:conduktor" | jq
[
  {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "guard-on-create-topic",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: guard-on-create-topic",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
      "priority": 100,
      "config": {
        "replicationFactor": {
          "min": 2,
          "max": 2
        },
        "numPartition": {
          "min": 1,
          "max": 3
        }
      }
    }
  }
]
Create a topic that is not within policy
Topic creation is denied by our policy
kafka-topics \
--bootstrap-server localhost:6969 \
--replication-factor 1 \
--partitions 100 \
--create --if-not-exists \
--topic roads
[!IMPORTANT] We get the following exception
> Request parameters do not satisfy the configured policy.
> Topic 'roads' with number partitions is '100', must not be greater than 3.
> Topic 'roads' with replication factor is '1', must not be less than 2
Error while executing topic command : Request parameters do not satisfy the configured policy. Topic 'roads' with number partitions is '100', must not be greater than 3. Topic 'roads' with replication factor is '1', must not be less than 2
[2024-11-25 21:14:21,040] ERROR org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. Topic 'roads' with number partitions is '100', must not be greater than 3. Topic 'roads' with replication factor is '1', must not be less than 2
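To make the rejection above concrete, here is a minimal sketch of a min/max check that yields the same two complaints for the roads request. It is an illustration in Python under stated assumptions, not the Gateway's implementation: the `Range` type and the function names are hypothetical, and only the bounds and message wording are taken from this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Range:
    min: int
    max: int


# Bounds copied from the guard-on-create-topic config.
REPLICATION_FACTOR = Range(min=2, max=2)
NUM_PARTITION = Range(min=1, max=3)


def range_violations(topic, label, value, allowed):
    """Return a one-element list describing the violation, or an empty list."""
    if value < allowed.min:
        return [f"Topic '{topic}' with {label} is '{value}', must not be less than {allowed.min}"]
    if value > allowed.max:
        return [f"Topic '{topic}' with {label} is '{value}', must not be greater than {allowed.max}"]
    return []


def check_create_topic(topic, partitions, replication_factor):
    """Collect every violation so the caller sees all problems at once."""
    return (
        range_violations(topic, "number partitions", partitions, NUM_PARTITION)
        + range_violations(topic, "replication factor", replication_factor, REPLICATION_FACTOR)
    )


for message in check_create_topic("roads", partitions=100, replication_factor=1):
    print(message)
```

Collecting all violations instead of stopping at the first one mirrors the error above, which reports the partition count and the replication factor together.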
Let's now create it again, with parameters within our policy
Perfect, it has been created
Adding interceptor guard-on-alter-topic
Let's make sure we enforce policies when we alter topics too
Here the retention can only be between 1 and 5 days
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "guard-on-alter-topic"
  },
  "spec" : {
    "comment" : "Adding interceptor: guard-on-alter-topic",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.AlterTopicConfigPolicyPlugin",
    "priority" : 100,
    "config" : {
      "retentionMs" : {
        "min" : 86400000,
        "max" : 432000000
      }
    }
  }
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-13-guard-on-alter-topic-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "guard-on-alter-topic",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: guard-on-alter-topic",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.AlterTopicConfigPolicyPlugin",
      "priority": 100,
      "config": {
        "retentionMs": {
          "min": 86400000,
          "max": 432000000
        }
      }
    }
  },
  "upsertResult": "CREATED"
}
Update 'roads' with a retention of 60 days
Altering the topic is denied by our policy
kafka-configs \
--bootstrap-server localhost:6969 \
--alter \
--entity-type topics \
--entity-name roads \
--add-config retention.ms=5184000000
[!IMPORTANT] We get the following exception
> Request parameters do not satisfy the configured policy. Resource 'roads' with retention.ms is '5184000000', must not be greater than '432000000'
Error while executing config command with args '--bootstrap-server localhost:6969 --alter --entity-type topics --entity-name roads --add-config retention.ms=5184000000'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. Resource 'roads' with retention.ms is '5184000000', must not be greater than '432000000'
at java.base/java.util.concurrent.CompletableFuture.wrapInExecutionException(CompletableFuture.java:345)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:440)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2140)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:374)
at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:341)
at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:97)
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. Resource 'roads' with retention.ms is '5184000000', must not be greater than '432000000'
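The millisecond values are easier to read as days. The short sketch below shows the arithmetic behind the 1 to 5 day window and the rejected 60-day value; the function names are illustrative assumptions, and the check is a plain range comparison rather than the Gateway's own code.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms in one day


def days_to_ms(days):
    """Convert a whole number of days to milliseconds."""
    return days * MS_PER_DAY


# Bounds from the guard-on-alter-topic config: 1 day and 5 days.
RETENTION_MIN_MS = days_to_ms(1)
RETENTION_MAX_MS = days_to_ms(5)


def retention_allowed(retention_ms):
    """True when retention.ms falls inside the configured window (inclusive)."""
    return RETENTION_MIN_MS <= retention_ms <= RETENTION_MAX_MS


for days in (3, 60):
    ms = days_to_ms(days)
    print(days, ms, retention_allowed(ms))
```

Sixty days is 5184000000 ms, well above the 432000000 ms ceiling, while three days (259200000 ms) sits inside the window.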
Update 'roads' with a retention of 3 days
Topic updated successfully
Adding interceptor guard-on-produce
Let's make sure we enforce policies also at produce time!
Here messages must be sent with an allowed compression type and the right level of resiliency (acks)
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "guard-on-produce"
  },
  "spec" : {
    "comment" : "Adding interceptor: guard-on-produce",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
    "priority" : 100,
    "config" : {
      "acks" : {
        "value" : [ -1 ],
        "action" : "BLOCK"
      },
      "compressions" : {
        "value" : [ "NONE", "GZIP" ],
        "action" : "BLOCK"
      }
    }
  }
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-16-guard-on-produce-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "guard-on-produce",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: guard-on-produce",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
      "priority": 100,
      "config": {
        "acks": {
          "value": [
            -1
          ],
          "action": "BLOCK"
        },
        "compressions": {
          "value": [
            "NONE",
            "GZIP"
          ],
          "action": "BLOCK"
        }
      }
    }
  },
  "upsertResult": "CREATED"
}
Produce sample data to our cars topic without the right policies
Produce 1 record that does not match our policy
Sending 1 event
{
  "type" : "Fiat",
  "color" : "red",
  "price" : -1
}
echo '{"type":"Fiat","color":"red","price":-1}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--request-required-acks 1 \
--compression-codec snappy \
--topic cars
[!IMPORTANT] We get the following exception
> Request parameters do not satisfy the configured policy.
> Topic 'cars' with invalid value for 'acks': 1. Valid value is one of the values: -1.
> Topic 'cars' with invalid value for 'compressions': SNAPPY. Valid value is one of the values: [GZIP, NONE]
[2024-11-25 21:14:27,125] ERROR Error when sending message to topic cars with key: null, value: 40 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. Topic 'cars' with invalid value for 'acks': 1. Valid value is one of the values: -1. Topic 'cars' with invalid value for 'compressions': SNAPPY. Valid value is one of the values: [GZIP, NONE]
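The produce policy boils down to two allow-lists. The sketch below checks a producer's settings against them, purely as an illustration of why acks=1 with snappy is refused; `check_produce` and the message format are assumptions made for this example and do not reflect the plugin's internals.

```python
# Allowed values copied from the guard-on-produce config.
ALLOWED_ACKS = {-1}
ALLOWED_COMPRESSIONS = {"NONE", "GZIP"}


def check_produce(acks, compression):
    """Return the settings that fall outside the allow-lists (empty list = accepted)."""
    violations = []
    if acks not in ALLOWED_ACKS:
        violations.append(f"acks={acks}")
    # Compare case-insensitively: the CLI takes lowercase codec names.
    if compression.upper() not in ALLOWED_COMPRESSIONS:
        violations.append(f"compression={compression.upper()}")
    return violations


print(check_produce(acks=1, compression="snappy"))
print(check_produce(acks=-1, compression="gzip"))
```

In Kafka clients, acks=-1 is the numeric form of acks=all, so a compliant producer waits for all in-sync replicas before a write is acknowledged.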
Produce sample data to our cars topic that complies with our policy
Producing a record matching our policy
Sending 1 event
{
  "type" : "Fiat",
  "color" : "red",
  "price" : -1
}
Adding interceptor produce-rate
Let's add a rate-limiting policy on produce
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "produce-rate"
  },
  "spec" : {
    "comment" : "Adding interceptor: produce-rate",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin",
    "priority" : 100,
    "config" : {
      "maximumBytesPerSecond" : 1
    }
  }
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-19-produce-rate-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "produce-rate",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: produce-rate",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin",
      "priority": 100,
      "config": {
        "maximumBytesPerSecond": 1
      }
    }
  },
  "upsertResult": "CREATED"
}
Produce sample data
Produce a record that does not match our produce rate policy
Sending 1 event
{
  "type" : "Fiat",
  "color" : "red",
  "price" : -1
}
Check in the audit log that produce was throttled
Check in the audit log that produce was throttled in cluster kafka1
kafka-console-consumer \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--topic _conduktor_gateway_auditlogs \
--from-beginning \
--timeout-ms 3000 | jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin")'
returns 1 event
{
  "id" : "33412f7a-3e09-4537-b510-7e7671a6dd01",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:45.687694921Z",
  "eventData" : {
    "interceptorName" : "produce-rate",
    "level" : "warn",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client produced (108) bytes, which is more than 1 bytes per second, producer will be throttled by 827 milliseconds"
  }
}
Remove interceptor produce-rate
Adding interceptor consumer-group-name-policy
Let's add a naming convention for consumer group names
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "consumer-group-name-policy"
  },
  "spec" : {
    "comment" : "Adding interceptor: consumer-group-name-policy",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
    "priority" : 100,
    "config" : {
      "groupId" : {
        "value" : "my-group.*",
        "action" : "BLOCK"
      }
    }
  }
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-23-consumer-group-name-policy-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "consumer-group-name-policy",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: consumer-group-name-policy",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
      "priority": 100,
      "config": {
        "groupId": {
          "value": "my-group.*",
          "action": "BLOCK"
        }
      }
    }
  },
  "upsertResult": "CREATED"
}
Consuming from cars
Consuming from cars in cluster gateway1
kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic cars \
--from-beginning \
--timeout-ms 3000 \
--group group-not-within-policy | jq
[!IMPORTANT] We get the following exception
Unexpected error in join group response: Request parameters do not satisfy the configured policy.
[2024-11-25 21:14:35,929] ERROR [Consumer clientId=console-consumer, groupId=group-not-within-policy] JoinGroup failed due to unexpected error: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-11-25 21:14:35,930] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error in join group response: Request parameters do not satisfy the configured policy.
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:741)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1300)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1275)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:686)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:473)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
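The group name is rejected because it does not match the configured regular expression my-group.*. A minimal sketch of that check follows, assuming the whole group id must match the pattern; whether the Gateway uses a full or partial match is not stated here, and `group_id_allowed` is a hypothetical name.

```python
import re

# Pattern copied from the consumer-group-name-policy config.
GROUP_ID_PATTERN = re.compile(r"my-group.*")


def group_id_allowed(group_id):
    """True when the entire group id matches the naming convention."""
    return GROUP_ID_PATTERN.fullmatch(group_id) is not None


for group in ("group-not-within-policy", "my-group-within-policy"):
    print(group, group_id_allowed(group))
```

With this pattern any id that starts with my-group passes, so a stricter convention would need a tighter expression.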
Check in the audit log that fetch was denied
Check in the audit log that fetch was denied in cluster kafka1
kafka-console-consumer \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--topic _conduktor_gateway_auditlogs \
--from-beginning \
--timeout-ms 3000 | jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin")'
returns 1 event
{
  "id" : "94550d33-85bb-498a-b9dd-12726e4b31f4",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:49.189252548Z",
  "eventData" : {
    "interceptorName" : "consumer-group-name-policy",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. GroupId 'group-not-within-policy' is invalid, naming convention must match with regular expression my-group.*"
  }
}
Consuming from cars
Consuming from cars in cluster gateway1
kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic cars \
--from-beginning \
--timeout-ms 3000 \
--group my-group-within-policy | jq
[2024-11-25 21:14:44,838] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
Processed a total of 5 messages
{
  "type": "Ferrari",
  "color": "red",
  "price": 10000
}
{
  "type": "RollsRoyce",
  "color": "black",
  "price": 9000
}
{
  "type": "Mercedes",
  "color": "black",
  "price": 6000
}
{
  "type": "Fiat",
  "color": "red",
  "price": -1
}
{
  "type": "Fiat",
  "color": "red",
  "price": -1
}
Remove interceptor consumer-group-name-policy
Adding interceptor guard-limit-connection
Let's add a policy that limits the connection rate
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "guard-limit-connection"
  },
  "spec" : {
    "comment" : "Adding interceptor: guard-limit-connection",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "priority" : 100,
    "config" : {
      "maximumConnectionsPerSecond" : 1,
      "action" : "BLOCK"
    }
  }
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-28-guard-limit-connection-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "guard-limit-connection",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: guard-limit-connection",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
      "priority": 100,
      "config": {
        "maximumConnectionsPerSecond": 1,
        "action": "BLOCK"
      }
    }
  },
  "upsertResult": "CREATED"
}
Consuming from cars
Consuming from cars in cluster gateway1
kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic cars \
--from-beginning \
--timeout-ms 3000 \
--group my-group-id-convention-cars | jq
[2024-11-25 21:14:47,489] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 2147483645 when making an ApiVersionsRequest with correlation id 7. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2024-11-25 21:14:47,943] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 2147483645 when making an ApiVersionsRequest with correlation id 12. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2024-11-25 21:14:48,633] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 2147483645 when making an ApiVersionsRequest with correlation id 17. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2024-11-25 21:14:49,354] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
[2024-11-25 21:14:49,417] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 2147483645 when making an ApiVersionsRequest with correlation id 23. Disconnecting. (org.apache.kafka.clients.NetworkClient)
Processed a total of 0 messages
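The repeated POLICY_VIOLATION warnings come from the client reconnecting faster than the limit allows. The sketch below models a limit of one event per second with a simple fixed window. This is an assumed algorithm chosen for illustration; the page does not say how the Gateway counts connections, and the class name and timestamps are hypothetical.

```python
class FixedWindowLimiter:
    """Allow at most `limit` events in each window of `window_seconds`."""

    def __init__(self, limit, window_seconds=1.0):
        self.limit = limit
        self.window_seconds = window_seconds
        self.window_start = None
        self.count = 0

    def allow(self, now):
        """Record an attempt at time `now` (seconds) and say whether it is accepted."""
        # Open a new window on the first call or once the current one has elapsed.
        if self.window_start is None or now - self.window_start >= self.window_seconds:
            self.window_start = now
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False


limiter = FixedWindowLimiter(limit=1)  # maximumConnectionsPerSecond = 1
attempts = [0.0, 0.4, 0.9, 1.2, 1.3]   # made-up connection times in seconds
decisions = [limiter.allow(t) for t in attempts]
print(decisions)
```

Passing the clock in as an argument keeps the example deterministic; the same class with window_seconds=60 would model a per-minute limit.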
Check in the audit log that connection was denied
Check in the audit log that connection was denied in cluster kafka1
kafka-console-consumer \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--topic _conduktor_gateway_auditlogs \
--from-beginning \
--timeout-ms 3000 | jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin")'
returns 8 events
{
  "id" : "1e53a507-af73-4b99-bc84-5a807f3de770",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:56.701871260Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "0fb8d183-5ff2-48b4-aabc-3467217450ef",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:56.959415551Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "c93c0926-5922-4051-97ca-d1aa89c77de0",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:57.295491052Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "df384dd9-db03-4fbd-a273-ada355e2ce2f",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:58.404869427Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "66eea142-00c0-4ece-a09c-28874731dd90",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:58.564739886Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "ef500ff9-0e8e-4620-acba-06d5a6d5d41b",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:58.875050011Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "2bc84c96-4f01-4e0e-a8aa-05a72f0ed548",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:59.278733386Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
{
  "id" : "f0aa524a-26c2-44c2-bd3e-89be157da595",
  "source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
  "type" : "SAFEGUARD",
  "authenticationPrincipal" : "passthrough",
  "userName" : "anonymous",
  "connection" : {
    "localAddress" : null,
    "remoteAddress" : "/"
  },
  "specVersion" : "0.1.0",
  "time" : "2024-11-25T21:12:59.314247386Z",
  "eventData" : {
    "interceptorName" : "guard-limit-connection",
    "level" : "error",
    "plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
    "message" : "Request parameters do not satisfy the configured policy. Client connections exceed the limitation of 1 connections per second"
  }
}
Remove interceptor guard-limit-connection
Adding interceptor guard-agressive-auto-commit
Let's block aggressive auto-commit strategies
{
  "kind" : "Interceptor",
  "apiVersion" : "gateway/v2",
  "metadata" : {
    "name" : "guard-agressive-auto-commit"
  },
  "spec" : {
    "comment" : "Adding interceptor: guard-agressive-auto-commit",
    "pluginClass" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
    "priority" : 100,
    "config" : {
      "maximumCommitsPerMinute" : 1,
      "action" : "BLOCK"
    }
  }
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-32-guard-agressive-auto-commit-interceptor.json | jq
{
  "resource": {
    "kind": "Interceptor",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "guard-agressive-auto-commit",
      "scope": {
        "vCluster": "passthrough",
        "group": null,
        "username": null
      }
    },
    "spec": {
      "comment": "Adding interceptor: guard-agressive-auto-commit",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
      "priority": 100,
      "config": {
        "maximumCommitsPerMinute": 1,
        "action": "BLOCK"
      }
    }
  },
  "upsertResult": "CREATED"
}
Consuming from cars
Consuming from cars in cluster gateway1
- Command
- Output
- Recording
kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic cars \
--from-beginning \
--timeout-ms 3000 \
--group group-with-aggressive-autocommit | jq
[2024-11-25 21:15:01,591] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
[2024-11-25 21:15:01,919] ERROR [Consumer clientId=console-consumer, groupId=group-with-aggressive-autocommit] Offset commit failed on partition cars-0 at offset 5: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-11-25 21:15:01,919] WARN [Consumer clientId=console-consumer, groupId=group-with-aggressive-autocommit] Synchronous auto-commit of offsets {cars-0=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} failed: Unexpected error in commit: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Processed a total of 5 messages
"type": "Ferrari",
"color": "red",
"price": 10000
"type": "RollsRoyce",
"color": "black",
"price": 9000
"type": "Mercedes",
"color": "black",
"price": 6000
"type": "Fiat",
"color": "red",
"price": -1
"type": "Fiat",
"color": "red",
"price": -1
Check in the audit log that the commit was denied
Check in the audit log that the commit was denied in cluster kafka1
- Command
- Output
- Recording
kafka-console-consumer \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--topic _conduktor_gateway_auditlogs \
--from-beginning \
--timeout-ms 3000 | jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin")'
returns 97 events
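With this many events, it can be easier to count them than to read them. The sketch below assumes the pretty-printed audit output has been saved to a file. The temporary file and the trimmed sample lines stand in for that saved output and are hypothetical, although the field values are copied from this page. It uses only `grep`, `sort`, and `uniq`.

```shell
#!/bin/sh
# Stand-in for saved audit output, trimmed to the two fields used below.
sample=$(mktemp)
cat > "$sample" <<'EOF'
"interceptorName" : "guard-limit-connection",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"interceptorName" : "guard-agressive-auto-commit",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"interceptorName" : "guard-agressive-auto-commit",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
EOF

# Each event has exactly one "plugin" line, so counting matching lines counts events.
commit_events=$(grep -c 'LimitCommitOffsetPolicyPlugin' "$sample")
echo "commit-limit events: $commit_events"

# Tally events per interceptor name.
grep -o '"interceptorName" : "[^"]*"' "$sample" | sort | uniq -c

rm -f "$sample"
```

When `jq` is already in the pipeline, appending `| jq -s 'length'` to the filter is another way to get the same count.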
"id" : "e5cd9e94-63f8-4b15-80be-4efd6d2e7293",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:02.366408596Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "496e69e6-bcdc-4267-8a81-9b4ce851ee23",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:02.581626346Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "ec86e5bd-0f79-4abd-b9b2-dafee6a5fd9d",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:02.853232388Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "0a16c8ec-404c-4c6e-b886-7edf00d55555",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:02.966332221Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "9678910d-b383-4152-a58a-f31525b52cda",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.057689263Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "83837ef3-8877-40ca-8db9-7ec59bf1c1b6",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.095049971Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "bc73d37b-51c0-424f-b72b-32f1acd015b6",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.110465679Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "38e2f0e9-4f6d-4784-87a5-7c5edbc68f26",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.166864388Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "4c444c80-a2d2-4195-b696-0df96135685a",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.174546179Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "b41dac87-107c-4cbf-9f8c-f5d194ada13a",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.178724263Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "f96af90e-15c9-4296-8afb-c45bed7c4b23",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.271905346Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "2422b1be-348b-49b4-8759-fbb4a6f6aa02",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.366436055Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "e24f5be4-c0a5-4702-bfe9-c6d495f6372d",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.481124555Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "25fe96cf-65c9-4101-8c67-fdea78bd1e47",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.536608388Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "4c30147e-f86c-44ed-8944-27d6ea721fc0",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.605800138Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "7793390a-da67-40db-83e5-13a11409c98f",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.711637555Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "b3e887e3-339b-4111-89eb-359eb72f0da0",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.744428721Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "1a5c191a-cb65-4fd6-a781-ac9f07d4a46a",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.777703846Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "8a3d7795-8960-493c-9845-34b6e61b0431",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.787502180Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "bda241f1-3ff1-4cf3-8d1a-99a13bfce86b",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.808810846Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "d63615e8-943b-4d41-9806-ec522bdbdcae",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.825743638Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "cddc30df-3111-40d4-b3dd-1448da4bb589",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.889991471Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "63677f38-e2d9-4491-8ae4-b30a44b07a4c",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:03.990913180Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "3fa4ac34-0c10-4b4b-9b89-c70e112c5b21",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.001465722Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "5e7e2c5b-a31d-46cf-8b35-48beadffeeb1",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.075681513Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "0e417065-ea73-4b96-9f15-3fda34e3afa9",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.108583597Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "0732634f-2b5b-400f-ba08-4f83d81e159c",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.212256138Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "00739454-2b1d-4058-bf60-8bed88226e22",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.219489680Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "c6749494-4bcb-48b1-a485-549df487ceef",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.269059055Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "388d4506-3576-485e-8166-70f438168d8d",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.327418305Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "1f952c09-9745-453a-b53e-7d7dfe37026f",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.392384097Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "8966b4b0-3d4d-48b7-8b30-fed93d90b43f",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.392373888Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "ab05a7b4-b061-4c97-8643-1a625a47c869",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.415559388Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "6c39bc84-b6ef-4cfa-b90f-9c168f016b0a",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.427850430Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "02960f88-e4b4-40c0-9b13-f4f3a63cdac8",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.439232555Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "3f217342-4ecd-4b7a-afb9-38bd8d2a07c5",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.498996055Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "bcd33017-cc9e-4269-b5c4-bf27d46fd710",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.600882305Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "81fc4b35-9bbc-4743-a0a8-e28b8ea83064",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.604968472Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "86ddba64-e340-408d-a00a-2d8fde1b9e6a",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",
"time" : "2024-11-25T21:13:04.634912763Z",
"eventData" : {
"interceptorName" : "guard-agressive-auto-commit",
"level" : "error",
"plugin" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message" : "Request parameters do not satisfy the configured policy. Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
"id" : "06fe8890-5c50-4c94-9b3f-bd719da08a28",
"source" : "krn://cluster=p0KPFA_mQb2ixdPbQXPblw",
"type" : "SAFEGUARD",
"authenticationPrincipal" : "passthrough",
"userName" : "anonymous",
"connection" : {
"localAddress" : null,
"remoteAddress" : "/"
"specVersion" : "0.1.0",