Searching...
Wednesday, 2 November 2016

Apache Camel Kafka Component Example

19:38

Apache camel API has the inbuilt kafka component and it is very simple to create producer, consumer and process messages. Here is one simple Kafka producer and consumer example using Apache camel and Kafka.

Steps

1) Download Apache kafka from https://kafka.apache.org/downloads


2) Extract the tar and start the zookeeper and kafka 


 ./zookeeper-server-start.sh ../config/zookeeper.properties

 ./kafka-server-start.sh ../config/server.properties


3. Create a Maven project and add below dependencies


<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>2.16.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
			<version>2.16.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-jetty</artifactId>
			<version>2.16.3</version>
		</dependency>
		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>2.11.0</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.6</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.6</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-test</artifactId>
			<version>2.17.3</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
		</dependency>
	</dependencies>


4. Create a camel main class to add camel routes and its processors.


package com.vinod.test;

import org.apache.camel.Exchange;
import org.apache.camel.Main;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

public class CamelKafkaConsumerTest {

    public static void main(String... args) throws Exception {
	Main main = new Main();
	main.enableHangupSupport();
	main.addRouteBuilder(new MyCamelJettyBuilder());
	main.run(args);
    }
}

class MyCamelJettyBuilder extends RouteBuilder {
    String topicName = "topic=test";
    String kafkaServer = "kafka:localhost:9092";
    String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
    String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
    String autoOffsetOption = "autoOffsetReset=smallest";
    String groupId = "groupId=testingvinod";

    String toKafka = new StringBuilder().append(kafkaServer).append("?").append(
	    topicName).append("&").append(zooKeeperHost).append("&").append(
		    serializerClass).toString();

    String fromKafka = new StringBuilder().append(toKafka).append("&").append(
	    autoOffsetOption).append("&").append(groupId).toString();

    public void configure() {
	from("jetty:http://localhost:8182/mytestservice").process(
		new Processor() {
		    public void process(Exchange exchange) throws Exception {
			String message = exchange.getIn().getBody(String.class);
			exchange.getIn().setBody(message, String.class);
			exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY,
				0);
			exchange.getIn().setHeader(KafkaConstants.KEY, "1");
		    }
		}).to(toKafka);
	from(fromKafka).process(new Processor() {
	    public void process(Exchange exchange) throws Exception {
		if (exchange.getIn() != null) {
		    Message message = exchange.getIn();
		    String data = message.getBody(String.class);
		    System.out.println("Data =" + data.toString());
		}
	    }
	});

    }
}

In this example exposed a jetty end point to give the input message instead of static message, once started the above program use any rest client to test the program.


5) Send message to Kafka topic using our service .. http://localhost:8182/mytestservice


Screen Shot 2016 11 01 at 9 03 01 PM

now we can see the output which we sent and consumed from Kafka in the console

Data  =Test my message by Vinod


6) Download Example

https://github.com/kkvinodkumaran/camel


Reference: Apache Camel, Kafka

0 comments: