国产精品福利自产拍在线观看,国产原创中文无码精品视频,岛国av无码精品一区二区三区,人人妻妻人人

降本增效利器!趣頭條Spark RSS最佳實踐(趣頭條slogan)

導讀:阿里云 EMR 團隊和趣頭條大數(shù)據(jù)團隊共同研發(fā)了 RSS,解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎組件。

作者 | 王振華、曹佳清、范振

業(yè)務場景與現(xiàn)狀

趣頭條是一家依賴大數(shù)據(jù)的科技公司,在 2018~2019 年經(jīng)歷了業(yè)務的高速發(fā)展,主 App 和其他創(chuàng)新 App 的日活增加了 10 倍以上,相應的大數(shù)據(jù)系統(tǒng)也從最初的 100 臺機器增加到了 1000 臺以上規(guī)模。多個業(yè)務線依賴于大數(shù)據(jù)平臺展開業(yè)務,大數(shù)據(jù)系統(tǒng)的高效和穩(wěn)定成了公司業(yè)務發(fā)展的基石,在大數(shù)據(jù)的架構上我們使用了業(yè)界成熟的方案,存儲構建在 HDFS 上、計算資源調(diào)度依賴 Yarn、表元數(shù)據(jù)使用 Hive 管理、用 Spark 進行計算,具體如圖 1 所示:

降本增效利器!趣頭條Spark RSS最佳實踐(趣頭條slogan)

圖 1 趣頭條離線大數(shù)據(jù)平臺架構圖

其中 Yarn 集群使用了單一大集群的方案,HDFS 使用了聯(lián)邦的方案,同時基于成本因素,HDFS 和 Yarn 服務在 ECS 上進行了 DataNode 和 NodeManager 的混部。

在趣頭條每天有 6W 的 Spark 任務跑在 Yarn 集群上,每天新增的 Spark 任務穩(wěn)定在 100 左右,公司的迅速發(fā)展要求需求快速實現(xiàn),積累了很多治理欠債,種種問題表現(xiàn)出來集群穩(wěn)定性需要提升,其中 Shuffle 的穩(wěn)定性越來越成為集群的桎梏,亟需解決。

當前大數(shù)據(jù)平臺的挑戰(zhàn)與思考

近半年大數(shù)據(jù)平臺主要的業(yè)務指標是降本增效,一方面業(yè)務方希望離線平臺每天能夠承載更多的作業(yè),另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業(yè)務量對于每個技術人都是非常大地挑戰(zhàn)。熟悉 Spark 的同學應該非常清楚,在大規(guī)模集群場景下,Spark Shuffle 在實現(xiàn)上有比較大的缺陷,體現(xiàn)在以下的幾個方面:

  • Spark Shuffle Fetch 過程存在大量的網(wǎng)絡小包,現(xiàn)有的 External Shuffle Service 設計并沒有非常細致的處理這些 RPC 請求,大規(guī)模場景下會有很多connection reset 發(fā)生,導致 FetchFailed,從而導致 Stage 重算。
  • Spark Shuffle Fetch 過程存在大量的隨機讀,大規(guī)模高負載集群條件下,磁盤 IO 負載高、CPU 滿載時常發(fā)生,極容易發(fā)生 FetchFailed,從而導致 stage 重算。
  • 重算過程會放大集群的繁忙程度,搶占機器資源,導致惡性循環(huán)嚴重,SLA 完不成,需要運維人員手動將作業(yè)跑在空閑的Label集群。
  • 計算和 Shuffle 過程架構不能拆開,不能把 Shuffle 限定在指定的集群內(nèi),不能利用部分 SSD 機器。
  • M*N 次的 shuffle 過程:對于 10K mapper、5K reducer 級別的作業(yè),基本跑不完。
  • NodeManager 和 Spark Shuffle Service 是同一進程,Shuffle 過程太重,經(jīng)常導致 NodeManager 重啟,從而影響 Yarn 調(diào)度穩(wěn)定性。

