The previous version wasn't very convenient to use, and it also felt a bit risky. So here's another round of changes.
This time the API key has been pulled out of the code: first export the OpenAI API key
by injecting it directly into the system's environment variables.
Environment variable setup commands
macOS, zsh
echo 'export OPENAI_API_KEY="your_openai_api_key"' >> ~/.zshrc
source ~/.zshrc
Linux, bash
echo 'export OPENAI_API_KEY="your_openai_api_key"' >> ~/.bashrc
source ~/.bashrc
Windows PowerShell
# Only effective in the current window
$env:OPENAI_API_KEY = "your_openai_api_key"
python your_script.py
# Persistent (takes effect in new sessions)
setx OPENAI_API_KEY "your_openai_api_key"
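Before running anything, it can help to confirm the variable is actually visible to Python. A minimal check (not part of the script below; it deliberately avoids printing the key itself):
import os
print("OPENAI_API_KEY set:", bool(os.environ.get("OPENAI_API_KEY")))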
Code
import os
import re
import json
import math
import tempfile
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional
from openai import OpenAI
# =============== Tunable parameters ===============
INPUT_FILE = "250918_1704.mp3"
MODEL_NAME = "gpt-4o-mini-transcribe"  # or "gpt-4o-transcribe"
LANGUAGE = "zh"
MAX_PART_MB = 20                 # max size per uploaded part (MB)
MAX_MODEL_DURATION_SEC = 1400    # duration ceiling assumed for the model (seconds)
MAX_SAFE_DURATION_SEC = 1300     # conservative per-part duration cap actually used (seconds)
TARGET_CHUNK_SEC = None          # fixed part length in seconds; None = estimate from file size
SILENCE_MIN_LEN = 0.6            # minimum silence length (seconds) for silencedetect
SILENCE_THRESH_DB = -35          # silence threshold (dB) for silencedetect
SILENCE_SEARCH_WINDOW = 3.5      # window (seconds) around an ideal cut point to search for silence
# Parameters for the lightweight master file
LIGHT_SR = 16000  # 16 kHz
LIGHT_BR = "64k"  # anything from 48k to 64k works
WORKERS = 4       # adjust between 3 and 5 depending on bandwidth / rate limits
# =======================================
def sh(cmd: List[str]) -> subprocess.CompletedProcess:
    # Run a command, capture output, and raise on a non-zero exit code
    return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
def ffprobe_json(path: Path) -> dict:
    out = sh(["ffprobe","-v","error","-show_format","-show_streams","-print_format","json",str(path)]).stdout
    return json.loads(out)
def get_duration(meta: dict) -> float:
    if "format" in meta and "duration" in meta["format"]:
        return float(meta["format"]["duration"])
    for s in meta.get("streams", []):
        if "duration" in s:
            return float(s["duration"])
    raise RuntimeError("no duration")
def get_bitrate_bps(meta: dict) -> Optional[float]:
    br = meta.get("format", {}).get("bit_rate")
    if br:
        try:
            return float(br)
        except (TypeError, ValueError):
            pass
    for s in meta.get("streams", []):
        if s.get("codec_type")=="audio" and "bit_rate" in s:
            try:
                return float(s["bit_rate"])
            except (TypeError, ValueError):
                pass
    return None
def estimate_target_chunk_sec_by_size(path: Path, max_mb: int) -> int:
    meta = ffprobe_json(path)
    dur = get_duration(meta)
    size_mb = path.stat().st_size / (1024*1024)
    if size_mb <= max_mb:  # small enough, no splitting needed
        return math.ceil(dur)
    br = get_bitrate_bps(meta)
    if not br:
        mbps = size_mb / dur
        return max(30, int((max_mb * 0.85) / mbps))
    target_bits = max_mb * 1024 * 1024 * 8 * 0.85
    sec = target_bits / br
    return max(30, int(sec))
def parse_silence_points(path: Path) -> List[Tuple[float,float]]:
    # Analyze the whole file only once
    p = subprocess.run([
        "ffmpeg","-i",str(path),
        "-af",f"silencedetect=noise={SILENCE_THRESH_DB}dB:d={SILENCE_MIN_LEN}",
        "-f","null","-"
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    stderr = p.stderr
    starts = [float(x) for x in re.findall(r"silence_start:\s*([0-9.]+)", stderr)]
    ends = [float(x) for x in re.findall(r"silence_end:\s*([0-9.]+)", stderr)]
    pairs = []
    si = ei = 0
    while si < len(starts) and ei < len(ends):
        if ends[ei] > starts[si]:
            pairs.append((starts[si], ends[ei]))
            si += 1; ei += 1
        else:
            ei += 1
    return pairs
def find_near_silence(pairs: List[Tuple[float,float]], target: float, window: float) -> Optional[float]:
    low, high = target-window, target+window
    cand = []
    for s, e in pairs:
        if low <= s <= high: cand.append(s)
        if low <= e <= high: cand.append(e)
    return min(cand, key=lambda x: abs(x-target)) if cand else None
def plan_boundaries(src: Path) -> List[float]:
    """Return the cut points in seconds (excluding 0, including the end of file), for use with segment_times."""
    meta = ffprobe_json(src)
    total = get_duration(meta)
    br = get_bitrate_bps(meta)
    target = TARGET_CHUNK_SEC or estimate_target_chunk_sec_by_size(src, MAX_PART_MB)
    target = min(target, MAX_SAFE_DURATION_SEC)
    # No splitting needed at all
    size_mb = src.stat().st_size / (1024*1024)
    if size_mb <= MAX_PART_MB and total <= MAX_SAFE_DURATION_SEC:
        return [total]
    sil = parse_silence_points(src)
    cuts = []
    cur = 0.0
    while cur < total:
        ideal = min(total, cur + target)
        # Estimate the part size from the bitrate; shrink if it would be too large
        if br:
            while ((ideal-cur)*br/8/1024/1024) > MAX_PART_MB and (ideal-cur) > 60:
                ideal -= 15
        cut = find_near_silence(sil, ideal, SILENCE_SEARCH_WINDOW) or ideal
        if cut - cur < 10:  # avoid parts that are too short
            cut = min(total, cur + 10)
        cuts.append(cut)
        if cut >= total: break
        cur = cut
    # Extra safety: any part longer than MAX_SAFE_DURATION_SEC is subdivided evenly
    final = []
    prev = 0.0
    for c in cuts:
        seg = c - prev
        if seg <= MAX_SAFE_DURATION_SEC:
            final.append(c)
        else:
            parts = math.ceil(seg / MAX_SAFE_DURATION_SEC)
            step = seg / parts
            for i in range(1, parts+1):
                final.append(min(prev + i*step, c))
        prev = c
    # Deduplicate / sort
    uniq = sorted(set(round(x,3) for x in final if x > 0))
    if uniq[-1] < total: uniq[-1] = total
    return uniq
def to_light_master(src: Path, dst: Path):
    # Convert to mono / 16 kHz / low bitrate so the later splitting and uploading are faster
    sh([
        "ffmpeg","-y","-i",str(src),
        "-ac","1","-ar",str(LIGHT_SR),
        "-vn","-c:a","libmp3lame","-b:a",LIGHT_BR,
        str(dst)
    ])
def split_once(src: Path, out_dir: Path, boundaries: List[float]) -> List[Path]:
    out_dir.mkdir(parents=True, exist_ok=True)
    # segment_times must be a comma-separated list of seconds (not including 0)
    times = ",".join(f"{t:.3f}" for t in boundaries[:-1]) if len(boundaries) > 1 else ""
    pattern = str(out_dir / "part_%03d.mp3")
    cmd = ["ffmpeg","-y","-i",str(src),"-c","copy","-f","segment"]
    if times:
        cmd += ["-segment_times", times]
    cmd += [pattern]
    sh(cmd)
    return sorted(out_dir.glob("part_*.mp3"))
def transcribe_one(client: OpenAI, path: Path) -> str:
    # Simple retry with exponential backoff
    import time
    last = None
    for i in range(5):
        try:
            with open(path, "rb") as f:
                resp = client.audio.transcriptions.create(
                    file=f, model=MODEL_NAME, language=LANGUAGE
                    # for subtitles you can add: response_format="srt"
                )
            return resp.text
        except Exception as e:
            last = e
            time.sleep(2**i * 0.5)
    raise last
def hhmmss(sec: float) -> str:
    s = int(round(sec)); h = s//3600; m = (s%3600)//60; r = s%60
    return f"{h:02d}:{m:02d}:{r:02d}"
def main():
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("Please set OPENAI_API_KEY as an environment variable.")
    client = OpenAI(api_key=api_key)
    src = Path(INPUT_FILE)
    if not src.exists():
        raise FileNotFoundError(src)
    meta = ffprobe_json(src)
    total = get_duration(meta)
    print(f"Source: {src.name} | Duration: {hhmmss(total)} | Size: {src.stat().st_size/1024/1024:.2f} MB")
    with tempfile.TemporaryDirectory() as td:
        td = Path(td)
        light = td / "light_master.mp3"
        to_light_master(src, light)
        boundaries = plan_boundaries(light)  # compute cut points on the lightweight master
        print(f"Splitting into {len(boundaries)} part(s)")
        parts_dir = td / "parts"
        parts = split_once(light, parts_dir, boundaries)
        # Concurrent upload
        results = [""]*len(parts)
        with ThreadPoolExecutor(max_workers=WORKERS) as ex:
            futs = {ex.submit(transcribe_one, client, p): idx for idx,p in enumerate(parts)}
            for fut in as_completed(futs):
                idx = futs[fut]
                text = fut.result()
                # Prepend a simple header with the part number and time range
                start = 0.0 if idx==0 else boundaries[idx-1]
                end = boundaries[idx]
                results[idx] = f"[{idx+1:02d}] {hhmmss(start)} → {hhmmss(end)}\n{text}\n"
    final = "\n".join(results)
    print("\n--- Merged result ---\n")
    print(final)
if __name__ == "__main__":
    main()
Normalize the volume with ffmpeg
ffmpeg -i input.mp3 -af "loudnorm" normalized.mp3
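If you'd rather normalize inside the script instead of as a separate step, one option is to fold the loudnorm filter into the light-master conversion. This is only a sketch: to_light_master_normalized is a hypothetical variant that reuses the script's sh helper, LIGHT_SR and LIGHT_BR.
def to_light_master_normalized(src: Path, dst: Path):
    # Same conversion as to_light_master, plus EBU R128 loudness normalization
    sh([
        "ffmpeg", "-y", "-i", str(src),
        "-af", "loudnorm",
        "-ac", "1", "-ar", str(LIGHT_SR),
        "-vn", "-c:a", "libmp3lame", "-b:a", LIGHT_BR,
        str(dst)
    ])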
Explanation
Overall purpose
It is an automatic audio-transcription tool:
it compresses the input audio, splits it into parts, sends them to OpenAI's transcription API (the gpt-4o-mini-transcribe model here) for speech-to-text, and finally merges the pieces into a complete, part-by-part transcript.
Processing flow
- Read the settings
  Specify the file to process, the transcription model (e.g., gpt-4o-mini-transcribe), and the language (e.g., Chinese, "zh").
  Set the limits: maximum size per part (MAX_PART_MB), maximum duration per part (MAX_SAFE_DURATION_SEC), the silence-detection thresholds, and so on.
- Read the file info
  Use ffprobe to get the audio file's duration, bitrate, and size.
- Build a "lightweight master"
  Convert to mono, 16 kHz, low-bitrate MP3, shrinking the file to speed up the later splitting and uploading.
- Decide the cut points
  If the whole file is small and short, send it for transcription directly.
  Otherwise:
  Estimate each part's length from the file size and bitrate.
  Near each estimated cut point, try to snap to a "silence point" so speech isn't cut mid-sentence (see the sketch after this list).
  Make sure no part is too short (at least 10 seconds) or too long (anything over the limit is subdivided again).
- Actually split the file
  Use ffmpeg to cut the audio into parts named part_001.mp3, part_002.mp3, … according to the plan.
- Upload and transcribe
  Use a ThreadPoolExecutor to process parts concurrently, sending several parts to the OpenAI API at the same time.
  Each part is retried up to five times to ride out transient network or API failures.
- Assemble the output
  Each part's text is prefixed with its part number and time range, for example: [01] 00:00:00 → 00:12:34
  followed by the text of that part…
- Finally, all parts are merged and printed.
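For reference, the silence alignment relies on ffmpeg's silencedetect filter, which logs lines such as "silence_start: 12.34" and "silence_end: 13.02" to stderr; the script's regular expressions just pick the numbers out of those lines. A standalone sketch using the same thresholds as above (sample.mp3 is only a placeholder file name):
import re, subprocess

p = subprocess.run(
    ["ffmpeg", "-i", "sample.mp3",               # placeholder input
     "-af", "silencedetect=noise=-35dB:d=0.6",   # SILENCE_THRESH_DB / SILENCE_MIN_LEN
     "-f", "null", "-"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(re.findall(r"silence_(start|end):\s*([0-9.]+)", p.stderr))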
Key design points
- File size and duration control: keeps any single part from being too large for the API to accept (a worked example of the size estimate follows this list).
- Silence-aligned cutting: makes the cut points more natural, so a sentence isn't chopped in half.
- Lightweight transcoding: reduces the resources needed for splitting and uploading.
- Concurrent workers: shorten the overall transcription time.
- Retry mechanism: improves reliability.
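As a rough worked example of the size-based estimate (the same arithmetic as estimate_target_chunk_sec_by_size, assuming the 64 kbps lightweight master and the defaults above):
max_mb = 20
bitrate_bps = 64_000                      # LIGHT_BR = "64k"
target_sec = max_mb * 1024 * 1024 * 8 * 0.85 / bitrate_bps
print(round(target_sec))                  # ≈ 2228 seconds from size alone
print(min(round(target_sec), 1300))       # capped by MAX_SAFE_DURATION_SEC → 1300 seconds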
How to use
- ffmpeg and ffprobe must be installed on the system (a quick availability check is sketched after this list).
- Set the OPENAI_API_KEY environment variable.
- Change INPUT_FILE to the path of your audio file and run the script.
- The transcription result is printed to the screen.
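If you're not sure whether ffmpeg and ffprobe are on your PATH, a quick standard-library check (separate from the script) looks like this:
import shutil
for tool in ("ffmpeg", "ffprobe"):
    print(tool, "->", shutil.which(tool) or "NOT FOUND")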