def detect_pingpong_patterns(day_df: pd.DataFrame, price_col='收盤', threshold=0.4) -> pd.DataFrame:
    # ... (function body unchanged from the previous part) ...
    df = day_df.sort_values('日期').reset_index(drop=True).copy()
    if price_col not in df.columns:
        return pd.DataFrame()
    df['pct_change'] = df[price_col].pct_change()
    rows = []
    for i in range(1, len(df) - 1):
        prev_change = df.loc[i, 'pct_change']      # change from day i-1 to day i
        next_change = df.loc[i + 1, 'pct_change']  # change from day i to day i+1
        if pd.notna(prev_change) and pd.notna(next_change):
            # Two oversized consecutive moves in opposite directions look like an
            # ex-rights/ex-dividend misalignment rather than a real price path.
            if abs(prev_change) > threshold and abs(next_change) > threshold and prev_change * next_change < 0:
                rows.append({
                    '日期(異常日)': df.loc[i, '日期'],
                    '前日收盤': df.loc[i - 1, price_col],
                    '當日收盤': df.loc[i, price_col],
                    '次日收盤': df.loc[i + 1, price_col],
                    '前日變動(%)': round(prev_change * 100, 2),
                    '次日變動(%)': round(next_change * 100, 2),
                })
    return pd.DataFrame(rows)
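# Illustrative sketch (not part of the original pipeline; the helper name below is
# hypothetical): a tiny synthetic series where the close drops 60% and then rebounds
# 150% on the very next bar — the classic ping-pong shape the check above is meant to flag.
def _demo_detect_pingpong():
    demo = pd.DataFrame({
        '日期': pd.to_datetime(['2024-01-02', '2024-01-03', '2024-01-04']),
        '收盤': [100.0, 40.0, 100.0],  # -60% then +150%; both legs exceed threshold=0.4
    })
    return detect_pingpong_patterns(demo, threshold=0.4)  # one flagged row, dated 2024-01-03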
def _process_one_file(path: str):
    # Returns one of three shapes, consumed by build_wmy_parquets below:
    #   (True,  (dfw, dfm, dfy), None)  on success
    #   ("SKIP", info_dict,      msg)   when the file is intentionally skipped
    #   (False, None,            msg)   when processing fails
    try:
        df_day = _load_day_clean_full(path)
        # Keep only 2024–2025 (including the leading padding window)
        df_day = _clip_by_date_range(df_day, col='日期',
                                     start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START)
        stem = Path(path).stem
        raw_id, _ = _parse_id_name(stem)
        cid = _canonical_id(raw_id)  # StockID must be resolved here
        if df_day.empty or df_day['日期'].max() < DATE_PAD_START:
            return "SKIP", {'StockID': cid, 'file': Path(path).name,
                            'count': 0, 'reason': 'out_of_range'}, f"{Path(path).name}: out_of_range"
        if SKIP_SUSPECT_STOCK:
            suspects = detect_pingpong_patterns(df_day, threshold=PINGPONG_THRESHOLD)
            if not suspects.empty:
                msg = f"{Path(path).name}: [SKIP_PINGPONG] {len(suspects)} hits that look like ex-rights misalignment"
                # Pass cid along in the payload
                return "SKIP", {'StockID': cid, 'file': Path(path).name, 'count': len(suspects), 'reason': 'suspect_pingpong'}, msg
        # Resample to weekly / monthly / yearly bars
        dfw = _resample_ohlc_with_flags(df_day, 'W'); dfw = _add_period_returns(dfw, 'W'); dfw['成交量'] = dfw['成交量'].astype('float64'); dfw['StockID'] = cid
        dfm = _resample_ohlc_with_flags(df_day, 'M'); dfm = _add_period_returns(dfm, 'M'); dfm['成交量'] = dfm['成交量'].astype('float64'); dfm['StockID'] = cid
        dfy = _resample_ohlc_with_flags(df_day, 'Y'); dfy = _add_period_returns(dfy, 'Y'); dfy['成交量'] = dfy['成交量'].astype('float64'); dfy['StockID'] = cid
        # Weekly/monthly/yearly bars keep only 2024–2025 (no padding needed here)
        for _df in (dfw, dfm, dfy):
            _df.dropna(subset=['日期'], inplace=True)
            _df['日期'] = pd.to_datetime(_df['日期']).dt.tz_localize(None)
        dfw = dfw[(dfw['日期'] >= DATE_START) & (dfw['日期'] <= DATE_END)]
        dfm = dfm[(dfm['日期'] >= DATE_START) & (dfm['日期'] <= DATE_END)]
        dfy = dfy[(dfy['日期'] >= DATE_START) & (dfy['日期'] <= DATE_END)]
        return True, (dfw, dfm, dfy), None
    except Exception as e:
        return False, None, f"{Path(path).name}: {e}"
def _split_into_shards(items, shard_size):
    return [items[i:i+shard_size] for i in range(0, len(items), shard_size)] if shard_size > 0 else [items]
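# Quick illustrative check (hypothetical helper, never called by the pipeline): five files
# with shard_size=2 give three shards, and a non-positive shard_size means "one shard".
def _demo_split_into_shards():
    assert _split_into_shards(['a', 'b', 'c', 'd', 'e'], 2) == [['a', 'b'], ['c', 'd'], ['e']]
    assert _split_into_shards(['a', 'b', 'c'], 0) == [['a', 'b', 'c']]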
def build_wmy_parquets(market_key: str):
    base_dir = LOCAL_BASE if WRITE_MODE in ('A','B') else DRIVE_BASE
    market_abbr = MARKET_ABBR_MAP.get(market_key, market_key.replace('-share','').upper())
    # 1) Local cache of daily-K CSVs
    local_dayk_dir = _prepare_local_dayk(market_key)
    if not local_dayk_dir:
        return None, None, None
    # 2) Output paths
    out_week = f"{base_dir}/{market_key}/weekK_{market_abbr}.parquet"
    out_month = f"{base_dir}/{market_key}/monthK_{market_abbr}.parquet"
    out_year = f"{base_dir}/{market_key}/yearK_{market_abbr}.parquet"
    Path(f"{base_dir}/{market_key}").mkdir(parents=True, exist_ok=True)
    # 3) File list
    files = sorted(glob.glob(f"{local_dayk_dir}/*.csv"))
    if not files:
        print(f"❌ No files under cache path {local_dayk_dir}; aborting.")
        return None, None, None
    shards = _split_into_shards(files, SHARD_SIZE)
    print(f"\n📦 [{market_key}] {len(files)} files total, processed in {len(shards)} shards")
    # Column lists, adjusted to include IsFiltered_QA
    base_front = ['StockID','日期','開盤','最高','最低','收盤','成交量']
    base_common_wmy = ['CurPeriod_Days','PrevPeriod_Days', 'HasResume', 'IsFiltered_QA']
    base_common_w = ['CurWeek_Days','PrevWeek_Days', 'HasResume']
    ret_cols = [f'PrevC_{f}' for f in ['W','M','Y']] + [f'Ret_Gap_{f}' for f in ['W','M','Y']] + \
               [f'Ret_Trad_{f}' for f in ['W','M','Y']] + [f'Ret_C_{f}' for f in ['W','M','Y']] + \
               [f'Ret_Max_H_{f}' for f in ['W','M','Y']] + [f'Ret_Min_L_{f}' for f in ['W','M','Y']] + \
               [f'Range_{f}' for f in ['W','M','Y']] + [f'Ret_Max_H_Pos_{f}' for f in ['W','M','Y']] + \
               [f'Ret_H_from_O_{f}' for f in ['W','M','Y']] + [f'Ret_L_from_O_{f}' for f in ['W','M','Y']] + \
               [f'Ret_End_RelH_{f}' for f in ['W','M','Y']] + [f'Ret_End_RelL_{f}' for f in ['W','M','Y']]
    ret_cols = sorted(list(set(ret_cols)))
    wk_cols = base_front + base_common_w + [c for c in ret_cols if c.endswith('_W')] + ['ISO_Week']
    mo_cols = base_front + base_common_wmy + [c for c in ret_cols if c.endswith('_M')]
    yr_cols = base_front + base_common_wmy + [c for c in ret_cols if c.endswith('_Y')]
    # Drop the Period/Week columns that do not belong to each frequency
    mo_cols = [c for c in mo_cols if not c.startswith('CurWeek') and not c.startswith('PrevWeek')]
    yr_cols = [c for c in yr_cols if not c.startswith('CurWeek') and not c.startswith('PrevWeek')]
    wk_cols = [c for c in wk_cols if not c.startswith('CurPeriod') and not c.startswith('PrevPeriod') and c != 'IsFiltered_QA']
    w_writer = ParquetStreamer(out_week, keep_cols=wk_cols)
    m_writer = ParquetStreamer(out_month, keep_cols=mo_cols)
    y_writer = ParquetStreamer(out_year, keep_cols=yr_cols)
    total_ok = total_fail = 0
    skip_records = []
    t0 = time.time()
    for si, shard in enumerate(shards, 1):
        print(f"🧩 Shard {si}/{len(shards)}: {len(shard)} files")
        shard_w, shard_m, shard_y = [], [], []
        # [Fix: futs and the as_completed loop now live inside the with block]
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
            # Map each future to its file path so failures can be attributed
            futs = {ex.submit(_process_one_file, p): p for p in shard}
            # Consume futures as they finish
            for f in as_completed(futs):
                path = futs[f]  # look up the file behind this future
                try:
                    status, payload, err = f.result()
                except Exception as e:
                    # Thread-level failure that escaped _process_one_file's own try/except
                    total_fail += 1
                    err = f"{Path(path).name}: Thread execution failed ({e})"
                    if total_fail <= 20:
                        print(" ❌", err)
                    continue
                if status is True:
                    dfw, dfm, dfy = payload
                    shard_w.append(dfw); shard_m.append(dfm); shard_y.append(dfy)
                    total_ok += 1
                elif status == "SKIP":
                    info = payload or {}
                    # Pull StockID from the payload, falling back to the file name
                    stock_id = info.get('StockID', Path(path).stem.split("_")[0])
                    skip_records.append({'StockID': stock_id, 'file': Path(path).name,
                                         'reason': info.get('reason', 'suspect_pingpong'), 'count': info.get('count', 0)})
                    if total_fail + total_ok + len(skip_records) <= 20 and err:
                        print(" ⏭️", err)
                else:
                    total_fail += 1
                    if total_fail <= 20 and err:
                        print(" ⚠️", err)
        if shard_w: w_writer.append_df(pd.concat(shard_w, ignore_index=True))
        if shard_m: m_writer.append_df(pd.concat(shard_m, ignore_index=True))
        if shard_y: y_writer.append_df(pd.concat(shard_y, ignore_index=True))
        time.sleep(0.5 + random.random()*0.5)
    w_writer.close(); m_writer.close(); y_writer.close()
    print(f"⏱️ Batch elapsed: {time.time() - t0:.1f} s")
    if skip_records:
        qa_dir = Path(f"{DRIVE_BASE}/{market_key}/_qa")
        qa_dir.mkdir(parents=True, exist_ok=True)
        pd.DataFrame(skip_records).to_csv(qa_dir / "skip_pingpong.csv", index=False, encoding='utf-8-sig')
        print(f"🧹 Skip list written: {qa_dir / 'skip_pingpong.csv'} ({len(skip_records)} files)")
    market_abbr = MARKET_ABBR_MAP.get(market_key, market_key.replace('-share','').upper())
    final_week = f"{DRIVE_BASE}/{market_key}/weekK_{market_abbr}.parquet"
    final_month = f"{DRIVE_BASE}/{market_key}/monthK_{market_abbr}.parquet"
    final_year = f"{DRIVE_BASE}/{market_key}/yearK_{market_abbr}.parquet"
    if WRITE_MODE == 'A':
        for src, dst in [(out_week, final_week), (out_month, final_month), (out_year, final_year)]:
            if Path(src).exists():
                Path(dst).parent.mkdir(parents=True, exist_ok=True)
                Path(dst).unlink(missing_ok=True)
                Path(src).replace(dst)
        print("📤 Mode A: local Parquet files moved to Google Drive")
    elif WRITE_MODE == 'B':
        for src, dst in [(out_week, final_week), (out_month, final_month), (out_year, final_year)]:
            if Path(src).exists():
                Path(dst).parent.mkdir(parents=True, exist_ok=True)
                copy2(str(src), str(dst))
        print("📤 Mode B: local Parquet files copied to Google Drive (local copies kept)")
    else:  # WRITE_MODE == 'C'
        final_week, final_month, final_year = out_week, out_month, out_year
        print("☁️ Mode C: written directly to Google Drive")
    return final_week, final_month, final_year
# --------------------------------------------------------------------------------
# Colab Cell 4: QA audit functions (Part 4) - definitions only, never called here
# --------------------------------------------------------------------------------
def _geom_chain_return(day_df: pd.DataFrame, start, end) -> float:
    # ... (function body unchanged from the previous part) ...
    sub = day_df[(day_df['日期'] >= start) & (day_df['日期'] <= end)].copy().sort_values('日期')
    c = sub['收盤'].to_numpy(dtype='float64')
    if len(c) < 2 or not np.all(np.isfinite(c)) or np.any(c <= 0):
        return np.nan
    return float(np.prod(c[1:] / c[:-1]) - 1.0)
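# Illustrative sketch (hypothetical helper, not part of the original pipeline): chaining
# daily close-to-close returns geometrically. 100 -> 110 -> 99 gives (1.10 * 0.90) - 1 = -0.01,
# the same answer as simply taking 99/100 - 1, which is why it works as a cross-check on period returns.
def _demo_geom_chain_return():
    demo = pd.DataFrame({
        '日期': pd.to_datetime(['2024-01-02', '2024-01-03', '2024-01-04']),
        '收盤': [100.0, 110.0, 99.0],
    })
    return _geom_chain_return(demo, pd.Timestamp('2024-01-02'), pd.Timestamp('2024-01-04'))  # ≈ -0.01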
def _week_bins():
    # ... (function body unchanged from the previous part) ...
    edges = [-1000, -200, -100, -90, -80, -70, -60, -50, -40, -30, -20, -10, 0,
             10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 1000, 10000]
    labels = [f"{edges[i]}~{edges[i+1]}%" for i in range(len(edges)-1)]
    return edges, labels
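# Illustrative sketch of how the (edges, labels) pair could feed pd.cut to bucket weekly
# returns (expressed in %) into a distribution table; hypothetical usage, since the audit
# functions that would consume _week_bins are stubbed out below.
def _demo_week_bins():
    edges, labels = _week_bins()
    sample = pd.Series([-35.0, -5.0, 12.5, 250.0])    # weekly returns in percent
    return pd.cut(sample, bins=edges, labels=labels)  # e.g. -35.0 lands in the "-40~-30%" bucket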
def audit_weekly_parquet(market_key: str, week_path: str):
    # Not called in this run
    pass
def audit_weekly_vs_daily(market_key: str):
    # Not called in this run
    pass
def audit_weekly_high_consistency(market_key: str, week_path: str):
    # Not called in this run
    pass
def audit_weekly_high_tail(market_key: str, week_path: str):
    # Not called in this run
    pass
def audit_weekly_nan_filter(market_key: str, week_path: str):
    # Not called in this run
    pass
def audit_monthly_yearly_filter(market_key: str, month_path: str, year_path: str):
    # Not called in this run
    pass
# --------------------------------------------------------------------------------
# Colab Cell 5: main flow (Part 5) - QA calls removed
# --------------------------------------------------------------------------------
if __name__ == "__main__":
    print(f"⏳ Processing window: {DATE_START_STR} ~ {DATE_END_STR} (plus {PAD_DAYS} days of leading padding)")
    print(f"⚠️ Month-K/Year-K return filter enabled: periods with fewer than {MIN_DAYS_MONTH} / {MIN_DAYS_YEAR} trading days, or hit by suspension/extreme-gap returns, have that period's return set to NaN.")
    print(f"🛠️ [Timezone fix enabled] corrects month-boundary errors (e.g. 9/1 being counted into August).")
    for MK in MARKET_LIST:
        print(f"\n{'='*20} Market: {MK} (W-FRI + 玩股 convention + local cache + core cleaning only) {'='*20}")
        # 1. Build the new Parquet files (core conversion)
        w, m, y = build_wmy_parquets(MK)
        if w:
            print(f"\n✅ Market {MK} core conversion done! Parquet files written to Google Drive (Mode B).")
            # --- QA calls removed to save time ---
            # audit_weekly_parquet(MK, w)
            # audit_weekly_vs_daily(MK)
            # ... (all audit_ calls removed)
            # --------------------------------
            qa_dir = Path(f"{DRIVE_BASE}/{MK}/_qa")
            if qa_dir.exists():
                print("\n🌟 Core QA report check (ping-pong skip list only):")
                # Only check whether skip_pingpong.csv exists
                f = qa_dir / "skip_pingpong.csv"
                status = "✅" if f.exists() and f.stat().st_size > 0 else ("✅ (empty)" if f.exists() else "❌")
                print(f" {status} skip_pingpong.csv")
    print(f"\n🗃️ Local day-K cache retained at: /content/_wmy_tmp/dayK_cache")