Skip to main content
Read from specific partitions and offsets using seek() and assign() in 10 minutes When you need to read from specific partitions or replay data from a particular offset, the seek() and assign() APIs bypass consumer groups entirely. What you’ll learn:
  • When to use seek() and assign() instead of consumer groups
  • How to read from a specific partition and offset
  • How to implement a bounded consumer (read N messages)
To use these API, make the following changes:
  • Remove the group.id from the consumer properties (we don’t use consumer groups anymore).
  • Remove the subscription to the topic.
  • Use consumer assign() and seek() APIs.
The code with these changes is shown in the snippet below.
package io.conduktor.demos.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerDemoAssignSeek {
    public static void main(String[] args) {

        Logger log = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class.getName());

        String bootstrapServers = "127.0.0.1:9092";
        String topic = "demo_java";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // assign and seek are mostly used to replay data or fetch a specific message

        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 7L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        // poll for new data
        while(keepOnReading){
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records){
                numberOfMessagesReadSoFar += 1;
                log.info("Key: " + record.key() + ", Value: " + record.value());
                log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                if (numberOfMessagesReadSoFar >= numberOfMessagesToRead){
                    keepOnReading = false; // to exit the while loop
                    break; // to exit the for loop
                }
            }
        }

        log.info("Exiting the application");

    }
}
Here we want to read messages from the offset 7 of partition 0 of the topic demo_java.

Partition offset

Make sure the partition offset of partition 0 of the topic demo_java is at least 7. Produce a number of messages to the topic to achieve that. Run the application. The console of the application will display the selected messages.
[main] INFO Subscribed to partition(s): demo_java-0
[main] INFO Seeking to offset 7 for partition demo_java-0
[main] INFO Cluster ID: 8fresFx9R4Kod8UqUrixrg
[main] INFO ConsumerDemoAssignSeek - Key: id_7, Value: hello world 7
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:7
[main] INFO ConsumerDemoAssignSeek - Key: id_8, Value: hello world 8
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:8
[main] INFO ConsumerDemoAssignSeek - Key: id_9, Value: hello world 9
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:9
[main] INFO ConsumerDemoAssignSeek - Key: id_0, Value: hello world 0
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:10
[main] INFO ConsumerDemoAssignSeek - Key: id_1, Value: hello world 1
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:11
[main] INFO ConsumerDemoAssignSeek - Exiting the application

Next steps