Kafka Streams
1.Apache Kafka開源項目的一個組成部分,是一個功能強大,易於使用的庫.用於在Kafka上構建高可分布,可拓展,高容錯的應用程序.
2.Kafka Streams特點
1)功能強大:高擴展性,彈性,容錯
2)輕量級:無需專門的集群,一個庫,而不是框架.
3)完全集成:100%的Kafka 0.10版本兼容;易於集成到現有的程序
4)實時性:毫秒級延遲,並非微批處理,窗口允許亂序數據,允許遲到數據
3.
當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因。
第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,並且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。
第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。
第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標准數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。
第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對於應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。但是Kafka作為類庫不占用系統資源。
第五,由於Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
第六,由於Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整並行度。
案例:數據清洗
需求描述:
實時處理單詞帶有”>>>”前綴的內容。例如輸入”lxz>>>lexue”,最終處理成“lexue”;
0) pom文件導入
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> <!-- Winodws下提交至Yarn上運行,改客戶端是2.6.1s <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.1</version> </dependency> --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit MRUnit測試 --> <dependency> <groupId>org.apache.mrunit</groupId> <artifactId>mrunit</artifactId> <version>0.9.0-incubating</version> <classifier>hadoop2</classifier> <scope>test</scope> </dependency> <dependency> <groupId>com.sun</groupId> <artifactId>tools</artifactId> <version>1.8.0</version> <scope>system</scope> <systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>1.3.2</version> <type>pom</type> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.3.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> <type>pom</type> </dependency> <!-- https://mvnrepository.com/artifact/org.glassfish.jersey.core/jersey-client --> <dependency> <groupId>org.glassfish.jersey.core</groupId> <artifactId>jersey-client</artifactId> <version>2.26</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency> </dependencies>
1) 創建主程序
package com.lxz.kafka; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import java.util.Properties; public class Application { public static void main(String[] args) { //1.定義輸入的topic String from = "first"; //定義輸出的topic String to = "second"; //設置參數 Properties settings = new Properties(); settings.put(StreamsConfig.APPLICATION_ID_CONFIG,"logFilter"); settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092"); StreamsConfig config = new StreamsConfig(settings); //擴建拓撲 TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE",from) .addProcessor("PROCESS",new ProcessorSupplier<byte[],byte[]>(){ public Processor<byte[],byte[]> get(){ //具體分析 // return new LogProcessor(); return new LogProcessor(); } },"SOURCE") .addSink("SINK",to,"PROCESS"); //創建Kafka stream KafkaStreams kafkaStreams = new KafkaStreams(builder, config); kafkaStreams.start(); } }
2) 具體業務處理
package com.lxz.kafka; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; public class LogProcessor implements Processor<byte[],byte[]> { private ProcessorContext context; @Override public void init(ProcessorContext context) { this.context = context; } @Override public void process(byte[] key, byte[] value) { String input = new String(value); //如果包含“>>>"則只保留該標記后面的內容 if (input.contains(">>>")){ input = input.split(">>>")[1].trim(); //輸出到下一個topic context.forward("logProcessor".getBytes(),input.getBytes()); }else { context.forward("logProcessor".getBytes(),input.getBytes()); } } @Override public void punctuate(long l) { } @Override public void close() { } }
3) 報錯信息處理
如果遇到log4j提示報警,則是因為缺少了log4j的配置文件,在resources中創建log4j.properties並寫入
log4j.rootLogger=INFO,console,dailyFile # TODO 發布到阿里雲記得添加,另外控制台不輸出(只輸出warn或者error信息) # log4j.logger.org.mybatis = INFO log4j.logger.com.imooc.mapper=INFO log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.encoding=UTF-8 log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n # 定期滾動日志文件,每天都會生成日志 log4j.appender.dailyFile=org.apache.log4j.DailyRollingFileAppender log4j.appender.dailyFile.encoding=UTF-8 log4j.appender.dailyFile.Threshold=INFO # TODO 本地日志地址,正式環境請務必切換為阿里雲地址 log4j.appender.dailyFile.File=C:/logs/maven-ssm-alipay/log.log4j log4j.appender.dailyFile.DatePattern='.'yyyy-MM-dd log4j.appender.dailyFile.layout=org.apache.log4j.PatternLayout log4j.appender.dailyFile.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n
4) 測試
1.運行Idea主程序
2.啟動hadoop1 2 3,三台服務器,zk,kafka集群
3.在hadoop2上啟動生產者: bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic first
4.在hadoop3上啟動消費者: bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --from-beginning --topic second
5.在hadoop2上生產者下輸入:lxz>>>lexue
6.查看hadoop3上的消費者是否成功消費到了:lexue
生產者端:
消費者端: