自動化SSL憑證監控與到期通知系統

更新於 發佈於 閱讀時間約 19 分鐘
raw-image

👨‍💻簡介

最近因為憑證越來越多,需要監控什麼時候到期,當到期時發送到期通知,因此撰寫一個簡單的小程式來完成。

這次使用Python和Telegram Bot來監控SSL證書的到期時間並發送通知。並使用GCP工具,如CloudFunction和CloudScheduler做部署平台。

🛠️使用工具

  • Python 3.9
  • Telegram Bot(Webhook)
  • CloudFunction
  • CloudScheduler

📝功能需求

  1. 取得憑證到期時間
  2. 到期後發送通知
  3. 透過 Telegram Bot 發送訊息
  4. 讀取 yaml domain list
  5. 設定環境變數
  6. CloudFunction 設定
  7. CloudScheduler 排程設定

🎯Setup

1. 取得憑證到期時間

def get_ssl_cert_expiry_date(domain):
"""
取得 SSL 證書的過期日期。

參數:
domain (str): 需要檢查SSL證書過期時間的域名。

返回:
datetime: SSL證書的過期日期,如果獲取失敗則返回None。
"""
# 建立SSL上下文,建立一個安全的“環境”來管理SSL設定和操作
ssl_context = ssl.create_default_context()
# 包裝socket對象,將基礎的socket通訊轉變為加密通訊
conn = ssl_context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=domain)
# 設定連接的超時時間為 3 秒,防止長時間等待
conn.settimeout(3.0)
try:
# 通過加密的連線嘗試連接到服務器的443端口(HTTPS
conn.connect((domain, 443))
# 取得服務器的SSL證書訊息
ssl_info = conn.getpeercert()
# 解析證書中的過期時間
expire_date = datetime.strptime(ssl_info['notAfter'], '%b %d %H:%M:%S %Y %Z')
return expire_date
except Exception as e:
# 處理連接或取得證書訊息過程中的異常
print(f"無法獲取 {domain} 的 SSL 證書過期日期,錯誤:{e}")
return None
finally:
# 確保無論成功與否,都關閉與服務器的連接
conn.close()

這部分使用了 python 的 ssl 以及 socket library,他們幫助我能夠建立安全的加密環境,以及使用 socket 進行通訊,使我能夠取得憑證相關資訊。

補充一下 datetime.strptime,因為從 ssl_info dict 解析後的 notAfter 會回傳 'Mar 4 06:35:50 2024 GMT,因此需要透過 datetime.strptime,將字串解析成 datetime 對象。

字符串格式如下:

  • %b:月份的縮寫名稱,如 Jan, Feb, Mar 等。
  • %d:月份中的天數,為 01 到 31。
  • %H:小時(24 小時制),從 00 到 23。
  • %M:分鐘,從 00 到 59。
  • %S:秒,從 00 到 59。
  • %Y:4 位數的年份,如 2024。
  • %Z:時區名稱,如 UTC 或 EST 等。

對應欄位:

Mar  4 06:35:50 2024 GMT
月 日 時 分 秒 年 時區
%b %d %H:%M:%S %Y %Z

至此,取得憑證過期時間完成,接下來需要建立一個 function 幫助我判斷當小於 30 天時,需要發送通知,我們往第二部分進行。

2. 到期後發送通知

邏輯判斷會需要讓返回的時間能夠與當前時間做比較,當小於等於 30 天時,則會發送通知。

def check_ssl_expiration(domain):
"""
檢查給定域名的SSL證書過期時間,並在證書即將過期時印出。

參數:
domain (str): 需要檢查SSL證書的域名。
"""
# 調用先前定義的函數get_ssl_cert_expiry_date來獲取SSL證書的過期日期。
expire_date = get_ssl_cert_expiry_date(domain)
# 如果成功取得到過期日期
if expire_date:
# 計算證書的剩餘有效天數
remaining_days = (expire_date - datetime.utcnow()).days
# 如果剩餘天數不超過30天
if remaining_days <= 30:
# 印出一條log訊息,說明證書將在指定天數內過期
print(f"{domain} 的 SSL 證書將在 {remaining_days} 天內過期。")
else:
# 如果證書的剩餘有效期超過30天,則打印證書的過期日期
print(f"{domain} 的 SSL 證書過期日期是 {expire_date.strftime('%Y-%m-%d')}。")

補充說明一下 datetime 以及 strftime 的用法:

  • datetime.utcnow() 這個 library 在這裡主要取得 UTC 的當前日期和時間
>>> from datetime import datetime
>>> datetime.utcnow()
## 返回一個`datetime.datetime`對象,這個對象包含了當前UTC時區的年、月、日、小時、分、秒,以及微秒。
datetime.datetime(2024, 4, 1, 15, 34, 21, 211636)

代表執行 datetime.utcnow() 時,被執行的時間是 2024 年 4 月 1 日,下午 3 點 34 分 21 秒,以及 211636 微秒。

  • strftime()

這個方法主要用來將時間對象轉為字串,方便組成要傳送的訊息。 "%Y-%m-%d" 是 strftime 方法的格式化字串參數,其中 %Y 表示 4 位數的年份,%m表示月份(01至12),%d 表示月份中的天數(01 至 31)。

整段 function 功能描述為:首先,使用 datetime.utcnow() 取得當前 UTC 時間,然後與 expire_date 進行相減,計算出證書的剩餘有效天數。

如果這個剩餘天數不超過 30 天,就印出一條 log 訊息,說明證書將在指定天數內過期;否則印出證書的過期日期。

3. 發送訊息到 telegram group

def send_notification(message, domain, telegram_bot_token, telegram_group_id):
"""
通過指定的Webhook URL發送通知。

參數:
message (dict): 要發送的消息內容,格式為字典。
domain (str): 域名,用於發送消息中表示哪個域名的SSL證書。
telegram_token (str): 要使用的telegram bot token。
telegram_group_id (str): 指定發送消息的群組。
"""
telegram_send_message_url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
# 向telegram api發送HTTP POST請求。
response = requests.post(telegram_send_message_url, data={
"chat_id": telegram_group_id,
"text": message
})

# 檢查響應的狀態碼。如果狀態碼為200,表示通知發送成功。
if response.status_code == 200:
print(f"已為 {domain} 發送通知")
else:
# 如果狀態碼不是200,表示發送失敗。
print(f"為 {domain} 發送失敗")

將 send_notification 添加到 check_ssl_expiration,這裡應該有更好的作法。

def check_ssl_expiration(domain, env, platform, telegram_bot_token, telegram_group_id):
"""
檢查給定域名的SSL證書過期時間,並在證書即將過期時通過webhook發送通知。

參數:
domain (str): 需要檢查SSL證書的域名。
env (str): 環境標籤(例如:開發、測試、正式),用於消息中以區分不同環境。
platform (str): 平台標籤(例如:AWS、GCP、Azure),用於消息中以標明證書部署的平台。
telegram_token (str): 要使用的telegram bot token。
telegram_group_id (str): 指定發送消息的群組。
"""
expire_date = get_ssl_cert_expiry_date(domain)
if expire_date:
remaining_days = (expire_date - datetime.utcnow()).days
if remaining_days <= 30:
# 建立發送訊息,包含了證書到期的相關信息
message = "\n".join([
"來源: Gitlab-Runner",
"標題: 憑證到期",
f"域名: {domain}",
f"到期日: {expire_date.strftime('%Y-%m-%d')}",
f"平台: {platform}",
f"環境: {env}",
])

print(f"{domain} 的 SSL 證書將在 {remaining_days} 天內過期。")
send_notification(message, domain, telegram_bot_token, telegram_group_id)
else:
print(f"{domain} 的 SSL 證書過期日期是 {expire_date.strftime('%Y-%m-%d')}。")

這裡我使用的是簡單的 telegram bot,發送後會呈現如下資訊。

raw-image

4. 讀取 yaml domain list

假設 domain.yaml 如下

