本項目是為網站日志流量分析做的基礎:網站日志流量分析系統,Kafka、HBase集群的搭建可參考:使用Docker搭建Spark集群(用於實現網站流量實時分析模塊),里面有關於該搭建過程
本次對接Kafka及HBase是基於使用Docker搭建Spark集群(用於實現網站流量實時分析模塊)搭建的6個Docker容器來實現的對接。
代碼地址:https://github.com/Simple-Coder/sparkstreaming-demo
一、SparkStreaming整合Kafka
1、maven代碼
2、啟動測試
2.1啟動3個kafka並測試
生產端消息如下:
接收到的消息如下:
2.2spark提交jar任務,SparkStreaming消費Kafka消息
console-producer生產消息,spartkStreaming主動拉取消息,console-consumer也能收到消息,結果如下:
console-consumer接收到的消息如下:
至此、Spark對接Kafka完成
3、問題總結
KafkaUtils的選擇,由於maven中央庫只有1.6.3版本的spark-streaming-kafka,其他版本的spark-streaming-kafka-***的api調用個人不習慣,還是停留在之前的api,所以這可能是導致以下問題的所在,不過還好問題解決
①ClassNotFoundException: org.apache.kafka.common.utils.Utils:上傳kafka-clients-0.8.2.0.jar至spark的jars目錄
②java.lang.NoClassDefFoundError:org/apache/spark/streaming/kafka/KafkaUtils :上傳kafka_2.11-0.8.2.1.jar、spark-streaming-kafka_2.11-1.6.3.jar至Spark的jars目錄
③java.lang.NoClassDefFoundError:org/apache/spark/logging:開頭maven結構圖中,將工程jar上傳至spark的jars目錄
④NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer:上傳zkclient-0.11.jar至spark的jars目錄
二、SparkStreaming整合HBase
1、讀取HBase表
1.1 scala代碼
1.2 提交jar任務測試
由於是docker容器搭建的集群,本地不容易測試,只好提交jar至docker容器
提交任務截圖如下:
HBase表t2數據如下:
至此scala讀取HBase數據成功,期間還是同樣的問題,缺少關於HBase的jar,將maven依賴的HBase全部上傳至Spark的jars目錄下即可。
2、寫入HBase表
2.1 scala代碼
2.2 提交jar任務至spark測試
提交jar任務如下:
HBase命令行查看如下:
至此、spark寫入hbase成功。
3、過濾器
隨即的返回row的數據,chance取值為0到1.0,如果<0則為空,如果>1則包含所有的行。
3.1 scala代碼
3.2 提交jar至spark測試
第一次如下:
第二次如下:
至此、spark對接kafka及HBase完成,歡迎各位讀者指正、交流~