數據源、storm應用、結果集。storm應用從數據源讀取數據

2018-02-28 17:12:29 ortotra

有贊使用storm已經有将近3年時間,穩定支撐着實時統計、數據同步、對賬、監控、風控等業務。訂單實時統計是(shì)其中一個典型的業務,對數據準确性、性能等方面都有較高要求,也是(shì)上線(xiàn)時間最久的一個實時計算應用。通過訂單實時統計,描述使用storm時,遇到的準确性、性能、可靠性等方面的問題。


訂單實時統計的演進

第一版:流程走通

在使用storm之前,顯示實時統計數據一般有兩種方案:


在數據庫裏執行count、sum等聚合查詢,是(shì)簡單快速的實現(xiàn)方案,但(dàn)容易出現(xiàn)慢(màn)查詢。

在業務代碼裏對統計指标做累加,可以滿足指标的快速查詢,但(dàn)統計邏輯耦合到業務代碼,維護不方便,而且錯誤數據定位和修正不方便。

既要解耦業務和統計,也要滿足指标快速查詢,基于storm的實時計算方案可以滿足這兩點需求。


一個storm應用的基本結構有三部分:數據源、storm應用、結果集。storm應用從數據源讀取數據,經過計算後,把結果持久化或發送消息給其他應用。




第一版的訂單實時統計結構如下(xià)圖。在數據源方面,最早嘗試在業務代碼裏打日志的方式,但(dàn)總有業務分支無法覆蓋,采集的數據不全。我們的業務數據庫是(shì)mysql,随後嘗試基于mysql binlog的數據源,采用了阿裏開源的canal,可以做到完整的收集業務數據變更。


在結果數據的處理上,我們把統計結果持久化到了mysql,并通過另一個後台應用的RESTful API對外提供服務,一個mysql就可以滿足數據的讀寫需求。




爲了提升實時統計應用吞吐量,需要提升消息的并發度。spout裏設置了消息緩沖區,隻要消息緩沖區不滿,就會源源不斷從消息源canal拉取數據,并把分發到多個bolt處理。


第二版:性能提升

第一版的性能瓶頸在統計結果持久化上。爲了确保數據的準确性,把所有的統計指标持久化放(fàng)在一個數據庫事務裏。一筆訂單狀态更新後,會在一個事務裏有兩類操作:


訂單的曆史狀态也在數據庫裏存着,要與曆史狀态對比決定統計邏輯,并把最新的狀态持久化。storm的應用本身是(shì)無狀态的,需要使用存儲設備記錄狀态信息

當大家知(zhī)道實時計算好用後,各産品都希望有實時數據,統計邏輯越來越複雜(zá)。店鋪、商品、用戶等多個指标的寫操作都是(shì)在一個事務裏commit,這一簡單粗暴的方式早期很好滿足的統計需求,但(dàn)是(shì)對于update操作持有鎖時間過長,嚴重影響了并發能力。

爲此做了數據庫事務的瘦身:


去(qù)除曆史狀态的mysql持久化,而是(shì)通過單條binlog消息的前後狀态對比,決定統計邏輯,這樣就做到了統計邏輯上的無狀态。但(dàn)又(yòu)産生了新問題,如何保證消息有且隻有處理一次,爲此引入了一個redis用于保存最近24小時内已成功處理的消息binlog偏移量,而storm的消息分發機制又(yòu)可以保證相(xiàng)同消息總是(shì)能分配到一個bolt,避免線(xiàn)程安全問題。

統計業務拆分,先是(shì)線(xiàn)上業務和公司内部業務分離(lí),随後又(yòu)把線(xiàn)上業務按不同産品拆分。這個不僅僅是(shì)bolt級别的拆分,而是(shì)在spout就完全分開

随着統計應用拆分,在canal和storm應用之間加上消息隊列。canal不支持多消費(fèi)者,而實時統計業務也不用關系數據庫底層遷移、主從切換等維護工作,加上消息隊列能把底層數據的維護和性能優化交給更專業的團隊來做。

熱點數據在mysql裏做了分桶。比如,通常一個店鋪天級别的統計指标在mysql裏是(shì)一行數據。如果這個店鋪有突發的大量訂單,會出現(xiàn)多個bolt同時去(qù)update這行數據,出現(xiàn)數據熱點,mysql裏該行數據的鎖競争異常激烈。我們把這樣的熱點數據做了分桶,實驗證明在特定場景下(xià)可以有一個數量級吞吐量提升。

最終,第二版的訂單實時統計結構如下(xià),主要變化在于引入了MQ,并使用redis作爲消息狀态的存儲。而且由最初的一個應用,被拆成了多個應用。




第三版:準确性提升

經過第二版的優化,實時統計的吞吐量已經不成問題,但(dàn)還是(shì)遇到了做大數據最重要的準确性的問題:


