Below are the steps to consume Avro messages from a Kafka topic using a Kafka consumer in Java:
1. Add dependencies: Add the required dependencies to your project, including the Kafka client library and the Avro serializer/deserializer library.
2. Configure Kafka consumer properties: Configure the properties of the Kafka consumer, such as the bootstrap servers, the consumer group ID, the key and value deserializer classes, and the schema registry URL used to look up the Avro schema. The topic itself is not a property; it is supplied when subscribing.
3. Create a Kafka consumer instance: Create an instance of the KafkaConsumer class, passing in the consumer properties as a parameter.
4. Subscribe to the topic: Subscribe to the Kafka topic using the subscribe() method of the KafkaConsumer instance.
5. Poll for records: Use the poll() method of the KafkaConsumer instance to poll for records from the Kafka topic.
6. Deserialize Avro records: The configured Avro deserializer converts each message into an Avro object; read it with the value() method of each ConsumerRecord.
7. Process the records: Process the records as required by your application logic.
8. Commit the offsets: Use the commitSync() method of the KafkaConsumer instance to commit the offsets of the processed records.
9. Close the consumer: When you are finished consuming records from the Kafka topic, close the KafkaConsumer instance using the close() method.
The following example code snippet shows how to consume Avro messages from Kafka in Java:
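// Imports (the kafka-clients and Confluent kafka-avro-serializer dependencies must be on the classpath)
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Example wrapper class; the name is arbitrary
public class AvroConsumerExample {
    public static void main(String[] args) {
        // Configure Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");
        // Return the generated class (MyRecord) instead of a GenericRecord
        props.put("specific.avro.reader", "true");
        // Offsets are committed manually below, so turn off auto-commit
        props.put("enable.auto.commit", "false");
        // Create a Kafka consumer instance
        KafkaConsumer<String, MyRecord> consumer = new KafkaConsumer<>(props);
        try {
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList("my-topic"));
            // Poll for records until the application is stopped
            while (true) {
                ConsumerRecords<String, MyRecord> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, MyRecord> record : records) {
                    // The value has already been deserialized into a MyRecord
                    MyRecord myRecord = record.value();
                    // Process the records
                    System.out.println(myRecord);
                }
                // Commit the offsets of the records returned by the last poll
                consumer.commitSync();
            }
        } finally {
            // Close the consumer, even if polling fails with an exception
            consumer.close();
        }
    }
}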
In the above example, replace "MyRecord" with the actual name of your Avro schema class. Also, make sure to provide the correct values for the bootstrap servers, group ID, topic, and schema registry URL.
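Because the bootstrap servers, group ID, and schema registry URL usually differ between environments, one option is to build the consumer properties in a small helper that fails early when a value is missing. The sketch below is only an illustration: the class name AvroConsumerProps and its build() method are hypothetical, and it uses only java.util.Properties, so it compiles without Kafka on the classpath. The deserializer class names are passed as plain strings and are assumed to match the libraries used in the example above.

```java
import java.util.Properties;

// Hypothetical helper: collects the environment-specific values in one place.
class AvroConsumerProps {

    // Builds consumer properties for String keys and Avro values.
    static Properties build(String bootstrapServers, String groupId, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", requireNonBlank("bootstrap.servers", bootstrapServers));
        props.put("group.id", requireNonBlank("group.id", groupId));
        props.put("schema.registry.url", requireNonBlank("schema.registry.url", schemaRegistryUrl));
        // Same deserializers and reader setting as in the example above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("specific.avro.reader", "true");
        return props;
    }

    // Rejects null or blank values and returns the trimmed value otherwise.
    private static String requireNonBlank(String name, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing value for " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "test-group", "http://localhost:8081");
        System.out.println(props.getProperty("group.id"));
    }
}
```

The returned Properties object can then be passed to the KafkaConsumer constructor in place of the inline configuration, for example with values read from environment variables or a configuration file.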