kafkaspout以及kafkabolt的最簡實例


        這個實例中有一個KafkaSpout,一個KafkaBolt,一個自定義Bolt QueryBolt。數據流程是KafkaSpout從topic為recommend的消息隊列中取出String類型的消息,發送給QueryBolt。QueryBolt不做任何處理,直接轉發給KafkaBolt,只把經過的消息存儲在list。QueryBolt中自定義了cleanup方法,該方法在topology被殺死時調用,方法中把list中的所有數據打印在"C://"+this+".txt"文件中。KafkaBolt將接收到的數據直接轉存在主題為recevier的kafka消息隊列中。
        代碼結構:
         
        以下是詳細代碼:
首先是topology.java
import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
//import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
 
public class topology {
    public static void main(String [] args) throws Exception{
        //配置zookeeper 主機:端口號
        BrokerHosts brokerHosts =new ZkHosts("110.64.76.130:2181,110.64.76.131:2181,110.64.76.132:2181");
        //接收消息隊列的主題
        String topic="recommend";
        //zookeeper設置文件中的配置,如果zookeeper配置文件中設置為主機名:端口號 ,該項為空
        String zkRoot="";
        //任意
        String spoutId="zhou";
        SpoutConfig spoutConfig=new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        //設置如何處理kafka消息隊列輸入流
        spoutConfig.scheme=new SchemeAsMultiScheme(new MessageScheme());
        Config conf=new Config();
        //不輸出調試信息
        conf.setDebug(false);
        //設置一個spout task中處於pending狀態的最大的tuples數量
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        Map<String, String> map=new HashMap<String,String>();
        // 配置Kafka broker地址
        map.put("metadata.broker.list""master:9092,slave1:9092,slave2:9092");
        // serializer.class為消息的序列化類
        map.put("serializer.class""kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // 配置KafkaBolt生成的topic
        conf.put("topic""receiver");
        TopologyBuilder builder =new TopologyBuilder();
        builder.setSpout("spout"new KafkaSpout(spoutConfig),1);
        builder.setBolt("bolt1"new QueryBolt(),1).setNumTasks(1).shuffleGrouping("spout");
        builder.setBolt("bolt2"new KafkaBolt<String, String>(),1).setNumTasks(1).shuffleGrouping("bolt1");
        if(args.length==0){
            LocalCluster cluster = new LocalCluster();
            //提交本地集群
            cluster.submitTopology("test", conf, builder.createTopology());
            //等待6s之后關閉集群
            Thread.sleep(6000);
            //關閉集群
            cluster.shutdown();
        }
        StormSubmitter.submitTopology("test", conf, builder.createTopology());
    }
}
然后是MessageScheme.java
import java.io.UnsupportedEncodingException;
import java.util.List;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class MessageScheme implements Scheme {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageScheme.class);
    public List<Object> deserialize(byte[] ser) {
        try {
            //從kafka中讀取的值直接序列化為UTF-8的str
            String mString=new String(ser, "UTF-8");
            return new Values(mString);
        catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            LOGGER.error("Cannot parse the provided message");
             
        }
        return null;
    }
 
    public Fields getOutputFields() {
        // TODO Auto-generated method stub
        return new Fields("msg");
    }
 
}
最后是QueryBolt.java
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Vector;
 
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class QueryBolt implements IRichBolt {
     
    List<String> list;
    OutputCollector collector;
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         
        list=new ArrayList<String>();
        this.collector=collector;
         
    }
 
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String str=(String) input.getValue(0);
        //將str加入到list
        list.add(str);
        //發送ack
        collector.ack(input);
        //發送該str
        collector.emit(new Values(str));
    }
 
    public void cleanup() {//topology被killed時調用
        //將list的值寫入到文件
        try {
            FileOutputStream outputStream=new FileOutputStream("C://"+this+".txt");
            PrintStream p=new PrintStream(outputStream);
            p.println("begin!");
            p.println(list.size());
            for(String tmp:list){
                p.println(tmp);
            }
            p.println("end!");
            try {
                p.close();
                outputStream.close();
            catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
             
        catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
     
        declarer.declare(new Fields("message"));
         
    }
 
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
 
}
問題1:zkRoot如何設置?非常重要,設置錯誤無法正確從kafka消息隊列中取出數據。
觀察 server.properties 文件:
zookeeper.connect=master:2181,slave1:2181,slave2:2181
此時zkRoot="";
如果zookeeper.connect=master:2181,slave1:2181,slave2:2181/ok
此時zkRoot等於"/ok"
問題2:為什么KafkaSpout啟動之后,不能從頭開始讀起,而是自動跳過了kafka消息隊列之前的內容,只處理KafkaSpout啟動之后消息隊列中新增的值?
因為KafkaSpout默認跳過了Kafka消息隊列之前就存在的值,如果要從頭開始處理,那么需要設置spoutConfig.forceFromStart=true,即從offset最小的開始讀起。
 
附錄:KafkaSpout中關於 SpoutConfig的相關定義
SpoutConfig繼承自KafkaConfig。由於SpoutConfig和KafkaConfig所有的instance field全是public, 因此在使用構造方法后,可以直接設置各個域的值。
 
public class SpoutConfig extends KafkaConfig implements Serializable {
    public List<String> zkServers = null//記錄Spout讀取進度所用的zookeeper的host
    public Integer zkPort = null;//記錄進度用的zookeeper的端口
    public String zkRoot = null;//進度信息記錄於zookeeper的哪個路徑下
    public String id = null;//進度記錄的id,想要一個新的Spout讀取之前的記錄,應把它的id設為跟之前的一樣。
    public long stateUpdateIntervalMs = 2000;//用於metrics,多久更新一次狀態。
 
    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
public class KafkaConfig implements Serializable {
 
    public final BrokerHosts hosts; //用以獲取Kafka broker和partition的信息
    public final String topic;//從哪個topic讀取消息
    public final String clientId; // SimpleConsumer所用的client id
 
    public int fetchSizeBytes = 1024 1024//發給Kafka的每個FetchRequest中,用此指定想要的response中總的消息的大小
    public int socketTimeoutMs = 10000;//與Kafka broker的連接的socket超時時間
    public int fetchMaxWait = 10000;   //當服務器沒有新消息時,消費者會等待這些時間
    public int bufferSizeBytes = 1024 1024;//SimpleConsumer所使用的SocketChannel的讀緩沖區大小
    public MultiScheme scheme = new RawMultiScheme();//從Kafka中取出的byte[],該如何反序列化
    public boolean forceFromStart = false;//是否強制從Kafka中offset最小的開始讀起
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//從何時的offset時間開始讀,默認為最舊的offset
    public long maxOffsetBehind = 100000;//KafkaSpout讀取的進度與目標進度相差多少,相差太多,Spout會丟棄中間的消息
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所請求的offset對應的消息在Kafka中不存在,是否使用startOffsetTime
    public int metricsTimeBucketSizeInSecs = 60;//多長時間統計一次metrics
 
    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }
 
    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }
 
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM