2024-06-20|閱讀時間 ‧ 約 35 分鐘

Telegram 憑證監控機器人實作 EP5 — 代碼優化

raw-image


👨‍💻 簡介

在這篇文章中,我們將對 Telegram 憑證監控機器人的代碼進行優化。我們會新增一些簡單的指令,並且將部分變數改為從 yaml 檔案中讀取,而不是直接寫死在程式碼中。這些改進目的是提高可讀性和可維護性。

🛠️ 使用工具

  • Python 3.9.6
  • MongoDB
  • TG Bot

📝 功能需求

  • 新增 TG 指令 show_cert_info
  • 新增 TG 指令 help
  • 調整參數讀取

🎯Setup

1. 新增 TG 指令 show_cert_info

這裡會用到之前寫的 get_ssl_cert 函數,可以印出證書來看一下有什麼資訊:

{'subject': ((('countryName', 'US'),), (('stateOrProvinceName', 'California'),), (('localityName', 'San Francisco'),), (('organizationName', 'Wikimedia Foundation, Inc.'),), (('commonName', '*.wikipedia.org'),)), 'issuer': ((('countryName', 'US'),), (('organizationName', 'DigiCert Inc'),), (('commonName', 'DigiCert TLS Hybrid ECC SHA384 2020 CA1'),)), 'version': 3, 'serialNumber': '07419E39583A4C76CF1EA14347FA5F3A', 'notBefore': 'Oct 18 00:00:00 2023 GMT', 'notAfter': 'Oct 16 23:59:59 2024 GMT', 'subjectAltName': (('DNS', '*.wikipedia.org'), ('DNS', 'wikimedia.org'), ('DNS', 'mediawiki.org'), ('DNS', 'wikibooks.org'), ('DNS', 'wikidata.org'), ('DNS', 'wikinews.org'), ('DNS', 'wikiquote.org'), ('DNS', 'wikisource.org'), ('DNS',
....

將上述 SSL 證書資訊整理成列表形式:

  • subject:證書的主題,包含證書擁有者的詳細資訊,如國家名稱、省份名稱、地區名稱、組織名稱和通用名稱。
  • issuer:簽發者的資訊,包含簽發該 SSL 證書的機構的詳細資訊。
  • version:證書的版本號,目前有三個版本:v1、v2 和 v3。SSL/TLS 交易通常使用版本 3。
  • serialNumber:證書的序列號,簽發者賦予的唯一標識符。
  • notBefore:證書的生效日期,表示該證書在此日期之前不可用。
  • notAfter:證書的到期日期,表示該證書在此日期之後不再有效。
  • subjectAltName:主體別名,包括證書可以使用的其他域名列表,使一個證書可以對多個域名有效。
  • OCSP:在線證書狀態協定(Online Certificate Status Protocol)的網址,用於實時檢查 SSL 證書的撤銷狀態。
  • caIssuers:指向簽發此證書的 CA 證書的 URL,客戶端可以從這個位置獲取簽發者的證書。
  • crlDistributionPoints:指向證書撤銷列表(Certificate Revocation List, CRL)的 URL,客戶端可以從這些位置檢查證書是否被撤銷。

這個列表整理了 SSL 證書中的關鍵資訊,幫助理解和快速查找每個部分的功能和用途。

可以依照自己的需求看需要什麼資訊,這裡我新增一個新的函數,來解析取得的證書訊息:

def parse_ssl_cert_info(domain, cert):
issuer = dict(x[0] for x in cert["issuer"])
subject = dict(x[0] for x in cert["subject"])
issued_to = subject.get("commonName", subject.get("organizationName", ""))
issued_by = issuer.get("commonName", issuer.get("organizationName", ""))
valid_from = datetime.strptime(cert["notBefore"], "%b %d %H:%M:%S %Y %Z").strftime(
"%Y-%m-%d"
)
valid_until = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z").strftime(
"%Y-%m-%d"
)

cert_info = (
f"Domain: {domain}\n"
f"Issued To: {issued_to}\n"
f"Issued By: {issued_by}\n"
f"Valid From: {valid_from}\n"
f"Valid Until: {valid_until}"
)
return cert_info

主要作法是將憑證的資訊從字典中提取自己所需要的欄位,然後做一些格式上的轉換。

接著就可以到 TG Bot 來新增這條指令:

@bot.message_handler(commands=["cert_info"])
def send_cert_info(message):
_, get_domain = message.text.split(maxsplit=1)
cert = get_ssl_cert_info(get_domain, check_only=False)
cert_info = parse_ssl_cert_info(get_domain, cert)
bot.send_message(message.chat.id, cert_info)

如果解析失敗或是沒有輸入域名,也需要告訴使用者錯誤訊息:

@bot.message_handler(commands=["cert_info"])
def send_cert_info(message):
try:
_, get_domain = message.text.split(maxsplit=1)
except ValueError:
# 處理使用者沒有提供域名的情況
bot.send_message(message.chat.id, "請提供一個域名。例如:/cert_info example.com")
return
cert = get_ssl_cert_info(get_domain, check_only=False)
if cert is None or cert is False:
# 如果無法獲取證書,直接回傳錯誤訊息
error_message = f"域名錯誤: 無法獲取 {get_domain} 的 SSL 證書資訊。"
bot.send_message(message.chat.id, error_message)
else:
cert_info = parse_ssl_cert_info(get_domain, cert)
bot.send_message(message.chat.id, cert_info)

最後到 TG 輸入指令來確認一下:

確認輸出的資訊符合自己所需。

2. 新增 TG 指令 help

當指令比較多時,需要一個 help 指令來幫助我了解每個指令的用途,因此也可以新增一個簡單的 help 指令:

@bot.message_handler(commands=["help"])
def send_help_message(message):
help_text = """
以下是可用的命令列表及其用途:

/cert_info <domain> - 獲取指定域名的 SSL 證書資訊。
/get_all - 從 MongoDB 獲取所有域名環境的資訊。
/get <env> <domain> - 獲取指定環境下域名的資訊。
/edit <env> <old_domain> <new_domain> - 更新指定環境下域名的資訊。
/add <env> <domain> - 向 MongoDB 添加一個新的域名及其環境。
/del <env> <domain> - 從 MongoDB 刪除指定環境下的域名。
/bulk_add <env> <domain1> <domain2> ... - 向 MongoDB 批量添加多個域名及其環境。

請根據需要使用上述命令。
"""
bot.send_message(message.chat.id, help_text)

執行後的輸出會長這樣:

這樣當指令多時就可以比較方便顯示指令。

3. 調整參數讀取

這裡有兩種做法,一個是儲存到 config.yaml,然後進行讀取,另一個作法是寫成環境變數放在 docker-compose.yaml,然後透過啟動容器時映射到容器裡。

3-1. 透過 yaml 傳入參數 第一種做法比較直覺,把一些 Mongo 以及 TG 的相關參數放到 config.yaml,接著透過 yaml Library 進行讀取:

mongodb_uri: "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
telegram_bot_token: "<YOUR-BOT-TOKEN>"
platform: "gcp"
telegram_group_id: "<YOUR-CHAT-ID>"
# config_loader.py
import yaml

def load_config():
with open("config.yaml", "r") as file:
return yaml.safe_load(file)

def get_mongodb_uri():
config = load_config()
return config.get("mongodb_uri", None)

def get_telegram_config():
config = load_config()
return {
"telegram_bot_token": config.get("telegram_bot_token", None),
"platform": config.get("platform", None),
"telegram_group_id": config.get("telegram_group_id", None)
}

3–2. 透過環境變數傳入參數 直接在 docker-compose.yaml 添加 env:

services
telegram_bot:
build: .
container_name: telegram_bot_container
depends_on:
- mongodb
environment:
TELEGRAM_BOT_TOKEN: <YOUR-BOT-TOKEN>
TELEGRAM_GROUP_ID: <YOUR-CHAT-ID>
MONGODB_URI: mongodb://rootuser:rootpass@mongodb:27017/mydatabase?authSource=admin
command: python main.py
restart: always

然後使用 os Library 進行調用:

import os

def get_mongodb_uri():
return os.getenv('MONGODB_URI', 'mongodb://localhost:27017/mydatabase')

def get_telegram_config():
return {
"telegram_bot_token": os.getenv('TELEGRAM_BOT_TOKEN', ''),
"platform": os.getenv('PLATFORM', 'local'),
"telegram_group_id": os.getenv('TELEGRAM_GROUP_ID', '')
}

最後就能到 main.py 進行調用:

from config_loader import get_telegram_config

telegram_config = get_telegram_config()
telegram_bot_token = telegram_config["telegram_bot_token"]
platform = telegram_config["platform"]
telegram_group_id = telegram_config["telegram_group_id"]

以上就是目前的優化,之後有想到會再回來補充。 下一篇會開始進行容器化的部分~

如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep5-optimize

分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.