使用AWK和R來解析25TB的DNA資料

大資料工作該怎麼樣做?用巨量的及其搭建Hadoop叢集,利用Spark記憶體計算?Kafka?課本上都是這樣教的。那麼實際中的大資料是怎麼處理的?是怎麼樣的一個流程?用什麼軟體架構的呢?

概述

Nick Strayer是範德比爾特大學生物統計學系的博士研究生,他負責近負責為實驗室建立一個處理大量原始DNA測序(SNP晶片)資料的工作流程。目標是能夠快速獲取給定基因位點(SNP)的資料做資料建模。使用R語言和AWK他以簡介自然的方式清理和組資料,大大加快了查詢速度,達到滿意的資料處理效果。

資料

些資料由範德比爾特大學遺傳學處理中心提供為25 TB的tsvs資料。資料有五部分構成,每部分包含大約240個4G大小的檔案。檔案中,每行包含一個樣本的單個SNP的資料。隨著SNP值的增加,讀數的強度,不同等位基因的頻率等等都有多個數字列。要從中獲取大約30列有意義的片段資料。

目標

和任何資料專案一樣,最重要的是考慮如何使用資料。主要做法是透過SNP基礎在SNP上擬合模型和工作流程。即一次只需要一個SNP的資料。需要儘可能簡單,快速和便宜地提取與250萬SNP記錄中的一個相關的所有記錄。

使用AWK和R來解析25TB的DNA資料

失敗嘗試

第一次嘗試

首先Nick透過一兩個小時架設了一個Hive伺服器來執行處理資料。由於資料儲存在AWS S3物件儲存上,他使用因此Athena的服務,該服務允許對S3資料執行Hive SQL查詢。你可以避免設定/啟動Hive群集,只需為搜尋到的資料付費即可。

在將Athena指向實驗資料做其格式後,運行了一些測試,比如:

select * from intensityData limit 10;

很快就能獲得查詢結果。但是當要實際使用這些資料時,查詢要求獲取SNP的所有資料,執行如下查詢:

select * from intensityData where snp = ‘rs123456’;

該查詢耗費了8分鐘和4+T位元組的資料,才查詢到了結果。Athena的服務收費為每TB 5美元的。所以查詢需要8分鐘和20美元的賬單。

如果要完成所有模型所需的查詢,大概需要38年和5000w美刀的賬單,顯然該方法是不可取的。

經驗教訓:沒有一種廉價的方法可以同時解析25tb的資料。

使用Parquet最佳化

第一次失敗後,最佳化策略是嘗試對資料格式進行轉化,所有TSV轉換為Parquet檔案。Parquet檔案適用於處理較大的資料集,因為它以列式儲存方資料。因此每列都儲存在其自己的記憶體/磁碟部分中,與行式文字檔案不同。這樣查詢只需讀取感興趣的列內容即可。此外,它還會按列保留每個檔案的值範圍記錄,因此,如果要查詢的值不在列範圍內,Spark不會耗費時間掃描檔案。

這樣工作執行AWS的大資料元件AWS Glue,將TSV資料轉換為Parquet,並將新的Parquet檔案連線到Athena。轉化大概花了大約五個小時。但是,當執行查詢時,它花費了大約相同的時間和但是資料流量小一點。由於Spark嘗試最佳化作業只是將單個TSV塊解壓縮並將其放置在自己的Parquet塊中。因為每個塊都足以包含多個樣本的完整記錄,所以每個檔案都包含其中的所有SNP,Spark必須開啟所有檔案才能提取需要的內容。

使用AWK和R來解析25TB的DNA資料

有趣的預設(和推薦)Parquet壓縮型別:snappy不可拆分。因此,每個執行程式仍然處於解壓縮和載入整個3。5gig資料集的任務中。

經驗教訓:小心Parquet檔案大小和型別。

解決問題

為了解決這個問題需要做的就是對SNP列而不是個體上的資料進行排序。這將允許一定數量的資料僅包含少量SNP,而Parquet的智慧僅開放式值範圍內功能可以做到一點。然而,對跨群集分佈的數十億行資料進行排序並非易事。

最後悲劇發生了,AWS排序的元件Glue運行了2天后奔潰了,所有結果沒了,AWS還不打算為該單退款。

經驗教訓:排序很難,特別是分散式資料。

嘗試資料分割槽

另一個想法是將資料劃分為染色體。這樣可以將資料減少為更易於管理的塊。透過在Glue指令碼中新增一行:partition_by = “chr”到Spark匯出功能將資料放入這些儲存桶中。

使用AWK和R來解析25TB的DNA資料