domain_envs:
live:
- google.com
- en.wikipedia.org
def load_data_from_yaml(yaml_file_path, key):
"""
從YAML檔案加載指定鍵的配置。

參數:
yaml_file_path (str): YAML檔案的路徑。
key (str): 要從YAML檔案中讀取的鍵名。
返回:
dict or None: 返回從YAML檔案中讀取的配置字典。如果指定的鍵不存在,則返回空字典。
"""
try:
# 嘗試打開指定的YAML檔案。'r'表示以讀取模式打開,'encoding='utf-8''確保文件正確讀取UTF-8編碼的內容。
with open(yaml_file_path, 'r', encoding='utf-8') as file:
# 使用yaml.safe_load(file)安全地加載YAML檔案的內容。
# 此函數將YAML檔案的結構轉換為Python數據類型(通常是字典)。
data = yaml.safe_load(file)
# 嘗試從加載的數據中獲取特定鍵(key)的值。
# 如果鍵不存在,則默認返回一個空字典{}。
return data.get(key, {})
except FileNotFoundError as e:
# 如果嘗試打開的YAML檔案不存在,則捕獲FileNotFoundError異常。
logging.error(f"YAML檔案未找到: {e}")
# 錯誤日誌記錄後,返回一個空字典{},表示沒有加載到任何數據。
return {}
except Exception as e:
# 如果在加載或處理YAML檔案時發生了其他任何異常,則捕獲通用異常。
logging.error(f"讀取YAML檔案時發生錯誤: {e}")
# 同樣記錄錯誤日誌並返回一個空字典{}。
return {}

yaml_file_path = 'config.yaml'
domain_envs = load_data_from_yaml(yaml_file_path, 'domain_envs')

這裡主要使用 yaml 進行一些讀取操作,使我可以獲取到 domain_envs。 印出的格式如下:

{'live': ['google.com', 'en.wikipedia.org']}

而要取得到 env 以及各個 domain,則使用 for 迴圈進行迭代,將每個 item 都取得;而 domain 因為有多個所以會再使用一個 for 迴圈進行迭代,並一一進行確認證書狀態。

for env, domains in domain_envs.items():
for domain in domains:
check_ssl_expiration(domain, env, platform, telegram_bot_token, telegram_group_id)

5. 設定環境變數

這部分是使用 os.environ.get 方法,取得環境變數的值

def get_env_variable(name, default_value="未設定"):
"""
從系統環境變數中取得一個值。

參數:
name (str): 環境變數的鍵名。
default_value (str): 如果找不到鍵,則返回的預設值。預設為"未設定"。

返回:
str: 環境變數的值,或者在找不到鍵時返回預設值。
"""
return os.environ.get(name, default_value)

接著在入口函數設定,取得相關環境變數

platform = get_env_variable("PLATFORM")
telegram_bot_token = get_env_variable("TELEGRAM_BOT_TOKEN")
telegram_group_id = get_env_variable("TELEGRAM_GROUP_ID")

6. cloudfunction設定

  • 觸發條件選擇 HTTPS,並且須通過驗證
raw-image
  • 執行環境變數,需要將原本從系統取得的環境變數在這裡設定
raw-image
  • 入口函數需要將原先寫在 main function 的部分移到這裡
raw-image

設定好後可以測試看看是否正常,可在紀錄查看執行結果

raw-image

7. cloudscheduler 排程設定

要先設定 IAM,允許可呼叫 cloud function,先建立好 service account 後,加入兩個角色,Cloud Functions 管理員以及 Cloud Run 叫用者。

接著就能到 cloudscheduler 進行相關設定。

raw-image

這樣就完成了監控憑證的小程式了。

🔗專案repo –> ssl-certificate-checker

📚Reference

