Skip to main content
Enterprise The for handling large messages/batches will save the actual messages produced to into a cloud storage service. This helps to protect data or optimize storage in actual Kafka. We currently support:
  • Amazon S3 (Amazon Simple Storage Service) is a service offered by AWS (Amazon Web Services) that provides object storage through a web service interface.
  • Azure Blob Storage is a service offered by Microsoft Azure that provides blob storage.

Configuration

KeyTypeDefaultDescription
topicString.*Topics that match this regex will have the Interceptor applied
s3ConfigS3Amazon S3 configuration
azureConfigAzureAzure Blob Storage configuration
minimumSizeInBytesintOnly upload to S3 if batch/message record has size greater than or equal to this minimumSizeInBytes
localDiskDirectorystringLocal temp storage, used when we download file from S3 while fetching messages

Amazon S3

The S3 credentials default to managed identity. They will be overwritten if a specific basic credentials (accessKey and secretKey) or session credentials (accessKey, secretKey and sessionToken) are configured. For S3-compatible storage (MinIO, NetApp ONTAP, Dell ECS, Cloudflare R2), set disableChecksums to true. Otherwise, the AWS SDK sends a checksum in a form these backends do not accept, and uploads fail (the backend may report that the checksum header does not match).
KeyTypeDescription
accessKeystringS3 access key
secretKeystringS3 secret key
sessionTokenstringS3 session token
bucketNamestringS3 bucket name
uristringS3 URI
regionstringS3 region
disableChecksumsbooleanSet to true for S3-compatible storage (MinIO, NetApp ONTAP, Dell ECS, Cloudflare R2) so uploads succeed; the default checksum format is not accepted by these backends. Default: false.

Azure Blob Storage

Note that your application will require at least Storage Blob Data Contributor permissions to be able to read/write the data.
KeyTypeDescription
tenantIdstringAzure tenant ID
clientIdstringAzure client ID
secretstringAzure client secret
blobEndpointstringAzure Blob Storage endpoint to use
bucketNamestringBucket (container) name in Blob Storage configured to store in

Examples

Large batches

As of Gateway v3.18.0, the LargeBatchHandlingPlugin has been deprecated and will be removed in v3.21.0. Use the large message handling plugin instead, which handles individual messages above the size threshold.
Each batch that’s above the minimumSizeInBytes threshold will be saved in one file on Amazon S3, with credentials defaulting to managed identity:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With basic credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With session credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'

Large messages

Each individual message that’s above the minimumSizeInBytes threshold will be saved in one file on Amazon S3, with credentials defaulting to managed identity:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With basic credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With sessionCredentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'