mark下關於如何讓spark的ThriftServer高可用


 

 

1、spark的ThriftServer介紹

spark的thriftServer是在hiveServer2基礎上實現的 , 提供了Thrift服務

,也就是提供了遠程的ODBC服務

用戶畫像,對受眾目標進行篩選 , 數據源是在hive數倉的;

早期直接通過hive on spark的方式去做處理 , 但是發現性能不好,處理慢

后來改用了thriftServer,采用jdbc的方式,直接去讀取數據倉庫內容,性能在一定程度得到了提升

2、ThriftServer的缺陷

我們使用了spark的ThriftServer,雖然性能得到了提升,但是安全並沒有得到保障

因為spark的ThriftServer並沒有 類似hive里面的服務自主發現功能;

所以spark的thriftServer就出現了單點問題

 

3、ThriftServer的解決

解決這個問題,首先要搞懂ThriftServer的啟動流程大概 什么樣子

3.1、ThriftServer的啟動流程

1):我們在啟動thrift server,是通過腳本start-thriftserver.sh

 

 

 2):然后在進入spark-daemon.sh 這個腳本 , 攜帶CLASS參數進行啟動

 

 

 

 

 

 3):然后看bin/spark-submit.sh

 

 

 然后sparksubmit,會去執行一個叫做:HiveThriftServer2的類;

3.2、解決單點問題

3.2.1、內部流程說明

1):首先進入HiveThriftServer2,會執行main函數

 

 

 

 

 

這樣,最后在server.start() 這里,就啟動了

啟動的服務是HiveThriftServer2

 

 

 其中的綁定服務addService,就是在ArrayList去維護當前的service

 

 

 所以在調用initCompositeService,開始啟動服務,會最終調用到HiveServer2

 

 

 

這個init方法是個接口:

直接看實現,是在HiveService2的init方法:

 

 

 那么最后我們這個流程是怎么啟動的呢?

 

 

 然后我們 查看parse方法

 

 

 然后查看StartOptionExecutor

 

 

 然后查看startHiveServer2

 

 

 

那么我們遇到的問題是sparkThriftServer存在單點問題 ,但是尷尬的是,spark本身沒有給出解決方案,

但是hive是有的,hive支持了hiveServer2服務的高可用;

通過配置就可以實現;

所以我們可以通過zookeeper來實現spark的 ThriftServer的高可用

3.2.2、把thriftServer模塊導入到工程

源碼工程的總pom文件

 

 

 

3.2.3、在HiveThriftServer2啟動的時候,讓他在zookeeper里面注冊一份信息,代表當前第一個實例

所以需要在:

1):實例化一個全局的hiveConf,用來調用hive的一些回調函數,

主要是可以去hive-site.xml里面拿配置文件

<property>
        <name>hive.server2.support.dynamic.service.discovery</name>
        <value>true</value>
</property>

 

 

2):在啟動HiveThriftServer2的時候,把hiveServer2注冊到zookeeper中

HiveThriftServer2 實現了HIveServer2

而且是使用反射處理的(早期hive開發的時候,就已經在內部使用private,可能沒想過別人 會調用)

 

 

3):在停止HiveThriftServer2的時候,把hiveserrver2從zookeeper中注銷掉

 

 

3.3、修改hiveServer2源碼

1):添加獲取thriftServer的IP:HOST

拿到thriftServer的域名

 

 拼接ThriftServer的地址

 

 

2):對zookeeper做kuberos認證

 

 

3):添加zookeeper的權限控制

 

 

4):控制是否需要重新在zookeeper上注冊hiveServer2

 

 

5):zk的監控者,如果發現注冊信息為null,會觸發監控,然后關掉當前注冊hiveServer2的實例信息

 

 

接下來重點來了,上面所有的操作,都是為了給這個方法做鋪墊服務的

6):把服務注冊到zookeeper中

 

 1 private CuratorFramework zooKeeperClient;
 2   private String znodePath;
 3   /**
 4    * 把服務注冊到zookeeper中(通過反射調用)
 5    * @param hiveConf
 6    * @throws Exception
 7    */
 8   private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
 9     //從hiveConf中獲取zookeeper地址
