Reference: http://blog.csdn.net/honglei915/article/details/37563647
Parameter reference: http://ju.outofmemory.cn/entry/119243
Parameter reference / demo: http://www.aboutyun.com/thread-9906-1-1.html
Kafka+Spark:
http://shiyanjun.cn/archives/1097.html
http://ju.outofmemory.cn/entry/84636
1. Starting Kafka:
1. First start ZooKeeper on every node: go to ZOOKEEPER_HOME/bin and run ./zkServer.sh start
2. Then start Kafka on every node: from KAFKA_HOME, run bin/kafka-server-start.sh config/server.properties & (a quick smoke test of the cluster is sketched below)
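Once the brokers are up, the cluster can be smoke-tested with the command-line tools shipped in KAFKA_HOME/bin. The sketch below assumes a single-node setup with ZooKeeper on localhost:2181 and the broker on localhost:9092; adjust hosts, ports and the topic name to your environment.

```bash
# Create a test topic and verify it is registered (run from KAFKA_HOME)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Type a few lines into the console producer...
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# ...and read them back with the console consumer (in another terminal)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```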
2. Parameter reference
2.0 Broker parameters (config file: config/server.properties)
name | default | description
---|---|---
broker.id | none | Unique id of each broker, used as its name. This lets a broker move to a different host/port while consumers can still find it.
enable.zookeeper | true | Whether to register the broker in ZooKeeper.
log.flush.interval.messages | Long.MaxValue | Maximum number of messages accumulated before data is flushed to disk and becomes visible to consumers.
log.flush.interval.ms | Long.MaxValue | Maximum time before data is flushed to disk.
log.flush.scheduler.interval.ms | Long.MaxValue | Interval at which the broker checks whether data needs to be flushed to disk.
log.retention.hours | 168 | How many hours a log is retained.
log.retention.bytes | -1 | Maximum size of the log before old segments are deleted (-1 = no size limit).
log.cleaner.enable | false | Whether log cleaning (compaction) is enabled.
log.cleanup.policy | delete | delete or compact. Related settings include log.cleaner.threads, log.cleaner.io.max.bytes.per.second, log.cleaner.dedupe.buffer.size, log.cleaner.io.buffer.size, log.cleaner.io.buffer.load.factor, log.cleaner.backoff.ms, log.cleaner.min.cleanable.ratio, log.cleaner.delete.retention.ms.
log.dir | /tmp/kafka-logs | Root directory for log files.
log.segment.bytes | 1073741824 (1 GB) | Maximum size of a single log segment file.
log.roll.hours | 24 * 7 | Maximum time before a new log segment file is rolled.
message.max.bytes | 1000000 + MessageSet.LogOverhead | Maximum size of a message the broker will accept.
num.network.threads | 3 | Number of threads handling network requests.
num.io.threads | 8 | Number of threads handling disk I/O.
background.threads | 10 | Number of background threads.
num.partitions | 1 | Default number of partitions per topic.
socket.send.buffer.bytes | 102400 | Socket SO_SNDBUF size.
socket.receive.buffer.bytes | 102400 | Socket SO_RCVBUF size.
zookeeper.connect | localhost:2182/kafka | ZooKeeper connection string, in the form hostname:port/chroot; the chroot is a namespace.
zookeeper.connection.timeout.ms | 6000 | Maximum time for a client to establish a ZooKeeper connection.
zookeeper.session.timeout.ms | 6000 | ZooKeeper session timeout.
zookeeper.sync.time.ms | 2000 | Maximum time a ZooKeeper follower may lag behind the ZooKeeper leader.
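To make the table concrete, here is a minimal sketch of what a per-node config/server.properties might look like using the parameters above. All values are illustrative placeholders (broker.id, the log directory and the ZooKeeper address in particular must be adapted to each node); it is not a recommended production configuration.

```properties
# Minimal illustrative broker config; adjust every value to your environment.
# broker.id must be unique for each broker in the cluster.
broker.id=0
# Root directory for log segments.
log.dir=/tmp/kafka-logs
# Default partition count for new topics.
num.partitions=1
# Keep data for 7 days.
log.retention.hours=168
# Roll a new segment file at 1 GB.
log.segment.bytes=1073741824
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# hostname:port/chroot
zookeeper.connect=localhost:2181/kafka
zookeeper.connection.timeout.ms=6000
```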
2.1 Producer parameters (config file: config/producer.properties, or set programmatically)

```properties
# Broker list used to fetch metadata; it does not have to contain every broker
metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092

# Partitioner class. Default is kafka.producer.DefaultPartitioner, which hashes the key to a partition
#partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner

# Compression: none/0 = no compression, gzip/1, snappy/2. Compressed messages carry a header
# identifying the codec, so decompression on the consumer side is transparent
compression.codec=none

# Serializer class (see the mafka client API notes on serialization conventions).
# Default is kafka.serializer.DefaultEncoder, i.e. byte[]
serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
# serializer.class=kafka.serializer.DefaultEncoder
# serializer.class=kafka.serializer.StringEncoder

# If compression is enabled, restrict it to these topics; empty (default) means compress all topics
#compressed.topics=

########### request ack ###############
# When the producer considers a request acknowledged. Default 0.
#  0: the producer does not wait for an ack from the broker
#  1: the leader sends an ack once it has written the message
# -1: an ack is sent only after all in-sync followers have replicated the message
request.required.acks=0

# Maximum time the broker may wait before acking the producer.
# On timeout the broker returns an error ack, meaning the request failed for some reason
# (for example the followers could not replicate in time)
request.timeout.ms=10000
########## end #####################

# Synchronous ("sync", default) or asynchronous ("async") sending. Async improves throughput,
# but messages are buffered locally and sent in batches, so unsent messages can be lost
producer.type=sync

############## async sending (the following four parameters are optional) ####################
# In async mode, messages buffered longer than this are sent to the broker as a batch.
# Default 5000 ms; works together with batch.num.messages
queue.buffering.max.ms=5000

# In async mode, the maximum number of messages the producer may buffer.
# If the producer cannot send to the broker fast enough, messages pile up locally;
# once this threshold is reached the producer either blocks or drops messages. Default 10000
queue.buffering.max.messages=20000

# In async mode, how many messages are sent per batch. Default 200
batch.num.messages=500

# Once queue.buffering.max.messages is reached and, after blocking for a while, the queue
# still cannot enqueue (the producer has not managed to send anything), this timeout
# controls how long to keep blocking:
# -1: block without a timeout, messages are never dropped
#  0: clear the queue immediately, dropping the messages
queue.enqueue.timeout.ms=-1
################ end ###############

# How many times to resend a message after an error ack or a missing ack.
# The broker has no complete mechanism to avoid duplicates, so a network error (e.g. a lost ack)
# can lead to duplicate messages on the broker. Default 3
message.send.max.retries=3

# Interval at which the producer refreshes topic metadata (partition leaders, topic state).
# Metadata is also refreshed immediately on certain errors (topic missing, partition lost,
# leader failure, ...); this parameter adds a periodic refresh on top of that. Default 600000
topic.metadata.refresh.interval.ms=60000
```
2.2 Consumer parameters (config file: config/consumer.properties, or set programmatically)

```properties
# ZooKeeper connection string (this is a test-environment setup; see the broker deployment wiki)
# Example: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka

# ZooKeeper session timeout, default 5000 ms. Used to detect dead consumers; when a consumer dies,
# the others wait this long before noticing and triggering a rebalance
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000

# How often offsets are synced to ZooKeeper. Offsets are committed on a time basis, not per message,
# so after a failed ZooKeeper update and a restart the consumer may re-read messages it already saw
zookeeper.sync.time.ms=2000

# Consumer group
group.id=xxx

# The consumer automatically commits offsets to ZooKeeper after consuming a certain amount of messages.
# Offsets are not committed per message; they are kept locally (in memory) and committed periodically. Default true
auto.commit.enable=true

# Auto-commit interval. Default 60 * 1000
auto.commit.interval.ms=1000

# Identifier of this consumer; can be set explicitly or generated by the system,
# mainly used to track consumption for monitoring
consumer.id=xxx

# Client id, used to tell different clients apart; generated automatically by default
client.id=xxxx

# Maximum number of message chunks buffered on the consumer (default 10)
queued.max.message.chunks=50

# When a new consumer joins the group, a rebalance moves some partitions to it. Before a consumer
# takes over a partition it registers a "Partition Owner registry" node in ZooKeeper, but the old
# owner may not have released that node yet; this controls how many times registration is retried
rebalance.max.retries=5

# Minimum amount of data the broker returns for a fetch request; if fewer bytes are available
# the request waits up to fetch.wait.max.ms. Each fetch returns multiple messages, and raising
# this value uses more consumer-side memory
fetch.min.bytes=6553600

# How long the server may block when there is not enough data; on timeout whatever
# is available is sent to the consumer immediately
fetch.wait.max.ms=5000

socket.receive.buffer.bytes=655360

# What to do when ZooKeeper has no offset or the offset is out of range:
# smallest = reset to the earliest offset, largest = reset to the latest offset,
# anything else = throw an exception. Default largest
auto.offset.reset=smallest

# Deserializer class (see the mafka client API notes on serialization conventions).
# Default is kafka.serializer.DefaultDecoder, i.e. byte[]
deserializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
```
3. Example:
Interface: KafkaProperties.java
```java
public interface KafkaProperties {
    final static String zkConnect = "192.168.1.160:2181";
    final static String groupId = "group1";
    final static String topic = "topic1";
    // final static String kafkaServerURL = "192.168.1.160";
    // final static int kafkaServerPort = 9092;
    // final static int kafkaProducerBufferSize = 64 * 1024;
    // final static int connectionTimeOut = 20000;
    // final static int reconnectInterval = 10000;
    // final static String topic2 = "topic2";
    // final static String topic3 = "topic3";
    // final static String clientId = "SimpleConsumerDemoClient";
}
```
Producer: KafkaProducer.java
```java
import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer extends Thread {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.1.160:9092"); // broker address and port
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = new String("This is a message, number: " + messageNo);
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
Consumer: KafkaConsumer.java
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect); // ZooKeeper address
        props.put("group.id", KafkaProperties.groupId);            // consumer group id
        props.put("zookeeper.session.timeout.ms", "40000");        // ZooKeeper session timeout
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
Main class: KafkaConsumerProducerDemo.java
```java
public class KafkaConsumerProducerDemo {
    public static void main(String[] args) {
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
        producerThread.start();
        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}
```
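To try this demo outside an IDE, something along these lines should work. The classpath is an assumption: point it at wherever your compiled classes live and at the Kafka 0.8.x jars (the old Scala client used here, kafka.javaapi.*, ships with the broker under KAFKA_HOME/libs).

```bash
# Compile against the jars bundled with the broker (paths are assumptions; adjust as needed)
javac -cp "$KAFKA_HOME/libs/*" KafkaProperties.java KafkaProducer.java KafkaConsumer.java KafkaConsumerProducerDemo.java

# Start the producer and consumer threads; you should see alternating "Send:" and "receive:" lines
java -cp ".:$KAFKA_HOME/libs/*" KafkaConsumerProducerDemo
```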
-----------------------------
Another example: http://www.cnblogs.com/sunxucool/p/3913919.html
Producer-side code
1) producer.properties: place this file under /resources

```properties
#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder

## effective only when producer.type=async
#batch.num.messages=100
```
2) LogProducer.java sample code

```java
package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
```
Consumer-side code
1) consumer.properties: place this file under /resources

```properties
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# consumer group id
group.id=test-group

# consumer timeout
#consumer.timeout.ms=5000
```
2) LogConsumer.java sample code

```java
package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            //
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                executor.execute(new String(item.message())); // UTF-8
            }
        }
    }

    interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        LogConsumer consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // if (consumer != null) {
            //     consumer.close();
            // }
        }
    }
}
```
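One detail worth noting when trying this consumer: main calls new LogConsumer("test-topic", 2, executor), i.e. it asks for two streams, so test-topic should have at least two partitions for both worker threads to receive data. For example (host and replication factor are assumptions matching the single-machine configs above):

```bash
# Create test-topic with 2 partitions so both streams requested by LogConsumer get data
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test-topic
```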