本問使用時間倒序排列記錄面試題
kafka 單個 topic數據的有效時間,自定義
/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name testtopic --entity-type topics --add-config retention.ms=86400000
retention.ms=86400000 為一天,單位是毫秒。
如果數據沒有立即刪除,執行下面./kafka-topics.sh --zookeeper localhost:2181 --alter --topic testtopic --config cleanup.policy=delete
flume的搭建
配置flume-env.sh配置文件
修改JDK路徑
在conf/ 下進行配置腳本
分發:
scp -r /opt/module/ root@hadoop102:/usr/
dws與dwd的區別
dws:主題層,用於提供后續的業務查詢,OLAP分析,數據分發等
dwd:明細層,
zk的選舉
切片和切塊的區別
1、物理切塊:
物理切塊是HDFS對數據進行存儲時,會存儲3個副本到從節點上進行分布式存儲,保證存儲數據的高可用性;需要對元數據進行物理切塊,每個block塊默認128M。
2、邏輯切片:
split是一種邏輯切片,是MapReduce里的概念,用對象來封裝每一個切片的信息,是在數據處理前進行任務分發時對
任務切片邏輯確定maptask數量,每一個輸入文件至少有一個切片;每一個切片運行一個maptask。
3、物理切塊和邏輯切片大小:
split的大小在默認的情況下和HDFS的block切塊大小一致,為了是MapReduce處理的時候減少由於split和block之間大小不一致導致的網絡傳輸下載。
checkpoint 的機制
為了使 Flink 的狀態具有良好的容錯性,Flink 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,Flink 定期在數據流上生成 checkpoint barrier ,當某個算子收到 barrier 時,即會基於當前狀態生成一份快照,然后再將該 barrier 傳遞到下游算子<期間,當前算子會暫停自己的數據處理過程>,下游算子接收到該 barrier 后,也基於當前狀態生成一份快照,依次傳遞直至到最后的 Sink 算子上。當出現異常后,Flink 就可以根據最近的一次的快照數據將所有算子恢復到先前的狀態。
JobManager端的 CheckPointCoordinator向 所有SourceTask發送CheckPointTrigger,Source Task會在數據流中安插CheckPoint barrier
Coordinator:協調員
Trigger:觸發器
如果檢查點的持續時間超過了設定的超時時間,CheckPointCoordinator就會任務本次CheckPoint失敗。會把這次的CheckPiont 產生的所有狀態數據刪除
checkpoint中保存的是什么信息
Checkpoint是通過給程序快照的方式使得將歷史某些時刻的狀態保存下來,當任務掛機以后,默認從最近一次保存的完整快照處進行恢復任務。
barrier對齊:
- 一旦Operator從輸入流接收到CheckPoint barrier n,它就不能處理來自該流的任何數據記錄,直到它從其他所有輸入接收到barrier n為止。否則,它會混合屬於快照n的記錄和屬於快照n + 1的記錄;
- 接收到barrier n的流暫時被擱置。從這些流接收的記錄不會被處理,而是放入輸入緩沖區。
- 上圖中第2個圖,雖然數字流對應的barrier已經到達了,但是barrier之后的1、2、3這些數據只能放到buffer中,等待字母流的barrier到達;
- 一旦最后所有輸入流都接收到barrier n,Operator就會把緩沖區中pending 的輸出數據發出去,然后把CheckPoint barrier n接着往下游發送
- 這里還會對自身進行快照;之后,Operator將繼續處理來自所有輸入流的記錄,在處理來自流的記錄之前先處理來自輸入緩沖區的記錄。
barrier不對齊?
checkpoint 是要等到所有的barrier全部都到才算完成
barrier不對齊就是指當還有其他流的barrier還沒到達時,為了不影響性能,也不用理會,直接處理barrier之后的數據。等到所有流的barrier的都到達后,就可以對該Operator做CheckPoint了;
為什么要進行barrier對齊?不對齊到底行不行?
Exactly Once時必須barrier對齊,如果barrier不對齊就變成了At Least Once<至少一次>;
第一種場景計算PV,kafka只有一個partition,精確一次,至少一次就沒有區別?《沒有區別,因為barrier是多個轉一個》
為了下游盡快做CheckPoint,所以會先發送barrier到下游,自身再同步進行快照;這一步,如果向下發送barrier后,自己同步快照慢怎么辦?下游已經同步好了,自己還沒?
答: 可能會出現下游比上游快照還早的情況,但是這不影響快照結果,只是下游快照的更及時了,我只要保障下游把barrier之前的數據都處理了,並且不處理barrier之后的數據,然后做快照,那么下游也同樣支持精確一次。這個問題你不要從全局思考,你單獨思考上游和下游的實例,你會發現上下游的狀態都是准確的,既沒有丟,也沒有重復計算。這里需要注意一點,如果有一個Operator 的CheckPoint失敗了或者因為CheckPoint超時也會導致失敗,那么JobManager會認為整個CheckPoint失敗。失敗的CheckPoint是不能用來恢復任務的,必須所有的算子的CheckPoint都成功,那么這次CheckPoint才能認為是成功的,才能用來恢復任務;
state和CheckPoint的保存位置:
默認情況下,State 會保存在 TaskManager 的內存中,CheckPoint 會存儲在 JobManager的內存中。State 和 CheckPoint 的存儲位置取決於 StateBackend 的配置。Flink 一共提供了 3 種 StateBackend 。 包 括 基 於 內 存 的 MemoryStateBackend 、 基 於 文 件 系 統 的FsStateBackend,以及基於 RockDB 作為存儲介質的 RocksDBState-Backend。
- 是否刪除 Checkpoint 中保存的數據:
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
設置為 RETAIN_ON_CANCELLATION:表示一旦 Flink 處理程序被 cancel 后,會保留CheckPoint 數據,以便根據實際需要恢復到指定的 CheckPoint。
設置為 DELETE_ON_CANCELLATION:表示一旦 Flink 處理程序被 cancel 后,會刪除CheckPoint 數據,只有 Job 執行失敗的時候才會保存 CheckPoint
flink內存模型
堆內內存和堆外內存
memoryManager的內存大小
flink任務的提交流程
flink 的幾個組件
JobManager
TaskManager
Dispather
ResourceManager
flink的resourceManager和yarn的resourceManager的區別
join 的開窗
flink的join開窗有幾種
flink的版本
可視化報表所用的組件
topN指標的統計
時序數據庫
數據分布
容錯機制
資源管理
各種API
DataSetAPI
DataStream API
Table API
Flink ML,Flink 的機器學習庫
Flink和Spark Streaming 的區別
Flink 是標准的實時處理引擎,基於事件驅動,而Spark Streaming是微批的模型
Flink根據用戶提交的代碼生成StreamGraph,經過優化生成JobGraph,然后提交給JobManager進行處理,JobManager會根據JobGraph生成ExecutionGraph,ExecutionGraph是Flink調度最核心的數據結構,JobManager根據ExecutionGraph對Job進行調度。
Flink的時間語義:
事件時間
注入時間
處理時間
watermark機制
如何保證Flink的sink輸出數據的不重復
flink如何保證端到端精准一次性消費
輸出到clickhouse保證一次精准嗎?
冪等操作,然后clickhouse有表引擎支持
MySQL和Kafka支持端到端的精准一次,因為有事務
exactly-once
flink數據輸出到clickhouse
flink背壓算法
動態反饋/自動反壓
實時數倉每一層是干啥的,注意點,細節處理
ck用replacingMergerTree
支持冪等性
Flink從Kafka讀取數據,把處理結果寫入Redis/HBase/Kafka,如何保證Exactly-once ?
用redis的hash去重
hbase會有相同rowkey的版本,刷寫的時候合並
需要開啟sink的wal
sink端保證數據准確有兩種方式,一是冪等操作,二是進行一個事務
如何確保狀態擁有精確一次的容錯保證?
如何在分布式場景下替多個擁有本地狀態的算子產生一個全域一致的快照?
如何在不中斷運算的前提下產生快照?
Flink的兩階段提交
HBase的熱點問題
hbase 建立二級索引
無答案版本: https://www.cnblogs.com/Anxc/p/15225294.html
flink 面試題參考: https://blog.csdn.net/wypblog/article/details/103900577
數據治理:https://mp.weixin.qq.com/s/DmrXAMXJNuXsWflgGcWCnA
013
Kafka的優勢如下:
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;
可擴展性:kafka集群支持熱擴展;
持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失;
容錯性:允許集群中節點故障(若副本數量為n,則允許n-1個節點故障);
高並發:支持數千個客戶端同時讀寫。
flume日志采集系統的優勢如下:
高可靠性:使用事務的辦法來保證event的可靠傳遞
高可用性:
分布式:可以有多個agent的聯合使用,體現如下圖:
failover 故障轉移
flinkCDC和普通的CDC 的區別
CDC:CDC是Change Data Capture(變更數據獲取)的簡稱。核心思想是,監測並捕獲數據庫的變動
而flinkCDC出現的意義是:可以將讀取數據和加工數據一並完成
CDc:CDC是 Change data Capture(變更數據獲取)的簡稱。核心思想是,監測並捕獲數據庫的變動
而 flinkCDo出現的意義是:可以將讀取數據和加工數據一並完成
hbase中有多少數據量 <可以在HBase web ui界面中看到數據量的多少>
yarn 的 applicationMaster 的作用
有一個DataNode突然 宕機以后該怎么辦
sqoop導出hive數據到MySQL時,需要提前在MySQL中創建表嘛?
需要,導出模式有增量,全量,更新<一般不要用>
oracle好像特殊,就說只用過導出到MySQL。
sqoop導數據 mysql -> HDFS 需要提前建表嘛?
不需要,因為我們是分區表dt,手動 load data path 'hdfs://...'
阿里一面:
一面 問你項目 包括 數倉搭建的 構建流程 還有 離線的流程跟 里面的 某個框架 flume kafka 還有 存儲介質 有哪些 分別 有什么區別 還有就是 flink項目 包括精准一次性 checkpoint 包括 里面 某些算子的使用 還有就是 你們公司你負責什么模塊 具體做什么 人員分配等等
rowkey如何設計:
redis的多路復用:
其實它和
select
版本的區別簡單來說,epoll
是將你需要監聽的列表交給系統維護, 這樣當有新數據來的時候, 系統知道這是你要的, 等你下次來拿的時候, 直接給你了, 少去了上面的系統遍歷. 同時, 也沒有select
查詢時那一大堆參數, 每次都只調用一次進行綁定即可.那系統是怎么知道新數據的到來呢? 這里靠的是事件中斷, 忘得差不多了, 回頭再看看.
012
spark的序列化 kryo
clickhouse的空值問題
建表時字段設置非空 : name Nullable(String)
查詢時空值:使用case when,不能使用 coalesce(x,...) 返回第一個非NULL的值(類型轉化問題)
兩個表關聯的時候 使用 where coalesce(b.id,0) = 0
collect_set()
select username, collect_list(video_name) from t_visit_video group by username ;
011
kafka如何存儲非結構化數據
維度數據為什么使用HBase來存儲?<Rowkey的隨機讀寫的支持比較好>
HBase+ kyLin = ClickHouse 很像
自定義 Yarn的ApplicationMaster
yarn application -kill 殺掉 某個yarn任務
partition by和 distribute by 的區別
partition by [key] order by [key] 只能在窗口函數中使用,而distribute by [key] sort by 在窗口函數和select 中都可以使用。
窗口函數中兩者是沒有區別的
where 后面不能用partition by
集群遷移 distcp
hadoop distcp hdfs://master1:8020/foo/bar hdfs://master2:8020/bar/foo
數據遷移工具
sqoop,DataX,自定義
010
1.hadoop和spark調優
2.項目組有多少人,每個人都干什么活
3.你哪方面有亮點
4.留存率
5.未來三年規划
1.datanode壞了,文件會文件怎么辦?
在hadoop集群中假如一個datanode掛掉,后面會發生什么。
009
Kettle : ETL工具
DataX : 數據同步工具,sqoop
DataWorks +MaxCompute : 阿里雲提供的大數據解決方案
Kafka 語義:
至少一次:at-least-once
至多一次:at-most-once
僅一次:exactly-once
一致性hash算法
hash(圖片名稱)% N
Kafka生產數據是按照批次進行發送的,不是按照一條一條進行發送的。
https://www.cnblogs.com/sniffs/p/13159974.html
008
1.為什么要用日志服務器把日志存起來?日志是流式的為什么不用時序數據庫?
InfluxDB(單機版免費,集群版收費)最成熟,Kairosdb(底層使用Cassandra),OpenTsdb(底層使用HBase),beringei(Facebook開源),TimeScaleDB(底層基於PostgreSQL),TSDB(百度開源),HiTSDB(阿里開源,底層是PostgreSQL)。
2.七天內連續三天指標?
3.java和scala哪個更強,寫過幾千行代碼的邏輯嗎?
4.數據量多大?100G?太太小。
5.用過哪些壓縮?壓縮比?區別?
中國華能。
007
新架構: Dbus+wormhole+spark
個人了解的話,國內開發的,用的人也很少(類實時,中間使用kafka+spark Streaming )
https://bridata.github.io/DBus/more-system-architecture.html
https://blog.csdn.net/weixin_41608840/article/details/80989926
006
實時項目中數據為什么存到hbase中不是redis中
在實時項目中,我們最想要保證的是維度數據的數據可靠性,以及存儲量的大小,同時以kv類型進行存儲,所以選擇了hbase。
比較方式 | Redis | HBase |
---|---|---|
數據類型 | 支持KV類型,List、Set | 支持KV類型... |
部署難易 | 部署非常簡單 | 部署需要依賴hadoop、zookeeper等服務 |
數據可靠性 | Redis采用的是異步復制數據,在failover時可能會丟失數據 | HBase采用WAL,先記錄日志再寫入數據,理論上不會丟失數據 |
應用場景 | Redis比較適合做緩存 | HBase適合做大數據的持久存儲 |
數據類型:都支持KV類型。但Redis還支持List、Set
數據量:Redis支持的數據量受內存限制,HBase沒有這個限制,可以存儲遠超內存大小的數據。
部署難易:HBase部署需要依賴hadoop、zookeeper等服務,而Redis的部署非常簡單。
數據可靠性:HBase采用WAL,先記錄日志再寫入數據,理論上不會丟失數據。而Redis采用的是異步復制數據,在failover時可能會丟失數據。
應用場景:HBase適合做大數據的持久存儲,而Redis比較適合做緩存。如果數據丟失是不能容忍的,那就用只能用HBase;
如果需要一個高性能的環境,而且能夠容忍一定的數據丟失,那完全可以考慮使用Redis。
綜上所述,在實時項目中,我們最想要保證的是維度數據的數據可靠性,以及存儲量的大小,同時以kv類型進行存儲,所以選擇了hbase
https://www.sohu.com/a/465715859_185201
Redis 是近幾年最常用的緩存數據庫,讀與寫的操作都在內存中進行,其速度響應非常快,AOF/RDB 保存的相關數據全會加載到我們機器的內存中,從而導致 Redis 並不適合保存大量的數據,畢竟內存還是相對有限。
Hbase 適合需對數據進行隨機讀操作或者隨機寫操作、大數據上高並發操作,比如每秒對 PB 級數據進行上千次操作以及讀寫訪問均是非常簡單的操作。
flink和sparkstreaming的區別,為什么用flink
sparkstreaming 微批
Flink 流處理 基於事件觸發機制
sparkstreaming是基於微批處理的,所以采用DirecDstream的方式根據計算出的每個partition要取數據的offset范圍,
拉取一批數據形成rdd進行批量處理,而且該rdd和kafka的分區是一一對應的
Flink是真正的流處理,他是基於事件觸發機制進行處理,在kafkaConsumer拉去一批數據后,Flink將其經過處理之后,
變成這個record發送的事件觸發式的流處理
另外,FLink支持動態發現新增topic或者新增partition,sparkstreaming和0.8版本的kafka結合是不支持的,
后來和0.10版本的kafka結合的時候支持了
如何保證kafka 的消費有序
生產有序,存儲有序(一個topic的一個分區內),才能保證消費有序
kafka的集群搭建,依據什么搭建的?
奇數台,與zookeeper選舉有關
2 * (生產者峰值生產速率 * 副本 / 100) + 1
zookeeper 集群[HBase,Kafka]設置奇數台的原因
https://blog.csdn.net/qq_35260875/article/details/106148569
ZooKeeper集群最好使用奇數個服務器,即2n+1個服務器,這樣整個Zookeeper集群最多可以容忍n台服務器宕機而保證依然提供服務。
Kafka調優,spark調優
hbase的rowkey設計規則和如何設計
規則:長度原則(越短越好),散列原則,唯一原則(唯一性)
如何設計:
1.生成隨機數、hash、散列值
2.字符串反轉
3.字符串拼接
1.hadoop、spark內存溢出
2.壓縮格式
3.spark數據傾斜
4.spark聚合類算子
5.flink海量數據去重
6.ES的搜索過程
7.最短路徑算法
005
萬順叫車:
Kafka能否保證單分區有序,retry后會不會亂序
flinktable的sink模式有幾種
Upsert,Retract,Append
https://blog.csdn.net/yscoder/article/details/113731679
flink多久保存一次保存點
flink從保存點恢復時算子的唯一標識怎么確定的
join,intervaljoin,union
join:兩個數據類型一致
intervaljoin:按照指定字段以及右流相對於左流偏移的時間區間進行關聯,只支持事件時間
union:多個 數據流 (但數據類型要保證一致)
flink從上游數據源消費到的重復數據怎么處理的
flink內存溢出,是怎么查看的
004
hive存儲格式
1、TextFile
2、RCFile
3、ORCFile
4、Parquet
1、TextFile
TextFile文件不支持塊壓縮,默認格式,數據不做壓縮,磁盤開銷大,數據解析開銷大。這邊不做深入介紹。
2、RCFile
Record Columnar的縮寫。是Hadoop中第一個列文件格式。能夠很好的壓縮和快速的查詢性能,但是不支持模式演進。通常
寫操作比較慢,比非列形式的文件格式需要更多的內存空間和計算量。
RCFile是一種行列存儲相結合的存儲方式。首先,其將數據按行分塊,保證同一個record在一個塊上,避免讀一個記錄需要讀
取多個block。其次,塊數據列式存儲,有利於數據壓縮和快速的列存取。
3、ORCFile
存儲方式:數據按行分塊 每塊按照列存儲 ,壓縮快 快速列存取,效率比rcfile高,是rcfile的改良版本,相比RC能夠更好的壓
縮,能夠更快的查詢,但還是不支持模式演進。
4、Parquet
Parquet能夠很好的壓縮,有很好的查詢性能,支持有限的模式演進。但是寫速度通常比較慢。這中文件格式主要是用在
sqoop優化
batch :指定使用批處理模式,可以多條相關的SQL一起執行
boundary-query:解決數據傾斜問題(新生成一條主鍵的形式去處理)
fetch-size:sqoop分批次去導入數據,增大導入條數
fetch-size:增大並行的map數(默認為4)
split-by:指定切割單元的列名
batch
語法:--batch,指示使用批處理模式執行底層的SQL語句。在導出數據時,該參數能夠將相關的SQL語句組合在一起批量執行,也可以使用有效的API在JDBC接口中配置批處理參數
Dsqoop.export.records.per.statement
指定批處理數據條數,可和batch聯合使用
boundary-query
可解決數據傾斜問題
–boundary-query: select 1 as MIN , sum(1) as MAX from table where xxx
具體原理就是通過ROWNUM() 生成一個嚴格均勻分布的字段,然后指定為分割字段
指定導入數據的范圍值。當僅使用split-by參數指定的分隔列不是最優時,可以使用boundary-query參數指定任意返回兩個數字列的查詢。它的語法如下:--boundary-
query select min(id), max(id) from<tablename>。在配置boundary-query參數時,查詢語句中必須連同表名一起指定min(id)和max(id)。如果沒有配置該參數,默認時Sqoop使用select
min(<split-by>), max(<split-by>) from<tablename>查詢找出分隔列的邊界值。
fetch-size
導入數據時,指示每次從數據庫讀取的記錄數。使用下面的語法:--fetch-size=<n>,其中<n>表示Sqoop每次必須取回的記錄數,默認值為1000。可以基於讀取的數據量、可用的內存和帶寬大小適當增加fetch-size的值。某些情況下這可以提升25%的性能。
num-mappers
該參數的語法為--num-mappers <number ofmap tasks>,用於指定並行數據導入的map任務數,默認值為4。應該將該值設置成低於數據庫所支持的最大連接數。
split-by
該參數的語法為--split-by <column name>,指定用於Sqoop分隔工作單元的列名,不能與--autoreset-to-one-mapper選項一起使用。如果不指定列名,Sqoop基於主鍵列分隔 工作單元。
presto連過哪些數據庫
Hive mysql
kylin怎么設計cube
1、設置Cube名、描述信息等
2、設置Cube依賴的表模型(星狀模型,一個事實表和可選的多個維度表)
3、設置維度(維度有幾種類型這里不再討論,創建完之后就可以暫時性的忽略這幾種不同的類型,都把它當做普通的維度就可以了)
4、設置度量(每一個度量包括列和聚合函數,列只能是事實表上的列)
5、設置filter條件(用於對表中的數據進行過濾)
6、設置增量更新的信息(設置增量列和起始時間,該列必須是時間格式列)
7、高級設置(設置維度組、RowKey等)
服務器選型,fume、 kafka台數、數據量
Flume:根據業務自己看 一般三個
Kafka: 2 * (生產者峰值生產速率 * 副本 / 100) + 1 = 3
mysql怎么建索引
主鍵索引,唯一索引,普通索引,全文索引
1.添加PRIMARY KEY(主鍵索引)
mysql>ALTER TABLE `table_name` ADD PRIMARY KEY ( `column` )
2.添加UNIQUE(唯一索引)
mysql>ALTER TABLE `table_name` ADD UNIQUE (
`column`
)
3.添加INDEX(普通索引)
mysql>ALTER TABLE `table_name` ADD INDEX index_name ( `column` )
4.添加FULLTEXT(全文索引)
mysql>ALTER TABLE `table_name` ADD FULLTEXT ( `column`)
redis數據類型
string 字符串(可以為整形、浮點型和字符串,統稱為元素)
list 列表(實現隊列,元素不唯一,先入先出原則)
set 集合(各不相同的元素)
hash hash散列值(hash的key必須是唯一的)
sort set 有序集合
內部表外部表的區別
外部表不刪除元數據,內部表刪除元數據
EXTERANL: 外部表
1、在導入數據到外部表,數據並沒有移動到自己的數據倉庫目錄下(如果指定了location的話),也就是說外部表中的數據並不是由它自己來管理的!而內部表則不一樣;
2、在刪除內部表的時候,Hive將會把屬於表的元數據和數據全部刪掉;而刪除外部表的時候,Hive僅僅刪除外部表的元數據,數據是不會刪除的!
3. 在創建內部表或外部表時加上location 的效果是一樣的,只不過表目錄的位置不同而已,加上partition用法也一樣,只不過表目錄下會有分區目錄而已,load data local inpath直接把本地文件系統的數據上傳到hdfs上,有location上傳到location指定的位置上,沒有的話上傳到hive默認配置的數據倉庫中。
外部表相對來說更加安全些,數據組織也更加靈活,方便共享源數據。
建表語句
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name -- EXTERANL: 外部表
[(col_name data_type [COMMENT col_comment], ...)] -- 列名 列類型 列描述信息 ....
[COMMENT table_comment] -- 表描述信息
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] -- 創建分區表指定分區字段 分區列名 列類型
[CLUSTERED BY (col_name, col_name, ...) -- 創建分桶表指定分桶字段 分桶列名
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] -- 指定分桶數
[ROW FORMAT delimited fields terminated by ... ] -- 指定一條數據 字段與字段的分割符
[collection items terminated by ... ] -- 指定集合元素與元素的分割符
[map keys terminated by ... ] -- 指定map的kv的分割符
[STORED AS file_format] -- 指定文件存儲格式,默認為 textfile
[LOCATION hdfs_path] -- 指定表在hdfs中對應的路徑
[TBLPROPERTIES (property_name=property_value, ...)] -- 指定表的屬性
[AS select_statement] -- 基於某個查詢建表
寬依賴窄依賴
常見寬依賴:repartition,groupbyKey,ReduceByKey...
窄依賴:
指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應於一個子RDD的分區,和兩個父RDD的分區對應於一個子RDD 的分區。圖中,map/filter和union屬於第一類,對輸入進行協同划分(co-partitioned)的join屬於第二類。
寬依賴:
指子RDD的分區依賴於父RDD的所有分區,這是因為shuffle類操作,如圖中的groupByKey和未經協同划分的join。
單例模式
該類負責創建自己的對象,同時確保只有單個對象被創建
這種模式涉及到一個單一的類,該類負責創建自己的對象,同時確保只有單個對象被創建。這個類提供了一種訪問其唯一的對象的方式,可以直接訪問,不需要實例化該類的對象。
意圖:保證一個類僅有一個實例,並提供一個訪問它的全局訪問點。
主要解決:一個全局使用的類頻繁地創建與銷毀。
何時使用:當您想控制實例數目,節省系統資源的時候。
如何解決:判斷系統是否已經有這個單例,如果有則返回,如果沒有則創建。
關鍵代碼:構造函數是私有的
雙重校驗鎖:
volatile + synchronized(Singleton3.class)
public class Singleton3 {
private static volatile Singleton3 instance;
private Singleton3() {}
public static Singleton3 getInstance() {
//首先判斷是否為空
if(instance==null) {
//可能多個線程同時進入到這一步進行阻塞等待
synchronized(Singleton3.class) {
//第一個線程拿到鎖,判斷不為空進入下一步
if(instance==null) {
instance = new Singleton3();
}
}
}
return instance;
}
}
spark輸出10個文件想輸出5個怎么調整
df.coalesce(1).saveAsTextFile("D://1.txt")
DataFrame輸出結果保存為文件時,尤其是根據某個條件分區時,可以控制輸出文件的個數,從而減少小文件的個數
hve分區為什么能提高效率
主要是以縮小數據查詢范圍,提高查詢速度和性能的
沒有分區的存在,那么每次查詢Hive將會進行全表掃描
分區列的值將表划分為segments(文件夾)
查詢時使用分區列和常規列類似
查詢Hive自動過濾不用於提高性能的分區
主要是以縮小數據查詢范圍,提高查詢速度和性能的
兩數相加和為100用java實現
...
hashmap、 hashtable哪個是線程安全的
HashTable 和 ConcurrentHashMap 是線程安全的
HashTable
Hashtable中的方法是同步的,而HashMap中的方法在缺省情況下是非同步的。在多線程並發的環境下,可以直接使用Hashtable,hashtable的實現方法里面添加了synchronized關鍵字來確保線程同步,但是要使用HashMap的話就要自己增加同步處理了
哈希值的使用不同,HashTable直接使用對象的hashcode。而HashMap重新計算hash值。
003
中科軟ETL:
Reduce輸出端是小文件如何設置(配置參數)
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress=true;
SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
SET mapreduce.input.fileinputformat.split.maxsize=256000000;
SET mapreduce.input.fileinputformat.split.minsize=1;
SET mapreduce.input.fileinputformat.split.minsize.per.node=256000000;
SET mapreduce.input.fileinputformat.split.minsize.per.rack=256000000;
SET hive.merge.mapredfiles=true;
SET hive.merge.mapfiles=true;
SET hive.merge.smallfiles.avgsize=256000000;
通過執行上述參數,可以將hive計算完成后的輸出文件改為大文件,避免輸出大量小文件。
Linux查看磁盤掛載
指令df -h和fdisk -l
Hive的優化
MR小文件處理
MR流程
Linux命令:find
常用參數
find -name april* 在當前目錄下查找以april開始的文件
find /home -size +512k 查大於512k的文件
find . -type f 查當前目錄下的所有普通文件
Linux命令:awk
使用
df -k | awk ‘{print $1}’ | grep -v ’none’
Linux獲取參數xargs n1
shell獲取命令返回結果
$?的作用
hadoop 常用上傳下載命令
下載文件到本地 hadoop fs -get HDFS文件路徑 本地路徑
合並文件並下載到本地 hadoop fs getmerge HDFS文件路徑文件1 HDFS文件路徑文件2 本地路徑文件1
上傳文件 hadoop fs -put 本地文件 HDFS路徑
場景題:
hadoop 的hdfs 中有一個tmp文件夾下,有若干個文件夾和文件,
要求指刪除該文件夾下小於25MB的文件,不刪除原有的目錄
1
002
1.sql語句的優化,如何提高查詢效率。
不要有超過5個以上的表連接(JOIN)
考慮使用臨時表或表變量存放中間結果。
少用子查詢
between替換in
exist替換in
left join替換in
能用inner join連接盡量使用inner join連接( left join、right join、子查詢)
2.在sql語句運行中,什么情況下會導致索引失效。
建立聯合索引:(id,name,age)
可以用到的索引:[id,name.age],[id],[id,name]
name like 'qwe%' like會導致索引失效,全表掃描
3.刪除表中的數據,有幾種方式。
4.如何創建索引
5.創建索引,對於字段有什么樣的要求
6.索引和約束有什么區別,總共有索引,分別使用在什么樣的場景。
6.如何創建存儲過程
7.存儲過程游標有那幾種,它們由什么區別
8.存儲過程由幾種循環方式
9.查詢結果,如果進行分組和排序,分組和排序運行的優先級是怎么樣的
10.left join和inner join的區別
11.union all和union的區別用法
001
自我介紹:
姓名,畢業學校,上一家公司名稱,公司產品,職位,項目簡單講一下。
HDFS讀流程
MR優化
Gzip,snappy的壓縮
HBase的Rowkey的設計
Hive的優化
flink的窗口
flink的時間語義
CEP的邏輯
口述一個flink去重數據
講一下第一個項目(離線)
為什么使用HBase存儲維度數據
實時數倉的幾個指標說幾個
雙流join指定的時間是多少,水位線設置的超時時間是多少
yarn調度器
kafka的副本隊列
kafka的flower如果延遲過高會怎么樣(OSR和ISR)
kafka的默認清除策略(什么時候會觸發)
kafka高級API《自動,手動》<沒記全>
HBase寫數據(WAL的刷寫大小)
HBase中的RowKey設計原則
長度原則
散列原則
唯一原則
1.生成隨機數、hash、散列值
2.字符串反轉
3.字符串拼接(20170524000001_a12e )
HBase的列族為什么建立的很少
Region Split 到達時(就是如果一個表達到100行數據就會分裂),
分裂時所有的列族都會分裂,產生大量小文件
Flume Channel 有幾種,區別
File Channel 100萬event(配置多路徑,增大吞吐量) 磁盤
Memory Channel 100個 內存
Kafka Channel 存於Kafka ,kafka Channel > memory Channel + Kafka sink
flink保存狀態變量的方式
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
上面這三種
//設置檢查點相關配置
// 開啟檢查點,精准一次性消費
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 設置檢查點超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 設置job 取消之后,檢查點是否保留 (保留)
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 設置狀態后端, 內存|文件系統|RocksDB
env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/flink/checkpoint"));
// 指定操作 HDFS 的用戶
System.setProperty("HADOOP_USER_NAME","root");
flink常用API
addSource(new SourceFunction<Integer>(){})
flatMap()
.map(r -> Tuple2.of(r,1))
.broadcast()//廣播
富函數(Rich):
RichFilterFunction<Integer>()
RichMapFunction<Integer, Long>()
addSink(new RichSinkFunction<Tuple2<Integer, Integer>>()
.keyBy(r -> r.f0);