Create Topic Policy
Introduction
Kafka allows topics to be created freely, which can lead to invalid topics being created in the cluster.
The create topic policy enforces limits on topic creation so that any topic created in the cluster adheres to a minimum/maximum specification for replication factor and partition count, as well as for topic-level configs.
Configuration
key | type | default | description |
---|---|---|---|
topic | String | .* | Topics that match this regex will have the interceptor applied |
namingConvention | Regex | | Configuration for validating topic name convention |
numPartition | Integer | | Configuration for number of partitions |
replicationFactor | Integer | | Configuration for number of replicas |
cleanupPolicy | Cleanup Policy | | Configuration for cleanup.policy |
compressionType | Compression Type | | Configuration for compression.type |
deleteRetentionMs | Long | | Configuration for delete.retention.ms |
fileDeleteDelayMs | Long | | Configuration for file.delete.delay.ms |
flushMessages | Long | | Configuration for flush.messages |
flushMs | Long | | Configuration for flush.ms |
indexIntervalBytes | Integer | | Configuration for index.interval.bytes |
maxCompactionLagMs | Long | | Configuration for max.compaction.lag.ms |
maxMessageBytes | Integer | | Configuration for max.message.bytes |
messageTimestampDifferenceMaxMs | Long | | Configuration for message.timestamp.difference.max.ms |
messageTimestampType | Message Timestamp Type | | Configuration for message.timestamp.type |
minCleanableDirtyRatio | Double | | Configuration for min.cleanable.dirty.ratio |
minCompactionLagMs | Long | | Configuration for min.compaction.lag.ms |
minInsyncReplicas | Integer | | Configuration for min.insync.replicas |
preallocate | Boolean | | Configuration for preallocate |
retentionBytes | Long | | Configuration for retention.bytes |
retentionMs | Long | | Configuration for retention.ms |
segmentBytes | Integer | | Configuration for segment.bytes |
segmentIndexBytes | Integer | | Configuration for segment.index.bytes |
segmentJitterMs | Long | | Configuration for segment.jitter.ms |
segmentMs | Long | | Configuration for segment.ms |
uncleanLeaderElectionEnable | Boolean | | Configuration for unclean.leader.election.enable |
messageDownconversionEnable | Boolean | | Configuration for message.downconversion.enable |
Regex
key | type | default | description |
---|---|---|---|
value | String | | Regex for validating the topic name |
action | Action | BLOCK | Action to take if the topic name does not match the regex. |
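To illustrate how a namingConvention rule could be evaluated, here is a minimal Python sketch. The function name check_topic_name and the outcome labels are hypothetical, and matching the whole topic name (rather than a substring) is an assumption; the interceptor's actual matching semantics are not described on this page.

```python
import re


def check_topic_name(name: str, pattern: str, action: str = "BLOCK") -> str:
    """Return the outcome for a topic name under a naming-convention rule.

    Hypothetical helper: assumes the whole name must match the regex.
    """
    if re.fullmatch(pattern, name):
        return "ALLOW"
    # The name violates the convention; the configured action decides the outcome.
    if action == "BLOCK":
        return "REJECT"       # request fails and the violation is audited
    if action == "INFO":
        return "ALLOW_AUDIT"  # request proceeds and the violation is audited
    raise ValueError(f"action not handled in this sketch: {action}")


print(check_topic_name("team.orders", r"team\..+"))     # ALLOW
print(check_topic_name("orders", r"team\..+"))          # REJECT
print(check_topic_name("orders", r"team\..+", "INFO"))  # ALLOW_AUDIT
```

The pattern `team\..+` is only an example value; any regex accepted by the interceptor could be used in its place.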
Integer
key | type | default | description |
---|---|---|---|
min | int | | Minimum value for the configuration. |
max | int | | Maximum value for the configuration. |
action | Action | BLOCK | Action to take if the value is outside the specified range. |
overrideValue | int | | Value to override with (only applicable when action is set to OVERRIDE). |
Long
key | type | default | description |
---|---|---|---|
min | double | | Minimum value for the configuration. |
max | double | | Maximum value for the configuration. |
action | Action | BLOCK | Action to take if the value is outside the specified range. |
overrideValue | double | | Value to override with (only applicable when action is set to OVERRIDE). |
Cleanup Policy
key | type | default | description |
---|---|---|---|
values | Set[String] | | Allowed values for the configuration: a set of strings containing delete, compact, or both policies as a comma-separated list such as delete,compact. |
action | Action | BLOCK | Action to take if the value is not one of the allowed values. |
overrideValue | String | | Value to override with (only applicable when action is set to OVERRIDE). |
Compression Type
key | type | default | description |
---|---|---|---|
values | Set[Compression] | | Set of allowed compression types. |
action | Action | BLOCK | Action to take if the value is not one of the allowed values. |
overrideValue | Compression | | Value to override with (only applicable when action is set to OVERRIDE). |
Message Timestamp Type
key | type | default | description |
---|---|---|---|
value | String | | Allowed value: CreateTime or LogAppendTime. If action is OVERRIDE, this value is also used as the override value. |
action | Action | BLOCK | Action to take if the value does not match the configured value. |
Boolean
key | type | default | description |
---|---|---|---|
value | Boolean | | Value for the configuration. If action is OVERRIDE, this value is also used as the override value. |
action | Action | BLOCK | Action to take if the value does not match the configured value. |
Compression
uncompressed
gzip
snappy
lz4
zstd
producer
Action
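BLOCK → when the check fails, save the violation in the audit log and return an error.
INFO → execute the API call with the invalid value and save the violation in the audit log.
OVERRIDE → execute the API call with overrideValue (or value, for types that have no overrideValue) and record in the audit log that the request was updated on the fly, including the invalid value and the value used to replace it.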
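The three actions can be sketched for a min/max rule as follows. This is an illustration only: RangeRule, apply_range_rule and the outcome labels are hypothetical names, and treating min and max as inclusive bounds is an assumption. The sample values come from the replicationFactor rule in the example on this page.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class RangeRule:
    minimum: int
    maximum: int
    action: str = "BLOCK"  # BLOCK is the documented default
    override_value: Optional[int] = None


def apply_range_rule(requested: int, rule: RangeRule) -> Tuple[str, Optional[int]]:
    """Return (outcome, effective_value) for a requested config value.

    Assumption: both bounds are inclusive.
    """
    if rule.minimum <= requested <= rule.maximum:
        return ("OK", requested)
    if rule.action == "BLOCK":
        return ("ERROR", None)          # request is rejected and audited
    if rule.action == "INFO":
        return ("AUDITED", requested)   # invalid value is kept, violation is audited
    if rule.action == "OVERRIDE":
        return ("OVERRIDDEN", rule.override_value)  # value is replaced and audited
    raise ValueError(f"unknown action: {rule.action}")


replication = RangeRule(minimum=2, maximum=4, action="OVERRIDE", override_value=3)
print(apply_range_rule(1, replication))  # ('OVERRIDDEN', 3)
```

Returning the effective value alongside the outcome keeps the decision and the value that would be sent to Kafka in one place, which mirrors what the audit entry for OVERRIDE is described as recording.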
Example
{
  "name": "myCreateTopicPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic_1.*",
    "numPartition": {
      "min": 5,
      "max": 5,
      "action": "BLOCK"
    },
    "replicationFactor": {
      "min": 2,
      "max": 4,
      "action": "OVERRIDE",
      "overrideValue": 3
    },
    "retentionMs": {
      "min": 10,
      "max": 100
    },
    "retentionBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "segmentMs": {
      "min": 10,
      "max": 100,
      "action": "INFO"
    },
    "segmentBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "segmentJitterMs": {
      "min": 10,
      "max": 100,
      "action": "INFO",
      "overrideValue": 20
    },
    "flushMessages": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "flushMs": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "maxMessageBytes": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "minInsyncReplicas": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "cleanupPolicy": {
      "values": [
        "delete",
        "compact"
      ],
      "action": "OVERRIDE"
    },
    "uncleanLeaderElectionEnable": {
      "value": false,
      "action": "BLOCK"
    },
    "compressionType": {
      "values": [
        "producer",
        "gzip"
      ],
      "action": "BLOCK"
    }
  }
}
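Before deploying a configuration like the one above, it can help to check it against the rules described in the type tables. The sketch below is a hypothetical lint, not part of the plugin: lint_policy and its warning messages are invented for illustration, and it only inspects a trimmed copy of the config object shown in the example.

```python
import json


def lint_policy(config: dict) -> list:
    """Return warnings for rules that look inconsistent with the type tables."""
    warnings = []
    for key, rule in config.items():
        if not isinstance(rule, dict):
            continue  # e.g. "topic" is a plain string, not a rule object
        action = rule.get("action", "BLOCK")  # BLOCK is the documented default
        has_override = "overrideValue" in rule
        # Boolean and timestamp rules reuse "value" as the override value.
        if action == "OVERRIDE" and not has_override and "value" not in rule:
            warnings.append(f"{key}: OVERRIDE without overrideValue")
        if action != "OVERRIDE" and has_override:
            warnings.append(f"{key}: overrideValue only applies when action is OVERRIDE")
        if "min" in rule and "max" in rule and rule["min"] > rule["max"]:
            warnings.append(f"{key}: min is greater than max")
    return warnings


sample = json.loads("""
{
  "topic": "topic_1.*",
  "replicationFactor": {"min": 2, "max": 4, "action": "OVERRIDE", "overrideValue": 3},
  "segmentJitterMs": {"min": 10, "max": 100, "action": "INFO", "overrideValue": 20},
  "cleanupPolicy": {"values": ["delete", "compact"], "action": "OVERRIDE"}
}
""")

for warning in lint_policy(sample):
    print(warning)
```

A warning here does not necessarily mean the plugin would reject the configuration; it only marks places where the written rule and the documented fields do not line up.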