# Consumer Task
Use the Consumer task to consume data from a Kafka topic.
You can consume data from Kafka under a number of conditions:
| Method | Description |
| --- | --- |
| Single Record | Consume a single record from a Kafka topic |
| X Records | Consume a user-defined number of records from a Kafka topic |
| Time Interval | Consume records continuously until an elapsed time (ms) is reached |
| With filter(s) | Use any of the above methods with additional filter conditions to intercept specific messages |
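The logic behind these methods can be sketched as a single polling loop. This is a hypothetical pure-Python illustration, not the task's actual implementation: `poll` stands in for a Kafka client's poll call, and `predicate` represents an optional filter.

```python
import time

def consume(poll, max_records=1, timeout_ms=60_000, predicate=None):
    """Collect up to max_records messages within timeout_ms.

    poll      -- callable returning the next message, or None if no
                 message is available (stand-in for a client poll call)
    predicate -- optional filter; messages failing it are skipped
    """
    records = []
    deadline = time.monotonic() + timeout_ms / 1000
    while len(records) < max_records and time.monotonic() < deadline:
        msg = poll()
        if msg is None:
            continue
        if predicate is None or predicate(msg):
            records.append(msg)
    return records

# Single Record: max_records=1. X Records: max_records=X.
# Time Interval: set max_records high and rely on timeout_ms.
# With filter(s): pass a predicate.
messages = iter([{"id": 1}, {"id": 2}, {"id": 3}])
got = consume(lambda: next(messages, None), max_records=2)
```

Each method is just a different parameterization of the same loop, which is why they can all be combined with filters.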
## Deserialization support
When you consume data from Kafka, you must specify the deserialization format for the record keys and values.
Currently supported SerDes formats:
- String
- JSON
- Long
- Float
- Double
- Bytes (base64)
- Avro (Custom)
- Avro (Schema Registry)
- Protobuf (Schema Registry)
- JSON (Schema Registry)
- MessagePack
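To make the formats concrete, here is a hand-rolled sketch of how raw Kafka bytes map to a few of them (String, JSON, Long, Double, Bytes). The `deserialize` helper is hypothetical; a real client applies the equivalent decoding via its configured deserializers.

```python
import base64
import json
import struct

def deserialize(raw: bytes, fmt: str):
    """Decode raw record bytes according to a named SerDes format."""
    if fmt == "String":
        return raw.decode("utf-8")
    if fmt == "JSON":
        return json.loads(raw)
    if fmt == "Long":
        return struct.unpack(">q", raw)[0]   # 8-byte big-endian integer
    if fmt == "Double":
        return struct.unpack(">d", raw)[0]   # IEEE 754 double
    if fmt == "Bytes (base64)":
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unsupported format: {fmt}")

key = deserialize(b"order-42", "String")        # "order-42"
value = deserialize(b'{"qty": 3}', "JSON")      # {"qty": 3}
```

Note that picking the wrong format (for example, reading a JSON payload as Long) fails or produces garbage, which is why the Consumer's formats must match the Producer's.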
## Create a consumer task
For this exercise, we will assume you have already added a simple Producer task to your test scenario. We will consume the message configured in that Producer task.
To ensure the consumer task is subscribed when the message is produced, these tasks must be configured in parallel.
From inside the editor view, select the Scenario Start button, then choose Consumer from the dropdown menu.
Give your task an appropriate name, and select the same Cluster and Topic that were configured in the Producer task.
Navigate to the Data tab and define the deserialization format for the Key and Value. Ensure this matches the serialization format defined in the Producer task.
The Consumer task is configured with default lifecycle rules: it stops once 1 record has been consumed, or fails after 60,000 ms. You can amend these rules to suit your needs in the Lifecycle section.
To learn about adding checks (tests) on data consumed from Kafka, move on to **Test Checks**.
## Consumer filters
It's possible to filter the messages consumed by your Consumer task. This is useful if you need to intercept a message based on specific criteria.
There are multiple filter types:
| Filter Type | Description |
| --- | --- |
| Headers | Filter on message headers (metadata) |
| JSON Query | Filter on a message key or message value |
| Schema Id | When using Schema Registry, filter on a schema Id for the message key or message value |
You can also combine your filter rules with one of two conditions:
| Filter Condition | Description |
| --- | --- |
| All | Match messages that obey all filter rules (in cases where there are multiple filters) |
| Any of | Match messages that obey at least one filter rule (in cases where there are multiple filters) |
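The All / Any of semantics map directly onto logical AND / OR over the individual filter rules. The sketch below is a hypothetical illustration (the helper names and message shape are assumptions, not the product's API): a Headers filter and a JSON Query-style filter are combined under each condition.

```python
import json

def header_filter(name, expected):
    """Headers filter: match a header (metadata) entry exactly."""
    return lambda msg: msg["headers"].get(name) == expected

def json_value_filter(field, expected):
    """JSON Query stand-in: match one field of a JSON-encoded value."""
    return lambda msg: json.loads(msg["value"]).get(field) == expected

def matches(msg, filters, condition="All"):
    """All -> every rule must pass; Any of -> at least one must pass."""
    check = all if condition == "All" else any
    return check(f(msg) for f in filters)

msg = {"headers": {"source": "billing"}, "value": '{"status": "paid"}'}
filters = [header_filter("source", "billing"),     # passes
           json_value_filter("status", "failed")]  # fails
```

Here the message satisfies one rule but not the other, so it is intercepted under "Any of" but rejected under "All".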
To add filters to your Consumer task, navigate to the Data tab when in edit mode. Under the Filters header, click the '+' button to add a new filter.