統計口徑是(shì)會變化的,同樣是(shì)GMV,一年前和現(xiàn)在的算法可能有變化。例如一筆貨到付款訂單,是(shì)買家下(xià)單算成交,還是(shì)賣家發貨成交,在不同的時期可能使用不同的算法。

實時統計隻能按照當時的算法來做計算。有可能出現(xiàn)一段時間周期内的GMV,前一段是(shì)按舊(jiù)算法來計算,後一段按新算法來計算,提供的數據就不準确了。

實時統計難免會出現(xiàn)bug,有不準确的結果,修複錯誤數據是(shì)個難題。

爲了解決這個問題,凡是(shì)涉及到兩天以前數據的,一律由離(lí)線(xiàn)計算提供,最終展示給用戶的數據,就是(shì)曆史離(lí)線(xiàn)統計數據,并上今日昨日實時統計數據。爲什麽是(shì)今日昨日實時統計呢?因爲離(lí)線(xiàn)統計有數據準備、建模、統計的過程,要花費(fèi)幾個小時,每天的淩晨很可能還得不到前一天的離(lí)線(xiàn)統計結果。


一旦統計口徑有變化,隻需要重跑離(lí)線(xiàn)統計任務就可修複曆史數據,做到了冷熱數據分離(lí)。




實時計算的常見(jiàn)問題

通過訂單實時統計的案例,可以抽象出一些基于storm實時計算的共性問題。


消息狀态管理

storm不提供消息狀态管理,而且爲了達到水平擴展,最好是(shì)消息之間無狀态。對于大數據量、低精度的應用,需要做到無狀态。而像訂單實時統計這樣數據量不算太大,但(dàn)精度要求極高的場景,需要記錄消息處理狀态。而爲了應付重啓、分布式擴展的場景,往往需要額外的介質來存儲狀态。狀态信息往往是(shì)kv形式的讀寫,我們在實際的應用中,使用過redis、HBase作爲存儲。


消息不丢失、不重複、不亂序

對于準确性要求高的場景,需要保證數據正确的隻消費(fèi)一次。storm的有三種消息處理模式:


at most once,若不實現(xiàn)ack和fail方法,無論後續處理結果如何,消息隻會發送一次,必定不能滿足高準确性;

at least once,若實現(xiàn)了ack和fail方法,隻有調用了ack方法才會任務處理成功,否則會重試。可能會出現(xiàn)消息重複,在并發場景下(xià)重複又(yòu)意味着可能出現(xiàn)亂序;

exactly once,trident每個micro batch作爲整體隻成功處理一次,但(dàn)也是(shì)無法保證消息真的隻正确的處理一次,比如數據已經處理完畢并持久化,但(dàn)向數據源ack時失敗,就可能會有重試。

對于消息重複、亂序的場景,不是(shì)簡單的消息幂等能解決,有以下(xià)的處理思路:


使用前面提到的狀态管理的辦法,識别出重複、亂序的數據;

業務邏輯中,兼容重複、亂序數據,比如維護一個業務狀态機,把異常數據剔除。

對于時序判斷,盡量不用使用時間戳,因爲在分布式系統裏,各服務器時間不一緻是(shì)很常見(jiàn)的問題。


我們會嘗試在運行過程中重啓消息源、storm應用、存儲/MQ等下(xià)遊系統,或者制造網絡丢包、延遲等異常,手工觸發可能的消息丢失、重複、亂序場景,來驗證我們的應用能否對應這些異常情況。


複雜(zá)拓撲

在storm的文檔裏,有很多類似下(xià)圖的複雜(zá)應用。




對于需要消息可靠處理的場景,是(shì)不适合這樣複雜(zá)拓撲的,部分失敗如何回滾,是(shì)否要全部bolt處理完畢才ack,是(shì)需要面對的問題。過長的拓撲鏈路,裏面的慢(màn)速邏輯會拖慢(màn)整體性能。


可以考慮使用更簡化的拓撲,不同的邏輯之間盡量解耦,需要使用bolt的結果時,可以把數據持久化或者推送到MQ。




監控

生産環境少不了監控,除了服務器的基礎監控,還加了不少storm特有的監控:


消息延遲:消息在業務系統的時間戳與storm應用的當前時間戳對比,大于一定阈值則告警,不同應用的阈值會不同;

消息處理時長、fail數:這兩個都可以由storm的接口獲取,數值偏大很可能是(shì)出了問題;

應用TPS:記錄應用的emit、ack、fail數的變化趨勢,幫助分析應用的運行情況;

任務級監控:每台服務器的worker、executor數量,這也可以通過storm接口獲取。

除此之外,會有各類應用特有的監控,一般都是(shì)離(lí)線(xiàn)計算的結果與實時計算結果對比。對于數據同步類的應用,數據量比較大,可能會使用采樣的方式做校驗。