Skip to main content
Quick navigation

What is a safeguard?

Enforce your rules where it matters

Safeguard ensures that your teams follow your rules and can't break convention.

Enable your teams, prevent common mistakes, protect your infra.

View the full demo in realtime

You can either follow all the steps manually, or watch the recording

Review the docker compose environment

As can be seen from docker-compose.yaml the demo environment consists of the following services:

  • gateway1
  • gateway2
  • kafka-client
  • kafka1
  • kafka2
  • kafka3
  • schema-registry
  • zookeeper
cat docker-compose.yaml

Starting the docker environment

Start all your docker processes, wait for them to be up and ready, then run in background

  • --wait: Wait for services to be running|healthy. Implies detached mode.
  • --detach: Detached mode: Run containers in the background
docker compose up --detach --wait

Creating virtual cluster teamA

Creating virtual cluster teamA on gateway gateway1 and reviewing the configuration file to access it

# Generate virtual cluster teamA with service account sa
token=$(curl \
--request POST "http://localhost:8888/admin/vclusters/v1/vcluster/teamA/username/sa" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data-raw '{"lifeTimeSeconds": 7776000}' | jq -r ".token")

# Create access file
echo """
bootstrap.servers=localhost:6969
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='sa' password='$token';
""" > teamA-sa.properties

# Review file
cat teamA-sa.properties

Creating topic cars on teamA

Creating on teamA:

  • Topic cars with partitions:1 and replication-factor:1
kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic cars

Producing 3 messages in cars

Produce 3 records to the cars topic.

Sending 3 events

{
"type" : "Ferrari",
"color" : "red",
"price" : 10000
}
{
"type" : "RollsRoyce",
"color" : "black",
"price" : 9000
}
{
"type" : "Mercedes",
"color" : "black",
"price" : 6000
}

with

echo '{"type":"Ferrari","color":"red","price":10000}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--topic cars

echo '{"type":"RollsRoyce","color":"black","price":9000}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--topic cars

echo '{"type":"Mercedes","color":"black","price":6000}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--topic cars

Consume the cars topic

Let's confirm the 3 cars are there by consuming from the cars topic.

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic cars \
--from-beginning \
--max-messages 3 \
--timeout-ms 10000 | jq

returns

Processed a total of 3 messages
{
"type": "Ferrari",
"color": "red",
"price": 10000
}
{
"type": "RollsRoyce",
"color": "black",
"price": 9000
}
{
"type": "Mercedes",
"color": "black",
"price": 6000
}

Describing topic cars

Replication factor is 1?

This is bad: we can lose data!

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--describe \
--topic cars

Adding interceptor guard-on-create-topic

Let's make sure this problem never repeats itself and add a topic creation safeguard.

... and while we're at it, let's make sure we don't abuse partitions either

