Field level encryption with Schema Registry

Yes, it works with Avro and JSON Schema, including nested fields

View the full demo in realtime

You can either follow all the steps manually, or watch the recording

Review the docker compose environment

As can be seen from docker-compose.yaml, the demo environment consists of the following services:

  • gateway1
  • gateway2
  • kafka-client
  • kafka1
  • kafka2
  • kafka3
  • schema-registry
  • zookeeper

cat docker-compose.yaml

Starting the docker environment

Start all the Docker services in the background and wait until they are up and healthy

  • --wait: Wait for services to be running|healthy. Implies detached mode.
  • --detach: Detached mode: Run containers in the background

docker compose up --detach --wait

Let's assert the number of registered schemas

echo nb schemas = $(curl --silent http://localhost:8081/subjects/ | jq 'length')
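The command above only prints the count. A minimal sketch of an actual assertion follows; the helper name assert_schema_count and the expected value of 0 for a fresh environment are assumptions, not part of the original demo.

```shell
# Hypothetical helper: fail when the number of registered subjects is not the expected one.
assert_schema_count() {
  local expected="$1" actual="$2"
  # String comparison, so an empty result (for example when curl fails) also counts as a mismatch.
  if [ "$actual" = "$expected" ]; then
    echo "nb schemas = $actual"
  else
    echo "expected $expected schemas, found '$actual'" >&2
    return 1
  fi
}

# Usage against the demo's Schema Registry (assumes no subject has been registered yet):
# assert_schema_count 0 "$(curl --silent http://localhost:8081/subjects/ | jq 'length')"
```

Comparing as strings rather than integers keeps the check from passing silently when the registry is unreachable and the count comes back empty.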

Creating virtual cluster teamA

Creating virtual cluster teamA on gateway gateway1 and reviewing the configuration file to access it

# Generate virtual cluster teamA with service account sa
token=$(curl \
--request POST "http://localhost:8888/admin/vclusters/v1/vcluster/teamA/username/sa" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data-raw '{"lifeTimeSeconds": 7776000}' | jq -r ".token")

# Create access file
echo """
bootstrap.servers=localhost:6969
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='sa' password='$token';
""" > teamA-sa.properties

# Review file
cat teamA-sa.properties
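If the token request fails (gateway not ready yet, wrong credentials), jq -r ".token" prints null or nothing, and the properties file is still written with an unusable password. The sketch below guards against that; the helper name write_client_properties is hypothetical, and the file contents are the ones shown above.

```shell
# Hypothetical helper: refuse to write the client config when no usable token was returned.
write_client_properties() {
  local token="$1" file="$2"
  if [ -z "$token" ] || [ "$token" = "null" ]; then
    echo "no token returned by the gateway; $file not written" >&2
    return 1
  fi
  # Unquoted EOF so that $token is expanded inside the heredoc.
  cat > "$file" <<EOF
bootstrap.servers=localhost:6969
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='sa' password='$token';
EOF
}

# Usage, with $token obtained from the curl call above:
# write_client_properties "$token" teamA-sa.properties
```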

Creating topic customers on teamA

Creating on teamA:

  • Topic customers with partitions:1 and replication-factor:1

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic customers

Adding interceptor encrypt

We want to encrypt two fields at the root level (password and visa), plus the location field nested in the address object.

Here we are using an in-memory KMS.

Creating the interceptor named encrypt of the plugin io.conduktor.gateway.interceptor.EncryptSchemaBasedPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.EncryptSchemaBasedPlugin",
"priority" : 100,
"config" : {
"schemaRegistryConfig" : {
"host" : "http://schema-registry:8081"
},
"defaultKeySecretId" : "myDefaultKeySecret",
"defaultAlgorithm" : {
"type" : "TINK/AES128_EAX",
"kms" : "IN_MEMORY"
},
"tags" : [ "PII", "ENCRYPTION", "GDPR" ]
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/encrypt" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-08-encrypt.json | jq
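The curl command reads its body from step-08-encrypt.json, which this page does not show being created. One way to produce it, assuming the file holds exactly the payload listed above, is a heredoc; the helper name write_encrypt_payload is hypothetical.

```shell
# Hypothetical helper: write the encrypt interceptor payload to the file the curl call reads.
write_encrypt_payload() {
  local file="$1"
  # Quoted 'EOF' keeps the JSON literal (no shell expansion inside the heredoc).
  cat > "$file" <<'EOF'
{
  "pluginClass" : "io.conduktor.gateway.interceptor.EncryptSchemaBasedPlugin",
  "priority" : 100,
  "config" : {
    "schemaRegistryConfig" : {
      "host" : "http://schema-registry:8081"
    },
    "defaultKeySecretId" : "myDefaultKeySecret",
    "defaultAlgorithm" : {
      "type" : "TINK/AES128_EAX",
      "kms" : "IN_MEMORY"
    },
    "tags" : [ "PII", "ENCRYPTION", "GDPR" ]
  }
}
EOF
}

# Usage, before running the curl command above:
# write_encrypt_payload step-08-encrypt.json
```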

Listing interceptors for teamA

Listing interceptors on gateway1 for virtual cluster teamA

curl \
--request GET 'http://localhost:8888/admin/interceptors/v1/vcluster/teamA' \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Registering schema for customers

{
"$id": "https://example.com/person.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Customer",
"type": "object",
"properties": {
"name": { "type": "string" },
"username": { "type": "string" },
"password": { "type": "string" },
"visa": { "type": "string" },
"address": {
"type": "object",
"properties": {
"location": { "type": "string", "conduktor.tags": ["PII", "GDPR"] },
"town": { "type": "string" },
"country": { "type": "string" }
}
}
}
}

curl \
--request POST 'http://localhost:8081/subjects/customers-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--silent \
--data "{\"schemaType\": \"JSON\", \"schema\": $(cat 10-customers-value.json | jq tostring)}" | jq
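The Schema Registry API expects the schema field to be a JSON string rather than a nested object, which is why the command pipes the file through jq tostring. The sketch below isolates that step on a tiny stand-in schema; the helper name build_register_payload and the stand-in file are assumptions for illustration only.

```shell
# Hypothetical helper: wrap a JSON Schema file in the request body used above.
build_register_payload() {
  local schema_file="$1"
  # jq tostring serialises the whole document into one escaped JSON string.
  printf '{"schemaType": "JSON", "schema": %s}\n' "$(jq tostring "$schema_file")"
}

# Usage with a stand-in schema (not the demo's customers schema):
# echo '{"type": "object"}' > tiny.json
# build_register_payload tiny.json
# prints {"schemaType": "JSON", "schema": "{\"type\":\"object\"}"}
```

Building the body in a function makes it easy to inspect the exact payload before posting it to the registry.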

Let's send an unencrypted JSON message

Producing 1 message in customers in cluster teamA

Sending 1 event

{
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : {
"location" : "12 Chancery lane",
"town" : "London",
"country" : "UK"
}
}

with

echo '{"name":"tom","username":"tom@conduktor.io","password":"motorhead","visa":"#abc123","address":{"location":"12 Chancery lane","town":"London","country":"UK"}}' | \
kafka-json-schema-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--property "value.schema.id=1" \
--property "schema.registry.url=http://localhost:8081" \
--topic customers

Registering an updated schema for customers

{
"$id": "https://example.com/person.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Customer",
"type": "object",
"properties": {
"name": { "type": "string" },
"username": { "type": "string" },
"password": { "type": "string", "conduktor.keySecretId": "password-secret"},
"visa": { "type": "string", "conduktor.keySecretId": "visa-secret" },
"address": {
"type": "object",
"properties": {
"location": { "type": "string", "conduktor.tags": ["PII", "GDPR"] },
"town": { "type": "string" },
"country": { "type": "string" }
}
}
}
}

curl \
--request POST 'http://localhost:8081/subjects/customers-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--silent \
--data "{\"schemaType\": \"JSON\", \"schema\": $(cat 12-customers-value.json | jq tostring)}" | jq

Schema diff

14c14,15
< "type": "string"
---
> "type": "string",
> "conduktor.keySecretId": "password-secret"
17c18,19
< "type": "string"
---
> "type": "string",
> "conduktor.keySecretId": "visa-secret"

Let's make sure they are encrypted

password, visa, and the nested field address.location are encrypted

kafka-json-schema-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic customers \
--from-beginning \
--timeout-ms 10000 \
--property "schema.registry.url=http://localhost:8081" | grep '{' | jq

returns

Processed a total of 1 messages
{
"name": "tom",
"username": "tom@conduktor.io",
"password": "motorhead",
"visa": "#abc123",
"address": {
"location": "12 Chancery lane",
"town": "London",
"country": "UK"
}
}
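Reading the output by eye works for a single record. A scripted check could instead search the consumed records for the plaintext values that were produced: any hit means that value reached the consumer unencrypted. This is a sketch only; the helper name assert_no_plaintext is hypothetical.

```shell
# Hypothetical helper: succeed only when none of the given plaintext values
# appear in the records read from stdin.
assert_no_plaintext() {
  local records value status=0
  records="$(cat)"
  for value in "$@"; do
    # -F treats the value as a fixed string, so characters like '#' or '.' are matched literally.
    if printf '%s\n' "$records" | grep -qF -- "$value"; then
      echo "plaintext still visible: $value" >&2
      status=1
    fi
  done
  return "$status"
}

# Usage: pipe the output of the consumer command above into the check, for example
# <consumer command> | assert_no_plaintext 'motorhead' '#abc123' '12 Chancery lane'
```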

Let's send unencrypted JSON messages

Producing 2 messages in customers in cluster teamA

Sending 2 events

{
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : {
"location" : "12 Chancery lane",
"town" : "London",
"country" : "UK"
}
}
{
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ;",
"address" : {
"location" : "4th Street, Jumeirah",
"town" : "Dubai",
"country" : "UAE"
}
}

with

echo '{"name":"tom","username":"tom@conduktor.io","password":"motorhead","visa":"#abc123","address":{"location":"12 Chancery lane","town":"London","country":"UK"}}' | \
kafka-json-schema-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--property "value.schema.id=1" \
--property "schema.registry.url=http://localhost:8081" \
--topic customers

echo '{"name":"laura","username":"laura@conduktor.io","password":"kitesurf","visa":"#888999XZ;","address":{"location":"4th Street, Jumeirah","town":"Dubai", "country":"UAE"}}' | \
kafka-json-schema-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--property "value.schema.id=1" \
--property "schema.registry.url=http://localhost:8081" \
--topic customers
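The two invocations above differ only in the event they send. A small wrapper, sketched here, removes the repetition; the function name produce_customer and the PRODUCER_CMD override are assumptions added for illustration, while the flags are copied from the commands above.

```shell
# Hypothetical wrapper around the producer invocation repeated above.
# PRODUCER_CMD is an assumption: it lets the real producer be swapped out, e.g. for a dry run.
produce_customer() {
  local event="$1"
  printf '%s\n' "$event" | "${PRODUCER_CMD:-kafka-json-schema-console-producer}" \
    --bootstrap-server localhost:6969 \
    --producer.config teamA-sa.properties \
    --property "value.schema.id=1" \
    --property "schema.registry.url=http://localhost:8081" \
    --topic customers
}

# Usage:
# produce_customer '{"name":"tom","username":"tom@conduktor.io","password":"motorhead","visa":"#abc123","address":{"location":"12 Chancery lane","town":"London","country":"UK"}}'
```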

laura's password and visa are also encrypted

laura's password and visa are also encrypted in cluster teamA

kafka-json-schema-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic customers \
--from-beginning \
--timeout-ms 10000 \
--property "schema.registry.url=http://localhost:8081" | grep '{' | jq

returns

Processed a total of 3 messages
{
"name": "tom",
"username": "tom@conduktor.io",
"password": "motorhead",
"visa": "#abc123",
"address": {
"location": "12 Chancery lane",
"town": "London",
"country": "UK"
}
}
{
"name": "tom",
"username": "tom@conduktor.io",
"password": "motorhead",
"visa": "#abc123",
"address": {
"location": "12 Chancery lane",
"town": "London",
"country": "UK"
}
}
{
"name": "laura",
"username": "laura@conduktor.io",
"password": "kitesurf",
"visa": "#888999XZ;",
"address": {
"location": "4th Street, Jumeirah",
"town": "Dubai",
"country": "UAE"
}
}

Adding interceptor decrypt

Let's add the decrypt interceptor to decipher messages

Creating the interceptor named decrypt of the plugin io.conduktor.gateway.interceptor.DecryptPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.DecryptPlugin",
"priority" : 100,
"config" : {
"topic" : "customers",
"schemaRegistryConfig" : {
"host" : "http://schema-registry:8081"
}
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/decrypt" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-16-decrypt.json | jq

Listing interceptors for teamA

Listing interceptors on gateway1 for virtual cluster teamA

curl \
--request GET 'http://localhost:8888/admin/interceptors/v1/vcluster/teamA' \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Let's make sure they are decrypted

password, visa, and the nested field address.location are decrypted

kafka-json-schema-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic customers \
--from-beginning \
--timeout-ms 10000 \
--property "schema.registry.url=http://localhost:8081" | grep '{' | jq

returns

Processed a total of 3 messages
{
"name": "tom",
"username": "tom@conduktor.io",
"password": "motorhead",
"visa": "#abc123",
"address": {
"location": "12 Chancery lane",
"town": "London",
"country": "UK"
}
}
{
"name": "tom",
"username": "tom@conduktor.io",
"password": "motorhead",
"visa": "#abc123",
"address": {
"location": "12 Chancery lane",
"town": "London",
"country": "UK"
}
}
{
"name": "laura",
"username": "laura@conduktor.io",
"password": "kitesurf",
"visa": "#888999XZ;",
"address": {
"location": "4th Street, Jumeirah",
"town": "Dubai",
"country": "UAE"
}
}

Tearing down the docker environment

Remove all your docker processes and associated volumes

  • --volumes: Remove named volumes declared in the "volumes" section of the Compose file and anonymous volumes attached to containers.

docker compose down --volumes

Conclusion

Yes, encryption in the Kafka world can be simple!