storm集成kafka


kafkautil:

 
         
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import org.springframework.beans.factory.annotation.Value;

public class KafkaUtil {
    
    @Value("#{sys['connect']}")
    private static String zkConnect ;
    @Value("#{sys['metadata.broker.list']}")
    private static String brokerList;
    @Value("#{sys['request.required.acks']}")
    private static String ack;
    
    private static Producer<String, String> producer = null;
    
    /*static{
        Properties p = PropertiesUtil.getProperties("kafka.properties");
        zkConnect = (String) p.get("zk.connect");
        brokerList = (String) p.get("metadata.broker.list");
        ack = (String) p.get("request.required.acks");
        topic = (String) p.get("topic.imeidata");
    }
    */
    public static Producer<String,String> getProducer(){
        if(producer == null){
            Properties p = PropertiesUtil.getProperties("kafka.properties");
            zkConnect = (String) p.get("zk.connect");
            brokerList = (String) p.get("metadata.broker.list");
            ack = (String) p.get("request.required.acks");
            
            Properties props = new Properties();
            props.put("zk.connect", zkConnect);
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", ack);
            props.put("producer.type", "async");//是否同步 sync:同步   async:異步  
            props.put("partitioner.class", "com.kafka.SendPartitioner");//發送到多個分區進行分布式存儲的分區算法類
            
            props.put("request.timeout.ms", "50000");
            props.put("queue.buffering.max.ms", "10000");//默認值5000  異步模式下,每隔此時間間隔會將緩沖的消息提交一次
            props.put("batch.num.messages", "1000");//默認值200  異步模式下,一次批量提交消息的條數,
                                                    //但如果間隔時間超過 queue.buffering.max.ms 的值,不管有沒有達到批量提交的設值,都會進行一次提交
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
        }
        return producer;
    }
   
}
View Code
kafka消息發送類的屬性:
1:zk.connect:zk服務端連接地址

2:metadata.broker.list:zk客戶端地址
3:serializer.class:kafka消息發送序列化格式

4:request.required.acks:是否確認消息消費機制 它有三個選項:1,0,-1
0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。經測試,每10K消息大約會丟幾百條消息。

1,意味着在leader replica已經接收到數據后,producer會得到一個ack。這個選項提供了更好的持久性,因為在server確認請求成功處理后,client才會返回。如果剛寫到leader上,還沒來得及復制leader就掛了,那么消息才可能會 丟失。

     -1,意味着在所有的ISR都接收到數據后,producer才得到一個ack。這個選項提供了最好的持久性,只要還有一個replica存活,那么數據就不會丟失。經測試,100W條消息沒有丟消息。

  

5:request.timeout.ms:請求超時
6:producer.type 是否同步 它有兩個選項 sync:同步   async:異步  同步模式下,每發送一次消息完畢才會返回 在異步模式下,可以選擇異步參數。
7:queue.buffering.max.ms:默認值5000  異步模式下,每隔此時間間隔會將緩沖的消息提交一次
8:batch.num.messages:默認值200  異步模式下,一次批量提交消息的條數,但如果間隔時間超過 queue.buffering.max.ms 的值,不管有沒有達到批量提交的設值,都會進行一次提交
9:partitioner.class:自定義分區算法
在一個kafka集群中,每一個節點稱為一個broker,所以進入zk通過/ls命令查看根目錄有個brokers目錄(kafka默認安裝配置文件是放在zk根目錄,我更喜歡入在自定義目錄下),這里保存了當前kafka集群在正在運行的節點名:
  
只有將所有消息最大限度平均的發送到每個broker上去,才能達到最好的集群效果。那么kafka是通過什么來保證這一點的呢。
 
kafka消息類KeyedMessae中有一個方法,參數分別為將要發送消息的隊列,和消息KEY,VALUE。通過對KEY的HASH值求broker的個數求模,將會得到broker值,它就是將接收消息的節點。
可以自定義分區實現類,並在屬性中指明:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SendPartitioner implements Partitioner{

    public SendPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            return Math.abs(key.hashCode() % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }

}
View Code
numPartitions 指的是kafka集群節點數,不用顯式指定,它可以通過zk實時得到此值。


以上屬性大都可以通過kafka的安裝配置文件來指定。但一個kafka集群可能並不止服務一個隊列或者一個項目。不同的項目具體業務需求不同,所以最好是在各個項目提定具體的參數。


 Storm:

 storm與kafka集成有第三方框架,叫做storm-kafka.jar。簡而言之,它其實只做了一件事情。就是已經寫好了storm的spout,我們只需要編寫bolt和提交topology即可實現storm.

 它幫我們實現了kafka消費端相對難把握的一件事情,就是偏移量offset的問題。如果你不想每次啟動都重頭讀取kafka消息,盡量避免消息重復消費,那必須要保證良好的偏移量機制。特別是在多個用戶組和隊列的情況下。

 代碼:

 

import com.util.PropertiesUtil;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
    
public class Topology {
    
    private static String topic;
    private static String brokerZkStr;
    private static String brokerZkPath;
    private static String offset;
    private static String app;
    
    static{
        Properties p = PropertiesUtil.getProperties("storm.properties");
        brokerZkStr = (String) p.get("brokerZkStr");
        brokerZkPath = (String) p.get("brokerZkPath");
        topic = (String) p.get("kafka.topic");
        offset = (String) p.get("kafka.offset");
        app = (String) p.get("kafka.app");
         
    }
    
    
    public static void main(String[] args) throws InterruptedException {  
        ZkHosts zk = new ZkHosts(brokerZkStr,brokerZkPath);
        SpoutConfig spoutConf = new SpoutConfig(zk, topic, 
                offset,//偏移量 offset 的根目錄
                app);//對應一個應用
        
        List<String> zkServices = new ArrayList<>();
        
        for(String str : zk.brokerZkStr.split(",")){
            zkServices.add(str.split(":")[0]);
        }
        
        spoutConf.zkServers = zkServices;
        spoutConf.zkPort = 2181;
        spoutConf.forceFromStart = false;// true:從頭消費  false:從offset處消費
        spoutConf.socketTimeoutMs = 60 * 1000;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConf),4);
        //builder.setSpout("spout", new TestSpout(),5);
        builder.setBolt("bolt1", new GetMsgBolt(),4).shuffleGrouping("spout");
        
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        if(args.length>0){
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else{
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyTopology", config, builder.createTopology());
        }
    }  
}
View Code

 

屬性:
1:brokerZkPath kafka集群在zk里的根目錄,默認是brokers
2:kafka.offset kafka消息隊列偏移量記錄在zk中的位置
3:kafka.app 實際上就是
kafka.offset的子目錄,父級目錄規定了kafka集群消息offset的總位置,子目錄是具體每個隊列或者應用消息的偏移量,避免在多用戶組多隊列情況下偏移量錯亂的情況。


 
        

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM