
CEL Topic Filtering

Introduction

Conduktor Gateway's CEL topic filtering feature uses a CEL (Common Expression Language) expression to filter the messages of a topic. The filter is a single CEL expression that refers to fields of the record.

Currently, records can be filtered by:

  • Record key (Schema Registry is supported):
    • Record key as string: record.key == 'some thing'
    • Record key as schema: record.key.someValue.someChildValue == 'some thing'
  • Record value (Schema Registry is supported): record.value.someValue.someChildValue == 'some thing'
  • Partition: record.partition == 1
  • Timestamp: record.timestamp == 98717823712
  • Header: record.header.someHeaderKey == 'some thing'
  • Offset: record.offset == 1
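
To make the filter semantics concrete, the following Python sketch models a record as a plain object and applies a predicate equivalent to the expression record.value.age > 18. It does not use a CEL engine or any Gateway API: the Record class, the matches and filter_records functions, and the sample data are all hypothetical, and the handling of a missing field is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Record:
    """Hypothetical stand-in for a Kafka record as seen by a filter expression."""
    key: Any
    value: dict
    partition: int = 0
    offset: int = 0
    timestamp: int = 0
    header: dict = field(default_factory=dict)


def matches(record: Record) -> bool:
    # Python equivalent of the CEL expression: record.value.age > 18
    # Assumption: a missing "age" is treated as 0 here; a real CEL engine
    # may handle missing fields differently.
    return record.value.get("age", 0) > 18


def filter_records(records):
    # Keep only the records for which the predicate returns True.
    return [r for r in records if matches(r)]


records = [
    Record(key="u1", value={"name": "Ann", "age": 17}, offset=0),
    Record(key="u2", value={"name": "Bob", "age": 42}, offset=1),
    Record(key="u3", value={"name": "Cy", "age": 18}, offset=2),
]

kept = filter_records(records)
print([r.key for r in kept])  # prints ['u2']
```

Only the record with age 42 passes, because 17 and 18 are not strictly greater than 18.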

Configuration

| key | type | description |
|---|---|---|
| virtualTopic | String | Name of the virtual topic. When accessed, it returns data from the specified client topic, filtered by the CEL expression. |
| topic | String | The client topic from which data is fetched. |
| expression | String | A CEL expression that returns a BOOLEAN. It determines which records from the topic are kept. |
| schemaRegistryConfig | Schema Registry | Schema Registry configuration (see the Schema Registry section). |
| celCacheSize | int | Size of the in-memory cache for CEL expressions. A larger cache trades memory for speed. |
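
The trade-off behind celCacheSize can be illustrated with a small Python sketch of a bounded in-memory cache. This is not Gateway's implementation: compile_expression and evaluate are hypothetical names, Python's built-in compile() merely stands in for a CEL compiler, and the expression is written in Python syntax rather than CEL.

```python
from functools import lru_cache

compile_calls = 0  # counts how often the (pretend) compilation actually runs


@lru_cache(maxsize=100)  # plays the role of celCacheSize in this sketch
def compile_expression(expression: str):
    """Hypothetical stand-in for preparing a CEL expression for evaluation."""
    global compile_calls
    compile_calls += 1
    # Python's compile() is used only as a placeholder for a CEL compiler.
    return compile(expression, "<expression>", "eval")


def evaluate(expression: str, record: dict) -> bool:
    program = compile_expression(expression)  # cache hit after the first call
    return bool(eval(program, {"__builtins__": {}}, {"record": record}))


results = [evaluate("record['age'] > 18", {"age": age}) for age in (17, 42, 30)]
print(results, compile_calls)  # prints [False, True, True] 1
```

The expression is prepared once and reused for all three records. A larger cache keeps more distinct expressions ready at the cost of memory.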

Schema Registry

| key | type | default | description |
|---|---|---|---|
| host | String | | URL of the Schema Registry. |
| cacheSize | String | 50 | Number of schemas the interceptor caches locally, so that it does not have to query the Schema Registry each time. |
| additionalConfigs | map | | Additional properties, typically security-related parameters. For enhanced security, you can use the template ${MY_ENV_VAR} in map values, then define the actual values as environment variables of Gateway (e.g. -e MY_ENV_VAR=someValue). |
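
The ${MY_ENV_VAR} template can be pictured as a plain placeholder substitution. The Python sketch below only illustrates that idea with the standard library's string.Template, which happens to use the same ${NAME} syntax. The resolve_templates function and the example values are hypothetical, and how Gateway performs the substitution internally is not specified here.

```python
from string import Template


def resolve_templates(configs: dict, env: dict) -> dict:
    # Replace ${NAME} placeholders in each value with the matching variable.
    # Template.substitute raises KeyError if a variable is not defined.
    return {key: Template(value).substitute(env) for key, value in configs.items()}


additional_configs = {
    "schema.registry.url": "${SR_URL}",
    "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
    "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}",
}

# Example values only; in practice these would come from the environment.
env = {
    "SR_URL": "http://schema-registry:8081",
    "SR_BASIC_AUTH_CRED_SRC": "USER_INFO",
    "SR_BASIC_AUTH_USER_INFO": "user:password",
}

resolved = resolve_templates(additional_configs, env)
print(resolved["schema.registry.url"])  # prints http://schema-registry:8081
```

Keeping secrets in environment variables means the interceptor configuration itself never contains the credentials.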

See more about schema registry here

Example

{
  "name": "myCelTopicPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.CelTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "legal_user",
    "topic": "users",
    "expression": "record.value.age > 18",
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081"
    },
    "celCacheSize": 100
  }
}

Schema Registry with secured template

{
  "name": "myCelTopicPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.CelTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "legal_user",
    "topic": "users",
    "expression": "record.value.age > 18",
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081",
      "additionalConfigs": {
        "schema.registry.url": "${SR_URL}",
        "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
        "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
      }
    },
    "celCacheSize": 100
  }
}