以上的這些問題對于 Spark 研發(fā)同學是非常痛苦的,好多作業(yè)每天運行時長方差會非常大,而且總有一些無法完成的作業(yè),要么業(yè)務進行拆分,要么跑到獨有的 Yarn 集群中。除了現(xiàn)有面臨的挑戰(zhàn)之外,我們也在積極構建下一代基礎架構設施,隨著云原生 Kubernetes 概念越來越火,Spark 社區(qū)也提供了 Spark on Kubernetes 版本,相比較于 Yarn 來說,Kubernetes 能夠更好的利用云原生的彈性,提供更加豐富的運維、部署、隔離等特性。但是 Spark on Kubernetes 目前還存在很多問題沒有解決,包括容器內(nèi)的 Shuffle 方式、動態(tài)資源調(diào)度、調(diào)度性能有限等等。我們針對 Kubernetes 在趣頭條的落地,主要有以下幾個方面的需求:

  • 實時集群、OLAP 集群和 Spark 集群之前都是相互獨立的,怎樣能夠將這些資源形成統(tǒng)一大數(shù)據(jù)資源池。通過 Kubernetes 的天生隔離特性,更好的實現(xiàn)離線業(yè)務與實時業(yè)務混部,達到降本增效目的。
  • 公司的在線業(yè)務都運行在 Kubernetes 集群中,如何利用在線業(yè)務和大數(shù)據(jù)業(yè)務的不同特點進行錯峰調(diào)度,達成 ECS 的總資源量最少。
  • 希望能夠基于 Kubernetes 來包容在線服務、大數(shù)據(jù)、AI 等基礎架構,做到運維體系統(tǒng)一化。

因為趣頭條的大數(shù)據(jù)業(yè)務目前全都部署在阿里云上,阿里云 EMR 團隊和趣頭條的大數(shù)據(jù)團隊進行了深入技術共創(chuàng),共同研發(fā)了 Remote Shuffle Service(以下簡稱 RSS),旨在解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎組件。

Remote Shuffle Service 設計與實現(xiàn)

  • Remote Shuffle Service 的背景

早在 2019 年初我們就關注到了社區(qū)已經(jīng)有相應的討論,如 SPARK-25299。該 Issue 主要希望解決的問題是在云原生環(huán)境下,Spark 需要將 Shuffle 數(shù)據(jù)寫出到遠程的服務中。但是我們經(jīng)過調(diào)研后發(fā)現(xiàn) Spark 3.0(之前的 master 分支)只支持了部分的接口,而沒有對應的實現(xiàn)。該接口主要希望在現(xiàn)有的 Shuffle 代碼框架下,將數(shù)據(jù)寫到遠程服務中。如果基于這種方式實現(xiàn),比如直接將 Shuffle 以流的方式寫入到 HDFS 或者 Alluxio 等高速內(nèi)存系統(tǒng),會有相當大的性能開銷,趣頭條也做了一些相應的工作,并進行了部分的 Poc,性能與原版 Spark Shuffle 實現(xiàn)相差特別多,最差性能可下降 3 倍以上。同時我們也調(diào)研了一部分其他公司的實現(xiàn)方案,例如 Facebook 的 Riffle 方案以及 LinkedIn 開源的 Magnet,這些實現(xiàn)方案是首先將 Shuffle 文件寫到本地,然后在進行 Merge 或者 Upload 到遠程的服務上,這和后續(xù)我們的Kubernetes架構是不兼容的,因為 Kubernetes 場景下,本地磁盤 Hostpath 或者 LocalPV 并不是一個必選項,而且也會存在隔離和權限的問題。

