1.講講你做的過的項目, 項目里有哪些難點重點呢?
kafkaDirect ES /hive kafka producer
難點值得一提的有兩點:
1.rdd中用到外部變量的時候如何處理
2.廣播變量的更新 rdd處理kafka讀過來的數據,這些數據引用外部的class來進行規則解析,規則的更新后怎么辦?
2.講講多線程吧, 要是你,你怎么實現一個線程池呢
Executor接口
提供了execute()方法將任務提交和任務執行分離
ExecutorService接口
繼承Executor,增加了對線程池中任務生命周期的管理,可強制取消正在執行的任務,拒絕再接受任務。
提供了submit()方法來擴展Executor.execute(),使任務執行有返回值。
ScheduledExecutorService 接口
繼承ExecutorService接口,在其基礎上增加了定時執行任務的功能
ThreadPoolExecutor 完整的實現了一個線程池
ScheduledThreadPoolExecutor
擁有定時調度任務功能的線程池
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池
Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池
class MyThread extends Thread {
private int i = 0;
@Override
public void run() {
for (i = 0; i < 10; i++) {
System.out.println(i);
}
}
}
class MyRunnable implements Runnable {
private int i = 0;
@Override
public void run() {
for (i = 0; i < 10; i++)
System.out.println(i);
}
}
3.講一下Mapreduce或者hdfs的原理和機制。map讀取數據分片。
hdfs nn dn
nn 進行元數據的管理 記錄管理的文件分成了哪些塊 這些塊在哪些機器上(這些是DN匯報上來的)
dn 真正存儲數據的節點 啟動后 會向NN注冊 並匯報他節點上文件block.
文件是分片的, 切割成不同的塊(BLOCK 128M),每一個塊默認有三個副本,副本的分布受機架感知影響,在本節點,同機架,不同機架這樣分布。
客戶端讀取文件的時候,默認返回離其最近的一個節點來返回數據。
MR 的原理 MAP REDUCE 兩種操作
map file split 的數量有關,例如文件有三個分片,而且文件本身是可分片的,就會有三個MAP 任務來處理
處理完,就會根據PARTITIONER 把數據分發到REDUCE
reduce (KEY,ITERATOR) 數據SHUFFLE過來后進行聚合處理。
4.shuffle 是什么? 怎么調優?
shuffle operation
SPARK內特定的操作會觸發SHUFFLE,如REDUCEBYKEY之類的操作。
它會觸發跨執行器甚至機器之間數據的復制。
REDUCEBYKEY 會觸發一個操作,對於某一個KEY的數據都要收集到特定的執行器中處理,
這就需要掃描所有的PARTITIONS。
性能影響:
因為SHUFFLE 會造成DISK/IO,數據序列化和網絡IO,所以它的代價是昂貴的。
SHUFFLE會消耗大量的堆內存,因為他使用內存數據結構來組織數據,當數據大到內存承受不了
的時候,數據會被吐到磁盤上,而且增大了GC的負擔。
SHUFFLE也在磁盤上生成了大量的臨時文件,這些文件會被保留,一直到相對應的RDD不再被使用。
這意味着長服的作業會消耗大量的磁盤空間,這個臨時存儲目錄是在SPARK.LOCAL.DIR.
就是Spark用於寫中間數據,如RDD Cache,Shuffle,Spill等數據的位置.
Spark On YARN的時候,Spark Executor的本地路徑依賴於Yarn的配置,而不取決於這個參數.
• shuffle過程中常出現的問題
常見問題一:reduce oom?
問題原因:
reduce task 去map端獲取數據,reduce一邊拉取數據一邊聚合,reduce端有一塊聚合內存(executor memory * 0.2),也就是這塊內存不夠
解決辦法:
1.增加reduce 聚合操作的內存的比例
2.增加Executor memory的大小 --executor-memory 5G
3.減少reduce task每次拉取的數據量 設置spak.reducer.maxSizeInFlight 24m, 拉取的次數就多了,因此建立連接的次數增多,有可能會連接不上(正好趕上map task端進行GC)
常見問題二:錯誤描述--shuffle file cannot find or executor lost
• 什 么時候需要調節Executor的堆外內存大小?
• shuffle file cannot find (DAGScheduler,resubmitting task)
• executor lost
• task lost
• out of memory
問題原因:
1.map task所運行的executor內存不足,導致executor 掛掉了,executor里面的BlockManager就掛掉了,導致ConnectionManager不能用,也就無法建立連接,從而不能拉取數據
2.executor並沒有掛掉
2.1 BlockManage之間的連接失敗(map task所運行的executor正在GC)
2.2建立連接成功,map task所運行的executor正在GC
3.reduce task向Driver中的MapOutputTracker獲取shuffle file位置的時候出現了問題
解決辦法:
1.增大Executor內存(即堆內內存) ,申請的堆外內存也會隨之增加--executor-memory 5G
2.增大堆外內存 --conf spark.yarn.executor.memoryoverhead 2048M --conf spark.executor.memoryoverhead 2048M
(默認申請的堆外內存是Executor內存的10%,真正處理大數據的時候,這里都會出現問題,導致spark作業反復崩潰,無法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G)
如何發現數據傾斜?
有些作業異常地慢,
通過Spark Web UI來查看當前運行的stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。
數據傾斜解決方案
1.局部聚合和全局聚合
對熱鍵進行多次處理,
先對熱鍵的KEY做一個處理,加一個隨機前綴,這樣做一次合並,減少數據量,然后再去掉前綴再做處理的時候,數據量就會減少。
2.將 reduce join 改為 map join
A JOIB B FILTER XXX ->
例如原來使用 join 兩張然后過濾數據的方式,可以使用廣播變量,把過濾對象放在廣播變量里面,然后在map里面做。
3. 找到具體的傾斜數據進行處理 處理方式可以參考第一種方法。
spark.shuffle.sort.bypassMergeThreshold 加大這個值,如果你的程序確實對數據不需要排序,可以加大這個參數,讓其采用bypass機制,提高效率。
spark.shuffle.consolidateFiles 合並生成中間磁盤文件
spark.reducer.maxSizeInFlight 參數說明:該參數用於設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。
關於spark shuffle 這篇文章講解地非常不錯 從原理到數據傾斜的處理方法
https://www.cnblogs.com/arachis/p/Spark_Shuffle.html
hive.map.aggr = true #Map 端部分聚合,相當於Combiner
hive.groupby.skewindata=true
# 數據傾斜的時候進行負載均衡,當項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job
中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果
是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job
再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key
被分布到同一個 Reduce 中),最后完成最終的聚合操作。
spark參數的優先級
在代碼里面SparkConf寫的優先級最高,接着是通過flags傳遞給spark-submit or spark-shell.
的,然后才是spark-defaults.conf 文件的配置。
5.項目用什么語言寫? Scala? Scala的特點? 和Java的區別?
關於scala,能說出以下五個特點,我覺得就對SCALA有一定的理解。
Scala /FOLDLEFT/CASE CLASS/MATCH CASE/IMPLICIT/
JVM
FUNCTIONAL PROGRAMING 函數的組合來解決問題
java 8 函數式編程的一些特性 stream() map,filter,lambda 表達式。
6.理論基礎怎么樣,比如數據結構里面的快速排序或者樹? 講一講你了解的樹的知識?
快排
樹
鏈表
7.講一下數據庫,SQl ,左外連接, 原理,實現?
有做ETL的經驗,自以為對數據庫還是有相對透徹的理解。
擅長TSQL。
左連接:表輸出的結果以左表為准,關聯不到的數據,使用NULL值補充。
原理: LOOP JOIN,HASH JOIN,MERGE JOIN 在底層通過這三種方式來實現JOIN。
SQL的執行過程基本可以分為 語法解析(PARSE) -> bind(到數據庫里面去確認語法數上的元素,如表和視圖,函數等對象是否存在)
->優化(剪枝下移)->執行
8.Hbase的設計有什么心得?
HBASE企業級開發指南中學習的一些技巧
ROWKEY的設計要根據查詢請求來進行設計
9. 談談你對多線程 並發 問題的一些操作?
synchronized
在java中,常用這個關鍵字來進行鎖定,便如安全的單例模式時就使用這個關鍵字,對NEW的方法進行鎖定。
//加上synchronized,並加入對象鎖,new一個任意對象即可,我們這里使用Object來解決同步問題
有時候直接在方法上加的synchronized
如果這個類是一個實例類,那它使用的是什么呢?鎖的是什么呢? 鎖定的是this,即實例對象本身。
如果這個類或方法是靜態的,那么它的原理又是什么呢? 鎖定的是當前整個類本身。
有時候,也會專門new Object()來作為鎖定的對象。
Java多線程可以通過:
1. synchronized關鍵字
2. Java.util.concurrent包中的lock接口和ReentrantLock實現類
這兩種方式實現加鎖。
使用synchronized 關鍵字有什么缺陷?
當前執行線程執行過程中,其他線程執行到同樣的代碼時必須等待,影響了執行效率。
粒度有點粗,譬如說要控制的數據對象有讀寫兩個功能,就可以分讀寫鎖,而這種方式就沒有,讀寫操作都會被鎖定。
系統自動去釋放鎖,第二種方法就需要用戶自己適當的時候釋放鎖,否則就會有造成死鎖的可能。
lock 可以提高多線程讀的效率(或者說非排他鎖的效率)
有大量線程競爭資源的時候,建議使用lock來對資源進行精細管理。
如果CLASSA有兩個方法A,B都加了同步的關鍵字,會有什么問題嗎?
沒有,同步關鍵字使用的是可重入鎖,即鎖是當前對象的實例,線程執行到A時獲取得對象的鎖,
則再執行B時,就不需要再申請鎖,也申請不到鎖(因為前面申請的鎖沒有釋放)。
線程A來調用方法A時申請了鎖,還沒有執行完時,
同時線程B來調用方法B時能否正確執行,還是在繼續等待?(等待)
所以可以使用更細粒度的執行,即在方法內部對部分代碼塊進行鎖定,鎖定的時候,自己傳入鎖定對象,可以進一步進行控制。
譬如讓同時訪問方法B的線程進行等待。
在方法中使用synchronized(new Object()) {} {} 的寫法有什么問題嗎?
經測試,還是發現它也起到了鎖定的作用,線程不會同時進入方法塊,
這是為什么呢?
類的靜態變量是線程非安全的
類的實例變量單例是是線程非安全的,非單例時是線程安全的。
局部變量是線程安全的
盡量減少對整個方法使用synchronized關鍵字,我們使用代碼代碼塊進行同步,就不影響這種情況,
線程A訪問方法A,線程B方法方法B,方法AB可都可能會有加鎖的資源,但只要使用的鎖對象不一樣,
就不影響並發操作。
11.有沒有用過Zookeeper呢? Zookeeper的適用場景是什么?
HA 狀態維護 分布式鎖 全局配置文件管理 操作Zookeeper是用的什么?
hbase用zookeeper存放一些配置信息 meta root信息
hadoop nn rm ha 用zk的臨時節點功能和watch特性來做主備的自動切換
kafka 用zk來存放bootstrap ,customer group offset信息
12.spark開發 比如 一個讀取hdfs上的文件,然后count有多少行的操作,你可以說說過程嗎。那這個count是在內存中,還是磁盤中計算的呢?磁盤中。
RDD[SRING]
rdd.mappartitions(l=>l.size).collect()
13.spark sql又為什么比hive快呢?
SPARK
內存
RDD-DAG 類似於hive tez 引擎,優於MR引擎
14.RDD的數據結構是怎么樣的? Partition數組。
partitions
dependency
compute (split:Partition,taskC:TaskContext)
1.spark作業的執行過程描述一下?
作業提交之后,分析 RDD 寬窄依賴 分成不同的STAGE task 再送到EXECUTOR中去執行。
SPARK作業的提交主要涉及到driver/executor這兩個節點。
driver/app master/ ->resourcemanager 交互的節點
spark app /main 運行的節點/
sparkconf->sparkcontext->server
RDD依賴分析,生成DAG
根據DAG中的寬窄依賴,划分stage,然后生成task,分發到executor中執行。
2.Hbase的PUT的一個過程。
初始化一個HTABLE類,然后構造puT類,寫入數據,第三步執行操作。
put可以插入一個或多行數據,也可以用於更新數據。
put 你怎么知道put的數據發送到哪個regionServer?
判斷提交的那一個row對象是在那個region當中
RegionLocations locs = connection.locateRegion(tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
locateRegionInMeta方法到.META.表中查找tableName的row所對應的HRegion所在位置,先從本地緩存查找,如果沒有,則進行下一步;
我覺得這個題最核心的部分就是這些:
行鍵是不可分割的字節數據,而且在表中是按由低到高的順序排列的。
表是被切分為region分發在不同的regionServer來管理。
PUT(TABLENAME,ROWKEY,BYTESARRAY) 來找到所屬REGION->RS->RPC 數據存儲或放入服務端緩存。
region(startrowkey-endrowkey) tableName ,rowkey 就可台找到所屬的RS
3.RDD算子里操作一個外部map比如往里面put數據。然后算子外再遍歷map。有什么問題嗎。
錯的,關於閉包的問題說明。這個MAP就是閉包里的變量,它有自己的作用域,里面的和外面的不一樣。
7.stage怎么划分的? 寬依賴窄依賴是什么?
stage 就是根據寬窄依賴划分的,
寬窄依賴是根據DAG中任務是否產生了SHUFFLE來判斷的。
像filter,map,flatmap,partition ->partiton 1:1關系,不存N:N關系。
9.講講列式存儲的 parquet文件底層格式。
parquet orc
列式存儲 增大壓縮比 減少數據量
可以謂詞下推 提少掃描的數據量
ORC 支持ACID
PARQUET 支持嵌套的數據結構
1.scala foldLeft 方法的理解
2.scala 模式匹配
3.scala 隱式變量 隱式轉換 柯里化
java 面試題
項目中,推薦判斷對象為空時使用null==Object的方法,這是為什么,null放在后面有區別嗎?
沒有區別,null放在前面是為了防止少寫一個=時,編譯器會報錯,有友好的提示作用。
hbase企業應用開發實戰
Hbase 安裝與配置 集群運維管理 故障處理
hbase數據模型(表 行鍵 列族 單元格) 數據模型的操作(get put scan delete) version,sort,count,自行分區
hbase表結構設計 rowkey 列族定義 DEMO
hbase client :java client hbase shell restful client mapreduce hbase
hbase and sql :hive/hbase phoenix kundera
核心結構(b+ lsm) 過濾器 二級索引 協處理器 布隆過濾器
性能優化 JVM 查詢優化 寫入優化
hbase 主要有哪些配置?
1.hbase.zookeeper.quorum host:port dir
2.hbase.rootdir :數據存在hdfs哪個位置
3.是否開啟集群分配式的參數 hbase.cluster.distributed
hbase 中元數據表存儲tableName與region的對應關系 就像hdfs nn存儲fileName與block的對應關系類似
rowkey 是不可分割的字節數組,按字典順序由低到高存儲在表中的。(很重要很重要)
有關kafka的存儲?
Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
(2).通過索引信息可以快速定位message和確定response的最大大小。
(3).通過index元數據全部映射到memory,可以避免segment file的IO磁盤操作。
(4).通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。
11.Kafka新建的分區會在哪個目錄下創建
會在含有文件數量最少的磁盤分區創建,如果我們在log.dirs中新添加了一個目錄,則新建的分區會一直往里面放,直至創建的文件數量不是最少。
partition的數據如何保存到硬盤
topic中的多個partition以文件夾的形式保存到broker,每個分區序號從0遞增,
且消息有序
segment 是為了方便清理已經處理的消息。
Partition文件下有多個segment(xxx.index,xxx.log)
segment 文件里的 大小和配置文件大小一致可以根據要求修改 默認為1g
如果大小大於1g時,會滾動一個新的segment並且以上一個segment最后一條消息的偏移量命名
索引文件里索引的是什么? 索引的是log文件的內容,具體的是有兩列,第一列就是消息的序數,第二個這個消息的offset,根據索引就可以知道第幾個消息在什么位置。
00000000000000170410.index 索引的名字包含了第一個全局消息offset
根據這個命名就可以通過掃描索引文件名字,找到OFFSET位於哪個索引文件,進而知道位於哪個數據文件中。
消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。
發布者發到某個topic的消息會被均勻的分布到多個part上(隨機或根據用戶指定的回調函數進行分布),broker收到發布消息往對應part的最后一個segment上添加該消息,
當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,
segment達到一定的大小后將不會再往該segment寫數據,broker會創建新的segment。
kafaka生產數據時數據的分組策略
生產者決定數據產生到集群的哪個partition中
每一條消息都是以(key,value)格式
Key是由生產者發送數據傳入
所以生產者(key)決定了數據產生到集群的哪個partition
如果key是空的,如何決定發送到哪個partition?
從源碼可以看出,首先獲取topic的所有Patition,如果客戶端不指定Patition,也沒有指定Key的話,
使用自增長的數字取余數的方式實現指定的Partition。這樣Kafka將平均的向Partition中生產數據。
https://www.cnblogs.com/haoxinyue/p/5743775.html
kafka中zk起到了什么作用?
1.consumer offset
2.集群管理 leader partition選舉 PARTITION ISR的管理, 節點實時狀態等(管理broker,consumer的動態加入和離開)
通過kafka 在zookeepr中的結構就可以發現其中的功能:
/consumer /brokers /config /admin
kafka message 消息的結構
offset
messagelength
xxx 其他字段
timestamp
keylen
key
valuelen
value
kafka 如何保證數據會均勻地分布在他的partition中呢??????????????
partition機制可以通過指定producer的partition.class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。
hw:highwatermark 高水位,取一個partition的ISR中最小的LEO(latest offset)作為HW,consumer只能消費到HW所在的位置。
對於leader新寫入的數據,consumer不能立刻消費,leader會等待所有ISR中的replica同步后更新hw,然后消費者才能消費。
這樣就保證了即使leader失效,該消息也會從新選擇出來的leader中獲取到。
KAFKA ISR 這種機制平衡了吞吐量和數據的不丟失。
Controller, kafka集群中的某一個broker會被選舉為controller,主要負責partition管理和副本的狀態管理。
執行類似重分配partition之類的管理事務。
消息的可靠性
當producer向leader發送數據時,可以通過request.required.acks參數來設置數據可靠性的級別:
1(默認):這意味着producer在ISR中的leader已成功收到的數據並得到確認后發送下一條message。如果leader宕機了,則會丟失數據。
0:這意味着producer無需等待來自broker的確認而繼續發送下一批消息。這種情況下數據傳輸效率最高,但是數據可靠性確是最低的。
-1:producer需要等待ISR中的所有follower都確認接收到數據后才算一次發送完成,可靠性最高。但是這樣也不能保證數據不丟失,
比如當ISR中只有leader時(前面ISR那一節講到,ISR中的成員由於某些情況會增加也會減少,最少就只剩一個leader),這樣就變成了acks=1的情況。
關於消息傳送的保障
at least once: 從broker拉取數據后,先消費,消費成功后再提交offset
at most once: 從broker拉取數據后,先提交offset,然后再消費,這樣如果失敗了,不會再消費第二次同樣的數據
exact once: 這個比較復雜,需要配合消息的去重,建議根據業務數據的特點自行處理。
要保證數據寫入到Kafka是安全的,高可靠的,需要如下的配置:
topic的配置:replication.factor>=3,即副本數至少是3個;2<=min.insync.replicas<=replication.factor
broker的配置:leader的選舉條件unclean.leader.election.enable=false
producer的配置:request.required.acks=-1(all),producer.type=sync
這篇文章講kafka相關的內容,非常不錯,內容豐富 作者可能是唯品會的員工
https://blog.csdn.net/lp284558195/article/details/80297208
hbase 面試題
hbase table 可以通過預建分區來減少合並分裂帶來的性能影響,請問如何進行這項操作?
1.通過hbase shell
create 't1', 'f1',SPLITS=>['10','20','30']
2.通過java api
關鍵問題
1。是行鍵怎么設計,以什么樣的形式生成行鍵。
2。行鍵分多少個 根據集群規則 算出會有多少個region,然后計算出區間
hbase 中的大小合並分別指的什么? 如何在生產中調整對系統使用的影響?
hbase region server 主要做的事情就是 接收數據的讀寫
memstore 數據flush到磁盤后形成hfile, compaction 就是對這些文件的合並與拆分。
hbase中有兩種compact
合並:
1。小合並 文件的合並
2。大合並 對某region下所有的storefile執行合並操作,這個過程中會清理有刪除標記的數據
和超過TTL的數據與版本的數據。
rowkey的設計原則?
1.短小 減少存儲壓力 無論是memostore hfile都會存儲rowkey
2.散列 減少熱點 盡量負載均衡 減少查詢壓力
3.方便業務查詢 常見的需求是后插入的數據優先查詢到 可以使用long.max - timestamp作為key的一部分
4.rowkey 唯一性原則 設計的時候要保證不重復
簡述布隆過濾器的原理?
簡單說一下BloomFilter原理
一旦表格中開啟BloomFilter,那么在生成StoreFile時同時會生成一份包含BloomFilter結構的文件MetaBlock,所以會增加一定的存儲開銷和內存開銷
內部是一個bit數組,初始值均為0
插入元素時對元素進行hash並且映射到數組中的某一個index,將其置為1,再進行多次不同的hash算法,將映射到的index置為1,同一個index只需要置1次。
查詢時使用跟插入時相同的hash算法,如果在對應的index的值都為1,那么就可以認為該元素可能存在,注意,只是可能存在
所以BlomFilter只能保證過濾掉不包含的元素,而不能保證誤判包含
數據寫入到的memostore,wal(log),這樣才算安全寫入,如果regionserver崩潰了,memostore的文件沒有完成寫入,可以根據wal來對數據進行重播。
那waL的數據寫在哪里呢?一般是HDFS.
Hbase掛掉的原因很多,很多時候是因為ZK和HDFS的緣故,比如磁盤問題導致hdfs的故障。
如果rs/hm進行了長時間的gc或改動了服務器的時間,也會導致zk超時,出問題。