10     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
11     //從hive-site.xml中獲取hive.server2.zookeeper.namespace的配置信息
12     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
13     //獲取用戶提供的thriftServer地址(IP:PORT) node01:10001 , node02:10002
14     String instanceURI = getServerInstanceURI();
15     //做Kerberos認證
16     setUpZooKeeperAuth(hiveConf);
17     //獲取hive連接zookeeper的session超時時間
18     int sessionTimeout =
19             (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
20                     TimeUnit.MILLISECONDS);
21     //hive連接zookeeper的等待時間
22     int baseSleepTime =
23             (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
24                     TimeUnit.MILLISECONDS);
25     //hive連接zookeeper的最大重試次數
26     int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
27     // 獲取zookeeper客戶端
28     zooKeeperClient =
29             CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
30                     .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
31                     .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
32     //啟動zookeeper客戶端
33     zooKeeperClient.start();
34     //TODO 在zookeeper上根據rootNamespace創建一個空間(用來存儲數據的文件夾)
35     try {
36       zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
37               .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
38       LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
39     } catch (KeeperException e) {
40       if (e.code() != KeeperException.Code.NODEEXISTS) {
41         LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
42         throw e;
43       }
44     }
45     //TODO 把hiveserver2的信息注冊到rootNamespace下:
46     // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
47     try {
48       String pathPrefix =
49               ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
50                       + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
51                       + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
52       byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
53       //創建一個臨時節點
54       znode =
55               new PersistentEphemeralNode(zooKeeperClient,
56                       PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
57       znode.start();
58 
59       //給定臨時節點創建的超時時間,如果超過120秒,則拋異常
60       long znodeCreationTimeout = 120;
61       if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
62         throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
63       }
64 
65 
66       setDeregisteredWithZooKeeper(false);
67       znodePath = znode.getActualPath();
68       // TODO 添加zk的watch , 如果服務不見了,需要第一時間watche到
69       if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
70         // No node exists, throw exception
71         throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
72       }
73       LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
74     } catch (Exception e) {
75       LOG.fatal("Unable to create a znode for this server instance", e);
76       if (znode != null) {
77         znode.close();
78       }
79       throw (e);
80     }
81   }

7):移除znode,代表當前程序關閉

 1   //移除znode,代表當前程序關閉(通過反射調用)
 2   private void removeServerInstanceFromZooKeeper() throws Exception {
 3     setDeregisteredWithZooKeeper(false);
 4 
 5     if (znode != null) {
 6       znode.close();
 7     }
 8     zooKeeperClient.close();
 9     LOG.info("Server instance removed from ZooKeeper.");
10   }

3:修改spark-daemon.sh腳本

 

 

 

4、maven打包

mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package

但是以上操作只是編譯出了一份文件,並不是可運行的包,所以需要稍微改一下命令:

./make-distribution.sh --name 2.6.0-cdh5.14.0 --tgz -Pyarn -Phive -Phive-thriftserver -Phadoop-2.6 -Dscala-2.11 -Dhadoop.version=hadoop-2.6.0-cdh5.14.0 -DskipTests clean package

 

5、將新生成的spark-hive-thriftserver_2.11-2.1.0.jar替換spark/jars 下面的同名包

6、修改hive/conf和spark/conf下面的配置文件:hive-site.xml

在里面添加如下配置:

<property>
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
</property>


<property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver2_zk</value>
</property>

<property
    <name>hive.zookeeper.quorum</name>
    <value>cdh1:2181,cdh2:2181,cdh3:2181</value>
</property>

<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
</property>

<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>cdh1</value>
</property>

<property>
    <name>hive.server2.thrift.port</name>
    <value>10003</value>
</property>

7、驗證

1):

分別在某台機器上執行兩次:

sbin/start-thriftserver.sh \
--master yarn \
--conf spark.driver.memory=1G \
--executor-memory 512m \
--num-executors 1 \
--hiveconf hive.server2.thrift.port=10001/10002

 

2):在zookeeper中可以看到如下信息:

 

 3):通過beeline模擬jdbc去查詢hive

[root@cdh2 spark-2.1.0-bin-2.6.0-cdh5.14.0]# beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk

 

 

去頁面觀察:

 

殺掉一個thriftServer

 

 

在重新登錄beeline,在做查詢,可以發現依然可以查詢

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM