使用 Docker 部署 Kafka 與 Node.js 實現訊息生產與消費

閱讀時間約 12 分鐘
Kafka 是一個高效能、可擴展的分布式事件流平台,常用於處理大量的實時數據流。在這篇文章中將介紹如何使用 Docker 部署 Kafka 和 Zookeeper,並通過 Node.js 實現 Kafka 生產者與消費者的簡單範例,協助你快速理解 Kafka 的基本用法。

Kafka 和 Zookeeper 部署

Kafka 需要 Zookeeper 來協調和管理分布式環境中的服務狀態。可以使用以下的 docker-compose.yml 文件來搭建 Kafka 和 Zookeeper:

version: '3'

services:

zookeeper:

image: wurstmeister/zookeeper:latest

container_name: zookeeper

ports:

- "2181:2181"

kafka:

image: wurstmeister/kafka:latest

container_name: kafka

ports:

- "9093:9093"

environment:

KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://192.168.0.193:9093

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT

KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093

KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

depends_on:

- zookeeper

服務啟動後確認 Kafka 是否正常運行

啟動 Docker 容器後,可以使用以下命令來檢查 Kafka 是否成功啟動:

docker exec kafka kafka-topics.sh --list --bootstrap-server 192.168.0.193:9093

如果成功列出現有的 topic,則表示 Kafka 正常運行

使用 Kafka UI 監控消息發送情況

若希望視覺化監控 Kafka 的消息流動,可以使用 Kafka UI 工具來進行監控。使用以下命令來啟動 Kafka UI:

podman run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui

這將啟動一個可以透過瀏覽器訪問的 UI,幫助你查看和管理 Kafka 中的 topic 和消息。

Node.js 與 Kafka 整合

安裝所需套件

在 Node.js 中,我們可以使用 KafkaJS 來操作 Kafka。首先需要安裝 kafkajs

npm install kafkajs

目錄結構

以下是範例專案的目錄結構:

project/
├── producer.js // Kafka 生產者
├── consumer.js // Kafka 消費者
├── server.js // Node.js 伺服器

Kafka 生產者 (producer.js)

Kafka 生產者負責將消息發送到 Kafka 的指定 topic 中。以下是簡單的 Kafka 生產者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const producer = kafka.producer();

const sendMessage = async (topic, messages) => {
await producer.connect();
await producer.send({
topic,
messages: Array.isArray(messages) ? messages.map((value) => ({ value })) : [{ value: messages }],
});
console.log(`Messages sent to topic: ${topic}`);
await producer.disconnect();
};

// 如果是直接執行,則進行測試發送
if (require.main === module) {
const topic = 'test-topic';
const messages = ['Hello Kafka!', 'This is a standalone message.'];

sendMessage(topic, messages)
.then(() => console.log('Producer finished successfully.'))
.catch((err) => console.error('Producer encountered an error:', err));
}

module.exports = { sendMessage };

Kafka 消費者 (consumer.js)

Kafka 消費者從 Kafka 中訂閱指定的 topic,並處理收到的消息。以下是簡單的 Kafka 消費者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const consumer = kafka.consumer({ groupId: 'test-group' });

const startConsumer = async (topic, messageHandler) => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });

await consumer.run({
eachMessage: async ({ message }) => {
messageHandler(message.value.toString());
},
});
console.log(`Consumer started for topic: ${topic}`);
};

// 如果是直接執行,則啟動並列印收到的訊息
if (require.main === module) {
const topic = 'test-topic';

startConsumer(topic, (message) => {
console.log(`Standalone Consumer received message: ${message}`);
}).catch((err) => console.error('Consumer encountered an error:', err));
}

module.exports = { startConsumer };

Node.js 伺服器 (server.js)

接下來,我們將使用 Express 框架啟動一個簡單的 HTTP 伺服器,並透過 API 路由觸發 Kafka 生產者和消費者。

const express = require('express');
const { sendMessage } = require('./producer');
const { startConsumer } = require('./consumer');

const app = express();
const PORT = 3000;
const topic = 'test-topic';

// API 路由:發送訊息
app.get('/send', async (req, res) => {
const message = req.query.message || 'Default message';
try {
await sendMessage(topic, [message]);
res.send(`Message "${message}" sent to Kafka.`);
} catch (error) {
console.error('Error sending message:', error);
res.status(500).send('Failed to send message.');
}
});

// API 路由:啟動消費者
app.get('/consume', async (req, res) => {
try {
await startConsumer(topic, (message) => {
console.log(`Received message: ${message}`);
});
res.send('Consumer started. Check logs for received messages.');
} catch (error) {
console.error('Error starting consumer:', error);
res.status(500).send('Failed to start consumer.');
}
});

// 啟動伺服器
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});

啟動服務

  1. 啟動 Kafka 伺服器,確保 Kafka 正常運行。
  2. 啟動 Kafka 消費者(選擇性,僅用於查看消息):
    node consumer.js
  3. 啟動 Express 伺服器:
    node server.js

測試 API

你可以透過以下 API 發送和接收 Kafka 消息:

