🧵 一個小故事
在某個星期六的清晨,工程師小明被監控的訊息吵醒, Airflow 上的任務居然跑了 16 個小時還沒結束!
原來是某支資料轉換腳本卡在網路重連邏輯中,無限重試,一路跑進週末。
這時候小明邊泡咖啡邊想: 「要是早點加上 timeout,就不用週末加班了...」。🧠 為什麼需要 timeout?
- 防止任務卡住不結束(ex: 死循環、等待外部系統)。
- 提高資源使用效率(避免佔用 executor slot)。
- 提升排錯與監控效率(SLA 警示 + 自動重試)。
✅ 常見四種過期控制方法
1. execution_timeout(最推薦)
針對單一任務設定最長執行時間,超時會自動失敗:
from airflow.decorators import task
from datetime import timedelta
@task(execution_timeout=timedelta(minutes=5))
def extract_data():
2. 使用 default_args 統一設定所有任務 timeout
若你的 DAG 中任務數量多、結構一致,建議使用 default_args:
default_args = {
'execution_timeout': timedelta(minutes=15),
...
}
with DAG(
dag_id='daily_etl_pipeline',
default_args=default_args,
...
) as dag:
...
3. 設定 SLA(延遲通知,不會強制中止任務)
若想追蹤任務是否「在可接受時間內完成」,但不想強制 timeout,可以用 SLA:
@task(sla=timedelta(minutes=30))
def load_data():
...
SLA 可搭配 email、Slack 或 callback function 做延遲警示,不會終止任務執行。
4. 限制整個 DAG Run 的執行時間
有時候你想限制整條流程(DAG)不得超過某個時間,例如每日 ETL 最晚要在 2 小時內跑完:
DAG(
dag_id='hourly_summary',
dagrun_timeout=timedelta(hours=2),
...
)
💡 加值技巧
- 若加上
execution_timeout
的任務容易 timeout,可配合retries
+retry_delay
自動重試。 - 若是外部系統導致 delay,可考慮加入
timeout
或retry
參數在 API call 裡面(例如 requests 或 SQL hook)。 - 若要明確中止某任務,可加上
on_failure_callback
做通知或補償行為。
✅ 總結表格

結語
記得幫每個你「不確定會多久」的任務,加上 execution_timeout, 這不只是為了穩定系統,也是為了讓週末的你,可以安心睡覺 😴。