訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

求關注

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

高階特性全在這裡!(上)

前言

前面我們介紹了RabbitMQ的安裝、各大訊息中介軟體的對比、AMQP核心概念、管控臺的使用、快速入門RabbitMQ。本章將介紹RabbitMQ的高階特性。分兩篇(上/下)進行介紹。

訊息如何保障100%的投遞成功?

冪等性概念詳解

在海量訂單產生的業務高峰期,如何避免訊息的重複消費的問題?

Confirm確認訊息、Return返回訊息

1 訊息如何保障100%的投遞成功?

1.1 什麼是生產端的可靠性投遞?

保障訊息的成功發出

保障MQ節點的成功接收

傳送端收到MQ節點(Broker)確認應答

完善的訊息進行補償機制

前三步不一定能保障訊息能夠100%投遞成功。因此要加上第四步

BAT/TMD 網際網路大廠的解決方案:

- 訊息落庫,對訊息狀態進行打標

在傳送訊息的時候,需要將訊息持久化到資料庫中,並給這個訊息設定一個狀態(未傳送、傳送中、到達)。當訊息狀態發生了變化,需要對訊息做一個變更。針對沒有到達的訊息做一個輪訓操作,重新發送。對輪訓次數也需要做一個限制3-5次。確保訊息能夠成功的傳送。

訊息的延遲投遞,做二次確認,回撥檢查

具體採用哪種方案,還需要根據業務與訊息的併發量而定。

1.2 第一種方案:

生產端-可靠性投遞

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

第一種方案

圖解:

藍色部分表示:生產者負責傳送訊息傳送至Broker端

Biz DB:訂單資料庫 MSG DB: 訊息資料

面對小規模的應用可以採用加事務的方式,保證事務的一致性。但在大廠中面對高併發,並沒有加事務,事務的效能拼接非常嚴重,而是做補償。

比如:如下發一條訂單訊息。

step1:儲存訂單訊息(建立訂單),業務資料入庫,訊息也入庫。缺點:需要持久化兩次。(status:0)

step2:在step1成功的前提下,傳送訊息

step3:Broker收到訊息後,confirm給我們的生產端。Confirm Listener非同步監聽Broker回送的訊息。

step4:抓取出指定的訊息,更新(status=1),表示訊息已經投遞成功。

step5:分散式定時任務獲取訊息狀態,如果等於0則抓取資料出來。

step6:重新發送訊息

step7:重試限制設定3次。如果訊息重試了3次還是失敗,那麼(status=2),認為這個訊息就是失敗的。

查詢這些訊息為什麼失敗,可能需要人工去查詢。

假設step2執行成功,step3由於網路閃斷。那麼confirm將永遠收不到訊息,那麼我們需要設定一個規則:

例如:在訊息入庫的時候,設定一個臨界值 timeout=5min,當超過5min之後,就將這條資料抓取出來。

或者寫一個定時任務每隔5分鐘就將status=0的訊息抓取出來。可能存在小問題:訊息傳送出去,定時任務又正好剛執行,Confirm還未收到,定時任務就會執行,會導致訊息執行兩次。

更精細化操作:訊息超時容忍限制。confirm在2-3分鐘內未收到訊息,則重新發送。

保障MQ我們思考如果第一種可靠性投遞,在高併發的場景下是否合適?

第一種方案對資料有兩次入庫,一次業務資料入庫,一次訊息入庫。這樣對資料的入庫是一個瓶頸。

其實我們只需要對業務進行入庫。

訊息的延遲投遞,做二次確認,回撥檢查

這種方式並不一定能保證100%成功,但是也能保證99。99%的訊息成功。如果遇到特別極端的情況,那麼就只能需要人工去補償,或者定時任務去做。

第二種方式主要是為了減少對資料庫的操作。

看下第二種方式:

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

第二種方案

圖解:

Upstream service:生產端

DownStream service:消費端

Callback service:回撥服務

step1:業務訊息入庫成功後,第一次訊息傳送。

step2:同樣在訊息入庫成功後,傳送第二次訊息,這兩條訊息是同時傳送的。第二條訊息是延遲檢查,可以設定2min、5min 延遲傳送。

step3:消費端監聽指定佇列。

step4:消費端處理完訊息後,內部生成新的訊息send confirm。投遞到MQ Broker。

step5: Callback Service 回撥服務監聽MQ Broker,如果收到Downstream service傳送的訊息,則可以確定訊息傳送成功,執行訊息儲存到MSG DB。

step6:Check Detail檢查監聽step2延遲投遞的訊息。此時兩個監聽的佇列不是同一個,5分鐘後,Callback service收到訊息,檢查MSG DB。如果發現之前的訊息已經投遞成功,則不需要做其他事情。如果檢查發現失敗,則Callback 進行補償,主動傳送RPC 通訊。通知上游生產端重新發送訊息。

這樣做的目的:少做了一次DB儲存。關注點並不是百分百的投遞成功,而是效能。

2. 冪等性概念

2.1 冪等性是什麼?

冪等(idempotent、idempotence)是一個數學與計算機學概念,常見於抽象代數中,即f(f(x)) = f(x)。簡單的來說就是

一個操作多次執行產生的結果與一次執行產生的結果一致

我們可以借鑑資料庫的樂觀鎖機制:

比如我們執行一條更新庫存的SQL語句:

UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1

