Sqoop介紹
Sqoop是一個用來將Hadoop和關系型數據庫中的數據相互轉移的工具,可以將一個關系型數據庫(例如:MySQL ,Oracle ,Postgres等)中的數據導進到Hadoop的HDFS中,也可以將HDFS的數據導進到關系型數據庫中。(本文檔中使用的是sqoop-1.4.2-cdh4.2.1版本)
為什么要使用sqoop
因為業務的不斷增長導致mysql數據庫中數據太多,影響數據查詢效率,所以使用sqoop把現有mysql中數據導入到HBase中,使用HBase支撐大數據量的查詢。
Sqoop的安裝
下載
wget http://archive.cloudera.com/cdh4/cdh/4/sqoop-1.4.2-cdh4.2.1.tar.gz
配置
sqoop-env.sh
cd ${sqoop_home}/conf
復制創建sqoop-env.sh
cp sqoop-env-template.sh sqoop-env.sh
修改 sqoop-env.sh 加入下面三行,根據集群情況填寫
HADOOP_COMMON_HOME=/home/cdh4-test/hadoop-2.0.0-cdh4.2.1
HADOOP_MAPRED_HOME=/home/cdh4-test/hadoop-2.0.0-mr1-cdh4.2.1
HBASE_HOME=/home/cdh4-test/hbase-0.94.2-cdh4.2.1
添加jar包
復制集群中hadoop的core包以及mysql-jdbc包到sqoop-lib目錄下。
測試
進入sqoop-bin目錄下執行命令
./sqoop-list-tables --connect jdbc:mysql://you_mysql_address:port/you_db_name --username mysqlusername --P
然后提示輸入密碼,輸入數據庫登錄密碼即可。然后終端顯示該數據庫下的所有表名稱。
Sqoop安裝成功
使用sqoop把mysql中數據導入hbase中
命令:
./sqoop-import --connect jdbc:mysql://xxx.xxx.xxx.xxx/service --table ddt_map_info_copy --hbase-create-table --hbase-table A --column-family info --split-by mapId --username root --P --compression-codec lzo -z
--table ddt_map_info_copy 表示導出gamewaveservice數據庫的ddt_map_info_copy表。
--hbase-table A 表示在復制到表A中。
--column-family person 表示在表A中建立列族person。
--hbase-row-key mapId 表示表A的row-key是map_info_copy表的mapId字段。
--hbase-create-table 表示在HBase中建立表。
--username root 表示使用用戶root連接Mysql。
-z 表示采用壓縮方式
--compression-codec lzo 表示用lzo方式壓縮 無效
遇到的問題及結局方案
Q1:需要根據業務邏輯對HBase的row-key做出調整。
A1:有兩種解決方案
1) 在數據導入之前對mysql原有表結構做出調整,增加一列存放即將作為Hbase row-key數據。
操作步驟:根據業務邏輯寫出sql並update相應表的每一行。
2) 擴展PutTransformer類,自定義導入HBase的put格式。並在執行導入命令時指定該類
操作步驟:
package com.gamewave.sqoop.extensions.tansformat_row_key;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.sqoop.hbase.PutTransformer;
public class DdtMapInfoTransFormat extends PutTransformer {
public static final Log LOG = LogFactory.getLog(DdtMapInfoTransFormat.class.getName());
private Map<String, byte[]> serializedFieldNames;
public DdtMapInfoTransFormat() {
serializedFieldNames = new TreeMap<String, byte[]>();
}
/**
* Return the serialized bytes for a field name, using the cache if it's
* already in there.
*/
private byte[] getFieldNameBytes(String fieldName) {
byte[] cachedName = serializedFieldNames.get(fieldName);
if (null != cachedName) {
// Cache hit. We're done.
return cachedName;
}
// Do the serialization and memoize the result.
byte[] nameBytes = Bytes.toBytes(fieldName);
serializedFieldNames.put(fieldName, nameBytes);
return nameBytes;
}
@Override
/** {@inheritDoc} */
public List<Put> getPutCommand(Map<String, Object> fields)throws IOException {
String rowKeyCol = getRowKeyColumn();
String colFamily = getColumnFamily();
byte[] colFamilyBytes = Bytes.toBytes(colFamily);
Object rowKey = fields.get(rowKeyCol);
if (null == rowKey) {
// If the row-key column is null, we don't insert this row.
LOG.warn("Could not insert row with null value for row-key column: "+ rowKeyCol);
return null;
}
Put put = new Put(Bytes.toBytes(rowKey.toString() + ":custom"));
for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
String colName = fieldEntry.getKey();
if (!colName.equals(rowKeyCol)) {
// This is a regular field, not the row key.
// Add it if it's not null.
Object val = fieldEntry.getValue();
if (null != val) {
put.add(colFamilyBytes, getFieldNameBytes(colName),Bytes.toBytes(val.toString()));
}
}
}
return Collections.singletonList(put);
}
}
1、擴展PutTransformer類
上面代碼做出示例,紅色字體為修改行,藍底字體為添加內容。
2、包並放進sqoop-lib目錄下,執行命令
./sqoop-import -D sqoop.hbase.insert.put.transformer.class=com.gamewave.sqoop.extensions.tansformat_row_key.DdtMapInfoTransFormat –connect jdbc:mysql://xxx.xxx.xxx.xxx/service --table ddt_map_info_copy --hbase-create-table --hbase-table ddt_map_info_copy --column-family info --split-by mapId --username root --P --compression-codec lzo
命令中-D為指定屬性名稱 參數值為鍵值對
sqoop.hbase.insert.put.transformer.class屬性值為指定格式化類名稱
3、驗證
進入hbase shell
可以查看得到rowkey后綴均為:custom,修改成功
