
Configure Conduktor SQL

info

This feature is in Beta and is subject to change as we enhance it further.

It's currently only available to Console Admins, and will be made available to all users once integrated with our security model (e.g. RBAC, Data Masking).

Index data from Kafka topics in a database to enable users to query data from the UI, API or CLI using SQL.

Note this feature enables you to troubleshoot, sample, analyze, aggregate and join data through:

  • Querying Kafka message data
  • Querying Kafka metadata (such as the offset, partition and timestamp)

We encourage you to use this feature in non-production environments and give us feedback.


Console Configuration

For a fully integrated Docker Compose, run our get started stack to try SQL. The guide below details how to add this feature to your existing deployment.

Database Configuration

By default, the SQL feature is disabled. To enable it, you must provide additional configuration for the database that will store the indexed data.

warning

You should provision a second database for storing SQL data that is different from the existing one used by Console's backend. This ensures separation of concerns and continued operation of the core Console experience if the SQL database becomes unavailable.

See database requirements and about database isolation for more guidance.

Configure the second database through environment variables:

  • CDK_KAFKASQL_DATABASE_URL: database connection url in the format [jdbc:]postgresql://[user[:password]@]netloc[:port][/dbname][?param1=value1&...]
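For example, following that format with a hypothetical user, host and database name:

CDK_KAFKASQL_DATABASE_URL: "jdbc:postgresql://conduktor:change_me@sql-db:5432/conduktor-sql"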

Alternatively, set each value explicitly:

  • CDK_KAFKASQL_DATABASE_HOST: Postgresql server host name
  • CDK_KAFKASQL_DATABASE_PORT: Postgresql server port
  • CDK_KAFKASQL_DATABASE_NAME: Database name
  • CDK_KAFKASQL_DATABASE_USERNAME: Database login role
  • CDK_KAFKASQL_DATABASE_PASSWORD: Database login password
  • CDK_KAFKASQL_DATABASE_CONNECTIONTIMEOUT: Connection timeout option in seconds
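As an illustration, here is a minimal Docker Compose sketch wiring Console to a dedicated Postgres instance (service names and credentials are hypothetical):

services:
  postgresql-sql:
    # Dedicated Postgres instance for SQL indexing, separate from Console's backend database
    image: postgres:15
    environment:
      POSTGRES_DB: conduktor-sql
      POSTGRES_USER: conduktor
      POSTGRES_PASSWORD: change_me
  conduktor-console:
    image: conduktor/conduktor-console
    environment:
      CDK_KAFKASQL_DATABASE_HOST: postgresql-sql   # the Postgres service above
      CDK_KAFKASQL_DATABASE_PORT: 5432
      CDK_KAFKASQL_DATABASE_NAME: conduktor-sql
      CDK_KAFKASQL_DATABASE_USERNAME: conduktor
      CDK_KAFKASQL_DATABASE_PASSWORD: change_me
      CDK_KAFKASQL_DATABASE_CONNECTIONTIMEOUT: 30  # seconds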

Additional Configuration

Additional configuration is available for the indexing process:

  • CDK_KAFKASQL_CONSUMER-GROUP-ID: Consumer group name for the indexing process (default is conduktor-sql)
  • CDK_KAFKASQL_CLEAN-EXPIRED-RECORD-EVERY-IN-HOUR: The interval, in hours, at which the clean-up process runs to purge data older than the configured retention period (default is 1)
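For instance, to use a custom consumer group and run the clean-up every two hours (values are illustrative):

CDK_KAFKASQL_CONSUMER-GROUP-ID: conduktor-sql-dev
CDK_KAFKASQL_CLEAN-EXPIRED-RECORD-EVERY-IN-HOUR: 2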

Index Topics for Querying

Index Topics in the UI

To create a new indexed topic, navigate to the new SQL tab in the UI. Note that this tab is only visible once the SQL database has been configured as a dependency.

When selecting a topic for indexing, you will be asked to configure:

  • Indexed Retention Time: the furthest point back in time from which data will be indexed. Any data older than this is purged periodically.
    • By default, purging happens every hour; this is configurable via the environment variable CDK_KAFKASQL_CLEAN-EXPIRED-RECORD-EVERY-IN-HOUR


Understanding the state of indexing

After choosing to index a topic, you will be able to see the state of the indexing process in the Indexed Topics tab. The table name will only become available when data starts to be indexed.

The process gives insight into the:

  • Offset lag: The difference between the latest message offset in the Kafka topic and the current offset of the consumer group, indicating how much data is yet to be processed
  • Time lag: The delay between the timestamp of the latest message in the Kafka topic and the time when the message was indexed, reflecting processing latency
  • Indexed count: The total number of messages successfully indexed into the database table from the Kafka topic


Index Topics in the CLI

Alternatively, you can index a topic through the Conduktor CLI. Create a resource file (e.g. index-topics.yml):

apiVersion: "v1"
kind: "IndexedTopic"
metadata:
  name: "customers"
  cluster: "kafka-cluster-dev"
spec:
  retentionTimeInSecond: "86400" # 1 day of retention

Then apply it:

conduktor apply -f index-topics.yml

Upon execution, the Console backend will index messages from (current time) - (retention time) onward, and subsequently start listening for new records. For example, with retentionTimeInSecond: "86400", indexing starts from 24 hours before the time of execution.

Querying the data

Using the UI

Query syntax requires that the cluster technical ID be used as a prefix for the table name, e.g. for the topic customers on the cluster kafka-cluster-dev:

SELECT *
FROM "kafka-cluster-dev_customers";

See database storage format for the underlying table structure.

Using the API

curl \
--header "Authorization: $token" \
--request POST 'localhost:8080/api/public/sql/v1/execute?maxLine=2' \
--data 'select * from "kafka-cluster-dev_customers"'

Using the CLI

conduktor sql 'select * from "kafka-cluster-dev_customers"' -n 2

Database Storage Format

Each indexed topic has its own dedicated SQL table, named according to the convention ${cluster-technical-id}_${topic-name}.

The table also contains special metadata columns, each of which is indexed:

  • __timestamp
  • __partition
  • __offset
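Since these columns are indexed, they are efficient to filter and sort on. For example, to inspect the latest records of one partition (reusing the customers table from earlier):

SELECT __partition, __offset, __timestamp
FROM "kafka-cluster-dev_customers"
WHERE __partition = 0
ORDER BY __offset DESC;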

The content of each record is flattened. Given the following record:

{
  "a": {
    "b": {
      "c": "Hello World"
    }
  },
  "userId": "109210921092"
}

Then, you'll have the following table structure:

| __timestamp | __partition | __offset | a.b.c | userId |
| ----------- | ----------- | -------- | ----------- | ------------ |
| 123456789 | 0 | 42 | Hello World | 109210921092 |
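Because flattened column names contain dots (and mixed-case names are case-sensitive in PostgreSQL), they must be double-quoted in queries; a hypothetical example:

SELECT "a.b.c", "userId"
FROM "kafka-cluster-dev_customers"
WHERE "userId" = '109210921092';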

If records with a different shape come later, the table schema will be updated:

{
  "newField": "Kafka"
}

| __timestamp | __partition | __offset | a.b.c | userId | newField |
| ----------- | ----------- | -------- | ----------- | ------------ | -------- |
| 123456789 | 0 | 42 | Hello World | 109210921092 | NULL |
| 123456790 | 0 | 43 | NULL | NULL | Kafka |

Shrinker

As column names are limited in size (63 characters in PostgreSQL), field names must sometimes be shrunk. We try to do this intelligently so the result remains meaningful: characters at the head of the name are removed first, since the tail is usually the most significant part:

my.reaaaally.loooooooooooooooooooooooooooooong.path.to.a.field

will give

m.r.oong.path.to.a.field

Collision Solver

Sometimes, the table or column names can be the same for two different topics or fields. To resolve the conflict, we suffix the name with _${inc} (e.g. my.field & my.field_2).

The relationship between a table/column and a topic/field is tracked in special metadata tables:

  • _table_mappings
  • _table_fields_mappings
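Assuming these metadata tables are exposed in the public schema of the SQL database, you could inspect the mappings with a query such as:

SELECT * FROM _table_mappings;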

Database isolation

The Kafka SQL feature, while providing flexibility, introduces potential security risks. By allowing users (currently only admins) to execute arbitrary SQL queries, there's a chance of unauthorized access or malicious activity.

To mitigate these risks, we've implemented several security measures:

  • Read-Only Connections: While not foolproof, enforcing read-only connections limits the potential for data modification
  • SQL query pre-parsing and sanitizing:
    • Schema restriction: Restricting queries to the public schema prevents access to sensitive data in other schemas. For example, in the Conduktor database, the public schema is empty (except for the Flyway migration table which is also hidden)
    • Query Type Limitation: Allowing only SELECT statements ensures that users cannot modify or delete data. For example, ROLLBACK is forbidden, as it could be used to escape the read-only restriction

Despite these measures, it's crucial to isolate the Kafka indexing database from the console backend database. This isolation provides additional benefits:

  • Resource Contention: Prevents the Kafka indexing process or a user's arbitrary request from consuming excessive resources and impacting the overall system performance
  • Data Breach Mitigation: Limits the potential damage in case of a security breach in the SQL endpoint protection (not totally foolproof).

Known Limitations

There are several known limitations in the current beta experience:

  • Data formats currently supported are plain JSON, and both Avro & JSON with Confluent Schema Registry
  • Byte, Array and Boolean data types are not currently parsed; they will be added in the next version
  • If a record can't be parsed for any reason, it is ignored and the consumer continues
  • To import data into Postgres efficiently, we didn't set a primary key, so a record can appear more than once
  • If you try to index a topic with an unsupported schema, the lag value will be 0 but no records will appear in the table

If you identify more limitations or want to provide feedback, please contact us.