Here is one example to produce and consume message from Apache kafka using Java
Steps
1) Download Apache kafka from https://kafka.apache.org/downloads
2) Extract the tar and start the zookeeper and kafka
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
3. Create a Maven project and add below dependencies
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.1.0</version> </dependency>
4. Create a main class to create producer/consumer and sending messages
package com.vinod.test; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Test class to send and receive message using kafka broker. * *@authorvinodkariyathungalkumaran * */ public class MyKafkaConsumer { private kafka.javaapi.consumer.ConsumerConnector consumer; /** * Method to create the kafka producer and sending message * */ public void testProducer() { Properties properties = new Properties(); properties.put("metadata.broker.list", "localhost:9092"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig producerConfig = new ProducerConfig(properties); kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); KeyedMessage<String, String> message = new KeyedMessage<String, String>("order", "Sending Customer Order, please process"); producer.send(message); } /** * Method to create consumer and consume message */ public void testConsumer() { String topic = "order"; Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "vinod"); props.put("zookeeper.session.timeout.ms", "5000"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); props.put("serializerClass", "kafka.serializer.StringEncoder"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); Map<String, Integer> topicCount = new HashMap<String, Integer>(); topicCount.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); for (final KafkaStream stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { String message = new String(it.next().message()); System.out.println("Message from order Topic: " + message); } } if (consumer != null) { consumer.shutdown(); } } public static void main(String[] args) { MyKafkaConsumer mykafkaConsumer = new MyKafkaConsumer(); mykafkaConsumer.testProducer(); mykafkaConsumer.testConsumer(); } }
5) Output printed in the console
Message from order Topic: Sending Customer Order, please process
6) Download Example
https://github.com/kkvinodkumaran/kafka
Reference: Kafka
0 comments:
Post a Comment