Source: http://doc.okbase.net/QING____/archive/19447.html
See also:
http://blog.csdn.net/21aspnet/article/details/19325373
http://blog.csdn.net/unix21/article/details/18990123
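Kafka can serve as a distributed log-collection or system-monitoring service, so it is worth knowing how to use it where it fits. Deploying Kafka involves setting up a ZooKeeper environment and a Kafka environment, plus some configuration. The rest of this post walks through how to use Kafka.
We build a ZooKeeper ensemble from 3 ZooKeeper instances and a Kafka cluster from 2 Kafka brokers.
Versions used: Kafka 0.8 and ZooKeeper 3.4.5.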
I. Building the ZooKeeper Cluster
We run three ZooKeeper instances, zk-0, zk-1 and zk-2, all on 127.0.0.1. If you only want to test, a single ZooKeeper instance is enough.
1) zk-0
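Edit the configuration file (conf/zoo.cfg by default; create it by copying conf/zoo_sample.cfg):

```
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## Keep the defaults for everything else, except that each instance on this host
## needs its own dataDir, containing a file named myid whose content is 0
```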
Start ZooKeeper:
./zkServer.sh start
2) zk-1
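Edit the configuration file (identical to zk-0 except for the settings below):

```
clientPort=2182
## Also give this instance its own dataDir, with a myid file containing 1
```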
Start ZooKeeper:
./zkServer.sh start
3) zk-2
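Edit the configuration file (identical to zk-0 except for the settings below):

```
clientPort=2183
## Also give this instance its own dataDir, with a myid file containing 2
```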
Start ZooKeeper:
./zkServer.sh start
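All three instances share one host. Because of that, they differ only in clientPort, in dataDir, and in the myid file that ZooKeeper's replicated mode reads from dataDir. The server.N lines are identical in every instance.

The small Java program below is a sketch of that rule, under these assumptions:
- ZkConfigGenerator is a hypothetical helper class.
- The /tmp/zookeeper-N data-directory layout is only illustrative.
- The tickTime, initLimit and syncLimit values are the ones shipped in conf/zoo_sample.cfg.

The program only prints text. You would still copy each block into the matching zoo.cfg and myid file yourself.

```java
// Hypothetical helper (not part of ZooKeeper): renders conf/zoo.cfg text and the
// myid value for each instance of the three-node, single-host ensemble above.
public class ZkConfigGenerator {
    static final int ENSEMBLE_SIZE = 3;

    // zoo.cfg for instance `id` (0..2); dataDirBase is an assumed path prefix
    static String zooCfg(int id, String dataDirBase) {
        StringBuilder sb = new StringBuilder();
        // Defaults as shipped in conf/zoo_sample.cfg
        sb.append("tickTime=2000\n");
        sb.append("initLimit=10\n");
        sb.append("syncLimit=5\n");
        // Per-instance settings: separate data directory and client port
        sb.append("dataDir=").append(dataDirBase).append('-').append(id).append('\n');
        sb.append("clientPort=").append(2181 + id).append('\n');
        // Identical in every instance: server.N=host:quorumPort:electionPort
        for (int n = 0; n < ENSEMBLE_SIZE; n++) {
            sb.append("server.").append(n).append("=127.0.0.1:")
              .append(2888 + n).append(':').append(3888 + n).append('\n');
        }
        return sb.toString();
    }

    // Content of <dataDir>/myid: must match this instance's N in its server.N line
    static String myid(int id) {
        return id + "\n";
    }

    public static void main(String[] args) {
        for (int id = 0; id < ENSEMBLE_SIZE; id++) {
            System.out.println("# zk-" + id + ": conf/zoo.cfg");
            System.out.print(zooCfg(id, "/tmp/zookeeper"));
            System.out.println("# zk-" + id + ": myid contains " + myid(id).trim());
        }
    }
}
```

If a myid value does not match the instance's server.N entry, that instance will not join the ensemble correctly. This is why the sketch derives both values from the same id.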
II. Building the Kafka Cluster
Because the broker configuration refers to the ZooKeeper setup, we show the broker configuration file first. We build the cluster from two Kafka brokers, kafka-0 and kafka-1.
1) kafka-0
Edit the configuration file in the config directory (config/server.properties) as follows:

```
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
```
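Several of the byte and hour values above are easier to judge once converted:
- log.segment.bytes=536870912 is 512 MiB per segment file.
- socket.request.max.bytes=104857600 is 100 MiB.
- log.retention.hours=168 keeps data for 7 days.

The sketch below loads a subset of the file with java.util.Properties and does these conversions. BrokerConfigSummary is a hypothetical class, not part of Kafka, and it assumes Java 6 or later for Properties.load(Reader).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: prints a few kafka-0 broker settings in human-readable units.
public class BrokerConfigSummary {
    // Subset of the kafka-0 server.properties shown above
    static final String SAMPLE =
        "socket.request.max.bytes=104857600\n"
        + "log.segment.bytes=536870912\n"
        + "log.retention.hours=168\n";

    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text)); // Properties.load(Reader) needs Java 6+
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected with a StringReader
        }
        return p;
    }

    // Byte-valued setting converted to whole MiB (1 MiB = 1024 * 1024 bytes)
    static long mebibytes(Properties p, String key) {
        return Long.parseLong(p.getProperty(key).trim()) / (1024L * 1024L);
    }

    // log.retention.hours converted to whole days
    static long retentionDays(Properties p) {
        return Long.parseLong(p.getProperty("log.retention.hours").trim()) / 24;
    }

    public static void main(String[] args) {
        Properties p = load(SAMPLE);
        System.out.println("segment size:     " + mebibytes(p, "log.segment.bytes") + " MiB");
        System.out.println("max request size: " + mebibytes(p, "socket.request.max.bytes") + " MiB");
        System.out.println("retention:        " + retentionDays(p) + " days");
    }
}
```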
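Kafka is written in Scala, so when working from the source distribution you first need to build it with the bundled sbt:

```
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
```

The last command may fail with an exception; you can ignore that for now. Start the Kafka broker:

```
> JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &
```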
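Since the ZooKeeper ensemble is already running, we do not need Kafka to start ZooKeeper for us (bin/zookeeper-server-start.sh is not used). If you run several Kafka brokers on one machine, give each one its own JMX_PORT.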
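Two brokers can share one machine only if all of their ports are distinct and unused. That covers each broker's listener port (port) and its JMX port (JMX_PORT).

As a rough pre-flight check, the hypothetical PortCheck class below tries to bind each port with java.net.ServerSocket and reports whether that succeeded. The port list simply mirrors this post's kafka-0 and kafka-1 values. A successful bind only shows that the port was free at that moment.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical pre-flight check: are the broker and JMX ports free on this host?
public class PortCheck {
    static boolean isPortFree(int port) {
        ServerSocket socket = null;
        try {
            socket = new ServerSocket(port); // binds on all local interfaces
            return true;
        } catch (IOException e) {
            return false; // typically a BindException: something already listens here
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // nothing useful to do if closing the probe socket fails
                }
            }
        }
    }

    public static void main(String[] args) {
        // kafka-0: port 9092 / JMX 9997, kafka-1: port 9093 / JMX 9998
        int[] ports = {9092, 9997, 9093, 9998};
        for (int port : ports) {
            System.out.println(port + (isPortFree(port) ? " free" : " in use"));
        }
    }
}
```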
2) kafka-1
```
broker.id=1
port=9093
## Keep all other settings identical to kafka-0
```

Then run the same build commands as for kafka-0 and start this broker:

```
> JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &
```
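kafka-1 differs from kafka-0 only in broker.id and port, so its file is effectively kafka-0's properties with two overrides. The sketch below expresses that with java.util.Properties. DeriveBrokerConfig and deriveBroker are illustrative names, not Kafka APIs.

Note that log.dir=./logs is relative to the directory the broker is started from. The two brokers get separate log directories only because each one runs from its own kafka-0 or kafka-1 directory.

```java
import java.util.Properties;

// Hypothetical sketch: derive a second broker's settings from the first one's.
public class DeriveBrokerConfig {
    static Properties deriveBroker(Properties base, int brokerId, int port) {
        Properties p = new Properties();
        p.putAll(base);                                         // copy every shared setting
        p.setProperty("broker.id", Integer.toString(brokerId)); // must be unique in the cluster
        p.setProperty("port", Integer.toString(port));          // must be unique on this host
        return p;
    }

    public static void main(String[] args) {
        Properties kafka0 = new Properties();
        kafka0.setProperty("broker.id", "0");
        kafka0.setProperty("port", "9092");
        kafka0.setProperty("log.dir", "./logs");
        kafka0.setProperty("zookeeper.connect", "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");

        Properties kafka1 = deriveBroker(kafka0, 1, 9093);
        System.out.println("kafka-1: broker.id=" + kafka1.getProperty("broker.id")
            + " port=" + kafka1.getProperty("port"));
    }
}
```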
The environment is now ready, so let's move on to some programming examples.
III. Project Setup
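The project is built with Maven. Frankly, the Kafka Java client is quite painful to work with, and getting the build environment right can cause a lot of trouble. I suggest using the pom.xml below as a reference; the versions of the dependencies must be kept consistent with one another.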
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>test-kafka</artifactId>
    <packaging>jar</packaging>
    <name>test-kafka</name>
    <url>http://maven.apache.org</url>
    <version>1.0.0</version>
    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.0</artifactId>
            <version>0.8.0-beta1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>test-kafka-1.0</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <encoding>gb2312</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <encoding>gbk</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
IV. Producer Code
1) producer.properties: place this file in the resources directory (src/main/resources)
```
#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
producer.type=sync
## 0 = no compression
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## Only takes effect when producer.type=async
#batch.num.messages=100
```
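Both LogProducer and LogConsumer load their configuration with ClassLoader.getSystemResourceAsStream. That method returns null, rather than throwing, when the file is missing from the classpath. Properties.load then fails with a NullPointerException that does not name the file.

A minimal stricter loader, sketched here as a hypothetical ResourceProperties class, could look like this:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: load a .properties file from the classpath root, failing with a clear message.
public class ResourceProperties {
    static Properties load(String resourceName) throws IOException {
        InputStream in = ClassLoader.getSystemResourceAsStream(resourceName);
        if (in == null) {
            // getSystemResourceAsStream signals "not found" by returning null
            throw new IOException("resource not found on classpath: " + resourceName);
        }
        try {
            Properties p = new Properties();
            p.load(in);
            return p;
        } finally {
            in.close(); // release the stream whether or not loading succeeded
        }
    }

    public static void main(String[] args) throws IOException {
        Properties p = load("producer.properties");
        System.out.println("metadata.broker.list=" + p.getProperty("metadata.broker.list"));
    }
}
```

With Maven, files under src/main/resources end up at the classpath root, which is why the bare name "producer.properties" is enough.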
2) LogProducer.java sample code
```java
package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        // producer.properties is read from the root of the classpath (src/main/resources)
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    // Send one message to the topic (no key, so the producer chooses the partition)
    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    // Send a batch of messages to the topic in a single call
    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            // Send one message every 2 seconds until the process is stopped
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
```
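LogProducer sends KeyedMessage objects without a key, so the 0.8 producer decides on its own which partition of test-topic each message goes to. Sometimes related messages should land in the same partition. For that case, KeyedMessage also has a (topic, key, message) constructor, and the default partitioner then chooses the partition from the key's hash.

The Kafka-free sketch below models that idea with a hypothetical partitionFor method: it hashes the key with the sign bit masked, then takes the result modulo the partition count. It illustrates the principle and is not a copy of Kafka's DefaultPartitioner.

```java
// Hypothetical model (not a Kafka class): choose a partition from a message key by hashing.
public class KeyPartitionModel {
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            throw new IllegalArgumentException("key must not be null in this model");
        }
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be > 0");
        }
        // Mask the sign bit so a negative hashCode still yields an index in [0, numPartitions)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"host-a", "host-b", "host-c", "host-a"};
        for (String key : keys) {
            // The same key always maps to the same partition
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

With num.partitions=2 from the broker configuration, every message for a given key would map to the same one of the two partitions.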
V. Consumer Code
1) consumer.properties: place this file in the resources directory (src/main/resources) as well
```
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
```
2) LogConsumer.java sample code
```java
package com.test.kafka;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties properties = new Properties();
        // consumer.properties is read from the root of the classpath (src/main/resources)
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        // Number of streams (one per worker thread) to open for this topic;
        // matching the topic's partition count gives one thread per partition
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        // Shut down the connector first: this ends the stream iterators, so the workers return
        if (connector != null) {
            connector.shutdown();
        }
        if (threadPool != null) {
            threadPool.shutdown();
            try {
                if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                    threadPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                threadPool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            // hasNext() blocks until a message arrives (unless consumer.timeout.ms is set)
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                executor.execute(decodeUtf8(item.message()));
            }
        }
    }

    // The producer's StringEncoder writes UTF-8 by default, so decode explicitly as UTF-8
    private static String decodeUtf8(byte[] bytes) {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // every JVM supports UTF-8
        }
    }

    interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            final LogConsumer consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
            // start() returns right away and the worker threads keep the JVM running;
            // close the consumer when the JVM shuts down (e.g. on Ctrl+C)
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    consumer.close();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
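LogConsumer follows a thread-per-stream pattern. createMessageStreams returns one KafkaStream per requested stream, and each stream is drained by its own pool thread until its iterator ends.

The model below reproduces that pattern without Kafka, as a sketch under two assumptions:
- A BlockingQueue stands in for each KafkaStream.
- A hypothetical end-of-stream marker plays the role that connector.shutdown() plays for the real iterators.

It also shows a clean shutdown order: end the streams first, then shut the pool down and wait for the workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Kafka-free model of the consumer's threading: one worker thread per stream.
public class StreamPerThreadModel {
    // Hypothetical end-of-stream marker; stands in for connector.shutdown() ending an iterator
    static final String END_OF_STREAM = "__END__";

    // Starts one worker per stream (streams must be non-empty), ends every stream, waits for workers
    static List<String> drain(List<BlockingQueue<String>> streams) throws InterruptedException {
        final List<String> processed = new CopyOnWriteArrayList<String>();
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (final BlockingQueue<String> stream : streams) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        String msg;
                        // take() blocks the way ConsumerIterator.hasNext() does
                        while (!END_OF_STREAM.equals(msg = stream.take())) {
                            processed.add(msg);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag and exit
                    }
                }
            });
        }
        // Shutdown order: end the streams first, then stop the pool and wait for the workers
        for (BlockingQueue<String> stream : streams) {
            stream.put(END_OF_STREAM);
        }
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<BlockingQueue<String>> streams = new ArrayList<BlockingQueue<String>>();
        for (int p = 0; p < 2; p++) { // two streams, as in LogConsumer("test-topic", 2, ...)
            BlockingQueue<String> q = new LinkedBlockingQueue<String>();
            q.put("partition " + p + ": this is a sample0");
            q.put("partition " + p + ": this is a sample1");
            streams.add(q);
        }
        System.out.println(drain(streams).size() + " messages processed");
    }
}
```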
When testing, start the consumer first and then the producer, so that you can watch new messages arrive in real time.
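This order matters because of offset handling. When a consumer group has no committed offset yet (as with a fresh group.id such as test-group), the 0.8 high-level consumer falls back to auto.offset.reset. Its default is largest, so the consumer starts at the end of the log and skips messages produced before it joined. If you would rather replay from the earliest retained message, you can set auto.offset.reset=smallest.

The sketch below builds such a consumer configuration in code. The fromBeginning helper is hypothetical; the property names are the 0.8 consumer settings already used in consumer.properties, plus auto.offset.reset.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings that replay a topic from the earliest retained offset.
public class ConsumerOffsetSettings {
    static Properties fromBeginning(String zkConnect, String groupId) {
        Properties p = new Properties();
        p.setProperty("zookeeper.connect", zkConnect);
        p.setProperty("group.id", groupId);
        // Used only when this group has no committed offset (or it is out of range);
        // the 0.8 default "largest" starts at the end of the log instead
        p.setProperty("auto.offset.reset", "smallest");
        return p;
    }

    public static void main(String[] args) {
        Properties p = fromBeginning("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "test-group-replay");
        // These properties could be passed to new kafka.consumer.ConsumerConfig(p)
        System.out.println(p.getProperty("group.id") + " auto.offset.reset=" + p.getProperty("auto.offset.reset"));
    }
}
```

A group that already has committed offsets keeps resuming from them. Using a new group.id is the simplest way to trigger a replay.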