利用加版本號Version的方式來保證冪等性。

推薦文章:面試必備的資料庫悲觀鎖與樂觀鎖

2.2 消費端-冪等性保障

在海量訂單產生的業務高峰期,如何避免訊息的重複消費問題?

在高併發的情況下,會有大量的訊息到達MQ,消費端需要監聽大量的訊息。這樣的情況下,難免會出現訊息的重複投遞,網路閃斷等等。如果不去做冪等,則會出現訊息的重複消費。

-消費端實現冪等性,就意味著,我們的訊息永遠不會被消費多次,即使我們收到了多條一樣的訊息,也只會執行一次。

看下網際網路大廠主流的冪等性操作:

-唯一ID+指紋嗎機制,利用資料庫主鍵去重。

-利用Redis的原子性實現

-其他的技術實現冪等性

2.2.1 唯一ID+指紋碼機制

唯一ID + 指紋嗎機制,利用資料庫主鍵去重。

保證唯一性

SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指紋碼

如果查詢沒有,則新增。有則不需要做任何操作,消費端不需要消費訊息。

好處:實現簡單

壞處:高併發下有資料庫寫入的效能瓶頸

解決方案:跟進ID進行分庫分表進行演算法路由

分攤流量壓力。

2.2.2 Redis 原子特性實現

最簡單使用Redis的自增。

使用Redis進行冪等,需要考慮的問題。

第一:我們是否需要資料落庫,如果落庫的話,關鍵解決的問題是資料庫和快取如何做到原子性?

加事務不行,Redis和資料庫的事務不是同一個,無法保證同時成功同時失敗。大家有什麼更好的方案呢?

第二:如果不進行落庫,那麼都儲存到快取中,如何設定定時同步的策略?

怎麼做到快取資料的穩定性?

3. Confirm 確認訊息

理解Confirm 訊息確認機制:

訊息的確認,是指生產者投遞訊息後,如果Broker收到訊息,則會給我們生產者一個應答。

生產者進行接收應答,用來確定這條訊息是否正常的傳送到Broker,這種方式也是訊息的可靠性投遞的核心保障!

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

Confirm確認訊息流程圖

藍色:producer 生產者 紅色:MQ Broker 伺服器

生產者把訊息傳送到Broker端,Broker收到訊息之後回送給producer。Confirm Listener 監聽應答。

操作是非同步操作,當生產者傳送完訊息之後,就不需要管了。Confirm Listener 監聽MQ Broker的應答。

3.1 如何實現Confirm確認訊息?

第一步:在channel上開啟確認模式:

channel.confirmSelect()

第二步;在chanel上 新增監聽:

addConfirmListener

,監聽成功和失敗的返回結果,根據具體的結果對訊息進行重新發送、或記錄日誌等後續處理!

3.2 程式碼編寫:

生產者:

消費者:

工具類:

先啟動消費端=》再啟動生產端

3.3 檢視管控臺:

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

佇列1

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

佇列2

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

3.4 列印結果:

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

消費端列印結果

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

生產端列印結果

可以觀察到消費端先接收到訊息,之後生產端再接收到回撥資訊。如果出現磁碟已滿、RabbitMQ出現異常、queue容量到達上限都可能接收到

no ack

如果ack和no ack訊息都未接收到,這就是之前所說的。RabbitMQ出現網路閃斷,可以採用上面所說的

訊息補償

4. Return訊息機制

Return Listener用於處理一些不可路由的訊息!

我們的訊息生產者,透過指定一個Exchange和Routingkey,把訊息送達到某一個佇列中去,然後我們的消費者監聽佇列,進行消費處理操作!

但是在某些情況下,如果我們在傳送訊息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的訊息,就要使用Return Listener!

在基礎API中有一個關鍵的配置項:

Mandatory:如果為true,則監聽器會接收到路由不可達的訊息,然後進行後續處理,如果為false,那麼broker端自動刪除該訊息!

4.1 Return訊息機制流程

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

Return訊息機制流程

Producer生產端將訊息傳送到MQ Broker端,但是出現NotFind Exchange,傳送的訊息的Exchange,在Broker端未能找到。或者找到了,但是路由key路由不到指定的佇列。因此是一個錯誤的訊息。

這個時候,生產端應該知道傳送的這條訊息,並不會被處理。因此MQ Broker提供了這種Return機制,將這些不可達的訊息傳送給生產端,這時候生產端就需要設定Return Listener去接收這些不可達的訊息。然後及時記錄日誌,去處理這些訊息。

4.2 程式碼演示

生產者:

消費者:

ConnectionUtils 工具程式碼在上面。

啟動消費端,並檢視管控臺。

4.3 檢視管控臺

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

Exchanges

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

queues

4.4 檢視列印結果

放開消費端程式碼:channel。basicPublish(exchange, routingKey, true, null, msg。getBytes());

消費端列印結果:

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

消費端列印結果

可以看到列印結果正常,此時再改程式碼為:

channel。basicPublish(exchange, routingKeyError, true, null, msg。getBytes());

訊息中介軟體——RabbitMQ(七)高階特性全在這裡!(上)

生產端列印結果

可以看到生產端接收到了不可達的訊息。

文末

歡迎關注個人微信公眾號:

Coder程式設計

獲取最新原創技術文章和免費學習資料,更有大量精品思維導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!

TAG: 訊息傳送Broker投遞監聽