Importing Data into HBase by Generating HFiles


There are two key steps to loading a DataFrame into HBase via HFiles:

The first is generating the HFiles.
The second is loading the HFiles into HBase.

The test DataFrame here comes from MySQL; if you are not familiar with reading a MySQL table as a DataFrame, see "Spark: reading MySQL data as a DataFrame".
Of course, the DataFrame can come from any source you like; MySQL is simply the example used here.

1. MySQL connection info

The MySQL connection info is kept in an external configuration file, which makes it easy to add more settings later.

// Example configuration file:
[hdfs@iptve2e03 tmp_lillcol]$ cat job.properties 
#MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456
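
For reference, a minimal sketch of reading these settings from Scala; it mirrors the getProPerties helper in the full code below, and the file path is just the example path used later in the submit command:

import java.io.FileInputStream
import java.util.Properties

// Load the external job.properties file so the MySQL settings
// are not hard-coded inside the job itself.
val properties = new Properties()
val in = new FileInputStream("/var/lib/hadoop-hdfs/tmp_lillcol/job.properties") // example path
try {
  properties.load(in)
} finally {
  in.close()
}

val url      = properties.getProperty("mysql.url")
val driver   = properties.getProperty("mysql.driver")
val user     = properties.getProperty("mysql.username")
val password = properties.getProperty("mysql.password")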

2. Required jar dependencies

These are the sbt dependencies; for Maven, use the corresponding coordinates.

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"
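
Note that the *-cdh5.7.2 artifacts are published in Cloudera's repository rather than Maven Central, so the build usually also needs a resolver along these lines (an assumed setup; adjust to your environment):

// build.sbt: Cloudera-flavoured Spark/HBase artifacts come from Cloudera's repository.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"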

3. Full code

import java.io.FileInputStream
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author 利伊奧克兒-lillcol
  *         2018/10/14-11:08
  *
  */
object TestHFile {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)

    // Path where the HFiles will be written
    val save_path: String = hdfsPath + "TableTestHFile"
    // Get the test DataFrame
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

    val resultDataFrame: DataFrame = dim_sys_city_dict
      .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
    // Note: "key" must be the first column of resultDataFrame, because the remaining column names are sorted later
    saveASHfFile(resultDataFrame, "cf_info", save_path)
  }

  /**
    * Save a DataFrame as HFiles.
    *
    * @param resultDataFrame the DataFrame to save as HFiles; its first column must be "key"
    * @param clounmFamily    column family name (it must already exist in HBase, otherwise loading the data will fail)
    * @param save_path       output path for the HFiles
    */
  def saveASHfFile(resultDataFrame: DataFrame, clounmFamily: String, save_path: String): Unit = {
    val conf: Configuration = HBaseConfiguration.create()
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) // set the map output key/value data types
    job.setMapOutputValueClass(classOf[KeyValue])

    var columnsName: Array[String] = resultDataFrame.columns // get the column names; the first one is "key"
    columnsName = columnsName.drop(1).sorted // drop "key" and sort the remaining column names

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes // column family
        rowkey = Bytes.toBytes(row.getAs[String]("key")) // row key
        for (i <- 0 until columnsName.length) { // "key" has already been dropped, so iterate over every remaining column
          cn = columnsName(i).getBytes() // column qualifier
          v = Bytes.toBytes(row.getAs[String](columnsName(i))) // column value
          // Convert the RDD into the format the HFiles need: the map output key was defined above as
          // ImmutableBytesWritable, so the RDD we build must also be keyed by ImmutableBytesWritable instances
          kv = new KeyValue(rowkey, cf, cn, v) // wrap rowkey, cf, column qualifier, value
          kvlist = kvlist :+ kv // append the new KeyValue to the end of kvlist (the order matters: the list must stay sorted)
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })

    // Convert RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)]
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
      s.iterator
    })

    delete_hdfspath(save_path) // delete any previous data under save_path
    // Save the data
    result
      .sortBy(x => x._1, true) // the output must be globally sorted
      .saveAsNewAPIHadoopFile(save_path,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

  }

  /**
    * Delete a path on HDFS.
    *
    * @param url the path to delete
    */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      hdfs.delete(path, true)
    }
  }

  /**
    * Read a MySQL table.
    *
    * @param sqlContext
    * @param tableName name of the MySQL table to read
    * @param proPath   path of the configuration file
    * @return the MySQL table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      //        .option("dbtable", tableName.toUpperCase)
      .option("dbtable", tableName)
      .load()

  }

  /**
    * Read a MySQL table with a filter condition.
    *
    * @param sqlContext
    * @param table           name of the MySQL table to read
    * @param filterCondition filter condition
    * @param proPath         path of the configuration file
    * @return the MySQL table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String): DataFrame = {
    val properties: Properties = getProPerties(proPath)
    var tableName = ""
    tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
    * Load the configuration file.
    *
    * @param proPath
    * @return
    */
  def getProPerties(proPath: String): Properties = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
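
One practical note: the code above sets the output key/value classes by hand and relies on a global sort. When the target table already exists, HBase also offers HFileOutputFormat2.configureIncrementalLoad, which configures the same output classes plus a partitioner matched to the table's region boundaries, so the generated HFiles line up with regions. A minimal sketch, assuming HBase 1.2-style client APIs and the iptv:spark_test table used later:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.mapreduce.Job

// Sketch: let HBase configure the job against the target table's regions.
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val tableName = TableName.valueOf("iptv:spark_test")
val table = connection.getTable(tableName)
val regionLocator = connection.getRegionLocator(tableName)

val job = Job.getInstance(conf)
// Sets the map output key/value classes and a total-order partitioner
// based on the table's current region boundaries.
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
// job.getConfiguration can then be passed to saveAsNewAPIHadoopFile as above.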

4. Test code

def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)

    // Path where the HFiles will be written
    val save_path: String = hdfsPath + "TableTestHFile"
    // Get the test DataFrame
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

    val resultDataFrame: DataFrame = dim_sys_city_dict
      .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
    // Note: "key" must be the first column of resultDataFrame, because the remaining column names are sorted later
    saveASHfFile(resultDataFrame, "cf_info", save_path)
  }

5. Submit command

nohup spark-submit --master yarn \
--driver-memory 4G \
--num-executors 2 \
--executor-cores 4 \
--executor-memory 8G \
--class com.iptv.job.basedata.TestHFile \
--jars /var/lib/hadoop-hdfs/tmp_lillcol/mysql-connector-java-5.1.38.jar \
tygq.jar \
hdfs://ns1/user/hive/warehouse/ \
/var/lib/hadoop-hdfs/tmp_lillcol/job.properties > ./TestHFile.log 2>&1 &

6. Execution result

[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/TableTestHFile
0       0       hdfs://ns1/user/hive/warehouse/TableTestHFile/_SUCCESS
12.3 K  24.5 K  hdfs://ns1/user/hive/warehouse/TableTestHFile/cf_info

7. Loading the HFiles into HBase

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/TableTestHFile iptv:spark_test

.....
18/10/17 10:14:20 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/TableTestHFile/cf_info/fdc37dc6811140dfa852ac71b00b33aa first=200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ last=769_\xE4\xB8\x9C\xE8\x8E\x9E_GD_DG
18/10/17 10:14:20 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
18/10/17 10:14:20 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x16604bba6872fff
18/10/17 10:14:20 INFO zookeeper.ClientCnxn: EventThread shut down
18/10/17 10:14:20 INFO zookeeper.ZooKeeper: Session: 0x16604bba6872fff closed
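
The same bulk load can also be triggered from code instead of the command line. A minimal sketch, assuming the HBase 1.2 client API and the same output path and table as above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Sketch: bulk-load the generated HFiles into iptv:spark_test programmatically.
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val tableName = TableName.valueOf("iptv:spark_test")
val table = connection.getTable(tableName)
val regionLocator = connection.getRegionLocator(tableName)
val admin = connection.getAdmin

val loader = new LoadIncrementalHFiles(conf)
loader.doBulkLoad(new Path("hdfs://ns1/user/hive/warehouse/TableTestHFile"), admin, table, regionLocator)

connection.close()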

 

8. Checking the data in HBase

hbase(main):005:0> scan 'iptv:spark_test',{LIMIT=>2}
ROW                                                          COLUMN+CELL
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:bureau_id, timestamp=1539742949840, value=BF55
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:bureau_name, timestamp=1539742949840, value=\x85\xAC\xE5
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:city_code, timestamp=1539742949840, value=112
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:city_id, timestamp=1539742949840, value=112
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:city_name, timestamp=1539742949840, value=\xB7\x9E
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:dict_id, timestamp=1539742949840, value=112
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:group_id, timestamp=1539742949840, value=112
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:group_name, timestamp=1539742949840, value=\x8C\xBA
 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ                          column=cf_info:sort, timestamp=1539742949840, value=112
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:bureau_id, timestamp=1539742949840, value=6AA0EF0B
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:bureau_name, timestamp=1539742949840, value=xE5\x8F\xB8
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:city_code, timestamp=1539742949840, value=112
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:city_id, timestamp=1539742949840, value=112
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:city_name, timestamp=1539742949840, value=\xBE
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:dict_id, timestamp=1539742949840, value=112
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:group_id, timestamp=1539742949840, value=112
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:group_name, timestamp=1539742949840, value=\x8C\xBA
 660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW                          column=cf_info:sort, timestamp=1539742949840, value=112

9. Summary

Multiple column families and multiple columns

This approach extends data that could otherwise only be handled one column family and one column at a time to multi-column-family, multi-column processing.
The key is the two code fragments below.

var columnsName: Array[String] = resultDataFrame.columns // get the column names; the first one is "key"
    columnsName = columnsName.drop(1).sorted // drop "key" and sort the remaining column names

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes // column family
        rowkey = Bytes.toBytes(row.getAs[String]("key")) // row key
        for (i <- 0 until columnsName.length) { // "key" has already been dropped, so iterate over every remaining column
          cn = columnsName(i).getBytes() // column qualifier
          v = Bytes.toBytes(row.getAs[String](columnsName(i))) // column value
          // Convert the RDD into the format the HFiles need: the map output key was defined above as
          // ImmutableBytesWritable, so the RDD we build must also be keyed by ImmutableBytesWritable instances
          kv = new KeyValue(rowkey, cf, cn, v) // wrap rowkey, cf, column qualifier, value
          kvlist = kvlist :+ kv // append the new KeyValue to the end of kvlist (the order matters: the list must stay sorted)
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })

    // Convert RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)]
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
      s.iterator
    })

 

A DataFrame's advantage is that it is structured data, so every field inside it is easy to work with:

  • Get all column names with resultDataFrame.columns, and remove "key" (the first column) with drop(1).
  • Sort the remaining column names with sorted (ascending by default); without this sort the job fails with the error shown further down.
  • Then map over each row, loop over the fields with for, and append a KeyValue per field to a list, giving an RDD[(ImmutableBytesWritable, Seq[KeyValue])].
  • Use flatMapValues to turn RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)].

After these steps we have data of type RDD[(ImmutableBytesWritable, KeyValue)] and can call saveAsNewAPIHadoopFile directly.
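
If you genuinely need several column families, one way to generalize this (a sketch under assumptions, not the original code) is to map each column name to its family, for example with a hypothetical cfMapping lookup, while keeping the KeyValues within a row ordered by family and qualifier; the resulting RDD still needs the same global sort by row key before saveAsNewAPIHadoopFile:

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Hypothetical mapping from column name to column family.
// Every family listed here must already exist in the target table.
val cfMapping: Map[String, String] = Map(
  "city_id"   -> "cf_info",
  "city_name" -> "cf_info",
  "bureau_id" -> "cf_extra"
)

def toKeyValues(df: DataFrame): RDD[(ImmutableBytesWritable, KeyValue)] = {
  // Sort by (family, qualifier) so the KeyValues inside each row stay in HFile order.
  val columns = df.columns.drop(1).sortBy(c => (cfMapping.getOrElse(c, "cf_info"), c))
  df.map { row =>
    val rowkey = Bytes.toBytes(row.getAs[String]("key"))
    val kvs: Seq[KeyValue] = columns.map { c =>
      val family = cfMapping.getOrElse(c, "cf_info") // default family for unmapped columns
      new KeyValue(rowkey, Bytes.toBytes(family), Bytes.toBytes(c),
        Bytes.toBytes(row.getAs[String](c)))
    }.toSeq
    (new ImmutableBytesWritable(rowkey), kvs)
  }.flatMapValues(_.iterator)
}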

Sorting

Sorting happens in two places here:

  • rowkey

This one needs little explanation: the row keys must be globally ordered. The code:

// Save the data
    result
      .sortBy(x => x._1, true) // keep the output globally sorted
      .saveAsNewAPIHadoopFile(save_path,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)
  • Column names

The column names must also be kept in order:

var columnsName: Array[String] = resultDataFrame.columns // get the column names; the first one is "key"
    columnsName = columnsName.drop(1).sorted // drop "key" and sort the remaining column names

 

Without this sorting, the following error appears:

18/10/15 14:19:32 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 2.0 (TID 3, iptve2e03): java.io.IOException: Added a key not lexically larger than previous. 
Current cell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:area_code/1539584366048/Put/vlen=5/seqid=0, 
    lastCell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:dict_id/1539584366048/Put/vlen=2/seqid=0

The message means the current column cf_info:area_code is lexically smaller than the previous column cf_info:dict_id; that is exactly why the column names must be sorted. "key" also has to be removed, otherwise a cf_info:key column would be written, which is clearly not what we want.
Putting "key" in the first position is precisely what makes it easy to drop at this step; once the columns are sorted it is no longer easy to remove.

Output path

The output path must not already exist; if it does, simply delete it first:

/**
    * Delete a path on HDFS.
    *
    * @param url the path to delete
    */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      hdfs.delete(path, true)
    }
  }

 

Column family name

The column family must already exist in HBase; the columns (qualifiers) do not have to.
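
If the table or the column family does not exist yet, it can be created ahead of time, for example with the HBase Admin API. A sketch assuming the iptv:spark_test table and cf_info family used above (the iptv namespace must also exist):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Sketch: make sure iptv:spark_test exists with the cf_info column family
// before generating and loading the HFiles.
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin
val tableName = TableName.valueOf("iptv:spark_test")

if (!admin.tableExists(tableName)) {
  val descriptor = new HTableDescriptor(tableName)
  descriptor.addFamily(new HColumnDescriptor("cf_info"))
  admin.createTable(descriptor)
}

admin.close()
connection.close()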

Comparison

Hive-HBase

  • Advantages:

Tied to Hive, so it is easy to reprocess the data.

Relatively simple to operate, with lower demands on the developer.

Easily handles multiple column families and columns.

  • Disadvantages:

Requires a temporary table, which roughly doubles the space used.

Loading the data is fast, but the insert into step takes time proportional to the data volume.

 

HFile

  • Advantages:

Loading the data is very fast.

The whole process produces nothing but the HFiles, so it saves space compared with the other approach.

  • Disadvantages:

The data is hard to reprocess, and without extra tooling it is unfriendly to query.

It demands somewhat more from the developer.

Reposted from: https://www.cnblogs.com/lillcol/p/9797061.html

