本文系原創系列,轉載請注明。
原帖地址:http://blog.csdn.net/xeseo
前言
在前面Storm系列之——基本概念一文中,提到過Storm的Spout應該是源源不斷的取數據,不能間斷。那么,很顯然,消息隊列系統、分布式內存系統或內存數據庫是作為其數據源的很好的選擇。本文就如何集成Kafka進行介紹。
Kafka的基本介紹:http://blog.csdn.net/xeseo/article/details/18311955
准備工作
KafkaSpout其實網上已經有人寫了,在github上開源了,不用我們自己造輪子。只是要注意版本問題:
0.7版本的Kafka,對應KafkaSpout可以使用Storm-contrib下面的例子
源碼:https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka
Maven依賴:https://clojars.org/storm/storm-kafka
0.8版本的Kafka在API上和底層Offset的處理方式上發生了重大變化,所以老的KafkaSpout不再適用,必須使用新的KafkaAPI
源碼:https://github.com/wurstmeister/storm-kafka-0.8-plus
Maven依賴:https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus
這里因為0.8版本的Kafka必然是將來主流,所以我就不介紹0.7 的了,使用方式基本上是類似的。
PS:
是人寫的,就會有bug,何況是別人分享出來的。所以,遇到bug,還請去github上提交一個issue告訴作者修正。
2014/7/29 更新:
wurstmeister/storm-kafka-0.8-plus 現在合並到Apache Storm了,在其external/storm-kakfa目錄
Maven依賴直接更新成:
[plain] view plaincopyprint?
-
<dependency>
-
<groupId>org.apache.storm</groupId>
-
<artifactId>storm-kafka</artifactId>
-
<version>0.9.2-incubating</version>
-
</dependency>
但是storm似乎沒有直接把external的包加載到classpath,所以使用時,還得手動把該jar包從external/storm-kafka/下拷到storm的lib目錄。
當然,也可以在maven中加上<scope>compile</scope>,直接把該jar打到你項目一起。
使用KafkaSpout
一個KafkaSpout只能去處理一個topic的內容,所以,它要求初始化時提供如下與topic相關信息:
-
Kafka集群中的Broker地址 (IP+Port)
有兩種方法指定:
1. 使用靜態地址,即直接給定Kafka集群中所有Broker信息
[java] view plaincopyprint?
GlobalPartitionInformation info = new GlobalPartitionInformation();
info.addPartition(0, new Broker("10.1.110.24",9092));
info.addPartition(0, new Broker("10.1.110.21",9092));
BrokerHosts brokerHosts = new StaticHosts(info);
2. 從Zookeeper動態讀取
[java] view plaincopyprint?
BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");
推薦使用這種方法,因為Kafka的Broker可能會動態的增減
-
topic名字
-
當前spout的唯一標識Id (以下代稱$spout_id)
-
zookeeper上用於存儲當前處理到哪個Offset了 (以下代稱$zk_root)
-
當前topic中數據如何解碼
了解Kafka的應該知道,Kafka中當前處理到哪的Offset是由客戶端自己管理的。所以,后面兩個的目的,其實是在zookeeper上建立一個 $zk_root/$spout_id 的節點,其值是一個map,存放了當前Spout處理的Offset的信息。
在Topology中加入Spout的代碼:
[java] view plaincopyprint?
-
String topic = "test";
-
String zkRoot = "kafkastorm";
-
String spoutId = "myKafka";
-
-
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
-
spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
-
-
TopologyBuilder builder = new TopologyBuilder();
-
builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);
其中TestMessageScheme就是告訴KafkaSpout如何去解碼數據,生成Storm內部傳遞數據
[java] view plaincopyprint?
-
public class TestMessageScheme implements Scheme {
-
-
private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
-
-
@Override
-
public List<Object> deserialize(byte[] bytes) {
-
try {
-
String msg = new String(bytes, "UTF-8");
-
return new Values(msg);
-
} catch (InvalidProtocolBufferException e) {
-
LOGGER.error("Cannot parse the provided message!");
-
}
-
-
//TODO: what happend if returns null?
-
return null;
-
}
-
-
@Override
-
public Fields getOutputFields() {
-
return new Fields("msg");
-
}
-
-
}
這個解碼方式是與Producer端生成時塞入數據的編碼方式配套的。這里我Producer端塞入的是String的byte,所以這里也還原成String,定義輸出為一個名叫"msg"的field。
后面就可以自己添加Bolt處理tuple中該field的數據了。
使用TransactionalTridentKafkaSpout
TransactionalTridentKafkaSpout是為事務性的Trident而用的。用法與KafkaSpout有所不同。
[java] view plaincopyprint?
-
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);
-
kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
-
-
TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
-
-
TridentTopology topology = new TridentTopology();
-
topology.newStream("test_str", kafkaSpout).shuffle().each(new Fields("msg", new PrintFunction());
看到它並沒有要求我們提供zkRoot,因為直接代碼里面寫死了…… -_-T
地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是 /transactional/test_str/myKafaka
常見問題
1. 本地模式無法保存Offset
KafkaSpout初始化時,會去取spoutConfig.zkServers 和 spoutConfig.zkPort 變量的值,而該值默認是沒塞的,所以是空,那么它就會去取當前運行的Storm所配置的zookeeper地址和端口,而本地運行的Storm,是一個臨時的zookeeper實例,並不會真正持久化。所以,每次關閉后,數據就沒了。
本地模式,要顯示的去配置
[java] view plaincopyprint?
-
spoutConfig.zkServers = new ArrayList<String>(){{
-
add("10.1.110.20");
-
add("10.1.110.21");
-
add("10.1.110.24");
-
}};
-
spoutConfig.zkPort = 2181;
2. 用Maven導入時,運行中SLF4J打印MutipleBinding 錯誤,導致無log輸出。
原因是在這個KafkaSpout的pom.xml里依賴了kafka_2.9.2,而這貨帶了一個slf4j-simple的SLF4J綁定,修復這個問題
[html] view plaincopyprint?
-
<del><dependency>
-
<groupId>net.wurstmeister.storm</groupId>
-
<artifactId>storm-kafka-0.8-plus</artifactId>
-
<version>0.2.0</version>
-
<exclusion>
-
<groupId>org.slf4j</groupId>
-
<artifactId>slf4j-simple</artifactId>
-
</exclusion>
-
</dependency></del>
3. 如果在topology第一次啟動前,往kafka里面寫數據,啟動Storm后,這部分數據讀不出來
原因是第一次啟動topology時,在zookeeper上並未創建出保存Offset信息的節點,所以默認它會取當前partition最新的Offset(Kafka自己維護的單個partition上遞增序號)。
理論上,如果找不到保存的Offset信息,應該從-1的Offset讀起。
這個問題我給作者提出來了,但作者認為這樣可以避免重復處理,我沒有想通為何會有重復處理。但好在作者說會在后續版本加入參數來控制。
剛去看了下,似乎作者已經在提交 8b764cd fix掉了。有興趣的可以去試下。我是自己本地改了他的代碼。
以上問題已修復並合並。