Source: https://blog.csdn.net/u013850277/article/details/81040686
Author's environment: HDP 2.5.3 + CentOS 6.9 + Phoenix 4.7
From the official documentation:
Phoenix provides two methods for bulk loading data into Phoenix tables:
Single-threaded client loading tool for CSV formatted data via the psql command
MapReduce-based bulk load tool for CSV and JSON formatted data
The psql tool is typically appropriate for tens of megabytes, while the MapReduce-based loader is typically better for larger load volumes.
In short, Phoenix provides two ways to bulk load data: the single-threaded psql tool and the distributed MapReduce tool. The single-threaded tool suits files of a few tens of megabytes, while the MapReduce tool is better for large volumes.
Both methods are tested below.
Preparation
1. Create a Phoenix table (the corresponding HBase table does not exist beforehand):
CREATE TABLE example (
my_pk bigint not null,
m.first_name varchar(50),
m.last_name varchar(50)
CONSTRAINT pk PRIMARY KEY (my_pk));
2. Create a secondary index:
create index example_first_name_index on example(m.first_name);
3. Create a data.csv file and upload a copy to HDFS:
12345,John,Doe
67890,Mary,Poppins
4. Change the contents to the following and upload the file to HDFS again (the upload commands are sketched after these steps); this version is used in the MapReduce test below:
12345,Joddhn,Dois
67890,Maryddd,Poppssssins
123452,Joddhn,Dois
678902,Maryddd,Poppssssins2
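For reference, the uploads in steps 3 and 4 can be done with the ordinary HDFS shell. A minimal sketch, assuming the /tmp/YCB staging directory that the MapReduce command below reads from:

# create the staging directory on HDFS and upload the CSV
hdfs dfs -mkdir -p /tmp/YCB
hdfs dfs -put -f /root/Templates/data.csv /tmp/YCB/data.csv
# quick sanity check of what landed on HDFS
hdfs dfs -cat /tmp/YCB/data.csv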
Bulk loading data
Single-threaded psql method:
[root@hdp18 Templates]#
/usr/hdp/2.5.3.0-37/phoenix/bin/psql.py -t EXAMPLE hdp14:2181 /root/Templates/data.csv
Notes:
(1) /root/Templates/data.csv is a local file.
(2) hdp14:2181 is the ZooKeeper host and port.
(3) The command supports quite a few other options, e.g. -t for the table name and -d for the field delimiter (a comma by default).
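For example, a semicolon-delimited file (the file name here is hypothetical) could be loaded into the same table by overriding the delimiter:

# -d overrides the default comma delimiter
/usr/hdp/2.5.3.0-37/phoenix/bin/psql.py -t EXAMPLE -d ';' hdp14:2181 /root/Templates/data_semicolon.csv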
Verify that the data was written correctly and that the index table was updated in sync:
0: jdbc:phoenix:hdp14,hdp15> select * from example;
+--------+-------------+------------+
| MY_PK | FIRST_NAME | LAST_NAME |
+--------+-------------+------------+
| 12345 | John | Doe |
| 67890 | Mary | Poppins |
+--------+-------------+------------+
2 rows selected (0.023 seconds)
0: jdbc:phoenix:hdp14,hdp15> select * from example_first_name_index;
+---------------+---------+
| M:FIRST_NAME | :MY_PK |
+---------------+---------+
| John | 12345 |
| Mary | 67890 |
+---------------+---------+
2 rows selected (0.018 seconds)
The results above show that the bulk import worked and that it automatically updated the index table.
MapReduce bulk load method
[root@hdp14 ~]# HADOOP_CLASSPATH=/usr/hdp/2.5.3.0-37/hbase/lib/hbase-protocol.jar:/usr/hdp/2.5.3.0-37/hbase/conf/ hadoop jar /usr/hdp/2.5.3.0-37/phoenix/phoenix-4.7.0.2.5.3.0-37-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table EXAMPLE --input /tmp/YCB/data.csv
Notes:
1. The official site notes that for Phoenix 4.0 and above the tool must be invoked as follows:
(HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar phoenix-<version>-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table EXAMPLE --input /data/example.csv)
2. /tmp/YCB/data.csv is the file path on HDFS.
3. The command can be run from any machine in the cluster; a specific ZooKeeper quorum can also be given with -z host:port, for example:
HADOOP_CLASSPATH=/usr/hdp/2.5.3.0-37/hbase/lib/hbase-protocol.jar:/usr/hdp/2.5.3.0-37/hbase/conf/ hadoop jar /usr/hdp/2.5.3.0-37/phoenix/phoenix-4.7.0.2.5.3.0-37-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool -z hdp15:2181 --table EXAMPLE --input /tmp/YCB/data.csv
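If the HDFS file uses a different field delimiter, the -d option (see point 5 in the summary) can be added; a sketch with a hypothetical semicolon-delimited file:

# -d sets the field delimiter for the MapReduce CSV loader
HADOOP_CLASSPATH=/usr/hdp/2.5.3.0-37/hbase/lib/hbase-protocol.jar:/usr/hdp/2.5.3.0-37/hbase/conf/ hadoop jar /usr/hdp/2.5.3.0-37/phoenix/phoenix-4.7.0.2.5.3.0-37-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool -z hdp15:2181 --table EXAMPLE -d ';' --input /tmp/YCB/data_semicolon.csv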
Verify the result:
0: jdbc:phoenix:hdp14,hdp15> SELECT * FROM example_first_name_index;
+---------------+---------+
| M:FIRST_NAME | :MY_PK |
+---------------+---------+
| Joddhn | 12345 |
| Joddhn | 123452 |
| Maryddd | 67890 |
| Maryddd | 678902 |
+---------------+---------+
4 rows selected (0.042 seconds)
0: jdbc:phoenix:hdp14,hdp15> SELECT * FROM example;
+---------+-------------+---------------+
| MY_PK | FIRST_NAME | LAST_NAME |
+---------+-------------+---------------+
| 12345 | Joddhn | Dois |
| 67890 | Maryddd | Poppssssins |
| 123452 | Joddhn | Dois |
| 678902 | Maryddd | Poppssssins2 |
+---------+-------------+---------------+
Testing bulk import speed
1. Create the T_PERSON table:
CREATE TABLE T_PERSON(
my_pk varchar(100) not null,
m.first_name varchar(50),
m.last_time varchar(50)
CONSTRAINT pk PRIMARY KEY (my_pk));
Generate test data: 2,082,518 rows in total, some of which share the same my_pk; after removing duplicates there are 1,900,000 distinct rows.
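The post does not show how the data was produced; here is a minimal sketch of one way to generate a CSV of this shape (the key and name formats are invented; only the row counts and the duplicate keys match the description above):

# 1,900,000 distinct keys plus 182,518 extra rows that reuse existing keys,
# giving 2,082,518 rows in total, some with duplicate my_pk values
awk 'BEGIN {
  srand();
  for (i = 0; i < 1900000; i++)
    printf "%032d,first_%d,last_%d\n", i, i, i;
  for (j = 0; j < 182518; j++) {
    k = int(rand() * 1900000);
    printf "%032d,first_dup_%d,last_dup_%d\n", k, j, j;
  }
}' > T_PERSON.csv
# upload to the HDFS path used by the bulk load command below
hdfs dfs -put -f T_PERSON.csv /tmp/YCB/T_PERSON.csv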
Run the MapReduce bulk import; the output is as follows:
[root@hdp18 ~]# HADOOP_CLASSPATH=/usr/hdp/2.5.3.0-37/hbase/lib/hbase-protocol.jar:/usr/hdp/2.5.3.0-37/hbase/conf/ hadoop jar /usr/hdp/2.5.3.0-37/phoenix/phoenix-4.7.0.2.5.3.0-37-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool -z hdp15,hdp16,hdp14:2181 --table T_PERSON --input /tmp/YCB/T_PERSON.csv
18/07/13 15:57:45 INFO mapreduce.AbstractBulkLoadTool: Configuring HBase connection to hdp15,hdp16,hdp14:2181
18/07/13 15:57:45 INFO util.QueryUtil: Creating connection with the jdbc url: jdbc:phoenix:hdp15,hdp16,hdp14:2181:/hbase-secure;
......
18/07/13 15:57:51 INFO input.FileInputFormat: Total input paths to process : 1
18/07/13 15:57:51 INFO mapreduce.JobSubmitter: number of splits:1
18/07/13 15:57:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1528269660320_0870
18/07/13 15:57:51 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: 10.194.67.4:8020, Ident: (HDFS_DELEGATION_TOKEN token 32437 for hbase)
18/07/13 15:57:51 INFO mapreduce.JobSubmitter: Kind: HBASE_AUTH_TOKEN, Service: a0221273-8655-48fa-9113-2165f8c94518, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@35e)
18/07/13 15:57:52 INFO impl.YarnClientImpl: Submitted application application_1528269660320_0870
18/07/13 15:57:52 INFO mapreduce.Job: The url to track the job: http://hdp15.gzbigdata.org.cn:8088/proxy/application_1528269660320_0870/
18/07/13 15:57:52 INFO mapreduce.Job: Running job: job_1528269660320_0870
18/07/13 15:58:00 INFO mapreduce.Job: Job job_1528269660320_0870 running in uber mode : false
18/07/13 15:58:00 INFO mapreduce.Job: map 0% reduce 0%
18/07/13 15:58:14 INFO mapreduce.Job: map 7% reduce 0%
18/07/13 15:58:17 INFO mapreduce.Job: map 12% reduce 0%
18/07/13 15:58:20 INFO mapreduce.Job: map 16% reduce 0%
18/07/13 15:58:23 INFO mapreduce.Job: map 21% reduce 0%
18/07/13 15:58:26 INFO mapreduce.Job: map 26% reduce 0%
18/07/13 15:58:29 INFO mapreduce.Job: map 30% reduce 0%
18/07/13 15:58:32 INFO mapreduce.Job: map 35% reduce 0%
18/07/13 15:58:35 INFO mapreduce.Job: map 40% reduce 0%
18/07/13 15:58:38 INFO mapreduce.Job: map 44% reduce 0%
18/07/13 15:58:41 INFO mapreduce.Job: map 49% reduce 0%
18/07/13 15:58:44 INFO mapreduce.Job: map 54% reduce 0%
18/07/13 15:58:47 INFO mapreduce.Job: map 59% reduce 0%
18/07/13 15:58:50 INFO mapreduce.Job: map 63% reduce 0%
18/07/13 15:58:53 INFO mapreduce.Job: map 67% reduce 0%
18/07/13 15:58:58 INFO mapreduce.Job: map 100% reduce 0%
18/07/13 15:59:09 INFO mapreduce.Job: map 100% reduce 72%
18/07/13 15:59:12 INFO mapreduce.Job: map 100% reduce 81%
18/07/13 15:59:16 INFO mapreduce.Job: map 100% reduce 90%
18/07/13 15:59:18 INFO mapreduce.Job: map 100% reduce 100%
18/07/13 15:59:19 INFO mapreduce.Job: Job job_1528269660320_0870 completed successfully
18/07/13 15:59:19 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=189674691
FILE: Number of bytes written=379719399
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=131364304
HDFS: Number of bytes written=181692515
HDFS: Number of read operations=8
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=55939
Total time spent by all reduces in occupied slots (ms)=34548
Total time spent by all map tasks (ms)=55939
Total time spent by all reduce tasks (ms)=17274
Total vcore-milliseconds taken by all map tasks=55939
Total vcore-milliseconds taken by all reduce tasks=17274
Total megabyte-milliseconds taken by all map tasks=315048448
Total megabyte-milliseconds taken by all reduce tasks=194574336
Map-Reduce Framework
Map input records=2082518
Map output records=2082518
Map output bytes=185509649
Map output materialized bytes=189674691
Input split bytes=120
Combine input records=0
Combine output records=0
Reduce input groups=1900000
Reduce shuffle bytes=189674691
Reduce input records=2082518
Reduce output records=5700000
Spilled Records=4165036
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=4552
CPU time spent (ms)=179120
Physical memory (bytes) snapshot=4526735360
Virtual memory (bytes) snapshot=18876231680
Total committed heap usage (bytes)=4565499904
Phoenix MapReduce Import
Upserts Done=2082518
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=131364184
File Output Format Counters
Bytes Written=181692515
18/07/13 15:59:19 INFO mapreduce.AbstractBulkLoadTool: Loading HFiles from /tmp/5bd7b5b6-6a56-4eb6-b2cb-09f3794fd0f1
18/07/13 15:59:19 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x3eb83530 connecting to ZooKeeper ensemble=hdp15:2181,hdp16:2181,hdp14:2181
18/07/13 15:59:19 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=hdp15:2181,hdp16:2181,hdp14:2181 sessionTimeout=180000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@356b73ad
18/07/13 15:59:19 INFO zookeeper.ClientCnxn: Opening socket connection to server hdp16.gzbigdata.org.cn/10.194.67.6:2181. Will not attempt to authenticate using SASL (unknown error)
18/07/13 15:59:19 INFO zookeeper.ClientCnxn: Socket connection established to hdp16.gzbigdata.org.cn/10.194.67.6:2181, initiating session
18/07/13 15:59:19 INFO zookeeper.ClientCnxn: Session establishment complete on server hdp16.gzbigdata.org.cn/10.194.67.6:2181, sessionid = 0x3645fa39f313754, negotiated timeout = 40000
18/07/13 15:59:19 INFO mapreduce.AbstractBulkLoadTool: Loading HFiles for T_PERSON from /tmp/5bd7b5b6-6a56-4eb6-b2cb-09f3794fd0f1/T_PERSON
18/07/13 15:59:19 WARN mapreduce.LoadIncrementalHFiles: managed connection cannot be used for bulkload. Creating unmanaged connection.
18/07/13 15:59:19 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x691fd3a8 connecting to ZooKeeper ensemble=hdp15:2181,hdp16:2181,hdp14:2181
18/07/13 15:59:19 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=hdp15:2181,hdp16:2181,hdp14:2181 sessionTimeout=180000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@16716a0e
18/07/13 15:59:19 INFO zookeeper.ClientCnxn: Opening socket connection to server hdp16.gzbigdata.org.cn/10.194.67.6:2181. Will not attempt to authenticate using SASL (unknown error)
18/07/13 15:59:19 INFO zookeeper.ClientCnxn: Socket connection established to hdp16.gzbigdata.org.cn/10.194.67.6:2181, initiating session
18/07/13 15:59:19 INFO zookeeper.ClientCnxn: Session establishment complete on server hdp16.gzbigdata.org.cn/10.194.67.6:2181, sessionid = 0x3645fa39f313755, negotiated timeout = 40000
18/07/13 15:59:19 WARN hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
18/07/13 15:59:19 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 32438 for hbase on 10.194.67.4:8020
18/07/13 15:59:19 WARN hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
18/07/13 15:59:19 WARN hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
18/07/13 15:59:19 INFO hfile.CacheConfig: CacheConfig:disabled
18/07/13 15:59:19 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hdp14.gzbigdata.org.cn:8020/tmp/5bd7b5b6-6a56-4eb6-b2cb-09f3794fd0f1/T_PERSON/M/855fac01686145859c36a14fa090909c first=00200114763CC76DE2614A94DA362C5A last=FFFFFF30763CC76D4129C9A7D2EBC8E6
18/07/13 15:59:20 INFO hdfs.DFSClient: Cancelling HDFS_DELEGATION_TOKEN token 32438 for hbase on 10.194.67.4:8020
18/07/13 15:59:20 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
18/07/13 15:59:20 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x3645fa39f313755
18/07/13 15:59:20 INFO zookeeper.ZooKeeper: Session: 0x3645fa39f313755 closed
18/07/13 15:59:20 INFO zookeeper.ClientCnxn: EventThread shut down
18/07/13 15:59:20 INFO mapreduce.AbstractBulkLoadTool: Incremental load complete for table=T_PERSON
18/07/13 15:59:20 INFO mapreduce.AbstractBulkLoadTool: Removing output directory /tmp/5bd7b5b6-6a56-4eb6-b2cb-09f3794fd0f1
From the output above, on the author's fairly ordinary cluster (5 nodes, 64 GB of RAM), bulk loading roughly 2 million rows took about two minutes. The row count below is 1,900,000 rather than 2,082,518 because rows sharing the same my_pk are upserted over one another.
0: jdbc:phoenix:hdp14,hdp15> select count(1) from T_PERSON;
+-----------+
| COUNT(1) |
+-----------+
| 1900000 |
+-----------+
Summary:
1. Speed:
CSV data can be bulk loaded with built in utility named psql. Typical upsert rates are 20K - 50K rows per second (depends on how wide are the rows).
In other words, bulk loading can reach roughly 20K-50K rows per second (depending on row width). The author has only benchmarked this roughly, but the throughput is quite good. Source: https://phoenix.apache.org/faq.html
2. Testing confirms that a bulk import automatically updates Phoenix secondary indexes (regardless of whether the HBase table existed beforehand).
3. The input file encoding defaults to UTF-8.
4. The MapReduce tool supports a number of other options; see the bulk data loading page linked at the end for the full list.
5. By default the MapReduce import updates all index tables of the target table; to refresh only a specific index table, pass it with the -it option. The default field delimiter is a comma and can be changed with -d.
6. To drive a bulk import from application code, one approach is to write the data to HDFS first, wrap the MapReduce import command in a shell script, and have the application invoke that script (the author has not tried this, but it should work in principle); a sketch of such a script follows.
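A minimal sketch of the wrapper-script idea from point 6 (the script name, its arguments, and the staging directory are assumptions; the bulk load command itself is the one used in the speed test above):

#!/usr/bin/env bash
# bulk_load.sh (hypothetical): upload a local CSV to HDFS, then run the Phoenix CsvBulkLoadTool.
# Example call from application code: ./bulk_load.sh /data/export/T_PERSON.csv T_PERSON
set -euo pipefail

CSV_LOCAL="$1"           # local CSV file produced by the application
TABLE="$2"               # target Phoenix table, e.g. T_PERSON
HDFS_DIR=/tmp/YCB        # HDFS staging directory (the same one used in this post)

hdfs dfs -mkdir -p "$HDFS_DIR"
hdfs dfs -put -f "$CSV_LOCAL" "$HDFS_DIR/"

# Same command as in the speed test; add -it <index_table> to refresh only one index table,
# or -d '<char>' to override the default comma delimiter (see point 5 above).
HADOOP_CLASSPATH=/usr/hdp/2.5.3.0-37/hbase/lib/hbase-protocol.jar:/usr/hdp/2.5.3.0-37/hbase/conf/ \
hadoop jar /usr/hdp/2.5.3.0-37/phoenix/phoenix-4.7.0.2.5.3.0-37-client.jar \
  org.apache.phoenix.mapreduce.CsvBulkLoadTool \
  -z hdp15,hdp16,hdp14:2181 \
  --table "$TABLE" \
  --input "$HDFS_DIR/$(basename "$CSV_LOCAL")"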
Reference (official site):
https://phoenix.apache.org/bulk_dataload.html
