2024-05-09|閱讀時間 ‧ 約 45 分鐘

Telegram 憑證監控機器人實作 EP1 — 讀取 MongoDB Domain Info

raw-image

👨‍💻 簡介

上次做的憑證監控已經可以正常運作了,但這次希望能夠不從 yaml 讀取 domain info,而是從 MongoDB 進行讀取,方便未來的擴充性。

這次的重點是要透過 Python 連接 MongoDB,並且透過 Python 讀取 MongoDB,最後透過 Python 寫入 MongoDB。

🛠️ 使用工具

  • Python 3.9.6
  • MongoDB
  • Mongoshell
  • Docker
  • Docker-Compose

📝 功能需求

  • 建立 MongoDB docker-compose
  • 透過 Python 連接 MongoDB
  • 透過 Python 讀取 yaml 並寫入 MongoDB
  • 透過 Python 傳入 env 以及 domain 寫入 MongoDB
  • 透過 Python 讀取 MongoDB
  • 透過 Python 修改 MongoDB
  • 透過 Python 刪除 MongoDB

🎯Setup

  1. 建立 MongoDB docker-compose

要簡單使用 MongoDB 可以用 docker-compose 快速拉起:

version: "3.1"

services:
mongodb:
image: mongo:latest
container_name: mongodb_container
environment:
MONGO_INITDB_ROOT_USERNAME: rootuser
MONGO_INITDB_ROOT_PASSWORD: rootpass
ports:
- "27017:27017"
volumes:
- mongodb_data_container:/data/db
volumes:
mongodb_data_container:
docker-compose up -d

2. 透過 Python 連接 MongoDB

在 Python Library 中,可以使用 PyMongo操作 MongoDB。 要使用這個 Library 要先透過安裝:

python3 -m pip install pymongo

接著可以簡單操作一筆資料看是否正常:

# 導入 pymongo 模塊中的 MongoClient 類,用於與 MongoDB 資料庫進行連接
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# 定義 MongoDB 連接 URI,包含用戶名、密碼、服務器地址、端口和資料庫名
mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"

try:
# 嘗試連接 MongoDB
client = MongoClient(mongodb_uri)
# 嘗試獲取服務器訊息,以確認連接
info = client.server_info() # 會在連接失敗時拋出 ConnectionFailure 異常
print("MongoDB 連接成功。Server Info:", info)
except ConnectionFailure:
print("MongoDB 連接失敗。請檢查您的連接設置和Server狀態。")

輸出如下:

