Telegram 憑證監控機器人實作 EP3 - 整合 MongoDB

閱讀時間約 15 分鐘


raw-image

👨‍💻 簡介

這一部分主要讓 TG Bot 可以接收指令,對 MongoDB 進行增刪改查,讓我能夠透過 TG Bot 來新增、修改、刪除 domain,這樣就能夠做到自動化管理,而且可以做到多環境的管理。

🛠️ 使用工具

  • Python 3.9.6
  • MongoDB
  • TG Bot

📝 功能需求

  • 透過 TG Bot 讀取 MongoDB 所有 domain
  • 透過 TG Bot 讀取 MongoDB 特定 domain
  • 透過 TG Bot 新增 Domain 到 MongoDB
  • 透過 TG Bot 新增同一 env 底下多組 domain 到 MongoDB
  • 透過 TG Bot 修改 MongoDB Domain
  • 透過 TG Bot 刪除 MongoDB Domain

🎯Setup

  1. 透過 TG Bot 讀取 MongoDB 所有 domain

先將原本的 MongoDB 啟動起來,並放入值,接著來去呼叫 load_domain_envs_from_mongodb 這個 function

import telebot
from mongo import init_mongo_client, get_collection, load_domain_envs_from_mongodb

mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
TOKEN = "your-token"
bot = telebot.TeleBot(TOKEN)

@bot.message_handler(commands=["get_all"])
def handle_get_all_command(message):
domain_data = load_domain_envs_from_mongodb(collection)
bot.send_message(message.chat.id, f"{domain_data}", parse_mode="Markdown")

if __name__ == "__main__":
client = init_mongo_client(mongodb_uri)
collection_name = "domains"
collection = get_collection(client, collection_name)
bot.infinity_polling()

接著就可以到 TG 輸入指令測試

raw-image

輸出的格式是 Python 物件,想要可讀性高一點可以使用 yaml Library 轉成 yaml 格式:

def convert_to_yaml(python_obj):
return yaml.dump(python_obj, allow_unicode=True)

@bot.message_handler(commands=["get_all"])
def handle_get_all_command(message):
domain_data = load_domain_envs_from_mongodb(collection)
yaml_domain_data = convert_to_yaml(domain_data)
bot.send_message(message.chat.id, f"{yaml_domain_data}", parse_mode="Markdown")

yaml.dump 可以很方便的將 Python 對象序列化為 yaml 格式的 string。

重新到 TG 輸入指令後結果會如圖:

raw-image

接下來就能進行第二步,讀取特定 domain。

2. 透過 TG Bot 讀取 MongoDB 特定 Domain

會需要這功能主要是為了之後整合監控憑證,會需要將特定 domain 查詢是否過期:

@bot.message_handler(commands=["get"])
def handle_get_command(message):
try:
# 從 message 中解析參數
_, get_env, get_domain = message.text.split(maxsplit=2)
except ValueError:
# 如果參數數量不正確,回復用戶正確的使用方式
bot.reply_to(
message,
"使用方式不正確。請按照以下格式輸入:\n/get <env> <domain>",
)
return

# 調用取得 domain MongoDB 的函數
get_result = get_domain_from_mongodb(collection, get_env, get_domain)
print("get result", get_result)
# 根據操作結果回復用戶
if get_result:
domain_info_yaml = convert_to_yaml(get_result)
bot.reply_to(message, f"{domain_info_yaml}", parse_mode="Markdown")
else:
bot.reply_to(message, f"在 {get_env} 環境中未找到域名 {get_domain} 的訊息。")

接著到 TG 輸入測試,結果如圖:

raw-image

以上就是查詢的部分,接著來做新增。

3. 透過 TG Bot 新增 Domain 到 MongoDB

新增需要傳入參數,因此會需要在 TG 的函數裡取得對應的參數,然後去呼叫 add_domain_to_mongodb 函數:

@bot.message_handler(commands=["add"])
def handle_add_command(message):
try:
# 從 message 中解析參數
_, env, domain = message.text.split(maxsplit=2)
except ValueError:
# 如果參數數量不正確,回復用戶正確的使用方式
bot.reply_to(
message, "使用方式不正確。請按照以下格式輸入:\n/add <env> <domain>"
)
return

# 調用添加域名到 MongoDB 的函數
add_successful = add_domain_to_mongodb(collection, env, domain)
# 根據操作結果回復用戶
if add_successful:
bot.reply_to(message, "域名添加成功。")
else:
bot.reply_to(message, "域名添加失敗,請檢查輸入的數據。")

接著去 TG 輸入測試:

raw-image

基本的新增這樣就完成了,但如果域名很多,會需要每次都輸入一次 env,因此新增一個 bulk_add 的功能,在每次要將多組域名放在同一個 env 底下時會很有用。

4. 透過 TG Bot 新增同一 env 底下多組 domain 到 MongoDB

這裡會使用到跟 add_domain_to_mongodb 一樣的功能,都是 addToSet,但差別在於會使用到 $each,這個操作符允許一次性向 MongoDB 添加多個唯一的元素:

def bulk_add_domains_to_mongodb(collection, env, domains):
# 使用 $addToSet 和 $each 來同時添加多個唯一的域名到相同的 env 中
result = collection.update_one(
{"env": env}, {"$addToSet": {"domains": {"$each": domains}}}, upsert=True
)
if result.matched_count > 0 or result.upserted_id is not None:
print("域名已成功批量添加或更新。")
return True
else:
print("域名批量添加或更新失敗。")
return False

接著到 main.py 新增指令:

@bot.message_handler(commands=["bulk_add"])
def handle_bulk_add_command(message):
# 將命令解析為 env 和多個 domain
parts = message.text.split()
if len(parts) < 3:
bot.reply_to(
message,
"使用方式不正確。請按照以下格式輸入:\n/bulk_add <env> <domain1> <domain2> ...",
)
return

env = parts[1]
domains = parts[2:]
# 調用批量添加域名到 MongoDB 的函數
success = bulk_add_domains_to_mongodb(collection, env, domains)
# 根據操作結果回傳給使用者
if success:
bot.reply_to(message, "域名批量添加成功。")
else:
bot.reply_to(message, "域名批量添加失敗,請檢查輸入的數據。")

最後試著在 TG 執行:

raw-image

這樣批量新增的部分就完成了,接下來是做修改。

5. 透過 TG Bot 修改 MongoDB Domain

修改會需要傳入三個參數,分別是 envorigin_domainnew_domain,因此跟新增一樣需要去取得傳入的參數並帶入 update_domain_in_mongodb 函數:

@bot.message_handler(commands=["edit"])
def handle_edit_command(message):
try:
# 從 message 中解析參數
_, update_env, origin_domain, new_domain = message.text.split(maxsplit=3)
except ValueError:
# 如果參數數量不正確,回覆用戶正確的使用方式
bot.reply_to(
message,
"使用方式不正確。請按照以下格式輸入:\n/edit <env> <old_domain> <new_domain>",
)
return

# 調用更新 MongoDB 的函數
update_result = update_domain_in_mongodb(
collection, update_env, origin_domain, new_domain
)
# 根據操作結果回覆用戶
if update_result:
bot.reply_to(message, "域名更新成功。")
else:
bot.reply_to(message, "域名更新失敗,請檢查輸入的數據。")

一樣到 TG 測試

raw-image

修改完成後,最後就剩刪除了

6. 透過 TG Bot 刪除 MongoDB Domain

刪除就簡單很多,傳入 env 以及 domain 並調用 delete_domain_in_mongodb 就好了:

@bot.message_handler(commands=["del"])
def handle_delete_command(message):
try:
# 從 message 中解析參數
_, env, domain = message.text.split(maxsplit=2)
except ValueError:
# 如果參數數量不正確,回覆用戶正確的使用方式
bot.reply_to(
message, "使用方式不正確。請按照以下格式輸入:\n/del <env> <domain>"
)
return

# 調用從 MongoDB 刪除域名的函數
delete_successful = delete_domain_in_mongodb(collection, env, domain)
# 根據操作結果回覆用戶
if delete_successful:
bot.reply_to(message, "域名刪除成功。")
else:
bot.reply_to(message, "域名刪除失敗,請檢查輸入的數據。")

以上這樣就完成了 TG Bot 與 MongoDB 的整合了。 下一篇文章會將原本使用 ssl Library 撈取過期時間的部分也進行整合。

如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep3-tg-with-mongo

17會員
78內容數
golang
留言0
查看全部
發表第一個留言支持創作者!