1、spark的ThriftServer介紹
spark的thriftServer是在hiveServer2基礎上實現的 , 提供了Thrift服務
,也就是提供了遠程的ODBC服務
用戶畫像,對受眾目標進行篩選 , 數據源是在hive數倉的;
后來改用了thriftServer,采用jdbc的方式,直接去讀取數據倉庫內容,性能在一定程度得到了提升
2、ThriftServer的缺陷
我們使用了spark的ThriftServer,雖然性能得到了提升,但是安全並沒有得到保障
因為spark的ThriftServer並沒有 類似hive里面的服務自主發現功能;
所以spark的thriftServer就出現了單點問題
3、ThriftServer的解決
解決這個問題,首先要搞懂ThriftServer的啟動流程大概 什么樣子
3.1、ThriftServer的啟動流程
1):我們在啟動thrift server,是通過腳本start-thriftserver.sh
2):然后在進入spark-daemon.sh 這個腳本 , 攜帶CLASS參數進行啟動
3):然后看bin/spark-submit.sh
然后sparksubmit,會去執行一個叫做:HiveThriftServer2的類;
3.2、解決單點問題
3.2.1、內部流程說明
1):首先進入HiveThriftServer2,會執行main函數
這樣,最后在server.start() 這里,就啟動了
啟動的服務是HiveThriftServer2
其中的綁定服務addService,就是在ArrayList去維護當前的service
所以在調用initCompositeService,開始啟動服務,會最終調用到HiveServer2
這個init方法是個接口:
直接看實現,是在HiveService2的init方法:
那么最后我們這個流程是怎么啟動的呢?
然后我們 查看parse方法
然后查看StartOptionExecutor
然后查看startHiveServer2
那么我們遇到的問題是sparkThriftServer存在單點問題 ,但是尷尬的是,spark本身沒有給出解決方案,
但是hive是有的,hive支持了hiveServer2服務的高可用;
通過配置就可以實現;
所以我們可以通過zookeeper來實現spark的 ThriftServer的高可用
3.2.3、在HiveThriftServer2啟動的時候,讓他在zookeeper里面注冊一份信息,代表當前第一個實例
所以需要在:
1):實例化一個全局的hiveConf,用來調用hive的一些回調函數,
主要是可以去hive-site.xml里面拿配置文件
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
2):在啟動HiveThriftServer2的時候,把hiveServer2注冊到zookeeper中
HiveThriftServer2 實現了HIveServer2
而且是使用反射處理的(早期hive開發的時候,就已經在內部使用private,可能沒想過別人 會調用)
3):在停止HiveThriftServer2的時候,把hiveserrver2從zookeeper中注銷掉
3.3、修改hiveServer2源碼
1):添加獲取thriftServer的IP:HOST
拿到thriftServer的域名
拼接ThriftServer的地址
2):對zookeeper做kuberos認證
4):控制是否需要重新在zookeeper上注冊hiveServer2
6):把服務注冊到zookeeper中
1 private CuratorFramework zooKeeperClient; 2 private String znodePath; 3 /** 4 * 把服務注冊到zookeeper中(通過反射調用) 5 * @param hiveConf 6 * @throws Exception 7 */ 8 private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { 9 //從hiveConf中獲取zookeeper地址 10 String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); 11 //從hive-site.xml中獲取hive.server2.zookeeper.namespace的配置信息 12 String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); 13 //獲取用戶提供的thriftServer地址(IP:PORT) node01:10001 , node02:10002 14 String instanceURI = getServerInstanceURI(); 15 //做Kerberos認證 16 setUpZooKeeperAuth(hiveConf); 17 //獲取hive連接zookeeper的session超時時間 18 int sessionTimeout = 19 (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, 20 TimeUnit.MILLISECONDS); 21 //hive連接zookeeper的等待時間 22 int baseSleepTime = 23 (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 24 TimeUnit.MILLISECONDS); 25 //hive連接zookeeper的最大重試次數 26 int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); 27 // 獲取zookeeper客戶端 28 zooKeeperClient = 29 CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) 30 .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider) 31 .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build(); 32 //啟動zookeeper客戶端 33 zooKeeperClient.start(); 34 //TODO 在zookeeper上根據rootNamespace創建一個空間(用來存儲數據的文件夾) 35 try { 36 zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) 37 .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); 38 LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2"); 39 } catch (KeeperException e) { 40 if (e.code() != KeeperException.Code.NODEEXISTS) { 41 LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e); 42 throw e; 43 } 44 } 45 //TODO 把hiveserver2的信息注冊到rootNamespace下: 46 // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005 47 try { 48 String pathPrefix = 49 ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace 50 + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" 51 + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; 52 byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); 53 //創建一個臨時節點 54 znode = 55 new PersistentEphemeralNode(zooKeeperClient, 56 PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8); 57 znode.start(); 58 59 //給定臨時節點創建的超時時間,如果超過120秒,則拋異常 60 long znodeCreationTimeout = 120; 61 if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { 62 throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); 63 } 64 65 66 setDeregisteredWithZooKeeper(false); 67 znodePath = znode.getActualPath(); 68 // TODO 添加zk的watch , 如果服務不見了,需要第一時間watche到 69 if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) { 70 // No node exists, throw exception 71 throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); 72 } 73 LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); 74 } catch (Exception e) { 75 LOG.fatal("Unable to create a znode for this server instance", e); 76 if (znode != null) { 77 znode.close(); 78 } 79 throw (e); 80 } 81 }
7):移除znode,代表當前程序關閉
1 //移除znode,代表當前程序關閉(通過反射調用) 2 private void removeServerInstanceFromZooKeeper() throws Exception { 3 setDeregisteredWithZooKeeper(false); 4 5 if (znode != null) { 6 znode.close(); 7 } 8 zooKeeperClient.close(); 9 LOG.info("Server instance removed from ZooKeeper."); 10 }
3:修改spark-daemon.sh腳本
4、maven打包
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package
但是以上操作只是編譯出了一份文件,並不是可運行的包,所以需要稍微改一下命令:
./make-distribution.sh --name 2.6.0-cdh5.14.0 --tgz -Pyarn -Phive -Phive-thriftserver -Phadoop-2.6 -Dscala-2.11 -Dhadoop.version=hadoop-2.6.0-cdh5.14.0 -DskipTests clean package
5、將新生成的spark-hive-thriftserver_2.11-2.1.0.jar替換spark/jars 下面的同名包
6、修改hive/conf和spark/conf下面的配置文件:hive-site.xml
在里面添加如下配置:
<property> <name>hive.server2.support.dynamic.service.discovery</name> <value>true</value> </property> <property> <name>hive.server2.zookeeper.namespace</name> <value>hiveserver2_zk</value> </property> <property <name>hive.zookeeper.quorum</name> <value>cdh1:2181,cdh2:2181,cdh3:2181</value> </property> <property> <name>hive.zookeeper.client.port</name> <value>2181</value> </property> <property> <name>hive.server2.thrift.bind.host</name> <value>cdh1</value> </property> <property> <name>hive.server2.thrift.port</name> <value>10003</value> </property>
7、驗證
分別在某台機器上執行兩次:
sbin/start-thriftserver.sh \ --master yarn \ --conf spark.driver.memory=1G \ --executor-memory 512m \ --num-executors 1 \ --hiveconf hive.server2.thrift.port=10001/10002
2):在zookeeper中可以看到如下信息:
3):通過beeline模擬jdbc去查詢hive
[root@cdh2 spark-2.1.0-bin-2.6.0-cdh5.14.0]# beeline Beeline version 1.2.1.spark2 by Apache Hive beeline> !connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
去頁面觀察:
殺掉一個thriftServer
在重新登錄beeline,在做查詢,可以發現依然可以查詢