不幸的是,事情也不順利。因為染色體的大小不同,其中的資料量會各不相同。Spark傳送給其計算Jobs的任務不均衡,並且由於某些節點提前完成並處於空閒狀態而執行緩慢。當查詢單個SNP時,資料不平衡再次引起問題:在較大的染色體中(也就是實際想要獲取資料的地方),成本僅提高了數10倍甚至更多。

經驗教訓:Spark中的分割槽需要平衡。

細化分割槽

和上面簡單13分割槽方法對比,Nika又做了一件極端的決定,對每個SNP上進行分割槽,保證了每個分割槽的大小相等。然而該方法是有問題的,在使用了Glue並新增partition_by =‘snp’,工作開始並執行。一天後,透過查詢資料並沒有寫入S3,所以kill掉了這項任務。實際上Glue的做法是將中間檔案寫入隱藏的S3位置,都有20億資料量。在透過1天,並且1000美刀的賬單後才意識到了錯誤。

經驗教訓:永遠不要嘗試分250萬個分割槽。

分割槽+排序

既然細化大分割槽的方法也不行,那麼只好採用分割槽+排序的方法結合了。首先透過染色體上進行分割槽,然後對每個分割槽進行排序。理論上,這能使每個查詢更快,因為期望的SNP資料將僅存在於給定區域內的一些Parquet塊的範圍內。

然而事實證明,分割槽資料的排序也需要大量的工作,最終切換到EMR自建叢集,使用8個強大的AWS例項虛擬機器(C5。4xl),結合Sparklyr構建更靈活的工作流程:

Sparklyr片段按chr分割槽並在分割槽中排序w/in,使用snp bin加入原始資料

原始資料

raw_data

group_by(chr) %>%

arrange(Position) %>%

