如何在 AWS 使用 SASL/SCRAM 建立 MSK

更新於 2024/09/22閱讀時間約 3 分鐘

👨‍💻簡介

Amazon Managed Streaming for Apache Kafka(Amazon MSK)是 Amazon 推出的完全代管、具有高可用以及安全的 Apache Kafka 服務。在這篇文章中,會使用 AWS 建立 MSK,並使用 SASL/SCRAM 的驗證方式來完成整個設定。

🎯setup

建立叢集

進入到 MSK 頁面,然後創建叢集

raw-image
  1. 叢集設定

創建時選自訂建立,叢集類型我這邊使用的是已佈建,規格可以選最小的,版本使用 AWS 建議的,區域數量為3,達到高可用,最後叢集組態先使用預設即可

raw-image
raw-image
raw-image

2. 聯網

聯網的部分可以使用 AWS 幫你建立好的預設 VPC,我在前一步選擇三個可用區,因此會需要有三條不同區域的子網,安全群組也選擇預設的即可

raw-image

3. 安全

安全這邊選擇 SASL/SCRAM 身分驗證,加密靜態資料選擇使用 AWS 受管金鑰,待會會提到

SASL/SCRAM(Simple Authentication andSecurity Layer/ Salted Challenge Response Mechanism),是一種使用帳號和密碼來完成驗證的方式,但它的前提是 Client 端與 Proxy 之間必須使用 TLS 加密。

raw-image

4. 監控和標籤

這裡監控只要選基本的即可

raw-image

接著等到叢集建立完成後開始設定密鑰,建立叢集大概需要15分鐘

建立KMS Key

AmazonMSK 使用 Amazon Secret Manager 來儲存 MSK 使用的帳號與密碼。在設定 Secret Manager 之前,必須先在 Amazon KMS(Key Management Service) 創建一個使用者託管的 Key。

KMS是 Amazon 託管的密鑰管理服務

raw-image
  1. 設定金鑰

建立過程都先選默認的,如果後續要調整可以再改。

raw-image

2. 新增標籤

別名叫kafka-kms

raw-image

3. 定義金鑰管理許可

選擇自己的 IAM user

raw-image

4. 定義金鑰使用許可

一樣選擇自己的 IAM user 即可

raw-image

回到主頁就可以看到自己的金鑰建立完成了

raw-image

建立 SecretsManager

Amazon Secrets Manager 是密碼管理服務,可以把敏感資料儲存在這,不需要放在程式裡,可以透過 API 調用取得,避免在程式外洩。 AmazonMSK 通過 Secrets Manager 來儲存帳號與密碼。

這裡要注意一點,使用預設 AWS KMS 金鑰建立的密碼無法與 Amazon MSK 叢集搭配使用。

raw-image
raw-image
  1. 選擇機密類型
    在 Secrets Manager 中選擇其他類型的密碼,這邊要打上我們需要驗證的 username 以及 password,加密的金鑰要選擇前面建立的 KMS 密鑰。
raw-image

2. 設定機密

在建立密碼時,密碼名稱必須包含前綴為 AmazonMSK_

raw-image

3. 設定輪換

選擇預設即可

raw-image

關聯密鑰到 MSK Cluster

回到主頁面的屬性,往下滑找到關聯機密,選擇剛剛建立的機密即可

raw-image

設定權限

建立 topic 之前,我們需要先透過 Broker 去修改帳號的權限,接著開放公開存取。 在相同子網路上建立 ec2,透過私有連線下去修改權限,先去 Client 端主機下載相對應版本的 Kafka,可以透過 [此連結](Apache Kafka)查看。

wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

下載完成後解壓縮。

tar -xzf kafka_2.13-3.8.0.tgz

如果機器上沒有安裝 java 也一併安裝

sudo yum -y install java-11

接著要先取得 BootstrapConnectString,將使用者權限設定完成。 在 MSK 主頁面點擊檢視用戶端資訊可以取得 BootstrapConnectString。

raw-image
raw-image

再來到 Client 端機器上的 kafka 目錄下,因為在執行權限設定時,需要進行身份驗證,所以需要建立一份驗證檔,取名為 client_sasl.properties,裡面會有 Secrets Manager 設定的帳號密碼,將這檔案建立在 kafka 目錄下