基于上述背景,我們與阿里云 EMR 團隊共同開發(fā)了 Remote Shuffle Service。RSS 可以提供以下的能力,完美的解決了 Spark Shuffle 面臨的技術挑戰(zhàn),為我們集群的穩(wěn)定性和容器化的落地提供了強有力的保證,主要體現(xiàn)在以下幾個方面:

  • 高性能服務器的設計思路,不同于 Spark 原有 Shuffle Service,RPC 更輕量、通用和穩(wěn)定。
  • 兩副本機制,能夠保證的 Shuffle fetch 極小概率(低于 0.01%)失敗。
  • 合并 shuffle 文件,從 M*N 次 shuffle 變成 N 次 shuffle,順序讀 HDD 磁盤會顯著提升 shuffle heavy 作業(yè)性能。
  • 減少 Executor 計算時內(nèi)存壓力,避免 map 過程中 Shuffle Spill。
  • 計算與存儲分離架構,可以將 Shuffle Service 部署到特殊硬件環(huán)境中,例如 SSD 機器,可以保證 SLA 極高的作業(yè)。
  • 完美解決 Spark on Kubernetes 方案中對于本地磁盤的依賴。

  • Remote Shuffle Service 的實現(xiàn)
  • 整體設計

Spark RSS 架構包含三個角色:Master、Worker、Client。Master 和 Worker 構成服務端,Client 以不侵入的方式集成到 Spark ShuffleManager 里(RssShuffleManager 實現(xiàn)了 ShuffleManager 接口)。

  • Master 的主要職責是資源分配與狀態(tài)管理。
  • Worker 的主要職責是處理和存儲 Shuffle 數(shù)據(jù)。
  • Client 的主要職責是緩存和推送 Shuffle 數(shù)據(jù)。

整體流程如下所示(其中 ResourceManager 和 MetaService 是 Master 的組件),如圖 2。

降本增效利器!趣頭條Spark RSS最佳實踐(趣頭條slogan)

圖 2 RSS 整體架構圖

  • 實現(xiàn)流程

下面重點來講一下實現(xiàn)的流程:

  • RSS 采用 Push Style 的 shuffle 模式,每個 Mapper 持有一個按 Partition 分界的緩存區(qū),Shuffle 數(shù)據(jù)首先寫入緩存區(qū),每當某個 Partition 的緩存滿了即觸發(fā) PushData。
  • Driver 先和 Master 發(fā)生 StageStart 的請求,Master 接受到該 RPC 后,會分配對應的 Worker Partition 并返回給 Driver,Shuffle Client 得到這些元信息后,進行后續(xù)的推送數(shù)據(jù)。
  • Client 開始向主副本推送數(shù)據(jù)。主副本 Worker 收到請求后,把數(shù)據(jù)緩存到本地內(nèi)存,同時把該請求以 Pipeline 的方式轉發(fā)給從副本,從而實現(xiàn)了 2 副本機制。
  • 為了不阻塞 PushData 的請求,Worker 收到 PushData 請求后會以純異步的方式交由專有的線程池異步處理。根據(jù)該 Data 所屬的 Partition 拷貝到事先分配的 buffer 里,若 buffer 滿了則觸發(fā) flush。RSS 支持多種存儲后端,包括 DFS 和 Local。若后端是 DFS,則主從副本只有一方會 flush,依靠 DFS 的雙副本保證容錯;若后端是 Local,則主從雙方都會 flush。
  • 在所有的 Mapper 都結束后,Driver 會觸發(fā) StageEnd 請求。Master 接收到該 RPC 后,會向所有 Worker 發(fā)送 CommitFiles 請求,Worker 收到后把屬于該 Stage buffer 里的數(shù)據(jù) flush 到存儲層,close 文件,并釋放 buffer。Master 收到所有響應后,記錄每個 partition 對應的文件列表。若 CommitFiles 請求失敗,則 Master 標記此 Stage 為 DataLost。
  • 在 Reduce 階段,reduce task 首先向 Master 請求該 Partition 對應的文件列表,若返回碼是 DataLost,則觸發(fā) Stage 重算或直接 abort 作業(yè)。若返回正常,則直接讀取文件數(shù)據(jù)。

總體來講,RSS 的設計要點總結為 3 個層面:

  • 采用 PushStyle 的方式做 shuffle,避免了本地存儲,從而適應了計算存儲分離架構。
  • 按照 reduce 做聚合,避免了小文件隨機讀寫和小數(shù)據(jù)量網(wǎng)絡請求。
  • 做了 2 副本,提高了系統(tǒng)穩(wěn)定性。

  • 容錯