注意事項

  1. Kafka 設定:確保 Kafka Broker 運行於 localhost:9092(或根據實際情況更新為正確的地址)。
  2. Topic 設定:若 test-topic 尚未建立,KafkaJS 會自動創建它。
  3. 錯誤處理:生產者和消費者應加入完整的錯誤處理邏輯,以應對網絡問題或 Kafka 錯誤。



這篇文章簡單介紹了如何在 Docker 中部署 Kafka 和 Zookeeper,並且提供了在 Node.js 中使用 Kafka 進行訊息生產與消費的範例。希望能幫助你快速上手 Kafka,並了解如何與 Node.js 整合。



avatar-img
0會員
3內容數
軟體開發 & 金融投資的日常筆記
留言0
查看全部
avatar-img
發表第一個留言支持創作者!
嘿洽啦 的其他內容
本文探討如何快速搭建 Graylog 和 Opensearch 以滿足專案需求,並詳細介紹日誌管理、手動備份、快照設定及自動備份的步驟。通過設置 Docker 環境,解決權限不足問題,確保系統的穩定備份與恢復功能。讀者將學會如何有效管理日誌數據和進行必要的快照操作,提升系統的可靠性和數據安全性。
本文探討如何快速搭建 Graylog 和 Opensearch 以滿足專案需求,並詳細介紹日誌管理、手動備份、快照設定及自動備份的步驟。通過設置 Docker 環境,解決權限不足問題,確保系統的穩定備份與恢復功能。讀者將學會如何有效管理日誌數據和進行必要的快照操作,提升系統的可靠性和數據安全性。
本篇參與的主題活動
BHC炸雞台灣首家分店於11/11台北大巨蛋地下街B2「遠東Garden City花園綠廊」正式登場,從國父紀念館捷運站五號出口步行約2分鐘。店面特別規畫內用和外帶動線,讓球迷們可以更快速外帶點餐,也有計畫明年擴展至南部。
BHC炸雞台灣首家分店於11/11台北大巨蛋地下街B2「遠東Garden City花園綠廊」正式登場,從國父紀念館捷運站五號出口步行約2分鐘。店面特別規畫內用和外帶動線,讓球迷們可以更快速外帶點餐,也有計畫明年擴展至南部。
你可能也想看
Google News 追蹤
Thumbnail
這個秋,Chill 嗨嗨!穿搭美美去賞楓,裝備款款去露營⋯⋯你的秋天怎麼過?秋日 To Do List 等你分享! 秋季全站徵文,我們準備了五個創作主題,參賽還有機會獲得「火烤兩用鍋」,一起來看看如何參加吧~
Thumbnail
11/20日NVDA即將公布最新一期的財報, 今天Sell Side的分析師, 開始調高目標價, 市場的股價也開始反應, 未來一週NVDA將重新回到美股市場的焦點, 今天我們要分析NVDA Sell Side怎麼看待這次NVDA的財報預測, 以及實際上Buy Side的倉位及操作, 從
Thumbnail
Hi 大家好,我是Ethan😊 相近大家都知道保濕是皮膚保養中最基本,也是最重要的一步。無論是在畫室裡長時間對著畫布,還是在旅途中面對各種氣候變化,保持皮膚的水分平衡對我來說至關重要。保濕化妝水不僅能迅速為皮膚補水,還能提升後續保養品的吸收效率。 曾經,我的保養程序簡單到只包括清潔和隨意上乳液
Thumbnail
我們在「【Message Queue - Kafka】串流時代的超入門簡介」有介紹到關於Kafka的基礎概念, 那麼本章節主要著重於生產者(Producer)的面向來細部探討, 看看生產者(Producer)究竟是什麼? 有哪些應該要注意的? 我們今天的主題除了說明生產者(Producer)的
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
Thumbnail
為什麼會有Schema Registry的出現? 因為Kafka的零拷貝原則, 也就是kafka本身並不會去碰觸到訊息也不進行資料驗證, 而是bypass的傳送, 預設都以位元組來傳輸資料會比較有效率, 但位元組誰看得懂啊...。 加上Kafka的特性是生產者與消費者並不能直接溝通, 因
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
前言 大家好上次我們教了如何下載 Docker Image 使用,今天我們要教如何產出自己的 Image,這次會使用一個 Dockerfile 範例,自行打包 Docker Image,初步練習使用 Docker 指令 下載 Dockerfile 教學檔案 在這裡我已經幫忙寫好一個 Docke
Thumbnail
前言 上次講到 Dockerfile、DockerImage、Docker Container 他們之間的關係,今天我們要來熟悉 Docker Image 如何使用,教你如何抓取雲端上的 Docker Image, Docker Image 下載來源 當我們今天要要使用 Docker Imag
Thumbnail
前言 上次我們初步體驗 Docker 快速佈署能力,今天我們要來講解 Dockerfile、Docker Image 與 Docker Container 這些常見的名詞,我們來了解在我們佈署的時候做哪些事情 Docker 佈署流程 首先看到如下圖上半部,在我們一個完整的佈署流程,我們會先將我
Thumbnail
前言 大家好在先前我們講了什麼是 Docker,Docker 好處有什麼以及怎麼安裝 Docker,今天我們要來開始初體驗 Docker 容器,使用後您會發現 Docker 非常的方便快速 Docker Hub 介紹 首先在開始學怎麼抓取 Docker Image 之前,我們要先來介紹 Doc
Thumbnail
前言 上次我們針對 Docker 這樣容器化技術做了一點介紹,今天我們要來講解 Docker 架構,你是否發現在每次程式上伺服器的流程很麻煩呢 ? 是否發現你寫的程式在別的作業系統不能用呢 ? 如果你遇到這些問題,Docker 都可以幫助你解決這些問題 Docker 架構 在 Docker 這
Thumbnail
前言 大家好我們今天要來教 Docker 這項技術,什麼是 Docker ? Docker 可以幫助我們做什麼事情 ? Docker 是一項容器化技術,他可以降低我們在佈署 App 時,讓我們可以有效的分配作業系統資源,降低佈署作業成本,現在讓我們來了解 Docker 要解決的問題 傳統佈署遇
Thumbnail
這個秋,Chill 嗨嗨!穿搭美美去賞楓,裝備款款去露營⋯⋯你的秋天怎麼過?秋日 To Do List 等你分享! 秋季全站徵文,我們準備了五個創作主題,參賽還有機會獲得「火烤兩用鍋」,一起來看看如何參加吧~
Thumbnail
11/20日NVDA即將公布最新一期的財報, 今天Sell Side的分析師, 開始調高目標價, 市場的股價也開始反應, 未來一週NVDA將重新回到美股市場的焦點, 今天我們要分析NVDA Sell Side怎麼看待這次NVDA的財報預測, 以及實際上Buy Side的倉位及操作, 從
Thumbnail
Hi 大家好,我是Ethan😊 相近大家都知道保濕是皮膚保養中最基本,也是最重要的一步。無論是在畫室裡長時間對著畫布,還是在旅途中面對各種氣候變化,保持皮膚的水分平衡對我來說至關重要。保濕化妝水不僅能迅速為皮膚補水,還能提升後續保養品的吸收效率。 曾經,我的保養程序簡單到只包括清潔和隨意上乳液
Thumbnail
我們在「【Message Queue - Kafka】串流時代的超入門簡介」有介紹到關於Kafka的基礎概念, 那麼本章節主要著重於生產者(Producer)的面向來細部探討, 看看生產者(Producer)究竟是什麼? 有哪些應該要注意的? 我們今天的主題除了說明生產者(Producer)的
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
Thumbnail
為什麼會有Schema Registry的出現? 因為Kafka的零拷貝原則, 也就是kafka本身並不會去碰觸到訊息也不進行資料驗證, 而是bypass的傳送, 預設都以位元組來傳輸資料會比較有效率, 但位元組誰看得懂啊...。 加上Kafka的特性是生產者與消費者並不能直接溝通, 因
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
前言 大家好上次我們教了如何下載 Docker Image 使用,今天我們要教如何產出自己的 Image,這次會使用一個 Dockerfile 範例,自行打包 Docker Image,初步練習使用 Docker 指令 下載 Dockerfile 教學檔案 在這裡我已經幫忙寫好一個 Docke
Thumbnail
前言 上次講到 Dockerfile、DockerImage、Docker Container 他們之間的關係,今天我們要來熟悉 Docker Image 如何使用,教你如何抓取雲端上的 Docker Image, Docker Image 下載來源 當我們今天要要使用 Docker Imag
Thumbnail
前言 上次我們初步體驗 Docker 快速佈署能力,今天我們要來講解 Dockerfile、Docker Image 與 Docker Container 這些常見的名詞,我們來了解在我們佈署的時候做哪些事情 Docker 佈署流程 首先看到如下圖上半部,在我們一個完整的佈署流程,我們會先將我
Thumbnail
前言 大家好在先前我們講了什麼是 Docker,Docker 好處有什麼以及怎麼安裝 Docker,今天我們要來開始初體驗 Docker 容器,使用後您會發現 Docker 非常的方便快速 Docker Hub 介紹 首先在開始學怎麼抓取 Docker Image 之前,我們要先來介紹 Doc
Thumbnail
前言 上次我們針對 Docker 這樣容器化技術做了一點介紹,今天我們要來講解 Docker 架構,你是否發現在每次程式上伺服器的流程很麻煩呢 ? 是否發現你寫的程式在別的作業系統不能用呢 ? 如果你遇到這些問題,Docker 都可以幫助你解決這些問題 Docker 架構 在 Docker 這
Thumbnail
前言 大家好我們今天要來教 Docker 這項技術,什麼是 Docker ? Docker 可以幫助我們做什麼事情 ? Docker 是一項容器化技術,他可以降低我們在佈署 App 時,讓我們可以有效的分配作業系統資源,降低佈署作業成本,現在讓我們來了解 Docker 要解決的問題 傳統佈署遇