Part 2: Apache Kafka Basics
Created on: Sep 26, 2024
Let's create a simple Java application and produce some messages.
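The producer serializes an Order object to JSON, so we need a model class first. The original model isn't shown in this post; a minimal sketch, assuming Lombok's @Builder and @Data to match the Order.builder() calls in the producer, could look like this:

package com.model;

import lombok.Builder;
import lombok.Data;

// Minimal sketch of the Order model assumed by the producer below.
// Lombok generates the builder, getters, and toString.
@Builder
@Data
public class Order {
    private int orderId;
    private int quantity;
    private String productName;
}

With the model in place, here is the producer: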
package com.producer;

import java.util.Properties;
import java.util.Random;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.model.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");                                    // acknowledgment level: "all", "1", or "0"
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");                                   // retries on transient failures
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));        // batch size in bytes
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5");                                 // linger time in ms
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Long.toString(33 * 1024 * 1024)); // buffer memory in bytes
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");                       // enable compression
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");            // in-flight requests per connection
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");                     // idempotent producer

        // Create a Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        Random random = new Random();
        ObjectMapper objectMapper = new ObjectMapper();

        int recordCount = 1_000;
        for (int i = 0; i < recordCount; i++) {
            String topic = "order";

            // Create a ProducerRecord targeting a specific partition
            int partition = i % 3;
            String key = "Order-" + partition;
            Order order = Order.builder()
                    .orderId(random.nextInt(1000) + 1) // random id
                    .quantity(random.nextInt(100) + 1)
                    .productName("Product " + random.nextInt(1000))
                    .build();

            String orderJson = "";
            try {
                orderJson = objectMapper.writeValueAsString(order);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }

            ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, orderJson);

            // Send the record asynchronously; the callback reports the outcome
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error occurred while sending message: " + exception.getMessage());
                } else {
                    System.out.println("Message sent successfully! Topic: " + metadata.topic()
                            + ", Partition: " + metadata.partition()
                            + ", Offset: " + metadata.offset());
                }
            });
        }

        // Flush and close the producer
        producer.flush();
        producer.close();
    }
}
Running the producer prints output like:

Message sent successfully! Topic: order, Partition: 0, Offset: 333
Message sent successfully! Topic: order, Partition: 0, Offset: 334
Message sent successfully! Topic: order, Partition: 0, Offset: 335
Message sent successfully! Topic: order, Partition: 0, Offset: 336
....
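Note that the producer writes to partitions 0 through 2 explicitly, so the order topic must have at least three partitions. If the topic doesn't exist yet, one way to create it is with Kafka's AdminClient (a minimal sketch; the replication factor of 3 assumes a three-broker cluster like the one above):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateOrderTopic {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        try (AdminClient admin = AdminClient.create(properties)) {
            // 3 partitions so the producer's (i % 3) partitioning works,
            // replication factor 3 for the three-broker cluster.
            NewTopic orderTopic = new NewTopic("order", 3, (short) 3);
            admin.createTopics(Collections.singletonList(orderTopic)).all().get();
        }
    }
}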
Important points about the above program:
- We serialize both the key and the value as strings; the Order object is converted to a JSON string with Jackson before sending.
- We set ACKS_CONFIG to "all", which means every in-sync replica must acknowledge the write before the send is considered successful (see the blocking-send sketch after this list).
- We set RETRIES_CONFIG to 3 so the producer retries a failed send up to three times.
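Since send() is asynchronous, the callback above is where you learn the outcome. If you want to block until the brokers acknowledge each record, one option is to call get() on the Future that send() returns. A sketch that would replace the callback-based send inside the loop (it needs java.util.concurrent.ExecutionException added to the imports, and trades throughput for a per-record guarantee):

// Blocking send: Future.get() waits until the brokers have acknowledged
// the record under the configured acks level (here acks=all).
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Acknowledged: partition " + metadata.partition()
            + ", offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    System.err.println("Send failed: " + e.getMessage());
}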
Let's write a consumer to consume the orders.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderConsume {
    public static void main(String[] args) {
        // Set up consumer properties
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "500000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        // Create Kafka consumers, one per thread
        int numConsumers = 3; // Set the desired number of consumers
        for (int i = 0; i < numConsumers; i++) {
            properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENTID " + i);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

            // Subscribe to the topic(s) you want to consume from
            consumer.subscribe(Collections.singletonList("order"));

            // Start consuming messages
            Thread consumerThread = new Thread(() -> {
                try {
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println("Consumer: " + Thread.currentThread().getName()
                                    + ", Received message: " + record.value()
                                    + ", Topic: " + record.topic()
                                    + ", Partition: " + record.partition()
                                    + ", Offset: " + record.offset());
                        }
                    }
                } catch (Exception exception) {
                    System.out.println("some exception " + exception.getLocalizedMessage());
                    exception.printStackTrace();
                } finally {
                    // Close the consumer when done
                    consumer.close();
                }
            });

            // Start the consumer thread
            consumerThread.start();
        }
    }
}
Consumer: Thread-1, Received message: {"orderId":815,"quantity":75,"productName":"Product 782"}, Topic: order, Partition: 1, Offset: 497
Consumer: Thread-1, Received message: {"orderId":548,"quantity":3,"productName":"Product 66"}, Topic: order, Partition: 1, Offset: 498
Consumer: Thread-1, Received message: {"orderId":833,"quantity":61,"productName":"Product 130"}, Topic: order, Partition: 1, Offset: 499
Consumer: Thread-1, Received message: {"orderId":943,"quantity":47,"productName":"Product 868"}, Topic: order, Partition: 1, Offset: 500
Consumer: Thread-1, Received message: {"orderId":262,"quantity":82,"productName":"Product 471"}, Topic: order, Partition: 1, Offset: 501
Consumer: Thread-1, Received message: {"orderId":877,"quantity":42,"productName":"Product 241"}, Topic: order, Partition: 1, Offset: 502
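One caveat: the while (true) loop above never exits normally, so consumer.close() in the finally block is only reached after an error. A common pattern is to call wakeup() from a JVM shutdown hook; wakeup() is the one KafkaConsumer method that is safe to call from another thread, and it makes poll() throw a WakeupException that the loop can treat as a clean exit. A sketch of how the thread body could be adapted:

// Register once per consumer, before starting its thread:
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

// Inside the consumer thread:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received: " + record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected on shutdown; fall through and close cleanly.
} finally {
    consumer.close();
}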
We can also commit offsets manually. In the program below, auto-commit is disabled and the consumer commits offsets itself after processing each batch.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderConsume {
    public static void main(String[] args) {
        // Set up consumer properties
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "500000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // Disable automatic offset committing
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Create Kafka consumers, one per thread
        int numConsumers = 3; // Set the desired number of consumers
        for (int i = 0; i < numConsumers; i++) {
            properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENTID " + i);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

            // Subscribe to the topic(s) you want to consume from
            consumer.subscribe(Collections.singletonList("order"));

            // Start consuming messages
            Thread consumerThread = new Thread(() -> {
                try {
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println("Consumer: " + Thread.currentThread().getName()
                                    + ", Received message: " + record.value()
                                    + ", Topic: " + record.topic()
                                    + ", Partition: " + record.partition()
                                    + ", Offset: " + record.offset());
                        }
                        // Commit the offsets of this batch after processing it
                        consumer.commitAsync(); // or commitSync()
                    }
                } catch (Exception exception) {
                    System.out.println("some exception " + exception.getLocalizedMessage());
                    exception.printStackTrace();
                } finally {
                    // Close the consumer when done
                    consumer.close();
                }
            });

            // Start the consumer thread
            consumerThread.start();
        }
    }
}
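commitAsync() above commits the offsets of everything returned by the last poll(). For finer-grained control you can pass explicit offsets to commitSync(), for example committing each partition's batch as soon as it has been processed. A sketch of an alternative loop body (it needs java.util.List, org.apache.kafka.common.TopicPartition, and org.apache.kafka.clients.consumer.OffsetAndMetadata added to the imports):

for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println("Processing: " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // The committed offset is the position of the *next* record to read, hence +1.
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}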
Check out the whole code on GitHub.