MongoDB 連接成功。Server Info: {'version': '7.0.8', 'gitVersion':
'c5d33e55ba38d98e2f48765ec4e55338d67a4a64', 'modules': [], 'allocator': 'tcmalloc',
'javascriptEngine': 'mozjs', 'sysInfo': 'deprecated', 'versionArray': [7, 0, 8, 0], 'openssl':
{'running': 'OpenSSL 3.0.2 15 Mar 2022', 'compiled': 'OpenSSL 3.0.2 15 Mar 2022'},
'buildEnvironment': {'distmod': 'ubuntu2204', 'distarch': 'aarch64', 'cc': '/opt/mongodbtoolchain/v4/
bin/gcc: gcc (GCC) 11.3.0', 'ccflags': '-Werror -include mongo/platform/basic.h -ffp-contract=off
...

稍微包裝一下:

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"

def init_mongo_client(mongodb_uri):
try:
# 嘗試連接 MongoDB
client = MongoClient(mongodb_uri)
# 嘗試獲取服務器訊息,以確認連接
info = client.server_info() # 會在連接失敗時拋出 ConnectionFailure 異常
mongodb_version = info['version']
print("MongoDB 連接成功。Mongo 版本為", mongodb_version)
return client
except ConnectionFailure:
print("MongoDB 連接失敗。請檢查您的連接設置和Server狀態。")

client = init_mongo_client(mongodb_uri)

這樣就代表我們成功連線到 MongoDB 了。

3. 透過 Python 讀取 yaml 並寫入 MongoDB

接下來我想要將讀取的 domains.yaml 寫入 M ongo ,會用到之前的 load_data_from_yaml

client = init_mongo_client(mongodb_uri)
db = client.get_default_database()

# 定義要操作的集合名稱
collection_name = "domains"
collection = db[collection_name]
yaml_file_path = "domains.yaml"
domain_data = load_data_from_yaml(yaml_file_path, "domain_envs")
# 每次執行迴圈都會取得一個鍵值對,env 是 key,格式為 string,
# 而 value 則會是 domains,格式是 list
for env, domains in domain_data.items():
print(f"Writing domains for env {env}: {domains}")
# 這裡組成一個 document,方便儲存到 Mongo 裡
document = {
"env": env,
"domains": domains
}
# 更新條件,匹配那些 env 字段等於當前 env 值的 document
query = {"env": env}
# 將匹配的 document 的內容設為 `document` 字典中的內容
# 使用 upsert=True,如果不存在則插入,存在則更新
collection.update_one(query, {"$set": document}, upsert=True)
print("資料已成功寫入 MongoDB。")
在 Mongo 中,如果指定的集合 (collection) 不存在,當你進行第一次寫入操作(如插入文檔)時,MongoDB 會自動創建這個集合。

集合的創建是懶惰的(lazy),意味著直到你對集合進行了第一次寫入操作(例如使用 `insert_one``insert_many``update_one` 等方法),集合才會被實際創建。

這種設計使得在 MongoDB 中處理集合非常靈活,你不需要事先創建集合就可以開始開發和測試你的應用程序。MongoDB 會根據需要自動管理集合的創建。

儲存要注意的地方是格式,這裡因為使用的是 Mongo,所以我們這邊將讀取的格式組成一個 document。

在 Mongo 中,每個 document 都是以 BSON(Binary JSON) 格式儲存,這是一種類似於 JSON 的格式。

每個文檔都是由鍵值對(key-value pairs)組成的資料結構,其中每個鍵(key)是一個字符串,而值(value)可以是不同類型的資料類型,包括但不限於字符串、數字、布爾值、列表(在 BSON 中稱為)、甚至是嵌套的文檔。

可參考官方文檔[官方文檔](https://www.mongodb.com/docs/manual/reference/bson-types/#bson-types)

update_one基本語法如下:

collection.update_one(filter, update, upsert=False)
  • filter:一個字典,用於指定查詢條件,以匹配需要更新的文檔。
  • update:一個字典,用於指定如何更新匹配的文檔。這通常涉及到 MongoDB 的更新操作符,如 $set$unset 等。
  • upsert:一個可選的 boolean 值,默認為 False。如果設置為 True,當沒有文檔匹配 filter 查詢條件時,update 操作將會作為一個新的 doc 被插入到集合中。

寫入成功後可以透過 MongoShell 或是 Mongo Compass,這邊使用 MongoShell,在 docker-compose.yaml 添加 MongoShell 的 service:

services:
mongoshell:
image: mongo:latest
container_name: mongodb_mongoshell
depends_on:
- mongodb
entrypoint:
[
"mongosh",
"--host",
"mongodb",
"--username",
"rootuser",
"--password",
"rootpass",
"--authenticationDatabase",
"admin",
]
stdin_open: true
tty: true

接著透過以下指令進入 mongoshell:

docker-compose run mongoshell

要查看 collection 需執行以下 query:

use mydatabase
db.domains.find({})

這段代碼用意如下:

  • 選擇 mydatabase db
  • 查詢 domains collection 中所有的 document

輸出類似以下訊息:

[
{
_id: ObjectId('66150a22e1a8ac17b898a2f0'),
env: 'live',
domains: [ 'google.com', 'en.wikipedia.org' ]
}
]

讓我們稍微優化一下程式碼:

def write_domain_data_to_mongodb(mongo_client, collection_name, domain_data):
db = mongo_client.get_default_database()
collection = db[collection_name]

for env, domains in domain_data.items():
document = {
"env": env,
"domains": domains
}
# 更新條件,這裡假設 env 是唯一的
query = {"env": env}
# 使用 upsert=True,如果不存在則插入,存在則更新
collection.update_one(query, {"$set": document}, upsert=True)
print("數據已成功寫入 MongoDB。")

client = init_mongo_client(mongodb_uri)
collection_name = "domains"
yaml_file_path = "domains.yaml"
domain_data = load_data_from_yaml(yaml_file_path, "domain_envs")
write_domain_data_to_mongodb(client, collection_name, domain_data)

這樣就完成了從 yaml 進行寫入,接著需要撰寫另一套,透過傳入 env 以及 domain 資訊來進行寫入。

4. 透過 Python 傳入 env 以及 domain 寫入 MongoDB

會需要這個功能是因為之後要透過 TG Bot 傳入 env 以及 domain 進行寫入的操作:

def add_domain_to_mongodb(collection, env, domain):
# 嘗試添加或更新該 env 的域名
result = collection.update_one(
{"env": env}, {"$addToSet": {"domains": domain}}, upsert=True
)
if result.matched_count > 0 or result.upserted_id is not None:
print("域名已成功添加或更新。")
return True
else:
print("域名添加或更新失敗。")
return False

client = init_mongo_client(mongodb_uri)
db = client.get_default_database()
collection_name = "domains"
collection = db[collection_name]
add_env = "dev"
add_domain = "test.com"
add_domain_to_mongodb(collection, add_env, add_domain)

使用 addToSet 是為了確保新增時,如果已存在不會重複新增,確保每個 domain 的唯一性。

新增的部分告一段落,接著來進行讀取的部分。

5. 透過 Python 讀取 MongoDB

讀取可以透過 collection.find({}) 進行查詢:

db = client.get_default_database()
collection = db[collection_name]
try:
domain_envs = {}
data = collection.find({})
for item in data:
env = item.get("env")
domains = item.get("domains", [])
if env and domains:
domain_envs[env] = domains
print(domain_envs)
except Exception as e:
print(f"從 MongoDB 讀取數據失敗: {e}")
finally:
client.close()

接著優化一下代碼:

def load_domain_envs_from_mongodb(mongo_client, collection_name):
db = mongo_client.get_default_database()
collection = db[collection_name]
try:
domain_envs = {}
data = collection.find({})
for item in data:
env = item.get("env")
domains = item.get("domains", [])
if env and domains:
domain_envs[env] = domains
return domain_envs
except Exception as e:
print(f"從 MongoDB 讀取數據失敗: {e}")
return {}
finally:
client.close()

這樣就算是完成了基本的讀取,接下來要做取得單一 domain 資訊:

def get_domain_from_mongodb(collection, env, domain):
# 構造查詢條件
query = {"env": env, "domains": domain}
# 執行查詢操作
result = collection.find_one(query)
print("get result",result)
if result:
# 找到了相應的文檔,返回域名訊息
print(f"在環境 '{env}' 下找到域名 '{domain}' 的訊息。")
return result
else:
# 沒有找到相應的文檔
print(f"在環境 '{env}' 下未找到域名 '{domain}' 的訊息。")
return None

client = init_mongo_client(mongodb_uri)
db = client.get_default_database()
collection_name = "domains"
collection = db[collection_name]
get_env = "live"
get_domain = "google.com"
get_domain_from_mongodb(collection, get_env, get_domain)

讀取的部分告一段落,接著來進行修改的部分。

6. 透過 Python 修改 MongoDB

假設我們目前的 MongoDB 資料如下:

[
{
_id: ObjectId('66150a22e1a8ac17b898a2f0'),
env: 'live',
domains: [ 'google.com', 'en.wikipedia.org' ]
}
]

我打算將 google.com 改成 github.com,只需要將原本用來新增的 update_one 裡的 query 多一個 domains 欄位:

def update_domain_in_mongodb(collection, env_value, old_domain, new_domain):
# 建立查詢條件和更新動作
query = {"env": env_value, "domains": old_domain}
update_action = {"$set": {"domains.$": new_domain}}

# 執行更新操作
update_result = collection.update_many(query, update_action)
if update_result.matched_count > 0:
print(
f"成功更新文檔。匹配數量: {update_result.matched_count}, 修改數量: {update_result.modified_count}."
)
else:
print("未找到匹配的文檔或域名,更新未執行。")

db = client.get_default_database()
collection = db[collection_name]
env_value = "live"
origin_domain = "google.com"
new_domain = "github.com"

update_domain_in_mongodb(collection, env_value, origin_domain, new_domain)

這裡有用到 $ 佔位符,主要是將查詢語句的第一個值來做 update, 會先查找 env 等於 env_value 並且 domains 等於 old_domain, 接著將 domains list 中第一個匹配的 old_domain 元素更新為 new_domain

可參考官網的操作符文檔

7. 透過 Python 刪除 MongoDB

最後一步要做的是刪除,能夠在指定的 env 刪除 domain

def delete_domain_in_mongodb(collection, env_value, domain_to_delete):
query = {"env": env_value}
delete_action = {"$pull": {"domains": domain_to_delete}}
collection.update_one(query, delete_action)

db = client.get_default_database()
collection = db[collection_name]
env_value = "live"
delete_domain = "github.com"
delete_domain_in_mongodb(collection, env_value, delete_domain)

刪除一樣用 update 方法,然後使用 $pull 操作符,刪除指定的項目,官方文檔在這。

MongoDB 操作的部分就先告一段落,下篇會教如何打造屬於自己的 Telegram Bot,讓我的機器人能夠接收指令。

如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep1-mongo-setup

📚Reference

分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.