# Kafka data validation policies

> Validate Kafka producer records with Conduktor Gateway policies. Define CEL rules, JSON Schema checks, or Schema ID validation to block or flag invalid records.

## Validate using data quality rules and policies

The `DataQualityPlugin` Interceptor validates Kafka producer records against Policies. Each Policy is made up of Rules, which can be:

* custom Rules defined with [CEL (Common Expression Language)](https://github.com/google/cel-spec) expressions
* built-in Rules, such as enforcing a schema ID
* a static [JSON schema](https://json-schema.org/) definition

It evaluates every record produced to matching topics and acts on violations by blocking, marking or reporting them, and optionally sending them to a dead letter queue.

This is the recommended Interceptor to start with for data quality validation, and it is the same Interceptor that is deployed when you create [data quality Policies in Console](/guide/use-cases/enforce-data-quality). You should use Console when you want to reuse Rules and Policies, but you can also validate data quality using only Gateway. This guide explains how to deploy the Interceptor directly on Gateway without Console.

<Info>
  **From our blog:** [Kafka policy enforcement](https://conduktor.io/blog/kafka-policy-enforcement) How to move from 50-item policy checklists to automated, enforceable rules at the proxy layer.
</Info>

### Examples

#### Mark and report with dead letter queue

Mark violating records on the `customer-transaction` topic and send them to a dead letter queue. Records are not blocked and still reach the original topic.

<Tabs>
  <Tab title="curl">
    ```bash theme={null}
    curl \
      --request PUT \
      --url 'http://localhost:8888/gateway/v2/interceptor' \
      --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
      --header 'Content-Type: application/json' \
      --data-raw '{
      "name": "customer-policy",
      "pluginClass": "io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin",
      "priority": 1,
      "config": {
        "policyName": "customer-policy",
        "topicsRegex": ["^customer-transaction.*$"],
        "block": false,
        "mark": true,
        "report": true,
        "dlq": true,
        "dlqTopic": "customer-transaction-dlq",
        "maxNumberOfViolationReportPerSecond": 10,
        "consoleDeploymentId": "standalone",
        "rules": {
          "email-rule": {
            "type": "CEL",
            "expression": "value.customer.email.matches(r\"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}\")"
          },
          "age-rule": {
            "type": "CEL",
            "expression": "type(value.customer.age) == int && value.customer.age >= 0 && value.customer.age <= 130"
          },
          "classification-header-rule": {
            "type": "CEL",
            "expression": "\"data-classification\" in headers && headers[\"data-classification\"] in [\"C0\", \"C1\", \"C2\", \"C3\"]"
          }
        }
      }
    }'
    ```
  </Tab>

  <Tab title="Conduktor CLI">
    ```yaml theme={null}
    apiVersion: gateway/v2
    kind: Interceptor
    metadata:
      name: customer-policy
      scope:
        vCluster: passthrough
    spec:
      pluginClass: io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin
      priority: 1
      config:
        policyName: customer-policy
        topicsRegex:
          - "^customer-transaction.*$"
        block: false
        mark: true
        report: true
        dlq: true
        dlqTopic: customer-transaction-dlq
        maxNumberOfViolationReportPerSecond: 10
        consoleDeploymentId: standalone
        rules:
          email-rule:
            type: CEL
            expression: >
              value.customer.email.matches(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
          age-rule:
            type: CEL
            expression: >
              type(value.customer.age) == int
              && value.customer.age >= 0
              && value.customer.age <= 130
          classification-header-rule:
            type: CEL
            expression: >
              'data-classification' in headers
              && headers['data-classification'] in ['C0', 'C1', 'C2', 'C3']
    ```

    Apply with:

    ```bash theme={null}
    conduktor apply -f customer-policy.yaml
    ```
  </Tab>
</Tabs>

#### Enforce schema ID

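Mark and report records on topics matching `^sensitive-.*$` whose value does not carry a valid schema ID registered in the schema registry. If your schema registry requires authentication, pass the credentials in `additionalConfigs`. To avoid exposing secrets in your configuration, use secured templates such as `${SR_USER}`: Gateway resolves them from its local environment variables at runtime.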

<Tabs>
  <Tab title="curl">
    ```bash theme={null}
    curl \
      --request PUT \
      --url 'http://localhost:8888/gateway/v2/interceptor' \
      --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
      --header 'Content-Type: application/json' \
      --data-raw '{
      "name": "secured-dq-policy",
      "pluginClass": "io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin",
      "priority": 1,
      "config": {
        "policyName": "secured-dq-policy",
        "topicsRegex": ["^sensitive-.*$"],
        "block": false,
        "mark": true,
        "report": true,
        "maxNumberOfViolationReportPerSecond": 10,
        "consoleDeploymentId": "standalone",
        "rules": {
          "enforce-schema": {
            "type": "ENFORCE_SCHEMA_ID"
          }
        },
        "schemaRegistryConfig": {
          "host": "http://schema-registry:8081",
          "type": "CONFLUENT",
          "additionalConfigs": {
            "basic.auth.credentials.source": "USER_INFO",
            "basic.auth.user.info": "${SR_USER}:${SR_PASSWORD}",
            "schema.registry.basic.auth.user.info": "${SR_USER}:${SR_PASSWORD}"
          }
        }
      }
    }'
    ```
  </Tab>

  <Tab title="Conduktor CLI">
    ```yaml theme={null}
    apiVersion: gateway/v2
    kind: Interceptor
    metadata:
      name: secured-dq-policy
      scope:
        vCluster: passthrough
    spec:
      pluginClass: io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin
      priority: 1
      config:
        policyName: secured-dq-policy
        topicsRegex:
          - ^sensitive-.*$
        block: false
        mark: true
        report: true
        maxNumberOfViolationReportPerSecond: 10
        consoleDeploymentId: standalone
        rules:
          enforce-schema:
            type: ENFORCE_SCHEMA_ID
        schemaRegistryConfig:
          host: http://schema-registry:8081
          type: CONFLUENT
          additionalConfigs:
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: $${SR_USER}:$${SR_PASSWORD}
            schema.registry.basic.auth.user.info: $${SR_USER}:$${SR_PASSWORD}
    ```

    Apply with:

    ```bash theme={null}
    conduktor apply -f secured-dq-policy.yaml
    ```
  </Tab>
</Tabs>
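Because Gateway resolves the `${SR_USER}` and `${SR_PASSWORD}` placeholders from its own environment variables, those variables must be set where Gateway runs. A hypothetical pre-flight helper like the one below (a standalone sketch, not part of Gateway or the Conduktor CLI) lists the placeholders in a configuration snippet that have no value in a given environment, which you would pass as Gateway's environment.

```python
import os
import re
from typing import List, Mapping

# Matches ${NAME} placeholders such as ${SR_USER}; also finds them inside $${NAME}.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def placeholders(config_text: str) -> List[str]:
    """Placeholder names in order of first appearance, without duplicates."""
    seen: List[str] = []
    for name in PLACEHOLDER.findall(config_text):
        if name not in seen:
            seen.append(name)
    return seen


def missing_variables(config_text: str, env: Mapping[str, str] = os.environ) -> List[str]:
    """Placeholder names that are unset or empty in the given environment."""
    return [name for name in placeholders(config_text) if not env.get(name)]


if __name__ == "__main__":
    snippet = '"basic.auth.user.info": "${SR_USER}:${SR_PASSWORD}"'
    print(missing_variables(snippet, {"SR_USER": "gateway"}))  # ['SR_PASSWORD']
```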

#### Block records that violate a JSON schema

Block any record on the `product` topic that does not match the expected product schema. This is useful for defending against poison pill records from errant CLI tools or misconfigured producers that write malformed data to a topic, which can cause downstream consumers to crash. No schema registry is needed since the JSON schema is defined inline in the Rule.

<Tabs>
  <Tab title="curl">
    ```bash theme={null}
    curl \
      --request PUT \
      --url 'http://localhost:8888/gateway/v2/interceptor' \
      --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
      --header 'Content-Type: application/json' \
      --data-raw '{
      "name": "product-schema-policy",
      "pluginClass": "io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin",
      "priority": 1,
      "config": {
        "policyName": "product-schema-policy",
        "topicsRegex": ["^product$"],
        "block": true,
        "mark": false,
        "report": true,
        "dlq": true,
        "dlqTopic": "product-dlq",
        "maxNumberOfViolationReportPerSecond": 10,
        "consoleDeploymentId": "standalone",
        "rules": {
          "valid-product-schema": {
            "type": "JSON_SCHEMA",
            "schema": "{\"$comment\":\"setting additionalProperties to false to ensure no extra fields are allowed\",\"$id\":\"https://example.com/product.schema.json\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"additionalProperties\":false,\"description\":\"A product from Acme'\''s catalog\",\"properties\":{\"price\":{\"description\":\"The price of the product\",\"exclusiveMinimum\":0,\"type\":\"number\"},\"productId\":{\"description\":\"The unique identifier for a product\",\"type\":\"integer\"},\"productName\":{\"description\":\"Name of the product\",\"type\":\"string\"},\"tags\":{\"description\":\"Tags for the product\",\"items\":{\"type\":\"string\"},\"minItems\":1,\"type\":\"array\",\"uniqueItems\":true}},\"required\":[\"productId\",\"productName\",\"price\"],\"title\":\"Product\",\"type\":\"object\"}"
          }
        }
      }
    }'
    ```
  </Tab>

  <Tab title="Conduktor CLI">
    ```yaml theme={null}
    apiVersion: gateway/v2
    kind: Interceptor
    metadata:
      name: product-schema-policy
      scope:
        vCluster: passthrough
    spec:
      pluginClass: io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin
      priority: 1
      config:
        policyName: product-schema-policy
        topicsRegex:
          - ^product$
        block: true
        mark: false
        report: true
        dlq: true
        dlqTopic: product-dlq
        maxNumberOfViolationReportPerSecond: 10
        consoleDeploymentId: standalone
        rules:
          valid-product-schema:
            type: JSON_SCHEMA
            schema: |
              {
                "$schema": "https://json-schema.org/draft/2020-12/schema",
                "$id": "https://example.com/product.schema.json",
                "title": "Product",
                "description": "A product from Acme's catalog",
                "type": "object",
                "$comment": "setting additionalProperties to false to ensure no extra fields are allowed",
                "additionalProperties": false,
                "properties": {
                  "productId": {
                    "description": "The unique identifier for a product",
                    "type": "integer"
                  },
                  "productName": {
                    "description": "Name of the product",
                    "type": "string"
                  },
                  "price": {
                    "description": "The price of the product",
                    "type": "number",
                    "exclusiveMinimum": 0
                  },
                  "tags": {
                    "description": "Tags for the product",
                    "type": "array",
                    "items": {
                      "type": "string"
                    },
                    "minItems": 1,
                    "uniqueItems": true
                  }
                },
                "required": [ "productId", "productName", "price" ]
              }
    ```

    Apply with:

    ```bash theme={null}
    conduktor apply -f product-schema-policy.yaml
    ```
  </Tab>
</Tabs>
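Escaping a JSON schema inside a JSON string inside a single-quoted shell argument by hand is error-prone; for example, an apostrophe has to be written as `'\''` inside a single-quoted shell argument. As a minimal sketch, the same request body can be generated with Python's `json` module, which serializes the schema into the string the `JSON_SCHEMA` Rule expects. Property descriptions are omitted for brevity, and the script name used below is only an example.

```python
import json

# The product schema from the example above (property descriptions omitted).
product_schema = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://example.com/product.schema.json",
    "title": "Product",
    "description": "A product from Acme's catalog",  # no shell escaping needed here
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "productId": {"type": "integer"},
        "productName": {"type": "string"},
        "price": {"type": "number", "exclusiveMinimum": 0},
        "tags": {"type": "array", "items": {"type": "string"},
                 "minItems": 1, "uniqueItems": True},
    },
    "required": ["productId", "productName", "price"],
}

payload = {
    "name": "product-schema-policy",
    "pluginClass": "io.conduktor.gateway.interceptor.dataquality.DataQualityPlugin",
    "priority": 1,
    "config": {
        "policyName": "product-schema-policy",
        "topicsRegex": ["^product$"],
        "block": True,
        "mark": False,
        "report": True,
        "dlq": True,
        "dlqTopic": "product-dlq",
        "maxNumberOfViolationReportPerSecond": 10,
        "consoleDeploymentId": "standalone",
        "rules": {
            "valid-product-schema": {
                "type": "JSON_SCHEMA",
                # The JSON_SCHEMA Rule takes the schema as a string, so serialize it.
                "schema": json.dumps(product_schema),
            }
        },
    },
}

if __name__ == "__main__":
    print(json.dumps(payload, indent=2))
```

Redirect the output to a file (for example, `python build_product_policy.py > product-schema-policy.json`) and send it with the same `curl` command, replacing `--data-raw '...'` with `--data-binary @product-schema-policy.json`.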

### Configuration

| Key                                   | Type                                                    | Required | Default | Description                                                                                                                                                                                                        |
| :------------------------------------ | :------------------------------------------------------ | :------- | :------ | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `policyName`                          | String                                                  | Yes      |         | A name to identify this Policy. Violation reports and marking headers reference this name.                                                                                                                         |
| `topicsRegex`                         | List\<String>                                           | Yes      |         | List of regex patterns matching the topics to enforce. For example, `["^orders-.*$", "^payments$"]`.                                                                                                               |
| `rules`                               | Map\<String, [Rule](#rules)>                            | Yes      |         | A map of Rule names to Rule definitions. The Interceptor evaluates each Rule against every record.                                                                                                                 |
| `block`                               | Boolean                                                 | Yes      |         | If `true`, reject the entire batch when any record violates a Rule. The producer receives an `INVALID_RECORD` error.                                                                                               |
| `mark`                                | Boolean                                                 | No       | `false` | If `true` (and `block` is `false`), add a `conduktor.dataquality.violations` header to records that violate Rules.                                                                                                 |
| `report`                              | Boolean                                                 | Yes      |         | If `true`, write violation events to an internal Gateway topic for monitoring. See [violation reporting](#violation-reporting).                                                                                    |
| `maxNumberOfViolationReportPerSecond` | Integer                                                 | Yes      |         | Rate limit for violation reports per second. A value of `10` is a good starting point.                                                                                                                             |
| `dlq`                                 | Boolean                                                 | No       | `false` | If `true`, send violating records to a dead letter topic.                                                                                                                                                          |
| `dlqTopic`                            | String                                                  | No       |         | The topic name to use as a dead letter queue. Required if `dlq` is `true`.                                                                                                                                         |
| `schemaRegistryConfig`                | [Schema registry](#data-quality-policy-schema-registry) | No       |         | Schema registry configuration. Required if your Rules use the `ENFORCE_AVRO` or `ENFORCE_SCHEMA_ID` types.                                                                                                         |
| `consoleDeploymentId`                 | String                                                  | Yes      |         | An identifier used to separate violation data across deployments. If you are not using Console, provide any placeholder value (for example, `"standalone"`). This field will be made optional in a future version. |
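To preview which topics a Policy will cover, a small sketch like the following can be run against a list of topic names. It assumes Gateway's regex matching behaves like Python's `re.search` for these patterns; because the examples anchor both ends with `^` and `$`, search and full-match semantics give the same result here. Note that `^customer-transaction.*$` from the first example also matches its own dead letter topic, `customer-transaction-dlq`; if you want to keep the dead letter topic outside the Policy's scope, choose a name the pattern does not match.

```python
import re
from typing import Iterable, List


def matching_topics(patterns: Iterable[str], topics: Iterable[str]) -> List[str]:
    """Topics matched by at least one pattern (re.search semantics)."""
    compiled = [re.compile(p) for p in patterns]
    return [t for t in topics if any(c.search(t) for c in compiled)]


topics = ["customer-transaction", "customer-transaction-eu", "customer-transaction-dlq",
          "product", "product-dlq", "products"]

print(matching_topics(["^customer-transaction.*$"], topics))
# ['customer-transaction', 'customer-transaction-eu', 'customer-transaction-dlq']
print(matching_topics(["^product$"], topics))
# ['product']
```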

### Rules

Each Rule in the `rules` map has a `type` field that determines how the record is validated. The Interceptor supports four Rule types.

#### CEL expression

Evaluate a CEL expression against the record. The expression must return a boolean: `true` means the record passes, `false` means it violates the Rule.

| Key          | Type   | Required | Description                                                                                                          |
| :----------- | :----- | :------- | :------------------------------------------------------------------------------------------------------------------- |
| `type`       | String | Yes      | `CEL`                                                                                                                |
| `expression` | String | Yes      | CEL expression to evaluate.                                                                                          |
| `message`    | String | No       | Custom error message returned to the producer when blocking. Defaults to `Data quality Rule '<rule-name>' violated`. |

**Available variables in CEL expressions:**

* `value` — the deserialized record value (access nested fields with dot notation, for example `value.customer.email`)
* `key` — the record key
* `headers` — a map of header names to values
* `topic` — the topic name
* `partition` — the partition number
* `offset` — the record offset

**Example CEL expressions:**

| Use case                   | Expression                                                                                                                                                                |
| :------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| Email format               | <code>value.customer.email.matches(r"\[a-zA-Z0-9.\_%+-]+@\[a-zA-Z0-9.-]+\\.\[a-zA-Z]\{2,}")</code>                                                                        |
| UUID format                | <code>value.id.matches(r"^\[0-9a-fA-F]\{8}-\[0-9a-fA-F]\{4}-\[0-9a-fA-F]\{4}-\[0-9a-fA-F]\{4}-\[0-9a-fA-F]\{12}\$")</code>                                                |
| Range check                | `value.customer.age >= 0 && value.customer.age <= 130`                                                                                                                    |
| Header validation          | `'data-classification' in headers && headers['data-classification'] in ['C0', 'C1', 'C2', 'C3']`                                                                          |
| Conditional required field | <code>!value.customer.preferences.newsletter \|\| (value.customer.preferences.preferred\_language != null && value.customer.preferences.preferred\_language != "")</code> |
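To sanity-check what each example expression accepts before deploying it, the same conditions can be prototyped in Python against a sample record. This is only an approximation: Gateway evaluates real CEL, whose `matches()` uses RE2 syntax and succeeds when the pattern matches anywhere in the string, which `re.search` mirrors for these patterns. The record layout (`value`, `headers`) follows the variables listed above, and the sample values are made up.

```python
import re

# Patterns from the example expressions.
EMAIL = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
UUID = r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
CLASSIFICATIONS = ["C0", "C1", "C2", "C3"]


def check(record: dict) -> dict:
    """Map each example rule to True (passes) or False (violates)."""
    value, headers = record["value"], record["headers"]
    customer = value["customer"]
    prefs = customer["preferences"]
    language = prefs.get("preferred_language")
    return {
        # CEL matches() succeeds on a match anywhere in the string, like re.search.
        "email": re.search(EMAIL, customer["email"]) is not None,
        "uuid": re.search(UUID, value["id"]) is not None,
        "range": 0 <= customer["age"] <= 130,
        # A missing header gives None, which is not an allowed classification.
        "header": headers.get("data-classification") in CLASSIFICATIONS,
        # Newsletter subscribers must have a non-empty preferred language.
        "conditional": (not prefs["newsletter"]) or language not in (None, ""),
    }


record = {
    "value": {
        "id": "b2ee6886-7fff-4cab-9b2a-cf0a06c9e648",
        "customer": {
            "email": "jane.doe@example.com",
            "age": 42,
            "preferences": {"newsletter": True, "preferred_language": "en"},
        },
    },
    "headers": {"data-classification": "C1"},
}

print(check(record))  # every rule maps to True
```

Because the email pattern is not anchored, a value such as `see jane.doe@example.com` also passes. Add `^` at the start and `$` at the end of the pattern if the whole field must be an address.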

#### JSON schema

Validate the record value against a JSON schema definition. This is especially helpful when your producer team wants to enforce standards on produced data without introducing a schema registry, which could break downstream consumers that do not expect schema-encoded data.

| Key       | Type   | Required | Description                                                  |
| :-------- | :----- | :------- | :----------------------------------------------------------- |
| `type`    | String | Yes      | `JSON_SCHEMA`                                                |
| `schema`  | String | Yes      | A JSON schema definition (as a string).                      |
| `message` | String | No       | Custom error message returned to the producer when blocking. |

#### ENFORCE\_AVRO built-in Rule

Verify that the record value is Avro-encoded: it has a schema ID prepended, the schema ID exists in the schema registry and the schema is of type Avro. Requires `schemaRegistryConfig` to be set.

| Key       | Type   | Required | Description           |
| :-------- | :----- | :------- | :-------------------- |
| `type`    | String | Yes      | `ENFORCE_AVRO`        |
| `message` | String | No       | Custom error message. |

#### ENFORCE\_SCHEMA\_ID built-in Rule

Verify that the record value has a valid schema ID prepended and that the schema exists in the schema registry. Requires `schemaRegistryConfig` to be set.

| Key       | Type   | Required | Description           |
| :-------- | :----- | :------- | :-------------------- |
| `type`    | String | Yes      | `ENFORCE_SCHEMA_ID`   |
| `message` | String | No       | Custom error message. |
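For Confluent-like registries, "a schema ID prepended" refers to the Confluent wire format: one magic byte `0x00`, then the schema ID as a 4-byte big-endian integer, then the serialized payload. The sketch below only checks that framing locally. It assumes the Confluent wire format (AWS Glue uses a different header) and does not contact a registry, so unlike these Rules it cannot confirm that the ID exists.

```python
import struct
from typing import Optional

MAGIC_BYTE = 0


def schema_id(value: Optional[bytes]) -> Optional[int]:
    """Schema ID from a Confluent-framed record value, or None if unframed."""
    if value is None or len(value) < 5 or value[0] != MAGIC_BYTE:
        return None
    (sid,) = struct.unpack(">I", value[1:5])  # 4-byte big-endian integer
    return sid


framed = bytes([MAGIC_BYTE]) + struct.pack(">I", 42) + b"serialized payload"
print(schema_id(framed))                # 42
print(schema_id(b'{"plain": "json"}'))  # None
```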

### Data quality policy schema registry

A schema registry is only required if your topics use schema-encoded data. If your topics contain plain JSON, you can omit the `schemaRegistryConfig` field entirely.

| Key                 | Type    | Default     | Description                                                                                            |
| :------------------ | :------ | :---------- | :----------------------------------------------------------------------------------------------------- |
| `type`              | String  | `CONFLUENT` | `CONFLUENT` for Confluent-like schema registries (including Redpanda) or `AWS` for AWS Glue.           |
| `additionalConfigs` | Map     |             | Additional properties for security configuration. You can use environment variables as secrets.        |
| **Confluent-like**  |         |             |                                                                                                        |
| `host`              | String  |             | URL of your schema registry.                                                                           |
| `cacheSize`         | Integer | `50`        | Number of schemas cached locally by this Interceptor.                                                  |
| **AWS Glue**        |         |             |                                                                                                        |
| `region`            | String  |             | AWS region (for example, `us-east-1`).                                                                 |
| `registryName`      | String  |             | AWS Glue registry name. Leave blank for the default (`default-registry`).                              |
| `basicCredentials`  | Object  |             | AWS credentials (`accessKey`, `secretKey`). If omitted, credentials are resolved from the environment. |

### Actions

The Interceptor supports three actions that can be combined:

| Action                      | Behavior                                                                                                                                                                                                                                                                      |
| :-------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Block** (`block: true`)   | Reject the **entire Kafka record batch** for a partition if **any** record in it violates a Rule. The producer receives an `INVALID_RECORD` error (Kafka error code 87) with a message identifying the violated Rules. See [handle blocked batches](#handle-blocked-batches). |
| **Mark** (`mark: true`)     | Add a `conduktor.dataquality.violations` header to violating records. The header value is a JSON object mapping Policy names to arrays of violated Rule names. The Interceptor only applies marking when `block` is `false`.                                                  |
| **Report** (`report: true`) | Write violation events to an internal topic for monitoring. See [violation reporting](#violation-reporting).                                                                                                                                                                  |

If both `block` and `mark` are enabled, only blocking is applied.

<Warning>
  When blocking is enabled, a single invalid record causes the entire record batch for that partition to be rejected. This means valid records in the same record batch are also blocked. If you need to preserve valid records while flagging violations, use `mark` instead of `block`.
</Warning>
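The interaction between `block`, `mark` and `dlq` described above can be summarized as a small decision function. This is a hypothetical model of the documented behavior for one partition's record batch, written to reason about configurations. It is not Gateway's code, and it leaves reporting out.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

HEADER = "conduktor.dataquality.violations"


@dataclass
class Record:
    value: str
    violated_rules: List[str]  # an empty list means the record is valid
    headers: Dict[str, str] = field(default_factory=dict)


def apply_actions(batch: List[Record], policy: str, block: bool, mark: bool,
                  dlq: bool) -> Tuple[bool, List[Record], List[Record]]:
    """Return (batch_rejected, records_written_to_topic, records_sent_to_dlq)."""
    violating = [r for r in batch if r.violated_rules]
    # Only violating records go to the DLQ, whether the batch is blocked or not.
    to_dlq = violating if dlq else []
    if block and violating:
        # A single violation rejects the whole batch, valid records included.
        return True, [], to_dlq
    if mark and not block:
        for r in violating:
            marks = json.loads(r.headers.get(HEADER, "{}"))
            marks[policy] = r.violated_rules
            r.headers[HEADER] = json.dumps(marks)
    return False, batch, to_dlq


batch = [Record("ok", []), Record("bad", ["email-rule"])]
rejected, written, dead = apply_actions(batch, "customer-policy",
                                        block=True, mark=True, dlq=True)
print(rejected, len(written), [r.value for r in dead])  # True 0 ['bad']
```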

#### Mark header format

When marking is enabled, the `conduktor.dataquality.violations` header contains a JSON object:

```json theme={null}
{
  "my-policy-name": ["email-rule", "age-rule"]
}
```

Multiple Interceptors targeting the same topic will each append their Policy to this header.
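Downstream consumers can use this header to route or skip flagged records. A minimal sketch, assuming headers arrive as a list of `(name, bytes)` pairs (the shape exposed by Python clients such as `confluent-kafka`) and that the header value is UTF-8 encoded JSON as shown above. It merges repeated copies of the header in case several Interceptors each add one.

```python
import json
from typing import Dict, List, Optional, Sequence, Tuple

VIOLATIONS_HEADER = "conduktor.dataquality.violations"


def violations(headers: Optional[Sequence[Tuple[str, bytes]]]) -> Dict[str, List[str]]:
    """Map of Policy name to violated Rule names; empty if the record is not marked."""
    result: Dict[str, List[str]] = {}
    for name, raw in headers or []:
        if name == VIOLATIONS_HEADER and raw:
            # Merge in case more than one copy of the header is present.
            for policy, rules in json.loads(raw.decode("utf-8")).items():
                result.setdefault(policy, []).extend(rules)
    return result


headers = [("trace-id", b"abc"),
           (VIOLATIONS_HEADER, b'{"customer-policy": ["email-rule", "age-rule"]}')]
print(violations(headers))  # {'customer-policy': ['email-rule', 'age-rule']}
print(violations(None))     # {}
```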

### Dead letter queue

When `dlq` is `true` and `dlqTopic` is set, the Interceptor sends violating records to the dead letter topic with the following headers:

| Header             | Description                                                                               |
| :----------------- | :---------------------------------------------------------------------------------------- |
| `X-ERROR-MSG`      | Description of the violation including the Policy name, violated Rules and error details. |
| `X-TOPIC`          | The topic the record was originally destined for.                                         |
| `X-PARTITION`      | The partition the record was intended for.                                                |
| `X-POLICY`         | The Policy name that was violated.                                                        |
| `X-VIOLATED-RULES` | Comma-separated list of violated Rule names.                                              |

The Interceptor sends violating records to the dead letter topic whether the action is `block` or `mark`. Although `block` rejects the entire record batch that contains a violating record, only the violating records are sent to the dead letter topic, not the valid records from the same batch.
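When triaging the dead letter topic, it can help to turn these headers into a structured summary. A minimal sketch, assuming headers arrive as `(name, bytes)` pairs and that their values are UTF-8 strings; verify the encoding, particularly of `X-PARTITION`, against records in your own dead letter topic. The sample values are made up.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass
class DlqEntry:
    topic: str
    partition: str  # kept as text because its encoding is an assumption
    policy: str
    violated_rules: List[str]
    error: str


def parse_dlq_headers(headers: Sequence[Tuple[str, bytes]]) -> DlqEntry:
    # errors="replace" avoids crashing if a value is not valid UTF-8.
    h = {name: raw.decode("utf-8", errors="replace") for name, raw in headers}
    return DlqEntry(
        topic=h.get("X-TOPIC", ""),
        partition=h.get("X-PARTITION", ""),
        policy=h.get("X-POLICY", ""),
        # X-VIOLATED-RULES is a comma-separated list of Rule names.
        violated_rules=[r.strip() for r in h.get("X-VIOLATED-RULES", "").split(",") if r.strip()],
        error=h.get("X-ERROR-MSG", ""),
    )


entry = parse_dlq_headers([
    ("X-TOPIC", b"customer-transaction"), ("X-PARTITION", b"0"),
    ("X-POLICY", b"customer-policy"), ("X-VIOLATED-RULES", b"email-rule,age-rule"),
    ("X-ERROR-MSG", b"example error message"),
])
print(entry.violated_rules)  # ['email-rule', 'age-rule']
```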

### Handle blocked batches

When `block` is enabled, the producer receives an `INVALID_RECORD` error (Kafka error code 87). This error is **non-retriable** and affects all records in the same record batch as the violating record. Because the Kafka producer client does not retry the request automatically, use `block` only when violating data must never be written to the topic.

The error message returned to the producer identifies the violated Rules, for example: `Data quality Rule 'email-rule' violated. Data quality Rule 'age-rule' violated`. If custom `message` fields are set on the Rules, those messages are returned instead.
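When the default messages are used, a producer can extract the violated Rule names for logging or metrics. A minimal sketch, assuming the default `Data quality Rule '<rule-name>' violated` format shown above; it finds nothing when custom `message` fields are configured.

```python
import re
from typing import List

DEFAULT_MESSAGE = re.compile(r"Data quality Rule '([^']+)' violated")


def violated_rules(error_message: str) -> List[str]:
    """Rule names found in a default-format blocking error message."""
    return DEFAULT_MESSAGE.findall(error_message)


msg = "Data quality Rule 'email-rule' violated. Data quality Rule 'age-rule' violated"
print(violated_rules(msg))  # ['email-rule', 'age-rule']
```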

#### Handle producer errors

When testing in a development environment, or when you favor a "fail fast" approach to data quality validation, treat an `InvalidRecordException` reported to the record send callback as fatal and stop the application.

If the Kafka producer doesn't have an error callback or the callback is written to simply log the error and continue, then **all records in the same record batch as the violating record will be lost**.

If you decide a violating record must be blocked, but do not want to lose other passing records in the record batch, you must handle this error explicitly. One valid approach is:

1. Create a separate Kafka producer with `batch.size = 1` and `linger.ms = 0` so that it sends records one at a time.
2. In the send callback, catch `InvalidRecordException` and pass each record from the failed batch to the separate producer on a different thread to be retried. Call `.flush()` after each send to ensure each record is sent by itself.
3. If this second send fails with the same exception, the record itself violates a Rule: log the error message and continue.
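The fallback in steps 2 and 3 can be sketched independently of any particular client. In this minimal sketch, `send_single` is a hypothetical callable you would implement with the dedicated one-record producer from step 1 (send the record, flush, and raise on failure), `InvalidRecordError` stands in for your client's `InvalidRecordException`, and `batch` is whatever list of records your application collected from the failed sends. Retries run on a separate worker thread so the producer callback is not blocked.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple, TypeVar

R = TypeVar("R")


class InvalidRecordError(Exception):
    """Stand-in for the client's InvalidRecordException (hypothetical)."""


# A single worker serializes retries; ordering relative to the main producer
# is still not preserved.
retry_pool = ThreadPoolExecutor(max_workers=1)


def resend_one_by_one(batch: Sequence[R], send_single: Callable[[R], None]
                      ) -> Tuple[List[R], List[Tuple[R, str]]]:
    """Resend each record alone; return (delivered, rejected_with_reason)."""
    delivered: List[R] = []
    rejected: List[Tuple[R, str]] = []
    for record in batch:
        try:
            send_single(record)  # one record per request, then flush
            delivered.append(record)
        except InvalidRecordError as err:
            # Failing on its own means this record is the one that violates a Rule.
            rejected.append((record, str(err)))
    return delivered, rejected


def on_batch_rejected(batch: Sequence[R], send_single: Callable[[R], None]) -> Future:
    """Call from the producer's error handling; retries happen off the callback thread."""
    return retry_pool.submit(resend_one_by_one, batch, send_single)


def fake_send(record: str) -> None:
    if record.startswith("bad"):
        raise InvalidRecordError("Data quality Rule 'email-rule' violated")


delivered, rejected = on_batch_rejected(["a", "bad-b", "c"], fake_send).result()
print(delivered, rejected)
```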

The tradeoff to this approach is that it will **not preserve record ordering**. The record ordering is affected in two ways:

* The records in the failed batch will be retried out of order because the callback is executed asynchronously
* The main producer will continue to produce even while records in the failed batch are being retried

### Violation reporting

When `report` is `true`, the Interceptor writes Avro-encoded records to an internal Kafka topic named:

```
_conduktor_{gatewayClusterId}_data_quality_violation
```

All records on this topic are wrapped in a `DataQualityEvent` Avro envelope. Two event types are written:

#### Violation events

The Interceptor writes a `DataQualityViolation` event each time a record fails validation.

| Field                 | Type                 | Description                                                                                                                                        |
| :-------------------- | :------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic`               | String               | The topic where the violation occurred.                                                                                                            |
| `partition`           | Integer              | The partition number.                                                                                                                              |
| `createdAt`           | Instant              | Timestamp when the violation was detected.                                                                                                         |
| `policy`              | String               | The Policy name.                                                                                                                                   |
| `violatedRules`       | List\<String>        | Rule names that were violated.                                                                                                                     |
| `gatewayClusterId`    | String               | ID of the Gateway cluster.                                                                                                                         |
| `vCluster`            | String               | Virtual Cluster name.                                                                                                                              |
| `serviceAccount`      | String               | The service account that produced the record.                                                                                                      |
| `clientId`            | String               | Kafka client ID of the producer.                                                                                                                   |
| `errors`              | Map\<String, String> | Error details per Rule (Rule name to error message).                                                                                               |
| `consoleDeploymentId` | String               | The deployment identifier.                                                                                                                         |
| `offset`              | Long or Object       | The record offset. When the record was blocked, this contains batch metadata (batch size and offset within the batch) instead of the Kafka offset. |
| `actions`             | Object               | The actions that were applied (block, report, mark, dlq).                                                                                          |

The `maxNumberOfViolationReportPerSecond` configuration controls the rate at which the Interceptor writes violation events to this topic to prevent flooding under high violation volumes.

#### Evaluation count events

The Interceptor writes a `DataQualityChecksCount` event periodically (not per-record) with aggregated counts of all records evaluated, including records that passed validation. This provides the denominator for calculating violation rates.

Each event contains a list of counts grouped by cluster, topic, Policy and Rule:

| Count type         | Fields                                        | Description                              |
| :----------------- | :-------------------------------------------- | :--------------------------------------- |
| `PolicyCheckCount` | `cluster`, `topic`, `policy`, `value`         | Total records evaluated for this Policy. |
| `RuleCheckCount`   | `cluster`, `topic`, `policy`, `rule`, `value` | Total records evaluated for this Rule.   |

<Note>
  This topic is also used by Conduktor Console to display violation metrics and history.
</Note>
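Violation events and `DataQualityChecksCount` events can be combined to compute a per-Rule violation rate. The sketch below is hypothetical: it assumes you have already consumed both kinds of event and reduced them to plain Python values, with `RuleCheckCount` entries as dicts using the field names in the table above and each violation as a `(cluster, topic, policy, rule)` tuple (one per entry in a violation event's `errors` map). It also assumes each count event's `value` covers only its own reporting period; if your counts are cumulative, take the latest value per key instead of summing. `violation_rates` is an illustrative name.

```python
from collections import Counter


def violation_rates(rule_check_counts: list, violations: list) -> dict:
    """Return {(cluster, topic, policy, rule): violations / records evaluated}."""
    evaluated = Counter()
    for count in rule_check_counts:
        key = (count["cluster"], count["topic"], count["policy"], count["rule"])
        evaluated[key] += count["value"]  # assumes per-period (not cumulative) counts

    violated = Counter(violations)  # one tuple per failed Rule per record
    return {key: violated[key] / total
            for key, total in evaluated.items() if total > 0}
```

Because `maxNumberOfViolationReportPerSecond` caps how many violation events are written, the numerator can undercount under heavy violation volumes, so treat these rates as lower bounds.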

## Validate data using SQL-like checks

Conduktor Gateway offers a `DataQualityProducerPlugin` Interceptor that uses a SQL-like language to assert data quality on records as they are produced.

Records produced to the topic named in the FROM clause must match the WHERE clause to be considered valid. This is particularly useful if your data is plain JSON with no schema, but it can also be applied to AVRO and Protobuf data.

### Example

Suppose you have an `orders` topic with records in this form:

```json theme={null}
{
  "id" : "B2EE6886-7FFF-4CAB-9B2A-CF0A06C9E648",
  "amount_cents": 12499,
  "currency": "EUR",
  "order_date": "2024-12-20T15:45:33Z"
}
```

You may want to ensure that:

* `id` is a valid UUID format
* `amount_cents` is a positive integer and not too large
* `currency` is one of your accepted currencies
* `order_date` is in ISO 8601 format

This can be asserted with:

```sql theme={null}
SELECT
    *       -- ignored in this policy
FROM
    orders  -- topic to enforce the rule on
WHERE
        id REGEXP '^[0-9A-F-]{36}$'         -- 36-char UUID in hex with dash separators
    AND amount_cents REGEXP '^[0-9]+$'      -- amount must be an integer
    AND amount_cents > 0                    -- ... greater than zero
    AND amount_cents < 1000000              -- ... and less than 1,000,000 cents
    AND currency REGEXP '^(EUR|GBP|USD)$'   -- currency must be exactly one of these three
    AND order_date REGEXP '^20[2-9][0-9]-[0-1][0-9]-[0-3][0-9]T[0-2][0-9]:[0-5][0-9]:[0-5][0-9]Z$'
                                            -- ISO 8601 UTC timestamp, year 2020 or later
```

In the statement, the list of selected fields is ignored. The important parts are the FROM clause, which names the topic the policy applies to, and the WHERE clause, which specifies the condition data must meet to be considered valid. In other words, if the SELECT returns the record, the record is valid; if it returns no results, the record is considered invalid.

`SELECT [ignored!] FROM [topic name] WHERE [field filter criteria]`

Only one topic can be specified in the FROM clause (joins are ignored), and the topic name is matched exactly (no regexp support). If a record does not match the WHERE clause, it is rejected; what happens to it then depends on the configured action, described below. By default, fields are read from the value of the record. The Interceptor currently supports values in JSON, AVRO and Protobuf formats.
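To sanity-check the example conditions before deploying the policy, you can transcribe them into a quick local test. This is a hypothetical sketch, not Gateway's evaluator: Python's `re.search` stands in for SQL `REGEXP` (Gateway's regex engine and case-sensitivity may differ), every pattern is anchored with `^...$` so it must match the whole value, and `ORDER_CONDITIONS` and `is_valid` are illustrative names.

```python
import re

# Hypothetical hand transcription of the example WHERE clause.
# Each entry is (field, check); all entries are AND-ed together.
ORDER_CONDITIONS = [
    ("id", lambda v: re.search(r"^[0-9A-F-]{36}$", str(v)) is not None),
    # The integer check runs before the range check, so int() below is safe.
    ("amount_cents", lambda v: re.search(r"^[0-9]+$", str(v)) is not None),
    ("amount_cents", lambda v: 0 < int(str(v)) < 1_000_000),
    ("currency", lambda v: re.search(r"^(EUR|GBP|USD)$", str(v)) is not None),
    ("order_date", lambda v: re.search(
        r"^20[2-9][0-9]-[0-1][0-9]-[0-3][0-9]T[0-2][0-9]:[0-5][0-9]:[0-5][0-9]Z$",
        str(v)) is not None),
]


def is_valid(record: dict) -> bool:
    """Valid only if every referenced field exists and every condition holds."""
    # all() stops at the first failing condition, preserving the order above.
    return all(field in record and check(record[field])
               for field, check in ORDER_CONDITIONS)
```

A day/month swap such as `2024-20-12T15:45:33Z` fails the `order_date` pattern, because the month must match `[0-1][0-9]`.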

<Note>
  Topic names containing dash `-` characters must be double-quoted, as the dash is not a valid character in an unquoted SQL identifier. For example, for a topic `our-orders` you would need to use:

  `SELECT * FROM "our-orders" WHERE ...`

  Nested fields can be accessed as expected with dot notation in the WHERE clause, e.g.:

  `address.street = 'Electric Avenue'`
</Note>

#### WHERE clause

If you specify a field name in the WHERE clause that doesn't exist in the record, the condition always fails and the record is always considered invalid: every field referenced in the WHERE clause must exist in a record for it to be valid.

The WHERE clause supports a subset of SQL operations:

* The operators `=, >, >=, <, <=, <>` and `REGEXP` (the MySQL `REGEXP` operator)
* When combining more than one condition in the WHERE clause, only `AND` is supported
* The `IN` clause is not supported, but can be approximated with a RegExp
* By default, the fields in the WHERE clause are looked up from the value in the record. You can also filter by other parts of the record using the syntax below:
  * Record key (it also supports encoded keys which require a schema registry lookup):
    * Record key as string: `.. WHERE record.key = 'some thing'`
    * Record key as schema: `.. WHERE record.key.someValue.someChildValue = 'some thing'`
  * Partition: `.. WHERE record.partition = 1`
  * Timestamp: `.. WHERE record.timestamp = 98717823712`
  * Header: `.. WHERE record.header.someHeaderKey = 'some thing'`
  * Offset: `.. WHERE record.offset = 1`
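The WHERE clause rules above can be modeled in a few lines to reason about what a condition will accept. This is a hypothetical sketch, not Gateway's parser: the record is assumed to be a dict with `key`, `value`, `partition`, `timestamp`, `header` and `offset` entries (keys already decoded), conditions are pre-parsed `(field, operator, operand)` tuples, Python's `re.search` stands in for `REGEXP`, and `lookup`, `where_matches` and `in_as_regexp` are illustrative names.

```python
import re
from typing import Any

_MISSING = object()  # sentinel for "field not present in the record"

# Supported comparison operators; REGEXP matches anywhere unless anchored.
OPS = {
    "=": lambda a, b: a == b,
    "<>": lambda a, b: a != b,
    ">": lambda a, b: a > b,
    ">=": lambda a, b: a >= b,
    "<": lambda a, b: a < b,
    "<=": lambda a, b: a <= b,
    "REGEXP": lambda a, b: re.search(b, str(a)) is not None,
}


def lookup(record: dict, path: str) -> Any:
    """Resolve a field: plain names from the value, `record.*` from metadata."""
    if path.startswith("record."):
        node, parts = record, path.split(".")[1:]
    else:
        node, parts = record["value"], path.split(".")
    for part in parts:  # dot notation walks into nested objects
        if not isinstance(node, dict) or part not in node:
            return _MISSING
        node = node[part]
    return node


def where_matches(record: dict, conditions: list) -> bool:
    """AND all conditions; a missing field always fails its condition."""
    for path, op, operand in conditions:
        value = lookup(record, path)
        if value is _MISSING or not OPS[op](value, operand):
            return False
    return True


def in_as_regexp(options: list) -> str:
    """Approximate `field IN (...)` with an anchored REGEXP alternation."""
    return "^(" + "|".join(re.escape(o) for o in options) + ")$"
```

Anchoring the alternation matters: an unanchored `EUR|GBP|USD` would also accept a value such as `EURO`.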

#### Actions for invalid data

The policy acts on produce requests from Kafka clients which means it will often deal with a batch of multiple records spread over multiple topics and partitions. The policy can apply different effects to each request batch based on its configuration.

| Action              | Description                                                                                                                                                     |
| :------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BLOCK\_WHOLE\_BATCH | If any records in the batch are invalid, then block the whole batch. The produce request will fail for the client in this case.                                 |
| AUDIT\_LOG\_ONLY    | For any records in the produce request that are invalid, record this in the audit log only. All records are still saved in Kafka.                               |
| THROTTLE            | If any records in the produce request are invalid, throttle the producer for a certain amount of time (`throttleTimeMs`). All records are still saved in Kafka. |
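The table can be summarized as a small decision function. This is a hypothetical model of the documented behavior for one produce request, not Gateway code; `Outcome` and `apply_action` are illustrative names, and the optional dead letter topic is left out.

```python
from dataclasses import dataclass


@dataclass
class Outcome:
    written_to_kafka: bool  # are the request's records stored in the target topic?
    client_error: bool      # does the produce request fail for the client?
    throttle_ms: int        # throttle time applied to the producer
    audited: bool           # is a violation recorded in the audit log?


def apply_action(valid_flags: list, action: str, throttle_time_ms: int = 100) -> Outcome:
    """Model the effect of each action, given one validity flag per record."""
    if all(valid_flags):
        return Outcome(True, False, 0, False)
    # At least one invalid record: every action logs the violation.
    if action == "BLOCK_WHOLE_BATCH":
        return Outcome(False, True, 0, True)
    if action == "AUDIT_LOG_ONLY":
        return Outcome(True, False, 0, True)
    if action == "THROTTLE":
        return Outcome(True, False, throttle_time_ms, True)
    raise ValueError(f"unknown action: {action}")
```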

#### Dead letter topic

If a dead letter topic service is configured for Gateway, you can optionally supply a topic name for this policy to use for any records which are considered invalid. This topic will be created with the default config for your Kafka setup.

Any record that the policy considers invalid is written to the dead letter topic, with some headers added for audit purposes. Note that this also happens in `AUDIT_LOG_ONLY` mode, even though records in this mode are still written to the "real" topic.

| Header      | Message                                                              |
| :---------- | :------------------------------------------------------------------- |
| X-ERROR-MSG | Message does not match the statement \[ ...]                         |
| X-TOPIC     | The topic that the message was intended to be written to             |
| X-PARTITION | The partition of the topic the message was intended to be written to |

The generation of these headers can be disabled with the `addErrorHeader` configuration parameter (defaults to `true`).

If no `deadLetterTopic` is configured for the policy, no messages will be written out in this manner.
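When you consume the dead letter topic, you can read these headers back to see why each record was rejected. A minimal sketch, assuming headers arrive as a list of `(name, bytes)` pairs (the shape returned by `Message.headers()` in confluent-kafka-python, which returns `None` for records without headers) and that header values are UTF-8 text. The encoding of `X-PARTITION` is not documented here, so it is returned as a string; `describe_dead_letter` is an illustrative name.

```python
from typing import Optional


def describe_dead_letter(headers: Optional[list]) -> Optional[dict]:
    """Summarize the audit headers on a dead-letter record.

    Returns None when the error headers are absent, for example when the
    policy is configured with addErrorHeader = false.
    """
    # Assumption: header values are UTF-8 encoded text.
    decoded = {name: value.decode("utf-8")
               for name, value in headers or [] if value is not None}
    if "X-ERROR-MSG" not in decoded:
        return None
    return {
        "error": decoded["X-ERROR-MSG"],
        "topic": decoded.get("X-TOPIC"),
        "partition": decoded.get("X-PARTITION"),
    }
```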

#### Audit log

Any policy violation is logged in the configured Gateway audit log. This is currently logged at the *batch* level for each topic in the produce request. There is no per-record audit: the entry records that a policy breach occurred for the produce request and identifies the tenant, username and client IP for the request.

#### Configuration

| Key                  | Type                                | Description                                                                |
| :------------------- | :---------------------------------- | :------------------------------------------------------------------------- |
| statement            | String                              | SQL statement                                                              |
| schemaRegistryConfig | [Schema registry](#schema-registry) | Schema registry config                                                     |
| action               | [Action](#action)                   | Data quality producer action                                               |
| deadLetterTopic      | String                              | Dead letter topic                                                          |
| addErrorHeader       | boolean (default `true`)            | Adds the error information headers to records in the dead letter topic     |
| throttleTimeMs       | int (default: 100)                  | Value to throttle with (only applicable when action is set to `THROTTLE`). |

##### Schema registry

| Key                   | Type   | Default     | Description                                                                                                                                                             |
| --------------------- | ------ | ----------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `type`                | string | `CONFLUENT` | The type of schema registry to use: choose `CONFLUENT` (for Confluent-like schema registries including OSS Kafka) or `AWS` for AWS Glue schema registries.              |
| `additionalConfigs`   | map    |             | Additional properties to map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets.  |
| **Confluent Like**    |        |             | **Configuration for Confluent-like schema registries**                                                                                                                  |
| `host`                | string |             | URL of your schema registry.                                                                                                                                            |
| `cacheSize`           | string | `50`        | Number of schemas that can be cached locally by this Interceptor so that it doesn't have to query the schema registry every time.                                       |
| **AWS Glue**          |        |             | **Configuration for AWS Glue schema registries**                                                                                                                        |
| `region`              | string |             | The AWS region for the schema registry, e.g. *us-east-1*                                                                                                                |
| `registryName`        | string |             | The name of the schema registry in AWS (leave blank for the AWS default of *default-registry*)                                                                          |
| `basicCredentials`    | string |             | Access credentials for AWS (see below section for structure)                                                                                                            |
| **AWS credentials**   |        |             | **AWS credentials configuration**                                                                                                                                       |
| `accessKey`           | string |             | The access key for the connection to the schema registry.                                                                                                               |
| `secretKey`           | string |             | The secret key for the connection to the schema registry.                                                                                                               |
| `validateCredentials` | bool   | `true`      | `true` / `false` flag to determine whether the credentials provided should be validated when set.                                                                       |
| `accountId`           | string |             | The Id for the AWS account to use.                                                                                                                                      |

If you don't supply a `basicCredentials` section for the AWS Glue schema registry, the client we use to connect will instead attempt to find the connection information it needs from the environment and the credentials required can be passed this way to Gateway as part of its core configuration. [Find out more from AWS documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default) <Icon icon="up-right-from-square" />.

[Read our blog about schema registry](https://www.conduktor.io/blog/what-is-the-schema-registry-and-why-do-you-need-to-use-it/) <Icon icon="up-right-from-square" />.

#### Action

| Action              | Description                                                                                    |
| :------------------ | :--------------------------------------------------------------------------------------------- |
| BLOCK\_WHOLE\_BATCH | If one message is invalid, block the whole batch                                               |
| AUDIT\_LOG\_ONLY    | If messages are invalid, audit log only (all messages are still saved in Kafka)                |
| THROTTLE            | If messages are invalid, throttle the producer for a certain amount of time (`throttleTimeMs`) |

#### Example

```json theme={null}
{
  "name": "myDataQualityProducerPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.DataQualityProducerPlugin",
  "priority": 100,
  "config": {
    "statement": "SELECT x FROM orders WHERE amount_cents > 0 AND amount_cents < 1000000",
    "schemaRegistryConfig": {
       "host": "http://schema-registry:8081"
    },
    "action": "BLOCK_WHOLE_BATCH",
    "deadLetterTopic": "dead-letter-topic",
    "addErrorHeader": false
  }
}
```
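You can also create this Interceptor without curl. The sketch below uses Python's standard library to build the same kind of `PUT` request to the `/gateway/v2/interceptor` endpoint used in the curl example at the top of this page. The base URL and the `admin`/`conduktor` credentials are copied from that example and are assumptions about your deployment; `build_interceptor_request` is an illustrative name.

```python
import base64
import json
import urllib.request


def build_interceptor_request(interceptor: dict,
                              base_url: str = "http://localhost:8888",
                              user: str = "admin",
                              password: str = "conduktor") -> urllib.request.Request:
    """Build a PUT request that creates or updates a Gateway Interceptor."""
    # HTTP Basic auth: base64 of "user:password".
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/gateway/v2/interceptor",
        data=json.dumps(interceptor).encode("utf-8"),
        method="PUT",
        headers={"Authorization": f"Basic {token}",
                 "Content-Type": "application/json"},
    )
```

Send it with `urllib.request.urlopen(build_interceptor_request(config))`, where `config` is the JSON object above loaded as a dict.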

## Validate schema payload

To enable your Kafka consumers to confidently and independently access data, ensure that all records sent through your Kafka system conform to an agreed structure. Records with missing or invalid schemas can cause application outages, as consumers may be unable to process the unexpected record format.

Moreover, schemas can broadly assert only the structural correctness of data and a level of compatibility between versions of that structure. For the *values* in a record, often all a schema can assert is a basic data type (integer, string, double, etc.). A valid schema therefore solves *some* data quality concerns, but others must still be handled in a bespoke way, distributed across all the clients of a given data type.

Finally, while correct structure can be enforced in a Kafka ecosystem at the client level, each client needs to know and follow the expectations for the data. Nothing prevents one client from correctly writing AVRO to a topic while another writes plain JSON to the same topic: if one client doesn't know the rules, it can't follow them.

### Enforce centralized policies

The schema validation Interceptor can be configured once in your Kafka system, on the topics where data enters, to ensure that:

* All records produced to Kafka have a schema set
* The record contents adhere to that schema
* The fields (values) in any given record comply with business validation rules you have set in the schema

This policy provides centralized enforcement of these rules at the point of write to Kafka. Clients producing through Gateway cannot bypass or ignore this enforcement, so it provides a strong guarantee that data written into Kafka matches your rule set.

[Read our blog about schema registry](https://www.conduktor.io/blog/what-is-the-schema-registry-and-why-do-you-need-to-use-it/) <Icon icon="up-right-from-square" />.

#### How the policy works

The policy operates on produce requests made to Kafka and inspects the entire batch of records in each request. Depending on its configuration, it performs various checks and then takes an action if it finds any problems.

The policy does *nothing* if no audit log is configured for Gateway, so that data is never rejected silently. For the policy to work at all, you **must have the audit log configured**.

The policy checks only the value of a Kafka record; it does not currently support checking the key or headers.

The core config values for the policy itself are:

* `topic`: a regular expression matching the topics to apply the rule to
* `schemaIdRequired`: whether records must/must not have a schema assigned to them
* `validateSchema`: whether the policy should check that the record data matches the schema found for the record
* `action`: what to do if a problem is found

There are three levels of check you can apply:

| Setup                                               | Effect                                                                                                                                                                                                                                        |
| :-------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `schemaIdRequired = false`                          | Ensures that records do not have a schema ID.                                                                                                                                                                                                 |
| `schemaIdRequired = true`, `validateSchema = false` | Ensures that records have a valid schema set, and that schema exists in the schema registry. Does not check whether the value actually matches the schema though.                                                                             |
| `schemaIdRequired = true`, `validateSchema = true`  | Ensures that records have a valid schema set, that schema exists in the schema registry and that the value in the record matches the schema. This includes any data validation rules in the schema (see below) as well as a structural check. |
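When a record is rejected for a missing schema ID, it can help to inspect the raw value your producer sends. For Confluent-like registries, serializers prefix the payload with a zero "magic byte" followed by the schema ID as a 4-byte big-endian integer. The sketch below reads that prefix on the client side. It is a debugging aid under that assumption, not Gateway's detection logic: it does not check that the ID exists in the registry and does not cover the AWS Glue format. `confluent_schema_id` is an illustrative name.

```python
import struct
from typing import Optional


def confluent_schema_id(value: Optional[bytes]) -> Optional[int]:
    """Return the schema ID from a Confluent wire-format payload, or None.

    Layout assumed: byte 0 is the magic byte 0, bytes 1-4 are the schema ID
    as a big-endian 32-bit integer, followed by the serialized data.
    """
    if value is None or len(value) < 5 or value[0] != 0:
        return None
    return struct.unpack(">i", value[1:5])[0]
```

A plain JSON payload starts with `{` (byte `0x7b`), so it returns `None`, which corresponds to a record without a schema ID.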

#### Action

If any problems are found, the policy will take an action as configured. The `action` can be set to one of:

* `BLOCK` → If any records in the batch fail the policy checks, record the problems in the audit log and then return an error to the client failing the entire batch. No records are written to Kafka at all if at least one of the records in the batch is considered invalid.
* `INFO` → In this mode, the data is always written to Kafka whether or not it passes the checks, but any problems found are recorded in the audit log.
* `THROTTLE` → If any records in the batch fail the policy checks, the data is still written to Kafka but the request will be throttled with time = `throttleTimeMs`, forcing the client to back off. Any problems found are recorded in the audit log.

#### Dead letter topic

If a dead letter topic service is configured for Gateway, then you can optionally supply a topic name for this policy to use for any records which are considered invalid. This topic will be created with the default config for your Kafka setup.

Any record that the policy considers invalid is written to the dead letter topic, with some headers added for audit purposes. Note that this also happens in `INFO` mode, even though records in this mode are still written to the "real" topic.

| Header      | Message                                                               |
| :---------- | :-------------------------------------------------------------------- |
| X-ERROR-MSG | Description of the reason for the policy failure                      |
| X-TOPIC     | The topic the message was intended to be written to                   |
| X-PARTITION | The partition of that topic the message was intended to be written to |

The generation of these headers can be disabled if required, through the `addErrorHeader` configuration parameter (defaults to `true`).

If no `deadLetterTopic` is configured for the policy, then no messages will be written out in this manner.

#### Configuration

The full set of configuration options for the policy is:

| Name                 | Type                        | Default | Description                                                                                       |
| :------------------- | :-------------------------- | :------ | :------------------------------------------------------------------------------------------------ |
| topic                | String                      | `.*`    | Topics that match this regex will have the Interceptor applied                                    |
| schemaIdRequired     | Boolean                     | `false` | Records must/must not have schemaId                                                               |
| validateSchema       | Boolean                     | `false` | If true, deserialize the record, validate the record structure and fields within the data itself. |
| action               | `BLOCK`, `INFO`, `THROTTLE` | `BLOCK` | Action to take if a record fails the policy checks.                                               |
| schemaRegistryConfig | Schema registry             | N/A     | Schema registry configuration                                                                     |
| celCacheSize         | int                         | 100     | Size of the in-memory cache for CEL expressions; tune it to balance speed against resource use.   |
| deadLetterTopic      | String                      |         | Dead letter topic. Not used if this parameter is not set.                                         |
| addErrorHeader       | Boolean                     | `true`  | Whether to add the error information headers to records in the dead letter topic                 |
| throttleTimeMs       | int                         | 100     | Value to throttle with (only applicable when action is set to `THROTTLE`).                        |

#### Schema registry

| Key                   | Type   | Default     | Description                                                                                                                                                           |
| --------------------- | ------ | ----------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `type`                | string | `CONFLUENT` | The type of schema registry to use: choose `CONFLUENT` (for Confluent-like schema registries including OSS Kafka) or `AWS` for AWS Glue schema registries.            |
| `additionalConfigs`   | map    |             | Additional properties to map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets. |
| **Confluent Like**    |        |             | **Configuration for Confluent-like schema registries**                                                                                                                |
| `host`                | string |             | URL of your schema registry.                                                                                                                                          |
| `cacheSize`           | string | `50`        | Number of schemas that can be cached locally by this Interceptor so that it doesn't have to query the schema registry every time.                                     |
| **AWS Glue**          |        |             | **Configuration for AWS Glue schema registries**                                                                                                                      |
| `region`              | string |             | The AWS region for the schema registry, e.g. `us-east-1`                                                                                                              |
| `registryName`        | string |             | The name of the schema registry in AWS (leave blank for the AWS default of `default-registry`)                                                                        |
| `basicCredentials`    | string |             | Access credentials for AWS (see below section for structure)                                                                                                          |
| **AWS credentials**   |        |             | **AWS credentials configuration**                                                                                                                                     |
| `accessKey`           | string |             | The access key for the connection to the schema registry.                                                                                                             |
| `secretKey`           | string |             | The secret key for the connection to the schema registry.                                                                                                             |
| `validateCredentials` | bool   | `true`      | `true` / `false` flag to determine whether the credentials provided should be validated when set.                                                                     |
| `accountId`           | string |             | The Id for the AWS account to use.                                                                                                                                    |

If you do not supply a `basicCredentials` section for the AWS Glue schema registry, the client we use to connect will instead attempt to find the connection information it needs from the environment, so the required credentials can be passed to Gateway this way as part of its core configuration. More information on the setup for this is found in the [AWS documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default) <Icon icon="up-right-from-square" />.


#### Example

```json theme={null}
{
  "name": "mySchemaIdValidationInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic_1.*",
    "schemaIdRequired": true,
    "validateSchema": true,
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081"
    },
    "action": "BLOCK",
    "celCacheSize": 100
  }
}
```

##### Schema registry with secured template

<Tabs>
  <Tab title="curl">
    ```bash theme={null}
    curl \
      --request PUT \
      --url 'http://localhost:8888/gateway/v2/interceptor' \
      --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
      --header 'Content-Type: application/json' \
      --data-raw '{
      "name": "mySchemaIdValidationInterceptor",
      "pluginClass": "io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin",
      "priority": 100,
      "config": {
        "topic": "topic_1.*",
        "schemaIdRequired": true,
        "validateSchema": true,
        "schemaRegistryConfig": {
          "host": "http://schema-registry:8081",
          "additionalConfigs": {
            "schema.registry.url": "${SR_URL}",
            "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
            "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
          }
        },
        "action": "BLOCK",
        "celCacheSize": 100
      }
    }'
    ```
  </Tab>

  <Tab title="Conduktor CLI">
    ```yaml theme={null}
    apiVersion: gateway/v2
    kind: Interceptor
    metadata:
      name: mySchemaIdValidationInterceptor
      scope:
        vCluster: passthrough
    spec:
      pluginClass: io.conduktor.gateway.interceptor.safeguard.SchemaPayloadValidationPolicyPlugin
      priority: 100
      config:
        topic: "topic_1.*"
        schemaIdRequired: true
        validateSchema: true
        schemaRegistryConfig:
          host: http://schema-registry:8081
          additionalConfigs:
            schema.registry.url: $${SR_URL}
            basic.auth.credentials.source: $${SR_BASIC_AUTH_CRED_SRC}
            basic.auth.user.info: $${SR_BASIC_AUTH_USER_INFO}
        action: BLOCK
        celCacheSize: 100
    ```

    Apply with:

    ```bash theme={null}
    conduktor apply -f schema-validation-secured-template.yaml
    ```
  </Tab>
</Tabs>

#### Schema payload validations

When configured to do so, the schema validation Interceptor supports validating the value in a Kafka record against a set of custom constraints for Avro schema records, similar to the validations provided by JSON Schema.

For fields in an Avro schema, you can specify constraints on what is considered a correct value. These rules operate on the individual field's value only:

* **INT**, **LONG**, **FLOAT**, **DOUBLE**: `minimum`, `maximum`, `exclusiveMinimum`, `exclusiveMaximum`, `multipleOf`
* **STRING**: `minLength`, `maxLength`, `pattern`, `format`
* **ARRAY**: `maxItems`, `minItems`

Currently supported String `format` values:

* `byte`, `date`, `time`, `date-time`, `duration`, `uri`, `uri-reference`, `uri-template`, `email`, `hostname`, `ipv4`, `ipv6`, `regex`, `uuid`, `json-pointer`, `json-pointer-uri-fragment`, `relative-json-pointer`
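To reason about which values pass these field-level constraints, the keywords can be modeled in Python. This is a hypothetical sketch that assumes the keywords follow JSON Schema draft-07 semantics (for example, `exclusiveMinimum` is a number and `pattern` is an unanchored match); Gateway's Avro implementation may differ, `format` is not covered, and `check_field` is an illustrative name.

```python
import re


def check_field(value, constraints: dict) -> list:
    """Return the names of the field-level constraints that `value` violates."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        checks = [
            ("minimum", lambda m: value >= m),
            ("maximum", lambda m: value <= m),
            ("exclusiveMinimum", lambda m: value > m),
            ("exclusiveMaximum", lambda m: value < m),
            ("multipleOf", lambda m: value % m == 0),  # exact for ints; floats may round
        ]
    elif isinstance(value, str):
        checks = [
            ("minLength", lambda m: len(value) >= m),
            ("maxLength", lambda m: len(value) <= m),
            ("pattern", lambda p: re.search(p, value) is not None),  # unanchored
        ]
    elif isinstance(value, list):
        checks = [
            ("minItems", lambda m: len(value) >= m),
            ("maxItems", lambda m: len(value) <= m),
        ]
    else:
        checks = []
    # Only evaluate keywords that are actually set on the field.
    return [name for name, ok in checks
            if name in constraints and not ok(constraints[name])]
```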

This Interceptor can also validate elements across the whole payload against custom constraints, called metadata rules, using expressions written in CEL ([Common Expression Language](https://github.com/google/cel-spec) <Icon icon="up-right-from-square" />). This lets you define more advanced rules that depend on *multiple* values in a record.

#### Metadata rule

| Key        | Type   | Description                                                                                                     |
| :--------- | :----- | :-------------------------------------------------------------------------------------------------------------- |
| name       | string | Rule name                                                                                                       |
| expression | string | CEL expression for validation, must return `BOOLEAN`                                                            |
| message    | string | Error message reported when the payload does not match `expression`. Within `expression`, the `message.` prefix refers to the produced record. |

#### JSON schema example

In JSON schema, constraints and rules are defined directly in the schema. Here's an example that includes various validations:

```json theme={null}
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "minLength": 3,
      "maxLength": 50,
      "expression": "size(name) >= 3"
    },
    "age": {
      "type": "integer",
      "minimum": 0,
      "maximum": 120,
      "expression": "age >= 0 && age <= 120"
    },
    "email": {
      "type": "string",
      "format": "email",
      "expression": "email.contains('foo')"
    },
    "address": {
      "type": "object",
      "properties": {
        "street": {
          "type": "string",
          "minLength": 5,
          "maxLength": 10,
          "expression": "size(street) >= 5 && size(street) <= 10"
        },
        "city": {
          "type": "string",
          "minLength": 2,
          "maxLength": 50
        }
      },
      "expression": "size(address.street) > 1 && address.street.contains('paris') || address.city == 'paris'"
    },
    "hobbies": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "minItems": 3,
      "expression": "size(hobbies) >= 3"
    }
  },
  "metadata": {
    "rules": [
      {
        "name": "check hobbies size and name",
        "expression": "size(message.hobbies) == 3 && size(message.name) > 3",
        "message": "hobbies must have 3 items"
      },
      {
        "name": "checkAge",
        "expression": "message.age >= 18",
        "message": "age must be greater than or equal to 18"
      },
      {
        "name": "check email",
        "expression": "message.email.endsWith('yahoo.com')",
        "message": "email should end with 'yahoo.com'"
      },
      {
        "name": "check street",
        "expression": "size(message.address.street) >= 3",
        "message": "address.street length must be greater than or equal to 3"
      }
    ]
  }
}
```
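As an illustration, the Python sketch below mirrors the four `metadata.rules` expressions from the schema above and runs them against a hypothetical sample record. The CEL expressions are translated by hand into Python. This is only a way to reason about which rules a record would fail, not how Gateway evaluates them. For strings, Python's `len()` and CEL's `size()` both count code points.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hand-translated stand-ins for the CEL expressions in metadata.rules above.
RULES: List[Tuple[str, Callable[[Dict[str, Any]], bool]]] = [
    # size(message.hobbies) == 3 && size(message.name) > 3
    ("check hobbies size and name",
     lambda message: len(message["hobbies"]) == 3 and len(message["name"]) > 3),
    # message.age >= 18
    ("checkAge", lambda message: message["age"] >= 18),
    # message.email.endsWith('yahoo.com')
    ("check email", lambda message: message["email"].endswith("yahoo.com")),
    # size(message.address.street) >= 3
    ("check street", lambda message: len(message["address"]["street"]) >= 3),
]


def violated_rules(message: Dict[str, Any]) -> List[str]:
    """Names of the metadata rules that the record fails, in declaration order."""
    return [name for name, check in RULES if not check(message)]


# Hypothetical record: meets the per-field constraints, but age and email break two rules.
record = {
    "name": "Alice",
    "age": 17,
    "email": "alice.foo@gmail.com",
    "address": {"street": "rue Lepic", "city": "paris"},
    "hobbies": ["chess", "climbing", "piano"],
}

print(violated_rules(record))  # ['checkAge', 'check email']
```

The sketch returns rule names rather than messages, so the output stays stable if you reword a rule's `message`.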

#### Avro schema example

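In an Avro schema, constraints and rules are also defined directly in the schema. They are added as extra attributes on each field (`minLength`, `maximum`, `expression` and so on), together with a top-level `metadata.rules` array. This example also includes a nested array of records (`friends`):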

```json theme={null}
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string", "minLength": 3, "maxLength": 50, "expression": "size(name) >= 3 && size(name) <= 50"},
    {"name": "age", "type": "int", "minimum": 0, "maximum": 120, "expression": "age >= 0 && age <= 120"},
    {"name": "email", "type": "string", "format": "email", "expression": "email.contains('foo')"},
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "AddressRecord",
        "fields": [
          {"name": "street", "type": "string", "minLength": 5, "maxLength": 10, "expression": "size(street) >= 5 && size(street) <= 10"},
          {"name": "city", "type": "string", "minLength": 2, "maxLength": 50}
        ]
      },
      "expression": "size(address.street) >= 5 && address.street.contains('paris') || address.city == 'paris'"
    },
    {"name": "hobbies", "type": {"type": "array", "items": "string"}, "minItems": 3, "expression": "size(hobbies) >= 3"},
    {
      "name": "friends",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Friend",
          "fields": [
            {"name": "name", "type": "string", "expression": "size(name) < 3"},
            {"name": "age", "type": "int", "minimum": 2, "maximum": 10}
          ]
        }
      }
    }
  ],
  "metadata": {
    "rules": [
      {
        "name": "check hobbies size and name",
        "expression": "size(message.hobbies) == 3 && size(message.name) > 3",
        "message": "hobbies must have exactly 3 items and name must be longer than 3 characters"
      },
      {
        "name": "checkAge",
        "expression": "message.age >= 18",
        "message": "age must be greater than or equal to 18"
      },
      {
        "name": "check email",
        "expression": "message.email.endsWith('yahoo.com')",
        "message": "email should end with 'yahoo.com'"
      },
      {
        "name": "check street",
        "expression": "size(message.address.street) >= 3",
        "message": "address.street length must be greater than or equal to 3"
      }
    ]
  }
}
```
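The `address` expression mixes `&&` and `||` without parentheses. The JSON schema example uses the same shape with `size(address.street) > 1`. In CEL, `&&` binds more tightly than `||`, so the expression reads as `(A && B) || C`. The Python sketch below shows the difference this makes, using hand-translated versions of the expression. The function names are hypothetical. Python's `and`/`or` have the same relative precedence as CEL's `&&`/`||`.

```python
def address_ok(address: dict) -> bool:
    """Mirror of: size(address.street) >= 5 && address.street.contains('paris') || address.city == 'paris'"""
    street, city = address["street"], address["city"]
    # Evaluates as (len >= 5 and contains 'paris') or city == 'paris'.
    return len(street) >= 5 and "paris" in street or city == "paris"


def address_ok_grouped(address: dict) -> bool:
    """The other grouping, A && (B || C), which would need explicit parentheses in CEL."""
    street, city = address["street"], address["city"]
    return len(street) >= 5 and ("paris" in street or city == "paris")


short_street_in_paris = {"street": "rue", "city": "paris"}
print(address_ok(short_street_in_paris))          # True: city == 'paris' alone is enough
print(address_ok_grouped(short_street_in_paris))  # False: the length check now always applies
```

If you intend the street length check to apply to every record, add explicit parentheses to the CEL expression.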

## Related resources

* [Find out more about Data Quality Policies](/guide/conduktor-concepts/data-quality-policies)
* [Enforce data quality](/guide/use-cases/enforce-data-quality)
* [Find out more about Interceptors](/guide/conduktor-concepts/interceptors)
* [Give us feedback/request a feature](https://conduktor.io/roadmap) <Icon icon="up-right-from-square" />
