Configure Conduktor SQL
This feature is in Beta and is subject to change as we enhance it further.
It's currently only available to Console Admins, and will be made available to all users once integrated with our security model (i.e. RBAC, Data Masking).
Index data from Kafka topics in a database to enable users to query data from the UI, API or CLI using SQL.
Note that this feature enables you to troubleshoot, sample, analyze, aggregate and join data through:
- Querying Kafka message data
- Querying Kafka metadata (such as the offset, partition and timestamp)
We encourage you to use this feature in non-production environments and give us feedback.
Console Configuration
For a fully integrated Docker Compose, run our get started stack to try SQL. The guide below details how to add this feature to an existing deployment.
Database Configuration
By default, the SQL feature is disabled. To enable it, you need to provide additional configuration for the database that will store the indexed data.
You should provision a second database for storing SQL data that is different from the existing one used by Console's backend. This ensures separation of concerns and continued operation of the core Console experience if the SQL database becomes unavailable.
See database requirements and about database isolation for more guidance.
Configure the second database through environment variables:
- `CDK_KAFKASQL_DATABASE_URL`: database connection URL in the format `[jdbc:]postgresql://[user[:password]@]netloc[:port][/dbname][?param1=value1&...]`

Alternatively, set each value explicitly:

- `CDK_KAFKASQL_DATABASE_HOST`: PostgreSQL server host name
- `CDK_KAFKASQL_DATABASE_PORT`: PostgreSQL server port
- `CDK_KAFKASQL_DATABASE_NAME`: Database name
- `CDK_KAFKASQL_DATABASE_USERNAME`: Database login role
- `CDK_KAFKASQL_DATABASE_PASSWORD`: Database login password
- `CDK_KAFKASQL_DATABASE_CONNECTIONTIMEOUT`: Connection timeout, in seconds
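As an illustration, here is a minimal sketch of passing this configuration to the Console container. The host, credentials and database name are placeholders to adapt to your own deployment:

```bash
# Placeholders: postgres-sql (host), sql-user / change-me (credentials), conduktor_sql (db name)
docker run -d \
  -e CDK_KAFKASQL_DATABASE_URL="jdbc:postgresql://sql-user:change-me@postgres-sql:5432/conduktor_sql" \
  conduktor/conduktor-console   # plus your existing Console configuration
```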
Additional Configuration
Additional configuration is available to tune the indexing process:

- `CDK_KAFKASQL_CONSUMER-GROUP-ID`: Consumer group name for the indexing process (default is `conduktor-sql`)
- `CDK_KAFKASQL_CLEAN-EXPIRED-RECORD-EVERY-IN-HOUR`: The interval, in hours, at which the clean-up process runs to purge data outside the desired retention period
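For example, a sketch that names the indexing consumer group explicitly and runs the clean-up every 6 hours (both values are illustrative):

```bash
docker run -d \
  -e CDK_KAFKASQL_CONSUMER-GROUP-ID="conduktor-sql-dev" \
  -e CDK_KAFKASQL_CLEAN-EXPIRED-RECORD-EVERY-IN-HOUR="6" \
  conduktor/conduktor-console   # plus your database and Console configuration
```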
Index Topics for Querying
Index Topics in the UI
To create a new indexed topic, navigate to the new SQL tab in the UI. Note that you will only see this tab if you have configured the SQL database as a dependency.
When selecting a topic for indexing, you will be asked to configure:

- Indexed Retention Time: The furthest point in time from which the data will be indexed. Any data before this point in time will be purged periodically.
  - By default, purging happens every 1h, but it's configurable using the environment variable `CDK_KAFKASQL_CLEAN-EXPIRED-RECORD-EVERY-IN-HOUR`
Understanding the state of indexing
After choosing to index a topic, you will be able to see the state of the indexing process in the Indexed Topics tab. The table name will only become available when data starts to be indexed.
The process gives insight into:
- Offset lag: The difference between the latest message offset in the Kafka topic and the current offset of the consumer group, indicating how much data is yet to be processed
- Time lag: The delay between the timestamp of the latest message in the Kafka topic and the time when the message was indexed, reflecting processing latency
- Indexed count: The total number of messages successfully indexed into the database table from the Kafka topic
Index Topics in the CLI
Alternatively, you can index a topic through the Conduktor CLI. Create a file (e.g. `index-topics.yml`) with the following resource definition:
```yaml
apiVersion: "v1"
kind: "IndexedTopic"
metadata:
  name: "customers"
  cluster: "kafka-cluster-dev"
spec:
  retentionTimeInSecond: "86400" # 1 day of retention
```
Then apply it:

```bash
conduktor apply -f index-topics.yml
```
Upon execution, the Console backend will index messages from (current time) - (retention time) onwards, and subsequently listen for new records.
Querying the data
Using the UI
Query syntax requires that the cluster technical ID is used as a prefix for the table name, e.g. for the topic `customers` on the cluster `kafka-cluster-dev`:
```sql
SELECT *
FROM "kafka-cluster-dev_customers";
```
See database storage format for the underlying table structure.
Using the API
```bash
curl \
  --header "Authorization: $token" \
  --request POST 'localhost:8080/api/public/sql/v1/execute?maxLine=2' \
  --data 'select * from "kafka-cluster-dev_customers"'
```
Using the CLI
```bash
conduktor sql 'select * from "kafka-cluster-dev_customers"' -n 2
```
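Because the Kafka metadata columns described below are also indexed, you can filter and aggregate on them. A sketch reusing the `customers` example (the rows returned depend on your data):

```bash
# Sample the 10 most recent indexed records from partition 0
conduktor sql 'select __offset, __timestamp from "kafka-cluster-dev_customers" where __partition = 0 order by __offset desc' -n 10

# Count indexed records per partition
conduktor sql 'select __partition, count(*) from "kafka-cluster-dev_customers" group by __partition' -n 50
```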
Database Storage Format
Each indexed topic has its own dedicated SQL table, named using the convention `${cluster-technical-id}_${topic-name}`.

The table contains special metadata columns, each of which is indexed:
- `__timestamp`
- `__partition`
- `__offset`
The content of each record is flattened. Given the following record:
```json
{
  "a": {
    "b": {
      "c": "Hello World"
    },
    "userId": "109210921092"
  }
}
```
Then, you'll have the following table structure:

__timestamp | __partition | __offset | a.b.c | a.userId
---|---|---|---|---
123456789 | 0 | 42 | Hello World | 109210921092
If records with a different shape come later, the table schema will be updated:
```json
{
  "newField": "Kafka"
}
```
__timestamp | __partition | __offset | a.b.c | a.userId | newField
---|---|---|---|---|---
123456789 | 0 | 42 | Hello World | 109210921092 | NULL
123456790 | 0 | 43 | NULL | NULL | Kafka
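Note that flattened column names contain dots, so they must be double-quoted in queries (unquoted identifiers are lowercased by PostgreSQL). A sketch using the CLI, assuming the rows above were indexed from the `customers` topic on `kafka-cluster-dev`:

```bash
conduktor sql 'select "a.b.c", "a.userId" from "kafka-cluster-dev_customers" where "newField" is null' -n 5
```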
Shrinker
As column names are limited in size (63 characters), a field name must sometimes be shrunk. We try to do this intelligently so the result is still meaningful for users. The head characters are removed first, e.g. `my.reaaaally.loooooooooooooooooooooooooooooong.path.to.a.field` becomes `m.r.oong.path.to.a.field`.
Collision Solver
Sometimes, table or column names can end up identical for two different topics or fields. To resolve the conflict, we suffix the name with `_${inc}` (e.g. `my.field` & `my.field_2`).
Relation between a table/column and a topic/field is tracked in special metadata tables:
- `_table_mappings`
- `_table_fields_mappings`
Database isolation
The Kafka SQL feature, while providing flexibility, introduces potential security risks. Allowing users (currently only admins) to execute arbitrary SQL commands opens the possibility of unauthorized access or malicious activity.
To mitigate these risks, we've implemented several security measures:

- Read-Only Connections: While not foolproof, enforcing read-only connections limits the potential for data modification
- SQL query pre-parsing and sanitizing:
  - Schema restriction: Restricting queries to the public schema prevents access to sensitive data in other schemas. For example, in the Conduktor database, the public schema is empty (except for the Flyway migration table, which is also hidden)
  - Query Type Limitation: Allowing only SELECT statements ensures that users cannot modify or delete data. For example, it forbids ROLLBACK, which would break the previous limitation
Despite these measures, it's crucial to isolate the Kafka indexing database from the console backend database. This isolation provides additional benefits:
- Resource Contention: Prevents the Kafka indexing process or a user's arbitrary request from consuming excessive resources and impacting the overall system performance
- Data Breach Mitigation: Limits the potential damage in case of a security breach in the SQL endpoint protection (not totally foolproof).
Known Limitations
There are several known limitations in the current beta experience:
- Data formats currently supported are plain `JSON`, and both `Avro` & `JSON` with Confluent Schema Registry
- Byte, Array and Boolean data types are not currently parsed; they will be added in the next version
- If a record can't be parsed for any reason, it is ignored and the consumer continues
- To import data into Postgres efficiently, we didn't set any primary key, so a record can appear more than once
- If you try to index a topic with a schema that is not supported, the lag value will be 0 but no records will appear in the table
If you identify more limitations or want to provide feedback, please contact us.