Kafka - GenericRecord vs SpecificRecord
When producing and consuming Avro-encoded Kafka messages, there are two record types to choose from: GenericRecord and SpecificRecord. The main difference is that a SpecificRecord is a concrete Java class generated from the Avro schema definition, so its fields are accessed through typed getters and setters, while a GenericRecord is accessed by field name and only checked against the schema at runtime.
Producer
On the producer side there is little difference between the two types when sending a message.
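Both producer snippets below use a `producer` variable that is assumed to be already configured with the Confluent Avro serializer. A minimal sketch of that setup (broker and Schema Registry addresses are placeholders; for the SpecificRecord example the value type would be SpecificRecord instead of GenericRecord):

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
//the serializer registers and looks up schemas in the Schema Registry
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
//KafkaAvroSerializer handles both GenericRecord and SpecificRecord values
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
```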
GenericRecord
//avro schema
String simpleMessageSchema =
        "{" +
        "  \"type\": \"record\"," +
        "  \"name\": \"SimpleMessage\"," +
        "  \"namespace\": \"com.codingharbour.avro\"," +
        "  \"fields\": [" +
        "    {\"name\": \"content\", \"type\": \"string\", \"doc\": \"Message content\"}," +
        "    {\"name\": \"date_time\", \"type\": \"string\", \"doc\": \"Datetime when the message was created\"}" +
        "  ]" +
        "}";

//parse the schema
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(simpleMessageSchema);

//prepare the avro record; fields are set by name
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("content", "Hello world");
avroRecord.put("date_time", Instant.now().toString());

//prepare the kafka record and send it
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("avro-topic", null, avroRecord);
producer.send(record);
producer.flush();
producer.close();
SpecificRecord
//SimpleMessage is a Java class generated from the avro schema
SimpleMessage simpleMessage = new SimpleMessage();
simpleMessage.setContent("Hello world");
simpleMessage.setDateTime(Instant.now().toString());

//prepare the kafka record and send it
ProducerRecord<String, SpecificRecord> record = new ProducerRecord<>("avro-topic", null, simpleMessage);
producer.send(record);
producer.flush();
producer.close();
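Classes generated by the Avro compiler also come with a builder, which is often preferred because build() fails fast when a required field has not been set (assuming SimpleMessage is the class generated from the schema above):

```java
SimpleMessage simpleMessage = SimpleMessage.newBuilder()
        .setContent("Hello world")
        .setDateTime(Instant.now().toString())
        .build();
```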
Consumer
The consumer needs some extra configuration to deserialize messages into a SpecificRecord.
GenericRecord
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-record-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("avro-topic"));

//poll records from the topic; fields are read by name
while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
        System.out.println("Message content: " + record.value().get("content"));
        System.out.println("Message time: " + record.value().get("date_time"));
    }
    consumer.commitAsync();
}
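One caveat when reading fields by name: for Avro string fields, GenericRecord.get() typically returns org.apache.avro.util.Utf8 rather than java.lang.String (unless the schema overrides this with the avro.java.string property), so convert explicitly before comparing or storing the value:

```java
//string concatenation above works because toString() is invoked implicitly,
//but a direct cast to String would throw a ClassCastException
String content = record.value().get("content").toString();
```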
SpecificRecord
The consumer must set KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to true; otherwise the deserializer returns GenericRecord instances even when the generated class is on the classpath.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); //deserialize into the generated class instead of GenericRecord

KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("avro-topic"));

//poll records from the topic; fields are read through typed getters
while (true) {
    ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, SimpleMessage> record : records) {
        System.out.println("Message content: " + record.value().getContent());
        System.out.println("Message time: " + record.value().getDateTime());
    }
    consumer.commitAsync();
}
Kafka resources
https://codingharbour.com/blog/