Large Message/Batch Handling

Introduction

The large message/batch handling interceptor saves the actual message produced to Gateway in a cloud storage service. This helps protect data and optimize storage in the underlying Kafka cluster.

The currently supported cloud storage services are Amazon S3 and Azure Blob Storage:

  • Amazon S3 or Amazon Simple Storage Service is a service offered by Amazon Web Services that provides object storage through a web service interface.
  • Azure Blob Storage is a service offered by Microsoft Azure to provide blob storage.

Configuration

Note: On versions earlier than 3.2.0, you need to specify the localDiskDirectory property in the S3 connector itself and not in the general configuration.

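| key                | type   | default | description                                                                                   |
|--------------------|--------|---------|-----------------------------------------------------------------------------------------------|
| topic              | String | .*      | Topics that match this regex will have the interceptor applied                                |
| s3Config           | S3     |         | Amazon S3 configuration                                                                       |
| azureConfig        | Azure  |         | Azure Blob Storage configuration                                                              |
| minimumSizeInBytes | int    |         | Only batches/messages whose size is greater than or equal to this value are uploaded to S3    |
| localDiskDirectory | string |         | Local temporary storage, used when files are downloaded from S3 while fetching messages       |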

S3

By default, S3 credentials fall back to managed identity. They are overridden if basic credentials (accessKey and secretKey) or session credentials (accessKey, secretKey and sessionToken) are configured.

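| key          | type   | description      |
|--------------|--------|------------------|
| accessKey    | string | S3 access key    |
| secretKey    | string | S3 secret key    |
| sessionToken | string | S3 session token |
| bucketName   | string | S3 bucket name   |
| uri          | string | S3 URI           |
| region       | string | S3 region        |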

Azure

Note that your application will require at least Storage Blob Data Contributor permissions to be able to read/write the data.

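| key          | type   | description                                                |
|--------------|--------|------------------------------------------------------------|
| tenantId     | string | Azure tenant ID                                            |
| clientId     | string | Azure client ID                                            |
| secret       | string | Azure client secret                                        |
| blobEndpoint | string | Azure Blob Storage endpoint to use                         |
| bucketName   | string | Name of the blob storage container (bucket) to store data in |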
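
The examples in this document only cover s3Config. As a rough sketch, an Azure Blob Storage setup would presumably use the azureConfig key with the fields listed above. The overall structure is assumed to mirror the S3 examples, and every value here (the IDs, the secret, the blobEndpoint URL, the container name and the myStorage/ directory) is a placeholder rather than a confirmed working configuration.

```json
{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "azureConfig": {
      "tenantId": "myTenantId",
      "clientId": "myClientId",
      "secret": "myClientSecret",
      "blobEndpoint": "https://myaccount.blob.core.windows.net",
      "bucketName": "myContainerName"
    }
  }
}
```

The application identified by clientId would still need at least the Storage Blob Data Contributor permission noted above.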

Examples

Large batches

Each batch above the minimumSizeInBytes threshold will be saved as one file in S3.

With default credentials (managed identity):

{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}

With basic credentials:

{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}

With session credentials:

{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}

Large messages

Each individual message above the minimumSizeInBytes threshold will be saved as one file in S3.

With default credentials (managed identity):

{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}

With basic credentials:

{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}

With session credentials:

{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}