一、搭建准備環境
在搭建Hive
和SparkSql
進行整合之前,首先需要搭建完成HDFS
和Spark
相關環境
這里使用Hive
和Spark
進行整合的目的主要是:
1、使用
Hive
對SparkSql
中產生的表或者庫的元數據進行管理(因為SparkSql
沒有提供相關的功能,官方提供的是和Hive
的整合方案,官方之所以不在獨立去開發一個元數據管理
模塊是為了防止重復造輪子),所以直接復用了Hive
的元數據管理這一套內容2、單獨使用
Hive
的話速度太慢,所以在前期就打算切換到Spark作為計算引擎
,然后使用了Spark的thriftserver
向外提供JDBC
相關的服務
環境准備:
hadoop版本:Hadoop-2.7.7
spark版本:Spark-2.4.0
相關安裝包准備:
Mysql
:mysql57-community-release-el7-11.noarch.rpm
Hive
:apach-hive-1.2.2.tar.gz
Mysql驅動
:mysql-connector-java-5.1.47-bin.jar
如上內容是安裝Hive
所需要的環境
這里說一下Hive
官方所推薦的Hive On Spark
安裝方法,官方說需要編譯一個不包含Hive依賴的純凈版本的Spark版本
,但是使用這種方式編譯出來的內容和Spark
官方編譯出來的包,缺少了一些依賴,主要是在組件上,比如K8s\docker\parquert等等
所以不用這種方式;並且Hive
官方的支持也很慢,從官方看,沒有經過測試的版本和Spark2.4
進行整合的穩定版本;Spark
官方目前還是集成的Hive1.2.1
版本的Hive
,所以這里選擇跟着Spark官方走
我們后面還是主要使用的是SparkSql
作為主要的使用手段,只是借助了Hive
作為元數據管理的角色,至於他的那些新特性,到后面Spark
進行升級的時候再進行考慮
二、Mysql搭建
這里暫時就單節點進行部署,到后面可能會考慮主-主
備份的方式保證元數據安全,現在數據很少就不進行考慮了!如果后面再進行搭建,直接加一個節點部署就完了
首先我目前的服務器上是沒有安裝mysql的,如果安裝了mysql的需要自己注意,我先檢查Mysql
是否有其他版本內容,若果有卸載,其他環境的話需要注意,不要印象其他業務,這里Mysql
可以部署在任何一台能訪問的機器上
查看是否有多余的包
$ rpm -qa |grep -i mysql
刪除不需要或者多余的包
$ yum remove mysql-community mysql-community-server mysql-community-libs mysql-community-common
刪除Mysql安裝生成的路徑
$ whereis mysql
根據情況刪除出現的路徑
安裝msyql
$ wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
$ yum -y install mysql57-community-release-el7-10.noarch.rpm
$ yum -y install mysql-community-server
$ systemctl start mysqld.service
$ systemctl status mysqld.service #查看是否啟動
$ cat /var/log/mysqld.log | grep password #記住默認密碼,等會還要進行修改
$ mysql -uroot -p #此時登錄進去,需要重設密碼
mysql > set global validate_password_policy='LOW'; #密碼校驗不區分大小寫
mysql > set global validate_password_length=6; #校驗時密碼長度
mysql > ALTER USER USER() IDENTIFIED BY '123456'; #在Mysql新 5.7.6 版本設置密碼使用
mysql > ALTER USER 'hive'@'%' IDENTIFIED BY '123456'; #設置密碼,這里我還把root設置(當前設置老版本使用,新版本報錯)
mysql > GRANT ALL ON *.* TO 'hive'@'%' IDENTIFIED BY '123456'; #賦權限
mysql > SELECT Host,User FROM user; #刪除掉一些多余的用戶和權限,防止權限影響
主要登錄的是hive
中所要使用的用戶,查看在當前機器和在其他機器上是否都能進行登錄和進行相關操作,如果一切正常,那么久可以進行Hive
相關內容搭建,如上內容可以在root
用戶或者hadoop
用戶都行
三、Hive環境搭建
這里Hive
只是作為元數據管理角色
使用,或者在某些情況下也可以使用Hive進行操作
,比如,HIve delete AND update
(sparkSql中不能進行刪除相關操作,但是Hive中也是需要進行配置才能開啟的,因為HDFS中的數據是不能進行修改的,所以基本上不推薦使用,這里也就不進行開啟,后面需要再開啟)
下面搭建需要再hadoop
用戶進行搭建
$ tar -zxvf apache-hive-1.2.2-bin.tar.gz
$ mv apache-hive-1.2.2-bin hive-1.2.2
配置Hive環境變量
/etc/profile
export HIVE_HOME=/home/hadoop/hive-1.2.2
export PATH=$PATH:$HIVE_HOME/bin
$ source /etc/profile
~/hive-1.2.2/conf/hive.site.xml
配置(在進行下面配置的時候,里面配置非常多,我將其刪除了,然后只留下如下配置,相當於只是覆蓋了下面內容)
$ cp ~/hive-1.2.2/conf/hive.default.template ~/hive-1.2.2/conf/hive.site.xml
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master1:3306/hive?createDatabaseIfNotExist=true&useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
</configuration>
如上根據自己的Mysql地址進行配置相關地址、用戶名、密碼等信息
配置完成之后需要將改配置文件復制到spark/conf
目錄下,然后需要將mysql驅動
分別復制到hive/lib
和spark/jars
目錄下
上面內容設置完成之后需要進行
初始化元數據
$ schematool -dbType mysql -initSchema
數據初始換成之后會在Mysql
中創建hive
的庫,並且在Hdfs
中創建/tmp/hive
和/user/hive/warehouse
這兩個路徑都是可以在hive.site.xml
中進行設置的,配置完成之后進入hive
終端
$ ./hive #在使用過程中需要開啟yarn hdfs
hive > show databases;
hive > use default;
hive > show tables;
hive > create table spark_on_hive(id int,username String);
hive > insert into spark_on_hive values(1,'zhangsan');
hive > select * from spark_on_hive;
執行如上命令進行相關測試,看Hive
是否能夠進行正常工作
出現如上內容說明配置正確,下面和sparkSql
進行相關整合
四、Hive和SparkSql進行整合
上面內容中已近將Mysql驅動
和hive.site.xml
拷貝到了Spark
中,這個時候需要將Spark集群
進行重啟,重啟完成之后,需要啟動Spark的thriftserver
服務
啟動這個服務主要是通過這個服務進行監聽JDBC
的相關操作,默認端口為10000
$ ./spark-2.4.0/sbin/start-thriftserver.sh
啟動完成之后可以通過Spark
中自帶的beeline
服務來進行JDBC
連接,連接到SparkSql
$ ./spark-2.4.0/bin/beeline
beeline > !connect jdbc:hive2://master1:10000/default
Enty Username : hadoop
Enty password : (空)
如上內容說明配置成功,在這里面能夠查詢出剛才通過hive終端
創建的表和數據,同樣的可以進行相關查詢創建操作
beelin
可以使用hive
中的也可以使用spark
中帶的,都可以;
經過上面的測試說明jdbc
也是能夠進行訪問SparkSql
中的數據的,並且根據日志看出來所有的查詢或者是創建操作走的都是Spark
而不是MapReduce
如上內容可以看出走的是Spark
五、代碼驗證
通過使用代碼進行驗證是否能夠使用JDBC
的方式來獲取查詢創建SparkSql
中的數據和內容
創建一個Maven
項目
pom.xml
配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.isoon.sparksql</groupId>
<artifactId>sparkjdbc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<hive.version>1.2.1</hive.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.demo;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
/**
* created by mojita on 2019/4/8
*/
public class SparkSqlDemo {
//使用jdbc進行對sparksql相關操作
public static void main(String[] args) {
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
//這里需要進行配置相關驗證內容
Connection connection = DriverManager.getConnection("jdbc:hive2://192.168.8.236:10000/default","hadoop","");
ResultSet rs = connection.createStatement().executeQuery("select * from spark_on_hive");
while (rs.next()) {
System.out.printf("%d / %s\r\n", rs.getInt(1), rs.getString(2));
}
rs.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
如上內容說明也是能夠訪問成功的
六、集群搭建總覽和動態資源配置
上面內容部署完成之后,只有Master
節點能夠提供訪問,下面再進行其他節點部署,使其也能夠提供正常訪問
1、將
Spark/conf
目錄下的hive-site.xml
文件拷貝到需要提供訪問的節點2、拷貝
mysql驅動
到指定節點的spark/jars
文件夾中3、需要開啟
spark/sbin/start-thriftserver.sh
服務
這里也只是多開啟了一個客戶端支持訪問,並不是高可用的,Mysql
同樣也不是高可用的
整個部署的結構如下:
節點 | hive與spark整合 | hive | mySql | thriftServer服務 |
---|---|---|---|---|
192.168.8.106 | * | * | * | * |
192.168.8.236 | * | * |
在啟動thriftserver服務
的時候可以設置JDBC提交Sql
執行資源情況
sh $SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--master MASTER_URL \ //master的URL,如spark://host:port, mesos://host:port, yarn, 或local
--queue queue_name \ //如果使用yarn模式,設置隊列名字
--num-executors NUM \ //executor的數目
--conf spark.driver.memory=40g \ //driver內存的大小
--driver-cores NUM \ //driver CPU數目,cluster模式才有這個參數
--executor-memory 6g \ //executor內存大小,如果開啟動態分配,這個就不需要了
--conf spark.yarn.executor.memoryOverhead=2048 \ //overhead大小
如上內容為實例,想要獲取更多關於./start-thriftserver.sh
服務的參數可以使用如下命令./start-thriftserver.sh --help
查看
使用上面內容進行提交的任務,一直會占用資源,下面進行提交和配置動態的資源分配,在任務執行完成之后根據策略回收Spark集群計算資源
設置任務動態資源提交,需要對spark配置文件進行修改spark/conf/spark-default.xml
配置Spark/conf/spark-default.xml
spark.shuffle.service.enabled true #默認值為false
spark.sql.warehouse.dir hdfs://mycluster/user/hive/warehouse
設置完成之后再每次提交任務的時候加入如下兩個參數,這兩個參數也可以在代碼中直接指定
--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=true
下面是一些策略配置(下面這些都是默認值)
spark/conf/spark-default.xml
spark.dynamicAllocation.executorIdleTimeout 60s #60秒沒有任務請求刪除executors
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity #如果啟用了動態分配並且具有高速緩存數據塊的執行程序已空閑超過此持續時間,則將刪除executors
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors #啟用動態分配時要運行的初始執行程序數 如果設置了`--num-executors`(或`spark.executor.instances`)並且大於此值,它將用作執行程序的初始數。
spark.dynamicAllocation.maxExecutors infinity #啟用動態分配時執行程序數的上限。
spark.dynamicAllocation.minExecutors 0 #啟用動態分配時執行程序數的下限。
spark.dynamicAllocation.executorAllocationRatio 1
spark.dynamicAllocation.schedulerBacklogTimeout 1s #如果啟用了動態分配,並且已有掛起的任務積壓超過此持續時間,則將請求新的executor。
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout #與之相同spark.dynamicAllocation.schedulerBacklogTimeout,但僅用於后續執行程序請求。
#因為使用spark作為執行引擎,讓sparksql知道倉庫位置,配置默認倉庫位置,如果代碼中使用到別的倉庫可以手動指定
spark.sql.warehouse.dir hdfs://mycluster/user/hive/warehouse
如上內容根據自己需求進行配置,目前使用的都是默認的參數,沒有進行更多的修改
下面是啟動thriftserver服務
其實這個服務也是提交了一個application
所以和執行spark-submit application
是一樣的
./start-thriftserver.sh \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.driver.maxResultSize=10g \
--master spark://master1:7077 \
--driver-memory 10g \
--driver-cores 3 \
--executor-memory 6g \
--executor-cores 3 \
--total-executor-cores 220
下面只是一個示例,在連接的時候最好還是指定倉庫地址
$ ./bin/beeline --hiveconf hive.server2.thrift.port=1000 --hiveconf "hive.metastore.warehouse.dir=hdfs://master1:9000/user/hive/warehouse"