Creating the interceptor named guard-on-create-topic of the plugin io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
"priority" : 100,
"config" : {
"replicationFactor" : {
"min" : 2,
"max" : 2
},
"numPartition" : {
"min" : 1,
"max" : 3
}
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-on-create-topic" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-10-guard-on-create-topic.json | jq

Listing interceptors for teamA

Listing interceptors on gateway1 for virtual cluster teamA

curl \
--request GET 'http://localhost:8888/admin/interceptors/v1/vcluster/teamA' \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Create a topic that is not within policy

Topic creation is denied by our policy

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 100 \
--create --if-not-exists \
--topic roads

[!IMPORTANT] We get the following exception

org.apache.kafka.common.errors.PolicyViolationException:
> Request parameters do not satisfy the configured policy.
>Topic 'roads' with number partitions is '100', must not be greater than 3.
>Topic 'roads' with replication factor is '1', must not be less than 2

Let's now create it again, with parameters within our policy

Perfect, it has been created

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 2 \
--partitions 3 \
--create --if-not-exists \
--topic roads

Adding interceptor guard-on-alter-topic

Let's make sure we enforce policies when we alter topics too

Here the retention can only be between 1 and 5 days

Creating the interceptor named guard-on-alter-topic of the plugin io.conduktor.gateway.interceptor.safeguard.AlterTopicConfigPolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.AlterTopicConfigPolicyPlugin",
"priority" : 100,
"config" : {
"retentionMs" : {
"min" : 86400000,
"max" : 432000000
}
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-on-alter-topic" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-14-guard-on-alter-topic.json | jq

Update 'cars' with a retention of 60 days

Altering the topic is denied by our policy

kafka-configs \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--alter \
--entity-type topics \
--entity-name roads \
--add-config retention.ms=5184000000

[!IMPORTANT] We get the following exception

org.apache.kafka.common.errors.PolicyViolationException:
> Request parameters do not satisfy the configured policy. Resource 'roads' with retention.ms is '5184000000', must not be greater than '432000000'

Update 'cars' with a retention of 3 days

Topic updated successfully

kafka-configs \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--alter \
--entity-type topics \
--entity-name roads \
--add-config retention.ms=259200000

Adding interceptor guard-on-produce

Let's make sure we enforce policies also at produce time!

Here message shall be sent with compression and with the right level of resiliency

Creating the interceptor named guard-on-produce of the plugin io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
"priority" : 100,
"config" : {
"acks" : {
"value" : [ -1 ],
"action" : "BLOCK"
},
"compressions" : {
"value" : [ "NONE", "GZIP" ],
"action" : "BLOCK"
}
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-on-produce" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-17-guard-on-produce.json | jq

Produce sample data to our cars topic without the right policies

Produce 1 record ... that do not match our policy

Sending 1 event

{
"type" : "Fiat",
"color" : "red",
"price" : -1
}

with

echo '{"type":"Fiat","color":"red","price":-1}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--request-required-acks 1 \
--compression-codec snappy \
--topic cars

[!IMPORTANT] We get the following exception

org.apache.kafka.common.errors.PolicyViolationException:
> Request parameters do not satisfy the configured policy.
>Topic 'cars' with invalid value for 'acks': 1. Valid value is one of the values: -1.
>Topic 'cars' with invalid value for 'compressions': SNAPPY. Valid value is one of the values: [GZIP, NONE]

Produce sample data to our cars topic that complies with our policy

Producing a record matching our policy

Sending 1 event

{
"type" : "Fiat",
"color" : "red",
"price" : -1
}

with

echo '{"type":"Fiat","color":"red","price":-1}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--request-required-acks -1 \
--compression-codec gzip \
--topic cars

Adding interceptor produce-rate

Let's add some rate limiting policy on produce

Creating the interceptor named produce-rate of the plugin io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin",
"priority" : 100,
"config" : {
"maximumBytesPerSecond" : 1
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/produce-rate" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-20-produce-rate.json | jq

Produce sample data

Do not match our produce rate policy

Sending 1 event

{
"type" : "Fiat",
"color" : "red",
"price" : -1
}

with

echo '{"type":"Fiat","color":"red","price":-1}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--request-required-acks -1 \
--compression-codec none \
--topic cars

Check in the audit log that produce was throttled

Check in the audit log that produce was throttled in cluster kafka1

kafka-console-consumer \
--bootstrap-server localhost:19092,localhost:19093,localhost:19094 \
--topic _auditLogs \
--from-beginning \
--timeout-ms 3000 \
| jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin")'

returns

Processed a total of 38 messages
{
"id": "5b0fd5aa-742f-48dd-822b-f23b2187302d",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53647"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:54:32.674463045Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin",
"message": "Client produced (108) bytes, which is more than 1 bytes per second, producer will be throttled by 134 milliseconds"
}
}

Remove interceptor produce-rate

curl \
--request DELETE "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/produce-rate" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Adding interceptor consumer-group-name-policy

Let's add some naming conventions on consumer group names

Creating the interceptor named consumer-group-name-policy of the plugin io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
"priority" : 100,
"config" : {
"groupId" : {
"value" : "my-group.*",
"action" : "BLOCK"
}
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/consumer-group-name-policy" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-24-consumer-group-name-policy.json | jq

Consuming from cars

Consuming from cars in cluster teamA

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic cars \
--from-beginning \
--timeout-ms 10000 \
--group group-not-within-policy | jq

[!IMPORTANT] We get the following exception

Unexpected error in join group response: Request parameters do not satisfy the configured policy.

returns

[2024-02-14 03:54:38,590] ERROR [Consumer clientId=console-consumer, groupId=group-not-within-policy] JoinGroup failed due to unexpected error: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.common.KafkaException: Unexpected error in join group response: Request parameters do not satisfy the configured policy.
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:711)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:603)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1270)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1245)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1255)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:473)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

