2024-06-23|閱讀時間 ‧ 約 39 分鐘

新手必看:RabbitMQ超詳細解析,圖解六大模式應用場景

最近剛好在對RabbitMQ進行研究,發現網路上對RabbitMQ的文章都不太夠全面,這篇文章來幫大家統整一下關於RabbitMQ的六種常見模式及應用,另外也會非常詳細的介紹各個參數代表的意義,對RabbitMQ有興趣的話,就歡迎看下去吧~

raw-image

Message Queue是什麼?

RabbitMQ其實是一種Message Queue(後面會簡稱會Queue),所以我們先來了解什麼是Queue吧!Queue提供了一個強大且靈活的平台,主要用於應用程式之間傳遞訊息。

Queue主要由三個元件所組成,

  1. Producer(生產者):主要用來發送訊息,訊息會透過Broker傳到Consumer
  2. Consumer(消費者):主要用來接收訊息,會從Broker去接收Producer傳來的訊息
  3. Broker(佇列):在Producer和Consumer之間扮演橋樑的角色,負責轉送訊息

RabbitMQ基本架構

接下來介紹Queue的兩個最常見的用途,分別是任務緩衝系統解耦

任務緩衝

譬如碰到有任務必須運算很久(CPU Bound的任務),像用AI生成一張圖片要很長的時間,如果短時期碰到很多人想同時下載圖片,伺服器就有可能爆掉,也有些封包可能會遺失。Queue可以扮演了一個任務緩衝的角色,讓任務進行「排隊」,使得每個任務都不會遺失,排隊的使用者也可以先去做其他事(非同步任務)。

系統解耦

除此之外,Queue也可以幫助任兩個系統進行解耦,Producer和Consumer並不需要記得彼此的位址,只需要記得Queue在哪裡即可,如此三者可以架設在不同主機,也不需要使用相同語言開發,最重要是可以很方便依據需求對伺服器進行水平擴展(增加Server數量)

舉例來說,原本Cient端要傳訊息交給Server端解決,但用戶數量突然增加,伺服器來不及處理完所有任務,由於Client端是向Queue傳遞訊息.可用增加Server數量取代增加Server效能,成本下降非常多,也大幅提高可擴充性

RabbitMQ有什麼特色?

RabbitMQ是一個實現了AMQP協議的開源的Message Queue,比一般傳統的Queue多了一個Exchange,方便更彈性的應用,而RabbitMQ的特色有以下的幾點:

  1. 提供了靈活的路由機制,利用Exchange將訊息進行多種方式的轉送
  2. 提供高可用性、高數據可靠性的功能,利用分散式部署的方式保障數據的安全
  3. 提供數據持久化的功能,保障RabbitMQ進行重啟的時候數據仍可以保存
  4. 提供多種語言的API可供調用,包括Go, Java, PHP, C#, Python, JavaScript…
  5. 提供清楚的可視化工具,方便使用UI進行管理

RabbitMQ的六種應用模式

RabbitMQ總共提供六種常見的使用模式,方便使用者對應不同的情境進行客製化的使用,從最基礎的Simple、Worker模式,到有使用的Exchange的Pub/Sub、Headers、Routing、Topic模式。

Simple模式

Simple模式是最基本的模式,由Producer、Worker、Broker各一個所組成,因為只有一對一,所以主要處理最一般的非同步任務。

Simple模式,來源:RabbitMQ

Worker模式

Worker模式則是進階的Simple模式,當今天Producer的任務傳的太多,就可以發揮RabbitMQ藉由系統解耦,能快速水平擴展的優勢,藉由增加Consumer的數量,加快處理訊息的速度。

範例說明:

訊息若沒經過特別處理,則會依照如下圖的Round-Robin的模式,將A,B,C的三則訊息輪流交給Consumer 1, Consumer 2進行處理,是工廠為了要接大量的訂單,而不斷增加產線加快速度的模式。

常見應用:

1.高流量的任務處理,像是多個需要套用AI模型的任務

Worker模式,來源:RabbitMQ


前面兩個模式主要探討Queue和Consumer的關係,接下來的模式都是使用Exchange這個元件,但Exchange的類型不同,而產生不同模式,可以將訊息依據不同規則,只轉送到符合規則的Queue,著重於Exchange和Queue的關係,那我們就開始吧!


Publisher/Subscriber模式

首先是Pub/Sub模式,又稱發布者/訂閱者模式,Exchange的類型是fanout,也就是將訊息全部轉送,當Exchange從Producer收到訊息,只將訊息轉送給和Exchange綁定的Queue,每個和Exchange綁定的Queue都會拿到同一份訊息。

