Skip to main content
Quick navigation

CEL Topic Filtering

Introduction

Conduktor Gateway's CEL topic feature uses CEL (Common Expression Language) expression to filter messages, based on a simple CEL Expression in the form.

Currently

  • Filtered by:
    • Record key (It supports SR):
      • Record key as string: - .. record.key == 'some thing'
      • Record key as schema: .. record.key.someValue.someChildValue == 'some thing'
    • Record value (It supports SR): .. record.value.someValue.someChildValue == 'some thing'
    • Partition: .. record.partition == 1
    • Timestamp: .. record.timestamp == 98717823712
    • Header: .. record.header.someHeaderKey == 'some thing'
    • Offset: .. record.offset == 1

Configuration

keytypedescription
virtualTopicStringWhen accessed, this virtual topic retrieves filtered data from the specified client topic, applying a CEL expression for data filtering.
topicStringSpecifies the client topic from which data is fetched.
expressionStringA CEL expression that returns BOOLEAN to filter data. This determines which data from the topic is relevant based on the given criteria.
schemaRegistryConfigSchema RegistrySchema Registry Config
celCacheSizeintIn memory cache size for cel expressions, balancing speed and resource use, optimize performance.

Schema Registry

KeyTypeDefaultDescription
typestringCONFLUENTThe type of schema registry to use: choose CONFLUENT (for Confluent-like schema registries including OSS Kafka) or AWS for AWS Glue schema registries.
additionalConfigsmapAdditional properties maps to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets.​
Confluent LikeConfiguration for Confluent-like schema registries
hoststringURL of your schema registry.
cacheSizestring50Number of schemas that can be cached locally by this interceptor so that it doesn't have to query the schema registry every time.
AWS GlueConfiguration for AWS Glue schema registries
regionstringThe AWS region for the schema registry, e.g. us-east-1
registryNamestringThe name of the schema registry in AWS (leave blank for the AWS default of default-registry)
basicCredentialsstringAccess credentials for AWS (see below section for structure)
AWS CredentialsAWS Credentials Configuration
accessKeystringThe access key for the connection to the schema registry.
secretKeystringThe secret key for the connection to the schema registry.
validateCredentialsbooltruetrue / false flag to determine whether the credentials provided should be validated when set.
accountIdstringThe Id for the AWS account to use.

If you don't supply a basicCredentials section for the AWS Glue schema registry, the client used to connect will attempt to find the connection information from the environment. The required credentials can be passed to Gateway in this way as part of core configuration.

Find out more about the setup for this from AWS documentation.

See more about schema registry here

Example

{
"name": "myCelTopicPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.CelTopicPlugin",
"priority": 100,
"config": {
"virtualTopic": "legal_user",
"topic": "users",
"expression": "record.value.age > 18",
"schemaRegistryConfig": {
"host": "http://schema-registry:8081"
},
"celCacheSize": 100
}
}

Schema Registry with secured template

{
"name": "myCelTopicPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.CelTopicPlugin",
"priority": 100,
"config": {
"virtualTopic": "legal_user",
"topic": "users",
"statement": "record.value.age > 18",
"schemaRegistryConfig": {
"host": "http://schema-registry:8081",
"additionalConfigs": {
"schema.registry.url": "${SR_URL}",
"basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
"basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
}
},
"celCacheSize": 100
}
}