Enterprise

The Interceptor for handling large messages/batches saves the actual messages produced to Gateway in a cloud storage service. This helps to protect data or optimize storage in the actual Kafka cluster. We currently support:
  • Amazon S3 (Amazon Simple Storage Service) is a service offered by AWS (Amazon Web Services) that provides object storage through a web service interface.
  • Azure Blob Storage is a service offered by Microsoft Azure that provides blob storage.

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string | .* | Topics matching this regex have the Interceptor applied. |
| s3Config | S3 | | Amazon S3 configuration. Mutually exclusive with azureConfig. |
| azureConfig | Azure | | Azure Blob Storage configuration. Mutually exclusive with s3Config. |
| minimumSizeInBytes | int | 0 | Records with sizeInBytes ≥ minimumSizeInBytes are offloaded to cloud storage. Smaller records pass through unchanged. |
| localDiskDirectory | string | ${java.io.tmpdir}/myStorage/ | Path to the local on-disk cache used for both produce-time mirroring and fetch-time caching. See Local disk cache. |
| localCacheItemSize | int | 10000 | Maximum number of cached files (entries, not bytes). Oldest entries are evicted past this count. |
| localCacheExpireAfterWriteInSeconds | long | 3600 | Time-to-live since the cache entry was written. Expired entries are removed (and their files deleted) lazily on subsequent activity. |

Local disk cache

The interceptor maintains a local on-disk cache at localDiskDirectory that mirrors objects stored in cloud storage (S3 or Azure Blob). The same path and behavior apply to both backends.

When files are written to disk

  • On produce. Every record ≥ minimumSizeInBytes is uploaded to cloud storage and a copy is written to localDiskDirectory so subsequent fetches do not need to re-download.
  • On fetch (cache miss). When a consumer fetches a record whose payload lives in cloud storage, Gateway downloads it and writes a copy to localDiskDirectory as well.
Each cached object is one file under localDiskDirectory/<topic>/<uuid>.

When files are removed

Files are removed only when their cache entry is evicted, and eviction is triggered by cache activity (an addition or a removal). An entry is evicted in two cases:
  • Count-based: more than localCacheItemSize entries are present (default 10000). The least-recently-used entry is evicted.
  • TTL-based: an entry is older than localCacheExpireAfterWriteInSeconds since it was written (default 1 hour). Expired entries are removed by subsequent cache activity, not by a wall-clock timer.
The cache has no byte-size limit. With the defaults, up to 10000 × the maximum record size can accumulate on disk before count-based eviction kicks in. If the volume is smaller than that, lower localCacheItemSize (and/or localCacheExpireAfterWriteInSeconds) to keep peak usage within the volume.

There is no automatic cleanup on Gateway restart. Files written by a previous Gateway process remain on disk and continue to count against the volume until the cache is rebuilt and entries are evicted, or until they are removed manually.
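Since eviction is driven by cache activity rather than a timer, it can help to look at what is actually on disk. The following is a minimal sketch, not a Conduktor tool: the cache_report function is hypothetical, it assumes each cached object is a regular file under localDiskDirectory, and it treats a file's modification time as an approximation of when the entry was written. It relies on find -mmin, which GNU and BSD find support but POSIX does not specify. It only reports; it does not delete anything.

```shell
#!/bin/sh
# Report what the local cache directory currently holds.
# Assumption: file modification time approximates the cache write time.
cache_report() {
  dir=$1          # the configured localDiskDirectory
  ttl_minutes=$2  # localCacheExpireAfterWriteInSeconds / 60
  total=$(find "$dir" -type f | wc -l | tr -d ' ')
  stale=$(find "$dir" -type f -mmin +"$ttl_minutes" | wc -l | tr -d ' ')
  size_kb=$(du -sk "$dir" | cut -f1)
  echo "files=$total older_than_ttl=$stale size_kb=$size_kb"
}

# CACHE_DIR is a hypothetical variable; it falls back to the current directory.
cache_report "${CACHE_DIR:-.}" 60
```

A large older_than_ttl count on a quiet Gateway is consistent with the lazy expiry described above: expired files stay on disk until further cache activity removes them.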

Sizing the volume

As a starting point, size the volume so that
localCacheItemSize  ×  expected_max_record_size  +  headroom  ≤  volume_capacity
If the volume is constrained, lower localCacheItemSize first. This is the only setting that strictly bounds the worst-case file count.
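The sizing rule can be worked through with shell arithmetic. This sketch makes assumptions that are not in the reference above: a largest record of 5 MiB, headroom modelled as 20% on top of the cache size, and a 20 GiB volume for the reverse calculation. The function names are illustrative.

```shell
#!/bin/sh
# Worked example of: localCacheItemSize x expected_max_record_size + headroom <= volume_capacity
# All numbers except the default item count are illustrative assumptions.
LOCAL_CACHE_ITEM_SIZE=10000            # default localCacheItemSize
MAX_RECORD_BYTES=$((5 * 1024 * 1024))  # assumed largest record: 5 MiB
HEADROOM_PERCENT=20                    # assumed headroom, as a percentage of cache size

# Capacity needed for a given item count.
required_bytes() {
  # $1 = item count, $2 = max record bytes, $3 = headroom percent
  echo $(( $1 * $2 * (100 + $3) / 100 ))
}

# Largest item count that fits a given volume.
max_items() {
  # $1 = volume capacity in bytes, $2 = max record bytes, $3 = headroom percent
  echo $(( $1 * 100 / ($2 * (100 + $3)) ))
}

VOLUME_BYTES=$((20 * 1024 * 1024 * 1024))  # assumed 20 GiB volume

echo "Capacity needed with defaults: $(required_bytes "$LOCAL_CACHE_ITEM_SIZE" "$MAX_RECORD_BYTES" "$HEADROOM_PERCENT") bytes"
echo "Largest localCacheItemSize for the volume: $(max_items "$VOLUME_BYTES" "$MAX_RECORD_BYTES" "$HEADROOM_PERCENT")"
```

Under these assumptions the defaults need 62914560000 bytes (roughly 58.6 GiB), while a 20 GiB volume supports a localCacheItemSize of at most 3413.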

Permissions and startup behavior

  • If localDiskDirectory does not exist, Gateway attempts to create it at interceptor configuration time. If creation fails, the interceptor configuration is rejected.
  • If the directory exists but is not writable by the Gateway process, the interceptor configuration is rejected.
  • The Gateway process needs read and write permission for the directory and for files it creates inside it.
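Because a missing or unwritable directory causes the interceptor configuration to be rejected, a quick pre-flight check on the Gateway host can surface the problem earlier. The sketch below is an illustration, not part of Gateway: check_cache_dir is a hypothetical helper, the default path is an example, and the result is only meaningful when the script runs as the same user as the Gateway process.

```shell
#!/bin/sh
# Pre-flight check mirroring the rules above: create the directory if it is
# missing, then confirm the current user can read it and create files in it.
check_cache_dir() {
  dir=$1
  mkdir -p "$dir" 2>/dev/null || { echo "cannot create $dir"; return 1; }
  if [ ! -r "$dir" ] || [ ! -w "$dir" ]; then
    echo "$dir is not readable and writable"
    return 1
  fi
  # Prove that files can actually be created and removed inside the directory.
  probe="$dir/.write-probe.$$"
  touch "$probe" 2>/dev/null || { echo "cannot create files in $dir"; return 1; }
  rm -f "$probe"
  echo "$dir is usable"
}

# CACHE_DIR is a hypothetical variable; the fallback matches the typical default location.
check_cache_dir "${CACHE_DIR:-${TMPDIR:-/tmp}/myStorage}"
```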

Default location

If localDiskDirectory is omitted, Gateway uses ${java.io.tmpdir}/myStorage/ (typically /tmp/myStorage/, because the Gateway container runs Linux). For production deployments, set localDiskDirectory explicitly to a properly sized, persistent volume rather than relying on the system temp directory.

Amazon S3

The S3 credentials default to managed identity. They are overridden if basic credentials (accessKey and secretKey) or session credentials (accessKey, secretKey and sessionToken) are configured.

For S3-compatible storage (MinIO, NetApp ONTAP, Dell ECS, Cloudflare R2), set disableChecksums to true. Otherwise, the AWS SDK sends a checksum in a form these backends do not accept, and uploads fail (the backend may report that the checksum header does not match).
| Key | Type | Description |
| --- | --- | --- |
| accessKey | string | S3 access key |
| secretKey | string | S3 secret key |
| sessionToken | string | S3 session token |
| bucketName | string | S3 bucket name |
| uri | string | S3 URI |
| region | string | S3 region |
| disableChecksums | boolean | Set to true for S3-compatible storage (MinIO, NetApp ONTAP, Dell ECS, Cloudflare R2) so uploads succeed; the default checksum format is not accepted by these backends. Default: false. |
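For an S3-compatible backend, the configuration might look like the sketch below. It reuses the endpoint, plugin class, and key names that appear elsewhere on this page. The interceptor name, bucket, credentials, and MinIO-style URI are placeholders, and apply_interceptor is a hypothetical wrapper so that nothing is sent until it is called.

```shell
#!/bin/sh
# Interceptor payload for an S3-compatible backend with checksums disabled.
# The URI, bucket, and credentials below are placeholders, not real values.
PAYLOAD=$(cat <<'EOF'
{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://minio.example.internal:9000",
      "region": "us-east-1",
      "disableChecksums": true
    }
  }
}
EOF
)

# Send the payload to the Gateway admin API (same endpoint as the other examples on this page).
apply_interceptor() {
  curl \
    --request PUT \
    --url 'http://localhost:8888/gateway/v2/interceptor' \
    --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
    --header 'Content-Type: application/json' \
    --data-raw "$PAYLOAD"
}
```

Call apply_interceptor once Gateway is reachable. The only difference from a plain Amazon S3 configuration is the disableChecksums flag and the custom uri.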

Azure Blob Storage

Note that your application will require at least Storage Blob Data Contributor permissions to be able to read/write the data.
| Key | Type | Description |
| --- | --- | --- |
| tenantId | string | Azure tenant ID |
| clientId | string | Azure client ID |
| secret | string | Azure client secret |
| blobEndpoint | string | Azure Blob Storage endpoint to use |
| bucketName | string | Name of the Blob Storage container (bucket) in which data is stored |
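The examples on this page all target Amazon S3, so here is a hedged sketch of the equivalent Azure Blob Storage configuration. The key names come from the table above and the endpoint and plugin class from the S3 examples. Every value (tenant, client, secret, storage account endpoint, container name) is a placeholder, and apply_interceptor is a hypothetical wrapper so that nothing is sent until it is called.

```shell
#!/bin/sh
# Interceptor payload using azureConfig instead of s3Config (the two are mutually exclusive).
# All identifiers and secrets below are placeholders.
PAYLOAD=$(cat <<'EOF'
{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "azureConfig": {
      "tenantId": "myTenantId",
      "clientId": "myClientId",
      "secret": "myClientSecret",
      "blobEndpoint": "https://mystorageaccount.blob.core.windows.net",
      "bucketName": "myContainerName"
    }
  }
}
EOF
)

# Send the payload to the Gateway admin API (same endpoint as the other examples on this page).
apply_interceptor() {
  curl \
    --request PUT \
    --url 'http://localhost:8888/gateway/v2/interceptor' \
    --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
    --header 'Content-Type: application/json' \
    --data-raw "$PAYLOAD"
}
```

Call apply_interceptor once Gateway is reachable and the application identified by clientId has been granted the role mentioned above.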

Examples

Large batches

As of Gateway v3.18.0, the LargeBatchHandlingPlugin has been deprecated and will be removed in v3.21.0. Use the large message handling plugin instead, which handles individual messages above the size threshold.
Each batch that’s above the minimumSizeInBytes threshold will be saved in one file on Amazon S3, with credentials defaulting to managed identity:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With basic credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With session credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeBatchHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeBatchHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'

Large messages

Each individual message that’s above the minimumSizeInBytes threshold will be saved in one file on Amazon S3, with credentials defaulting to managed identity:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With basic credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'
With session credentials:
curl \
  --request PUT \
  --url 'http://localhost:8888/gateway/v2/interceptor' \
  --header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "myLargeMessageHandlingPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.LargeMessageHandlingPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "minimumSizeInBytes": 1024,
    "localDiskDirectory": "myStorage/",
    "s3Config": {
      "accessKey": "myAccessKey",
      "secretKey": "mySecretKey",
      "sessionToken": "mySessionToken",
      "bucketName": "myBucketName",
      "uri": "http://myexampleuri",
      "region": "us-east-1"
    }
  }
}'