avatar-img
17會員
83內容數
golang
留言0
查看全部
avatar-img
發表第一個留言支持創作者!
Alan的開發者天地 的其他內容
引言 在當今的技術世界中,Kubernetes 已成為容器化應用的領導平台。作為一個高效的容器編排系統,它不僅管理著容器的部署和擴展,還提供了必要的自動化支持,以保證應用的高可用性和性能。在這個框架中,自動擴展功能起著至關重要的作用,特別是在面對不斷變化的負載和需求時。
👨‍💻簡介 在當今的雲計算時代,容器化和微服務架構成為了重要趨勢。Kubernetes,作為領先的容器編排平台,提供了強大的功能來管理和部署應用程式。然而,隨著應用程式和用戶的增加,有效管理誰可以對 Kubernetes 集群執行何種操作變得至關重要。
👨‍💻簡介 terraform在每次執行terraform plan或terraform apply時,是如何知道應該要管理哪些資源? 其實就是透過在每次執行terraform時,將建立或要變更的資源都記錄在terraform.state這份狀態檔,預設檔案使用JSON格式。
📔心得 最近,我在探索 Ansible 自動化工具的過程中,決定運用它來建立 ELK Stack,這是我之前使用 Docker 建立的經驗的延伸。在這個過程中,我想分享一下我的學習心得。
在 Kubernetes 裡,Secret 就像是一個保險箱,可以放你任何不想公開的東西。比如說密碼、API 金鑰、憑證等,這樣的資料可能會被放在 Pod 裡,但你可以用 Secret 來避免直接在應用程式的程式碼中暴露這些機密資料。
👨‍💻簡介 今天早上在下kubectl get pods時,突然跳出了以下錯誤 Unable to connect to the server: x509: certificate has expired or is not yet valid
引言 在當今的技術世界中,Kubernetes 已成為容器化應用的領導平台。作為一個高效的容器編排系統,它不僅管理著容器的部署和擴展,還提供了必要的自動化支持,以保證應用的高可用性和性能。在這個框架中,自動擴展功能起著至關重要的作用,特別是在面對不斷變化的負載和需求時。
👨‍💻簡介 在當今的雲計算時代,容器化和微服務架構成為了重要趨勢。Kubernetes,作為領先的容器編排平台,提供了強大的功能來管理和部署應用程式。然而,隨著應用程式和用戶的增加,有效管理誰可以對 Kubernetes 集群執行何種操作變得至關重要。
👨‍💻簡介 terraform在每次執行terraform plan或terraform apply時,是如何知道應該要管理哪些資源? 其實就是透過在每次執行terraform時,將建立或要變更的資源都記錄在terraform.state這份狀態檔,預設檔案使用JSON格式。
📔心得 最近,我在探索 Ansible 自動化工具的過程中,決定運用它來建立 ELK Stack,這是我之前使用 Docker 建立的經驗的延伸。在這個過程中,我想分享一下我的學習心得。
在 Kubernetes 裡,Secret 就像是一個保險箱,可以放你任何不想公開的東西。比如說密碼、API 金鑰、憑證等,這樣的資料可能會被放在 Pod 裡,但你可以用 Secret 來避免直接在應用程式的程式碼中暴露這些機密資料。
👨‍💻簡介 今天早上在下kubectl get pods時,突然跳出了以下錯誤 Unable to connect to the server: x509: certificate has expired or is not yet valid
你可能也想看
Google News 追蹤
Thumbnail
嘿,大家新年快樂~ 新年大家都在做什麼呢? 跨年夜的我趕工製作某個外包設計案,在工作告一段落時趕上倒數。 然後和兩個小孩過了一個忙亂的元旦。在深夜時刻,看到朋友傳來的解籤網站,興致勃勃熬夜體驗了一下,覺得非常好玩,或許有人玩過了,但還是想寫上來分享紀錄一下~
Thumbnail
寫作投資永遠不嫌晚,這是雙寶老爹的第#156篇關於加密貨幣教學文章,文未還有更多精彩的教學內容!
Thumbnail
在前一篇我們已經成功地建立簽核表單及簽核節點並關聯回請假表單,而本篇會接著介紹如何管理簽核節點狀態並同步更新簽核表單狀態。
Thumbnail
嘿,大家新年快樂~ 新年大家都在做什麼呢? 跨年夜的我趕工製作某個外包設計案,在工作告一段落時趕上倒數。 然後和兩個小孩過了一個忙亂的元旦。在深夜時刻,看到朋友傳來的解籤網站,興致勃勃熬夜體驗了一下,覺得非常好玩,或許有人玩過了,但還是想寫上來分享紀錄一下~
Thumbnail
寫作投資永遠不嫌晚,這是雙寶老爹的第#156篇關於加密貨幣教學文章,文未還有更多精彩的教學內容!
Thumbnail
在前一篇我們已經成功地建立簽核表單及簽核節點並關聯回請假表單,而本篇會接著介紹如何管理簽核節點狀態並同步更新簽核表單狀態。