Skip to main content
Quick navigation

Large message/batch handling

Introduction

Amazon S3 or Amazon Simple Storage Service is a service offered by Amazon Web Services that provides object storage through a web service interface.

Large message/batch handling interceptor will save the actual message produced to gateway into Amazon Simple Storage Service.

It helps to protect data or optimize storage in actual kafka.

Configuration

keytypedefaultdescription
topicString.*Topics that match this regex will have the interceptor applied
s3ConfigS3Amazon S3 Configuration
minimumSizeInBytesintOnly upload to s3 for batch/message record has size greater than or equal to this minimumSizeInBytes

S3

By default, s3 credentials default on managed identity. They will be overwritten if a specific basic redentials (accessKey and secretKey) or session credentials (accessKey, secretKey and sessionToken) is configured.

keytypedescription
accessKeystringS3 access key
secretKeystringS3 secret key
sessionTokenstringS3 session token
bucketNamestringS3 bucket name
uristringS3 uri
regionstringS3 Region
localDiskDirectorystringLocal temp storage, used when we download file from S3 while fetching messages

Examples

Large batches

Each batch above the minimumSizeInBytes threshold will be saved in one file on s3.

With credentials default on managed identity:

{
"name": "myLargeBatchHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}

With basic credentials:

{
"name": "myLargeBatchHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}

With session credentials:

{
"name": "myLargeBatchHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"sessionToken": "mySessionToken",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}

Large messages

Each individual message above the minimumSizeInBytes threshold will be saved in one file on s3.

With credentials default on managed identity:

{
"name": "myLargeMessageHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}

With basic credentials:

{
"name": "myLargeMessageHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}

With sessionCredentials:

{
"name": "myLargeMessageHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"sessionToken": "mySessionToken",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}