Using storm-kafka

1. Introduction to storm-kafka

storm-kafka is the Kafka client module that ships with Storm for reading messages from Kafka.
It provides both a plain spout and a Trident spout implementation for consuming data from Kafka.
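
The demo below only exercises the plain KafkaSpout. For the Trident side, a minimal sketch based on the storm-kafka 0.9.x API might look like the following; the ZooKeeper address and topic name here are placeholders of my own, not values from the demo:

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

public class TridentKafkaSketch {
	public static void main(String[] args) {
		// placeholder ZooKeeper address and topic name
		BrokerHosts hosts = new ZkHosts("192.168.1.50:2181");
		TridentKafkaConfig tridentConf = new TridentKafkaConfig(hosts, "my_topic");
		tridentConf.scheme = new SchemeAsMultiScheme(new StringScheme());
		// the opaque spout replays batches safely, pairing with Trident's
		// transactional state for exactly-once processing
		OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(tridentConf);
		TridentTopology topology = new TridentTopology();
		topology.newStream("kafka-trident", spout); // attach each()/groupBy()/persistentAggregate() here
	}
}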

2. A storm-kafka usage example

Maven dependency configuration. Note that storm-kafka uses Kafka's low-level consumer API, so the kafka package itself must also be declared as a dependency. If it is missing, the project still compiles, but fails at runtime; this is exactly what kept tripping me up the first time I used storm-kafka. The zookeeper and log4j exclusions below avoid clashes with the versions Storm itself ships.

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>0.9.5</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>0.9.5</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.1.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

Below is a demo I wrote. The concrete steps are as follows:
1.new BrokerHosts
Takes the ZooKeeper connection string as its parameter.
2.new SpoutConfig
Build the SpoutConfig, setting the BrokerHosts, the Kafka topic, Storm's root path in ZooKeeper, and related parameters.
3.new TopologyBuilder
Register the spout and bolt on the TopologyBuilder to assemble a topology.
4.Configure a Config and set its parameters, then start a LocalCluster and submit the topology job.

import java.util.Arrays;
import java.util.Map;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * @Description: storm-kafka usage example
 * @author: difeng
 * @time: 2015-11-18 10:18:31
 */
public class StormKafkaConsumer {
	
	public static class PingCounter extends BaseRichBolt{
		private static final long serialVersionUID = 1L;
		private OutputCollector collector;
		@Override
		public void execute(Tuple input) {
			String msg = input.getString(0);
			System.out.println("---------------------" + msg + "-----------------");
			collector.ack(input);
		}

		@Override
		public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
			this.collector = collector;
			System.out.println("++++++++++++++++++++prepare++++++++++++++++++++++++++++++++++");
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			// this bolt only prints and acks tuples, so it declares no output fields
			System.out.println("++++++++++++++++++++declareOutputFields+++++++++++++++++++++");
		}

	}

	public static void main(String[] args) {
		//ZooKeeper server addresses
		String zks = "192.168.1.50:2181,192.168.1.57:2181,192.168.1.58:2181";
		//the Kafka topic to read from
		String topic = "data_icmp_ping";
		//Storm's root path in ZooKeeper
		String zkRoot = "/storm"; 
		//consumer id; the spout stores its offsets under zkRoot + "/" + id
		String id = "data_icmp_ping";
		BrokerHosts brokerHosts = new ZkHosts(zks);
		SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
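		// forceFromStart=true makes the spout ignore any offsets already stored in
		// ZooKeeper and start from startOffsetTime (the earliest offset by default);
		// in production it is usually left false so the spout resumes where it stopped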
		spoutConf.forceFromStart = true;
		// ZooKeeper hosts where the spout stores its offsets, one list element per host
		spoutConf.zkServers = Arrays.asList("192.168.1.50", "192.168.1.57", "192.168.1.58");
		spoutConf.zkPort = 2181;
		
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),3); 
		builder.setBolt("ping-counter", new PingCounter(),3).shuffleGrouping("kafka-reader");
		Config conf = new Config();
        conf.setDebug(true);
        //cap every component to at most one task (overrides the parallelism hints of 3 above)
        conf.setMaxTaskParallelism(1);
        
        LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("test", conf, builder.createTopology());
		try {
			Thread.sleep(60000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		cluster.shutdown();
	}
}
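
The demo runs inside a LocalCluster, which is only meant for local testing. On a real cluster the topology is submitted through StormSubmitter instead; below is a minimal sketch, with the topology name and worker count chosen arbitrarily. Note that for a cluster deployment, storm-kafka and the kafka client would normally be packaged into the topology jar rather than marked as provided in the pom.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class ClusterSubmitSketch {
	public static void main(String[] args) throws Exception {
		BrokerHosts brokerHosts = new ZkHosts("192.168.1.50:2181,192.168.1.57:2181,192.168.1.58:2181");
		SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "data_icmp_ping", "/storm", "data_icmp_ping");
		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 3);
		// bolts are wired up here exactly as in the LocalCluster demo

		Config conf = new Config();
		conf.setNumWorkers(3); // worker processes requested from the cluster
		// submitTopology declares AlreadyAliveException/InvalidTopologyException,
		// hence the throws clause on main
		StormSubmitter.submitTopology("kafka-reader-topology", conf, builder.createTopology());
	}
}

The packaged jar is then deployed with the storm jar command, e.g. storm jar my-topology.jar ClusterSubmitSketch.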