Spark_write_Parquet(

path = DUMP_LOC,

mode = ‘overwrite’,

partition_by = c(‘chr’)

但是,工作並沒有解決。嘗試了所有的調優技巧:增加了分配給查詢的每個執行器的記憶體;使用了高ram節點型別;廣播變數。但它總是繞過一半,然後執行器會慢慢開始失敗,直到所有查詢最終都會停止。

經驗教訓:排序仍然很難,調整Spark也是無濟於事。

更有創意的嘗試

每個SNP都有一個位置值。這是一個整數,對應它染色體上有多少個鹼基。這是一種很好的組織資料的方法。一個想法是按照每條染色體的區域構建分割槽。Aka (positions 1 - 2000, 2001 - 4000, 等等)。問題是SNP沿染色體分佈不均勻,因此大小會有很大差異。

使用AWK和R來解析25TB的DNA資料

解決方案是按位置排名。已經載入的資料進行了查詢,以獲得唯一SNP,它們的位置和染色體的列表。然後在每個染色體內進行分類,並將SNP捆綁到給定大小的區間中。例如1000 SNPS,這樣就得到從SNP -> bin-in-chromosome的對映。

snp_to_bin %

group_by(chr) %>%

arrange(position) %>%

mutate(

rank = 1:n()

bin = floor(rank/snps_per_bin)

ungroup()

經驗教訓:定製資料需要定製解決方案。

使用Spark

目標是將這個小的(250萬行)資料幀讀入Spark,將其與原始資料連線,然後在新新增的bin列上進行分割槽。

data_w_bin %

left_join(sdf_broadcast(snp_to_bin), by =‘snp_name’) %>%

group_by(chr_bin) %>%

arrange(Position) %>%

Spark_write_Parquet(

path = DUMP_LOC,

mode = ‘overwrite’,

partition_by = c(‘chr_bin’)

注意使用了sdf_broadcast(),這讓Spark知道它應該將這個資料幀傳送到所有節點。當資料很小並且所有任務都需要時,會很有用。

然而任務也沒有成功。與排序嘗試一樣,作業執行一段時間完成加入任務,然後隨著分割槽啟動,執行程式就開始崩潰。

經驗教訓:Spark加速很快,但分割槽仍然很昂貴

正確嘗試

引入AWK

截止目前,所有的Spark故障都是由於資料在群集周圍亂序。解決這種亂序,可以透過一些預處理幫助它,可以嘗試將原始文字資料拆分到染色體列上,這樣就可以為Spark提供一些“預分割槽”資料。

在解決如何按列值拆分問題時候,引入了AWK,可以透過在指令碼中執行寫入而不是將結果傳送到stdout來按列的值拆分文字檔案。下面一個一個bash指令碼測試該項工作,下載了一個gzip壓縮檔案,然後使用gzip將其解壓縮,將其穿給awk。

gzip -dc path/to/chunk/file。gz |

awk -F ‘\t’ \

‘’

有效,工作解決了。

經驗教訓:不要睡在基礎知識上。有人可能在80年代解決了你的問題。

parallel並行執行

資料分割的過程有點慢。透過htop監控系統過程中,發現高配置的ec2例項只使用了單核和200 多M的記憶體,所以並行化執行分割任務是必須的選擇,這個工具也很成熟那就是gnu parallel,用來並行任務的最佳選擇。使用新的GNU並行工作流執行拆分效率非常好,但是將S3物件下載到磁碟有點慢並且沒有完全並行化而導致瓶頸,最佳化方法:

1。發現可以直接將S3下載步驟加入管道中,跳過中間磁碟儲存。這樣可以避免將原始資料寫入磁碟,也可以在AWS上使用更小且更便宜的儲存。

3。切換到網路速度最佳化的ec2例項,就是名稱中帶有n的那些。然而,使用帶‘n’例項導致的計算能力損失超過了下載速度的提高。

4。將gzip更換為pigz,這是一個並行的gzip工具,可以執行一最佳化操作來並行化解壓縮。

結合這些最佳化,任務變得很快。透過提高下載速度和避免寫入磁碟,現在能夠在幾個小時內處理整個5 TB的資料。

透過gnu-parallel,可以解壓縮和分割19gig csv,就像可以下載一樣快。甚至無法讓Spark執行它。

使用AWK和R來解析25TB的DNA資料

經驗教訓:gnu parallel是神奇的,每個人都應該使用它。

使用新轉換後的資料

處理過後,在S3以解壓縮和半組織格式儲存了資料,現在繼續執行Spark查詢了。確實最終在Spark上完成事情,分割槽Parquet檔案並不是非常小(200KB左右)但資料到了它該到的地方。

使用AWK和R來解析25TB的DNA資料

經驗教訓:Spark喜歡未壓縮的資料,不喜歡組合分割槽。

測試本地Spark查詢

經驗教訓:Spark對於簡單的工作來說是一個很大的開銷。

隨著資料載入到合理的格式,可以測試速度。透過設定了一個R指令碼來啟動本地Spark伺服器,然後從給定的Parquet位置載入Spark資料幀。嘗試載入所有資料,但由於某種原因無法讓Sparklyr識別分割槽。

使用AWK和R來解析25TB的DNA資料

花費了29。415秒。比以前好多了。此外,無法透過啟用快取來加快速度,因為當將bin的Spark資料幀快取在記憶體中時,Spark總是崩潰,即使資料集有50多G。

使用AWK關聯陣列

Nick還是使用了AWK關聯陣列來執行聯合SNP -> bin表和我的原始資料,而不使用Spark。

透過在AWK指令碼中使用了BEGIN塊。

join_data。awk

while(getline 。。。)命令從bin csv載入所有行,並將第一列(SNP名稱)設定為bin關聯陣列的鍵,將第二個值(bin)設定為值。然後,在主檔案的每一行上執行中,每一行都被髮送到一個輸出檔案,該檔案具有基於其bin的唯一名稱:。。。_bin_“bin[$1]”_。。。。

變數batch_num和chunk_id的對應管道給出的資料,這些資料確保並行執行的每個執行緒寫入其自己的唯一檔案來避免資料寫入中的競爭條件。

所有原始資料拆分後的染色體資料夾,所以現在可以用另一個bash指令碼來一次處理染色體並將更多分割槽資料傳送回S3。

使用AWK和R來解析25TB的DNA資料

此指令碼有兩個並行部分:

第一個讀入包含所需染色體資料的每個檔案,並將它們分成多個執行緒,將其檔案吐入其代表性區域。為了防止多個執行緒寫入相同bin檔案的競爭條件,AWK被傳遞它用於寫入唯一位置的檔案的名稱,例如, chr_10_bin_52_batch_2_aa。csv這導致磁碟上有大量的小檔案(為此使用了1TB EBS卷)。

第二個並行管道透過並將每個bin的單獨檔案合併到帶有cat的單個csv中,並將它們傳送給匯出。。。

經驗教訓:AWK中的關聯陣列非常強大。

使用R語言

上面這個bash指令碼的這一部分:。。。cat chunked/*_bin_{}_*。csv | 。/upload_as_rds。R。。。。。此行將bin的所有連線檔案管道傳輸到以下R指令碼中。處理

使用AWK和R來解析25TB的DNA資料

透過傳遞readr :: read_csv變數檔案(“stdin”),它將透過管道傳輸到R指令碼的資料載入到資料幀中,然後使用aws。s3將其作為。rds檔案直接寫入s3。

Rds有點像Parquet的初級版本,沒有柱狀儲存的細節。

即使工作流程中的R速度非常慢,整個流程也非常快。用於讀取和寫入資料的R部分相當最佳化。在單個平均大小的染色體上測試後,使用C5n。4xl例項在大約兩個小時內完成了這項工作。

經驗教訓:以從R指令碼中訪問stdin和stdout,從而在管道中使用它。

S3的限制

事實證明,S3將給定檔案的路徑視為可以被認為是雜湊表或基於文件的資料庫的簡單金鑰。將“桶”視為表,每個檔案都是一個條目。因為速度和效率對S3為亞馬遜賺錢很重要,所以這個關鍵檔案路徑系統是超級最佳化的並不奇怪。為了最佳化查詢,我們不需要做大量的get請求,希望查詢速度很快。最後發現製作大約20k的bin檔案效果最好。

經驗教訓:由於智慧路徑實現,S3可以處理大量檔案。

格式相容

“為什麼你會使用專有的檔案格式?”,主要是為了最佳化載入速度(使用gzipped csvs載入大約7倍的時間)以及與工作流程的相容性。R可以輕鬆載入Parquet(或Arrow)檔案,而不會產生Spark的開銷。

如果最終需要將資料轉換為另一種格式,而仍然擁有原始的原始文字資料,並且可以再次執行管道。

經驗教訓:過早最佳化儲存方法是浪費時間的根本原因。

流程完成

既然有單個染色體工作的工作流程,為了處理所有染色體的資料。需要啟動多個ec2例項來轉換的所有資料,但不想讓超級不平衡的工作負載(就像Spark從不平衡的分割槽中受到的影響)。為了解決這個問題,使用R。編寫一個作業最佳化指令碼。

首先,查詢S3以確定每個染色體在儲存方面的大小。

使用AWK和R來解析25TB的DNA資料

結果:

使用AWK和R來解析25TB的DNA資料

然後編寫了一個函式,它將獲取這個總大小資訊,,然後拆分成num_jobs組並報告每個作業資料大小的變數。

使用AWK和R來解析25TB的DNA資料

結果:

使用AWK和R來解析25TB的DNA資料

這樣所有工作接本上就可以搞定,將之前的bash指令碼包裝成一個大的for迴圈:

for DESIRED_CHR in “16” “9” “7” “21” “MT”

do

# 處理單個蛋白質的指令碼

done

最後新增一個關機命令。

sudo shutdown -h now

使用AWS CLI來啟動一堆例項,透過user_data選項將它們的作業的bash指令碼傳遞給它們。它們自動執行然後自動關閉,這樣不會多耗費一分的成本(雲計算時代需要做的)。

aws ec2 run-instances 。。。\

——tag-specifications “ResourceType=instance,Tags=[]” \

——user-data file://>

經驗教訓:不要試圖最佳化工作,讓計算機去做。

提供API

最後一步是簡化實驗室成員儘可能多地使用資料的過程。他提供一個簡單的查詢API。如果將來決定從使用。rds切換到Parquet檔案,希望坑自動適配。他構建並記錄了一個非常簡單的介面其中包含一些用於訪問資料的函式,以函式get_snp為中心。此外,他還構建了一個pkgdown站點,因此實驗室成員可以輕鬆檢視示例/文件。

使用AWK和R來解析25TB的DNA資料

經驗教訓:為終端使用者保持API簡單,並提供靈活性。

智慧快取。

由於這些資料的主要工作流程之一是同時在一堆SNP上執行相同的模型/分析,在為SNP提取資料時,整個bin的資料將保留並附加到返回的物件。這意味著如果執行新查詢,則可以(可能)使用舊查詢結果來加速它。

使用AWK和R來解析25TB的DNA資料

在構建軟體包時,運行了很多基準來比較不同方法之間的速度,透過它可以讓我們瞭解直覺中的誤解。比如,dplyr::filter比使用基於索引的過濾來獲取行要快得多,但使用索引語法從過濾後的資料幀中獲取單個列要快得多。

經驗教訓:如果的資料設定良好,快取將很容易!

結果

速度大大提高。典型的用例是掃描基因組的功能重要區域(例如基因)。之前,無法做到這一點之前(因為它花費太多的錢)。現在由於bin結構和快取,每個SNP查詢平均需要不到十分之一秒,並且資料使用量甚至不足以進行舍入的S3成本。

使用AWK和R來解析25TB的DNA資料

總結教訓

最終的解決方案是定製的,幾乎肯定不是最佳解決方案,但是反覆試驗的產物。

此外,如果你處於僱傭某人作為資料科學家的位置,請考慮這樣一個事實,即擅長這些工具需要經驗,而且經驗需要資金。Nick很幸運,有資金支付這筆費用,但許多確定的人永遠不會有機會,因為他們沒有資金進行嘗試。“大資料”工具是通才。如果你有時間,幾乎可以肯定能夠使用智慧資料清理,儲存和檢索技術為你的問題編寫更快的解決方案。當然,最終會歸結為成本效益分析。

TAG: 資料SparkBinSNP分割槽