Integrating Hive 3.1.2 with Flink 1.14.0


If you don't want to read through the whole back-story, jump straight to Part 2, the step-by-step instructions.


 

1. The troubleshooting process:


The sql-client-defaults.yaml configuration file has been removed in Flink 1.14.0.

Reference: https://issues.apache.org/jira/browse/FLINK-21454

 

 

Following that issue, I found the FLIP-163 page:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-163%3A+SQL+Client+Improvements

Use commands to configure the client
Currently sql-client uses a YAML file to configure the client, which has its own grammar rather than the commands used in the client. It causes overhead for users because users have to study both grammars, and it's very tricky for users to debug YAML problems. Considering the Table API has developed sophisticated grammar, it's better to use the same commands in the client for users to configure their settings, including setting the runtime/table options and registering Catalog/Module/Database/Table/Function. For better understanding, the chart in the FLIP lists the relationship between the YAML entries and the commands in the client.

If that is hard to parse, here is the short version:

 

In other words, this SQL client still has quite a few rough edges, and having to learn both the YAML file format and the client's own command syntax raises the learning cost for users, so the YAML configuration will be dropped going forward and everything can be configured through commands instead.

Next we see that when starting sql-client, an additional initialization file can be supplied with the -i option.

 

 

The configuration looks like this:

-- set up the default properties
SET sql-client.execution.mode=batch;
SET parallelism.default=10;
SET pipeline.auto-watermark-interval=500;
 
-- set up the catalog
CREATE CATALOG HiveCatalog WITH (
  'type' = 'hive'
);
 
USE CATALOG HiveCatalog;
 
-- register the table
CREATE TABLE IF NOT EXISTS pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);
 
CREATE TABLE IF NOT EXISTS pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

In the big block above, only the first part, the HiveCatalog configuration, is generally applicable; everything else is scenario-specific and should be tailored to your own business use case.

Following that example, I wrote my own HiveCatalog configuration in a file named sql-conf.sql:

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/local/hive/conf',
    'hadoop-conf-dir'='/opt/local/hadoop/etc/hadoop/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

Save this file under flink/conf/ as a general-purpose configuration.

Then start the client like this:

bin/sql-client.sh embedded -i conf/sql-conf.sql

The Hive classes were still not found, which pointed me to the Hive connector jars listed on the official documentation page.

 

 

I downloaded and installed the corresponding jars; the compatibility problems that came up afterwards were solved by modifying the jars, as described below.

2. Step-by-step instructions:


 

1. Download

Download the Flink 1.14.0 distribution and extract it.

2. Configure

Configure the system environment variables in /etc/profile and Flink's configuration file flink-conf.yaml.

Add the following to /etc/profile (this assumes the JDK and Hadoop are already configured):

#flink config
export HADOOP_CLASSPATH=`hadoop classpath`
export FLINK_HOME=/opt/local/flink
export PATH=$PATH:$FLINK_HOME/bin

In flink-conf.yaml:

# changed so that each TaskManager provides 2 task slots
taskmanager.numberOfTaskSlots: 2

# add this line
classloader.check-leaked-classloader: false

I also added the following environment variable at the top of bin/config.sh:

export HADOOP_CLASSPATH=`hadoop classpath`

 

3. Set up the startup configuration

Create a new sql-conf.sql file and configure the HiveCatalog in it:

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/local/hive/conf',
    'hadoop-conf-dir'='/opt/local/hadoop/etc/hadoop/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
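Optionally, the same init file can also carry session defaults. A minimal sketch (these are standard Flink 1.14 option keys; the values are just examples, adjust them to taste):

-- optional session defaults in the same init file
SET 'sql-client.execution.result-mode' = 'tableau';  -- print query results inline instead of the full-screen view
SET 'table.sql-dialect' = 'default';                 -- switch to 'hive' when running Hive-style DDL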

4. Download the required dependency jars

Note: there are four jars in total, and two of them are required (my testing was not deep enough to tell whether the other two are strictly needed).

Note: hive-exec-3.1.2.jar has the same guava incompatibility with Flink, so its com.google folder also needs to be deleted (see step 5 below).

The first jar to download: flink-sql-connector-hive-3.1.2_2.12-1.14.0.jar

Official documentation page: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/hive/overview/

 

Because different Hive versions are served from different download links, and you also have to pick the jar that matches your Scala version, here is the Maven repository directory where you can browse and choose for yourself:

My Hive is 3.1.2, but the link given in the docs is for Scala 2.11, so I followed the repository path to find the Scala 2.12 build:

https://repo.maven.apache.org/maven2/org/apache/flink/

The final download location (the jar is about 46.4 MB):

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.0/

How to get the other three jars:

You can either search the Maven repository above, or create a throwaway project, add the dependencies below, and pull the jars from the local Maven cache.

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-3</artifactId>
            <version>3.1.1.7.2.9.0-173-9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.version}</artifactId>
            <version>${flink-version}</version>
            <scope>provided</scope>
        </dependency>

 

5. Resolve the jar conflicts

The downloaded flink-sql-connector-hive-3.1.2_2.12-1.14.0.jar and hive-exec-3.1.2.jar both bundle a guava version that conflicts with the Hadoop one, so do the following:

Open the jar with an archive tool.

Locate com/google and delete the whole google folder. (Note: there is no need to extract the jar; delete it directly inside the archive tool.)

After the deletion, close the window; the jar shrinks to about 43.4 MB.

Do the same for both jars, then upload all four jars to the flink/lib folder.

 

 

Optional: since I have not tested a wider range of SQL, I cannot rule out further dependencies, so I put all four jars in for now.

 

6. Start the environment

First start a yarn-session:

bin/yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm test1 -d

Then start sql-client so that the SQL runs on the yarn-session:

bin/sql-client.sh embedded -i conf/sql-conf.sql -s yarn-session

 

When you see success messages, the initialization statements executed correctly.
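At this point a quick sanity check inside the SQL client confirms that the HiveCatalog from sql-conf.sql is active (assuming the catalog and database names used above):

-- quick sanity check after startup
SHOW CATALOGS;    -- should list default_catalog and myhive
SHOW DATABASES;   -- databases visible through the Hive metastore
SHOW TABLES;      -- tables in the current database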

 

 

At this point the session shows up as a running application in the Hadoop YARN web UI.

7. Test DDL and queries

The official documentation actually recommends using the Hive dialect when working with Hive tables; it can be set on the command line:

set table.sql-dialect=hive;
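With the dialect switched, Hive-style DDL can be issued directly from the client. A minimal sketch (the database, table, and columns here are hypothetical, shown only to illustrate the syntax):

-- hypothetical table, for illustration only
CREATE TABLE IF NOT EXISTS dt_dwd.demo_daily_close (
  code  STRING,
  name  STRING,
  dt    STRING,
  close DOUBLE
)
STORED AS ORC;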

Test a simple query:

select * from dt_dwd.dwd_stock_day_history limit 100;

Honestly, this interface is not just a little uglier than Hive's; it is a lot uglier.

 

Test an aggregation query (the yearly maximum and minimum closing prices for Shanghai Pudong Development Bank, stock code 600000):

select code,name,substr(dt,1,4) as yr,min(close) as min_close,max(close) as max_close
from dt_dwd.dwd_stock_day_history where code = '600000' group by code,name,substr(dt,1,4);
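For a pure batch read over Hive data like this, the session can also be switched to batch execution (execution.runtime-mode is the standard Flink 1.14 option key; I have not measured how much it changes the timing below):

SET 'execution.runtime-mode' = 'batch';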

The performance is acceptable:

 

The last aggregation query took 39 seconds.
Although the jobs can all be submitted and run successfully, one unresolved issue remains: the background log keeps reporting some odd errors.

Specifically, after a job finishes, some error messages are printed (they do not affect the results):

        at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (1357b58d9e46be54f7713850c26c681d)
        at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:917)
        at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:931)
        at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:719)
        at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:512) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) ~[?:1.8.0_261]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_261]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_261]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_261]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_261]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_261]

From the scattered information I have found so far, it may be that the acknowledgement of the job's execution result is not received in time; I will look into it later.

 

The complete Flink 1.14.0 package and the dependency jars can be downloaded directly from Baidu Netdisk:

Link: https://pan.baidu.com/s/1QNCFSRDJyUzJtAbdN7k9Fw
Extraction code: pang

 

