前篇已經將 TG Bot 整合 MongoDB,現在要將一些函數修改,在對 DB 進行操作前,先對 domain 進行驗證操作。
之前在撰寫 CloudFunction 時有使用到 get_ssl_cert_expiry_date
函數,在這裡先對這個函數做一個簡單的修改,將驗證憑證是否有效以及取得過期時間拆成兩個函數:
def get_ssl_cert_info(domain, check_only=False):
ssl_context = ssl.create_default_context()
with ssl_context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=domain) as conn:
conn.settimeout(3.0)
try:
conn.connect((domain, 443))
if check_only:
return True
else:
return conn.getpeercert()
except Exception as e:
print(f"無法獲取 {domain} 的 SSL,錯誤:{e}")
return False if check_only else None
def get_ssl_cert_expiry_date(cert):
if cert is None:
return None
try:
expire_date = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z")
return expire_date
except Exception as e:
print(f"Error parsing SSL certificate's expiry date: {e}")
return None
接著在 handle_add_command
新增這個函數做驗證:
@bot.message_handler(commands=["add"])
def handle_add_command(message):
try:
_, env, domain = message.text.split(maxsplit=2)
except ValueError:
bot.reply_to(
message, "使用方式不正確。請按照以下格式輸入:\n/add <env> <domain>"
)
return
# 驗證憑證
if get_ssl_cert(domain,check_only=True):
add_successful = add_domain_to_mongodb(collection, env, domain)
if add_successful:
bot.reply_to(message, f"域名添加成功\n環境為{env}\ndomain為{domain}")
else:
bot.reply_to(message, "域名添加失敗,請檢查輸入的數據。")
else:
bot.reply_to(message, "證書檢查失敗,請檢查輸入的域名是否正確。")
這樣就完成了添加前的驗證了,接著來處理對所有的 domain 進行檢查過期時間。
2. 透過 TG Bot 檢查所有 domain 是否有過期
這裡會用到之前在撰寫 CloudFunction 時使用的 check_ssl_expiration
以及 send_notification
兩個函數,多添加一個參數,改成傳入憑證取得過期時間,我們先稍微修改一下:
def check_ssl_expiration(
domain, cert, env, platform, telegram_bot_token, telegram_group_id
):
expire_date = get_ssl_cert_expiry_date(cert)
if expire_date:
remaining_days = (expire_date - datetime.utcnow()).days
if remaining_days <= 30:
message = "\n".join(
[
"來源: Python",
"標題: 憑證到期",
f"域名: {domain}",
f"到期日: {expire_date.strftime('%Y-%m-%d')}",
f"平台: {platform}",
f"環境: {env}",
]
)
print(f"{domain} 的 SSL 證書將在 {remaining_days} 天內過期。")
send_notification(message, domain, telegram_bot_token, telegram_group_id)
else:
print(
f"{domain} 的 SSL 證書過期日期是 {expire_date.strftime('%Y-%m-%d')}。"
)
def send_notification(message, domain, telegram_bot_token, telegram_group_id):
telegram_send_message_url = (
f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
)
response = requests.post(
telegram_send_message_url, data={"chat_id": telegram_group_id, "text": message}
)
if response.status_code == 200:
print(f"已為 {domain} 發送通知")
else:
print(f"為 {domain} 發送失敗")
接著新增一個新檔案叫做 scheduler_jobs.py
,然後將之前的 check_ssl_cloud_function
重新命名並稍微做個修改:
def perform_ssl_checks(
bot, collection, chat_id, platform, telegram_bot_token, telegram_group_id
):
domain_data = load_domain_envs_from_mongodb(collection)
for env, domains in domain_data.items():
for domain in domains:
cert = get_ssl_cert(domain, check_only=False)
check_ssl_expiration(
domain, cert, env, platform, telegram_bot_token, telegram_group_id
)
bot.send_message(
chat_id, "所有域名的 SSL 到期時間檢查完成。", parse_mode="Markdown"
)
再來可以將 perform_ssl_checks
進行包裝:
def setup_scheduler(
bot, collection, chat_id, platform, telegram_bot_token, telegram_group_id
):
schedule.every(10).seconds.do(
perform_ssl_checks,
bot,
collection,
chat_id,
platform,
telegram_bot_token,
telegram_group_id,
)
最後就是在 main
裡進行調用:
import schedule
mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
telegram_bot_token = "your-bot-token"
bot = telebot.TeleBot(telegram_bot_token)
platform = "gcp"
telegram_group_id = "your-chat-id"
def run_schedule():
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
client = init_mongo_client(mongodb_uri)
collection_name = "domains"
collection = get_collection(client, collection_name)
setup_scheduler(
bot, collection, telegram_group_id, platform, telegram_bot_token, telegram_group_id
)
# 創建一個 Thread 物件,target 指定了這個線程要執行的函數
t = threading.Thread(target=run_schedule)
# 啟動線程
t.start()
# 主線程繼續執行 Telegram bot 的無限輪詢
bot.infinity_polling()
這裡需注意,因為需要同時執行多個任務,像是定時檢查域名或是當有指令檢查所有域名,如果剛好某個操作比較耗時,會等到操作完,才執行下一個操作,因此需要使用到 threading
Library,讓這些操作都可以在後台執行,不影響主程序,文件在這。
使用的原因為:
threading
的情況下,如果你的程式碼先執行定時任務的無限循環(如 run_schedule
函數),那麼程式將永遠停留在那個循環中,後面的代碼(如啟動 Telegram bot 的代碼)將永遠不會執行。使用 threading
可以讓定時任務在一個獨立的線程中運行,從而不會阻塞主線程上的其他操作。threading
允許多個操作同時進行,從而提高程式的執行效率。特別是在進行網路請求或者其他需要等待的操作時,多線程可以在等待時處理其他任務,提高資源利用率。執行後可以在 terminal 看到如下資訊:
TG 會顯示資訊如圖:
而如果 30 天內快到期,顯示的資訊如圖:
TG 資訊如圖:
到此,基本的程式碼算是都完成了,下一篇會做簡單的整理程式碼,稍微優化。
如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep4-cert