This feature is available with Conduktor Scale Plus only.
The Interceptor for handling large messages/batches will save the actual messages produced to Gateway into a cloud storage service. This helps to protect data or optimize storage in actual Kafka. We currently support:
  • Amazon S3 (Amazon Simple Storage Service) is a service offered by AWS (Amazon Web Services) that provides object storage through a web service interface.
  • Azure Blob Storage is a service offered by Microsoft Azure that provides blob storage.

Configuration

KeyTypeDefaultDescription
topicString.*Topics that match this regex will have the Interceptor applied
s3ConfigS3Amazon S3 configuration
azureConfigAzureAzure Blob Storage configuration
minimumSizeInBytesintOnly upload to S3 if batch/message record has size greater than or equal to this minimumSizeInBytes
localDiskDirectorystringLocal temp storage, used when we download file from S3 while fetching messages

Amazon S3

The S3 credentials default to managed identity. They will be overwritten if a specific basic credentials (accessKey and secretKey) or session credentials (accessKey, secretKey and sessionToken) are configured.
KeyTypeDescription
accessKeystringS3 access key
secretKeystringS3 secret key
sessionTokenstringS3 session token
bucketNamestringS3 bucket name
uristringS3 URI
regionstringS3 region

Azure Blob Storage

Note that your application will require at least Storage Blob Data Contributor permissions to be able to read/write the data.
KeyTypeDescription
tenantIdstringAzure tenant ID
clientIdstringAzure client ID
secretstringAzure client secret
blobEndpointstringAzure Blob Storage endpoint to use
bucketNamestringBucket (container) name in Blob Storage configured to store in

Examples

Large batches

Each batch that’s above the minimumSizeInBytes threshold will be saved in one file on Amazon S3, with credentials defaulting to managed identity:
{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1",
    }
  }
}
With basic credentials:
{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1",
    }
  }
}
With session credentials:
{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1",
    }
  }
}

Large messages

Each individual message that’s above the minimumSizeInBytes threshold will be saved in one file on Amazon S3, with credentials defaulting to managed identity:
{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1",
    }
  }
}
With basic credentials:
{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1",
    }
  }
}
With sessionCredentials:
{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1",
    }
  }
}