Searching...
Sunday, 20 November 2016

Simple Kafka Producer and Consumer using Java

08:58

Here is an example of producing and consuming a message with Apache Kafka using Java.

Steps

1) Download Apache kafka from https://kafka.apache.org/downloads


2) Extract the tar archive, then start ZooKeeper and Kafka from the bin directory:


 ./zookeeper-server-start.sh ../config/zookeeper.properties

 ./kafka-server-start.sh ../config/server.properties


3) Create a Maven project and add the dependency below


<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.10.1.0</version>
		</dependency>


4) Create a main class that creates the producer and consumer and sends and receives a message


package com.vinod.test;


import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * Test class that sends one message to, and then reads messages from, the
 * {@code order} topic of a Kafka broker.
 *
 * @author vinodkariyathungalkumaran
 */
public class MyKafkaConsumer {
    private kafka.javaapi.consumer.ConsumerConnector consumer;

    /**
     * Creates a Kafka producer, sends a single string message to the
     * {@code order} topic and closes the producer again.
     */
    public void testProducer() {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        try {
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>("order", "Sending Customer Order, please process");
            producer.send(message);
        } finally {
            // Release the producer's resources even when send() throws.
            producer.close();
        }
    }

    /**
     * Creates a consumer connector for the {@code order} topic and prints every
     * message it receives to standard output.
     *
     * <p>No {@code consumer.timeout.ms} is configured here, so the read loop is
     * expected to block while waiting for further messages; the connector is shut
     * down when the loop ends or an exception is thrown.
     */
    public void testConsumer() {
        String topic = "order";
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "vinod");
        props.put("zookeeper.session.timeout.ms", "5000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        // NOTE(review): "serializerClass" does not look like a consumer property;
        // kept unchanged to preserve the original configuration - confirm and remove.
        props.put("serializerClass", "kafka.serializer.StringEncoder");
        try {
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            // Request a single stream for the topic.
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    // Decode with an explicit charset instead of the platform default.
                    String message = new String(it.next().message(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("Message from order Topic: " + message);
                }
            }
        } finally {
            // Always release the connector, including on failure paths.
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }

    /**
     * Sends one message and then starts consuming from the same topic.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        MyKafkaConsumer mykafkaConsumer = new MyKafkaConsumer();
        mykafkaConsumer.testProducer();
        mykafkaConsumer.testConsumer();
    }

}


5) Output printed in the console

Message from order Topic: Sending Customer Order, please process


6) Download Example

https://github.com/kkvinodkumaran/kafka





Reference: Kafka

0 comments: