Yes, SQL topics work with Avro/Protobuf.

View the full demo in real time

You can either follow all the steps manually, or watch the recording

Review the docker compose environment

As can be seen from docker-compose.yaml, the demo environment consists of the following services:

  • gateway1
  • gateway2
  • kafka-client
  • kafka1
  • kafka2
  • kafka3
  • schema-registry
  • zookeeper
cat docker-compose.yaml

Starting the docker environment

Start all the Docker services in the background and wait until they are up and healthy

  • --wait: Wait for services to be running|healthy. Implies detached mode.
  • --detach: Detached mode: Run containers in the background
docker compose up --detach --wait

Creating virtual cluster teamA

Creating virtual cluster teamA on gateway gateway1 and reviewing the configuration file to access it

# Generate virtual cluster teamA with service account sa
token=$(curl \
--request POST "http://localhost:8888/admin/vclusters/v1/vcluster/teamA/username/sa" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data-raw '{"lifeTimeSeconds": 7776000}' | jq -r ".token")

# Create access file
cat > teamA-sa.properties <<EOF
bootstrap.servers=localhost:6969
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='sa' password='$token';
EOF

# Review file
cat teamA-sa.properties
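
If the token request fails, the properties file ends up with an empty or "null" password and every later step fails with an authentication error. The sketch below is one way to catch that early. The check_token helper is hypothetical and not part of the demo; it relies on jq -r printing the literal string null when the .token field is missing. It also converts the lifeTimeSeconds value used above into days.

```shell
# Hypothetical helper (not part of the demo): succeeds only for a usable token.
# jq -r prints the literal string "null" when the .token field is absent.
check_token() {
  [ -n "$1" ] && [ "$1" != "null" ]
}

# 7776000 seconds expressed in days (86400 seconds per day)
lifetime_days=$((7776000 / 86400))

# $token is the variable set by the curl command above
if check_token "${token:-}"; then
  echo "token received, valid for ${lifetime_days} days"
else
  echo "no token returned; check the gateway and the admin credentials" >&2
fi
```

Running the check right after the curl call, before writing teamA-sa.properties, keeps a failed request from surfacing later as a confusing client error.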

Creating topic cars on teamA

Creating on teamA:

  • Topic cars with partitions:1 and replication-factor:1
kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic cars

Listing topics in teamA

kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--list

Registering schema for cars

{
"type": "record",
"name": "car",
"fields": [
{"name": "type", "type": "string"},
{"name": "price", "type": "long"},
{"name": "color", "type": "string"}
]
}
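
The curl command that follows reads the schema from a file named 08-cars-value.avro. The demo repository presumably ships that file (an assumption); if you are following along without it, a minimal way to create it from the schema above is:

```shell
# Write the schema shown above to the file the curl command expects.
# The quoted 'EOF' prevents the shell from expanding anything in the body.
cat > 08-cars-value.avro <<'EOF'
{
  "type": "record",
  "name": "car",
  "fields": [
    {"name": "type", "type": "string"},
    {"name": "price", "type": "long"},
    {"name": "color", "type": "string"}
  ]
}
EOF
```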

curl \
--request POST 'http://localhost:8081/subjects/cars-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--silent \
--data "{\"schemaType\": \"AVRO\", \"schema\": $(cat 08-cars-value.avro | jq tostring)}" | jq

Producing 2 messages in cars

Producing 2 messages in cars in cluster teamA

Sending 2 events

{
"type" : "Sports",
"price" : 75,
"color" : "blue"
}
{
"type" : "SUV",
"price" : 55,
"color" : "red"
}

with

echo '{"type":"Sports","price":75,"color":"blue"}' | \
kafka-avro-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--property "value.schema.id=1" \
--property "schema.registry.url=http://localhost:8081" \
--topic cars

echo '{"type":"SUV","price":55,"color":"red"}' | \
kafka-avro-console-producer \
--bootstrap-server localhost:6969 \
--producer.config teamA-sa.properties \
--property "value.schema.id=1" \
--property "schema.registry.url=http://localhost:8081" \
--topic cars
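
The console producer reads one record per line from standard input, so both events can also be sent with a single invocation. The sketch below only builds and prints the two lines; the producer call is left as a comment and assumes the same flags as the commands above.

```shell
# Both events from this step, one JSON record per line
events=$(printf '%s\n' \
  '{"type":"Sports","price":75,"color":"blue"}' \
  '{"type":"SUV","price":55,"color":"red"}')

# Show what would be piped to the producer
printf '%s\n' "$events"

# Assumed usage, with the same flags as the commands above:
# printf '%s\n' "$events" | kafka-avro-console-producer \
#   --bootstrap-server localhost:6969 \
#   --producer.config teamA-sa.properties \
#   --property "value.schema.id=1" \
#   --property "schema.registry.url=http://localhost:8081" \
#   --topic cars
```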

Consuming from cars

Consuming from cars in cluster teamA

kafka-avro-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic cars \
--from-beginning \
--max-messages 2 \
--timeout-ms 15000 \
--property "schema.registry.url=http://localhost:8081" | grep '{' | jq

returns

Processed a total of 2 messages
{
"type": "Sports",
"price": 75,
"color": "blue"
}
{
"type": "SUV",
"price": 55,
"color": "red"
}

Creating topic red-cars on teamA

Creating on teamA:

  • Topic red-cars with partitions:1 and replication-factor:1
kafka-topics \
--bootstrap-server localhost:6969 \
--command-config teamA-sa.properties \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic red-cars

Adding interceptor red-cars

Let's create an interceptor that exposes only the red cars from the cars topic through the virtual topic red-cars.

Creating the interceptor named red-cars of the plugin io.conduktor.gateway.interceptor.VirtualSqlTopicPlugin using the following payload

{
"pluginClass" : "io.conduktor.gateway.interceptor.VirtualSqlTopicPlugin",
"priority" : 100,
"config" : {
"virtualTopic" : "red-cars",
"statement" : "SELECT * FROM cars WHERE color = 'red'",
"schemaRegistryConfig" : {
"host" : "http://schema-registry:8081"
}
}
}

Here's how to send it:

curl \
--request POST "http://localhost:8888/admin/interceptors/v1/vcluster/teamA/interceptor/red-cars" \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent \
--data @step-12-red-cars.json | jq
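
To make the effect of the statement concrete, the sketch below applies the same predicate (color = 'red') locally to the two events produced earlier. It is only an illustration using a text match with grep, not how the gateway evaluates the SQL on Avro records.

```shell
# The two events produced to cars, one JSON record per line
cars=$(printf '%s\n' \
  '{"type":"Sports","price":75,"color":"blue"}' \
  '{"type":"SUV","price":55,"color":"red"}')

# Local stand-in for: SELECT * FROM cars WHERE color = 'red'
red_cars=$(printf '%s\n' "$cars" | grep '"color":"red"')

printf '%s\n' "$red_cars"
```

Only the SUV record matches, so a consumer reading red-cars should see a single message.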

Listing interceptors for teamA

Listing interceptors on gateway1 for virtual cluster teamA

curl \
--request GET 'http://localhost:8888/admin/interceptors/v1/vcluster/teamA' \
--header 'Content-Type: application/json' \
--user 'admin:conduktor' \
--silent | jq

Consuming from red-cars

Consuming from red-cars in cluster teamA

kafka-avro-console-consumer \
--bootstrap-server localhost:6969 \
--consumer.config teamA-sa.properties \
--topic red-cars \
--from-beginning \
--max-messages 2 \
--timeout-ms 15000 \
--property "schema.registry.url=http://localhost:8081" | grep '{' | jq

returns

Processed a total of 1 messages
{
"type": "SUV",
"price": 55,
"color": "red"
}

Tearing down the docker environment

Remove all the Docker containers and their associated volumes

  • --volumes: Remove named volumes declared in the "volumes" section of the Compose file and anonymous volumes attached to containers.
docker compose down --volumes

Conclusion

SQL virtual topics can also work on data serialized with Schema Registry!