Conduktor logical topics are abstractions of real Kafka topics that provide additional functionality that’s not available by default. We offer the following types of logical topics:
  1. Alias topics are topics that can be accessed with a name (alias) but point to another, real topic behind the scenes. Alias topics are useful when you want to share topics that follow sensitive naming conventions, or when underlying topics might be renamed frequently.
  2. Concentrated topics transparently co-locate multiple topics in the same physical topic behind the scenes, acting as pointers to reduce costs on low-volume topics with large partition counts. They are completely transparent to consumers and producers and allow you to emulate different partition counts irrespective of the backing physical topic’s partition count.
  3. Topic views (tech preview) filter records from physical topics using SQL WHERE clauses. This is the recommended approach for filtering JSON data.
  4. SQL topics (deprecated) use SQL to query and filter an existing topic.
  5. CEL topics (deprecated) offer another way to filter an existing topic, using CEL expressions instead of SQL.

Alias topics

Alias topics act as pointers that target a specific physical topic. One of Kafka’s limitations is that you can’t rename topics - an issue that is solved with alias topics. You can have a number of alias topics pointing to the same physical topic.

Use

Gateway manages an alias topic mapping in its internal configuration by registering a target physical topic. The alias topic is presented to Kafka clients like a regular topic, but all requests for it are forwarded to the physical topic. This means that consumer groups, fetch and produce are shared. Also, the alias topic doesn’t replace the original one. For example, if you create an alias topic applicationB_orders that points to a physical topic orders, a client that can access the physical topic would be able to see both topics.

Limitations

  • Delegated Kafka security: SASL delegated security protocols aren’t supported.
  • Alias topics can’t reference another alias topic.
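The forwarding rule and the no-chaining limitation can be pictured as a lookup table. The following Python sketch is a hypothetical model for illustration only (AliasRegistry and its methods are not a Gateway API): an alias resolves to exactly one physical topic, several aliases can share a target, and registering an alias that points at another alias is rejected.

```python
class AliasRegistry:
    """Hypothetical model of alias-to-physical topic mapping (not a Gateway API)."""

    def __init__(self, physical_topics):
        self.physical_topics = set(physical_topics)
        self.aliases = {}  # alias name -> physical topic name

    def register(self, alias, target):
        # Alias topics can't reference another alias topic.
        if target in self.aliases:
            raise ValueError(f"'{target}' is an alias; aliases must target a physical topic")
        if target not in self.physical_topics:
            raise ValueError(f"unknown physical topic '{target}'")
        self.aliases[alias] = target

    def resolve(self, topic):
        # Requests for an alias are forwarded to its physical topic;
        # any other name is passed through unchanged.
        return self.aliases.get(topic, topic)

    def visible_topics(self):
        # The alias doesn't replace the original: a client allowed to see
        # the physical topic sees both the physical topic and its aliases.
        return sorted(self.physical_topics | set(self.aliases))


registry = AliasRegistry(["orders"])
registry.register("applicationB_orders", "orders")
```

Because both names resolve to the same physical topic, produce, fetch and consumer group state end up shared, which matches the behavior described above.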

Concentrated topics

Occasionally, topics have to be created for logical rather than technical reasons (e.g. to differentiate between business units), which can result in considerable overuse of Kafka resources. Conduktor’s topic concentration allows data from a set of topics to be represented on a single underlying topic. Clients connecting through Conduktor Gateway can use concentrated topics as usual without any additional configuration. For example, let’s say we have the following topics:
  • us_east_orders - 100 partitions
  • us_west_orders - 100 partitions
  • emea_orders - 100 partitions
  • latam_orders - 100 partitions
The total Kafka resource requirement is 400 partitions. With topic concentration, all of these topics can be concentrated to a single topic, using only 1/4 of resources:
  • concentrated_orders - 100 partitions

Configuration

Concentrated topics can be configured in two ways:
  • ConcentrationRule (recommended): YAML configuration with pattern matching and advanced features
  • REST API: direct HTTP API calls for manual topic mapping
To create concentrated topics, first deploy a ConcentrationRule:
---
kind: ConcentrationRule
metadata:
  name: concentration1
spec:
  pattern: concentrated.*
  physicalTopics:
    delete: physical.topic
Then create topics that match the ConcentrationRule’s spec.pattern:
kafka-topics --create \
  --bootstrap-server conduktor-gateway:6969 \
  --topic concentrated.topicA \
  --partitions 3

kafka-topics --create \
  --bootstrap-server conduktor-gateway:6969 \
  --topic concentrated.topicB \
  --partitions 4
We now have two concentrated topics (concentrated.topicA and concentrated.topicB) with partition counts of 3 and 4 respectively, mapped to a single physical topic (physical.topic) with three partitions.
To ensure that consumers don’t receive messages from other partitions or from other concentrated topics, Gateway stores the concentrated topic name and the concentrated partition in the record headers, and automatically filters the messages returned to each consumer.
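The header-based filtering described above can be sketched in Python. This is a hypothetical model, not Gateway’s code: the header names concentrated.topic and concentrated.partition and the Record type are illustrative assumptions, since the actual header names aren’t documented here.

```python
from dataclasses import dataclass, field

# Hypothetical header names; the real names used by Gateway may differ.
TOPIC_HEADER = "concentrated.topic"
PARTITION_HEADER = "concentrated.partition"


@dataclass
class Record:
    offset: int  # offset in the physical topic
    value: str
    headers: dict = field(default_factory=dict)


def tag(offset, value, topic, partition):
    """Produce side (model): store the concentrated topic and partition in headers."""
    return Record(offset, value, {TOPIC_HEADER: topic, PARTITION_HEADER: str(partition)})


def filter_fetch(physical_records, topic, partition):
    """Fetch side (model): keep only records that belong to the requested
    concentrated topic and partition."""
    return [
        r for r in physical_records
        if r.headers.get(TOPIC_HEADER) == topic
        and r.headers.get(PARTITION_HEADER) == str(partition)
    ]


records = [
    tag(0, "a0", "concentrated.topicA", 0),
    tag(1, "b2", "concentrated.topicB", 2),
    tag(2, "a1", "concentrated.topicA", 1),
    tag(3, "a0-bis", "concentrated.topicA", 0),
]
topic_a_p0 = filter_fetch(records, "concentrated.topicA", 0)
```

The two records returned for concentrated.topicA partition 0 sit at physical offsets 0 and 3; gaps like this are the root of the metadata limitations described under Consumer offsets.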

Limitations

Consumer offsets

When consuming from a concentrated topic, messages and their ordering are always preserved, but metadata calculations (primarily lag and message count) are likely to differ from what you expect. This is because the associated metadata comes from the backing Kafka topic, rather than from the concentrated topic as seen by the consumer. This is a known limitation.
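A small worked example shows why lag and message count are skewed. It assumes, hypothetically, that concentrated.topicA and concentrated.topicB share one physical partition and that the consumer group commits physical offsets; lag is computed the usual Kafka way, as end offset minus committed offset.

```python
# Hypothetical example: which concentrated topic owns each record of one
# physical partition, in offset order.
physical_partition = ["topicA", "topicB", "topicB", "topicA", "topicB", "topicB"]
log_start_offset = 0
end_offset = len(physical_partition)  # next offset to be written: 6

# A topicA consumer has read both topicA records (offsets 0 and 3)
# and committed offset 4, the next physical offset to read.
committed = 4

# What offset-based tooling computes from physical offsets:
reported_lag = end_offset - committed               # 2
reported_count = end_offset - log_start_offset      # 6

# What the topicA consumer actually has:
actual_lag = physical_partition[committed:].count("topicA")  # 0
actual_count = physical_partition.count("topicA")            # 2

print(f"lag: reported={reported_lag}, actual={actual_lag}")
print(f"message count: reported={reported_count}, actual={actual_count}")
```

The consumer is fully caught up, yet tooling reports a lag of 2 because the remaining physical records belong to topicB.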

Compact and delete+compact topics

You can create concentrated topics with any cleanup.policy, but your ConcentrationRule must declare a backing topic for each cleanup policy you use; otherwise, Gateway won’t let you create the topic.
---
kind: ConcentrationRule
metadata:
  name: concentration1
spec:
  pattern: concentrated.*
  physicalTopics:
    delete: physical.topic-delete
    compact: physical.topic-compact
    # deleteCompact: physical.topic-deletecompact
In this example, spec.physicalTopics.deleteCompact is commented out, so creating a topic with cleanup.policy=compact,delete fails:
kafka-topics --create \
    --bootstrap-server conduktor-gateway:6969 \
    --topic <your-topic-name> \
    --partitions 3 \
    --config cleanup.policy=compact,delete
Error while executing topic command : Cleanup Policy is invalid
Backing topic cleanup policies are also checked when you deploy a new ConcentrationRule. This prevents you from declaring a backing topic whose cleanup.policy is delete in the ConcentrationRule’s spec.physicalTopics.compact field.
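The creation-time check amounts to looking up a backing topic by cleanup policy. A minimal Python sketch, assuming the physicalTopics keys shown in the YAML above (delete, compact, deleteCompact) and treating compact,delete and delete,compact as the same policy; backing_topic_for is a hypothetical helper, not a Gateway function.

```python
def backing_topic_for(physical_topics, cleanup_policy="delete"):
    """Pick the backing topic for a concentrated topic's cleanup.policy.

    physical_topics mirrors spec.physicalTopics, for example
    {"delete": "physical.topic-delete", "compact": "physical.topic-compact"}.
    Kafka's default cleanup.policy is delete.
    """
    policies = frozenset(p.strip() for p in cleanup_policy.split(","))
    key = {
        frozenset({"delete"}): "delete",
        frozenset({"compact"}): "compact",
        frozenset({"compact", "delete"}): "deleteCompact",
    }.get(policies)
    if key is None or key not in physical_topics:
        # Mirrors the error shown above when no matching backing topic is declared.
        raise ValueError("Cleanup Policy is invalid")
    return physical_topics[key]


rule = {"delete": "physical.topic-delete", "compact": "physical.topic-compact"}
```

With this rule, backing_topic_for(rule, "compact") returns physical.topic-compact, while compact,delete is rejected because deleteCompact isn’t declared.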

Restricted topic configurations

Only the following topic properties are allowed for concentrated topics:
  • partitions
  • cleanup.policy
  • retention.ms
  • retention.bytes
  • delete.retention.ms
If any other configuration is set, topic creation fails with an error. retention.ms and retention.bytes can be set to values lower than or equal to those of the backing topic. If a user tries to create a topic with a higher value, topic creation fails with an error:
kafka-topics --create \
    --bootstrap-server conduktor-gateway:6969 \
    --topic <your-topic-name> \
    --partitions 3 \
    --config retention.ms=704800000
Error while executing topic command : Value '704800000' for configuration 'retention.ms' is incompatible with physical topic value '604800000'.
This behavior can be altered with the flag spec.autoManaged.
With concentrated topics, the enforced retention policy is the physical topic’s retention policy, not the policy requested when the concentrated topic was created. The retention.ms and retention.bytes values requested for a concentrated topic are therefore minimum retention guarantees, not cleanup settings.
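The validation rule can be sketched in Python. This is an illustration under assumptions: -1 is treated as unlimited retention (Kafka’s convention for retention.ms and retention.bytes), validate_retention is a hypothetical helper, and the error text mirrors the message shown above.

```python
import math


def effective(value):
    """Kafka uses -1 for unlimited retention; treat it as infinity when comparing."""
    return math.inf if value == -1 else value


def validate_retention(key, requested, physical):
    """Reject a concentrated topic whose retention exceeds the backing topic's."""
    if effective(requested) > effective(physical):
        raise ValueError(
            f"Value '{requested}' for configuration '{key}' is incompatible "
            f"with physical topic value '{physical}'."
        )


validate_retention("retention.ms", 86400000, 604800000)   # lower: accepted
validate_retention("retention.ms", 604800000, 604800000)  # equal: accepted
```

The comparison is one-sided on purpose: a lower requested value always passes, because the backing topic’s retention is what is actually enforced.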

Auto-managed backing topics

When autoManaged is enabled:
  • backing topics are automatically created with the default cluster configuration and partition count.
  • concentrated topics created with higher retention.ms and retention.bytes are allowed. This automatically extends the configuration of the backing topic.
---
kind: ConcentrationRule
metadata:
  name: concentration1
spec:
  pattern: concentrated.*
  physicalTopics:
    delete: physical.topic
  autoManaged: true
Let’s check the backing topic retention on the physical cluster:
kafka-configs --bootstrap-server kafka:9092 \
    --entity-type topics --entity-name physical.topic \
    --describe
Configs for topic 'physical.topic' are:
  cleanup.policy=delete
  retention.ms=604800000
  retention.bytes=-1
Let’s try to create a concentrated topic with a higher retention on Gateway:
kafka-topics --create \
    --bootstrap-server conduktor-gateway:6969 \
    --topic <your-topic-name> \
    --partitions 3 \
    --config retention.ms=704800000
Let’s review the backing topic again:
kafka-configs --bootstrap-server kafka:9092 \
    --entity-type topics --entity-name physical.topic \
    --describe
Configs for topic 'physical.topic' are:
  cleanup.policy=delete
  retention.ms=704800000
  retention.bytes=-1
As we can see, the retention has been updated.
If one user requests a topic with infinite retention (retention.ms = -1), all the topics with the same cleanup policy associated with the rule will also inherit this extended configuration and have infinite retention.
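With autoManaged enabled, the backing topic’s retention effectively becomes the maximum of all requests. A hypothetical Python sketch of that rule (extend_backing_config is illustrative, not Gateway code; -1 is again treated as unlimited):

```python
def extend_backing_config(backing, requested):
    """Return the backing topic config after a concentrated topic requests
    retention settings; retention values only ever grow.

    backing must contain retention.ms and retention.bytes.
    -1 means unlimited, so it wins over any finite value.
    """
    updated = dict(backing)  # don't mutate the caller's config
    for key in ("retention.ms", "retention.bytes"):
        if key not in requested:
            continue
        current, wanted = backing[key], requested[key]
        if current == -1 or wanted == -1:
            updated[key] = -1
        else:
            updated[key] = max(current, wanted)
    return updated


physical = {"cleanup.policy": "delete", "retention.ms": 604800000, "retention.bytes": -1}
after = extend_backing_config(physical, {"retention.ms": 704800000})
```

A request for retention.ms=-1 turns the backing topic’s retention unlimited, which is the inheritance effect described above: every concentrated topic on that backing topic then keeps data indefinitely.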

Message count, lag and offset (in)correctness

By default, concentrated topics report the offsets of their backing topics. This affects the lag and message count calculations, which rely on the partition end offset and the consumer group’s committed offset. Tooling currently displays the message count and the lag relative to the end offset of the physical topic, which can confuse users and applications with incorrect metrics. To counter this, we’ve implemented a dedicated offset management capability for ConcentrationRules. Enable virtual offsets by adding the following line to the ConcentrationRule:
---
kind: ConcentrationRule
metadata:
  name: concentration1
spec:
  pattern: concentrated.*
  physicalTopics:
    delete: physical.topic
  offsetCorrectness: true
  • spec.offsetCorrectness only applies to concentrated topics with cleanup.policy=delete.
  • spec.offsetCorrectness doesn’t apply retroactively to previously created concentrated topics.
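One way to picture virtual offsets is a per-partition index from dense, gap-free offsets to physical offsets. The Python sketch below is a conceptual model only, not Gateway’s implementation (which isn’t documented here); VirtualOffsetIndex is a hypothetical name. Keeping one index entry per record also hints at why memory grows with record count, as noted under Known issues with offset correctness.

```python
from collections import defaultdict


class VirtualOffsetIndex:
    """Conceptual model: map each concentrated (topic, partition) to a dense
    sequence of virtual offsets backed by physical offsets."""

    def __init__(self):
        # (topic, partition) -> list of physical offsets; list index = virtual offset
        self.index = defaultdict(list)

    def observe(self, physical_offset, topic, partition):
        """Record that the physical record at physical_offset belongs to topic/partition."""
        self.index[(topic, partition)].append(physical_offset)

    def end_offset(self, topic, partition):
        """Virtual end offset: number of records seen for this concentrated partition."""
        return len(self.index[(topic, partition)])

    def to_physical(self, topic, partition, virtual_offset):
        return self.index[(topic, partition)][virtual_offset]


idx = VirtualOffsetIndex()
owners = ["topicA", "topicB", "topicB", "topicA", "topicB", "topicB"]
for physical_offset, owner in enumerate(owners):
    idx.observe(physical_offset, owner, 0)
```

Here topicA has a virtual end offset of 2, so a topicA consumer that committed virtual offset 2 reports a lag of 0, regardless of how much topicB traffic shares the physical partition.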

Known issues with offset correctness

There are three known issues with offset correctness in concentrated topics:
  1. Performance: on startup, Gateway has to read the concentrated topic entirely before it’s available to consumers. End-to-end latency increases by up to 500 ms (or fetch.max.wait.ms, if non-default).
  2. Memory: Gateway consumes about 250 MB of heap memory per million records it has read in concentrated topics. This value is unbounded, so we don’t recommend offset correctness on high-volume topics, and we recommend sizing your JVM accordingly.
  3. Unsupported Kafka APIs:
  • DeleteRecords is not supported
  • Transactions are not supported
  • Only IsolationLevel.READ_UNCOMMITTED is supported (using IsolationLevel.READ_COMMITTED is undefined behavior)
  • Partition truncation (upon unclean.leader.election.enable=true) may not be detected by consumers
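The memory figure in item 2 translates into a rough sizing rule. A back-of-the-envelope Python sketch, assuming heap usage scales linearly at about 250 MB per million records read (the only figure given above); treat the result as an estimate, not a guarantee.

```python
HEAP_MB_PER_MILLION_RECORDS = 250  # approximate figure from the known issues above


def estimated_offset_heap_mb(records_read):
    """Rough extra heap needed for offset correctness, in MB (linear estimate)."""
    return records_read * HEAP_MB_PER_MILLION_RECORDS / 1_000_000


for records in (100_000, 1_000_000, 20_000_000):
    print(f"{records:>12,} records -> ~{estimated_offset_heap_mb(records):,.0f} MB")
```

Because the figure is unbounded, the estimate keeps growing with every record read, which is why offset correctness is discouraged on high-volume topics.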
Very slow consumer group edge case
Do not enable offset correctness when your topic has extended periods of inactivity.
When using topic concentration with offsetCorrectness enabled, there’s currently a limitation for consumer groups when the data in the topics moves slowly and/or the groups don’t commit their offsets frequently. If a consumer group with a committed offset waits on the backing physical topic for longer than the retention time without committing a new offset, that group can become blocked. Specifically, a consumer group whose last committed offset has been removed from the topic becomes blocked only if Gateway restarts before the group’s next offset commit. If this happens, you need to manually reset the offsets of the affected consumer group so it can continue.

Topic views (tech preview)

Topic views are a tech preview feature. Enable with GATEWAY_FEATURE_FLAGS_TOPIC_VIEW=true.
Topic views will replace SQL topics and CEL topics once feature gaps are closed and they exit tech preview.
Topic views provide non-materialized views of physical Kafka topics, similar to database views. They apply transformations to records as they’re consumed, without modifying the underlying data. Currently, topic views support SQL-based filtering and projection on schema-less JSON, with support for additional transformations (e.g. CEL) and data types (e.g. JSON with a schema, or Avro) planned. Topic views are configured as a dedicated Gateway resource, similar to alias topics.

How it works

When a consumer fetches from a topic view, Gateway:
  1. Reads records from the underlying physical topic
  2. Applies the configured transformation to filter or modify each record
  3. Returns the resulting records to the consumer
Consumer offsets are committed against the physical topic because the topic view doesn’t exist in Kafka’s __consumer_offsets. This means you may see gaps in offset numbers when records are filtered out. The committed offset is always the offset of the last record in the batch, regardless of whether that record was filtered out.
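The three steps above, and the resulting offset gaps, can be modeled in a few lines of Python. This is a hypothetical sketch: fetch_from_view and the (offset, value) tuples are illustrative, and the predicate stands in for whatever transformation the view is configured with.

```python
def fetch_from_view(batch, predicate):
    """Model of a topic view fetch (illustrative, not Gateway code).

    batch: list of (physical_offset, value) tuples read from the physical topic.
    Returns the records kept by the predicate, plus the offset of the last
    record in the batch, which the committed offset is based on even if that
    record was filtered out.
    """
    kept = [(offset, value) for offset, value in batch if predicate(value)]
    last_offset = batch[-1][0] if batch else None
    return kept, last_offset


batch = [
    (0, {"country": "FR"}),
    (1, {"country": "UK"}),
    (2, {"country": "DE"}),
    (3, {"country": "UK"}),
    (4, {"country": "US"}),
]
kept, last = fetch_from_view(batch, lambda v: v.get("country") == "UK")
```

The consumer sees only offsets 1 and 3 (gaps at 0, 2 and 4), while the committed position is based on offset 4, the last record Gateway read.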

Configuration

Configure topic views using the Gateway REST API or the Conduktor CLI.
curl -X PUT "http://conduktor-gateway:8888/gateway/v2/topic-view" \
  -H "Content-Type: application/json" \
  -u "admin:conduktor" \
  -d '{
    "kind": "TopicView",
    "apiVersion": "gateway/v2",
    "metadata": {
      "name": "uk-customers",
      "vCluster": "passthrough"
    },
    "spec": {
      "transformation": {
        "type": "sql",
        "statement": "SELECT * FROM customers WHERE country='\''UK'\''"
      },
      "onError": {
        "type": "DROP"
      }
    }
  }'
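If shell quoting of the SQL statement gets awkward (note the '\'' sequences above), the same request can be built with Python’s standard library. This sketch only reuses the endpoint, credentials and payload from the curl example; topic_view_request is a hypothetical helper, and it builds the request without sending it.

```python
import base64
import json
import urllib.request


def topic_view_request(name, statement, vcluster="passthrough",
                       base_url="http://conduktor-gateway:8888",
                       user="admin", password="conduktor"):
    """Build (but don't send) the PUT request from the curl example above."""
    payload = {
        "kind": "TopicView",
        "apiVersion": "gateway/v2",
        "metadata": {"name": name, "vCluster": vcluster},
        "spec": {
            "transformation": {"type": "sql", "statement": statement},
            "onError": {"type": "DROP"},
        },
    }
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{base_url}/gateway/v2/topic-view",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="PUT",
    )


# Plain Python quoting: no shell escaping needed around 'UK'.
req = topic_view_request("uk-customers", "SELECT * FROM customers WHERE country='UK'")
# To send it against a running Gateway: urllib.request.urlopen(req)
```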

SQL transformation

When using transformation.type: sql, topic views filter and transform records using SQL syntax. The transformation uses a subset of SQL that will be extended in future releases.
Topic names with dash - characters have to be double quoted, as the dash is not a valid character for a SQL name. For example, if you have a topic our-orders, use SELECT * FROM "our-orders" WHERE ...

Projection

Use the SELECT clause to reshape records by renaming fields, selecting a subset of fields, or computing new values. Rename fields:
SELECT name AS employee_name, salary FROM employees
Select a subset of fields (other fields are dropped):
SELECT name, department FROM employees
Compute values with CASE expressions:
SELECT name, CASE WHEN salary > 2000 THEN 'high' ELSE 'low' END AS salary_band FROM employees
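To make the effect of these three statements concrete, here is a Python sketch that applies equivalent projections to one JSON record. It models the output shape only; the project helper and the sample record are hypothetical.

```python
def project(record, columns):
    """Apply a SELECT list to a JSON record (model only).

    columns: list of (output_name, fn) pairs; fields not listed are dropped.
    """
    return {name: fn(record) for name, fn in columns}


employee = {"name": "Ada", "salary": 2500, "department": "R&D", "age": 36}

# SELECT name AS employee_name, salary FROM employees
renamed = project(employee, [("employee_name", lambda r: r["name"]),
                             ("salary", lambda r: r["salary"])])

# SELECT name, department FROM employees
subset = project(employee, [("name", lambda r: r["name"]),
                            ("department", lambda r: r["department"])])

# SELECT name, CASE WHEN salary > 2000 THEN 'high' ELSE 'low' END AS salary_band FROM employees
banded = project(employee, [("name", lambda r: r["name"]),
                            ("salary_band", lambda r: "high" if r["salary"] > 2000 else "low")])
```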

Supported predicates

  • =, >, >=, <, <=, <> (standard comparison operators)
  • REGEXP (regular expression matching)
  • AND operator for combining multiple conditions (OR is not yet supported)
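Because only AND is available, a WHERE clause is effectively a list of comparisons that must all hold. A Python sketch of that evaluation model using the standard operator module; OPS, holds and matches are illustrative names, treating a missing field as a non-match is an assumption, and REGEXP is left out because its exact matching semantics aren’t specified here.

```python
import operator

# Comparison operators supported by topic views, mapped to Python functions.
OPS = {
    "=": operator.eq,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "<>": operator.ne,
}


def holds(record, field_name, op, literal):
    value = record.get(field_name)
    if value is None:
        return False  # assumption: a missing field never matches
    return OPS[op](value, literal)


def matches(record, conditions):
    """True if every (field, op, literal) condition holds: AND semantics only."""
    return all(holds(record, *condition) for condition in conditions)


customer = {"country": "UK", "age": 30}
```

For example, matches(customer, [("country", "=", "UK"), ("age", ">=", 18)]) is true; expressing "UK or FR" would need OR, which isn’t supported yet.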

Regular expression matching

Use REGEXP for pattern matching:
-- Match email addresses
SELECT * FROM users WHERE email REGEXP '^[a-z]+@[a-z]+\.com$'

-- Match records where name starts with 'Bo'
SELECT * FROM customers WHERE name REGEXP 'Bo.*'

-- Match majors containing 'Engineering'
SELECT * FROM employees WHERE education.major REGEXP '.*Engineering'

Nested field access

Access nested JSON fields using dot notation:
-- Simple nested field
SELECT * FROM orders WHERE address.city = 'London'

-- Deeply nested field
SELECT * FROM users WHERE profile.settings.theme = 'dark'

-- Numeric comparison on nested field
SELECT * FROM metrics WHERE metadata.stats.count > 100
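Dot notation walks into nested objects one key at a time. A minimal Python sketch of that lookup, assuming (hypothetically) that a path which doesn’t exist resolves to nothing and therefore fails the comparison; resolve_path is an illustrative name.

```python
def resolve_path(document, path):
    """Follow a dotted path such as 'profile.settings.theme' through nested dicts.

    Returns None if any segment is missing or isn't an object.
    """
    current = document
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


user = {"profile": {"settings": {"theme": "dark"}}, "metadata": {"stats": {"count": 120}}}
```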

Record key, headers, and Kafka metadata in WHERE

In addition to record value fields, Topic View SQL can filter on record keys, headers, and Kafka metadata:
  • Value (JSONPath): use the $ prefix for the payload, for example $.name or $.profile.settings.theme
  • Key: record.key compares the raw key as a string; use record.key.<path> when the key is JSON, for example record.key.user.name
  • Headers: record.header.<headerName> compares the header value as a UTF-8 string. Header names that contain . are not supported in this syntax
  • Partition, offset, timestamp: record.partition, record.offset, record.timestamp
SELECT * FROM orders WHERE record.header.tenant = 'prod'
SELECT * FROM users WHERE record.key = 'account-42'
SELECT * FROM events WHERE $.severity = 'ERROR'
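The addressing rules in the list above can be modeled as a small resolver over a consumed record. This Python sketch is hypothetical (KafkaRecord and resolve are not Gateway APIs) and simplifies by decoding keys and header values as UTF-8 strings and values as JSON.

```python
import json
from dataclasses import dataclass, field


@dataclass
class KafkaRecord:
    key: bytes
    value: bytes
    headers: dict = field(default_factory=dict)  # header name -> bytes
    partition: int = 0
    offset: int = 0
    timestamp: int = 0


def walk(obj, parts):
    for part in parts:
        if not isinstance(obj, dict) or part not in obj:
            return None
        obj = obj[part]
    return obj


def resolve(record, ref):
    """Resolve a WHERE-clause reference such as '$.severity' or 'record.header.tenant'."""
    if ref.startswith("$."):
        return walk(json.loads(record.value), ref[2:].split("."))
    if ref == "record.key":
        return record.key.decode("utf-8")  # raw key compared as a string
    if ref.startswith("record.key."):
        return walk(json.loads(record.key), ref[len("record.key."):].split("."))
    if ref.startswith("record.header."):
        name = ref[len("record.header."):]
        if "." in name:
            raise ValueError("header names containing '.' aren't supported")
        raw = record.headers.get(name)
        return None if raw is None else raw.decode("utf-8")
    if ref in ("record.partition", "record.offset", "record.timestamp"):
        return getattr(record, ref.split(".")[1])
    raise ValueError(f"unsupported reference: {ref}")


rec = KafkaRecord(key=b"account-42",
                  value=b'{"severity": "ERROR", "profile": {"settings": {"theme": "dark"}}}',
                  headers={"tenant": b"prod"}, partition=3, offset=17)
```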

Array index access

Access array elements using bracket notation with backticks:
-- First element (index 0)
SELECT * FROM orders WHERE `items[0].name` = 'apple'

-- Last element (negative index)
SELECT * FROM orders WHERE `items[-1].name` = 'banana'

-- Numeric array access
SELECT * FROM metrics WHERE `values[1]` > 50

-- Nested array field
SELECT * FROM events WHERE `data.records[0].status` = 'active'
Backticks are required around field names with array index notation.
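Bracket indices, including negative ones counted from the end, behave like Python list indexing. A sketch of a path resolver that understands both dots and [n] segments; the regex-based tokenizer and the resolve_indexed name are illustrative assumptions, not Gateway’s parser, and an out-of-range index is assumed to resolve to nothing.

```python
import re

# Splits 'data.records[0].status' into ['data', 'records', 0, 'status'].
TOKEN = re.compile(r"([^.\[\]]+)|\[(-?\d+)\]")


def tokenize(path):
    return [int(index) if index else name for name, index in TOKEN.findall(path)]


def resolve_indexed(document, path):
    """Resolve a path with dots and [n] indices; None if it doesn't exist."""
    current = document
    for token in tokenize(path):
        if isinstance(token, int):
            if not isinstance(current, list) or not -len(current) <= token < len(current):
                return None
        elif not isinstance(current, dict) or token not in current:
            return None
        current = current[token]
    return current


order = {
    "items": [{"name": "apple"}, {"name": "banana"}],
    "values": [10, 60, 30],
    "data": {"records": [{"status": "active"}]},
}
```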

Unsupported syntax

Topic views support only the SQL features documented above. The following are examples of popular features that aren’t supported yet.
Operators:
  • OR operator (planned)
  • IN operator
  • LIKE with wildcards
  • BETWEEN
  • IS NULL / IS NOT NULL
  • String concatenation (||)
  • Numerical operators (+, -, *, /, etc.)
Functions:
  • String functions (UPPER, LOWER, CONCAT)
  • COALESCE
Query features:
  • JOIN
  • Subqueries
JSON patterns:
  • Array wildcards: items[*].name
  • JSONPath filter expressions: items[?(@.price > 10)]
If any of these SQL features would be useful for your use case, let your Conduktor support contact know.

Current limitations

All of these limitations will be addressed before the feature exits tech preview.
Consumer group conflicts can cause data loss. Don’t use the same consumer group for multiple topics that share a backing topic (including alias topics, topic views, and their physical topics). Offset commits from one will overwrite the other, causing consumers to skip messages. A validation check is planned to prevent this.
  • Data format: Record values are interpreted as schema-less JSON. Schema Registry support for Avro and Protobuf values is planned.
  • Main cluster only: Topic views are always created on the main cluster.
  • Transformation errors: The onError field is declared and persisted on every topic view, but Gateway doesn’t yet act on it at fetch time — failing records are always dropped silently regardless of whether onError.type is DROP or FAIL_FETCH. See Error handling.
  • Logical topic collisions: Gateway does not fully enforce logical topic type boundaries. Different logical topic types (such as topic views and concentrated topics) can override each other. Stricter validation is planned.
  • Read-only not enforced: Producing to a topic view is not blocked yet. Avoid producing to topic views.
  • Topic names in SQL: The physical topic name in the FROM clause must be the full physical name in the backing cluster. A topic view that targets a topic in the same Virtual Cluster needs to specify the Virtual Cluster prefix in the table name. We will address this inconsistency.
  • SQL validation: Unsupported SQL syntax (such as array wildcards or filter expressions) is not rejected when creating a topic view. Records may be incorrectly filtered until validation is added.
  • Record metadata in projections: You cannot include record metadata (such as offset or partition) in the SELECT clause. For example, SELECT type, record.offset AS offset FROM topic is not supported.
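The consumer group warning above follows from offsets being committed against the physical topic. A toy Python model, with hypothetical topic and group names, where committed offsets are keyed by (group, physical topic, partition) as in Kafka, so two views over the same backing topic share one slot:

```python
# Committed offsets, keyed the way Kafka stores them: by group, topic and partition.
committed = {}

# Hypothetical logical topics that both read from the physical topic "customers".
backing_topic = {"uk-customers": "customers", "vip-customers": "customers"}


def commit(group, logical_topic, partition, offset):
    # The commit lands on the physical topic, not on the logical one.
    committed[(group, backing_topic[logical_topic], partition)] = offset


def resume_position(group, logical_topic, partition):
    return committed.get((group, backing_topic[logical_topic], partition), 0)


# The same group consumes both views.
commit("reporting", "uk-customers", 0, 100)   # uk-customers has processed up to 100
commit("reporting", "vip-customers", 0, 150)  # overwrites the same slot

# After a restart, uk-customers resumes at 150 and silently skips offsets 100-149.
print(resume_position("reporting", "uk-customers", 0))
```

Using a distinct consumer group per logical topic gives each view its own key and avoids the overwrite.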

Planned transformation types

SQL is the only supported transformation type in this tech preview. We plan to add:

Error handling

The spec.onError field declares what Gateway should do when a record can’t be transformed (for example, the SQL doesn’t match the record structure). The field is required on every topic view and takes one of two values:
  • DROP — drop the failing record from the response and continue.
  • FAIL_FETCH — fail the whole fetch.
spec:
  transformation:
    type: sql
    statement: "SELECT * FROM customers WHERE country='UK'"
  onError:
    type: DROP
In this tech preview, Gateway accepts and persists onError but doesn’t yet act on it at fetch time — both values currently behave like DROP. FAIL_FETCH will be implemented in 3.20.0.
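The two declared onError values differ only in what happens when the transformation raises. A Python sketch of the intended semantics; FetchFailed and apply_view are hypothetical names, and in this tech preview Gateway currently behaves as DROP for both values.

```python
class FetchFailed(Exception):
    """Raised when a record fails to transform under FAIL_FETCH (model only)."""


def apply_view(records, transform, on_error="DROP"):
    """Transform a fetched batch according to the onError policy.

    transform returns the transformed record, or None to filter it out,
    and raises if the record can't be transformed.
    """
    out = []
    for record in records:
        try:
            result = transform(record)
        except Exception as exc:
            if on_error == "FAIL_FETCH":
                raise FetchFailed(f"record {record!r} could not be transformed") from exc
            continue  # DROP: skip the failing record and keep going
        if result is not None:
            out.append(result)
    return out


def uk_only(record):
    # Raises KeyError when the record has no "country" field.
    return record if record["country"] == "UK" else None


batch = [{"country": "UK"}, {"name": "no country"}, {"country": "FR"}]
```

With DROP, the malformed middle record disappears from the result; with FAIL_FETCH, the whole fetch fails.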

SQL topics

SQL topics are deprecated as of Gateway v3.19.0 and will be removed in Gateway v3.22.0. This feature is reimplemented in topic views, currently in tech preview.
Conduktor Gateway’s SQL topic feature uses a SQL-like language to filter and project messages, based on a simple SQL statement:
SELECT
    type,
    price as amount,
    color,
    CASE
        WHEN color = 'red' AND price > 1000 THEN 'Exceptional'
        WHEN price > 8000 THEN 'Luxury'
        ELSE 'Regular'
    END as quality,
    record.offset as record_offset,
    record.partition as record_partition
  FROM cars
This applies to FetchResponse only (i.e., the resulting topic is read-only). The general form of the statement is: SELECT [list of fields] FROM [topic name] WHERE [field filter criteria]
Topic names with dash - characters have to be double quoted, as the dash is not a valid character for a SQL name. For example, if you have a topic our-orders, use SELECT * FROM "our-orders" WHERE ...
Other limitations:
  • When filtering records on more than one condition, only the AND operator is supported
  • Supported predicates: =, >, >=, <, <=, <> and REGEXP (the MySQL REGEXP operator)
  • CASE expressions are supported
  • Records can be filtered by:
    • Record key (supports Schema Registry):
      • Record key as string: .. WHERE record.key = 'some thing'
      • Record key as schema: .. WHERE record.key.someValue.someChildValue = 'some thing'
    • Record value (supports Schema Registry): .. WHERE $.someValue.someChildValue = 'some thing'
    • Partition: .. WHERE record.partition = 1
    • Timestamp: .. WHERE record.timestamp = 98717823712
    • Header: .. WHERE record.header.someHeaderKey = 'some thing'
    • Offset: .. WHERE record.offset = 1

Schemas and projections

If your data uses a schema, you can’t use the projection feature here, because the projected data would no longer match the original schema. For plain JSON topics, you can use the SELECT clause to alter the shape of the returned data; however, for data with a schema (Avro and Protobuf), you must not use a projection, i.e. the statement should be in the form SELECT * FROM ... Filtering with the WHERE clause is still supported.

Configuration

  • virtualTopic (String): the name of the virtual topic. Fetching from this topic returns the data produced by the statement, without the topic needing a statement of its own.
  • statement (String): the SQL statement.
  • schemaRegistryConfig (Schema registry): the schema registry configuration (see Schema registry configuration).

Example

{
  "name": "mySqlTopicPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.VirtualSqlTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "legal_user",
    "statement": "SELECT * FROM users WHERE age > 18",
    "schemaRegistryConfig": {
       "host": "http://schema-registry:8081"
    }
  }
}

Schema registry with secured template

{
  "name": "mySqlTopicPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.VirtualSqlTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "legal_user",
    "statement": "SELECT * FROM users WHERE age > 18",
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081",
      "additionalConfigs": {
        "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
        "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
      }
    }
  }
}

Filter topics with CEL

CEL topics are deprecated as of Gateway v3.17.0 and will be removed in Gateway v3.20.0. This feature will be reimplemented in topic views, currently in tech preview.
Conduktor Gateway’s CEL topic feature uses a CEL (Common Expression Language) expression to filter messages. Currently, records can be filtered by:
  • Record key (supports Schema Registry):
    • Record key as string: .. record.key == 'some thing'
    • Record key as schema: .. record.key.someValue.someChildValue == 'some thing'
  • Record value (supports Schema Registry): .. record.value.someValue.someChildValue == 'some thing'
  • Partition: .. record.partition == 1
  • Timestamp: .. record.timestamp == 98717823712
  • Header: .. record.header.someHeaderKey == 'some thing'
  • Offset: .. record.offset == 1

Configuration

  • virtualTopic (String): when accessed, this virtual topic retrieves data from the specified client topic, filtered by the CEL expression.
  • topic (String): the client topic from which data is fetched.
  • expression (String): a CEL expression that returns a BOOLEAN, used to determine which records from the topic are returned.
  • schemaRegistryConfig (Schema registry configuration): the schema registry configuration (see Schema registry configuration).
  • celCacheSize (int): size of the in-memory cache for CEL expressions, balancing speed against resource use.

Example

{
  "name": "myCelTopicPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.CelTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "legal_user",
    "topic": "users",
    "expression": "record.value.age > 18",
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081"
    },
    "celCacheSize": 100
  }
}

Schema registry with secured template

{
  "name": "myCelTopicPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.CelTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "legal_user",
    "topic": "users",
    "expression": "record.value.age > 18",
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081",
      "additionalConfigs": {
        "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
        "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
      }
    },
    "celCacheSize": 100
  }
}

Schema registry configuration

General:
  • type (string, default CONFLUENT): the type of schema registry to use. Choose CONFLUENT for Confluent-like schema registries (including OSS Kafka) or AWS for AWS Glue schema registries.
  • additionalConfigs (map): additional properties that map to specific security-related parameters. For enhanced security, you can hide sensitive values using environment variables as secrets.
Confluent-like (configuration for Confluent-like schema registries):
  • host (string): URL of your schema registry.
  • cacheSize (string, default 50): number of schemas this Interceptor can cache locally, so that it doesn’t have to query the schema registry every time.
AWS Glue (configuration for AWS Glue schema registries):
  • region (string): the AWS region for the schema registry, e.g. us-east-1.
  • registryName (string): the name of the schema registry in AWS (leave blank for the AWS default of default-registry).
  • basicCredentials (string): access credentials for AWS (see AWS credentials below for the structure).
AWS credentials (AWS credentials configuration):
  • accessKey (string): the access key for the connection to the schema registry.
  • secretKey (string): the secret key for the connection to the schema registry.
  • validateCredentials (bool, default true): whether the provided credentials should be validated when set.
  • accountId (string): the ID of the AWS account to use.
If you don’t supply a basicCredentials section for the AWS Glue schema registry, the client used to connect will attempt to find the connection information from the environment. The required credentials can be passed to Gateway in this way as part of its core configuration. Find out more about credentials in the AWS documentation. Read our blog about schema registry.