Kafka Resources

Topic

Creates a Topic in Kafka.

API Keys: Admin API Key, Application API Key
Managed with: CLI, API, Console UI

---
apiVersion: kafka/v2
kind: Topic
metadata:
  cluster: shadow-it
  name: click.event-stream.avro
  labels:
    conduktor.io/description: |
      # Event Stream from Click Application
      This is a multiline markdown description that will appear in the Topic Catalog
    conduktor.io/description.editable: "false"
spec:
  replicationFactor: 3
  partitions: 3
  configs:
    min.insync.replicas: '2'
    cleanup.policy: delete
    retention.ms: '60000'

Topic checks:

  • metadata.cluster is a valid Kafka Cluster
  • metadata.name must belong to the Application Instance
  • spec.replicationFactor and spec.partitions are immutable and cannot be modified once the topic is created
  • spec.configs must be valid Kafka Topic configs
  • All properties are validated against TopicPolicies attached to the Application Instance
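
The immutability rule in the list above can be illustrated with a small comparison between the spec of an existing topic and the spec being applied. This is only a sketch under stated assumptions, not Conduktor's implementation: the helper name `immutable_field_violations` and the plain-dict representation of `spec` are hypothetical.

```python
# Hypothetical helper: not part of Conduktor. It only illustrates the rule
# that replicationFactor and partitions cannot change after creation.
IMMUTABLE_TOPIC_FIELDS = ("replicationFactor", "partitions")


def immutable_field_violations(existing_spec: dict, desired_spec: dict) -> list[str]:
    """Return the immutable spec fields whose value differs between the two specs."""
    return [
        field
        for field in IMMUTABLE_TOPIC_FIELDS
        if field in existing_spec
        and field in desired_spec
        and existing_spec[field] != desired_spec[field]
    ]


existing = {"replicationFactor": 3, "partitions": 3, "configs": {"retention.ms": "60000"}}
desired = {"replicationFactor": 3, "partitions": 6, "configs": {"retention.ms": "120000"}}

violations = immutable_field_violations(existing, desired)
print(violations)  # prints ['partitions']
```

Changes under `configs` are not reported by this sketch, because the checks above only mark `spec.replicationFactor` and `spec.partitions` as immutable.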

Conduktor annotations

  • conduktor.io/description is optional. A Markdown description that is displayed in the Topic Catalog view
  • conduktor.io/description.editable is optional (default "true"). Defines whether the description can be updated in the UI

Side effect in Console & Kafka:

Subject

Creates a Subject in the Schema Registry.

API Keys: Admin API Key, Application API Key
Managed with: CLI, API, Console UI

Local file

---
apiVersion: kafka/v2
kind: Subject
metadata:
  cluster: shadow-it
  name: myPrefix.topic-value
spec:
  schemaFile: schemas/topic.avsc # relative to conduktor CLI execution context
  format: AVRO
  compatibility: FORWARD_TRANSITIVE

Inline

---
apiVersion: kafka/v2
kind: Subject
metadata:
  cluster: shadow-it
  name: myPrefix.topic-value
spec:
  schema: |
    {
      "type": "long"
    }
  format: AVRO

Schema Reference

---
apiVersion: kafka/v2
kind: Subject
metadata:
  cluster: shadow-it
  name: myPrefix.topic-value
spec:
  schema: |
    {
      "type": "record",
      "namespace": "com.schema.avro",
      "name": "Client",
      "fields": [
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "address",
          "type": "com.schema.avro.Address"
        }
      ]
    }
  format: AVRO
  references:
    - name: com.schema.avro.Address
      subject: commons.address-value
      version: 1

Subject checks:

  • metadata.cluster is a valid Kafka Cluster
  • metadata.name must belong to the Application Instance
  • One of spec.schema or spec.schemaFile must be present
    • schema requires an inline schema
    • schemaFile requires a path to a file that contains the schema relative to the CLI execution path
  • spec.format is mandatory. Defines the schema format: AVRO, PROTOBUF, JSON
  • spec.compatibility is optional. Defines the subject compatibility mode: BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE
    • Unset the field if you want the compatibility mode to be the one defined at the Schema Registry global level
  • spec.references is optional. It specifies the names of referenced schemas
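
The spec-level checks in the list above can be sketched as a small validator. This is an illustration under stated assumptions, not Conduktor's actual validation code: the function name `subject_spec_errors`, the error messages, and the plain-dict shape of `spec` are hypothetical.

```python
# Hypothetical validator: a sketch of the Subject spec checks listed above,
# not Conduktor's actual validation code.
FORMATS = {"AVRO", "PROTOBUF", "JSON"}
COMPATIBILITY_MODES = {
    "BACKWARD", "BACKWARD_TRANSITIVE",
    "FORWARD", "FORWARD_TRANSITIVE",
    "FULL", "FULL_TRANSITIVE",
    "NONE",
}


def subject_spec_errors(spec: dict) -> list[str]:
    """Collect human-readable problems found in a Subject spec dict."""
    errors = []
    # At least one schema source is required. Whether supplying both is
    # rejected is not stated in the checks, so this sketch does not flag it.
    if "schema" not in spec and "schemaFile" not in spec:
        errors.append("one of spec.schema or spec.schemaFile must be present")
    # spec.format is mandatory and must be a known schema format.
    if spec.get("format") not in FORMATS:
        errors.append("spec.format must be one of AVRO, PROTOBUF, JSON")
    # spec.compatibility is optional; when unset, the Schema Registry
    # global compatibility level applies.
    compatibility = spec.get("compatibility")
    if compatibility is not None and compatibility not in COMPATIBILITY_MODES:
        errors.append(f"unknown spec.compatibility: {compatibility}")
    return errors


valid = {"schemaFile": "schemas/topic.avsc", "format": "AVRO", "compatibility": "FORWARD_TRANSITIVE"}
invalid = {"format": "XML", "compatibility": "SIDEWAYS"}

print(subject_spec_errors(valid))    # no problems reported
print(subject_spec_errors(invalid))  # three problems reported
```

The sketch does not cover the `metadata` checks or `spec.references`, since those depend on cluster and ownership information that only the server has.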

Side effect in Console & Kafka:

Connector

Creates a connector on a Kafka Connect cluster.

API Keys: Admin API Key, Application API Key
Managed with: CLI, API, Console UI

---
apiVersion: kafka/v2
kind: Connector
metadata:
  name: click.my-connector
  cluster: 'prod-cluster'
  connectCluster: kafka-connect-cluster
  labels:
    conduktor.io/auto-restart-enabled: true
    conduktor.io/auto-restart-frequency: 600
spec:
  config:
    connector.class: io.connect.jdbc.JdbcSourceConnector
    tasks.max: '1'
    topic: click.pageviews
    connection.url: "jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"
    consumer.override.sasl.jaas.config: o.a.k.s.s.ScramLoginModule required username="<user>" password="<password>";

Connector checks

  • metadata.connectCluster is a valid Kafka Connect cluster
  • metadata.name must belong to the Application Instance

Conduktor annotations

  • conduktor.io/auto-restart-enabled is optional (default "false"). Defines whether the Console Automatic Restart feature is enabled for this Connector
  • conduktor.io/auto-restart-frequency is optional (default 600 seconds, i.e. 10 minutes). Defines the delay, in seconds, between consecutive restart attempts
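
The defaults of the two annotations above can be made concrete with a short sketch that resolves them from a connector's `labels` map. The helper name `auto_restart_settings` and its return shape are assumptions for illustration only; this is not a Conduktor API.

```python
# Hypothetical helper illustrating the documented defaults; the function
# name and return shape are assumptions, not a Conduktor API.
def auto_restart_settings(labels: dict) -> tuple[bool, int]:
    """Return (enabled, frequency) with the documented defaults applied."""
    # Label values may arrive as YAML booleans/ints or as strings,
    # so both are normalised here.
    enabled = str(labels.get("conduktor.io/auto-restart-enabled", "false")).lower() == "true"
    # 600 is the documented default, which corresponds to 10 minutes.
    frequency = int(labels.get("conduktor.io/auto-restart-frequency", 600))
    return enabled, frequency


print(auto_restart_settings({}))  # prints (False, 600)
print(auto_restart_settings({
    "conduktor.io/auto-restart-enabled": True,
    "conduktor.io/auto-restart-frequency": 600,
}))  # prints (True, 600)
```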