Reading data from a Kafka topic with the Kafka Consumer API involves several key steps, detailed below:
Step 1: Add Dependencies
First, ensure your project includes the Apache Kafka dependency. If you are using Java with Maven as your build tool, add the following dependency to your pom.xml file:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
```
Step 2: Configure the Consumer
Creating a Kafka consumer requires specifying several configurations. The most critical ones include bootstrap.servers (the address of the Kafka cluster), key.deserializer and value.deserializer (the classes used for message deserialization), and group.id (the identifier for the consumer group). Here is a basic configuration example:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // Start from the earliest offset when no committed offset exists
```
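As a side note, the kafka-clients library also ships a ConsumerConfig class with constants for these configuration keys, which avoids typos in the string literals. A small sketch of the same configuration using those constants:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerProps {
    static Properties build() {
        Properties props = new Properties();
        // Same settings as above, expressed with the library's key constants.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
```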
Step 3: Create the Consumer
Using the configuration defined earlier, create a Kafka consumer:
```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
Step 4: Subscribe to Topics
You need to subscribe to one or more topics. This can be achieved using the subscribe method:
```java
consumer.subscribe(Arrays.asList("my-topic"));
```
Step 5: Poll and Process Data
Finally, use a loop to continuously poll the server for data, processing the records returned by each poll:
```java
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
```
This process will continuously listen for and process new messages.
Example Application
Suppose I work at an e-commerce platform and need to implement a service that reads order information from Kafka and processes each order. The steps above describe how I would set up a consumer from scratch to read order data from the "orders" topic and print the details of each order.
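Putting the steps together, such an order-reading service might look like the following minimal sketch; the group id "order-service" is an assumption for illustration, and the print statement stands in for real order processing:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OrderConsumer {

    // Consumer configuration, mirroring Step 2; "order-service" is a hypothetical group id.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-service");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // try-with-resources closes the consumer even if processing throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps())) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Stand-in for real order processing.
                    System.out.printf("order %s: %s%n", record.key(), record.value());
                }
            }
        }
    }
}
```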
Note: When using the Kafka Consumer, you should also consider additional factors such as error handling, multi-threaded consumption, and consumer robustness. However, the core steps and configurations are as described above.
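One common robustness pattern, for instance, is a graceful shutdown: another thread (here a JVM shutdown hook) calls consumer.wakeup(), which makes a blocked poll() throw a WakeupException so the loop can exit and close the consumer cleanly. A minimal sketch, assuming the same "my-topic" setup as above:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumer {

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps());
        Thread mainThread = Thread.currentThread();

        // On SIGTERM/SIGINT, interrupt the blocked poll() and wait for the loop to finish.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; no action needed.
        } finally {
            consumer.close(); // Leaves the group cleanly; commits offsets if auto-commit is enabled.
        }
    }
}
```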