04 April, 2023

Consume Avro messages from Kafka consumer in Java

Below are the steps to consume Avro messages from Kafka consumer in Java:

1.Add dependencies: Add the required dependencies to your project, including the Kafka client library and the Avro serializer/deserializer library.

2.Configure Kafka consumer properties: Configure the properties of the Kafka consumer, such as the 3.bootstrap server, the topic to consume from, and the deserializer class for the Avro schema.

4.Create a Kafka consumer instance: Create an instance of the KafkaConsumer class, passing in the consumer properties as a parameter.

5.Subscribe to the topic: Subscribe to the Kafka topic using the subscribe() method of the KafkaConsumer instance.

6.Poll for records: Use the poll() method of the KafkaConsumer instance to poll for records from the Kafka topic.

6.Deserialize Avro records: Use the Avro deserializer library to deserialize the Avro records from the Kafka topic.

7.Process the records: Process the records as required by your application logic.

8.Commit the offsets: Use the commitSync() method of the KafkaConsumer instance to commit the offsets of the processed records.

9.Close the consumer: When you are finished consuming records from the Kafka topic, close the KafkaConsumer instance using the close() method.

Refer below example code snippet to consume Avro messages from Kafka consumer in Java:

// Add dependencies

import org.apache.kafka.clients.consumer.*;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Configure Kafka consumer properties

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("group.id", "test-group");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

props.put("schema.registry.url", "http://localhost:8081");

props.put("specific.avro.reader", true);

// Create a Kafka consumer instance

KafkaConsumer<String, MyRecord> consumer = new KafkaConsumer<String, MyRecord>(props);

// Subscribe to the topic

consumer.subscribe(Arrays.asList("my-topic"));

// Poll for records

while (true) {

    ConsumerRecords<String, MyRecord> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, MyRecord> record : records) {

        // Deserialize Avro records

        MyRecord myRecord = record.value();

        // Process the records

        System.out.println(myRecord.toString());

    }

    // Commit the offsets

    consumer.commitSync();

}

// Close the consumer

consumer.close();

The above example, replace "MyRecord" with the actual name of your Avro schema class. Also, make sure to provide the correct values for the bootstrap servers, group ID, topic, and schema registry URL

No comments: