Skip to main content
Quick navigation

Create Topic Policy

Introduction

Kafka is allowing the creation of topics freely, which leads to invalid topics being created in the cluster.

Create topic policy limits on topic creation to ensure that any topics created in the cluster adhere to a minimum/maximum specification for Replication Factor and Partition count, as well as topic-level configs.

Configuration

keytypedefaultdescription
topicString.*Topics that match this regex will have the interceptor applied
namingConventionRegexConfiguration for validating topic name convention
numPartitionIntegerConfiguration for number of partitions
replicationFactorIntegerConfiguration for number of replicas
cleanupPolicyCleanup PolicyConfiguration for cleanup.policy
compressionTypeCompression TypeConfiguration for compression.type
deleteRetentionMsLongConfiguration for delete.retention.ms
fileDeleteDelayMsLongConfiguration for file.delete.delay.ms
flushMessagesLongConfiguration for flush.messages
flushMsLongConfiguration for flush.ms
indexIntervalBytesIntegerConfiguration for index.interval.bytes
maxCompactionLagMsLongConfiguration for max.compaction.lag.ms
maxMessageBytesIntegerConfiguration for max.message.bytes
messageTimestampDifferenceMaxMsLongConfiguration for message.timestamp.difference.max.ms
messageTimestampTypeMessage Timestamp TypeConfiguration for message.timestamp.type
minCleanableDirtyRatioDoubleConfiguration for min.cleanable.dirty.ratio
minCompactionLagMsLongConfiguration for min.compaction.lag.ms
minInsyncReplicasIntegerConfiguration for min.insync.replicas
preallocateBooleanConfiguration for preallocate
retentionBytesLongConfiguration for retention.bytes
retentionMsLongConfiguration for retention.ms
segmentBytesIntegerConfiguration for segment.bytes
segmentIndexBytesIntegerConfiguration for segment.bytes
segmentJitterMsLongConfiguration for segment.jitter.ms
segmentMsLongConfiguration for segment.ms
uncleanLeaderElectionEnableBooleanConfiguration for unclean.leader.election.enable
messageDownconversionEnableBooleanConfiguration for message.downconversion.enable

Regex

keytypedefaultdescription
valueStringRegex for validating topic name
actionActionBLOCKAction to take if the value is outside the specified range.

Integer

keytypedefaultdescription
minintMinimum value for the configuration.
maxintMaximum value for the configuration.
actionActionBLOCKAction to take if the value is outside the specified range.
overrideValueintValue to override with (only applicable when action is set to OVERRIDE).

Long

keytypedefaultdescription
mindoubleMinimum value for the configuration.
maxdoubleMaximum value for the configuration.
actionActionBLOCKAction to take if the value is outside the specified range.
overrideValuedoubleValue to override with (only applicable when action is set to OVERRIDE).

Cleanup Policy

keytypedefaultdescription
valuesSet[String]Value for the configuration, should be a set of string that contains values from delete, compact or specify both policies in a comma-separated list like delete,compact.
actionActionBLOCKAction to take if the value is outside the specified range.
overrideValueStringValue to override with (only applicable when action is set to OVERRIDE).

Compression Type

keytypedefaultdescription
valuesSet[Compression]Set of string contains compression types.
actionActionBLOCKAction to take if the value is outside the specified range.
overrideValueCompressionValue to override with (only applicable when action is set to OVERRIDE).

Message Timestamp Type

keytypedefaultdescription
valueStringOnly these are allowed, allowed values: CreateTime or LogAppendTime. If action is OVERRIDE, will use this value for override value
actionActionBLOCKAction to take if the value is outside the specified range.

Boolean

keytypedefaultdescription
valueBooleanValue for the configuration. If action is OVERRIDE, will use this value for override value
actionActionBLOCKAction to take if the value is outside the specified range.

Compression

  • uncompressed
  • gzip
  • snappy
  • lz4
  • zstd
  • producer

Action

  • BLOCK → when fail, save in audit and return error.
  • INFO → execute API with wrong value, save in audit.
  • OVERRIDE → execute API with overrideValue (or value for others) values, save in audit the fact that we updated on the fly (with wrong value, and the one we used to fix them)

Example

{
"name": "myCreateTopicPolicyPlugin",
"pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
"priority": 100,
"config": {
"topic": "topic_1.*",
"numPartition": {
"min": 5,
"max": 5,
"action": "BLOCK"
},
"replicationFactor": {
"min": 2,
"max": 4,
"action": "OVERRIDE",
"overrideValue": 3
},
"retentionMs": {
"min": 10,
"max": 100
},
"retentionBytes": {
"min": 10,
"max": 100,
"action": "BLOCK"
},
"segmentMs": {
"min": 10,
"max": 100,
"action": "INFO"
},
"segmentBytes": {
"min": 10,
"max": 100,
"action": "BLOCK"
},
"segmentJitterMs": {
"min": 10,
"max": 100,
"action": "INFO",
"overrideValue": 20
},
"flushMessages": {
"min": 10,
"max": 100,
"action": "OVERRIDE",
"overrideValue": 20
},
"flushMs": {
"min": 10,
"max": 100,
"action": "OVERRIDE",
"overrideValue": 20
},
"maxMessageBytes": {
"min": 10,
"max": 100,
"action": "OVERRIDE",
"overrideValue": 20
},
"minInsyncReplicas": {
"min": 10,
"max": 100,
"action": "OVERRIDE",
"overrideValue": 20
},
"cleanupPolicy": {
"values": [
"delete",
"compact"
],
"action": "OVERRIDE"
},
"uncleanLeaderElectionEnable": {
"value": false,
"action": "BLOCK"
},
"compressionType": {
"values": [
"producer",
"gzip"
],
"action": "BLOCK"
}
}
}