client_sasl.properties 內容如下

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";

執行以下命令來修改權限,替換 BootStrapConnectString 為剛剛複製的。如果 Cluster 的節點數小於3個,replication-factor要設定小於3。

export TOPIC="*"
export USER="alan"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-3.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096"

## 允許使用者所有權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALL \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

以下是單獨權限設定

## 授予指定的使用者建立指定的主題。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Create \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定使用者對所有群組和指定主題的讀取權限。
## 該使用者可以從指定的主題讀取消息,並且適用於所有消費者群組。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Read --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對指定的主題的寫入權限。
## 該使用者可以向指定的主題發送消息。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Write \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對所有群組和指定的主題的描述權限。
## 該使用者可以查看主題和群組的元數據信息,例如消費者的偏移量和配置詳情。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Describe --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 授予指定的使用者對指定的主題的刪除權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation DELETE \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image
## 允許使用者修改資源(如 topic 的配置)。這個權限對於更改 topic partition 數或設定非常重要。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALTER \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

設定完權限後,即可開始透過外網建立 topic

開放外網存取

使用外網存取 msk,必須先開啟公開存取,因此須先設定叢集組態檔,將 allow.everyone.if.no.acl.found=false 加上去,讓叢集使用這份組態檔

點擊左側菜單欄的叢集組態 -> 建立叢集組態 -> 加上 allow.everyone.if.no.acl.found=false

raw-image
raw-image
raw-image

接著回到叢集裡面的屬性,替換剛剛新增的設定檔

raw-image
raw-image

好了之後回到叢集裡面的屬性,編輯聯網設定,開啟公開存取

raw-image
raw-image

使用帳號與密碼連接到MSK Cluster

可以存取外網後,就可以開始測試外部 Client 端是否可以操作了。先到一台需要透過外網存取的機器,先取得外網 ip,將這 ip 加到MSK 的 SG 裡,開放可以連到 9196 port

raw-image
raw-image

安裝kafka以及java,並到 kafka 目錄下設定 client_sasl.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";

首先先建立 topic,一樣使用 Broker 的連線資訊,但是是使用外網,先到叢集那邊複製公有端點

raw-image

到 kafka 目錄下執行指令建立 topic

export TOPIC="mytopic"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-topics.sh --create --bootstrap-server $BootStrapConnectString \
--replication-factor 3 \
--partitions 1 \
--topic $TOPIC \
--command-config client_sasl.properties
raw-image

接著開啟兩個終端視窗,使用 kafka-console,讓第一個視窗成為producer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-producer.sh --broker-list $BootStrapConnectString \
--topic $TOPIC \
--producer.config client_sasl.properties

接著讓第二個視窗成為consumer

export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"

./bin/kafka-console-consumer.sh --bootstrap-server $BootStrapConnectString \
--topic $TOPIC \
--consumer.config client_sasl.properties

執行後可以看到 Producer 會將訊息傳送出去,並且 Consumer 可以成功接收到訊息

raw-image

如果要用 python 來測試,須先安裝 boto3 以及 kafka-python package,並且設定好 aws-cli,因為需要取得 secret

pip install boto3
pip install kafka-python

建立一份 producer.py,成為producer

import boto3
import base64
import json
from kafka import KafkaProducer
from kafka import TopicPartition

def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
# For a list of exceptions thrown, see
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
raise e
secret = get_secret_value_response['SecretString']
return json.loads(secret)
topic = 'mytopic'
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
producer = KafkaProducer(bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password'],
)
try:
while True:
message = input("請輸入您要發送的消息 (輸入 'exit' 退出): ")
if message.lower() == 'exit':
print("結束輸入,關閉生產者...")
break
# 將用戶輸入的消息編碼為 bytes
producer.send(topic, message.encode('utf-8'))
producer.flush() # 確保消息已經被發送
print("消息發送成功")
except Exception as e:
print(f"發送消息失敗: {e}")
finally:
producer.close()
print("Kafka 生產者已關閉。")

建立一份consumer.py,成為 consumer

import boto3
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition

def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
secret = base64.b64decode(get_secret_value_response['SecretBinary'])
return json.loads(secret)
TOPIC = "mytopic"
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password']
)
print("開始接收消息...")
try:
for message in consumer:
print(f"接收到消息: {message.value.decode('utf-8')}")
except Exception as e:
print(f"接收消息失敗: {e}")
finally:
consumer.close()
raw-image

