This example consists of a KafkaSpout, a KafkaBolt, and a custom bolt, QueryBolt. The data flow is: the KafkaSpout reads String messages from the Kafka topic recommend and sends them to the QueryBolt. The QueryBolt does no processing; it forwards each message straight to the KafkaBolt and only keeps a copy of every message it sees in a list. QueryBolt overrides cleanup(), which is called when the topology is killed; the method writes everything in the list to the file "C://" + this + ".txt". The KafkaBolt writes the messages it receives to the Kafka topic receiver.
Code structure: topology.java, MessageScheme.java, QueryBolt.java.

The complete code follows. First, topology.java:
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

public class topology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper hosts in host:port form
        BrokerHosts brokerHosts = new ZkHosts("110.64.76.130:2181,110.64.76.131:2181,110.64.76.132:2181");
        // Topic to consume from
        String topic = "recommend";
        // Matches the chroot in Kafka's zookeeper.connect; leave empty when that setting only lists host:port pairs
        String zkRoot = "";
        // Arbitrary spout id
        String spoutId = "zhou";
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        // How to deserialize the raw bytes coming from the Kafka queue
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        Config conf = new Config();
        // Suppress debug output
        conf.setDebug(false);
        // Maximum number of pending tuples per spout task
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker addresses
        map.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
        // serializer.class is the serializer used for the messages
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that KafkaBolt writes to
        conf.put("topic", "receiver");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt1", new QueryBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
        builder.setBolt("bolt2", new KafkaBolt<String, String>(), 1).setNumTasks(1).shuffleGrouping("bolt1");

        if (args.length == 0) {
            // Submit to a local cluster
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            // Wait 6 seconds, then shut the cluster down
            Thread.sleep(6000);
            cluster.shutdown();
        } else {
            // Submit to the remote cluster
            StormSubmitter.submitTopology("test", conf, builder.createTopology());
        }
    }
}
Next, MessageScheme.java:
import java.io.UnsupportedEncodingException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageScheme.class);

    public List<Object> deserialize(byte[] ser) {
        try {
            // Decode the raw bytes read from Kafka as a UTF-8 string
            String mString = new String(ser, "UTF-8");
            return new Values(mString);
        } catch (UnsupportedEncodingException e) {
            LOGGER.error("Cannot parse the provided message");
        }
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
Finally, QueryBolt.java:
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class QueryBolt implements IRichBolt {
    List<String> list;
    OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        list = new ArrayList<String>();
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String str = (String) input.getValue(0);
        // Remember the message
        list.add(str);
        // Ack the input tuple
        collector.ack(input);
        // Forward the message downstream
        collector.emit(new Values(str));
    }

    public void cleanup() {
        // Called when the topology is killed: dump the collected messages to a file
        try {
            FileOutputStream outputStream = new FileOutputStream("C://" + this + ".txt");
            PrintStream p = new PrintStream(outputStream);
            p.println("begin!");
            p.println(list.size());
            for (String tmp : list) {
                p.println(tmp);
            }
            p.println("end!");
            try {
                p.close();
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Question 1: How should zkRoot be set? This is important: if zkRoot is set incorrectly, the spout cannot read messages from the Kafka queue.

Look at Kafka's server.properties file:
zookeeper.connect=master:2181,slave1:2181,slave2:2181
In this case zkRoot = "".
If instead zookeeper.connect=master:2181,slave1:2181,slave2:2181/ok,
then zkRoot = "/ok".
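As a minimal sketch of the two cases, following the mapping described above (the host names and the /ok chroot are just the values used in this article, and brokerHosts is the variable from topology.java):

// zookeeper.connect=master:2181,slave1:2181,slave2:2181  ->  no chroot
SpoutConfig cfgNoChroot = new SpoutConfig(brokerHosts, "recommend", "", "zhou");

// zookeeper.connect=master:2181,slave1:2181,slave2:2181/ok  ->  chroot "/ok"
SpoutConfig cfgWithChroot = new SpoutConfig(brokerHosts, "recommend", "/ok", "zhou");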
Question 2: Why does KafkaSpout, once started, not read the queue from the beginning, but instead skip the messages already in the Kafka queue and only process messages added after it started?

Because KafkaSpout by default skips the messages that already exist in the Kafka queue when it starts. To process the queue from the beginning, set spoutConfig.forceFromStart = true, which makes the spout start reading from the smallest available offset.
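In topology.java above this is a one-line change right after the SpoutConfig is created; a sketch reusing the brokerHosts, topic, zkRoot, and spoutId variables defined there:

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
// Read from the earliest offset instead of skipping the messages already in the queue
spoutConfig.forceFromStart = true;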
Appendix: the definition of SpoutConfig used by KafkaSpout.
SpoutConfig extends KafkaConfig. Because every instance field of SpoutConfig and KafkaConfig is public, you can set each field directly after calling the constructor; a short usage sketch follows the two class definitions below.
public class SpoutConfig extends KafkaConfig implements Serializable {
    public List<String> zkServers = null;     // ZooKeeper hosts used to record the spout's read progress
    public Integer zkPort = null;             // ZooKeeper port used to record the progress
    public String zkRoot = null;              // ZooKeeper path under which the progress is stored
    public String id = null;                  // id of the progress record; to have a new spout resume an earlier record, give it the same id
    public long stateUpdateIntervalMs = 2000; // how often the state is updated, used for metrics

    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
public class KafkaConfig implements Serializable {
    public final BrokerHosts hosts;  // where to obtain Kafka broker and partition information
    public final String topic;       // topic to read messages from
    public final String clientId;    // client id used by the SimpleConsumer

    public int fetchSizeBytes = 1024 * 1024;   // total message size requested in each FetchRequest sent to Kafka
    public int socketTimeoutMs = 10000;        // socket timeout for connections to the Kafka brokers
    public int fetchMaxWait = 10000;           // how long the consumer waits when the broker has no new messages
    public int bufferSizeBytes = 1024 * 1024;  // read buffer size of the SocketChannel used by the SimpleConsumer
    public MultiScheme scheme = new RawMultiScheme();  // how to deserialize the byte[] fetched from Kafka
    public boolean forceFromStart = false;     // whether to force reading from the smallest offset in Kafka
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();  // which offset time to start reading from; defaults to the oldest offset
    public long maxOffsetBehind = 100000;      // how far the spout may lag behind the target offset; if it lags more, the spout skips the messages in between
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;  // if the requested offset no longer exists in Kafka, fall back to startOffsetTime
    public int metricsTimeBucketSizeInSecs = 60;  // how often metrics are reported

    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }
}
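Since every field above is public, a common pattern is to tune them directly after constructing the SpoutConfig. A minimal sketch (the concrete values below are purely illustrative, not taken from this article):

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());  // how to deserialize the raw bytes
spoutConfig.fetchSizeBytes = 2 * 1024 * 1024;   // request up to 2 MB per FetchRequest (illustrative)
spoutConfig.socketTimeoutMs = 30000;            // longer broker socket timeout (illustrative)
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;   // never skip messages when the spout falls behind (illustrative)
spoutConfig.stateUpdateIntervalMs = 5000;       // write offset state to ZooKeeper less often (illustrative)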