訊息傳遞的過程中有三種可能遺失的情境:
Producer端送到RabbitMQ時丟失:
- 外界環境問題導致: 發生網路丟包、網路故障等造成訊息丟失。
- 程式碼層面、配置層面導致訊息丟失。
RabbitMQ儲存的訊息丟失:
- 訊息沒有持久化。
- 磁碟意外損壞導致訊息同步失敗。
RabbitMQ送到Consumer時丟失:
消費者接收訊息後還沒來得及處理就當機。
處理方式:
為了確保訊息正確的送達RabbitMQ及Consumer正確的收到訊息,RabbitMQ為我們提供了兩種方式:
- 透過AMQP Transaction機制實現。
- 將Channel設定為confirm模式。
AMQP Transaction機制
主要有三個方法分別為:
- txSelect(): 將當前的channel設定為transaction模式。
- txCommit(): 提交事務。
- txRollback(): 回滾事務。
可以看到下面流程中多了四個步驟,如此一來當資料量大的時候處理起來會比較沒有效率,因此一般應用狀況下很少採用Transaction機制。
try {
channel.txSelect();
channel.basicPublish(...);
channel.txCommit();
} catch (e) {
channel.txRollback();
}
Confirm模式 — 生產端
首先我們一樣將通道設定為Confirm模式,Confirm模式與AMQP Transaction機制只能選一個使用。
channel.confirmSelect();
Confirm模式又分為以下三種方式:
1. 普通Confirm模式: 每發送一條消息就等待服務端回應一次。
這種方式較沒效率,每次丟一個訊息就得等待通知。
try {
// 發送一條訊息
channel.basicPublish(...);
// 等待確認訊號
if(channel.waitForConfirms()){
// 發送成功
} else {
// 重送或其他處置
}
} catch(...) {
...
}
2. 批次Confirm模式: 發送一批消息後再等待服務端回應。
這種方式可以發送一批後再等待通知,乍看之下蠻有效率的,但如果訊息常常丟失,那麼我們也得批次重傳。
try {
// 發送多條訊息
for (i = 0; i < batchSize; i++) {
channel.basicPublish(...);
}
// 等待確認訊號
if(channel.waitForConfirms()){
// 發送成功
} else {
// 重送或其他處置
}
} catch(...) {
...
}
3. 異步Confirm模式: 以Listener方式等待服務端回應。
這邊以Listener的方式來獲取通知訊息,RabbitMQ會發送以下兩種通知:
- Ack: 成功將訊息送到Queue。
- Nack: Queue達到上限、MQ異常、磁碟寫滿…等造成未正確將訊息送到Queue。
這邊會有一種狀況是Ack、Nack都沒收到,如網路瞬斷…等,這時候就得搭配其他機制去進行重送。
try {
// 開啟Confirm模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 成功將訊息送到Queue。
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// Queue達到上限、MQ異常、磁碟寫滿...等。
}
});
} catch(...) {
...
}
以上都不保證100%投送到MQ,只是盡量的保證能夠送達而已,如果對於投送訊息的部份有強烈需求則可以考慮搭配Redis…等DB來維持是否送達的狀態,但勢必會捨棄一些效能。
Confirm模式 — 消費端
消費端的部份就相對單純,處理完訊息後發送ack確認訊號給RabbitMQ即可,RabbitMQ收到後就認為已經消費完成會將該訊息刪除。