Skip to main content
Quick navigation

Dynamic Header Injection & Removal

There are multiple interceptors available for manipulating headers, either injection or regex based removal.

This demo will run you through some of these use cases step-by-step.

View the full demo in realtime

You can either follow all the steps manually, or watch the recording

Review the docker compose environment

As can be seen from docker-compose.yaml the demo environment consists of the following services:

  • gateway1
  • gateway2
  • kafka-client
  • kafka1
  • kafka2
  • kafka3
  • schema-registry
cat docker-compose.yaml

Starting the docker environment

Start all your docker processes, wait for them to be up and ready, then run in background

  • --wait: Wait for services to be running|healthy. Implies detached mode.
  • --detach: Detached mode: Run containers in the background
docker compose up --detach --wait

Creating topic users on gateway1

Creating on gateway1:

  • Topic users with partitions:1 and replication-factor:1
kafka-topics \
--bootstrap-server localhost:6969 \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic users

Adding interceptor inject-headers

Let's create the interceptor to inject various headers

step-06-inject-headers-interceptor.json:

{
"kind" : "Interceptor",
"apiVersion" : "gateway/v2",
"metadata" : {
"name" : "inject-headers"
},
"spec" : {
"comment" : "Adding interceptor: inject-headers",
"pluginClass" : "io.conduktor.gateway.interceptor.DynamicHeaderInjectionPlugin",
"priority" : 100,
"config" : {
"headers" : {
"X-MY-KEY" : "my own value",
"X-USER" : "{{user}}",
"X-INTERPOLATED" : "User {{user}} via ip {{userIp}}"
}
}
}
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-06-inject-headers-interceptor.json | jq

Send tom and laura into users

Producing 2 messages in users in cluster gateway1

Sending 2 events

{
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
{
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
echo '{"name":"tom","username":"tom@conduktor.io","password":"motorhead","visa":"#abc123","address":"Chancery lane, London"}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic users

echo '{"name":"laura","username":"laura@conduktor.io","password":"kitesurf","visa":"#888999XZ","address":"Dubai, UAE"}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic users

Verify tom and laura have the corresponding headers

Verify tom and laura have the corresponding headers in cluster gateway1

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic users \
--from-beginning \
--max-messages 3 \
--timeout-ms 3000 \
--property print.headers=true | jq

returns 2 events

{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
}
{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
}

Adding interceptor remove-headers

Let's create the interceptor remove-headers to remove headers that match X-MY-.*

step-09-remove-headers-interceptor.json:

{
"kind" : "Interceptor",
"apiVersion" : "gateway/v2",
"metadata" : {
"name" : "remove-headers"
},
"spec" : {
"comment" : "Adding interceptor: remove-headers",
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.MessageHeaderRemovalPlugin",
"priority" : 100,
"config" : {
"headerKeyRegex" : "X-MY-.*"
}
}
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-09-remove-headers-interceptor.json | jq

Verify tom and laura have the corresponding headers

Verify tom and laura have the corresponding headers in cluster gateway1

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic users \
--from-beginning \
--max-messages 3 \
--timeout-ms 3000 \
--property print.headers=true | jq

returns 2 events

{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-USER" : "anonymous"
},
"value" : {
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
}
{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-USER" : "anonymous"
},
"value" : {
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
}

Remove interceptor remove-headers

Let's delete the interceptor remove-headers so we can access all our headers again

curl \
--silent \
--request DELETE "http://localhost:8888/gateway/v2/interceptor/remove-headers" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data-raw '{
"vCluster" : "passthrough"
}' | jq

Verify tom and laura have X-MY-KEY back

Verify tom and laura have X-MY-KEY back in cluster gateway1

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic users \
--from-beginning \
--max-messages 3 \
--timeout-ms 3000 \
--property print.headers=true | jq

returns 2 events

{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
}
{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
}

Tearing down the docker environment

Remove all your docker processes and associated volumes

  • --volumes: Remove named volumes declared in the "volumes" section of the Compose file and anonymous volumes attached to containers.
docker compose down --volumes

Conclusion

Leveraging headers in Kafka is of tremendous help!