導(dǎo)讀:阿里云 EMR 團(tuán)隊(duì)和趣頭條的大數(shù)據(jù)團(tuán)隊(duì)共同研發(fā)了 RSS,解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎(chǔ)組件。
作者 | 王振華、曹佳清、范振
業(yè)務(wù)場景與現(xiàn)狀
趣頭條是一家依賴大數(shù)據(jù)的科技公司,在 2018~2019 年經(jīng)歷了業(yè)務(wù)的高速發(fā)展,主 App 和其他創(chuàng)新 App 的日活增加了 10 倍以上,相應(yīng)的大數(shù)據(jù)系統(tǒng)也從最初的 100 臺(tái)機(jī)器增加到了 1000 臺(tái)以上規(guī)模。多個(gè)業(yè)務(wù)線依賴于大數(shù)據(jù)平臺(tái)展開業(yè)務(wù),大數(shù)據(jù)系統(tǒng)的高效和穩(wěn)定成了公司業(yè)務(wù)發(fā)展的基石,在大數(shù)據(jù)的架構(gòu)上我們使用了業(yè)界成熟的方案,存儲(chǔ)構(gòu)建在 HDFS 上、計(jì)算資源調(diào)度依賴 Yarn、表元數(shù)據(jù)使用 Hive 管理、用 Spark 進(jìn)行計(jì)算,具體如圖 1 所示:
圖 1 趣頭條離線大數(shù)據(jù)平臺(tái)架構(gòu)圖
其中 Yarn 集群使用了單一大集群的方案,HDFS 使用了聯(lián)邦的方案,同時(shí)基于成本因素,HDFS 和 Yarn 服務(wù)在 ECS 上進(jìn)行了 DataNode 和 NodeManager 的混部。
在趣頭條每天有 6W 的 Spark 任務(wù)跑在 Yarn 集群上,每天新增的 Spark 任務(wù)穩(wěn)定在 100 左右,公司的迅速發(fā)展要求需求快速實(shí)現(xiàn),積累了很多治理欠債,種種問題表現(xiàn)出來集群穩(wěn)定性需要提升,其中 Shuffle 的穩(wěn)定性越來越成為集群的桎梏,亟需解決。
當(dāng)前大數(shù)據(jù)平臺(tái)的挑戰(zhàn)與思考
近半年大數(shù)據(jù)平臺(tái)主要的業(yè)務(wù)指標(biāo)是降本增效,一方面業(yè)務(wù)方希望離線平臺(tái)每天能夠承載更多的作業(yè),另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業(yè)務(wù)量對(duì)于每個(gè)技術(shù)人都是非常大地挑戰(zhàn)。熟悉 Spark 的同學(xué)應(yīng)該非常清楚,在大規(guī)模集群場景下,Spark Shuffle 在實(shí)現(xiàn)上有比較大的缺陷,體現(xiàn)在以下的幾個(gè)方面:
- Spark Shuffle Fetch 過程存在大量的網(wǎng)絡(luò)小包,現(xiàn)有的 External Shuffle Service 設(shè)計(jì)并沒有非常細(xì)致的處理這些 RPC 請(qǐng)求,大規(guī)模場景下會(huì)有很多connection reset 發(fā)生,導(dǎo)致 FetchFailed,從而導(dǎo)致 stage 重算。
- Spark Shuffle Fetch 過程存在大量的隨機(jī)讀,大規(guī)模高負(fù)載集群條件下,磁盤 IO 負(fù)載高、CPU 滿載時(shí)常發(fā)生,極容易發(fā)生 FetchFailed,從而導(dǎo)致 stage 重算。
- 重算過程會(huì)放大集群的繁忙程度,搶占機(jī)器資源,導(dǎo)致惡性循環(huán)嚴(yán)重,SLA 完不成,需要運(yùn)維人員手動(dòng)將作業(yè)跑在空閑的Label集群。
- 計(jì)算和 Shuffle 過程架構(gòu)不能拆開,不能把 Shuffle 限定在指定的集群內(nèi),不能利用部分 SSD 機(jī)器。
- M*N 次的 shuffle 過程:對(duì)于 10K mapper、5K reducer 級(jí)別的作業(yè),基本跑不完。
- NodeManager 和 Spark Shuffle Service 是同一進(jìn)程,Shuffle 過程太重,經(jīng)常導(dǎo)致 NodeManager 重啟,從而影響 Yarn 調(diào)度穩(wěn)定性。
以上的這些問題對(duì)于 Spark 研發(fā)同學(xué)是非常痛苦的,好多作業(yè)每天運(yùn)行時(shí)長方差會(huì)非常大,而且總有一些無法完成的作業(yè),要么業(yè)務(wù)進(jìn)行拆分,要么跑到獨(dú)有的 Yarn 集群中。除了現(xiàn)有面臨的挑戰(zhàn)之外,我們也在積極構(gòu)建下一代基礎(chǔ)架構(gòu)設(shè)施,隨著云原生 Kubernetes 概念越來越火,Spark 社區(qū)也提供了 Spark on Kubernetes 版本,相比較于 Yarn 來說,Kubernetes 能夠更好的利用云原生的彈性,提供更加豐富的運(yùn)維、部署、隔離等特性。但是 Spark on Kubernetes 目前還存在很多問題沒有解決,包括容器內(nèi)的 Shuffle 方式、動(dòng)態(tài)資源調(diào)度、調(diào)度性能有限等等。我們針對(duì) Kubernetes 在趣頭條的落地,主要有以下幾個(gè)方面的需求:
- 實(shí)時(shí)集群、OLAP 集群和 Spark 集群之前都是相互獨(dú)立的,怎樣能夠?qū)⑦@些資源形成統(tǒng)一大數(shù)據(jù)資源池。通過 Kubernetes 的天生隔離特性,更好的實(shí)現(xiàn)離線業(yè)務(wù)與實(shí)時(shí)業(yè)務(wù)混部,達(dá)到降本增效目的。
- 公司的在線業(yè)務(wù)都運(yùn)行在 Kubernetes 集群中,如何利用在線業(yè)務(wù)和大數(shù)據(jù)業(yè)務(wù)的不同特點(diǎn)進(jìn)行錯(cuò)峰調(diào)度,達(dá)成 ECS 的總資源量最少。
- 希望能夠基于 Kubernetes 來包容在線服務(wù)、大數(shù)據(jù)、AI 等基礎(chǔ)架構(gòu),做到運(yùn)維體系統(tǒng)一化。
因?yàn)槿ゎ^條的大數(shù)據(jù)業(yè)務(wù)目前全都部署在阿里云上,阿里云 EMR 團(tuán)隊(duì)和趣頭條的大數(shù)據(jù)團(tuán)隊(duì)進(jìn)行了深入技術(shù)共創(chuàng),共同研發(fā)了 Remote Shuffle Service(以下簡稱 RSS),旨在解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎(chǔ)組件。
Remote Shuffle Service 設(shè)計(jì)與實(shí)現(xiàn)
- Remote Shuffle Service 的背景
早在 2019 年初我們就關(guān)注到了社區(qū)已經(jīng)有相應(yīng)的討論,如 SPARK-25299。該 Issue 主要希望解決的問題是在云原生環(huán)境下,Spark 需要將 Shuffle 數(shù)據(jù)寫出到遠(yuǎn)程的服務(wù)中。但是我們經(jīng)過調(diào)研后發(fā)現(xiàn) Spark 3.0(之前的 master 分支)只支持了部分的接口,而沒有對(duì)應(yīng)的實(shí)現(xiàn)。該接口主要希望在現(xiàn)有的 Shuffle 代碼框架下,將數(shù)據(jù)寫到遠(yuǎn)程服務(wù)中。如果基于這種方式實(shí)現(xiàn),比如直接將 Shuffle 以流的方式寫入到 HDFS 或者 Alluxio 等高速內(nèi)存系統(tǒng),會(huì)有相當(dāng)大的性能開銷,趣頭條也做了一些相應(yīng)的工作,并進(jìn)行了部分的 Poc,性能與原版 Spark Shuffle 實(shí)現(xiàn)相差特別多,最差性能可下降 3 倍以上。同時(shí)我們也調(diào)研了一部分其他公司的實(shí)現(xiàn)方案,例如 Facebook 的 Riffle 方案以及 LinkedIn 開源的 Magnet,這些實(shí)現(xiàn)方案是首先將 Shuffle 文件寫到本地,然后在進(jìn)行 Merge 或者 Upload 到遠(yuǎn)程的服務(wù)上,這和后續(xù)我們的Kubernetes架構(gòu)是不兼容的,因?yàn)?Kubernetes 場景下,本地磁盤 Hostpath 或者 LocalPV 并不是一個(gè)必選項(xiàng),而且也會(huì)存在隔離和權(quán)限的問題。
基于上述背景,我們與阿里云 EMR 團(tuán)隊(duì)共同開發(fā)了 Remote Shuffle Service。RSS 可以提供以下的能力,完美的解決了 Spark Shuffle 面臨的技術(shù)挑戰(zhàn),為我們集群的穩(wěn)定性和容器化的落地提供了強(qiáng)有力的保證,主要體現(xiàn)在以下幾個(gè)方面:
- 高性能服務(wù)器的設(shè)計(jì)思路,不同于 Spark 原有 Shuffle Service,RPC 更輕量、通用和穩(wěn)定。
- 兩副本機(jī)制,能夠保證的 Shuffle fetch 極小概率(低于 0.01%)失敗。
- 合并 shuffle 文件,從 M*N 次 shuffle 變成 N 次 shuffle,順序讀 HDD 磁盤會(huì)顯著提升 shuffle heavy 作業(yè)性能。
- 減少 Executor 計(jì)算時(shí)內(nèi)存壓力,避免 map 過程中 Shuffle Spill。
- 計(jì)算與存儲(chǔ)分離架構(gòu),可以將 Shuffle Service 部署到特殊硬件環(huán)境中,例如 SSD 機(jī)器,可以保證 SLA 極高的作業(yè)。
- 完美解決 Spark on Kubernetes 方案中對(duì)于本地磁盤的依賴。
- Remote Shuffle Service 的實(shí)現(xiàn)
- 整體設(shè)計(jì)
Spark RSS 架構(gòu)包含三個(gè)角色:Master、Worker、Client。Master 和 Worker 構(gòu)成服務(wù)端,Client 以不侵入的方式集成到 Spark ShuffleManager 里(RssShuffleManager 實(shí)現(xiàn)了 ShuffleManager 接口)。
- Master 的主要職責(zé)是資源分配與狀態(tài)管理。
- Worker 的主要職責(zé)是處理和存儲(chǔ) Shuffle 數(shù)據(jù)。
- Client 的主要職責(zé)是緩存和推送 Shuffle 數(shù)據(jù)。
整體流程如下所示(其中 ResourceManager 和 MetaService 是 Master 的組件),如圖 2。
圖 2 RSS 整體架構(gòu)圖
- 實(shí)現(xiàn)流程
下面重點(diǎn)來講一下實(shí)現(xiàn)的流程:
- RSS 采用 Push Style 的 shuffle 模式,每個(gè) Mapper 持有一個(gè)按 Partition 分界的緩存區(qū),Shuffle 數(shù)據(jù)首先寫入緩存區(qū),每當(dāng)某個(gè) Partition 的緩存滿了即觸發(fā) PushData。
- Driver 先和 Master 發(fā)生 StageStart 的請(qǐng)求,Master 接受到該 RPC 后,會(huì)分配對(duì)應(yīng)的 Worker Partition 并返回給 Driver,Shuffle Client 得到這些元信息后,進(jìn)行后續(xù)的推送數(shù)據(jù)。
- Client 開始向主副本推送數(shù)據(jù)。主副本 Worker 收到請(qǐng)求后,把數(shù)據(jù)緩存到本地內(nèi)存,同時(shí)把該請(qǐng)求以 Pipeline 的方式轉(zhuǎn)發(fā)給從副本,從而實(shí)現(xiàn)了 2 副本機(jī)制。
- 為了不阻塞 PushData 的請(qǐng)求,Worker 收到 PushData 請(qǐng)求后會(huì)以純異步的方式交由專有的線程池異步處理。根據(jù)該 Data 所屬的 Partition 拷貝到事先分配的 buffer 里,若 buffer 滿了則觸發(fā) flush。RSS 支持多種存儲(chǔ)后端,包括 DFS 和 Local。若后端是 DFS,則主從副本只有一方會(huì) flush,依靠 DFS 的雙副本保證容錯(cuò);若后端是 Local,則主從雙方都會(huì) flush。
- 在所有的 Mapper 都結(jié)束后,Driver 會(huì)觸發(fā) StageEnd 請(qǐng)求。Master 接收到該 RPC 后,會(huì)向所有 Worker 發(fā)送 CommitFiles 請(qǐng)求,Worker 收到后把屬于該 Stage buffer 里的數(shù)據(jù) flush 到存儲(chǔ)層,close 文件,并釋放 buffer。Master 收到所有響應(yīng)后,記錄每個(gè) partition 對(duì)應(yīng)的文件列表。若 CommitFiles 請(qǐng)求失敗,則 Master 標(biāo)記此 Stage 為 DataLost。
- 在 Reduce 階段,reduce task 首先向 Master 請(qǐng)求該 Partition 對(duì)應(yīng)的文件列表,若返回碼是 DataLost,則觸發(fā) Stage 重算或直接 abort 作業(yè)。若返回正常,則直接讀取文件數(shù)據(jù)。
總體來講,RSS 的設(shè)計(jì)要點(diǎn)總結(jié)為 3 個(gè)層面:
- 采用 PushStyle 的方式做 shuffle,避免了本地存儲(chǔ),從而適應(yīng)了計(jì)算存儲(chǔ)分離架構(gòu)。
- 按照 reduce 做聚合,避免了小文件隨機(jī)讀寫和小數(shù)據(jù)量網(wǎng)絡(luò)請(qǐng)求。
- 做了 2 副本,提高了系統(tǒng)穩(wěn)定性。
- 容錯(cuò)
對(duì)于 RSS 系統(tǒng),容錯(cuò)性是至關(guān)重要的,我們分為以下幾個(gè)維度來實(shí)現(xiàn):
- PushData 失敗
當(dāng) PushData 失敗次數(shù)(Worker 掛了,網(wǎng)絡(luò)繁忙,CPU繁忙等)超過 MaxRetry 后,Client 會(huì)給 Master 發(fā)消息請(qǐng)求新的 Partition Location,此后本 Client 都會(huì)使用新的 Location 地址,該階段稱為 Revive。
若 Revive 是因?yàn)?Client 端而非 Worker 的問題導(dǎo)致,則會(huì)產(chǎn)生同一個(gè) Partition 數(shù)據(jù)分布在不同 Worker 上的情況,Master 的 Meta 組件會(huì)正確處理這種情形。
若發(fā)生 WorkerLost,則會(huì)導(dǎo)致大量 PushData 同時(shí)失敗,此時(shí)會(huì)有大量同一 Partition 的 Revive 請(qǐng)求打到 Master。為了避免給同一個(gè) Partition 分配過多的 Location,Master 保證僅有一個(gè) Revive 請(qǐng)求真正得到處理,其余的請(qǐng)求塞到 pending queue 里,待 Revive 處理結(jié)束后返回同一個(gè) Location。
- Worker 宕機(jī)
當(dāng)發(fā)生 WorkerLost 時(shí),對(duì)于該 Worker 上的副本數(shù)據(jù),Master 向其 peer 發(fā)送 CommitFile 的請(qǐng)求,然后清理 peer 上的 buffer。若 Commit Files 失敗,則記錄該 Stage 為 DataLost;若成功,則后續(xù)的 PushData 通過 Revive 機(jī)制重新申請(qǐng) Location。
- 數(shù)據(jù)去重
Speculation task 和 task 重算會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。解決辦法是每個(gè) PushData的數(shù)據(jù)片里編碼了所屬的 mapId、attemptId 和 batchId,并且 Master 為每個(gè) map task 記錄成功 commit 的 attemtpId。read 端通過 attemptId 過濾不同的 attempt 數(shù)據(jù),并通過 batchId 過濾同一個(gè) attempt 的重復(fù)數(shù)據(jù)。
- 多副本
RSS 目前支持 DFS 和 Local 兩種存儲(chǔ)后端。
在 DFS 模式下,ReadPartition 失敗會(huì)直接導(dǎo)致 Stage 重算或 abort job。在 Local 模式,ReadPartition 失敗會(huì)觸發(fā)從 peer location 讀,若主從都失敗則觸發(fā) Stage 重算或 abort job。
- 高可用
大家可以看到 RSS 的設(shè)計(jì)中 Master 是一個(gè)單點(diǎn),雖然 Master 的負(fù)載很小,不會(huì)輕易地掛掉,但是這對(duì)于線上穩(wěn)定性來說無疑是一個(gè)風(fēng)險(xiǎn)點(diǎn)。在項(xiàng)目的最初上線階段,我們希望可以通過 SubCluster 的方式進(jìn)行 workaround,即通過部署多套 RSS 來承載不同的業(yè)務(wù),這樣即使 RSS Master 宕機(jī),也只會(huì)影響有限的一部分業(yè)務(wù)。但是隨著系統(tǒng)的深入使用,我們決定直面問題,引進(jìn)高可用 Master。主要的實(shí)現(xiàn)如下:
首先,Master 目前的元數(shù)據(jù)比較多,我們可以將一部分與 ApplD ShuffleId 本身相關(guān)的元數(shù)據(jù)下沉到 Driver 的 ShuffleManager 中,由于元數(shù)據(jù)并不會(huì)很多,Driver 增加的內(nèi)存開銷非常有限。
另外,關(guān)于全局負(fù)載均衡的元數(shù)據(jù)和調(diào)度相關(guān)的元數(shù)據(jù),我們利用 Raft 實(shí)現(xiàn)了 Master 組件的高可用,這樣我們通過部署 3 或 5 臺(tái) Master,真正的實(shí)現(xiàn)了大規(guī)??蓴U(kuò)展的需求。
實(shí)際效果與分析
- 性能與穩(wěn)定性
團(tuán)隊(duì)針對(duì) TeraSort、TPC-DS 以及大量的內(nèi)部作業(yè)進(jìn)行了測試,在 Reduce 階段減少了隨機(jī)讀的開銷,任務(wù)的穩(wěn)定性和性能都有了大幅度提升。
圖 3 是 TeraSort 的 benchmark,以 10T Terasort 為例,Shuffle 量壓縮后大約 5.6T??梢钥闯鲈摿考?jí)的作業(yè)在 RSS 場景下,由于 Shuffle read 變?yōu)轫樞蜃x,性能會(huì)有大幅提升。
圖 3 TeraSort 性能測試(RSS 性能更好)
圖 4 是一個(gè)線上實(shí)際脫敏后的 Shuffle heavy 大作業(yè),之前在混部集群中很小概率可以跑完,每天任務(wù) SLA 不能按時(shí)達(dá)成,分析原因主要是由于大量的 FetchFailed 導(dǎo)致 stage 進(jìn)行重算。使用 RSS 之后每天可以穩(wěn)定的跑完,2.1T 的 shuffle 也不會(huì)出現(xiàn)任何 FetchFailed 的場景。在更大的數(shù)據(jù)集性能和SLA表現(xiàn)都更為顯著。
圖 4 實(shí)際業(yè)務(wù)的作業(yè) stage 圖(使用 RSS 保障穩(wěn)定性和性能)
- 業(yè)務(wù)效果
在大數(shù)據(jù)團(tuán)隊(duì)和阿里云 EMR 團(tuán)隊(duì)的共同努力下,經(jīng)過近半年的上線、運(yùn)營 RSS,以及和業(yè)務(wù)部門的長時(shí)間測試,業(yè)務(wù)價(jià)值主要體現(xiàn)在以下方面:
- 降本增效效果明顯,在集群規(guī)模小幅下降的基礎(chǔ)上,支撐了更多的計(jì)算任務(wù),TCO 成本下降 20%。
- SLA 顯著提升,大規(guī)模 Spark Shuffle 任務(wù)從跑不完到能跑完,我們能夠?qū)⒉煌?SLA 級(jí)別作業(yè)合并到同一集群,減小集群節(jié)點(diǎn)數(shù)量,達(dá)到統(tǒng)一管理,縮小成本的目的。原本業(yè)務(wù)方有一部分 SLA比 較高的作業(yè)在一個(gè)獨(dú)有的 Yarn 集群 B 中運(yùn)行,由于主 Yarn 集群 A 的負(fù)載非常高,如果跑到集群 A 中,會(huì)經(jīng)常的掛掉。利用 RSS 之后可以放心的將作業(yè)跑到主集群 A 中,從而釋放掉獨(dú)有 Yarn 集群 B。
- 作業(yè)執(zhí)行效率顯著提升,跑的慢→跑的快。我們比較了幾個(gè)典型的 Shuffle heavy 作業(yè),一個(gè)重要的業(yè)務(wù)線作業(yè)原本需要 3 小時(shí),RSS 版本需要 1.6 小時(shí)。抽取線上 5~10 個(gè)作業(yè),大作業(yè)的性能提升相當(dāng)明顯,不同作業(yè)平均下來有 30% 以上的性能提升,即使是 shuffle 量不大的作業(yè),由于比較穩(wěn)定不需要 stage 重算,長期運(yùn)行平均時(shí)間也會(huì)減少 10%-20%。
- 架構(gòu)靈活性顯著提升,升級(jí)為計(jì)算與存儲(chǔ)分離架構(gòu)。Spark 在容器中運(yùn)行的過程中,將 RSS 作為基礎(chǔ)組件,可以使得 Spark 容器化能夠大規(guī)模的落地,為離線在線統(tǒng)一資源、統(tǒng)一調(diào)度打下了基礎(chǔ)。
未來展望
趣頭條大數(shù)據(jù)平臺(tái)和阿里云 EMR 團(tuán)隊(duì)后續(xù)會(huì)繼續(xù)保持深入共創(chuàng),將探索更多的方向。主要有以下的一些思路:
- RSS 存儲(chǔ)能力優(yōu)化,包括將云的對(duì)象存儲(chǔ)作為存儲(chǔ)后端。
- RSS 多引擎支持,例如 MapReduce、Tez 等,提升歷史任務(wù)執(zhí)行效率。
- 加速大數(shù)據(jù)容器化落地,配合 RSS 能力,解決 K8s 調(diào)度器性能、調(diào)度策略等一系列挑戰(zhàn)。
- 持續(xù)優(yōu)化成本,配合 EMR 的彈性伸縮功能,一方面 Spark 可以使用更多的阿里云 ECS/ECI 搶占式實(shí)例來進(jìn)一步壓縮成本,另一方面將已有機(jī)器包括阿里云 ACK、ECI 等資源形成統(tǒng)一大池子,將大數(shù)據(jù)的計(jì)算組件和在線業(yè)務(wù)進(jìn)行錯(cuò)峰調(diào)度以及混部。
版權(quán)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻(xiàn),該文觀點(diǎn)僅代表作者本人。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權(quán)/違法違規(guī)的內(nèi)容, 請(qǐng)發(fā)送郵件至 舉報(bào),一經(jīng)查實(shí),本站將立刻刪除。