Skip to main content
This feature is available with Conduktor Trust only.

Overview

Bad data breaks customer experiences, drives churn and slows growth. Conduktor Trust helps teams catch and fix data quality issues before they impact your business. You define the Rules and we’ll enforce them at the streaming layer.
This page details Conduktor’s capability to observe data quality without Gateway.Find out about data quality capabilities with Gateway.

Prerequisites

Before creating data quality Rules and Policies, you have to:
  • use Conduktor Console 1.39 or later,
  • be logged in as an admin to Console UI or use an admin token for the Conduktor CLI,

Rules

You can define three different types of Rules that will validate your Kafka message data quality:
Rules do nothing on their own - you have to attach them to a Policy.

Create a Rule

You can create a data quality Rule (CEL or JSON schema) using the Console UI or the Conduktor CLI.

Test Rules before creation

When creating Rules using the Console UI, you can test both CEL expressions and JSON schema Rules against sample data before saving changes. This helps ensure that your Rules work correctly with the expected data format, allowing you to iterate quickly and confirm that the Rules will work as expected before deploying them to production topics.
  1. Click Validate your Rule against sample messages to open the validation panel.
  2. In the Rule validation panel enter:
    • Key - sample message key (only for CEL Rules that reference key properties)
    • Value - sample message value (the main data payload that your Rule will validate)
    • Headers - sample message headers (only for CEL Rules that check header values)
  3. Click Test Rule to validate it.
  4. A message will appear explaining whether the Rule has passed, failed or if there’s an evaluation error (an issue with your Rule syntax or data format).
  • Console UI
  • Conduktor CLI
To create a Rule using the Console UI:
  1. Go to Rules and click +New rule.
  2. Define the Rule details:
    • Name - add a descriptive name for the Rule.
    • Technical ID - will be auto-populated as you type the name. This is used to identify this Rule in CLI/API.
    • Description - (optional) enter a description that explains the purpose of your Rule.
  3. Select the Rule type (CEL expression or JSON schema) and provide the required logic:
    • CEL is an expression language supporting common operators like == and > as well as macros like has() to check for the presence of fields. Use matches() to test regex patterns. See the CEL language definition for details. You can access a set of pre-built examples by clicking Show regex library. Click on the relevant example to copy it and paste into your Rule expression. You can then customize it further, as necessary.
    • For JSON schema, enter your schema definition. See the JSON schema specification for details.
  4. (Optional) Define a custom message that replaces the default <RULE_NAME> did not pass when the Rule is violated, shown in the Policy violation history.
  5. Click Create.

CEL expression Rules

You can create Rules with CEL expressions that capture business logic of your data. For example: value.customerId.matches("[0-9]{8}"). The Rules page lists your Rules with a preview of their CEL expressions. Open a Rule’s detail page to see its description, full CEL expression and attached Policies.

Sample CEL Rules

