Large message/batch handling
Introduction
Amazon S3 or Amazon Simple Storage Service is a service offered by Amazon Web Services that provides object storage through a web service interface.
Large message/batch handling interceptor will save the actual message produced to gateway into Amazon Simple Storage Service.
It helps to protect data or optimize storage in actual kafka.
Configuration
key | type | default | description |
---|---|---|---|
topic | String | .* | Topics that match this regex will have the interceptor applied |
s3Config | S3 | Amazon S3 Configuration | |
minimumSizeInBytes | int | Only upload to s3 for batch/message record has size greater than or equal to this minimumSizeInBytes |
S3
By default, s3 credentials default on managed identity. They will be overwritten if a specific basic redentials
(accessKey
and secretKey
)
or session credentials
(accessKey
, secretKey
and sessionToken
) is configured.
key | type | description |
---|---|---|
accessKey | string | S3 access key |
secretKey | string | S3 secret key |
sessionToken | string | S3 session token |
bucketName | string | S3 bucket name |
uri | string | S3 uri |
region | string | S3 Region |
localDiskDirectory | string | Local temp storage, used when we download file from S3 while fetching messages |
Examples
Large batches
Each batch above the minimumSizeInBytes threshold will be saved in one file on s3.
With credentials default on managed identity:
{
"name": "myLargeBatchHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}
With basic credentials
:
{
"name": "myLargeBatchHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}
With session credentials
:
{
"name": "myLargeBatchHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"sessionToken": "mySessionToken",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}
Large messages
Each individual message above the minimumSizeInBytes threshold will be saved in one file on s3.
With credentials default on managed identity:
{
"name": "myLargeMessageHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}
With basic credentials
:
{
"name": "myLargeMessageHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}
With sessionCredentials
:
{
"name": "myLargeMessageHandlingPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
"priority": 100,
"config": {
"topic": "topic.*",
"minimumSizeInBytes": 1024,
"s3Config": {
"accessKey": "myAccessKey",
"secretKey": "mySecretKey",
"sessionToken": "mySessionToken",
"bucketName": "myBucketName",
"uri": "http://myexampleuri",
"region": "us-east-1",
"localDiskDirectory": "myStorage/"
}
}
}