kafkautil:

import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; public class KafkaUtil { @Value("#{sys['connect']}") private static String zkConnect ; @Value("#{sys['metadata.broker.list']}") private static String brokerList; @Value("#{sys['request.required.acks']}") private static String ack; private static Producer<String, String> producer = null; /*static{ Properties p = PropertiesUtil.getProperties("kafka.properties"); zkConnect = (String) p.get("zk.connect"); brokerList = (String) p.get("metadata.broker.list"); ack = (String) p.get("request.required.acks"); topic = (String) p.get("topic.imeidata"); } */ public static Producer<String,String> getProducer(){ if(producer == null){ Properties p = PropertiesUtil.getProperties("kafka.properties"); zkConnect = (String) p.get("zk.connect"); brokerList = (String) p.get("metadata.broker.list"); ack = (String) p.get("request.required.acks"); Properties props = new Properties(); props.put("zk.connect", zkConnect); props.put("metadata.broker.list", brokerList); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", ack); props.put("producer.type", "async");//是否同步 sync:同步 async:異步 props.put("partitioner.class", "com.kafka.SendPartitioner");//發送到多個分區進行分布式存儲的分區算法類 props.put("request.timeout.ms", "50000"); props.put("queue.buffering.max.ms", "10000");//默認值5000 異步模式下,每隔此時間間隔會將緩沖的消息提交一次 props.put("batch.num.messages", "1000");//默認值200 異步模式下,一次批量提交消息的條數, //但如果間隔時間超過 queue.buffering.max.ms 的值,不管有沒有達到批量提交的設值,都會進行一次提交 ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } return producer; } }
kafka消息發送類的屬性:
1:zk.connect:zk服務端連接地址
2:metadata.broker.list:zk客戶端地址
3:serializer.class:kafka消息發送序列化格式
4:request.required.acks:是否確認消息消費機制 它有三個選項:1,0,-1
0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。經測試,每10K消息大約會丟幾百條消息。
1,意味着在leader replica已經接收到數據后,producer會得到一個ack。這個選項提供了更好的持久性,因為在server確認請求成功處理后,client才會返回。如果剛寫到leader上,還沒來得及復制leader就掛了,那么消息才可能會 丟失。
-1,意味着在所有的ISR都接收到數據后,producer才得到一個ack。這個選項提供了最好的持久性,只要還有一個replica存活,那么數據就不會丟失。經測試,100W條消息沒有丟消息。
5:request.timeout.ms:請求超時
6:producer.type 是否同步 它有兩個選項 sync:同步 async:異步 同步模式下,每發送一次消息完畢才會返回 在異步模式下,可以選擇異步參數。
7:queue.buffering.max.ms:默認值5000 異步模式下,每隔此時間間隔會將緩沖的消息提交一次
8:batch.num.messages:默認值200 異步模式下,一次批量提交消息的條數,但如果間隔時間超過 queue.buffering.max.ms 的值,不管有沒有達到批量提交的設值,都會進行一次提交
9:partitioner.class:自定義分區算法
在一個kafka集群中,每一個節點稱為一個broker,所以進入zk通過/ls命令查看根目錄有個brokers目錄(kafka默認安裝配置文件是放在zk根目錄,我更喜歡入在自定義目錄下),這里保存了當前kafka集群在正在運行的節點名:

只有將所有消息最大限度平均的發送到每個broker上去,才能達到最好的集群效果。那么kafka是通過什么來保證這一點的呢。
kafka消息類KeyedMessae中有一個方法,參數分別為將要發送消息的隊列,和消息KEY,VALUE。通過對KEY的HASH值求broker的個數求模,將會得到broker值,它就是將接收消息的節點。
可以自定義分區實現類,並在屬性中指明:

import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SendPartitioner implements Partitioner{ public SendPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { try { return Math.abs(key.hashCode() % numPartitions); } catch (Exception e) { return Math.abs(key.hashCode() % numPartitions); } } }
numPartitions 指的是kafka集群節點數,不用顯式指定,它可以通過zk實時得到此值。
以上屬性大都可以通過kafka的安裝配置文件來指定。但一個kafka集群可能並不止服務一個隊列或者一個項目。不同的項目具體業務需求不同,所以最好是在各個項目提定具體的參數。
Storm:
storm與kafka集成有第三方框架,叫做storm-kafka.jar。簡而言之,它其實只做了一件事情。就是已經寫好了storm的spout,我們只需要編寫bolt和提交topology即可實現storm.
它幫我們實現了kafka消費端相對難把握的一件事情,就是偏移量offset的問題。如果你不想每次啟動都重頭讀取kafka消息,盡量避免消息重復消費,那必須要保證良好的偏移量機制。特別是在多個用戶組和隊列的情況下。
代碼:

import com.util.PropertiesUtil; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; public class Topology { private static String topic; private static String brokerZkStr; private static String brokerZkPath; private static String offset; private static String app; static{ Properties p = PropertiesUtil.getProperties("storm.properties"); brokerZkStr = (String) p.get("brokerZkStr"); brokerZkPath = (String) p.get("brokerZkPath"); topic = (String) p.get("kafka.topic"); offset = (String) p.get("kafka.offset"); app = (String) p.get("kafka.app"); } public static void main(String[] args) throws InterruptedException { ZkHosts zk = new ZkHosts(brokerZkStr,brokerZkPath); SpoutConfig spoutConf = new SpoutConfig(zk, topic, offset,//偏移量 offset 的根目錄 app);//對應一個應用 List<String> zkServices = new ArrayList<>(); for(String str : zk.brokerZkStr.split(",")){ zkServices.add(str.split(":")[0]); } spoutConf.zkServers = zkServices; spoutConf.zkPort = 2181; spoutConf.forceFromStart = false;// true:從頭消費 false:從offset處消費 spoutConf.socketTimeoutMs = 60 * 1000; spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConf),4); //builder.setSpout("spout", new TestSpout(),5); builder.setBolt("bolt1", new GetMsgBolt(),4).shuffleGrouping("spout"); Config config = new Config(); config.setDebug(false); config.setNumWorkers(4); if(args.length>0){ try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } }else{ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology", config, builder.createTopology()); } } }
屬性:
1:brokerZkPath kafka集群在zk里的根目錄,默認是brokers
2:kafka.offset kafka消息隊列偏移量記錄在zk中的位置
3:kafka.app 實際上就是kafka.offset的子目錄,父級目錄規定了kafka集群消息offset的總位置,子目錄是具體每個隊列或者應用消息的偏移量,避免在多用戶組多隊列情況下偏移量錯亂的情況。