

There are a number of interceptors already available as part of the Conduktor Marketplace. If none of the existing interceptors fulfils your requirements, you can create your own interceptor using the Interceptor API.

This page describes the steps required to create your own interceptor, ready to deploy to your environment, or release to the Conduktor Marketplace for the wider community to enjoy.

This is applicable only to the Open Source Conduktor Gateway.

If you have ideas or requirements for interceptors that can be added to the Enterprise Gateway let us know.

What is the Conduktor Gateway?

The Conduktor Gateway is a powerful tool that sits between your Kafka clients and your Kafka clusters. This gateway hosts pluggable, extensible interceptors, which can be configured to have access to all Kafka traffic passing through the Gateway.

Interceptors are configured to trigger actions based on the type and content of Kafka traffic. From manipulating the Kafka data itself, to interacting with an external service, what can be achieved by being able to act on any of your in-flight Kafka data is limited only by your imagination.

Interceptors are configured in the Conduktor Gateway's application.yaml configuration file by specifying a fully qualified class name, configuration details and a priority. Multiple interceptors can be configured to run sequentially, in the specified priority order.

If there is no interceptor available that fulfils your requirements, you can easily add your own.

Fundamentals of Interceptor Development

The Interceptor Framework defines the two interfaces required to define your own Interceptors: Interceptor and Plugin.

The Interceptor interface defines a method that takes a Kafka request or response object, which can then be used to drive that interceptor's function.

The Kafka requests or responses are objects that inherit from Kafka's AbstractRequestResponse type. For example, a ProduceRequest or a FetchResponse.

An interceptor is likely to contain multiple implementations of the Interceptor interface. Each of these implementations works on a different type of Kafka request or response. The implementations within an Interceptor should all have the same general purpose, such as encryption, logging or adding a message header.

For example, a logging Interceptor may include two implementations: one that takes a ProduceRequest Java object and logs information about any produced records, and a separate implementation that takes a FetchResponse Java object and logs information about any consumed records.

These are separate implementation classes because the processing of a ProduceRequest object is quite different from the processing of a FetchResponse object.

A Plugin returns a list of all the interceptors that combine to make up the function provided by the Interceptor.

It also processes the interceptor configuration, which is specified in application.yaml.

Interceptor jar

The Interceptor jar file contains the Interceptor implementations and the Plugin implementation. The jar file should be added to the classpath of your Gateway to provide access to the Interceptor functionality.


The Interceptor interface defines the API that is used to intercept a Kafka request or response:

CompletionStage<T> intercept(T input, InterceptorContext interceptorContext) {
    // Interceptor code here
}


where <T> extends AbstractRequestResponse.

There are three conceptual levels for how specific the interceptor type <T> match is:

  • Specific requests or responses, where <T> is any of the specific request or response classes (e.g. FetchResponse)
  • All requests or all responses, where <T> is AbstractRequest or AbstractResponse
  • All flows, where <T> is AbstractRequestResponse.

When writing a new interceptor, use the most specific type of <T> that is suitable for your needs.

The next sections describe how to determine what to use for <T>.

Intercept specific request or response types

To intercept a specific request or response, specify the exact type of the request or response in the method declaration. For example:

CompletionStage<FetchResponse> intercept(FetchResponse input, InterceptorContext interceptorContext)

Important: Only specify one instance of the intercept method in each implementation class.

It is not valid to have more than one intercept method; doing so will result in only one of the intercept methods being run when Kafka API flows are processed. The following example is not valid:

CompletionStage<FetchResponse> intercept(FetchResponse input, InterceptorContext interceptorContext) { }
CompletionStage<ProduceRequest> intercept(ProduceRequest input, InterceptorContext interceptorContext) { }

Instead, create two separate Java class files that each implement the Interceptor interface, specifying their own type. For the above example, create both:

public class FetchLoggerInterceptor implements Interceptor<FetchResponse> {
    CompletionStage<FetchResponse> intercept(FetchResponse input, InterceptorContext interceptorContext) { }
}


public class ProduceLoggerInterceptor implements Interceptor<ProduceRequest> {
    CompletionStage<ProduceRequest> intercept(ProduceRequest input, InterceptorContext interceptorContext) { }
}

Then use a Plugin to provide both implementations to the Gateway, registering them for use:

public class LoggerInterceptorPlugin implements Plugin {
    public List<Interceptor> getInterceptors() {
        return List.of(
            new FetchLoggerInterceptor(),
            new ProduceLoggerInterceptor()
        );
    }
}
You might want to intercept specific requests or responses if you have a very targeted interceptor that is only applicable to a handful of Kafka API flows, or if each type of request or response being processed has a different type and so needs to be handled differently.

Intercept all requests or all responses

To intercept all requests or all responses, specify the generic AbstractRequest or AbstractResponse as the type in the method declaration.

public class RequestsLoggerInterceptor implements Interceptor<AbstractRequest> {
    CompletionStage<AbstractRequest> intercept(AbstractRequest input, InterceptorContext interceptorContext) { }
}

Then use the Plugin to provide this implementation to the Gateway to register it for use:

public class LoggerInterceptorPlugin implements Plugin {
    public List<Interceptor> getInterceptors() {
        return List.of(
            new RequestsLoggerInterceptor()
        );
    }
}

You should intercept AbstractRequest or AbstractResponse if the interceptor needs to work in the same way on all requests or all responses. For example, you might want to process all requests to count the number of request bytes passing through the Gateway to Kafka; this would use an AbstractRequest.
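The byte-counting example above can be sketched with plain JDK types. The class and method names here are illustrative, not part of the Gateway API; a real interceptor would call something like this from its intercept method, passing in the serialized size of the intercepted AbstractRequest (however that size is obtained in your Gateway version):

```java
import java.util.concurrent.atomic.AtomicLong;

public class RequestByteCounter {
    // Thread-safe running total, shared across Gateway worker threads
    private static final AtomicLong totalRequestBytes = new AtomicLong();

    // Records one request's size and returns the new running total.
    // In a real interceptor, requestSizeBytes would come from the request object.
    public static long record(long requestSizeBytes) {
        return totalRequestBytes.addAndGet(requestSizeBytes);
    }
}
```

Because the counter is an AtomicLong rather than a plain long, concurrent requests on different connections can be counted without locking.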

Intercept all traffic (both requests and responses)

To intercept all requests and all responses, specify the generic AbstractRequestResponse as the type in the method declaration.

public class AllLoggerInterceptor implements Interceptor<AbstractRequestResponse> {
    CompletionStage<AbstractRequestResponse> intercept(AbstractRequestResponse input, InterceptorContext interceptorContext) { }
}

Then use the Plugin to provide this implementation to the Gateway:

public class LoggerInterceptorPlugin implements Plugin {
    public List<Interceptor> getInterceptors() {
        return List.of(
            new AllLoggerInterceptor()
        );
    }
}

You should intercept AbstractRequestResponse if the interceptor needs to work on all requests and responses. For example, you might want to write an audit record for all requests and responses passing through the gateway.


The intercept method on the Interceptor interface returns a CompletionStage to the Gateway. This holds the Kafka request or response that was passed in to the intercept method. The request or response may have been updated if the interceptor is one that manipulates the Kafka data, or it may remain unchanged if the interceptor does not manipulate the data.

The interceptor’s CompletionStage may hold a synchronous or asynchronous computation result.

For example, a synchronous logging interceptor would return a completed Future as its CompletionStage:

return CompletableFuture.completedFuture(input);

An interceptor that adds data to a Kafka request based on a query to an external database would return a Future holding the not-yet-completed computation on the input Kafka request or response. This example also updates the original request or response input:

CompletableFuture<AbstractRequestResponse> completableFuture = new CompletableFuture<>();

// Run the task asynchronously
Executors.newCachedThreadPool().submit(() -> {
    // Call out to the database
    // Update input based on the slow database call
    // Return the updated input by completing the future
    completableFuture.complete(input);
});

return completableFuture;

Important: The Future held in the CompletionStage must be completed by the intercept method. If the Future is not eventually completed, then processing of Kafka API requests will be blocked and they will not be sent to Kafka.

Remember to handle completion of the Future for error cases as well as success cases.
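This pattern can be shown as a minimal, self-contained sketch using only JDK classes. The enrich method and its string transformation are stand-ins for a real interceptor and its database call; the important parts are that the incomplete future is returned immediately, and that it is always completed, on both the success and the error path:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncEnrichment {
    private static final ExecutorService pool = Executors.newCachedThreadPool();

    // Returns an incomplete future at once, then completes it from a background task
    public static CompletableFuture<String> enrich(String input) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pool.submit(() -> {
            try {
                String enriched = input + ":enriched"; // stand-in for the slow lookup
                future.complete(enriched);             // success path
            } catch (Exception e) {
                future.completeExceptionally(e);       // error path must also complete
            }
        });
        return future;
    }
}
```

If the catch branch were missing and the lookup threw, the future would never complete and, in a real interceptor, the Kafka API flow would hang.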

InterceptorContext acts as a container for information about the requests and responses that are being intercepted. This information can be useful to Interceptor implementations. For instance, an auditing interceptor may wish to record the hostname of connected clients.

It holds the following info:

  • direction - The direction of the request (REQUEST or RESPONSE)
  • requestHeader - The Kafka RequestHeader associated with the request that triggered this request/response.
  • clientAddress - The address of the client that created this request (host/port)
  • inFlightInfo - A Map to store extra information your interceptor may wish to pass on to subsequent interceptors in the chain. Typically, this is used to pass information between REQUEST interceptors and their corresponding RESPONSE interceptors. For instance, an audit interceptor for FindCoordinator may wish to store details of the group id requested in the REQUEST in inFlightInfo, as the RESPONSE does not contain this information. The map is keyed with Strings to provide an easy identifier for retrieval, but the values can be any Object.
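The request-to-response hand-off can be sketched with a plain Map. The key name and the method names below are hypothetical; in a real interceptor the map would be the inFlightInfo supplied by the InterceptorContext:

```java
import java.util.Map;

public class InFlightInfoExample {
    // Hypothetical key for the FindCoordinator audit example
    static final String GROUP_ID_KEY = "findCoordinator.groupId";

    // REQUEST side: stash the group id, which the RESPONSE will not carry
    public static void onRequest(Map<String, Object> inFlightInfo, String groupId) {
        inFlightInfo.put(GROUP_ID_KEY, groupId);
    }

    // RESPONSE side: retrieve what the request interceptor stored
    public static String onResponse(Map<String, Object> inFlightInfo) {
        return (String) inFlightInfo.get(GROUP_ID_KEY);
    }
}
```

Because the map values are plain Objects, the response side must cast to the type it expects, so agree on key names and value types between the paired interceptors.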

Interceptor order

Individual interceptors are triggered in the order specified by their priority, with 0 being the highest priority and executed first. Negative priorities are not allowed.

If two interceptors have the same priority then order of execution is indeterminate.
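The priority rule can be illustrated with a small, self-contained sketch. The Configured record is hypothetical; it simply pairs an interceptor name with its configured priority, and the sort mirrors how the Gateway orders the chain (lower number = higher priority = runs earlier):

```java
import java.util.Comparator;
import java.util.List;

public class PriorityOrdering {
    // Illustrative pairing of an interceptor name with its configured priority
    public record Configured(String name, int priority) {}

    // Returns interceptor names in execution order: ascending priority value
    public static List<String> executionOrder(List<Configured> configured) {
        return configured.stream()
                .sorted(Comparator.comparingInt(Configured::priority))
                .map(Configured::name)
                .toList();
    }
}
```

With priorities 0, 50 and 100, the priority-0 interceptor runs first and the priority-100 interceptor runs last.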

Only interceptors applicable to the type of the request or response are triggered for a particular Kafka flow.

For example, a Kafka FetchRequest API request arriving at the Gateway will trigger interceptors where the type of the intercept method parameter is FetchRequest, AbstractRequest or AbstractRequestResponse.

The next interceptor in the prioritised list of applicable interceptors will not run until the previous interceptor’s CompletionStage has run and the associated Future has completed.

Plugin defines two methods:

void configure(Map<String,String> config);

List<Interceptor> getInterceptors();

The configure method is used to process the interceptor configuration read from application.yaml.

The getInterceptors method is used to populate the list of interceptors that this Interceptor jar provides.


The configure method must validate the configuration passed in from the application.yaml file. If the configuration is not valid, an appropriate error message should be returned, including a suggested action to resolve the problem.

The configure method is run during initialization of the Gateway, and the Gateway will fail to start if the configuration is invalid.

The configure method can also be used to initialise this configuration on the individual interceptor instances.
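A minimal sketch of this fail-fast validation, assuming a hypothetical 'topic' setting (not a real Gateway option). The point is the pattern: reject bad configuration with an actionable message so the Gateway refuses to start rather than misbehaving later:

```java
import java.util.Map;

public class TopicConfigValidation {
    // Validates the hypothetical 'topic' setting and returns it when present
    public static String validate(Map<String, String> config) {
        String topic = config.get("topic");
        if (topic == null || topic.isBlank()) {
            // Message names the problem and suggests the fix, as recommended above
            throw new IllegalArgumentException(
                "Interceptor config is missing 'topic'; add a 'topic: <name>' entry "
                + "to this interceptor's section of application.yaml");
        }
        return topic;
    }
}
```

A real configure method would run checks like this for each required setting before initialising its interceptor instances.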


The getInterceptors method is used to provide a list of interceptors to the Gateway.

The interceptors could be provided conditionally based on configuration. For example, a logging interceptor could have a parameter "incoming_only : boolean". If only incoming requests are to be logged, then getInterceptors returns only the list of interceptors that apply to incoming requests.

Note: Remember that AbstractRequestResponse is applicable to both requests and responses.
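A sketch of this conditional selection, using interceptor names in place of real Interceptor instances (the "incoming_only" flag comes from the example above; everything else here is illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConditionalInterceptors {
    // Gates the returned list on the 'incoming_only' configuration flag
    public static List<String> getInterceptors(Map<String, String> config) {
        boolean incomingOnly = Boolean.parseBoolean(config.getOrDefault("incoming_only", "false"));
        List<String> interceptors = new ArrayList<>();
        interceptors.add("RequestLoggerInterceptor");
        if (!incomingOnly) {
            // Skipped when only incoming requests are to be logged
            interceptors.add("ResponseLoggerInterceptor");
        }
        return interceptors;
    }
}
```

A real Plugin would apply the same branching inside its getInterceptors method, returning only the Interceptor instances the configuration calls for.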

Next Steps

Once you have implemented the Interceptor and Plugin interfaces, the next step is to build your changes into a standalone jar file.

The pom.xml in the loggerInterceptor package demonstrates one way to do this.

Place the built interceptor jar file on the classpath of the gateway.

Restart the gateway with the new jar file on the classpath. In this example, the new interceptor is in a jar file that can be found in the conduktor-gateway repository under myNewInterceptor/target/my-new-interceptor-1.0-SNAPSHOT.jar.

$ cp myNewInterceptor/target/my-new-interceptor-1.0-SNAPSHOT.jar bin/my-new-interceptor-1.0-SNAPSHOT.jar # Best practice is to store your interceptors in a central location
$ export CLASSPATH=$CLASSPATH:bin/my-new-interceptor-1.0-SNAPSHOT.jar
$ bin/

Submit an Interceptor to Conduktor

Conduktor loves to learn new things, and to share what we have learnt with the Kafka community.

If you've written a new interceptor and want to contribute it to the Kafka Community via the Conduktor Marketplace, please get in touch and we'll work with you to share it.


Failed to load interceptors

When attempting to start the Gateway, the following exception is seen:

 Failed to load interceptors
java.lang.ClassNotFoundException: io.conduktor.example.MyPlugin
at jdk.internal.loader.BuiltinClassLoader.loadClass( ~[?:?]

This means that the jar file for the interceptor is not available on the classpath of the running JVM.

Ensure you are not running using the java -jar command, as this overrides any classpath settings you have included. Check that you have specified the fully qualified class name of the Plugin correctly.