Make sure you amend these examples to use your values.
Your requirements may be different from this RegEx, as email validation via RegEx is complex. value.customer.email.matches(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
value.customer.id.matches(r"^[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}$")
headers['Content-Type'] == 'application/json'

Built-in Rules

We provide built-in validation Rules that can’t be achieved with CEL.
We currently only support Confluent and Confluent like (e.g. Redpanda/Aiven) schema registries.

EnforceAvro

EnforceAvro ensures that:
  • Your messages have a schema ID prepended to the message content.
  • The schema ID exists within your schema registry.
  • The schema it references is of type avro.

JSON schema Rule

Enforce JSON Schema validation on Kafka messages to ensure data consistency and structure compliance.

Configure JSON schema

Add a JSON schema Rule using the CLI:
apiVersion: v1
kind: DataQualityRule
metadata:
    name: json
spec:
    schema:
        {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "type": "object",
          "required": ["name", "age"],
          "properties": {
            "name": { "type": "string" },
            "age": { "type": "integer", "minimum": 0 },
            "isActive": { "type": "boolean", "default": true }
          }
        }
    displayName: valid user
    description: check that the user is valid
    type: JsonSchema

Schema requirements

Examples

{
    "type": "object",
    "required": ["id", "name", "age"],
    "properties": {
    "id": { "type": "string" },
    "name": { "type": "string", "minLength": 1 },
    "age": { "type": "integer", "minimum": 0 }
}
}
{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "required": ["id", "name"],
    "properties": {
    "id": { "type": "string" },
    "name": { "type": "string" }
}
}
{
    "type": "object",
    "required": ["id", "name"],
    "properties": {
    "id": { "type": "string" },
    "name": { "type": "string" }
},
    "additionalProperties": false
}

Policies

A Policy is made up of Rules that are applied to topics/prefixes. The Policy’s detail page shows its description, linked Rules, targets, violation and evaluated message counts and a history of recent violations. The Policies page lists all of your Policies with their targets and violation counts.

Violations metrics

Monitor Rule violations for a Policy over the selected time range: 30 minutes, one hour, one day or seven days.

Metrics displayed

The page displays four types of metrics:
  • Messages evaluated count - total messages checked against this Policy
  • Violations count - total violations detected
  • Violations graph - a bar chart showing past violations to help identify patterns and spikes
  • Violations history - individual violation events showing when and where violations occurred

Data freshness

Important: There’s a timing difference between the metrics:
  • Violations history - updated almost immediately
  • Messages evaluated count, violations count and violations graph - may be delayed up to one minute
This delay exists because these metrics are collected through our monitoring system (Cortex), which processes data in batches for performance. The violation history comes directly from the backend database. Because of this, the violations history can show recent violations that don’t yet appear in the graph or counts.

Create a Policy

You can create a data quality Policy using the Console UI or the Conduktor CLI.
  • Console UI
  • Conduktor CLI
To create a Policy using the Console UI:
  1. Under the Trust section of the main menu, go to Policies and click +New policy.
  2. Define the Policy details:
    • Name - add a descriptive name for the Policy.
    • Technical ID - will be auto-populated as you type the name. This is used to identify this Policy in CLI/API.
    • Group - select a group to own the Policy. This controls who can view and manage the Policy, and which resources can be targeted.
    • Description - (optional) enter a description to explain your Policy.
  3. Select Rules to be used in the Policy:
    • Every Policy must have at least one Rule
    • You can also create new Rules from this page
    • Click Continue to move to the next step.
  4. Select targets for the Policy:
    • Every Policy must have at least one target
    • A target consists of one or more topics on a specified Gateway
    • You can either select specific topics or specify a prefix like orders-*
    • Click Continue to move to the next step.
  5. Review the Policy and when ready, confirm by clicking Create.

Manage a Policy

Once a Policy is created, you can view the attached Rule(s), the target(s) of the Policy. You can also view the violations as they occur and the violation count will be shown. To edit the list of Rules attached to a Policy, click Edit selection on the Policy details page. In the dialog that opens, select/deselect the required Rules from the list and save changes.

Assign permissions

Policies are owned by user groups and can be created by admin users or groups with the Manage data quality permission. To apply this permission to a group: go to Settings > Groups and in the Resource access tab tick the Manage data quality checkbox for the relevant resources. The 'manage data quality' permission

Set up Policy violation alerts

You can create alerts that are triggered when a Policy violation happens. Data quality alerts can be owned by groups or individual users.
  • Console UI
  • Conduktor CLI
To create a data quality policy alert via the UI, go to the details page of a Policy (click on the button next to the violations graph) or from the alert tab on the Policies list page.A data quality policy alert needs to specify a Policy and a threshold: trigger after X violations within Y minutes/hours/days. This threshold replaces the combination of metric, operator and value used in other alerts.Find out more about alerts.

Using multiple Policies

When multiple Policies target the same topic, and a record is produced on that topic:
  • The evaluation count is increased for all of the Policies.
  • The violation count is increased for each violated Policy.
  • An entry will appear in the violation history for each violated Policy.

Troubleshoot

This is the status of a data quality Policy:
  • Pending: the configuration isn’t deployed or refreshed yet
  • Ready: the configuration is up-to-date on Gateway
  • Failed: something unexpected happened. Check that the clusters targeted by the Policy are online and reachable by Console. If the issue persists, contact Conduktor support
Use bracket notation instead of dot notation. For example: headers['Content-Type']
Whether your data is sent as JSON or Avro, Conduktor Gateway internally converts the payload to JSON before applying CEL rules. In JSON, all numeric values are treated as a generic number — there’s no distinction between int and double. As a result, expressions like type(value.age) == int may fail unexpectedly, even if:
  • the original value is a valid integer (e.g., 12)
  • you’re using an Avro schema where age is explicitly entered as an integer
This is because the Avro type information is lost during the conversion to JSON.Recommended workaround: Use logic-based expressions like: value.age > 0 && value.age < 130 This implicitly checks that the field is numeric and falls within a valid range, avoiding type inference.Note: CEL currently can’t evaluate against Avro schemas directly — it only sees the JSON-converted payload.We recommend enabling Gateway debug logs to inspect how data is interpreted during rule evaluation and to understand why it may have failed.
You can directly target topics on Kafka clusters without Gateway with Policies, but actions require Gateway.Since the targeted cluster type affects the capabilities of data quality Policies, you can’t create a Policy with a mixture of target types.Find out about the differences in data quality policy capabilities with and without Gateway.