範例說明

以下圖為例,Exchange收到A,B,C三則訊息,由於Q1, Q2都給Exchange綁定,所以兩個Queue都會同時收到A,B,C三則訊息,再交由Queue後面的Consumer進行處理,所以有Exchange的模式,都是可以跟前面Simple、Worker模式進行組合搭配。

常見應用

  1. Youtube訂閱制,當YT有影片更新時,每個訂閱者都可以收到相同的訊息通知
  2. 當廣告的電子報發布時,所有訂閱者都可以收到相同的電子報

Pub/Sub模式,來源:RabbitMQ

Routing模式

接下來是Routing模式,Exchange的類型是direct,當你希望將Producer送出的訊息加上特定Key,只傳送給有匹配Key的Queue時,就可使用這種模式。

Routing的特色是一對一精確比對,每則訊息最多只有一個Routing Key,每個Queue都可以跟Exchange用多組Key進行綁定,訊息只會轉送給兩個Key完全相同的Queue。

Queue和Exchange綁定的所有Key.contains(訊息的Routing Key),才會進行轉送

範例說明

如下圖Q1就和Exchange綁定一個Key(error),Q2則跟Exchange綁定多組Key(info, warn, error),當有則帶Key的訊息從左邊Producer(P)傳到Exchange,Exchange就會將訊息做不同的轉送。

1.Key為error:訊息複製兩份,分別傳送給Q1和Q2,兩個都和Exchange有用error綁定2.Key為warn或info:直接將訊息傳給Q2,只有Q2有用warn和info進行綁定
3.Key不為以上三者:訊息會直接被丟棄,Exchange找不到有匹配的Queue。或是也可以建立一個Dead Letter Exchange,將被丟棄的訊息做額外處理

常見應用

1.進階Youtube訂閱制,每個訂閱者可根據不同會員等級收到不同通知
2.Log系統應用,Q1可以專門儲存error log,Q2每個log都接收傳到Console

Routing模式,來源:RabbitMQ

Topic模式

接下來是Topic模式,Exchange的類型是topic,當你希望將Producer送出的訊息加上特定Key,只傳送給有相似Key的Queue時,就可使用這種模式。

Routing的特色是一對一模糊比對,每則訊息最多只有一個Routing Key,每個Queue都可以跟Exchange用多組Key進行綁定,訊息只會轉送給兩個Key相似的Queue。

和前面Routing模式的最大差異是:

綁定的Routing Key增加*(star)和#(bash)兩個可模糊比對的符號,(*)代表一個單詞,(#)代表0到多個單詞,但符號間必須用(.)進行隔開,如(*.orange.*),則代表orange的前後必須也只能有一個單詞,如lazy.orange.rabbit則符合規則。
若未使用這兩個符號,則跟Routing模式完全相同

範例說明

如下圖Q1就和Exchange綁定一個Key(*.orange.*),Q2則跟Exchange綁定兩個Key(*.*.rabbit、lazy.#),當有則帶Key的訊息從左邊Producer(P)傳到Exchange,Exchange就會將訊息做不同的轉送。

1.Key為lazy.orange.elephant:訊息複製兩份,分別傳送給Q1和Q2,因為符合

Q1的規則:orange前面(lazy)、後面(elephant)各有一個單詞
Q2的規則1:完全沒有Rabbit所以不符合
Q2的規則2:lazy後面有多個單詞(orange, elephant)

2.Key為happy.lazy.orange:訊息直接被丟棄,因沒有完全符合Q1、Q2的任一個規則

Q1的規則:不符orange前面只能有一個單詞(happy和lazy)、不符後面必須一個(0個)
Q2的規則1:完全沒有Rabbit所以不符合
Q2的規則2:符合lazy後面有0到多個單詞(他0個)、但不符lazy前面沒有單詞(happy)

Go範例實作

# producer.go
conn, err := amqp.Dial(util.RabbitMQ_host)
if err != nil {
log.Printf("Error occurs when connect to RabbitMQ: %v", err)
return
}
defer conn.Close()

# 建立channel連線
ch, err := conn.Channel()
if err != nil {
log.Printf("Error occurs when set Channel: %v", err)
return
}
defer ch.Close()
err = ch.PublishWithContext(ctx,
"Exchange_Name" , // Exchange名稱
"", // Routing Key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(body)
})


