Shuffle核心概念、Shuffle調(diào)優(yōu)及故障排除
三、 SortShuffle解析
SortShuffleManager的運(yùn)行機(jī)制主要分成兩種,一種是普通運(yùn)行機(jī)制,另一種是bypass運(yùn)行機(jī)制。當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(shí)(默認(rèn)為200),就會(huì)啟用bypass機(jī)制。
1. 普通運(yùn)行機(jī)制
在該模式下,數(shù)據(jù)會(huì)先寫(xiě)入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,此時(shí)根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是reduceByKey這種聚合類的shuffle算子,那么會(huì)選用Map數(shù)據(jù)結(jié)構(gòu),一邊通過(guò)Map進(jìn)行聚合,一邊寫(xiě)入內(nèi)存;如果是join這種普通的shuffle算子,那么會(huì)選用Array數(shù)據(jù)結(jié)構(gòu),直接寫(xiě)入內(nèi)存。接著,每寫(xiě)一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會(huì)判斷一下,是否達(dá)到了某個(gè)臨界閾值。如果達(dá)到臨界閾值的話,那么就會(huì)嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫(xiě)到磁盤(pán),然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。
在溢寫(xiě)到磁盤(pán)文件之前,會(huì)先根據(jù)key對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序。排序過(guò)后,會(huì)分批將數(shù)據(jù)寫(xiě)入磁盤(pán)文件。默認(rèn)的batch數(shù)量是10000條,也就是說(shuō),排序好的數(shù)據(jù),會(huì)以每批1萬(wàn)條數(shù)據(jù)的形式分批寫(xiě)入磁盤(pán)文件。寫(xiě)入磁盤(pán)文件是通過(guò)Java的BufferedOutputStream實(shí)現(xiàn)的。BufferedOutputStream是Java的緩沖輸出流,首先會(huì)將數(shù)據(jù)緩沖在內(nèi)存中,當(dāng)內(nèi)存緩沖滿溢之后再一次寫(xiě)入磁盤(pán)文件中,這樣可以減少磁盤(pán)IO次數(shù),提升性能。
一個(gè)task將所有數(shù)據(jù)寫(xiě)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過(guò)程中,會(huì)發(fā)生多次磁盤(pán)溢寫(xiě)操作,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件。最后會(huì)將之前所有的臨時(shí)磁盤(pán)文件都進(jìn)行合并,這就是merge過(guò)程,此時(shí)會(huì)將之前所有臨時(shí)磁盤(pán)文件中的數(shù)據(jù)讀取出來(lái),然后依次寫(xiě)入最終的磁盤(pán)文件之中。此外,由于一個(gè)task就只對(duì)應(yīng)一個(gè)磁盤(pán)文件,也就意味著該task為下游stage的task準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中,因此還會(huì)單獨(dú)寫(xiě)一份索引文件,其中標(biāo)識(shí)了下游各個(gè)task的數(shù)據(jù)在文件中的start offset與end offset。
SortShuffleManager由于有一個(gè)磁盤(pán)文件merge的過(guò)程,因此大大減少了文件數(shù)量。比如第一個(gè)stage有50個(gè)task,總共有10個(gè)Executor,每個(gè)Executor執(zhí)行5個(gè)task,而第二個(gè)stage有100個(gè)task。由于每個(gè)task最終只有一個(gè)磁盤(pán)文件,因此此時(shí)每個(gè)Executor上只有5個(gè)磁盤(pán)文件,所有Executor只有50個(gè)磁盤(pán)文件。
普通運(yùn)行機(jī)制的SortShuffleManager工作原理如下圖所示:
普通運(yùn)行機(jī)制的SortShuffleManager工作原理2. bypass運(yùn)行機(jī)制
bypass運(yùn)行機(jī)制的觸發(fā)條件如下:
shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold=200參數(shù)的值。不是聚合類的shuffle算子。
此時(shí),每個(gè)task會(huì)為每個(gè)下游task都創(chuàng)建一個(gè)臨時(shí)磁盤(pán)文件,并將數(shù)據(jù)按key進(jìn)行hash然后根據(jù)key的hash值,將key寫(xiě)入對(duì)應(yīng)的磁盤(pán)文件之中。當(dāng)然,寫(xiě)入磁盤(pán)文件時(shí)也是先寫(xiě)入內(nèi)存緩沖,緩沖寫(xiě)滿之后再溢寫(xiě)到磁盤(pán)文件的。最后,同樣會(huì)將所有臨時(shí)磁盤(pán)文件都合并成一個(gè)磁盤(pán)文件,并創(chuàng)建一個(gè)單獨(dú)的索引文件。
該過(guò)程的磁盤(pán)寫(xiě)機(jī)制其實(shí)跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的,因?yàn)槎家獎(jiǎng)?chuàng)建數(shù)量驚人的磁盤(pán)文件,只是在最后會(huì)做一個(gè)磁盤(pán)文件的合并而已。因此少量的最終磁盤(pán)文件,也讓該機(jī)制相對(duì)未經(jīng)優(yōu)化的HashShuffleManager來(lái)說(shuō),shuffle read的性能會(huì)更好。
而該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于:第一,磁盤(pán)寫(xiě)機(jī)制不同;第二,不會(huì)進(jìn)行排序。也就是說(shuō),啟用該機(jī)制的最大好處在于,shuffle write過(guò)程中,不需要進(jìn)行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開(kāi)銷。
bypass運(yùn)行機(jī)制的SortShuffleManager工作原理如下圖所示:
bypass運(yùn)行機(jī)制的SortShuffleManager工作原理
四、map和reduce端緩沖區(qū)大小
在Spark任務(wù)運(yùn)行過(guò)程中,如果shuffle的map端處理的數(shù)據(jù)量比較大,但是map端緩沖的大小是固定的,可能會(huì)出現(xiàn)map端緩沖數(shù)據(jù)頻繁spill溢寫(xiě)到磁盤(pán)文件中的情況,使得性能非常低下,通過(guò)調(diào)節(jié)map端緩沖的大小,可以避免頻繁的磁盤(pán)IO操作,進(jìn)而提升Spark任務(wù)的整體性能。
map端緩沖的默認(rèn)配置是32KB,如果每個(gè)task處理640KB的數(shù)據(jù),那么會(huì)發(fā)生640/32 = 20次溢寫(xiě),如果每個(gè)task處理64000KB的數(shù)據(jù),即會(huì)發(fā)生64000/32=2000次溢寫(xiě),這對(duì)于性能的影響是非常嚴(yán)重的。
map端緩沖的配置方法:
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
Spark Shuffle過(guò)程中,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量,如果內(nèi)存資源較為充足,適當(dāng)增加拉取數(shù)據(jù)緩沖區(qū)的大小,可以減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。
reduce端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過(guò)spark.reducer.maxSizeInFlight參數(shù)進(jìn)行設(shè)置,默認(rèn)為48MB。該參數(shù)的設(shè)置方法如下:
reduce端數(shù)據(jù)拉取緩沖區(qū)配置:
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "96")
五、reduce端重試次數(shù)和等待時(shí)間間隔
Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試。對(duì)于那些包含了特別耗時(shí)的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實(shí)踐中發(fā)現(xiàn),對(duì)于針對(duì)超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過(guò)程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
reduce端拉取數(shù)據(jù)重試次數(shù)可以通過(guò)spark.shuffle.io.maxRetries參數(shù)進(jìn)行設(shè)置,該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒(méi)有成功,就可能會(huì)導(dǎo)致作業(yè)執(zhí)行失敗,默認(rèn)為3,該參數(shù)的設(shè)置方法如下:
reduce端拉取數(shù)據(jù)重試次數(shù)配置:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試,在一次失敗后,會(huì)等待一定的時(shí)間間隔再進(jìn)行重試,可以通過(guò)加大間隔時(shí)長(zhǎng)(比如60s),以增加shuffle操作的穩(wěn)定性。
reduce端拉取數(shù)據(jù)等待間隔可以通過(guò)spark.shuffle.io.retryWait參數(shù)進(jìn)行設(shè)置,默認(rèn)值為5s,該參數(shù)的設(shè)置方法如下:
reduce端拉取數(shù)據(jù)等待間隔配置:
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "60s")
發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
11月20日火熱報(bào)名中>> 2024 智能家居出海論壇
-
11月28日立即報(bào)名>>> 2024工程師系列—工業(yè)電子技術(shù)在線會(huì)議
-
12月19日立即報(bào)名>> 【線下會(huì)議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會(huì)
-
即日-12.26火熱報(bào)名中>> OFweek2024中國(guó)智造CIO在線峰會(huì)
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書(shū)》
-
精彩回顧立即查看>> 【在線會(huì)議】多物理場(chǎng)仿真助跑新能源汽車(chē)
推薦專題
- 1 腦機(jī)接口芯片,華為出了新專利!
- 2 今年諾獎(jiǎng)對(duì)人工智能的重視,給我們的基礎(chǔ)教育提了個(gè)醒
- 3 銀行業(yè)AI大模型,從入局到求變
- 4 巨頭搶布局,VC狂撒錢(qián),為了能讓「AI讀心」這些公司卷瘋了
- 5 阿斯麥ASML:“骨折級(jí)”洋相,又成AI第一殺手?
- 6 蘋(píng)果市值創(chuàng)新高,iPhone 16能否助力突破4萬(wàn)億美元大關(guān)?
- 7 一場(chǎng)“載入史冊(cè)”的發(fā)布會(huì),讓馬斯克失去了4700億
- 8 百度谷歌比較研究2024:中美“遠(yuǎn)古AI龍頭”的現(xiàn)狀與趨勢(shì)
- 9 洞見(jiàn)AI風(fēng)潮 第二屆vivo藍(lán)河操作系統(tǒng)創(chuàng)新賽開(kāi)啟招募
- 10 地平線開(kāi)啟配售,阿里百度各砸5000萬(wàn)美金,市值最高超500億
- 高級(jí)軟件工程師 廣東省/深圳市
- 自動(dòng)化高級(jí)工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級(jí)銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門(mén)市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市