kafkaspot在ack機制下如何保證內存不溢


新浪微博:intsmaze劉洋洋哥。
 
storm框架中的kafkaspout類實現的是BaseRichSpout,它里面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證消息的重新發送;如果不實現ack機制,那么kafkaspout就無法得到消息的處理響應,就會在超時以后再次發送消息,導致消息的重復發送。
 
但是回想一下我們自己寫一個spout類實現BaseRichSpout並讓他具備消息重發,那么我們是會在我們的spout類里面定義一個map集合,並以msgId作為key。
public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;
    // key:messageId,Data
    private HashMap<String, String> waitAck = new HashMap<String, String>();
    private SpoutOutputCollector collector;
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    public void nextTuple() {
        String sentence = "the cow jumped over the moon";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        //指定messageId,開啟ackfail機制
        collector.emit(new Values(sentence), messageId);
    }
    @Override
    public void ack(Object msgId) {
        System.out.println("消息處理成功:" + msgId);
        System.out.println("刪除緩存中的數據...");
        waitAck.remove(msgId);
    }
    @Override
    public void fail(Object msgId) {
        System.out.println("消息處理失敗:" + msgId);
        System.out.println("重新發送失敗的信息...");
        //重發如果不開啟ackfail機制,那么spout的map對象中的該數據不會被刪除的,而且下游
        collector.emit(new Values(waitAck.get(msgId)),msgId);
    }
}
 
那么kafkaspout會不會也是這樣還保存這已發送未收到bolt響應的消息呢?如果這樣,如果消息處理不斷失敗,不斷重發,消息不斷積累在kafkaspout節點上,kafkaspout端會不就會出現內存溢出?
 
其實並沒有,回想kafka的原理,Kafka會為每一個consumergroup保留一些metadata信息–當前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息后線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些消息。也就是說, kafkaspot在消費kafka的數據是,通過offset讀取到消息並發送給bolt后,kafkaspot只是保存者當前的offset值。
當失敗或成功根據msgId查詢offset值,然后再去kafka消費該數據來確保消息的重新發送。
 
那么雖然offset數據小,但是當offset的數據量上去了還是會內存溢出的?
其實並沒有,kafkaspout發現緩存的數據超過限制了,會把某端的數據清理掉的。
 
 
kafkaspot中發送數據的代碼
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
可以看到msgID里面包裝了offset參數。
它不緩存已經發送出去的數據信息。
 
當他接收到來至bolt的響應后,會從接收到的msgId中得到offset。以下是從源碼中折取的關鍵代碼:
public void ack(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
          m.ack(id.offset);
     }
 }
 m.ack(id.offset); public void ack(Long offset) {
     _pending.remove(offset);//處理成功移除offset
     numberAcked++;
 }

public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } m.fail(id.offset); public void fail(Long offset) {     failed.add(offset);//處理失敗添加offset numberFailed++; } SortedSet<Long> _pending = new TreeSet<Long>(); SortedSet<Long> failed = new TreeSet<Long>();

 

關於kafkaspot的源碼解析大家可以看這邊博客:http://www.cnblogs.com/cruze/p/4241181.html

源碼解析中涉及了很多kafka的概念,所以僅僅理解kafka的概念想完全理解kafkaspot源碼是很難的,如果不理解kafka概念,那么就只需要在理解storm的ack機制上明白kafkaspot做了上面的兩件事就可以了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM