# Enforce Kafka data quality — intercept and reject bad messages

> Block or redirect invalid Kafka messages at the proxy layer with Conduktor Gateway. Define CEL or JSON Schema rules to reject bad data.

<Badge stroke color="blue" icon="sparkle" size="lg">Enterprise</Badge>

## Overview

Bad data breaks customer experiences, drives churn and slows growth. Conduktor helps you catch and fix data quality issues *before* they impact your business. You define the Rules and we'll enforce them at the streaming layer.

[Find out more about data quality](/guide/conduktor-concepts/data-quality-policies).

<Note>
  This page details Conduktor's capability to enforce data quality *with* <Tooltip tip="A Kafka proxy that deploys extensible plugins for encryption, filtering, and data processing.">Gateway</Tooltip>.

  [Find out about data quality capabilities without Gateway](/guide/use-cases/observe-data-quality).
</Note>

<Tip>
  Want to enforce data quality with Gateway only, without Console? You can deploy the data quality policy Interceptor directly on Gateway. [Find out how to configure the data quality policy Interceptor](/guide/conduktor-in-production/admin/gateway-policies#validate-data-quality-policy).
</Tip>

<Info>
  **From our blog:** [Validate JSON in Kafka without migrating to Schema Registry](https://conduktor.io/blog/bounce-bad-kafka-data-with-json-schema-rules): block malformed JSON before it breaks downstream consumers, with no Schema Registry migration needed.
</Info>

## Prerequisites

Before creating data quality Rules and Policies, you have to:

* use **Conduktor <Tooltip tip="A user interface for managing and monitoring Kafka infrastructure, supporting Kubernetes deployment.">Console</Tooltip> 1.34** or later,
* use **Conduktor Gateway 3.9** or later,
* be logged in to the Console UI as an admin or use an admin token for the Conduktor <Tooltip tip="Manages Console and Gateway resources via command line or integration into CI/CD pipelines.">CLI</Tooltip>,
* [configure your Gateway cluster in Console](/guide/conduktor-in-production/admin/configure-clusters) and fill in the **Provider** tab with Gateway API credentials.

## Rules

You can define three types of Rules to validate the quality of your Kafka message data:

* [CEL (Common Expression Language) expression](#cel-expression-rules)
* [Built-in](#built-in-rules)
* [JSON schema](#json-schema-rule)

<Info>
  <Tooltip tip="A Rule captures business logic that's applied to data. Rules have to be attached to Policies to take effect.">Rules</Tooltip> do nothing on their own - you **have to** attach them to a <Tooltip tip="A collection of validation Rules that's applied to Kafka topics or prefixes.">Policy</Tooltip>.
</Info>

### Create a Rule

You can create a data quality Rule (CEL or JSON schema) using the Console UI or the Conduktor CLI.

#### Test Rules before creation

When creating Rules using the Console UI, you can test both CEL expressions and JSON schema Rules against sample data before saving changes.

This confirms that a Rule behaves correctly with the expected data format and lets you iterate quickly before deploying it to production topics.

1. Click **Validate your Rule against sample messages** to open the validation panel.
2. In the **Rule validation** panel enter:
   * **Key**: sample message key (only for CEL rules that reference `key` properties)
   * **Value**: sample message value (the main data payload that your Rule will validate)
   * **Headers**: sample message headers (only for CEL rules that check header values)
3. Click **Test Rule** to validate it.
4. A message shows whether the Rule passed, failed or hit an evaluation error (an issue with your Rule syntax or data format).

<Tabs>
  <Tab title="Console UI">
    To create a Rule using the Console UI:

    1. Go to **Rules** and click **+New Rule**.
    2. Define the Rule details:
       * Add a descriptive **name** for the Rule.
       * The **Technical ID** will be auto-populated as you type in the name. This is used to identify this Rule in CLI/API.
       * (Optional) Enter a **Description** that explains the purpose of your Rule.
    3. Select the Rule type (**CEL expression** or **JSON schema**) and provide the required logic:
       * CEL is an expression language supporting common operators like `==` and `>` as well as macros like `has()` to check for the presence of fields. Use `matches()` to test regex patterns. [See the CEL language definition for details](https://github.com/google/cel-spec/blob/master/doc/intro.md) <Icon icon="up-right-from-square" />. You can access a set of pre-built examples by clicking **Show regex library**. Click on the relevant example to copy it and paste into your Rule expression. You can then customize it further, as necessary.
       * For JSON schema, enter your schema definition. [See the JSON schema specification for details](https://json-schema.org/specification) <Icon icon="up-right-from-square" />.
    4. (Optional) Define a custom message to show in the Policy violation history when the Rule is violated. It replaces the default `<RULE_NAME> did not pass`.
    5. Click **Create**.
  </Tab>

  <Tab title="Conduktor CLI">
    You can also use the [Conduktor CLI](/guide/conduktor-in-production/automate/cli-automation) to create a Rule:

    1. Save your rule configuration to a file, e.g. `rule.yaml`.

    Example for CEL:

    ```yaml theme={null}
    apiVersion: v1
    kind: DataQualityRule
    metadata:
        name: check-customer-email
    spec:
        celExpression: value.customer.email.matches(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
        displayName: check customer email
        description: Verify the customer's email format.
        type: Cel
    ```

    Example for JSON Schema:

    ```yaml theme={null}
    apiVersion: v1
    kind: DataQualityRule
    metadata:
        name: valid-user
    spec:
        schema:
          {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "required": ["name", "age"],
            "properties": {
              "name": { "type": "string" },
              "age": { "type": "integer", "minimum": 0 },
              "isActive": { "type": "boolean", "default": true }
            }
          }
        displayName: valid user
        description: check that the user is valid
        type: JsonSchema
    ```

    2. [Use the CLI](/guide/conduktor-in-production/automate/cli-automation) to apply the configuration:

    ```bash theme={null}
    conduktor apply -f rule.yaml
    ```
  </Tab>
</Tabs>

### CEL expression Rules

You can create Rules with CEL expressions that capture business logic of your data. For example: `value.customerId.matches("[0-9]{8}")`.

The Rules page lists your Rules with a preview of their CEL expressions. Open a Rule's detail page to see its description, full CEL expression and attached Policies.

#### Sample CEL Rules

<Note>
  Make sure you amend these examples to use your values.
</Note>

<AccordionGroup>
  <Accordion title="Email RegEx validation">
    Your requirements may be different from this RegEx, as email validation via RegEx is complex.
    `value.customer.email.matches(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")`
  </Accordion>

  <Accordion title="UUID RegEx validation">
    `value.customer.id.matches(r"^[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}$")`
  </Accordion>

  <Accordion title="Ensure Header is JSON">
    `headers['Content-Type'] == 'application/json'`
  </Accordion>
</AccordionGroup>
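
Before pasting a pattern into a Rule, it can help to try it against a few sample values locally. The sketch below does this in Python with the email and UUID patterns above. It's only an approximation: CEL's `matches()` uses RE2 syntax while Python's `re` module is a different engine, so treat agreement here as a sanity check and still test the Rule in Console. The helper name `check` and the sample values are made up for illustration.

```python
import re

# Patterns copied from the sample CEL Rules above.
EMAIL = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
UUID = re.compile(
    r"^[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}$"
)


def check(pattern: re.Pattern, samples: list[str]) -> dict[str, bool]:
    """Return {sample: matched} for each sample string."""
    # Both patterns are anchored with ^ and $, so search() only succeeds
    # when the string as a whole fits the pattern.
    return {s: pattern.search(s) is not None for s in samples}


if __name__ == "__main__":
    # Hypothetical sample values, not real customer data.
    print(check(EMAIL, ["jane.doe@example.com", "jane.doe@example"]))
    print(check(UUID, ["123e4567-e89b-12d3-a456-426614174000", "123e4567e89b12d3"]))
```

Running the same samples through **Validate your Rule against sample messages** in the Console UI remains the authoritative check, since that evaluates the actual CEL expression.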

### Built-in Rules

We provide built-in validation Rules for checks that can't be expressed in CEL.

<Info>
  We currently only support **Confluent** and **Confluent-like** (e.g. Redpanda) schema registries.
</Info>

#### EnforceAvro

`EnforceAvro` ensures that:

* Your messages have a schema ID prepended to the message content.
* The schema ID exists within your schema registry.
* The schema it references is of type `avro`.

### JSON schema Rule

Enforce JSON Schema validation on Kafka messages to ensure data consistency and structure compliance.

#### Configure JSON schema

Add a JSON schema Rule using the CLI:

```yaml theme={null}
apiVersion: v1
kind: DataQualityRule
metadata:
    name: json
spec:
    schema:
        {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "type": "object",
          "required": ["name", "age"],
          "properties": {
            "name": { "type": "string" },
            "age": { "type": "integer", "minimum": 0 },
            "isActive": { "type": "boolean", "default": true }
          }
        }
    displayName: valid user
    description: check that the user is valid
    type: JsonSchema
```

#### Schema requirements

* The schema has to conform to the [JSON schema specification](https://json-schema.org/specification) <Icon icon="up-right-from-square" />.
* If no `$schema` is specified, the schema defaults to [draft 2020-12](https://json-schema.org/draft/2020-12) <Icon icon="up-right-from-square" />.
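
The defaulting behavior can be sketched as follows. This isn't Gateway's implementation, only a small Python illustration of the requirement stated above. The function name `effective_dialect` is hypothetical, and the draft 2020-12 identifier is the `$schema` URI used in the examples on this page.

```python
import json

# $schema identifier for draft 2020-12, as used in the examples on this page.
DRAFT_2020_12 = "https://json-schema.org/draft/2020-12/schema"


def effective_dialect(schema_text: str) -> str:
    """Return the dialect a schema declares, or draft 2020-12 if it declares none."""
    schema = json.loads(schema_text)  # raises json.JSONDecodeError on malformed JSON
    if not isinstance(schema, dict):
        # This sketch only handles object schemas.
        raise ValueError("expected the schema to be a JSON object")
    return schema.get("$schema", DRAFT_2020_12)


if __name__ == "__main__":
    print(effective_dialect('{"type": "object"}'))
    print(effective_dialect(
        '{"$schema": "http://json-schema.org/draft-07/schema#", "type": "object"}'
    ))
```

Declaring `$schema` explicitly, as the CLI example above does, avoids relying on the default.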

#### Examples

<AccordionGroup>
  <Accordion title="Basic User Validation">
    ```json theme={null}
    {
      "type": "object",
      "required": ["id", "name", "age"],
      "properties": {
        "id": { "type": "string" },
        "name": { "type": "string", "minLength": 1 },
        "age": { "type": "integer", "minimum": 0 }
      }
    }
    ```
  </Accordion>

  <Accordion title="Schema with Version">
    ```json theme={null}
    {
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "type": "object",
      "required": ["id", "name"],
      "properties": {
        "id": { "type": "string" },
        "name": { "type": "string" }
      }
    }
    ```
  </Accordion>

  <Accordion title="No Additional Fields Allowed">
    ```json theme={null}
    {
      "type": "object",
      "required": ["id", "name"],
      "properties": {
        "id": { "type": "string" },
        "name": { "type": "string" }
      },
      "additionalProperties": false
    }
    ```
  </Accordion>
</AccordionGroup>

## Policies

A Policy is made up of Rules that are applied to topics/prefixes.

Once created, Policies can be assigned [actions](#actions) that take effect when certain criteria are met (e.g., a Rule in the Policy is violated).

The Policy's detail page shows its description, linked Rules, targets, violation and evaluated message counts and a history of recent violations.
You can also enable (and disable) actions for the Policy from this page.

The Policies page lists all of your Policies with their actions, targets and violation counts.

### Actions

The available actions to enable for a Policy are:

* **Block**: when a violation occurs, prevent data from being processed or transmitted. Blocking rejects the entire Kafka record batch, which has implications for your producer application. [Find out about handling blocked batches](/guide/conduktor-in-production/admin/gateway-policies#handle-blocked-batches).
* **Mark**: when a violation occurs, add a special header to the message so it can be identified and handled downstream. [Find out about how marking works](#mark-action).

By default, Policies created using the Console UI don't have any actions enabled. You have to complete the Policy creation first and then enable the required actions. If there are any additional actions you'd like to see, [get in touch](https://support.conduktor.io/hc/en-gb/requests/new?ticket_form_id=17438365654417) <Icon icon="up-right-from-square" />.

<Info>
  If both *Block* and *Mark* are enabled, **only blocking** will be applied: messages violating the Policy will be blocked entirely with no marking.
</Info>

#### Mark action

When the **Mark** action is enabled, messages that violate the Rule(s) are tagged with a special header:

* **Header name:** <code>conduktor.dataquality.violations</code>
* **Header value:** JSON object mapping policy names to arrays of failed rule names. For example:

```json theme={null}
{
  "policy-name": ["rule-1", "rule-2"],
  "another-policy": ["rule-3"]
}
```
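
A downstream consumer can read this header to decide how to handle marked messages. The Python sketch below parses the header value into a dictionary. It assumes headers arrive as `(name, bytes)` pairs, which is how common Python Kafka clients expose them, and that the value is UTF-8 encoded JSON. The function name `violations_from_headers` is hypothetical.

```python
import json

VIOLATIONS_HEADER = "conduktor.dataquality.violations"


def violations_from_headers(headers):
    """Return {policy_name: [rule_name, ...]}, or {} if the message isn't marked.

    `headers` is assumed to be an iterable of (name, value_bytes) pairs.
    """
    for name, value in headers or []:
        if name == VIOLATIONS_HEADER and value is not None:
            # Assumption: the header value is UTF-8 encoded JSON.
            return json.loads(value.decode("utf-8"))
    return {}


if __name__ == "__main__":
    # Sample header built from the example value above.
    sample = [
        (VIOLATIONS_HEADER,
         b'{"policy-name": ["rule-1", "rule-2"], "another-policy": ["rule-3"]}'),
    ]
    for policy, rules in violations_from_headers(sample).items():
        print(policy, rules)
```

A consumer could, for instance, route any message with a non-empty result to a quarantine topic and process the rest as usual.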

**View violations when consuming from a topic**

Console v1.42 and later shows Policy violations when consuming from topics on Gateway clusters. A Policy badge appears in the topic header, the consume table shows a **Marking violations** column and the **Metadata** tab displays full violation details. Hover over a Policy name to see IDs and violated Rules.

[Find out about viewing data quality violations in topics](/guide/manage-kafka/kafka-resources/topics#view-data-quality-policy-violations).

### Violations metrics

Monitor Rule violations for a Policy over the selected time range: 30 minutes, one hour, one day or seven days.

#### Metrics displayed

The page displays four types of metrics:

* **Messages evaluated count** - total messages checked against this Policy
* **Violations count** - total violations detected
* **Violations graph** - a bar chart showing past violations to help identify patterns and spikes
* **Violations history** - individual violation events showing when and where violations occurred

#### Data freshness

**Important**: There's a timing difference between the metrics:

* **Violations history** - updated almost immediately
* **Messages evaluated count, violations count and violations graph** - may be delayed by up to one minute

This delay exists because these metrics are collected through our monitoring system (Cortex), which processes data in batches for performance.

The violations history comes directly from the backend database, so it can show recent violations that don't yet appear in the graph or counts.

## Create a Policy

You can create a data quality Policy using the Console UI or the Conduktor CLI.

<Tabs>
  <Tab title="Console UI">
    To create a Policy using the Console UI:

    1. Under the *Trust* section of the main menu, go to **Policies** and click **+New policy**.
    2. Define the Policy details:
       * **Name** - add a descriptive name for the Policy.
       * **Technical ID** - will be auto-populated as you type the name. This is used to identify this Policy in CLI/API.
       * **Group** - select a group to own the Policy. This controls who can view and manage the Policy, and which resources can be targeted.
       * **Description** - (optional) enter a description to explain your Policy.
    3. Select Rules to be used in the Policy:
       * Every Policy must have at least one Rule
       * You can also create new Rules from this page
       * Click **Continue** to move to the next step.
    4. Select targets for the Policy:
       * Every Policy must have at least one target
       * A target consists of one or more topics on a specified Gateway
       * You can either select specific topics or specify a prefix like `orders-*`
       * Click **Continue** to move to the next step.
    5. Review the Policy and when ready, confirm by clicking **Create**.
  </Tab>

  <Tab title="Conduktor CLI">
    You can also use the [Conduktor CLI](/guide/conduktor-in-production/automate/cli-automation) to create a Policy:

    1. Save this example to a file, e.g. `policy.yaml`:

    ```yaml theme={null}
    apiVersion: v1
    kind: DataQualityPolicy
    metadata:
        name: check-order-payload
        group: orders-team
    spec:
        displayName: Verify the order items
        description: Verify the order items payloads on purchase-pipeline topic.
        rules:
          - check-customer-email
        targets:
          - cluster: gateway
            topic: purchase-pipeline
            patternType: LITERAL
        actions:
          block:
            enabled: false
          mark:
            enabled: true
    ```

    2. Use [the CLI](/guide/conduktor-in-production/automate/cli-automation) to apply the configuration:

       ```bash theme={null}
       conduktor apply -f policy.yaml
       ```
  </Tab>
</Tabs>

### Manage a Policy

Once a Policy is created, you can view the attached Rule(s), the target(s) of the Policy and change the related [actions](#actions). You can also view violations as they occur, along with the violation count.

To edit the list of Rules attached to a Policy, click **Edit selection** on the Policy details page. In the dialog that opens, select/deselect the required Rules from the list and save changes.

<Info>
  Because the **block** action can **stop data from being sent** to the requested topic, you have to confirm enabling it by entering 'BLOCK' when prompted. To disable blocking, enter 'UNBLOCK' when prompted.
</Info>

### Assign permissions

Policies are **owned by user groups** and can be created by admin users or groups with the **Manage data quality** permission.

To grant this permission to a group, go to **Settings** > **Groups** and, in the **Resource access** tab, tick the **Manage data quality** checkbox for the relevant resources.

<Info>
  Make sure to enable this on the Gateway (and not the underlying Kafka) cluster. Modifying group permissions won't affect any Policies associated with the group.
</Info>

<img src="https://mintcdn.com/conduktor/FjE8hB_sHo6Ubhbb/images/topic-dq-manage-permission.png?fit=max&auto=format&n=FjE8hB_sHo6Ubhbb&q=85&s=9ae8a6e3a996109220492d4bec8d5d18" alt="The 'manage data quality' permission" width="2980" height="1700" data-path="images/topic-dq-manage-permission.png" />

### Set up Policy violation alerts

You can create alerts that are triggered when a Policy violation happens. Data quality alerts can be owned by groups or individual users.

<Tabs>
  <Tab title="Console UI">
    To create a data quality policy alert in the UI, either open a Policy's details page and click the button next to the violations graph, or use the alert tab on the Policies list page.

    A data quality policy alert needs to specify a Policy and a threshold: trigger after X violations within Y minutes/hours/days. This threshold replaces the combination of metric, operator and value used in other alerts.

    [Find out more about alerts](/guide/monitor-brokers-apps/alerts).
  </Tab>

  <Tab title="Conduktor CLI">
    You can use the [Conduktor CLI](/guide/conduktor-in-production/automate/cli-automation) to create a data quality policy alert:

    1. Save this example to a file, e.g. `alert.yaml`:

    ```yaml theme={null}
    apiVersion: v3
    kind: Alert
    metadata:
        name: alert
        group: my-group
    spec:
        type: DataQualityPolicyAlert
        policyName: my-policy
        triggerAfter: 1
        withinInSeconds: 30
    ```

    2. Use [the CLI](/guide/conduktor-in-production/automate/cli-automation) to apply the configuration:

    ```bash theme={null}
    conduktor apply -f alert.yaml
    ```
  </Tab>
</Tabs>

### Using multiple Policies

When multiple Policies target the same topic, there are two scenarios that can occur when a record is produced:

* **None** of the Policies block the record and all are evaluated

  * The evaluation count is increased for all of the Policies.
  * The violation count is increased for each violated Policy.
  * An entry will appear in the violation history for each violated Policy.

* **One or more** of the Policies block the record production. In this scenario, one of the Policies blocks the record first and then hides it from the others.

  * For the first blocking Policy, both the violation and evaluation counts are increased and a report is generated.
  * For the other Policies: no counts are increased and no reports are generated.
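
The two scenarios can be modeled with a small Python sketch. It's a toy model of the counting behavior described above, not Gateway code: the `PolicyStats` class and `record_produced` function are hypothetical, and "first" is taken to mean list order, which is an assumption because the evaluation order isn't specified here.

```python
from dataclasses import dataclass


@dataclass
class PolicyStats:
    name: str
    block_enabled: bool
    evaluated: int = 0
    violations: int = 0
    reports: int = 0  # violation history entries


def record_produced(policies, violated):
    """Update counters for one produced record.

    `policies` is a list of PolicyStats targeting the same topic; `violated`
    is the set of Policy names whose Rules the record fails.
    Returns True if the record is blocked.
    """
    blocker = next(
        (p for p in policies if p.block_enabled and p.name in violated), None
    )
    if blocker is not None:
        # The first blocking Policy counts the record and hides it from the rest.
        blocker.evaluated += 1
        blocker.violations += 1
        blocker.reports += 1
        return True
    # No Policy blocks: every Policy evaluates the record.
    for p in policies:
        p.evaluated += 1
        if p.name in violated:
            p.violations += 1
            p.reports += 1
    return False
```

In this model, a record that violates a blocking Policy leaves the counters of every other Policy untouched, which is why their evaluation counts can be lower than the number of records producers attempted to send.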

<Tip>
  You can track data quality coverage and health metrics across all your topics using the [data quality overview page](/guide/conduktor-concepts/data-quality-policies#data-quality-metrics).
</Tip>

## Troubleshoot

<AccordionGroup>
  <Accordion title="What does Policy status mean?">
    A data quality Policy has one of these statuses:

    * **Pending**: the configuration isn't deployed or refreshed yet
    * **Ready**: the configuration is up-to-date on Gateway
    * **Failed**: something unexpected happened during the deployment. Check that the connected Gateway is active.
  </Accordion>

  <Accordion title="How do I handle headers with dashes?">
    Use bracket notation instead of dot notation. For example: `headers['Content-Type']`
  </Accordion>

  <Accordion title="Why can't `type()` figure out the right numeric types?">
    Whether your data is sent as **JSON** or **Avro**, Conduktor Gateway internally converts the payload to JSON before applying CEL rules. In JSON, all numeric values are treated as a generic `number`, with no distinction between `int` and `double`. As a result, expressions like `type(value.age) == int` may **fail unexpectedly**, even if:

    * the original value is a valid integer (e.g., 12)
    * you're using an Avro schema where `age` is explicitly declared as an integer

    This is because the Avro type information is lost during the conversion to JSON.

    **Recommended workaround:**
    Use logic-based expressions like `value.age > 0 && value.age < 130`. This implicitly checks that the field is numeric and falls within a valid range, avoiding type inference.

    **Note**: CEL currently can't evaluate against Avro schemas directly. It only sees the JSON-converted payload.

    We recommend enabling Gateway debug logs to inspect how data is interpreted during rule evaluation and to understand why a Rule may have failed.
  </Accordion>

  <Accordion title="Why can't I mix Gateway and non-Gateway targets in a single policy?">
    Policies can directly target topics on Kafka clusters that don't use Gateway, but [actions](/guide/use-cases/enforce-data-quality#actions) require Gateway.

    Since the targeted cluster type affects the capabilities of data quality Policies, you can't create a Policy with a mixture of target types.

    [Find out about the differences in data quality policy capabilities with and without Gateway](/guide/conduktor-concepts/data-quality-policies).
  </Accordion>
</AccordionGroup>

## Related resources

* [Observe data quality](/guide/use-cases/observe-data-quality)
* [Connect to clusters](/guide/conduktor-in-production/admin/configure-clusters)
* [Learn about data masking functionality](/guide/conduktor-in-production/admin/data-masking)
* [Give us feedback/request a feature](https://conduktor.io/roadmap) <Icon icon="up-right-from-square" />