對于 RSS 系統(tǒng),容錯性是至關重要的,我們分為以下幾個維度來實現(xiàn):

  • PushData 失敗

當 PushData 失敗次數(shù)(Worker 掛了,網(wǎng)絡繁忙,CPU繁忙等)超過 MaxRetry 后,Client 會給 Master 發(fā)消息請求新的 Partition Location,此后本 Client 都會使用新的 Location 地址,該階段稱為 Revive

若 Revive 是因為 Client 端而非 Worker 的問題導致,則會產(chǎn)生同一個 Partition 數(shù)據(jù)分布在不同 Worker 上的情況,Master 的 Meta 組件會正確處理這種情形。

若發(fā)生 WorkerLost,則會導致大量 PushData 同時失敗,此時會有大量同一 Partition 的 Revive 請求打到 Master。為了避免給同一個 Partition 分配過多的 Location,Master 保證僅有一個 Revive 請求真正得到處理,其余的請求塞到 pending queue 里,待 Revive 處理結束后返回同一個 Location。

  • Worker 宕機

當發(fā)生 WorkerLost 時,對于該 Worker 上的副本數(shù)據(jù),Master 向其 peer 發(fā)送 CommitFile 的請求,然后清理 peer 上的 buffer。若 Commit Files 失敗,則記錄該 Stage 為 DataLost;若成功,則后續(xù)的 PushData 通過 Revive 機制重新申請 Location。

  • 數(shù)據(jù)去重

Speculation task 和 task 重算會導致數(shù)據(jù)重復。解決辦法是每個 PushData的數(shù)據(jù)片里編碼了所屬的 mapId、attemptId 和 batchId,并且 Master 為每個 map task 記錄成功 commit 的 attemtpId。read 端通過 attemptId 過濾不同的 attempt 數(shù)據(jù),并通過 batchId 過濾同一個 attempt 的重復數(shù)據(jù)。

  • 多副本

RSS 目前支持 DFS 和 Local 兩種存儲后端。

在 DFS 模式下,ReadPartition 失敗會直接導致 Stage 重算或 abort job。在 Local 模式,ReadPartition 失敗會觸發(fā)從 peer location 讀,若主從都失敗則觸發(fā) Stage 重算或 abort job。

  • 高可用

大家可以看到 RSS 的設計中 Master 是一個單點,雖然 Master 的負載很小,不會輕易地掛掉,但是這對于線上穩(wěn)定性來說無疑是一個風險點。在項目的最初上線階段,我們希望可以通過 SubCluster 的方式進行 workaround,即通過部署多套 RSS 來承載不同的業(yè)務,這樣即使 RSS Master 宕機,也只會影響有限的一部分業(yè)務。但是隨著系統(tǒng)的深入使用,我們決定直面問題,引進高可用 Master。主要的實現(xiàn)如下:

首先,Master 目前的元數(shù)據(jù)比較多,我們可以將一部分與 ApplD ShuffleId 本身相關的元數(shù)據(jù)下沉到 Driver 的 ShuffleManager 中,由于元數(shù)據(jù)并不會很多,Driver 增加的內(nèi)存開銷非常有限。

另外,關于全局負載均衡的元數(shù)據(jù)和調(diào)度相關的元數(shù)據(jù),我們利用 Raft 實現(xiàn)了 Master 組件的高可用,這樣我們通過部署 3 或 5 臺 Master,真正的實現(xiàn)了大規(guī)??蓴U展的需求。

實際效果與分析

  • 性能與穩(wěn)定性

團隊針對 TeraSort、TPC-DS 以及大量的內(nèi)部作業(yè)進行了測試,在 Reduce 階段減少了隨機讀的開銷,任務的穩(wěn)定性和性能都有了大幅度提升。

圖 3 是 TeraSort 的 benchmark,以 10T Terasort 為例,Shuffle 量壓縮后大約 5.6T。可以看出該量級的作業(yè)在 RSS 場景下,由于 Shuffle read 變?yōu)轫樞蜃x,性能會有大幅提升。

降本增效利器!趣頭條Spark RSS最佳實踐(趣頭條slogan)

