2024-06-18|閱讀時間 ‧ 約 28 分鐘

設計模式與程式架構(七)

    ※ 生產者和消費者模式

    定義:

    生產者和消費者在同一時間內共同存取某一個資料空間。生產者負責生成數據並將其放入共享空間,消費者負責從共享空間中取走數據進行處理。兩者之間互不相干,也不須互相知道對方的存在。

    • 共同存取資料空間:生產者和消費者共享同一個資料空間。這個空間通常是緩衝區或隊列,用於在它們之間傳遞數據。
    • 生產者和消費者的操作:
    1. 生產者:負責生成數據並將其放入共享空間。
    2. 消費者:負責從共享空間中取走數據並進行處理。
    • 互不相干:生產者和消費者彼此獨立運作,彼此之間不需要了解對方的存在或狀態。它們之間的唯一通信方式是通過共享的資料空間。

    優點:

    • 生產者與消費者之間完全解耦合:
    1. 生產者和消費者彼此獨立,互不依賴,只通過共享的資料空間進行通信。
    2. 這種解耦提高了系統的靈活性和可擴展性,使得系統更容易維護和擴展。例如,可以獨立地增加或減少生產者或消費者的數量。
    • 在多線程的系統架構中依然容易實作:
    1. 生產者/消費者模式簡化了多線程環境下的數據傳遞和協作。
    2. 通常使用緩衝區或隊列來實現這種模式,可以有效避免線程間的競爭條件,確保數據一致性和系統穩定性。

    ※ 什麼是 Message Queue(MQ)?

    定義:

    Message Queue (MQ),又稱為消息佇列,是一種訊息傳遞仲介。在這種架構中,生產者(Producer)生成並發送消息,仲介(Broker)管理和傳遞消息,消費者(Consumer)接收並處理消息。消息佇列提供了不同程序或不同系統之間的非同步溝通

    優點:

    • 非同步溝通生產者和消費者可以用不同的速度工作,彼此不需要等待對方完成操作。
    • 系統解耦生產者和消費者之間不直接通信,而是通過消息佇列進行間接通信,這提高了系統的靈活性和可維護性。
    • 暫存容錯當消費者(Consumer)意外關閉或出現故障時,未處理完的消息會暫存在消息佇列(MQ)中,並不會丟失。這些消息可以在消費者重啟後繼續處理。這種特性確保了系統的穩定性和可恢復性。
    • 可靠性消息佇列通常具有消息持久化和重試機制,確保消息不會丟失。
    • 水平擴展Producer和Consumer:Producer 可以分散在不同來源、裝置收集(例如 IoT (物聯網)應用);Consumer 可以按照需求和資源,運行在多台機器上,加速訊息(任務)的消化和處理。
    • 可擴展性可以輕鬆地增加更多的生產者或消費者來應對負載變化,保持系統的高效運行。

    常用套件:

    • RabbitMQ:是一個支持多種主流程式語言的消息佇列系統。例如:Node.js、Golang、Java、C/C++,Python。
    • Redis:是一個流行的開源內存資料庫,被廣泛用作快取(cache)和儲存服務。它特別適合需要快速存取和高效處理數據的應用場景。 特別是和Node.js 的結合能夠為應用帶來高效的數據處理和管理能力。

    ※ 簡單的生產者-消費者模式的實現範例解說

    const buffer = <any>[] //用一個array當作是緩衝區
    const MAX_BUFFER = 10 //緩衝區最大上限

    //設一個生產者
    class Producer {
    private buffer: any[]
    //告訴它buffer
    constructor(buffer: any[]) {
    this.buffer = buffer
    }
    //產生一個容易被發現的內容
    random = () => String(~(Math.random() * 1000)).padStart(3, "0")
    start = () => {
    //使用 setInterval 定時器
    setInterval(() => {
    //每秒產生消息往緩衝區塞資料
    if (this.buffer.length >= MAX_BUFFER)
    return console.warn("緩衝區已滿,請稍等")
    //產生message的內容
    const msg = "內容" + this.random()
    console.log("產生" + msg)
    //將資料放進buffer
    this.buffer.push(msg)
    }, 1000)
    }
    }
    //設一個消費者
    class Consumer {
    private buffer: any[]
    constructor(buffer: any[]) {
    this.buffer = buffer
    }
    start = () => {
    setInterval(() => {
    //消費者往緩衝區拿資料
    if (this.buffer.length <= 0) return console.warn("緩衝區已空,請稍等")

    //將資料取出來處理
    const msg = this.buffer.shift()
    console.log("取出", msg, "來處理")
    }, 1200)
    }
    }
    //緩衝區監控器來偵測buffer內的資料有哪些
    const buffer_monitor = setInterval(() => {
    console.log("--> 緩衝區目前有", buffer.length, "筆資料")
    }, 500)

    //建立生產者和消費者實例,並將共享的 buffer 傳遞給它們
    const producer = new Producer(buffer)
    const consumer = new Consumer(buffer)

    // 啟動生產者和消費者
    producer.start()
    consumer.start()
    /**--> 緩衝區目前有 0 筆資料
    產生內容-684
    --> 緩衝區目前有 1 筆資料
    取出 內容-684 來處理
    --> 緩衝區目前有 0 筆資料
    產生內容-380
    --> 緩衝區目前有 1 筆資料
    取出 內容-380 來處理
    -->不斷產出資料直到terminal結束/*



    分享至
    成為作者繼續創作的動力吧!
    © 2024 vocus All rights reserved.