📚Reference

avatar-img
17會員
83內容數
golang
留言0
查看全部
avatar-img
發表第一個留言支持創作者!
Alan的開發者天地 的其他內容
本文介紹如何使用 MongoDB 的命令行工具 Mongorestore 將先前備份的資料還原到資料庫中。Mongorestore 支援資料庫的整體恢復、特定集合的恢復,以及從遠端伺服器進行恢復等功能。無論是初學者還是有經驗的使用者,都能夠快速掌握如何使用 Mongorestore 工具。
👨‍💻簡介 在資料庫管理和系統維護中,備份是非常重要的一環。對於使用 MongoDB 的開發者和資料庫管理員來說,mongodump 是一個非常實用的命令行工具,能夠快速且輕鬆地完成資料庫的備份和恢復。無論是進行資料遷移、系統升級,還是面對突發的故障,mongodump 都能提供穩定的資料保護
本文介紹如何對 Telegram 憑證監控機器人的代碼進行優化,包括新增指令、讀取變數、提高可讀性和可維護性。
在這篇文章中,將繼續介紹 TG Bot 整合 MongoDB 的相關操作。主要包括對 domain 進行驗證操作,使用的工具有 Python 、MongoDB 和 TG Bot。具體的功能需求包括新增 domain 前檢查 domain 憑證以及透過 TG Bot 檢查所有 domain 是否過期。
本文介紹如何使用 TG Bot 來操作 MongoDB,包括讀取所有 domain、讀取特定 domain、新增 domain、批量新增 domain、修改 domain 和刪除 domain。透過 TG Bot 的指令操作,實現了自動化管理和多環境管理。
👨‍💻 簡介 今天這篇主要是帶大家快速建立屬於自己的 Telegram bot,申請 bot 的部分我會附上網址,請準備好之後再來開始。 🛠️ 使用工具 Python 3.9.6 TG Bot 📝 功能需求 輸入指令讓 TG Bot 回傳訊息 接受傳入參數並進行簡單回傳 設定
本文介紹如何使用 MongoDB 的命令行工具 Mongorestore 將先前備份的資料還原到資料庫中。Mongorestore 支援資料庫的整體恢復、特定集合的恢復,以及從遠端伺服器進行恢復等功能。無論是初學者還是有經驗的使用者,都能夠快速掌握如何使用 Mongorestore 工具。
👨‍💻簡介 在資料庫管理和系統維護中,備份是非常重要的一環。對於使用 MongoDB 的開發者和資料庫管理員來說,mongodump 是一個非常實用的命令行工具,能夠快速且輕鬆地完成資料庫的備份和恢復。無論是進行資料遷移、系統升級,還是面對突發的故障,mongodump 都能提供穩定的資料保護
本文介紹如何對 Telegram 憑證監控機器人的代碼進行優化,包括新增指令、讀取變數、提高可讀性和可維護性。
在這篇文章中,將繼續介紹 TG Bot 整合 MongoDB 的相關操作。主要包括對 domain 進行驗證操作,使用的工具有 Python 、MongoDB 和 TG Bot。具體的功能需求包括新增 domain 前檢查 domain 憑證以及透過 TG Bot 檢查所有 domain 是否過期。
本文介紹如何使用 TG Bot 來操作 MongoDB,包括讀取所有 domain、讀取特定 domain、新增 domain、批量新增 domain、修改 domain 和刪除 domain。透過 TG Bot 的指令操作,實現了自動化管理和多環境管理。
👨‍💻 簡介 今天這篇主要是帶大家快速建立屬於自己的 Telegram bot,申請 bot 的部分我會附上網址,請準備好之後再來開始。 🛠️ 使用工具 Python 3.9.6 TG Bot 📝 功能需求 輸入指令讓 TG Bot 回傳訊息 接受傳入參數並進行簡單回傳 設定
你可能也想看
Google News 追蹤
Thumbnail
*合作聲明與警語: 本文係由國泰世華銀行邀稿。 證券服務係由國泰世華銀行辦理共同行銷證券經紀開戶業務,定期定額(股)服務由國泰綜合證券提供。   剛出社會的時候,很常在各種 Podcast 或 YouTube 甚至是在朋友間聊天,都會聽到各種市場動態、理財話題,像是:聯準會降息或是近期哪些科
Thumbnail
透過充分利用 AWS Organizations 和 CloudFormation StackSets,您可以更好地實現企業級的雲端管理與控制,為業務的持續發展提供穩固的技術支撐。
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
AWS DataSync 是一種線上資料移動和探索服務,可簡化並加速向 AWS 的資料遷移,以及在內部部署儲存、邊緣節點、其他雲端和 AWS 儲存服務移入和移出資料[1]。 在某些架構上會,使用該服務會需要安裝 DataSync Agent 來傳輸檔案 您需要 DataSync Agen
Thumbnail
為什麼會有Schema Registry的出現? 因為Kafka的零拷貝原則, 也就是kafka本身並不會去碰觸到訊息也不進行資料驗證, 而是bypass的傳送, 預設都以位元組來傳輸資料會比較有效率, 但位元組誰看得懂啊...。 加上Kafka的特性是生產者與消費者並不能直接溝通, 因
Thumbnail
雲端已經成為App開發的核心,而Amazon的AWS(Amazon Web Services是開發者常用的平台,可以幫助開發者建立、整合和擴展App。
Thumbnail
這篇文章教你如何搭建Kubernetes Cluster,包括節點安裝前設定、軟體套件安裝、Control-Plane部署和加入運算節點等步驟。在建置之後,作者會分享一些基礎服務的安裝。希望這篇文章對你有所幫助。
Thumbnail
本文章將說明如果您想要從頭建置一組具有Loadbalancer HA架構的Kubernetes Cluster時,你可能會需要做的事前準備工作。
Thumbnail
在沒有分環境之前,每一隻lambda只有一個code console給所有人一起編輯,開發好了就deploy,根據設定的trigger觸發執行。 現在我們希望能夠在code console開發,然後deploy到不同的stage,目標是不同stage的api gateway能夠調用該lambda的
Thumbnail
*合作聲明與警語: 本文係由國泰世華銀行邀稿。 證券服務係由國泰世華銀行辦理共同行銷證券經紀開戶業務,定期定額(股)服務由國泰綜合證券提供。   剛出社會的時候,很常在各種 Podcast 或 YouTube 甚至是在朋友間聊天,都會聽到各種市場動態、理財話題,像是:聯準會降息或是近期哪些科
Thumbnail
透過充分利用 AWS Organizations 和 CloudFormation StackSets,您可以更好地實現企業級的雲端管理與控制,為業務的持續發展提供穩固的技術支撐。
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
AWS DataSync 是一種線上資料移動和探索服務,可簡化並加速向 AWS 的資料遷移,以及在內部部署儲存、邊緣節點、其他雲端和 AWS 儲存服務移入和移出資料[1]。 在某些架構上會,使用該服務會需要安裝 DataSync Agent 來傳輸檔案 您需要 DataSync Agen
Thumbnail
為什麼會有Schema Registry的出現? 因為Kafka的零拷貝原則, 也就是kafka本身並不會去碰觸到訊息也不進行資料驗證, 而是bypass的傳送, 預設都以位元組來傳輸資料會比較有效率, 但位元組誰看得懂啊...。 加上Kafka的特性是生產者與消費者並不能直接溝通, 因
Thumbnail
雲端已經成為App開發的核心,而Amazon的AWS(Amazon Web Services是開發者常用的平台,可以幫助開發者建立、整合和擴展App。
Thumbnail
這篇文章教你如何搭建Kubernetes Cluster,包括節點安裝前設定、軟體套件安裝、Control-Plane部署和加入運算節點等步驟。在建置之後,作者會分享一些基礎服務的安裝。希望這篇文章對你有所幫助。
Thumbnail
本文章將說明如果您想要從頭建置一組具有Loadbalancer HA架構的Kubernetes Cluster時,你可能會需要做的事前準備工作。
Thumbnail
在沒有分環境之前,每一隻lambda只有一個code console給所有人一起編輯,開發好了就deploy,根據設定的trigger觸發執行。 現在我們希望能夠在code console開發,然後deploy到不同的stage,目標是不同stage的api gateway能夠調用該lambda的