storm-kafka-0.8-plus source code walkthrough


https://github.com/wurstmeister/storm-kafka-0.8-plus

http://blog.csdn.net/xeseo/article/details/18615761

 

Preparation: some related classes

GlobalPartitionInformation (storm.kafka.trident)

Records the mapping between partition ids and brokers.

GlobalPartitionInformation info = new GlobalPartitionInformation();

info.addPartition(0, new Broker("10.1.110.24",9092));

info.addPartition(1, new Broker("10.1.110.21",9092));

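GlobalPartitionInformation can be built statically, as in the code above,
or read dynamically from ZooKeeper, which is the recommended approach.
Reading from ZooKeeper goes through DynamicBrokersReader.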

 

DynamicBrokersReader

Its core job is to read the partition-to-broker mapping from ZooKeeper.
All ZooKeeper access goes through the Curator framework.

The core function:

    /**
     * Get all partitions with their current leaders
     */
    public GlobalPartitionInformation getBrokerInfo() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        try {
            int numPartitionsForTopic = getNumPartitions(); // number of partitions, read from ZooKeeper
            String brokerInfoPath = brokerPath();
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                int leader = getLeaderFor(partition); // leader broker id of this partition, read from ZooKeeper
                String path = brokerInfoPath + "/" + leader;
                try {
                    byte[] brokerData = _curator.getData().forPath(path);
                    Broker hp = getBrokerHost(brokerData); // parse the broker's host:port from its znode data
                    globalPartitionInformation.addPartition(partition, hp); // record partition -> leader broker
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    LOG.error("Node {} does not exist ", path);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
        return globalPartitionInformation;
    }
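
The lookup can be sketched without ZooKeeper. What follows is a minimal model and not the library's code: two in-memory maps stand in for the znodes, and the names BrokerInfoSketch, resolve, leaders and brokers are hypothetical. A partition whose leader has no broker node is skipped, which mirrors the NoNodeException branch above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of getBrokerInfo(); plain maps replace the ZooKeeper reads.
final class BrokerInfoSketch {

    // leaders: partition id -> leader broker id; brokers: broker id -> "host:port".
    static Map<Integer, String> resolve(Map<Integer, Integer> leaders, Map<Integer, String> brokers) {
        Map<Integer, String> partitionToBroker = new TreeMap<>();
        for (Map.Entry<Integer, Integer> e : leaders.entrySet()) {
            String hostPort = brokers.get(e.getValue());
            if (hostPort == null) {
                // Mirrors the NoNodeException branch: report the missing node and skip the partition.
                System.err.println("Broker node " + e.getValue() + " does not exist");
                continue;
            }
            partitionToBroker.put(e.getKey(), hostPort);
        }
        return partitionToBroker;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> leaders = Map.of(0, 1, 1, 2, 2, 3);
        Map<Integer, String> brokers = Map.of(1, "10.1.110.24:9092", 2, "10.1.110.21:9092");
        // Partition 2 is absent from the result because broker 3 is not registered.
        System.out.println(resolve(leaders, brokers));
    }
}
```

One consequence worth noting: a partition with a missing leader node silently disappears from the result, so downstream code sees fewer partitions rather than an error.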

 

DynamicPartitionConnections

Maintains one connection per broker and records which partitions are read through each broker.

Core data structure: one ConnectionInfo per broker.

Map<Broker, ConnectionInfo> _connections = new HashMap();

ConnectionInfo holds the SimpleConsumer connected to that broker and a set recording its partitions:

    static class ConnectionInfo {
        SimpleConsumer consumer;
        Set<Integer> partitions = new HashSet();

        public ConnectionInfo(SimpleConsumer consumer) {
            this.consumer = consumer;
        }
    }

The core function is register:

    public SimpleConsumer register(Broker host, int partition) {
        if (!_connections.containsKey(host)) {
            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
        }
        ConnectionInfo info = _connections.get(host);
        info.partitions.add(partition);
        return info.consumer;
    }
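
The effect of register is that partitions led by the same broker share one connection. A minimal sketch of that idea, assuming a String in place of SimpleConsumer and a String broker key; ConnectionRegistrySketch, connectionsOpened and partitionsOf are hypothetical names added only for illustration:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for DynamicPartitionConnections; a String replaces SimpleConsumer.
final class ConnectionRegistrySketch {

    static final class ConnectionInfo {
        final String consumer;                          // placeholder for the real connection
        final Set<Integer> partitions = new HashSet<>();
        ConnectionInfo(String consumer) { this.consumer = consumer; }
    }

    private final Map<String, ConnectionInfo> connections = new HashMap<>();
    int connectionsOpened = 0;                          // counts how many connections were created

    // One connection per broker; every partition led by that broker reuses it.
    String register(String broker, int partition) {
        ConnectionInfo info = connections.get(broker);
        if (info == null) {
            info = new ConnectionInfo("consumer@" + broker);
            connections.put(broker, info);
            connectionsOpened++;
        }
        info.partitions.add(partition);
        return info.consumer;
    }

    Set<Integer> partitionsOf(String broker) {
        ConnectionInfo info = connections.get(broker);
        return info == null ? new HashSet<>() : info.partitions;
    }

    public static void main(String[] args) {
        ConnectionRegistrySketch registry = new ConnectionRegistrySketch();
        registry.register("10.1.110.24:9092", 0);
        registry.register("10.1.110.24:9092", 1);
        registry.register("10.1.110.21:9092", 2);
        System.out.println("connections opened: " + registry.connectionsOpened);
        System.out.println("partitions on .24: " + registry.partitionsOf("10.1.110.24:9092"));
    }
}
```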

 

 

PartitionManager

This is the key logic: it manages the read state of a single partition.
Start with the following variables:

Long _emittedToOffset;
Long _committedTo;
SortedSet<Long> _pending = new TreeSet<Long>();
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();

A Kafka partition is always read sequentially, in increasing offset order. To avoid losing data, the current state (the offset) is periodically written to ZooKeeper.

There are several intermediate states.

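_emittedToOffset: the offset up to which data has been read from Kafka (that is, the next offset to fetch).
Messages fetched from Kafka are put into _waitingToEmit. Anything in that list is considered certain to be emitted, so _emittedToOffset can be treated as the offset read from Kafka so far.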

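lastCompletedOffset: the offset up to which processing has succeeded.
Messages are processed inside Storm and may fail there, so the offsets still being processed are kept in _pending.
If _pending is empty, lastCompletedOffset = _emittedToOffset.
If _pending is not empty, lastCompletedOffset is the first (smallest) offset in _pending, because that offset is still waiting for an ack and nothing after it can be counted as complete.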

    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

 

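_committedTo: the offset already written to ZooKeeper.
lastCompletedOffset has to be written to ZooKeeper periodically; otherwise, after a crash, there is no way to know where reading stopped.
Hence _committedTo <= lastCompletedOffset.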
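
How the three offsets move relative to each other can be traced with a small model. This is only a sketch under stated assumptions: OffsetStateSketch and its fetch method are hypothetical, offsets are assumed to be consecutive, and ack is assumed to simply remove the offset from the pending set (the excerpts here do not show ack).

```java
import java.util.TreeSet;

// Hypothetical model of the offsets a PartitionManager tracks for one partition.
final class OffsetStateSketch {
    long emittedTo;                                   // next offset to fetch from Kafka
    long committedTo;                                 // last value written to ZooKeeper
    final TreeSet<Long> pending = new TreeSet<>();    // emitted but not yet acked

    OffsetStateSketch(long start) {
        emittedTo = start;
        committedTo = start;
    }

    // Simulates fetching `count` messages with consecutive offsets.
    void fetch(int count) {
        for (int i = 0; i < count; i++) {
            pending.add(emittedTo);
            emittedTo++;
        }
    }

    // Assumption: ack only drops the offset from the pending set.
    void ack(long offset) {
        pending.remove(offset);
    }

    long lastCompletedOffset() {
        return pending.isEmpty() ? emittedTo : pending.first();
    }

    void commit() {
        committedTo = lastCompletedOffset();
    }

    public static void main(String[] args) {
        OffsetStateSketch s = new OffsetStateSketch(100);
        s.fetch(5);                                   // pending = 100..104, emittedTo = 105
        s.ack(100);
        s.ack(101);
        s.ack(103);                                   // 102 is still outstanding
        s.commit();
        System.out.println("committedTo=" + s.committedTo + " emittedTo=" + s.emittedTo);
    }
}
```

In this trace offset 103 has been acked but the commit still stops at 102, which is why a restart after a crash can re-read messages that were already processed.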

The complete flow:

1. Initialization

The key steps are registering the partition and initializing the offset, so that the manager knows where to start reading.

    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition); // register the partition with connections and obtain the SimpleConsumer
        _state = state;
        _stormConf = stormConf;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path +  " --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset"); // committed offset read from ZooKeeper
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        if (jsonTopologyId == null || jsonOffset == null) { // nothing recorded in ZooKeeper: derive the offset from the spout config (startOffsetTime, Earliest or Latest)
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
        }

        _emittedToOffset = _committedTo; // at initialization all the intermediate states agree
    }

 

2. Read messages from Kafka into _waitingToEmit

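A ByteBufferMessageSet is read from Kafka.
Each message to emit is wrapped in a MessageAndRealOffset and put into _waitingToEmit.
Its offset, not yet completed, is added to _pending.
_emittedToOffset is advanced to the next offset.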

    private void fill() {
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
    }

The fetch logic is as follows:

    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        for (int errors = 0; errors < 2 && msgs == null; errors++) { // at most two attempts, i.e. one retry
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                    clientId(config.clientId).build();
            FetchResponse fetchResponse;
            try {
                fetchResponse = consumer.fetch(fetchRequest);
            } catch (Exception e) {
                if (e instanceof ConnectException) {
                    throw new FailedFetchException(e);
                } else {
                    throw new RuntimeException(e);
                }
            }
            if (fetchResponse.hasError()) { // mainly handles offset-out-of-range: retry from earliest or latest via getOffset
                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                            "retrying with default start offset time from configuration. " +
                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                    offset = startOffset;
                } else {
                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                    LOG.error(message);
                    throw new FailedFetchException(message);
                }
            } else {
                msgs = fetchResponse.messageSet(topic, partitionId);
            }
        }
        return msgs;
    }
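
The retry shape of fetchMessages (one retry from the configured start offset on an out-of-range error, otherwise fail) can be isolated in a small model. This is a sketch only: FetchRetrySketch, fetchOnce and the constants EARLIEST and LATEST are hypothetical, and the "log" is just a range of valid offsets rather than a Kafka broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the retry loop in fetchMessages(); no Kafka involved.
final class FetchRetrySketch {
    static final long EARLIEST = 50;   // assumed first offset still retained
    static final long LATEST = 80;     // assumed offset just past the last message

    // Returns null for an out-of-range offset, otherwise the offsets that would be read.
    static List<Long> fetchOnce(long offset, int max) {
        if (offset < EARLIEST || offset > LATEST) {
            return null;
        }
        List<Long> out = new ArrayList<>();
        for (long o = offset; o < LATEST && out.size() < max; o++) {
            out.add(o);
        }
        return out;
    }

    static List<Long> fetch(long offset, int max, boolean resetIfOutOfRange) {
        List<Long> msgs = null;
        for (int errors = 0; errors < 2 && msgs == null; errors++) {
            msgs = fetchOnce(offset, max);
            if (msgs == null) {
                if (resetIfOutOfRange && errors == 0) {
                    offset = EARLIEST;                // retry once from the configured start
                } else {
                    throw new IllegalStateException("offset out of range: " + offset);
                }
            }
        }
        return msgs;
    }

    public static void main(String[] args) {
        System.out.println(fetch(10, 3, true));       // stale offset, recovered by the retry
        System.out.println(fetch(60, 2, true));       // valid offset, no retry needed
    }
}
```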

 

3. emit msg

Take a message from _waitingToEmit, convert it into tuples, and send them out with collector.emit.

    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

Look at how a message is converted into tuples:
the conversion is done by kafkaConfig.scheme.deserialize.

    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
        Iterable<List<Object>> tups;
        ByteBuffer payload = msg.payload();
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
        } else {
            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
        }
        return tups;
    }

So to use the spout you need to define the scheme logic:

spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());

public class TestMessageScheme implements Scheme {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
    
    @Override
    public List<Object> deserialize(byte[] bytes) {
        try {
            String msg = new String(bytes, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) { // java.io.UnsupportedEncodingException is what new String(bytes, charsetName) throws
            LOGGER.error("Cannot decode the provided message as UTF-8!", e);
        }
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

 

4. Commit the offset periodically

    public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (lastCompletedOffset != lastCommittedOffset()) {
            Map<Object, Object> data = ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
        } else {
            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
        }
    }

 

5. Finally, how fail is handled

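First, the author caches only offsets, not messages.
So on a fail the message cannot be replayed directly from memory; according to the author's comments, this is to avoid running out of memory.

Instead, when an offset fails, _emittedToOffset is rolled back to the failed offset.
The next fetch from Kafka starts at _emittedToOffset. The benefit is that replay relies on Kafka itself; the cost is duplicates, because every message after the failed offset is emitted again, including ones that were already acked.
So before using this spout, make sure duplicates are acceptable.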

    public void fail(Long offset) {
        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }
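
The duplicate problem is easy to reproduce in a small model. The sketch below is hypothetical (FailReplaySketch, fetchUpTo and the emitted list are illustration-only names), assumes consecutive offsets, and assumes ack merely removes the offset from the pending set. It copies the rollback from fail above and records every offset handed to the topology.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model showing that rolling back on fail re-emits already acked offsets.
final class FailReplaySketch {
    long emittedTo;
    final TreeSet<Long> pending = new TreeSet<>();
    final List<Long> emitted = new ArrayList<>();     // every offset handed to the topology

    FailReplaySketch(long start) {
        emittedTo = start;
    }

    // Simulates fill() plus emit for all offsets in [emittedTo, endExclusive).
    void fetchUpTo(long endExclusive) {
        while (emittedTo < endExclusive) {
            pending.add(emittedTo);
            emitted.add(emittedTo);
            emittedTo++;
        }
    }

    // Assumption: ack only drops the offset from the pending set.
    void ack(long offset) {
        pending.remove(offset);
    }

    // Same rollback as PartitionManager.fail above.
    void fail(long offset) {
        if (emittedTo > offset) {
            emittedTo = offset;
            pending.tailSet(offset).clear();
        }
    }

    public static void main(String[] args) {
        FailReplaySketch s = new FailReplaySketch(0);
        s.fetchUpTo(4);       // emits 0, 1, 2, 3
        s.ack(0);
        s.ack(2);
        s.ack(3);
        s.fail(1);            // rolls back to offset 1
        s.fetchUpTo(4);       // emits 1, 2, 3 again
        System.out.println(s.emitted);
    }
}
```

Offsets 2 and 3 were acked before the failure, yet they are emitted a second time, so downstream bolts need to be idempotent or tolerate duplicates.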

 

KafkaSpout

Finally, KafkaSpout itself.

1. Initialization
The key steps are initializing DynamicPartitionConnections and _coordinator.

    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;

        Map stateConf = new HashMap(conf);
        List<String> zkServers = _spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer zkPort = _spoutConfig.zkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        _state = new ZkState(stateConf);

        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }
    }

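What is _coordinator for?
It is essential, because we usually run several KafkaSpout tasks in parallel, similar to a consumer group in the high-level API. How do we keep these parallel tasks from conflicting?
The approach is the same as in the high-level consumer: each partition is consumed by exactly one spout task, which avoids the troublesome problem of mutually exclusive access (doing that on top of Kafka is hard; try thinking it through).
The coordinator assigns partitions to tasks based on the number of tasks of this spout and the number of partitions, and creates a PartitionManager for each assigned partition.

Note that totalTasks is the number of tasks of this spout component.
StaticCoordinator and ZkCoordinator differ only in where the partition information comes from: StaticHosts or ZooKeeper. For simplicity, look at the StaticCoordinator implementation: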

public class StaticCoordinator implements PartitionCoordinator {
    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
    List<PartitionManager> _allManagers = new ArrayList();

    public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
        StaticHosts hosts = (StaticHosts) config.hosts;
        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
        for (Partition myPartition : myPartitions) { // create a PartitionManager for each assigned partition
            _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
        }
        _allManagers = new ArrayList(_managers.values());
    }

    @Override
    public List<PartitionManager> getMyManagedPartitions() {
        return _allManagers;
    }

    public PartitionManager getManager(Partition partition) {
        return _managers.get(partition);
    }

}

The assignment logic lives in calculatePartitionsForTask:

    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        List<Partition> partitions = partitionInformation.getOrderedPartitions();
        int numPartitions = partitions.size();
        List<Partition> taskPartitions = new ArrayList<Partition>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) { // round-robin: task k takes partitions k, k + totalTasks, ...
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }
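
To see what this striding assignment produces, here is a self-contained sketch that uses plain partition indexes instead of Partition objects. PartitionAssignmentSketch and partitionsForTask are hypothetical names; the loop is the same as in calculatePartitionsForTask.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of calculatePartitionsForTask to plain partition indexes.
final class PartitionAssignmentSketch {

    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        if (taskIndex >= totalTasks) {
            throw new IllegalArgumentException("task index must be less than total tasks");
        }
        List<Integer> mine = new ArrayList<>();
        // Task k takes partitions k, k + totalTasks, k + 2 * totalTasks, ...
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            mine.add(i);
        }
        return mine;
    }

    public static void main(String[] args) {
        // Five partitions spread over two spout tasks.
        for (int task = 0; task < 2; task++) {
            System.out.println("task " + task + " -> " + partitionsForTask(5, 2, task));
        }
        // More tasks than partitions: the extra task gets nothing to read.
        System.out.println("task 2 of 3 -> " + partitionsForTask(2, 3, 2));
    }
}
```

A practical consequence is that setting the spout parallelism higher than the partition count leaves some tasks idle.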

 

2. nextTuple

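The logic is written in a slightly tricky way, but the goal is simply to emit successfully from one partition per call.
The for loop exists because, when a partition returns EmitState.NO_EMITTED, the following partitions must be tried until one of them emits.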

    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size(); // starts at 0; partitions are visited in turn
            EmitState state = managers.get(_currPartitionIndex).next(_collector); // PartitionManager.next emits the data
            if (state != EmitState.EMITTED_MORE_LEFT) { // on EMITTED_MORE_LEFT the partition still has buffered data, so stay on it
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) { // NO_EMITTED means this partition had nothing to emit, so keep looping instead of breaking
                break;
            }
        }

        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit(); // periodic commit
        }
    }
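
The index handling in nextTuple is easier to follow in a model where each partition is just a queue. The sketch below is hypothetical (RoundRobinSketch, out and curr are illustration-only names) and leaves out two things the real code does: refilling from Kafka when the buffer is empty, and the periodic commit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the partition rotation in nextTuple(); queues replace PartitionManagers.
final class RoundRobinSketch {
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    final List<Deque<String>> partitions;             // buffered messages per partition
    final List<String> out = new ArrayList<>();       // everything emitted so far
    int curr = 0;                                     // plays the role of _currPartitionIndex

    RoundRobinSketch(List<Deque<String>> partitions) {
        this.partitions = partitions;
    }

    // Emits at most one message from the given partition.
    EmitState next(Deque<String> partition) {
        String msg = partition.pollFirst();
        if (msg == null) {
            return EmitState.NO_EMITTED;
        }
        out.add(msg);
        return partition.isEmpty() ? EmitState.EMITTED_END : EmitState.EMITTED_MORE_LEFT;
    }

    void nextTuple() {
        for (int i = 0; i < partitions.size(); i++) {
            curr = curr % partitions.size();
            EmitState state = next(partitions.get(curr));
            if (state != EmitState.EMITTED_MORE_LEFT) {
                curr = (curr + 1) % partitions.size(); // move on once the buffer is drained
            }
            if (state != EmitState.NO_EMITTED) {
                break;                                 // one successful emit per call is enough
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> p0 = new ArrayDeque<>();
        Deque<String> p1 = new ArrayDeque<>(List.of("a", "b"));
        Deque<String> p2 = new ArrayDeque<>(List.of("c"));
        RoundRobinSketch spout = new RoundRobinSketch(List.of(p0, p1, p2));
        for (int call = 0; call < 4; call++) {
            spout.nextTuple();
        }
        System.out.println(spout.out);
    }
}
```

In the first call the empty partition is skipped and one message comes from the second partition; the index then stays there until that buffer is drained.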

Periodic commit simply iterates over the PartitionManagers and commits each one:

    private void commit() {
        _lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }

 

3. Ack and Fail

Both delegate directly to the PartitionManager:

    @Override
    public void ack(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }

    @Override
    public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }

 

4. declareOutputFields
This is why the scheme has to define both deserialize and getOutputFields.

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_spoutConfig.scheme.getOutputFields());
    }

 

Metrics

Now the metrics; the main point is to learn how to add metrics in Storm.
spout.open registers the following two metrics.

kafkaOffset
Reports earliestTimeOffset, latestTimeOffset and latestEmittedOffset for each partition; latestTimeOffset - latestEmittedOffset is the spout lag.
Besides the per-partition values, it also reports totals across all partitions.

        context.registerMetric("kafkaOffset", new IMetric() {
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);

            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions(); // current PartitionManagers from the coordinator
                Set<Partition> latestPartitions = new HashSet();
                for (PartitionManager pm : pms) {
                    latestPartitions.add(pm.getPartition());
                }
                _kafkaOffsetMetric.refreshPartitions(latestPartitions); // drop statistics for partitions that no longer exist
                for (PartitionManager pm : pms) {
                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); // record each partition's last completed offset
                }
                return _kafkaOffsetMetric.getValueAndReset();
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);

_kafkaOffsetMetric.getValueAndReset only reads values; nothing needs to be reset:

        @Override
        public Object getValueAndReset() {
            try {
                long totalSpoutLag = 0;
                long totalEarliestTimeOffset = 0;
                long totalLatestTimeOffset = 0;
                long totalLatestEmittedOffset = 0;
                HashMap ret = new HashMap();
                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                        Partition partition = e.getKey();
                        SimpleConsumer consumer = _connections.getConnection(partition);
                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                        long latestEmittedOffset = e.getValue();
                        long spoutLag = latestTimeOffset - latestEmittedOffset;
                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                        totalSpoutLag += spoutLag;
                        totalEarliestTimeOffset += earliestTimeOffset;
                        totalLatestTimeOffset += latestTimeOffset;
                        totalLatestEmittedOffset += latestEmittedOffset;
                    }
                    ret.put("totalSpoutLag", totalSpoutLag);
                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
                    return ret;
                } else {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                }
            } catch (Throwable t) {
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
            }
            return null;
        }
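
The arithmetic behind the lag values is small enough to check by hand. The following sketch is an illustration only: SpoutLagSketch, summarize and the "p0/..." key prefix are hypothetical, and the offsets are made-up inputs rather than values read from Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reduction of the spout lag computation to plain numbers.
final class SpoutLagSketch {

    // offsets[p] = {earliestTimeOffset, latestTimeOffset, latestEmittedOffset} for partition p.
    static Map<String, Long> summarize(long[][] offsets) {
        Map<String, Long> ret = new LinkedHashMap<>();
        long totalSpoutLag = 0;
        for (int p = 0; p < offsets.length; p++) {
            long latestTimeOffset = offsets[p][1];
            long latestEmittedOffset = offsets[p][2];
            long spoutLag = latestTimeOffset - latestEmittedOffset;
            ret.put("p" + p + "/spoutLag", spoutLag);
            totalSpoutLag += spoutLag;
        }
        ret.put("totalSpoutLag", totalSpoutLag);
        return ret;
    }

    public static void main(String[] args) {
        long[][] offsets = {
            {0, 1000, 900},    // partition 0 is 100 messages behind
            {10, 500, 500},    // partition 1 is fully caught up
        };
        System.out.println(summarize(offsets));
    }
}
```

Because the spout passes pm.lastCompletedOffset() as the "latest emitted" offset, the reported lag also includes messages that have been emitted but not yet acked.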

 

kafkaPartition
Reports how fetching from Kafka is going: fetchAPILatencyMax, fetchAPILatencyMean, fetchAPICallCount and fetchAPIMessageCount.

        context.registerMetric("kafkaPartition", new IMetric() {
            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                Map concatMetricsDataMaps = new HashMap();
                for (PartitionManager pm : pms) {
                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                }
                return concatMetricsDataMaps;
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);

pm.getMetricsDataMap():

public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

The metrics are updated as follows:

    private void fill() {
        long start = System.nanoTime();
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);
        // ... followed by the loop over msgs shown earlier, which fills _pending and _waitingToEmit
    }
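
The "get value and reset" contract used by these metrics means each reported value covers only one time bucket. A minimal sketch of that contract, assuming a single hypothetical class LatencyStatsSketch that tracks both max and mean (the spout itself uses separate metric objects for each):

```java
// Hypothetical accumulator illustrating the getValueAndReset contract for one time bucket.
final class LatencyStatsSketch {
    private long max = 0;
    private long sum = 0;
    private long count = 0;

    // Called once per fetch with the measured latency in milliseconds.
    void update(long millis) {
        max = Math.max(max, millis);
        sum += millis;
        count++;
    }

    // Returns {max, mean} for the current bucket, then clears the accumulators.
    double[] getValueAndReset() {
        double mean = count == 0 ? 0.0 : (double) sum / count;
        double[] value = { max, mean };
        max = 0;
        sum = 0;
        count = 0;
        return value;
    }

    public static void main(String[] args) {
        LatencyStatsSketch stats = new LatencyStatsSketch();
        long start = System.nanoTime();
        // ... a fetch would happen here ...
        long millis = (System.nanoTime() - start) / 1000000;
        stats.update(millis);
        double[] bucket = stats.getValueAndReset();
        System.out.println("max=" + bucket[0] + " mean=" + bucket[1]);
    }
}
```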

 

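When reading from Kafka,

the first concern is the read status of each partition, which the kafkaOffset metric provides.

The second is replaying data. With the high-level API this can be done with the tools the system provides; how is it done here?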

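Look at the code below.
The first if handles the case where no committed state could be read from ZooKeeper.
The else if handles the case where topologyInstanceId has changed and forceFromStart is true; the offset is then taken from startOffsetTime (Latest or Earliest).
topologyInstanceId is generated randomly each time a KafkaSpout object is created:
String _uuid = UUID.randomUUID().toString();
The spout object is created once, on the client, when the topology is submitted, so if the topology is stopped and started again this id is certain to change.

So it should be enough to set forceFromStart to true and restart the topology to get a replay.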

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
        }
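
The three branches reduce to a small decision function. The sketch below is a hypothetical restatement (StartOffsetSketch and chooseStart are not part of storm-kafka), with configuredStartOffset standing in for whatever KafkaUtils.getOffset would return.

```java
// Hypothetical restatement of the start-offset decision in the PartitionManager constructor.
final class StartOffsetSketch {

    static long chooseStart(String storedTopologyId, Long storedOffset,
                            String currentTopologyId, boolean forceFromStart,
                            long configuredStartOffset) {
        if (storedTopologyId == null || storedOffset == null) {
            return configuredStartOffset;             // nothing usable in ZooKeeper
        }
        if (!currentTopologyId.equals(storedTopologyId) && forceFromStart) {
            return configuredStartOffset;             // new topology instance and a forced reset
        }
        return storedOffset;                          // resume from the committed offset
    }

    public static void main(String[] args) {
        // A restarted topology gets a new id; only forceFromStart decides whether it replays.
        System.out.println(chooseStart("old-id", 500L, "new-id", true, 0L));
        System.out.println(chooseStart("old-id", 500L, "new-id", false, 0L));
    }
}
```

With forceFromStart left at false, a restart resumes from the committed offset, which is usually what is wanted outside of a deliberate replay.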

 

Code example

The storm-kafka documentation is sparse, so here is a usage example.

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Fields;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
import storm.kafka.KeyValueScheme;

    public static class SimplekVScheme implements KeyValueScheme { // define the scheme
        @Override
        public List<Object> deserializeKeyAndValue(byte[] key, byte[] value){
            ArrayList tuple = new ArrayList();
            tuple.add(key);
            tuple.add(value);
            return tuple;
        }
        
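        @Override
        public List<Object> deserialize(byte[] bytes) {
            // Used for messages without a key; emit a null key so the tuple
            // still matches the two fields declared in getOutputFields().
            ArrayList tuple = new ArrayList();
            tuple.add(null);
            tuple.add(bytes);
            return tuple;
        }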

        @Override
        public Fields getOutputFields() {
            return new Fields("key","value");
        }

    }   

        String topic = "test";
        String zkRoot = "/kafkastorm";
        String spoutId = "id"; // read state is stored under /kafkastorm/id, so the id acts like a consumer group
        
        BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181"); 

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new SimplekVScheme());
        
        /*spoutConfig.zkServers = new ArrayList<String>(){{ // only needed in local mode when the read state should be recorded
            add("10.118.136.107");
        }};
        spoutConfig.zkPort = 2181;*/
        
        spoutConfig.forceFromStart = false; 
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();    
        spoutConfig.metricsTimeBucketSizeInSecs = 6;

        builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);