Check in the audit log that fetch was denied

Check in the audit log that fetch was denied in cluster kafka1

kafka-console-consumer \
--bootstrap-server localhost:19092,localhost:19093,localhost:19094 \
--topic _auditLogs \
--from-beginning \
--timeout-ms 3000 \
| jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin")'

returns

Processed a total of 43 messages
{
"id": "03adead3-35e4-40e9-875d-b94c946f89ad",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:49990"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:54:38.585534131Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
"message": "Request parameters do not satisfy the configured policy. GroupId 'group-not-within-policy' is invalid, naming convention must match with regular expression my-group.*"
}
}

Consuming from cars

Consuming from cars in cluster teamA

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic cars \
--from-beginning \
--timeout-ms 10000 \
--group my-group-within-policy | jq

returns

Processed a total of 5 messages
{
"type": "Ferrari",
"color": "red",
"price": 10000
}
{
"type": "RollsRoyce",
"color": "black",
"price": 9000
}
{
"type": "Mercedes",
"color": "black",
"price": 6000
}
{
"type": "Fiat",
"color": "red",
"price": -1
}
{
"type": "Fiat",
"color": "red",
"price": -1
}

Remove interceptor consumer-group-name-policy

curl \
--request DELETE "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/consumer-group-name-policy" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Adding interceptor guard-limit-connection

Let's add some connect limitation policy

Creating the interceptor named guard-limit-connection of the plugin io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"priority" : 100,
"config" : {
"maximumConnectionsPerSecond" : 1,
"action" : "BLOCK"
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-limit-connection" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-29-guard-limit-connection.json | jq

Consuming from cars

Consuming from cars in cluster teamA

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic cars \
--from-beginning \
--timeout-ms 10000 \
--group my-group-id-convention-cars | jq

returns

[2024-02-14 03:54:57,112] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 2147483644 when making an ApiVersionsRequest with correlation id 4. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2024-02-14 03:54:57,947] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 3 when making an ApiVersionsRequest with correlation id 15. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2024-02-14 03:54:59,105] WARN [Consumer clientId=console-consumer, groupId=my-group-id-convention-cars] Received error POLICY_VIOLATION from node 3 when making an ApiVersionsRequest with correlation id 18. Disconnecting. (org.apache.kafka.clients.NetworkClient)
Processed a total of 5 messages
{
"type": "Ferrari",
"color": "red",
"price": 10000
}
{
"type": "RollsRoyce",
"color": "black",
"price": 9000
}
{
"type": "Mercedes",
"color": "black",
"price": 6000
}
{
"type": "Fiat",
"color": "red",
"price": -1
}
{
"type": "Fiat",
"color": "red",
"price": -1
}

Check in the audit log that connection was denied

Check in the audit log that connection was denied in cluster kafka1

kafka-console-consumer \
--bootstrap-server localhost:19092,localhost:19093,localhost:19094 \
--topic _auditLogs \
--from-beginning \
--timeout-ms 3000 \
| jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin")'

returns

