功能1: 今天到現在為止 實戰課程的訪問量
yyyyMMdd courseID
使用數據庫來進行存儲我們的統計結果
Spark Streaming把統計結果寫入到數據庫里面
可視化前端根據: yyyyMMdd courseId 把數據庫里面的統計結果展示出來
選擇什么數據庫作為統計結果的存儲呢?
RDBMS: MySQL、Oracle...
day courseId click_count
20171111 1 10
20171111 2 10
下一個批次數據進來以后:
20171111 + 1 ==> click_count + 下一個批次的統計結果
NoSQL: HBase、Redis...
HBase: 一個API就能搞定,非常方便
20171111 + 1 ==> click_count + 下一個批次的統計結果
本次課程為什么要選擇HBase的一個原因所在
前置要求:
啟動HDFS
啟動ZK
啟動HBase
HBase表設計
創建表
create 'imooc_course_clickcount' , 'info'
Rowkey設計
day_courseId
如何使用Scala來操作HBase數據庫呢?
定義: case clas CourseClickCount.scala
package com.imooc.domain
/**
* 實戰課程點擊數
* @param day_course 對應的就是HBase中的rowkey,20171111_1
* @param click_count 對應的20171111_1的訪問總數
*/
case class CourseClickCount(day_course: String, click_count: Int)
CourseClickCountDAO.scala
package com.imooc.dao
import com.imooc.domain.CourseClickCount
import scala.collection.mutable.ListBuffer
/**
* 實戰課程點擊數數據訪問層
*/
object CourseClickCountDAO {
// 定義HBase的表名,列族,列名
val tableName = "imooc_course_clickcount"
val cf = "info"
val qualifer = "click_count"
/**
* 保存數據到HBase
* @param list CourseClickCount集合
* 要實現sava這個方法,就需要HBase的工具類,暫時寫
*/
def save(list: ListBuffer[CourseClickCount]): Unit = {
}
/**
* 根據rowkey查詢值
* @param day_course
* @return
*/
def count(day_course: String):Long = {
0l
}
}
私有模式(單例模式)已構建完畢
HBaseUtils.scala 1.基本的私有構造方法
package com.imooc.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.io.IOException;
/**
* HBase操作工具類
*/
public class HBase2Utils {
HBaseAdmin admin = null;
Configuration configuration = null;
/**
*私有構造方法
*/
private HBase2Utils() {
// 加載HBase的配置文件 zookeeper rootdir
configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum", "Master:2181");
configuration.set("hbase.rootdir", "hdfs://Master:8020/hbase");
try {
admin = new HBaseAdmin(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
// Java單例模式
private static HBase2Utils instance = null;
// synchronized 線程同步, 避免線程安全問題
public static synchronized HBase2Utils getInstance() {
if (instance == null) {
instance = new HBase2Utils();
}
return instance;
}
}
2.自定義的getTable方法及測試
// 獲取表名,獲取后,進行測試
public HTable getTable(String tableName) {
HTable table = null;
try {
table = new HTable(configuration, tableName);
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
// getTable方法的測試
public static void main(String[] args) {
HTable table = HBase2Utils.getInstance().getTable("imooc_course_clickcount");
System.out.println(table.getName().getNameAsString());
}
運行測試代碼,看控制台輸出,沒有輸出,取hbase中查看表內數據 list desc scan table rowkey columnfamily column value
3.自定義的put方法及測試
/**
* 添加一條記錄到HBase表
* @param tableName 表名
* @param rowkey 表的rowkey
* @param cf 表的列族columnfamily
* @param column 表的列
* @param value 表的值
*/
public void put(String tableName, String rowkey, String cf, String column, String value) {
HTable table = getTable(tableName);
Put put = new Put(rowkey.getBytes());
put.add(cf.getBytes(), column.getBytes(), value.getBytes());
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
// put方法的測試
public static void main(String[] args) {
// put中需要傳入的參數: String tableName, String rowkey, String cf, String column, String value
String tableName = "imooc_course_clickcount";
String rowkey = "20171111_88";
String cf = "info";
String column = "click_count"; //訪問量的key
String value = "10"; // 訪問量value
HBase2Utils.getInstance().put(tableName,rowkey,cf,column,value);
}
運行測試代碼,看控制台輸出,沒有輸出,取hbase中查看表內數據 list desc scan table rowkey columnfamily column value scan "imooc_course_clickcount", 看是否有數據添加