# consumer.go 
# 建立
args := amqp.Table{}
q, err := ch.QueueDeclare(
"Queue_Name", // Queue_Name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
if err != nil {
fmt.Printf("Error occurs when set Queue: %v", err)
return
}

# 設定Binding的規則
err = ch.QueueBind(
q.Name, // Queue名稱
"Routing_Key"  , // Routing Key
"Exchnage_Name", // Exchange名稱
false, // 不等待狀態
nil, // 額外參數
)
if err != nil {
log.Printf("Error occurs when set Queue: %v", err)
return
}

# 建立Consumer
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)

常見應用
1.log有多個層級時,像是error.serious.log和error.light.log只要綁定error.#一個Key,就可以同時接收到多種log的Queue,不需要同時綁定兩個Key,簡潔性會更高。

Topic模式,來源:RabbitMQ


Header模式

接下來介紹是Header模型,Exchange Type設定為headers,這個模式網路上比較少人有介紹,因為也比較複雜一些,想要一次比對多個值時可以使用這種模式。

Header的特色是整組全對整組對一個,由於必須在規則上綁定多個鍵值,不能使用前面的Key而需要改使用headers,以Go語言為例。

1.Producer的的部分必須把用來比對的多個鍵值放到amqp.Publishing的headers

2.Consumer必須先建立一個amqp.Table,將想比對的所有值放進去後,再把Table放在QueueBind裡的arguments,除此之外Table必須放入一個x-match的key,他的value可以是any或是all,分別代表整組對一個整組全對的規則

  • {"x-match": "all"}: 代表Table裡的headers和訊息的headers必須完全相符
  • {"x-match": "any"}: 代表Table裡的headers和訊息的headers有一個相符即可

符合規則的訊息,就會從Exchange被轉送到所有有綁定、規則也符合的Queue裡面,即可完成整串流程。

範例說明

為了避免大家搞混,這邊同時把Routing, Topic, Headers三種模式用一張圖進行比對,Header模式為最下面的流程表。

1.Queue的規則1:x-match是any,訊息只要有{color:red}的header會被轉送

2.Queue的規則2:x-match是all,必須同時有{color:red},{legs:4}兩個才會轉送

所以左邊第一則訊息符合規則1,會被轉送到Queue,左邊第二則訊息同時符合規則1和2也會被轉送,但不管符合幾個規則,都只會被轉送一次而已。

RabbitMQ模式

Go範例實作

# producer.go
contentType := "application/json"
err := ch.PublishWithContext(ctx,
"Exchange_Name" , // Exchange名稱
"", // Routing Key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: contentType,
Body: []byte(body),
Headers: amqp.Table{"ContentType": contentType,
"key1": "value1"},
})


# consumer.go 
headers := amqp.Table{
"x-match": "any",
"Content-Type": "application/json",
"key1": "value1"
}
err := ch.QueueBind(
"Queue_Name", // Queue名稱
"", // Routing Key
"Exchange_Name" , // Exchange名稱
false, // 不等待狀態
headers,
)

常見應用

譬如說需要既有Key的比對,又希望額外增加Content-Type必須為application/json的比對條件,就可以把兩個值同時放入amqp.Table,但請注意headers和實際上封包的headers完全沒有關係,比對時只會比對amqp.Table內的鍵值,所以建議使用上面同時使用contentType變數的做法,才可以達到效果喔。

總結

RabbitMQ目前最適合的使用場景是在對Queue不太熟悉且需求也不太複雜,RabbitMQ可以非常快速讓你快速搭建起Queue的基本架構以及多種路由規則,但若遇到量能較大的需求,如每秒數百萬訊息的需求,建議可以參考Kafka或其他Queue的元件喔~

以上就是RabbitMQ的基礎介紹,希望能幫從0開始的新手們了解RabbitMQ,希望對你有幫助。之後反應不錯的話,我會再撰寫關於更多進階參數使用,如Priority Queue及Go範例文章,若有錯誤或不周延的地方,再請各位大神指教、糾正了~


你可能還會想看:

CNN實作Kaggle貓狗影像辨識(Pytorch)

幫非結構化資料找個家,快速入門MinIO(一):基本概念介紹

AWS CCP考試準備資源與心得分享(Certified Cloud Practitioner)



分享至
成為作者繼續創作的動力吧!
從 Google News 追蹤更多 vocus 的最新精選內容從 Google News 追蹤更多 vocus 的最新精選內容

作者的相關文章

吉米富的沙龍 的其他內容

你可能也想看

發表回應

成為會員 後即可發表留言
© 2024 vocus All rights reserved.