[Original] Dashu Experience Sharing (2): Why LIMIT Is Slow in Hive When a Condition Is Added on a Large Table


Reproducing the problem

select id from big_table where name = 'sdlkfjalksdjfla' limit 100;

First, look at the execution plan:

hive> explain select id from big_table where name = 'sdlkfjalksdjfla' limit 100;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: 100
      Processor Tree:
        TableScan
          alias: big_table
          Statistics: Num rows: 7497189457 Data size: 1499437891589 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (name = 'sdlkfjalksdjfla') (type: boolean)
            Statistics: Num rows: 3748594728 Data size: 749718945694 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: string)
              outputColumnNames: _col0
              Statistics: Num rows: 3748594728 Data size: 749718945694 Basic stats: COMPLETE Column stats: NONE
              Limit
                Number of rows: 100
                Statistics: Num rows: 100 Data size: 20000 Basic stats: COMPLETE Column stats: NONE
                ListSink

Time taken: 0.668 seconds, Fetched: 23 row(s)

The plan has only one stage, a Fetch Operator. Now look at what the process is doing while the query runs:

   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000006c1e00cd8> (a sun.nio.ch.Util$2)
        - locked <0x00000006c1e00cc8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000006c1e00aa0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
        at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:186)
        at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:146)
        - locked <0x000000076b9bccb0> (a org.apache.hadoop.hdfs.RemoteBlockReader2)
        at org.apache.hadoop.hdfs.BlockReaderUtil.readAll(BlockReaderUtil.java:32)
        at org.apache.hadoop.hdfs.RemoteBlockReader2.readAll(RemoteBlockReader2.java:363)
        at org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1072)
        at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1000)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1333)
        at org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:78)
        at org.apache.hadoop.fs.FSDataInputStream.readFully(FSDataInputStream.java:107)
        at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readStripeFooter(RecordReaderUtils.java:166)
        at org.apache.orc.impl.RecordReaderImpl.readStripeFooter(RecordReaderImpl.java:239)
        at org.apache.orc.impl.RecordReaderImpl.beginReadStripe(RecordReaderImpl.java:858)
        at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:829)
        at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:986)
        at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1021)
        at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1057)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.ensureBatch(RecordReaderImpl.java:77)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.hasNext(RecordReaderImpl.java:89)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$OrcRecordReader.next(OrcInputFormat.java:231)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$OrcRecordReader.next(OrcInputFormat.java:206)
        at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:488)
        at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
        at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:146)
        at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2098)
        at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:252)
        at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:183)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:399)
        at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:776)
        at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:714)
        at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:641)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

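The stack shows that no remote job was submitted. Instead, the Hive CLI process performs the table scan locally: FetchOperator reads the ORC stripes directly from HDFS. If you add a complex filter condition on a large table and then apply LIMIT, the query will be slow, because it is very likely that the whole table has to be scanned before enough matching rows (the limit count) are collected. This is also why a plain LIMIT with no condition on a large table is fast: the scan can stop as soon as the first rows have been read.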
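The difference between the two cases can be illustrated with a small simulation. This is only a sketch in Python, not Hive code: `fetch_with_limit` and `make_table` are hypothetical helpers that mimic a single local reader pulling rows one at a time and stopping once the limit is satisfied, and the table is just a generated sequence of (id, name) rows.

```python
def fetch_with_limit(rows, predicate, limit):
    """Pull rows one by one, like a single local fetch, until `limit` rows match.

    Returns (matching_rows, number_of_rows_read).
    """
    matched = []
    rows_read = 0
    for row in rows:
        rows_read += 1
        if predicate(row):
            matched.append(row)
            if len(matched) >= limit:
                break  # early exit: the limit is satisfied
    return matched, rows_read


def make_table(num_rows):
    """Hypothetical stand-in for big_table: yields (id, name) tuples."""
    return ((str(i), "name_%d" % i) for i in range(num_rows))


TABLE_SIZE = 200_000

# No filter: the reader stops after exactly `limit` rows.
_, read_no_filter = fetch_with_limit(make_table(TABLE_SIZE), lambda r: True, 100)

# Filter that matches no row: every row has to be read before giving up.
_, read_filtered = fetch_with_limit(
    make_table(TABLE_SIZE), lambda r: r[1] == "sdlkfjalksdjfla", 100
)

print(read_no_filter)  # 100
print(read_filtered)   # 200000
```

With a filter that rarely or never matches, the number of rows read grows with the table size rather than with the limit, and in fetch mode all of that reading is done by one local process.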

To change this behaviour, modify the following setting:

hive.fetch.task.conversion

Some select queries can be converted to a single FETCH task, minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incur RS – ReduceSinkOperator, requiring a MapReduce task), lateral views and joins.

Supported values are none, minimal and more.
0. none: Disable hive.fetch.task.conversion
1. minimal: SELECT *, FILTER on partition columns (WHERE and HAVING clauses), LIMIT only
2. more: SELECT, FILTER, LIMIT only (including TABLESAMPLE, virtual columns)

With this setting, Hive tries to convert a query into a single fetch task.
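As a rough reading of the description quoted above, the three values can be modelled as a small decision function. This is a simplified sketch and not Hive's optimizer code: `QueryShape` and `may_convert_to_fetch` are hypothetical names, the rules are taken only from the quoted text, and the input size threshold is ignored here.

```python
from dataclasses import dataclass


@dataclass
class QueryShape:
    """Hypothetical, simplified description of a query (not a Hive class)."""
    select_star: bool = True
    has_filter: bool = False
    filter_on_partition_columns_only: bool = False
    has_subquery: bool = False
    has_aggregation_or_distinct: bool = False
    has_lateral_view: bool = False
    has_join: bool = False


def may_convert_to_fetch(query, mode):
    """Apply the documented rules for hive.fetch.task.conversion (simplified)."""
    if mode == "none":
        return False
    # Disqualifiers listed in the description, regardless of mode.
    if (query.has_subquery or query.has_aggregation_or_distinct
            or query.has_lateral_view or query.has_join):
        return False
    if mode == "minimal":
        # SELECT *, filters on partition columns only, LIMIT.
        filter_ok = (not query.has_filter) or query.filter_on_partition_columns_only
        return query.select_star and filter_ok
    if mode == "more":
        # Any SELECT, FILTER, LIMIT.
        return True
    raise ValueError("unknown mode: %r" % mode)


# The query from this article: SELECT id ... WHERE name = '...' LIMIT 100
article_query = QueryShape(select_star=False, has_filter=True)

for mode in ("none", "minimal", "more"):
    print(mode, may_convert_to_fetch(article_query, mode))
```

Under this reading, the article's query only qualifies for fetch conversion under more, which matches the single Fetch Operator stage seen in the plan.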

The default is more. Change it to none and run the SQL above again, and the query will be submitted to YARN for execution:

set hive.fetch.task.conversion=none;

There is one more related setting:

hive.fetch.task.conversion.threshold

Input threshold (in bytes) for applying hive.fetch.task.conversion. If target table is native, input length is calculated by summation of file lengths. If it's not native, the storage handler for the table can optionally implement the org.apache.hadoop.hive.ql.metadata.InputEstimator interface. A negative threshold means hive.fetch.task.conversion is applied without any input length threshold.

The default is 1073741824 (1 GB).
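The threshold rule described above, for a native table, comes down to comparing the sum of the file lengths against the configured value. The following Python sketch only illustrates that arithmetic: `within_fetch_threshold` is a hypothetical helper, the file sizes are made up, and treating an input exactly equal to the threshold as acceptable is an assumption.

```python
DEFAULT_THRESHOLD = 1073741824  # 1 GB, the default quoted above


def within_fetch_threshold(file_lengths, threshold=DEFAULT_THRESHOLD):
    """Return True if the summed input length passes the threshold check.

    A negative threshold means no input length limit is applied.
    The `<=` at the boundary is an assumption of this sketch.
    """
    if threshold < 0:
        return True
    return sum(file_lengths) <= threshold


MB = 1024 * 1024

small_input = [256 * MB] * 3   # 768 MB in total (made-up file sizes)
large_input = [512 * MB] * 5   # 2.5 GB in total (made-up file sizes)

print(within_fetch_threshold(small_input))                # True
print(within_fetch_threshold(large_input))                # False
print(within_fetch_threshold(large_input, threshold=-1))  # True
```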

 

