[saiku] Big-data query and analysis with Apache Phoenix, HBase, and Saiku

Saiku can run OLAP analysis not only against data in a traditional RDBMS, but also against NoSQL stores such as HBase.

This article walks through a simple example of using Saiku to query and analyze data stored in HBase.

 

1. The relationship between Phoenix and HBase

As good as HBase is, HBase on its own cannot be queried over JDBC. That is where a JDBC middleware called Phoenix comes in: it implements JDBC access to HBase, so you can query HBase data with plain SQL statements. The translation in between is transparent to the user.

Installation takes just three steps:
1. Download Phoenix and unpack it into the user's home directory
2. Copy the core and client jars from phoenix/lib into HBase's lib directory
3. Copy HBase's hbase-site.xml into Phoenix's bin directory
Note: every node in the cluster must be configured this way.

Starting Phoenix:
Go to phoenix/bin and run: ./sqlline.py master:2181

master is the IP of a ZooKeeper node, mapped via the hosts file
2181 is ZooKeeper's default client port

Once inside this shell, you can operate on HBase with Phoenix commands.
For example:
!tables           list all tables
!columns <table>  show the column structure of a table
!quit             exit the shell
Run help to list the other commands.
Plain SQL statements are executed directly as SQL.

If you would rather not use the Linux shell client, you can use the SQuirreL SQL Client tool (much like using Navicat for MySQL against MySQL). Phoenix installation and usage tutorial:
http://www.cnblogs.com/laov/p/4137136.html

Phoenix syntax reference (official site):
http://phoenix.apache.org/language/index.html

SQuirreL SQL Client installation and usage tutorial:
http://blog.sina.com.cn/s/blog_79346ff80102v6hm.html

 

2. Integrating Phoenix into a project

 

Preparation: use Phoenix to create the tables Company and Order4, create indexes on the columns to be queried (at the cost of disk space), and bulk-load test data.

Company (ID BIGINT, CODE VARCHAR, NAME VARCHAR)

ORDER4

 

| Table  | Column      | JDBC Type Code | Type Name |
|--------|-------------|----------------|-----------|
| ORDER4 | ID          | -5             | BIGINT    |
| ORDER4 | CODE        | 12             | VARCHAR   |
| ORDER4 | NAME        | 12             | VARCHAR   |
| ORDER4 | STATUS      | 12             | VARCHAR   |
| ORDER4 | QUANTITY    | 6              | FLOAT     |
| ORDER4 | ORDERTYPE   | 12             | VARCHAR   |
| ORDER4 | DETAILSIZE  | 6              | FLOAT     |
| ORDER4 | COMPANYID   | -5             | BIGINT    |
| ORDER4 | CREATER     | 12             | VARCHAR   |
| ORDER4 | CREATE_TIME | 91             | DATE      |
| ORDER4 | UPDATER     | 12             | VARCHAR   |
| ORDER4 | UPDATE_TIME | 91             | DATE      |

 

Example CREATE TABLE statement

DROP TABLE IF EXISTS P_1000;
CREATE TABLE IF NOT EXISTS P_1000 (
    HOST CHAR(2) NOT NULL, DOMAIN VARCHAR NOT NULL, 
    FEATURE VARCHAR NOT NULL,
    USAGE.DATE VARCHAR, STATS.ACTIVE_VISITOR INTEGER 
  CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE)
) SPLIT ON ('CSGoogle','CSSalesforce','EUApple','EUGoogle',
     'EUSalesforce', 'NAApple','NAGoogle','NASalesforce');

 

Inserting rows one at a time

UPSERT INTO p_1000 VALUES('11','localhost1','localhost1','2015-10-11',3);
UPSERT INTO p_1000 VALUES('12','localhost2','localhost2','2015-10-12',31);
UPSERT INTO p_1000 VALUES('13','localhost3','localhost3','2015-10-13',67);

 

Bulk-load procedure

Write the CREATE TABLE SQL and save it as <table>.sql

Generate the data in Excel and save it as CSV; the file name must be <table>.csv

Write a test query and save it as <table>_test.sql

Run the psql.py script under phoenix/bin to perform the bulk load

 

# Create table p_1000, load the data, and query the loaded rows back

./psql.py master,node1,node2 p_1000_table.sql p_1000.csv p_1000_select.sql

The arguments are, in order: the ZooKeeper nodes, the CREATE TABLE SQL, the data file, and the SELECT SQL.
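To illustrate what the bulk load does per CSV line, here is a small, hypothetical Java helper that maps one CSV row to the UPSERT statement the load would issue. The quoting heuristic (quote anything non-numeric) and the hard-coded table name are assumptions for this sketch, not psql.py's actual implementation, which derives types from the table schema:

```java
// Hypothetical helper: each CSV line becomes one UPSERT statement.
// The "quote anything non-numeric" rule is an assumption for this sketch;
// the real loader uses the table's schema to decide types and escaping.
public class CsvToUpsert {
    public static String toUpsert(String table, String csvLine) {
        String[] fields = csvLine.split(",");
        StringBuilder sb = new StringBuilder("UPSERT INTO ").append(table).append(" VALUES(");
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) sb.append(',');
            String f = fields[i].trim();
            if (f.matches("-?\\d+(\\.\\d+)?")) {
                sb.append(f); // bare numeric literal
            } else {
                sb.append('\'').append(f.replace("'", "''")).append('\''); // quoted string, '' escapes '
            }
        }
        return sb.append(')').toString();
    }

    public static void main(String[] args) {
        System.out.println(toUpsert("p_1000", "11,localhost1,localhost1,2015-10-11,3"));
    }
}
```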

 

Other examples:

psql localhost my_ddl.sql
psql localhost my_ddl.sql my_table.csv
psql -t MY_TABLE my_cluster:1825 my_table2012-Q3.csv
psql -t MY_TABLE -h COL1,COL2,COL3 my_cluster:1825 my_table2012-Q3.csv
psql -t MY_TABLE -h COL1,COL2,COL3 -d : my_cluster:1825 my_table2012-Q3.csv

 

(1) Import the jars

phoenix-4.6.0-HBase-1.0-client-without-hbase.jar

phoenix-4.6.0-HBase-1.0-server.jar

/usr/lib/hbase/hbase.jar

/usr/lib/hadoop/hadoop-common.jar

/usr/lib/hadoop/hadoop-auth.jar

Important: if class conflicts occur, move the Phoenix jars to the top of the Java Build Path so they take precedence.

(2) Configure the datasource - order.txt

type=OLAP
name=ORDER_COMPANY
driver=mondrian.olap4j.MondrianOlap4jDriver
Locale=zh_CN
DynamicSchemaProcessor=mondrian.i18n.LocalizingDynamicSchemaProcessor
location=jdbc:mondrian:Jdbc=jdbc:phoenix:master,node1,node2;Catalog=res:saiku-schemas/order.xml;JdbcDrivers=org.apache.phoenix.jdbc.PhoenixDriver
username=name
password=pwd

(3) Configure the schema - order.xml

Note: however a table name was defined, it must be written in UPPERCASE in the schema file; otherwise you get a "table undefined" error.

The company table serves as the base-information table and is joined to the order table.

<?xml version="1.0"?>
<Schema name="ORDER_COMPANY">
	<Dimension type="StandardDimension" name="COMPANY_DIMENSION">
		<Hierarchy hasAll="true" allMemberName="All Types">
			<Table name="COMPANY"></Table>
			<Level name="COMPANY_CODE" column="CODE" uniqueMembers="false"/>
			<Level name="COMPANY_NAME" visible="true" column="ID" nameColumn="NAME" table="COMPANY" type="String" uniqueMembers="false"/>
		</Hierarchy>
	</Dimension>
	<Cube name="ORDER_COMPANY_CUBE">
		<Table name="ORDER4"/>
		<DimensionUsage source="COMPANY_DIMENSION" name="USE_COMPANY_DIMENSION" visible="true" foreignKey="COMPANYID" highCardinality="false"></DimensionUsage>
		<Dimension name="ORDER_DIMENSION">
			<Hierarchy hasAll="true" allMemberName="All Types">
				<Level name="Date" column="CREATE_TIME" uniqueMembers="false"/>
			</Hierarchy>
		</Dimension>
		<Measure name="QUANTITY" column="QUANTITY" aggregator="sum" formatString="Standard"/>
	</Cube>
</Schema>

(4) Modify Mondrian's source code and recompile it into the project

When querying, the big table must be listed before all other tables in the FROM clause, otherwise the query fails.

For example: ORDER4 has over a million rows, COMPANY has 4.

select * from ORDER4 as o,COMPANY as c where o.companyid = c.id -- works

select * from COMPANY as c,ORDER4 as o where o.companyid = c.id -- fails with:

Error: Encountered exception in sub plan [0] execution.
SQLState: null
ErrorCode: 0

RolapStar.addToFrom translates the MDX query into a traditional SQL query:

query.addFrom(relation, alias, failIfExists);
// Moving this call to the end of the method puts the fact table (ORDER4, the big table) first and the dimension table (COMPANY, the small table) last:
public void addToFrom(
            SqlQuery query,
            boolean failIfExists,
            boolean joinToParent)
        {
            Util.assertTrue((parent == null) == (joinCondition == null));
            if (joinToParent) {
                if (parent != null) {
                    parent.addToFrom(query, failIfExists, joinToParent);
                }
                if (joinCondition != null) {
                    query.addWhere(joinCondition.toString(query));
                }
            }
            query.addFrom(relation, alias, failIfExists); // this call moved to the end of the method (the fix)
        }
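The effect of the patch can be sketched in isolation: regardless of the order in which tables are registered, emit the fact table first in the FROM list. The helper below is purely illustrative (it is not Mondrian code) and only assumes the table names used in this article:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Mondrian's code): build the FROM clause with the
// fact (large) table forced to the front, the ordering Phoenix needs here.
public class FromClauseOrder {
    public static String build(String factTable, List<String> dimensionTables) {
        List<String> ordered = new ArrayList<>();
        ordered.add('"' + factTable + '"');   // fact table first
        for (String d : dimensionTables) {
            ordered.add('"' + d + '"');       // dimension tables after
        }
        return "from " + String.join(", ", ordered);
    }

    public static void main(String[] args) {
        System.out.println(build("ORDER4", List.of("COMPANY")));
    }
}
```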

 

The translated SQL (which executes correctly):

select
    "COMPANY"."CODE" as "c0",
    "COMPANY"."ID" as "c1",
    "ORDER4"."CREATE_TIME" as "c2",
    sum("ORDER4"."QUANTITY") as "m0"
from
    "ORDER4" as "ORDER4",   -- big table first
    "COMPANY" as "COMPANY"  -- small table after
where
    "ORDER4"."COMPANYID" = "COMPANY"."ID"
group by
    "COMPANY"."CODE",
    "COMPANY"."ID",
    "ORDER4"."CREATE_TIME"

 

Test results

Multi-table join, single dimension: around 10 seconds

Multi-table join, multiple dimensions: varies with the query

Note: the Mondrian + Phoenix integration supports table joins, but query speed is only acceptable when the fact table is joined to the dimension tables. Compared with MySQL there is no clear performance gain, and it may even be slightly slower, but it does make it workable to keep the data warehouse in HBase and analyze it with Saiku. In testing, a join between a fact table of about 1.02 million rows and a 4-row dimension table consistently ran in 7 to 10 seconds. In short: put the big table first!

 

References:

https://blogs.apache.org/phoenix/entry/olap_with_apache_phoenix_and

 

Phoenix JDBC test class

package org.saiku.database;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.phoenix.jdbc.Jdbc7Shim.Statement;

/**
 * Utility class for database connections; final with a private constructor so it cannot be extended or instantiated.
 */
public final class PhoenixDBUtils {
    // private static String url = "jdbc:mysql://localhost:3306/testdb";
    // private static String user = "user";
    // private static String psw = "pwd";

    private static String url = "jdbc:phoenix:master,node1,node2";
    private static String user = "hadoop";
    private static String psw = "hadoop";

    private static Connection conn;
    private static Statement statement;

    static {
        try {
            // Class.forName("com.mysql.jdbc.Driver");
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public static void main(String args[]) throws SQLException {
        conn = DriverManager.getConnection(url, user, psw);
        statement = (Statement)conn.createStatement();
        System.out.println("HI,The connection is:" + conn);
        System.out.println("HI,The statement is:" + statement);
        
        String sql = "select * from student";
        sql = "select \"data\".\"xxid\",\"data\".\"xsrs\" from student";
        PreparedStatement ps1 = conn.prepareStatement(sql);
        ResultSet rs1 = ps1.executeQuery();
        System.out.println("ResultSet is : " +rs1);
        List list = resultSetToList(rs1);
        System.out.println("LIST is : "+ list);
    }

    private PhoenixDBUtils() {
    }

    /**
     * Get the database connection
     * 
     * @return conn
     */
    public static Connection getConnection() {
        if (null == conn) {
            try {
                conn = DriverManager.getConnection(url, user, psw);
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        return conn;
    }

    public static Statement getStatement() {
        if (null == statement) {
            try {
                statement = (Statement) PhoenixDBUtils.getConnection().createStatement();
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        return statement;
    }

    /**
     * Release resources
     * 
     * @param conn
     * @param pstmt
     * @param rs
     */
    public static void closeResources(Connection conn, PreparedStatement pstmt,
            ResultSet rs) {
        if (null != rs) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            } finally {
                if (null != pstmt) {
                    try {
                        pstmt.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    } finally {
                        if (null != conn) {
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                e.printStackTrace();
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Converts a ResultSet into a List of row maps (one Map per row, keyed by column name).
     * 
     * @author: luoshoulei
     * @date: 2015-11-19 14:08
     * @param rs the result set
     * @return the rows as a List of Maps
     * @throws java.sql.SQLException
     */
    public static List resultSetToList(ResultSet rs)
            throws java.sql.SQLException {
        if (rs == null)
            return Collections.EMPTY_LIST;
        ResultSetMetaData md = rs.getMetaData(); // structure of the result set: column count, column names, etc.
        int columnCount = md.getColumnCount(); // number of columns in this ResultSet
        List list = new ArrayList();
        Map rowData = new HashMap();
        while (rs.next()) {
            rowData = new HashMap(columnCount);
            for (int i = 1; i <= columnCount; i++) {
                rowData.put(md.getColumnName(i), rs.getObject(i));
            }
            list.add(rowData);
        }
        return list;
    }

}

 