{
"id": "f1a4068b-7e2f-4bd4-b352-f62ccaecaaad",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53736"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:54:57.103049668Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "a78d7253-152e-44f8-8e85-ffaa96482569",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53739"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:54:57.948077251Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "c9d3075f-e1dd-4c42-be62-872031245f73",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53740"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:54:59.096660544Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "4cb22e9f-1fee-4254-9396-cfee4838f8d1",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53741"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:01.409479670Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "69489f65-0987-4f93-80cf-2d32a28d649d",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53741"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:03.306784796Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "bc5161f7-336c-4e0e-a298-3035410d9ff4",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53741"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:07.045165506Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "b4585baa-2cfa-405e-9810-90f0791238d9",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53741"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:07.860314506Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}
{
"id": "94d3541d-0645-4b64-a76b-49f5427e896d",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
Processed a total of 63 messages
pal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53741"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:08.682395881Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
"message": "Client connections exceed the limitation of 1 connections per second"
}
}

Remove interceptor guard-limit-connection

curl \
--request DELETE "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-limit-connection" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Adding interceptor guard-agressive-auto-commit

Let's block aggressive auto-commits strategies

Creating the interceptor named guard-agressive-auto-commit of the plugin io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"priority" : 100,
"config" : {
"maximumCommitsPerMinute" : 1,
"action" : "BLOCK"
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/guard-agressive-auto-commit" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-33-guard-agressive-auto-commit.json | jq

Consuming from cars

Consuming from cars in cluster teamA

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic cars \
--from-beginning \
--timeout-ms 10000 \
--group group-with-aggressive-autocommit | jq

returns

[2024-02-14 03:55:26,266] ERROR [Consumer clientId=console-consumer, groupId=group-with-aggressive-autocommit] Offset commit failed on partition cars-0 at offset 5: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-14 03:55:26,818] ERROR [Consumer clientId=console-consumer, groupId=group-with-aggressive-autocommit] Offset commit failed on partition cars-0 at offset 5: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-14 03:55:26,819] WARN [Consumer clientId=console-consumer, groupId=group-with-aggressive-autocommit] Asynchronous auto-commit of offsets {cars-0=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} failed: Unexpected error in commit: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-14 03:55:26,819] WARN [Consumer clientId=console-consumer, groupId=group-with-aggressive-autocommit] Synchronous auto-commit of offsets {cars-0=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} failed: Unexpected error in commit: Request parameters do not satisfy the configured policy. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Processed a total of 5 messages
{
"type": "Ferrari",
"color": "red",
"price": 10000
}
{
"type": "RollsRoyce",
"color": "black",
"price": 9000
}
{
"type": "Mercedes",
"color": "black",
"price": 6000
}
{
"type": "Fiat",
"color": "red",
"price": -1
}
{
"type": "Fiat",
"color": "red",
"price": -1
}

Check in the audit log that connection was denied

Check in the audit log that connection was denied in cluster kafka1

kafka-console-consumer \
--bootstrap-server localhost:19092,localhost:19093,localhost:19094 \
--topic _auditLogs \
--from-beginning \
--timeout-ms 3000 \
| jq 'select(.type=="SAFEGUARD" and .eventData.plugin=="io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin")'

returns

Processed a total of 70 messages
{
"id": "45f3eb08-4422-4504-8236-6eccb591b978",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53796"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:26.250918792Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message": "Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
}
}
{
"id": "056b7df1-faed-4812-9d2a-75d3aa3118eb",
"source": "krn://cluster=A_OiC5SRRuCYf3vkbsYqBA",
"type": "SAFEGUARD",
"authenticationPrincipal": "teamA",
"userName": "sa",
"connection": {
"localAddress": null,
"remoteAddress": "/192.168.65.1:53796"
},
"specVersion": "0.1.0",
"time": "2024-02-14T02:55:26.812908501Z",
"eventData": {
"level": "error",
"plugin": "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
"message": "Client calls join group (group-with-aggressive-autocommit) exceed the limitation of 1 commits per minute"
}
}

Tearing down the docker environment

Remove all your docker processes and associated volumes

  • --volumes: Remove named volumes declared in the "volumes" section of the Compose file and anonymous volumes attached to containers.
docker compose down --volumes

Conclusion

Safeguard is really a game changer!