
Sunday, August 16, 2015

Apache Kafka Java API example

I've just started learning Apache Kafka, and I realized there isn't much documentation or many examples for the project. So I decided to prepare my own notes.

Start Kafka
Kafka can be installed from source or from a compiled package. If you are interested in building from source, see this git repository.

To make things easier, I chose the compiled package. Since everything is already compiled, all we need to do is start the servers.
Kafka runs on ZooKeeper, so the first thing is to start the ZooKeeper server.
In the directory where Kafka is installed:

bin/zookeeper-server-start.sh config/zookeeper.properties

In a different window/tab, start Kafka server:

bin/kafka-server-start.sh config/server.properties

Create a topic
A topic is a category/feed name to which messages are published. A producer sends messages to the topic it chooses (see the following code for details), and a consumer subscribes to the topics it wants messages from.
In a different window/tab, create a topic (named "test"):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Create producer and consumers through Java API
If you just want to get your feet wet, follow the doc and try creating producers and consumers through the command line.

Here I use Eclipse to make things easier.

First, in Eclipse, create a Maven project (refer to this post for creating a Maven project in Eclipse). To include dependencies in pom.xml, open pom.xml, click the pom.xml tab at the bottom (see the figure below), and add the following dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Kafka</groupId>
  <artifactId>KafkaProject</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>kafkatest</name>
  <url>http://maven.apache.org</url>
  <properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  
  <dependencies>
   <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
    <scope>compile</scope>
    <exclusions> 
     <exclusion> 
      <artifactId>jmxri</artifactId> 
      <groupId>com.sun.jmx</groupId> 
     </exclusion> 
     <exclusion> 
      <artifactId>jms</artifactId> 
      <groupId>javax.jms</groupId> 
     </exclusion> 
     <exclusion> 
      <artifactId>jmxtools</artifactId> 
      <groupId>com.sun.jdmk</groupId> 
     </exclusion> 
    </exclusions>
   </dependency>
  </dependencies>
</project>


Be sure to include all the exclusions, otherwise you will get errors.

Before writing our Producer class, start a consumer in a new window/tab in terminal (in the directory where Kafka is installed):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Now create a class ProducerTest (or whatever name you want).

import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("zk.connect", "localhost:2181");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("metadata.broker.list", "localhost:9092");
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  for (long nEvents = 0; nEvents < 10; nEvents++) {
   System.out.println("Creating event " + nEvents);
   long runtime = new Date().getTime();
   String msg = runtime + ", Shirley is awesome.";
   // KeyedMessage(topic, message): publish to the "test" topic
   producer.send(new KeyedMessage<String, String>("test", msg));
  }
  producer.close(); // release the producer's network resources
 }
}

Now if you run the code, you will see the messages show up in the window/tab where the consumer is running.

syang:kafka_2.10-0.8.2.0 syang$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
test
1439686893619, Shirley is awesome.
1439686893722, Shirley is awesome.
1439686893724, Shirley is awesome.
1439686893725, Shirley is awesome.
1439686893727, Shirley is awesome.
1439686893728, Shirley is awesome.
1439686893729, Shirley is awesome.
1439686893731, Shirley is awesome.
1439686893732, Shirley is awesome.
1439686893733, Shirley is awesome.
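Each consumed line begins with the epoch-millisecond timestamp the producer captured in its loop. As a quick sketch (plain Java, no Kafka needed; the helper name is mine), here is how that timestamp can be pulled back out of a consumed line:

```java
public class MessageParse {
    // Split "timestamp, text" at the first ", " and parse the leading epoch millis
    static long timestampOf(String line) {
        return Long.parseLong(line.split(", ", 2)[0]);
    }

    public static void main(String[] args) {
        String line = "1439686893619, Shirley is awesome.";
        System.out.println(timestampOf(line)); // prints 1439686893619
    }
}
```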


To feed messages to the consumer we are about to write, start a console producer in a new window/tab:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Create a ConsumerTest class:
The consumer class is more complicated, since it involves multithreading.

import java.util.*;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {
 private final ConsumerConnector consumer;
 private final String topic;

 public ConsumerTest(String topic) {
  consumer = kafka.consumer.Consumer
    .createJavaConsumerConnector(createConsumerConfig());
  this.topic = topic;
 }

 public static ConsumerConfig createConsumerConfig() {
  Properties props = new Properties();
  props.put("zookeeper.connect", "localhost:2181");
  props.put("group.id", "test_group");
  props.put("zookeeper.session.timeout.ms", "400000");
  props.put("zookeeper.sync.time.ms", "200");
  props.put("auto.commit.interval.ms", "1000");
  return new ConsumerConfig(props);
 }

 public void run() {
  // Ask for one stream (one consuming thread) for our topic
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(topic, 1);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
  KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  ConsumerIterator<byte[], byte[]> it = stream.iterator();
  // hasNext() blocks until a new message arrives
  while (it.hasNext())
   System.out.println(new String(it.next().message()));
 }

 public static void main(String[] args) {
  ConsumerTest consumerThread = new ConsumerTest("test");
  consumerThread.start();
 }
}


Configurations
Both the Producer and Consumer classes require a Properties object (a Hashtable subclass), which holds the configuration for the producer and consumer.

Producer configurations:
zk.connect: Specifies the ZooKeeper connection string in the form hostname:port/chroot. The chroot is a base directory prepended to all path operations; this namespaces all Kafka znodes, allowing the cluster to be shared with other applications on the same ZooKeeper ensemble.
serializer.class: The encoder used to turn data of type T into a Kafka message (here kafka.serializer.StringEncoder, for Strings).
metadata.broker.list: The producer uses this list only for fetching metadata (topics, partitions, and replicas); the socket connections for sending the actual data are established based on the broker information returned in the metadata. Format: host1:port1,host2:port2.
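The host1:port1,host2:port2 format above is just a comma-separated list of host:port pairs. A minimal sketch (plain Java; the helper name and the two-broker hostnames are hypothetical) of assembling it for a multi-broker setup:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class BrokerList {
    // Join host:port pairs into the comma-separated string metadata.broker.list expects
    static String brokerList(List<String> hostPorts) {
        return String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical two-broker cluster; used only for metadata lookups
        props.put("metadata.broker.list",
                brokerList(Arrays.asList("host1:9092", "host2:9092")));
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```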

Consumer configurations:
group.id: A string that uniquely identifies the group of consumer processes this consumer belongs to (each message on a topic is delivered to every subscribing consumer group, and within a group to one consumer).
zookeeper.session.timeout.ms: If the consumer fails to heartbeat to ZooKeeper for this period of time, it is considered dead and a rebalance will occur.
zookeeper.sync.time.ms: How far a ZooKeeper follower can be behind the ZooKeeper leader.
auto.commit.interval.ms: The frequency in ms at which consumer offsets are committed to ZooKeeper.
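The consumer settings above can be gathered into a Properties object in one place, with each key annotated (a sketch only; the helper name is mine, and the values are the ones used in this post):

```java
import java.util.Properties;

public class ConsumerProps {
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");    // ZooKeeper connection string
        props.put("group.id", groupId);                      // consumers sharing this id split the work
        props.put("zookeeper.session.timeout.ms", "400000"); // heartbeat window before a rebalance
        props.put("zookeeper.sync.time.ms", "200");          // allowed follower lag behind the leader
        props.put("auto.commit.interval.ms", "1000");        // how often offsets are committed
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("test_group").getProperty("group.id"));
    }
}
```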