圖 3 TeraSort 性能測試(RSS 性能更好)

圖 4 是一個線上實際脫敏后的 Shuffle heavy 大作業(yè),之前在混部集群中很小概率可以跑完,每天任務 SLA 不能按時達成,分析原因主要是由于大量的 FetchFailed 導致 stage 進行重算。使用 RSS 之后每天可以穩(wěn)定的跑完,2.1T 的 shuffle 也不會出現(xiàn)任何 FetchFailed 的場景。在更大的數(shù)據(jù)集性能和SLA表現(xiàn)都更為顯著。

降本增效利器!趣頭條Spark RSS最佳實踐(趣頭條slogan)

圖 4 實際業(yè)務的作業(yè) stage 圖(使用 RSS 保障穩(wěn)定性和性能)

  • 業(yè)務效果

在大數(shù)據(jù)團隊和阿里云 EMR 團隊的共同努力下,經(jīng)過近半年的上線、運營 RSS,以及和業(yè)務部門的長時間測試,業(yè)務價值主要體現(xiàn)在以下方面:

  • 降本增效效果明顯,在集群規(guī)模小幅下降的基礎上,支撐了更多的計算任務,TCO 成本下降 20%。
  • SLA 顯著提升,大規(guī)模 Spark Shuffle 任務從跑不完到能跑完,我們能夠將不同 SLA 級別作業(yè)合并到同一集群,減小集群節(jié)點數(shù)量,達到統(tǒng)一管理,縮小成本的目的。原本業(yè)務方有一部分 SLA比 較高的作業(yè)在一個獨有的 Yarn 集群 B 中運行,由于主 Yarn 集群 A 的負載非常高,如果跑到集群 A 中,會經(jīng)常的掛掉。利用 RSS 之后可以放心的將作業(yè)跑到主集群 A 中,從而釋放掉獨有 Yarn 集群 B。
  • 作業(yè)執(zhí)行效率顯著提升,跑的慢→跑的快。我們比較了幾個典型的 Shuffle heavy 作業(yè),一個重要的業(yè)務線作業(yè)原本需要 3 小時,RSS 版本需要 1.6 小時。抽取線上 5~10 個作業(yè),大作業(yè)的性能提升相當明顯,不同作業(yè)平均下來有 30% 以上的性能提升,即使是 shuffle 量不大的作業(yè),由于比較穩(wěn)定不需要 stage 重算,長期運行平均時間也會減少 10%-20%。
  • 架構靈活性顯著提升,升級為計算與存儲分離架構。Spark 在容器中運行的過程中,將 RSS 作為基礎組件,可以使得 Spark 容器化能夠大規(guī)模的落地,為離線在線統(tǒng)一資源、統(tǒng)一調(diào)度打下了基礎。

未來展望

趣頭條大數(shù)據(jù)平臺和阿里云 EMR 團隊后續(xù)會繼續(xù)保持深入共創(chuàng),將探索更多的方向。主要有以下的一些思路:

  • RSS 存儲能力優(yōu)化,包括將云的對象存儲作為存儲后端。
  • RSS 多引擎支持,例如 MapReduce、Tez 等,提升歷史任務執(zhí)行效率。
  • 加速大數(shù)據(jù)容器化落地,配合 RSS 能力,解決 K8s 調(diào)度器性能、調(diào)度策略等一系列挑戰(zhàn)。
  • 持續(xù)優(yōu)化成本,配合 EMR 的彈性伸縮功能,一方面 Spark 可以使用更多的阿里云 ECS/ECI 搶占式實例來進一步壓縮成本,另一方面將已有機器包括阿里云 ACK、ECI 等資源形成統(tǒng)一大池子,將大數(shù)據(jù)的計算組件和在線業(yè)務進行錯峰調(diào)度以及混部。

版權聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻,該文觀點僅代表作者本人。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權/違法違規(guī)的內(nèi)容, 請發(fā)送郵件至 舉報,一經(jīng)查實,本站將立刻刪除。

(0)
上一篇 2023年12月15日 上午9:54
下一篇 2023年12月15日 上午10:10

相關推薦