Spark + Phoenix

We use Phoenix as the query engine and created secondary indexes on the Phoenix tables to speed up queries, while the data itself is written by Spark Streaming straight into HBase through the HBase API. Hence the problem: inserting directly into the underlying HBase source table does not trigger updates to the Phoenix secondary indexes (an index is only maintained for writes that go through Phoenix itself), so the index data and the HBase source table drift out of sync. Phoenix does document an official Spark integration for writes, but it comes with version restrictions. Here is the official text:

    • To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both ‘spark.executor.extraClassPath’ and ‘spark.driver.extraClassPath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’
    • Note that for Phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. As of Phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against Spark 2.x. If compatibility with Spark 1.x is needed, you must compile Phoenix with the spark16 maven profile.
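
For reference, the two settings named in the quote would look roughly like the lines below in spark-defaults.conf; the jar path is an assumption and must point at wherever the Phoenix client jar actually lives on your cluster:

    spark.executor.extraClassPath  /opt/phoenix/phoenix-4.13.1-HBase-1.2-client.jar
    spark.driver.extraClassPath    /opt/phoenix/phoenix-4.13.1-HBase-1.2-client.jar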

Those constraints did not fit our setup, so the only remaining option was to go through JDBC.

The versions I used:

  • spark:2.2.1
  • phoenix:4.13.2

Maven dependencies:

    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.13.1-HBase-1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>4.13.1-HBase-1.2</version>
    </dependency>
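
With these two artifacts on the classpath, the Phoenix JDBC driver is available to the application. A minimal connectivity check, as a sketch only (the ZooKeeper quorum address is a placeholder), might look like:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class PhoenixSmokeTest {
        public static void main(String[] args) throws Exception {
            // "hostname:2181" is a placeholder for your ZooKeeper quorum
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hostname:2181");
                 Statement stmt = conn.createStatement();
                 // SYSTEM.CATALOG is Phoenix's metadata table; listing a few
                 // rows proves the connection works end to end
                 ResultSet rs = stmt.executeQuery("SELECT TABLE_NAME FROM SYSTEM.CATALOG LIMIT 5")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }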

     

The PhoenixUtil class:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.LinkedList;

    public class PhoenixUtil {

        // simple static pool of Phoenix JDBC connections
        private static LinkedList<Connection> connectionQueue;

        static {
            try {
                // register the Phoenix JDBC driver
                Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        public synchronized static Connection getConnection() throws SQLException {
            try {
                if (connectionQueue == null) {
                    connectionQueue = new LinkedList<Connection>();
                    // lazily create three connections; the JDBC URL points at
                    // the ZooKeeper quorum
                    for (int i = 0; i < 3; i++) {
                        Connection conn = DriverManager.getConnection("jdbc:phoenix:hostname:2181");
                        connectionQueue.push(conn);
                    }
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            }
            // note: poll() returns null if the pool is drained faster than
            // connections are returned
            return connectionQueue.poll();
        }

        public static void returnConnection(Connection conn) {
            connectionQueue.push(conn);
        }
    }

Using PhoenixUtil inside Spark Streaming (for business reasons, a plain Statement is used here rather than a PreparedStatement):

    saveLines.foreachRDD(rdd -> {
        rdd.foreachPartition(p -> {
            Connection conn = PhoenixUtil.getConnection();
            Statement stmt = conn.createStatement();
            conn.setAutoCommit(false);
            while (p.hasNext()) {
                // business logic (elided): build the UPSERT string "sql"
                // for the current record
                stmt.addBatch(sql);
            }
            stmt.executeBatch();  // send the whole batch in one round trip
            conn.commit();        // autocommit is off, so commit explicitly
            stmt.close();
            PhoenixUtil.returnConnection(conn);
            // advance the Kafka offsets only after the batch has committed
            ZkKafkaUtil.updateOffset(offsetRanges, GROUP_ID, TOPIC);
        });
    });
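
For illustration only, the elided business logic might assemble each statement like the sketch below; the USER_EVENTS table, its columns, and the record accessors are hypothetical, not from the original code. The point is that because the write goes through Phoenix's JDBC layer as an UPSERT, Phoenix also maintains the secondary index:

    // Hypothetical: "record" comes from p.next(); table and columns are made up
    String sql = "UPSERT INTO USER_EVENTS (ID, NAME) VALUES ('"
            + record.getId() + "','" + record.getName() + "')";
    stmt.addBatch(sql);

A PreparedStatement with bind parameters would avoid the quoting and injection pitfalls of string concatenation, but as noted above, a plain Statement was used here for business reasons.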

Finally, if anyone has a better way to handle this problem, suggestions are welcome.

