最近剛好在對RabbitMQ進行研究,發現網路上對RabbitMQ的文章都不太夠全面,這篇文章來幫大家統整一下關於RabbitMQ的六種常見模式及應用,另外也會非常詳細的介紹各個參數代表的意義,對RabbitMQ有興趣的話,就歡迎看下去吧~
RabbitMQ其實是一種Message Queue(後面會簡稱會Queue),所以我們先來了解什麼是Queue吧!Queue提供了一個強大且靈活的平台,主要用於應用程式之間傳遞訊息。
Queue主要由三個元件所組成,
接下來介紹Queue的兩個最常見的用途,分別是任務緩衝、系統解耦
譬如碰到有任務必須運算很久(CPU Bound的任務),像用AI生成一張圖片要很長的時間,如果短時期碰到很多人想同時下載圖片,伺服器就有可能爆掉,也有些封包可能會遺失。Queue可以扮演了一個任務緩衝的角色,讓任務進行「排隊」,使得每個任務都不會遺失,排隊的使用者也可以先去做其他事(非同步任務)。
除此之外,Queue也可以幫助任兩個系統進行解耦,Producer和Consumer並不需要記得彼此的位址,只需要記得Queue在哪裡即可,如此三者可以架設在不同主機,也不需要使用相同語言開發,最重要是可以很方便依據需求對伺服器進行水平擴展(增加Server數量)。
舉例來說,原本Cient端要傳訊息交給Server端解決,但用戶數量突然增加,伺服器來不及處理完所有任務,由於Client端是向Queue傳遞訊息.可用增加Server數量取代增加Server效能,成本下降非常多,也大幅提高可擴充性。
RabbitMQ是一個實現了AMQP協議的開源的Message Queue,比一般傳統的Queue多了一個Exchange,方便更彈性的應用,而RabbitMQ的特色有以下的幾點:
RabbitMQ總共提供六種常見的使用模式,方便使用者對應不同的情境進行客製化的使用,從最基礎的Simple、Worker模式,到有使用的Exchange的Pub/Sub、Headers、Routing、Topic模式。
Simple模式是最基本的模式,由Producer、Worker、Broker各一個所組成,因為只有一對一,所以主要處理最一般的非同步任務。
Worker模式則是進階的Simple模式,當今天Producer的任務傳的太多,就可以發揮RabbitMQ藉由系統解耦,能快速水平擴展的優勢,藉由增加Consumer的數量,加快處理訊息的速度。
範例說明:
訊息若沒經過特別處理,則會依照如下圖的Round-Robin的模式,將A,B,C的三則訊息輪流交給Consumer 1, Consumer 2進行處理,是工廠為了要接大量的訂單,而不斷增加產線加快速度的模式。
常見應用:
1.高流量的任務處理,像是多個需要套用AI模型的任務
前面兩個模式主要探討Queue和Consumer的關係,接下來的模式都是使用Exchange這個元件,但Exchange的類型不同,而產生不同模式,可以將訊息依據不同規則,只轉送到符合規則的Queue,著重於Exchange和Queue的關係,那我們就開始吧!
首先是Pub/Sub模式,又稱發布者/訂閱者模式,Exchange的類型是fanout,也就是將訊息全部轉送,當Exchange從Producer收到訊息,只將訊息轉送給和Exchange綁定的Queue,每個和Exchange綁定的Queue都會拿到同一份訊息。
範例說明:
以下圖為例,Exchange收到A,B,C三則訊息,由於Q1, Q2都給Exchange綁定,所以兩個Queue都會同時收到A,B,C三則訊息,再交由Queue後面的Consumer進行處理,所以有Exchange的模式,都是可以跟前面Simple、Worker模式進行組合搭配。
常見應用:
接下來是Routing模式,Exchange的類型是direct,當你希望將Producer送出的訊息加上特定Key,只傳送給有匹配Key的Queue時,就可使用這種模式。
Routing的特色是一對一精確比對,每則訊息最多只有一個Routing Key,每個Queue都可以跟Exchange用多組Key進行綁定,訊息只會轉送給兩個Key完全相同的Queue。
Queue和Exchange綁定的所有Key.contains(訊息的Routing Key),才會進行轉送
範例說明:
如下圖Q1就和Exchange綁定一個Key(error),Q2則跟Exchange綁定多組Key(info, warn, error),當有則帶Key的訊息從左邊Producer(P)傳到Exchange,Exchange就會將訊息做不同的轉送。
1.Key為error:訊息複製兩份,分別傳送給Q1和Q2,兩個都和Exchange有用error綁定2.Key為warn或info:直接將訊息傳給Q2,只有Q2有用warn和info進行綁定
3.Key不為以上三者:訊息會直接被丟棄,Exchange找不到有匹配的Queue。或是也可以建立一個Dead Letter Exchange,將被丟棄的訊息做額外處理
常見應用:
1.進階Youtube訂閱制,每個訂閱者可根據不同會員等級收到不同通知
2.Log系統應用,Q1可以專門儲存error log,Q2每個log都接收傳到Console
接下來是Topic模式,Exchange的類型是topic,當你希望將Producer送出的訊息加上特定Key,只傳送給有相似Key的Queue時,就可使用這種模式。
Routing的特色是一對一模糊比對,每則訊息最多只有一個Routing Key,每個Queue都可以跟Exchange用多組Key進行綁定,訊息只會轉送給兩個Key相似的Queue。
和前面Routing模式的最大差異是:
綁定的Routing Key增加*(star)和#(bash)兩個可模糊比對的符號,(*)代表一個單詞,(#)代表0到多個單詞,但符號間必須用(.)進行隔開,如(*.orange.*),則代表orange的前後必須也只能有一個單詞,如lazy.orange.rabbit則符合規則。
若未使用這兩個符號,則跟Routing模式完全相同
範例說明:
如下圖Q1就和Exchange綁定一個Key(*.orange.*),Q2則跟Exchange綁定兩個Key(*.*.rabbit、lazy.#),當有則帶Key的訊息從左邊Producer(P)傳到Exchange,Exchange就會將訊息做不同的轉送。
1.Key為lazy.orange.elephant:訊息複製兩份,分別傳送給Q1和Q2,因為符合
Q1的規則:orange前面(lazy)、後面(elephant)各有一個單詞
Q2的規則1:完全沒有Rabbit所以不符合
Q2的規則2:lazy後面有多個單詞(orange, elephant)
2.Key為happy.lazy.orange:訊息直接被丟棄,因沒有完全符合Q1、Q2的任一個規則
Q1的規則:不符orange前面只能有一個單詞(happy和lazy)、不符後面必須一個(0個)
Q2的規則1:完全沒有Rabbit所以不符合
Q2的規則2:符合lazy後面有0到多個單詞(他0個)、但不符lazy前面沒有單詞(happy)
Go範例實作:
# producer.go
conn, err := amqp.Dial(util.RabbitMQ_host)
if err != nil {
log.Printf("Error occurs when connect to RabbitMQ: %v", err)
return
}
defer conn.Close()
# 建立channel連線
ch, err := conn.Channel()
if err != nil {
log.Printf("Error occurs when set Channel: %v", err)
return
}
defer ch.Close()
err = ch.PublishWithContext(ctx,
"Exchange_Name" , // Exchange名稱
"", // Routing Key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(body)
})
# consumer.go
# 建立
args := amqp.Table{}
q, err := ch.QueueDeclare(
"Queue_Name", // Queue_Name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
if err != nil {
fmt.Printf("Error occurs when set Queue: %v", err)
return
}
# 設定Binding的規則
err = ch.QueueBind(
q.Name, // Queue名稱
"Routing_Key" , // Routing Key
"Exchnage_Name", // Exchange名稱
false, // 不等待狀態
nil, // 額外參數
)
if err != nil {
log.Printf("Error occurs when set Queue: %v", err)
return
}
# 建立Consumer
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
常見應用:
1.log有多個層級時,像是error.serious.log和error.light.log只要綁定error.#一個Key,就可以同時接收到多種log的Queue,不需要同時綁定兩個Key,簡潔性會更高。
接下來介紹是Header模型,Exchange Type設定為headers,這個模式網路上比較少人有介紹,因為也比較複雜一些,想要一次比對多個值時可以使用這種模式。
Header的特色是整組全對或整組對一個,由於必須在規則上綁定多個鍵值,不能使用前面的Key而需要改使用headers,以Go語言為例。
1.Producer的的部分必須把用來比對的多個鍵值放到amqp.Publishing的headers
2.Consumer必須先建立一個amqp.Table,將想比對的所有值放進去後,再把Table放在QueueBind裡的arguments,除此之外Table必須放入一個x-match的key,他的value可以是any或是all,分別代表整組對一個和整組全對的規則
符合規則的訊息,就會從Exchange被轉送到所有有綁定、規則也符合的Queue裡面,即可完成整串流程。
範例說明:
為了避免大家搞混,這邊同時把Routing, Topic, Headers三種模式用一張圖進行比對,Header模式為最下面的流程表。
1.Queue的規則1:x-match是any,訊息只要有{color:red}的header會被轉送
2.Queue的規則2:x-match是all,必須同時有{color:red},{legs:4}兩個才會轉送
所以左邊第一則訊息符合規則1,會被轉送到Queue,左邊第二則訊息同時符合規則1和2也會被轉送,但不管符合幾個規則,都只會被轉送一次而已。
Go範例實作:
# producer.go
contentType := "application/json"
err := ch.PublishWithContext(ctx,
"Exchange_Name" , // Exchange名稱
"", // Routing Key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: contentType,
Body: []byte(body),
Headers: amqp.Table{"ContentType": contentType,
"key1": "value1"},
})
# consumer.go
headers := amqp.Table{
"x-match": "any",
"Content-Type": "application/json",
"key1": "value1"
}
err := ch.QueueBind(
"Queue_Name", // Queue名稱
"", // Routing Key
"Exchange_Name" , // Exchange名稱
false, // 不等待狀態
headers,
)
常見應用:
譬如說需要既有Key的比對,又希望額外增加Content-Type必須為application/json的比對條件,就可以把兩個值同時放入amqp.Table,但請注意headers和實際上封包的headers完全沒有關係,比對時只會比對amqp.Table內的鍵值,所以建議使用上面同時使用contentType變數的做法,才可以達到效果喔。
RabbitMQ目前最適合的使用場景是在對Queue不太熟悉且需求也不太複雜,RabbitMQ可以非常快速讓你快速搭建起Queue的基本架構以及多種路由規則,但若遇到量能較大的需求,如每秒數百萬訊息的需求,建議可以參考Kafka或其他Queue的元件喔~
以上就是RabbitMQ的基礎介紹,希望能幫從0開始的新手們了解RabbitMQ,希望對你有幫助。之後反應不錯的話,我會再撰寫關於更多進階參數使用,如Priority Queue及Go範例文章,若有錯誤或不周延的地方,再請各位大神指教、糾正了~
你可能還會想看:
幫非結構化資料找個家,快速入門MinIO(一):基本概念介紹
AWS CCP考試準備資源與心得分享(Certified Cloud Practitioner)