Shuffle核心概念、Shuffle調(diào)優(yōu)及故障排除
Spark調(diào)優(yōu)之Shuffle調(diào)優(yōu)
本節(jié)開始先講解Shuffle核心概念;然后針對HashShuffle、SortShuffle進(jìn)行調(diào)優(yōu);接下來對map端、reduce端調(diào)優(yōu);再針對Spark中的數(shù)據(jù)傾斜問題進(jìn)行剖析及調(diào)優(yōu);最后是Spark運行過程中的故障排除。
一、Shuffle的核心概念
1. ShuffleMapStage與ResultStage
ShuffleMapStage與ResultStage
在劃分stage時,最后一個stage稱為FinalStage,它本質(zhì)上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。
ShuffleMapStage的結(jié)束伴隨著shuffle文件的寫磁盤。
ResultStage基本上對應(yīng)代碼中的action算子,即將一個函數(shù)應(yīng)用在RDD的各個partition的數(shù)據(jù)集上,意味著一個job的運行結(jié)束。
2. Shuffle中的任務(wù)個數(shù)
我們知道,Spark Shuffle分為map階段和reduce階段,或者稱之為ShuffleRead階段和ShuffleWrite階段,那么對于一次Shuffle,map過程和reduce過程都會由若干個task來執(zhí)行,那么map task和reduce task的數(shù)量是如何確定的呢?
假設(shè)Spark任務(wù)從HDFS中讀取數(shù)據(jù),那么初始RDD分區(qū)個數(shù)由該文件的split個數(shù)決定,也就是一個split對應(yīng)生成的RDD的一個partition,我們假設(shè)初始partition個數(shù)為N。
初始RDD經(jīng)過一系列算子計算后(假設(shè)沒有執(zhí)行repartition和coalesce算子進(jìn)行重分區(qū),則分區(qū)個數(shù)不變,仍為N,如果經(jīng)過重分區(qū)算子,那么分區(qū)個數(shù)變?yōu)镸),我們假設(shè)分區(qū)個數(shù)不變,當(dāng)執(zhí)行到Shuffle操作時,map端的task個數(shù)和partition個數(shù)一致,即map task為N個。
reduce端的stage默認(rèn)取spark.default.parallelism這個配置項的值作為分區(qū)數(shù),如果沒有配置,則以map端的最后一個RDD的分區(qū)數(shù)作為其分區(qū)數(shù)(也就是N),那么分區(qū)數(shù)就決定了reduce端的task的個數(shù)。
3. reduce端數(shù)據(jù)的讀取
根據(jù)stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會先執(zhí)行,那么后執(zhí)行的reduce task如何知道從哪里去拉取map task落盤后的數(shù)據(jù)呢?
reduce端的數(shù)據(jù)拉取過程如下:
map task 執(zhí)行完畢后會將計算狀態(tài)以及磁盤小文件位置等信息封裝到MapStatus對象中,然后由本進(jìn)程中的MapOutPutTrackerWorker對象將mapStatus對象發(fā)送給Driver進(jìn)程的MapOutPutTrackerMaster對象;在reduce task開始執(zhí)行之前會先讓本進(jìn)程中的MapOutputTrackerWorker向Driver進(jìn)程中的MapoutPutTrakcerMaster發(fā)動請求,請求磁盤小文件位置信息;當(dāng)所有的Map task執(zhí)行完畢后,Driver進(jìn)程中的MapOutPutTrackerMaster就掌握了所有的磁盤小文件的位置信息。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小文件的位置信息;完成之前的操作之后,由BlockTransforService去Executor0所在的節(jié)點拉數(shù)據(jù),默認(rèn)會啟動五個子線程。每次拉取的數(shù)據(jù)量不能超過48M(reduce task每次最多拉取48M數(shù)據(jù),將拉來的數(shù)據(jù)存儲到Executor內(nèi)存的20%內(nèi)存中)。
二、HashShuffle解析
以下的討論都假設(shè)每個Executor有1個cpu core。
1. 未經(jīng)優(yōu)化的HashShuffleManager
shuffle write階段,主要就是在一個stage結(jié)束計算之后,為了下一個stage可以執(zhí)行shuffle類的算子(比如reduceByKey),而將每個task處理的數(shù)據(jù)按key進(jìn)行“劃分”。所謂“劃分”,就是對相同的key執(zhí)行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游stage的一個task。在將數(shù)據(jù)寫入磁盤之前,會先將數(shù)據(jù)寫入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿之后,才會溢寫到磁盤文件中去。
下一個stage的task有多少個,當(dāng)前stage的每個task就要創(chuàng)建多少份磁盤文件。比如下一個stage總共有100個task,那么當(dāng)前stage的每個task都要創(chuàng)建100份磁盤文件。如果當(dāng)前stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個task,那么每個Executor上總共就要創(chuàng)建500個磁盤文件,所有Executor上會創(chuàng)建5000個磁盤文件。由此可見,未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的。
shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結(jié)果中的所有相同key,從各個節(jié)點上通過網(wǎng)絡(luò)都拉取到自己所在的節(jié)點上,然后進(jìn)行key的聚合或連接等操作。由于shuffle write的過程中,map task給下游stage的每個reduce task都創(chuàng)建了一個磁盤文件,因此shuffle read的過程中,每個reduce task只要從上游stage的所有map task所在節(jié)點上,拉取屬于自己的那一個磁盤文件即可。
shuffle read的拉取過程是一邊拉取一邊進(jìn)行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù),然后通過內(nèi)存中的一個Map進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進(jìn)行聚合操作。以此類推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。
未優(yōu)化的HashShuffleManager工作原理如下圖所示:
未優(yōu)化的HashShuffleManager工作原理2. 優(yōu)化后的HashShuffleManager
為了優(yōu)化HashShuffleManager我們可以設(shè)置一個參數(shù):spark.shuffle.consolidateFiles,該參數(shù)默認(rèn)值為false,將其設(shè)置為true即可開啟優(yōu)化機制,通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。
開啟consolidate機制之后,在shuffle write過程中,task就不是為下游stage的每個task創(chuàng)建一個磁盤文件了,此時會出現(xiàn)shuffleFileGroup的概念,每個shuffleFileGroup會對應(yīng)一批磁盤文件,磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的。一個Executor上有多少個cpu core,就可以并行執(zhí)行多少個task。而第一批并行執(zhí)行的每個task都會創(chuàng)建一個shuffleFileGroup,并將數(shù)據(jù)寫入對應(yīng)的磁盤文件內(nèi)。
當(dāng)Executor的cpu core執(zhí)行完一批task,接著執(zhí)行下一批task時,下一批task就會復(fù)用之前已有的shuffleFileGroup,包括其中的磁盤文件,也就是說,此時task會將數(shù)據(jù)寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復(fù)用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進(jìn)行一定程度上的合并,從而大幅度減少磁盤文件的數(shù)量,進(jìn)而提升shuffle write的性能。
假設(shè)第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數(shù)為1),每個Executor執(zhí)行5個task。那么原本使用未經(jīng)優(yōu)化的HashShuffleManager時,每個Executor會產(chǎn)生500個磁盤文件,所有Executor會產(chǎn)生5000個磁盤文件的。但是此時經(jīng)過優(yōu)化之后,每個Executor創(chuàng)建的磁盤文件的數(shù)量的計算公式為:cpu core的數(shù)量 * 下一個stage的task數(shù)量,也就是說,每個Executor此時只會創(chuàng)建100個磁盤文件,所有Executor只會創(chuàng)建1000個磁盤文件。
優(yōu)化后的HashShuffleManager工作原理如下圖所示:
優(yōu)化后的HashShuffleManager工作原理
請輸入評論內(nèi)容...
請輸入評論/評論長度6~500個字
最新活動更多
-
11月20日火熱報名中>> 2024 智能家居出海論壇
-
11月28日立即報名>>> 2024工程師系列—工業(yè)電子技術(shù)在線會議
-
12月19日立即報名>> 【線下會議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會
-
即日-12.26火熱報名中>> OFweek2024中國智造CIO在線峰會
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書》
-
精彩回顧立即查看>> 【在線會議】多物理場仿真助跑新能源汽車
推薦專題
- 1 腦機接口芯片,華為出了新專利!
- 2 今年諾獎對人工智能的重視,給我們的基礎(chǔ)教育提了個醒
- 3 銀行業(yè)AI大模型,從入局到求變
- 4 巨頭搶布局,VC狂撒錢,為了能讓「AI讀心」這些公司卷瘋了
- 5 阿斯麥ASML:“骨折級”洋相,又成AI第一殺手?
- 6 蘋果市值創(chuàng)新高,iPhone 16能否助力突破4萬億美元大關(guān)?
- 7 一場“載入史冊”的發(fā)布會,讓馬斯克失去了4700億
- 8 百度谷歌比較研究2024:中美“遠(yuǎn)古AI龍頭”的現(xiàn)狀與趨勢
- 9 洞見AI風(fēng)潮 第二屆vivo藍(lán)河操作系統(tǒng)創(chuàng)新賽開啟招募
- 10 地平線開啟配售,阿里百度各砸5000萬美金,市值最高超500億
- 高級軟件工程師 廣東省/深圳市
- 自動化高級工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市