Spark對接Kafka、HBase


  本項目是為網站日志流量分析做的基礎:網站日志流量分析系統,Kafka、HBase集群的搭建可參考:使用Docker搭建Spark集群(用於實現網站流量實時分析模塊),里面有關於該搭建過程

  本次對接Kafka及HBase是基於使用Docker搭建Spark集群(用於實現網站流量實時分析模塊)搭建的6個Docker容器來實現的對接。

  代碼地址:https://github.com/Simple-Coder/sparkstreaming-demo

一、SparkStreaming整合Kafka

1、maven代碼

2、啟動測試

2.1啟動3個kafka並測試

生產端消息如下:

 接收到的消息如下:

 2.2spark提交jar任務,SparkStreaming消費Kafka消息

  console-producer生產消息,spartkStreaming主動拉取消息,console-consumer也能收到消息,結果如下:  

console-consumer接收到的消息如下:

至此、Spark對接Kafka完成

3、問題總結

  KafkaUtils的選擇,由於maven中央庫只有1.6.3版本的spark-streaming-kafka,其他版本的spark-streaming-kafka-***的api調用個人不習慣,還是停留在之前的api,所以這可能是導致以下問題的所在,不過還好問題解決

①ClassNotFoundException: org.apache.kafka.common.utils.Utils:上傳kafka-clients-0.8.2.0.jar至spark的jars目錄

②java.lang.NoClassDefFoundError:org/apache/spark/streaming/kafka/KafkaUtils :上傳kafka_2.11-0.8.2.1.jar、spark-streaming-kafka_2.11-1.6.3.jar至Spark的jars目錄

③java.lang.NoClassDefFoundError:org/apache/spark/logging:開頭maven結構圖中,將工程jar上傳至spark的jars目錄

④NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer:上傳zkclient-0.11.jar至spark的jars目錄

二、SparkStreaming整合HBase

1、讀取HBase表

1.1 scala代碼

1.2 提交jar任務測試

  由於是docker容器搭建的集群,本地不容易測試,只好提交jar至docker容器

提交任務截圖如下:

 HBase表t2數據如下:

   至此scala讀取HBase數據成功,期間還是同樣的問題,缺少關於HBase的jar,將maven依賴的HBase全部上傳至Spark的jars目錄下即可。

2、寫入HBase表

2.1 scala代碼

 

 2.2 提交jar任務至spark測試

提交jar任務如下

 HBase命令行查看如下:

   至此、spark寫入hbase成功。

3、過濾器

  隨即的返回row的數據,chance取值為0到1.0,如果<0則為空,如果>1則包含所有的行。

3.1 scala代碼

3.2 提交jar至spark測試

第一次如下:

 第二次如下:

   至此、spark對接kafka及HBase完成,歡迎各位讀者指正、交流~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM