Here’s a list of all the available Interceptors. Find out how to deploy an Interceptor.

Audit Interceptor

This Interceptor logs information from API key requests. To use it, inject it and implement the ApiKeyAuditLog interface for auditing (a hypothetical sketch follows the list below). The currently supported Kafka API requests are:
  • ProduceRequest (PRODUCE)
  • FetchRequest (FETCH)
  • CreateTopicRequest (CREATE_TOPICS)
  • DeleteTopicRequest (DELETE_TOPICS)
  • AlterConfigRequest (ALTER_CONFIGS)
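
As a minimal sketch of what an audit hook could look like: the real ApiKeyAuditLog interface shipped with Gateway is not reproduced here, so the interface shape and method signature below are hypothetical placeholders to verify against the Gateway javadoc.

// Hypothetical shape of the audit callback; the actual
// io.conduktor.gateway ApiKeyAuditLog interface may differ.
interface ApiKeyAuditLog {
    void log(String apiKey, String username, String vcluster);
}

// Example implementation that forwards audit events to stdout; a real
// implementation would write to a durable sink (topic, database, SIEM, ...).
public class StdoutApiKeyAuditLog implements ApiKeyAuditLog {
    @Override
    public void log(String apiKey, String username, String vcluster) {
        System.out.printf("audit apiKey=%s user=%s vcluster=%s%n",
                apiKey, username, vcluster);
    }
}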

Configure audit Interceptor

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied |
| apiKeys | Set[String] | | Set of Kafka API keys to be audited |
| vcluster | String | .* | vclusters that match this regex will have the Interceptor applied |
| username | String | .* | Usernames that match this regex will have the Interceptor applied |
| consumerGroupId | String | .* | Consumer group ids that match this regex will have the Interceptor applied |
| topicPartitions | Set[Integer] | | Set of topic partitions to be audited |

Audit Interceptor example

{
  "name": "myAuditInterceptorPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.AuditPlugin",
  "priority": 100,
  "config": {
    "topic": ".*",
    "apiKeys": [
      "PRODUCE",
      "FETCH"
    ],
    "vcluster": ".*",
    "username": ".*",
    "consumerGroupId": ".*",
    "topicPartitions": [
      1,
      2
    ]
  }
}

Data masking Interceptor

The field-level data masking Interceptor masks sensitive fields within messages as they are consumed.

Configure data masking Interceptor

The policies will be applied when consuming messages.
| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied |
| policies | Policy list | | List of your masking policies |
| errorPolicy | String | fail_fetch | Determines the plugin behavior when it can’t parse a fetched message: fail_fetch or skip_masking |

Data masking policy

| Key | Type | Description |
| --- | --- | --- |
| name | String | Unique name to identify your policy |
| fields | List | List of fields to obfuscate with the masking rule. Fields can be nested, using dot (.) notation: for example education.account.username, banks[0].accountNo or banks[*].accountNo |
| rule | Rule | The masking rule to apply (see below) |
| schemaRegistryConfig | Schema registry | The schema registry in use (see below) |
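
To illustrate the field-path syntax, the record below is made up for this example: a policy with fields ["education.account.username", "banks[*].accountNo"] would mask alice01 and both account numbers.

{
  "name": "Alice",
  "education": { "account": { "username": "alice01" } },
  "banks": [
    { "accountNo": "1111-2222" },
    { "accountNo": "3333-4444" }
  ]
}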

Data masking rule

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| type | Masking type | MASK_ALL | The type of masking (see below) |
| maskingChar | Char | * | The character used to mask data |
| numberOfChars | Number | | Number of characters to mask; required if type != MASK_ALL |

Masking type

  • MASK_ALL: all data will be masked
  • MASK_FIRST_N: the first n characters will be masked
  • MASK_LAST_N: the last n characters will be masked
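
For example, with an illustrative value, a rule of type MASK_LAST_N, maskingChar X and numberOfChars 4 transforms a field as follows:

Produced:  { "visa": "4111-1111" }
Consumed:  { "visa": "4111-XXXX" }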

Error policy

You can control the plugin’s behavior when it can’t parse a fetched message through its errorPolicy, which can be set to fail_fetch or skip_masking. The default is fail_fetch: in this mode, the plugin returns a failure for the whole batch that the fetched record is part of, effectively blocking any consumer. In skip_masking mode, if a fetched message can’t be parsed (e.g. an encrypted record is read), that record is skipped and returned unmasked.
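
For example, reusing the "Mask password" policy from the examples below (other keys, such as schemaRegistryConfig, are omitted here for brevity), a configuration that skips masking instead of failing the fetch would look like:

{
  "name": "myFieldLevelDataMaskingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.FieldLevelDataMaskingPlugin",
  "priority": 100,
  "config": {
    "errorPolicy": "skip_masking",
    "policies": [
      {
        "name": "Mask password",
        "rule": {
          "type": "MASK_ALL"
        },
        "fields": [
          "password"
        ]
      }
    ]
  }
}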

Schema registry

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| type | String | CONFLUENT | The type of schema registry to use: CONFLUENT (for Confluent-like schema registries, including OSS Kafka) or AWS (for AWS Glue schema registries) |
| additionalConfigs | Map | | Additional properties that map to specific security-related parameters. For enhanced security, you can hide the sensitive values using environment variables as secrets |

Configuration for Confluent-like schema registries:

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| host | String | | URL of your schema registry |
| cacheSize | String | 50 | Number of schemas that can be cached locally by this Interceptor, so that it doesn’t have to query the schema registry every time |

Configuration for AWS Glue schema registries:

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| region | String | | The AWS region of the schema registry, e.g. us-east-1 |
| registryName | String | | The name of the schema registry in AWS. Leave blank for the AWS default of default-registry |
| basicCredentials | AWS credentials | | Access credentials for AWS (see below) |

AWS credential configuration:

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| accessKey | String | | The access key for the connection to the schema registry |
| secretKey | String | | The secret key for the connection to the schema registry |
| validateCredentials | Boolean | true | Whether the provided credentials should be validated when set |
| accountId | String | | The id of the AWS account to use |

If you don’t provide a basicCredentials section for the AWS Glue schema registry, the client used to connect will instead attempt to find the connection information it needs from the environment; the required credentials can be passed to Gateway this way, as part of its core configuration. Find out more about setting up AWS.
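
As a sketch built from the keys above (all values are placeholders), an AWS Glue schemaRegistryConfig could look like:

"schemaRegistryConfig": {
  "type": "AWS",
  "region": "us-east-1",
  "registryName": "default-registry",
  "basicCredentials": {
    "accessKey": "${AWS_ACCESS_KEY}",
    "secretKey": "${AWS_SECRET_KEY}",
    "validateCredentials": true,
    "accountId": "123456789012"
  }
}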
Read our blog about schema registry.

Data masking Interceptor example

{
  "name": "myFieldLevelDataMaskingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.FieldLevelDataMaskingPlugin",
  "priority": 100,
  "config": {
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081"
    },
    "policies": [
      {
        "name": "Mask password",
        "rule": {
          "type": "MASK_ALL"
        },
        "fields": [
          "password"
        ]
      },
      {
        "name": "Mask visa",
        "rule": {
          "type": "MASK_LAST_N",
          "maskingChar": "X",
          "numberOfChars": 4
        },
        "fields": [
          "visa"
        ]
      }
    ]
  }
}

Secured schema registry

{
  "name": "myFieldLevelDataMaskingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.FieldLevelDataMaskingPlugin",
  "priority": 100,
  "config": {
    "schemaRegistryConfig": {
      "host": "http://schema-registry:8081",
      "additionalConfigs": {
        "schema.registry.url": "${SR_URL}",
        "basic.auth.credentials.source": "${SR_BASIC_AUTH_CRED_SRC}",
        "basic.auth.user.info": "${SR_BASIC_AUTH_USER_INFO}"
      }
    },
    "policies": [
      {
        "name": "Mask password",
        "rule": {
          "type": "MASK_ALL"
        },
        "fields": [
          "password"
        ]
      },
      {
        "name": "Mask visa",
        "rule": {
          "type": "MASK_LAST_N",
          "maskingChar": "X",
          "numberOfChars": 4
        },
        "fields": [
          "visa"
        ]
      }
    ]
  }
}

Dynamic header injection Interceptor

This Interceptor injects headers (such as the user IP) into messages as they are produced through Gateway. We support templating in this format: X-CLIENT_IP: "{{userIp}} testing". Here are the values we can expand (combined in the example after this list):
  • uuid
  • userIp
  • vcluster
  • user
  • clientId
  • gatewayIp
  • gatewayHost
  • gatewayVersion
  • apiKey
  • apiKeyVersion
  • timestampMillis
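
Several placeholders can be combined in a single headers map; the header names below are arbitrary:

"headers": {
  "X-REQUEST_ID": "{{uuid}}",
  "X-CLIENT_IP": "{{userIp}}",
  "X-GATEWAY": "{{gatewayHost}} ({{gatewayVersion}})"
}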

Configure header injection Interceptor

| Config | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | | Regular expression that matches topics from your produce requests |
| headers | Map | | Map of header keys and values to inject. Values support templating, e.g. {{userIp}} to inject the client’s IP address |
| overrideIfExists | Boolean | false | Whether to override the header if it already exists |

Header injection Interceptor example

{
  "name": "myDynamicHeaderInjectionInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.DynamicHeaderInjectionPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "headers": {
      "X-CLIENT_IP": "{{userIp}} testing"
    },
    "overrideIfExists": true
  }
}
Let’s produce a simple record to the injectHeaderTopic topic.
echo 'inject_header' | docker-compose exec -T kafka-client \
    kafka-console-producer  \
        --bootstrap-server conduktor-gateway:6969 \
        --producer.config /clientConfig/gateway.properties \
        --topic injectHeaderTopic
Let’s consume from our injectHeaderTopic.
docker-compose exec kafka-client \
  kafka-console-consumer \
    --bootstrap-server conduktor-gateway:6969 \
    --consumer.config /clientConfig/gateway.properties \
    --topic injectHeaderTopic \
    --from-beginning \
    --max-messages 1 \
    --property print.headers=true
You should see the message with its injected header, as below:
X-CLIENT_IP:172.19.0.3 testing   inject_header

Encryption Interceptor

This Interceptor is a robust and versatile solution for securing data within Kafka records. Its primary function is to safeguard sensitive information from unauthorized access, thereby enhancing data security both in transit and at rest. The key features are:
  • Field-Level encryption: encrypts specific fields within Kafka records, e.g. passwords or PII (Personally Identifiable Information). This is very useful when only certain parts of a message contain sensitive data.
  • Full Message encryption: encrypts the entire Kafka record, ensuring that all contents of the message are secured. This is particularly useful when the entire message is sensitive.

How it works

Once configured, the encryption and decryption processes are seamlessly managed by the Interceptor.
  • Encryption: the Interceptor identifies the data that needs to be encrypted and the KMS details used to share the encryption key; Gateway then encrypts the data and produces the message.
  • Decryption: similarly, the Interceptor can decrypt the entire message, specific fields or all fields, based on your configuration.

Flexibility and Compatibility

You can refine how data is encrypted with a choice of algorithm and KMS provider.
  • Multiple encryption algorithms: the Interceptor supports a variety of encryption algorithms, allowing you to choose the one that best meets your security requirements.
  • KMS integration: it integrates with various Key Management Services (KMS), providing flexibility in how you manage and store encryption keys.

Use encryption Interceptor

Encrypt data

  1. Identify data. The Interceptor first determines, based on its configuration, what data needs to be encrypted. This may include the entire message, specific fields or all the fields within the message. For example, if you have configured the interceptor to encrypt a password field, it will target this field within the incoming Kafka record for encryption.
  2. Retrieve key. The Interceptor then generates a key and shares it with the configured KMS (Key Management Service), or retrieves it if it already exists. Supported KMS options include Vault, Azure, AWS, GCP or an in-memory service (for local development only). The key is fetched using the keySecretId specified in your configuration, to ensure the correct key is used.
  3. Encrypt. Once the key is generated/retrieved, the Interceptor encrypts the identified data using the specified encryption algorithm. The original data within the message is now replaced with the encrypted version.
  4. Transmit. Finally, the encrypted data is either stored as-is, if it is an Avro record, or converted into JSON and transmitted as a string to the designated destination (a hedged configuration sketch follows this list).
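
As a hypothetical sketch only: the exact plugin class name and config schema depend on your Gateway version, so treat every key below except keySecretId (discussed above) as an assumption to verify against your version's reference.

{
  "name": "myEncryptPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.EncryptPlugin",
  "priority": 100,
  "config": {
    "fields": [
      {
        "fieldName": "password",
        "keySecretId": "password-key-id",
        "algorithm": "AES128_GCM"
      }
    ]
  }
}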

Decrypt data

  1. Identify data. The Interceptor first determines, based on its configuration, which data needs to be decrypted. This may include the entire message, specific fields, or all the fields within the message.
  2. Retrieve key. Next, the Interceptor retrieves the decryption key from the KMS. Typically, this is the same key that was used during encryption. The correct key is obtained using the keySecretId provided in your Interceptor configuration, which is stored in the header of the record on the backing Kafka.
  3. Decrypt. The Interceptor then decrypts the identified data using the retrieved key and the specified encryption algorithm. The decrypted data replaces the encrypted data within the message.
  4. Consume. Once decrypted, the message is ready for consumption by the end-user or application. The Interceptor ensures that the decrypted data is correctly formatted and fully compatible with the Kafka record structure.
These encryption and decryption processes are fully transparent to the end-user or application. The Interceptor manages all these operations, allowing you to concentrate on the core business logic.
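
A matching decryption-side sketch, with the same caveat that the plugin class and config keys are assumptions to check against your Gateway version:

{
  "name": "myDecryptPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.DecryptPlugin",
  "priority": 100,
  "config": {
    "topic": "encrypt.*"
  }
}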

Manage keys

The Interceptor uses the envelope encryption technique to encrypt data. Here are some key terms we’ll use:
| Term | Definition |
| --- | --- |
| KMS | Key Management Service: a system responsible for managing and storing cryptographic keys, including the KEK |
| KEK | Key Encryption Key: a key stored in the KMS, used to encrypt the DEK. Notably, the KEK is never exposed to or known by the Interceptor |
| DEK | Data Encryption Key: a key generated by the Interceptor, used to encrypt the actual data |
| EDEK | Encrypted Data Encryption Key: the DEK encrypted by the KEK, ensuring that the DEK remains secure when stored or transmitted |
To encrypt the data, Gateway:
  1. Generates a DEK that is used to encrypt the data
  2. Sends the DEK to the KMS, which encrypts it using its KEK and returns the EDEK to Gateway
  3. Caches the DEK and EDEK in memory for a configurable Time to Live (TTL)
  4. Encrypts the data using the DEK
  5. Stores the EDEK alongside the encrypted data; both are sent to the backing Kafka
To decrypt the data, Gateway:
  1. Retrieves the EDEK that’s stored with the encrypted data
  2. Sends the EDEK to the KMS, which decrypts it (using the KEK) and returns the DEK to Gateway
  3. Decrypts the data using the DEK
[Diagram: envelope encryption]
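
The flow above can be illustrated end to end with plain JDK cryptography. This is a self-contained sketch of envelope encryption in general, not Gateway's actual implementation; the KMS is simulated by a local KEK.

import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public class EnvelopeEncryptionDemo {
    public static void main(String[] args) throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(256);

        // KEK: lives in the KMS and never leaves it (simulated locally here).
        SecretKey kek = keyGen.generateKey();

        // 1. Generate a DEK to encrypt the data.
        SecretKey dek = keyGen.generateKey();

        // 2. "Send the DEK to the KMS": wrap it with the KEK, producing the EDEK.
        Cipher wrapper = Cipher.getInstance("AESWrap");
        wrapper.init(Cipher.WRAP_MODE, kek);
        byte[] edek = wrapper.wrap(dek);

        // 4. Encrypt the data with the DEK (AES-GCM with a random 96-bit IV).
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);
        Cipher encryptor = Cipher.getInstance("AES/GCM/NoPadding");
        encryptor.init(Cipher.ENCRYPT_MODE, dek, new GCMParameterSpec(128, iv));
        byte[] ciphertext = encryptor.doFinal("my secret value".getBytes(StandardCharsets.UTF_8));

        // 5. The EDEK travels alongside the ciphertext; the plaintext DEK can
        // now be dropped (or cached for its TTL, as in step 3).

        // Decryption: unwrap the EDEK with the KEK (a KMS call in real life)...
        Cipher unwrapper = Cipher.getInstance("AESWrap");
        unwrapper.init(Cipher.UNWRAP_MODE, kek);
        SecretKey recoveredDek = (SecretKey) unwrapper.unwrap(edek, "AES", Cipher.SECRET_KEY);

        // ...then decrypt the data with the recovered DEK.
        Cipher decryptor = Cipher.getInstance("AES/GCM/NoPadding");
        decryptor.init(Cipher.DECRYPT_MODE, recoveredDek, new GCMParameterSpec(128, iv));
        System.out.println(new String(decryptor.doFinal(ciphertext), StandardCharsets.UTF_8));
    }
}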

Optimizing performance with caching

To reduce the number of calls to the KMS and skip some of the steps detailed above, the Interceptor caches the DEK in memory. The cache has a configurable Time to Live (TTL); if the DEK is not in the cache, the Interceptor calls the KMS to decrypt the EDEK, as detailed in steps 1 and 2 above.
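
As a rough illustration of that caching idea (this is not Gateway's code; the cache key and the KMS call are stand-ins), a TTL-bounded DEK cache keyed by the EDEK might look like:

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.crypto.SecretKey;

// Illustrative TTL cache: skips the KMS unwrap call while an entry is fresh.
class DekCache {
    private record Entry(SecretKey dek, Instant expiresAt) {}

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final Duration ttl;

    DekCache(Duration ttl) {
        this.ttl = ttl;
    }

    // edek: an encoded EDEK; unwrapWithKms: the expensive KMS round-trip
    // (steps 1 and 2 above), performed only on a miss or an expired entry.
    SecretKey get(String edek, Function<String, SecretKey> unwrapWithKms) {
        Entry entry = entries.get(edek);
        if (entry == null || Instant.now().isAfter(entry.expiresAt())) {
            entry = new Entry(unwrapWithKms.apply(edek), Instant.now().plus(ttl));
            entries.put(edek, entry);
        }
        return entry.dek();
    }
}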