本模組負責將各市場的日K資料轉換為週K(W-FRI)、月K、年K格式,並進行一系列清洗與報酬計算。整體流程已涵蓋:
- 極端報酬過濾(跳空、燭身、最高/最低)
- ghost 段偵測與復牌標記
- 停牌後復牌的週期性跳空處理
- 薄樣本與極端報酬的 QA 標記
- pingpong 模式偵測(疑似除權錯位)
❗ QA 模組的取捨與定位
由於 QA 模組在某些環境下容易造成當機,因此目前已暫時省略 QA 報告的輸出。即使啟用 QA,也僅是標記潛在異常,並不代表即時錯誤判斷,仍需人工抽查與校對。
實際抽查台灣市場的月K資料後,發現即便報酬率超過 100%,大多仍屬正常(如復牌、強勢飆股等),因此目前採「大致信任清洗結果」的策略,僅保留 QA 標記欄位供後續分析使用。
🧭 跨市場資料的清洗策略與精確度
目前的清洗流程主要針對台灣市場進行實測與優化,包含復牌偵測、跳空過濾、ghost 段處理、pingpong 模式排除等。但對於其他市場(如美股、港股、韓股、中股)由於交易制度、除權除息邏輯、資料品質等差異較大,尚未進行深入校對。
因此,雖然清洗邏輯已涵蓋多種異常處理機制,但無法保證所有市場資料皆 100% 精確。本模組的目標是:
洗出大致無誤、可用的交易資料,供後續統計分析、策略回測、教學展示等用途。
若需進一步用於實盤交易或財務報表比對,仍建議搭配人工抽查或市場特化的補強模組。
# -*- coding: utf-8 -*-
"""
日K → weekK/monthK/yearK.parquet(W-FRI + 玩股口徑 + 本地快取 + 7 QA)
+ 包含所有修正與優化 (過濾/統計/QA輸出)
"""
import os, re, glob, time, warnings, random
from math import isfinite
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys # 用於 Colab 環境檢查
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from shutil import copy2
# ===== Basic settings =====
# Silence noisy deprecation chatter; the final blanket filter mirrors the
# original behaviour of suppressing all remaining warnings.
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore")
WRITE_MODE = 'B'  # 'A' / 'B' / 'C'
# Optimization 1: larger row groups reduce parquet metadata overhead.
ROW_GROUP_SIZE = 512_000  # recommended 512,000
# Optimization 2: more threads to maximize CPU utilization on the copy step.
MAX_WORKERS = 16  # recommended 16 or higher
# Optimization 3: bigger merge batches reduce the number of I/O writes.
SHARD_SIZE = 10000  # recommended 8,000 - 15,000
# Cache control (keep the local cache so the next run can skip copying).
KEEP_LOCAL_CACHE = True
MARKET_LIST = [
    "hk-share", "kr-share", "cn-share", "us-share"
    # "us-share", "hk-share", "jp-share", "kr-share", "cn-share",
]
# Market directory name -> two-letter market abbreviation.
MARKET_ABBR_MAP = {
    "tw-share": "TW", "us-share": "US", "cn-share": "CN",
    "hk-share": "HK", "jp-share": "JP", "kr-share": "KR",  # fixed: "jp-share" previously mapped to "KR"
}
# Resolve the data root: use Google Drive when running inside Colab,
# otherwise fall back to a local directory next to the script.
try:
    # Detect a Colab environment via the already-loaded modules.
    if 'google.colab' in sys.modules:
        from google.colab import drive
        try:
            # force_remount=False avoids the repeated-mount warning.
            drive.mount('/content/drive', force_remount=False)
        except Exception:
            pass
        DRIVE_BASE = "/content/drive/MyDrive/各國股票檔案"
    else:
        DRIVE_BASE = os.path.abspath("./data_drive")
except Exception:
    DRIVE_BASE = os.path.abspath("./data_drive")
LOCAL_BASE = "/content/_wmy_tmp"
LOCAL_DAYK_CACHE = f"{LOCAL_BASE}/dayK_cache"  # local day-K cache (kept between runs)
# ===== Date-range limit (process 2000-01-01 ~ 2025-12-31) =====
DATE_START_STR = "2000-01-01"
DATE_END_STR = "2025-12-31"
PAD_DAYS = 14  # buffer days: keep N days before the start so the first period has a previous close
DATE_START = pd.Timestamp(DATE_START_STR)
DATE_END = pd.Timestamp(DATE_END_STR)
DATE_PAD_START = DATE_START - pd.Timedelta(days=PAD_DAYS)
# ===== Cleaning & detection parameters =====
MIN_YEAR = 1990            # drop rows dated before this year
ALLOW_ZERO_VOLUME = True   # keep zero-volume rows in the basic sanity checks
CHECK_RETURN_D = True      # enable the daily extreme-return filter
# [Fix A] Daily extreme-return thresholds are set very high so that extreme
# gap days are NOT pre-deleted at the daily level; period-level QA handles them.
EXTREME_RET_GAP = 100.00
EXTREME_RET_CANDLE = 100.00
EXTREME_RET_MAX_FROMP = 100.00
EXTREME_RET_MIN_FROMP = 100.00
ABS_MAX_REASONABLE = 100.00
# Weekly safety valve: previous week had no trades AND this week gaps big
# -> treat as split/resume, do not compute a return for it.
STOPWEEK_JUMP_THRESHOLD = 0.80  # 80%
# ====== Resume-detection parameters (NEW) ======
LONG_GAP_DAYS = 5           # >= N days since the last valid trading day -> suspected halt / long holiday
RESUME_JUMP_THRESHOLD = 0.30  # first day back: |open/prev_close - 1| >= 30%
GHOST_MIN_LEN = 2           # minimum run of ghost rows (equal OHLC, zero volume) to count as a placeholder segment
PRE_RESUME_DAYS = 5         # look back at most N days BEFORE a ghost run; a big jump there also marks the resume head
EPS_EQ = 1e-8               # tolerance for the four-price-equality test
SKIP_SUSPECT_STOCK = True
PINGPONG_THRESHOLD = 0.40
# [M/Y thresholds] robust-filter parameters
MIN_DAYS_MONTH = 5          # monthly bar needs at least 5 trading days
MIN_DAYS_YEAR = 120         # yearly bar needs at least 120 trading days (~half a year)
BIG_RET_MONTH_YEAR = 0.50   # filter M/Y bars with |Ret_Trad| > 50% when HasResume / thin sample
GAP_BIG_MONTH_YEAR = 0.50   # filter M/Y bars with |Ret_Gap| > 50% when HasResume / thin sample
# Column-name candidates for flexible CSV header matching (exact first, regex fallback).
DATE_COLS = ['date','日期','Date']
OPEN_COLS = ['open','開盤','開盤價','Open']
HIGH_COLS = ['high','最高','最高價','High']
LOW_COLS = ['low','低','最低價','Low']
CLOSE_COLS = ['close','收盤','收盤價','Adj Close','adj close','Close']
VOL_COLS = ['volume','成交量','Volume']
SPLIT_COLS = ['Stock Splits','stock splits','Splits']
DIV_COLS = ['Dividends','dividends']
def _is_equal4(o, h, l, c, eps=EPS_EQ):
    """Return True when the four OHLC prices agree within *eps*."""
    prices = (o, h, l, c)
    return max(prices) - min(prices) <= eps
def _pick(df, cands):
low = {str(c).lower(): c for c in df.columns}
for k in cands:
if k.lower() in low:
return low[k.lower()]
for k in cands:
rx = re.compile(k, re.I)
for c in df.columns:
if rx.search(str(c)):
return c
return None
def _read_csv_fast(path: str) -> pd.DataFrame:
try:
return pd.read_csv(path, encoding='utf-8-sig', engine='pyarrow')
except Exception:
try:
return pd.read_csv(path, encoding='utf-8', engine='pyarrow')
except Exception:
try:
return pd.read_csv(path, encoding='utf-8-sig')
except Exception:
return pd.read_csv(path, encoding='utf-8')
def _parse_id_name(stem: str):
m = re.match(r'(?P<id>[^_]+)_(?P<name>.+)$', stem)
return (m.group('id'), m.group('name')) if m else (stem, "")
def _canonical_id(raw_id: str) -> str:
return raw_id.split('.')[0] if '.' in raw_id else raw_id
def _prepare_local_dayk(market_key: str) -> str:
    """Incrementally mirror a market's day-K CSVs from Drive to the local cache.

    A file is copied only when missing locally, newer on Drive, or of a
    different size. Returns the local cache directory path, or '' when the
    Drive folder contains no CSV files.
    """
    src_dir = Path(f"{DRIVE_BASE}/{market_key}/dayK")
    dst_dir = Path(f"{LOCAL_DAYK_CACHE}/{market_key}")
    dst_dir.mkdir(parents=True, exist_ok=True)
    sources = sorted(glob.glob(str(src_dir / "*.csv")))
    if not sources:
        print(f"❌ 快取失敗: Drive 上的 {src_dir} 找不到任何 CSV 檔案。")
        return ""

    def _sync_one(src: str):
        # Copy one file when the cached copy is absent or stale; errors are
        # reported but never abort the batch.
        dst = dst_dir / Path(src).name
        try:
            if dst.exists():
                src_stat, dst_stat = Path(src).stat(), dst.stat()
                if (src_stat.st_mtime > dst_stat.st_mtime) or (src_stat.st_size != dst_stat.st_size):
                    copy2(src, dst)
            else:
                copy2(src, dst)
        except Exception as e:
            print(f"⚠️ 快取檔案失敗:{src} -> {dst} ({e})")

    started = time.time()
    print(f"\n🚀 增量快取 {len(sources)} 檔日K CSV 到本地:{dst_dir}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = [pool.submit(_sync_one, src) for src in sources]
        list(as_completed(futures))
    print(f"✅ 快取完成!耗時 {time.time() - started:.2f} 秒。路徑: {dst_dir}")
    return str(dst_dir)
def _basic_price_checks(df: pd.DataFrame, allow_zero_vol=ALLOW_ZERO_VOLUME) -> pd.DataFrame:
    """Sanity-filter an OHLCV frame.

    Keeps rows with strictly positive prices, a consistent high/low envelope,
    parseable dates from MIN_YEAR on, sorted and de-duplicated by date.
    Zero-volume rows are dropped only when *allow_zero_vol* is False.
    """
    out = df.copy()
    for col in ['開盤', '最高', '最低', '收盤', '成交量']:
        if col in out.columns:
            out[col] = pd.to_numeric(out[col], errors='coerce')
    positive = (out['開盤'] > 0) & (out['最高'] > 0) & (out['最低'] > 0) & (out['收盤'] > 0)
    out = out[positive]
    consistent = (
        (out['最高'] >= out[['開盤', '收盤', '最低']].max(axis=1))
        & (out['最低'] <= out[['開盤', '收盤', '最高']].min(axis=1))
        & (out['最高'] >= out['最低'])
    )
    out = out[consistent]
    if not allow_zero_vol and '成交量' in out.columns:
        out = out[out['成交量'] > 0]
    # Timezone handling is deliberately deferred to the loader; here dates are
    # parsed and forced naive.
    out['日期'] = pd.to_datetime(out['日期'], errors='coerce').dt.tz_localize(None)
    out = out.dropna(subset=['日期'])
    out = out[out['日期'].dt.year >= MIN_YEAR]
    return out.sort_values('日期').drop_duplicates(subset=['日期']).reset_index(drop=True)
def _extreme_return_filter_day(df: pd.DataFrame) -> pd.DataFrame:
    """Drop daily rows whose gap/candle/high/low returns exceed the extreme
    thresholds; returns *df* unchanged when CHECK_RETURN_D is disabled.

    Rows whose ratios cannot be computed (NaN previous close or open) are
    dropped, matching the original mask semantics.
    """
    if not CHECK_RETURN_D:
        return df
    out = df.copy()
    prev_close = out['收盤'].shift(1).replace(0, np.nan)
    open_safe = out['開盤'].replace(0, np.nan)
    checks = [
        ((open_safe / prev_close) - 1, EXTREME_RET_GAP),
        ((out['收盤'] / open_safe) - 1, EXTREME_RET_CANDLE),
        ((out['最高'] / prev_close) - 1, EXTREME_RET_MAX_FROMP),
        ((out['最低'] / prev_close) - 1, EXTREME_RET_MIN_FROMP),
    ]
    keep = pd.Series(True, index=out.index)
    for ret, cap in checks:
        magnitude = ret.abs()
        keep &= magnitude < cap
        keep &= magnitude <= ABS_MAX_REASONABLE
    return out[keep.fillna(False)].copy()
def _clip_by_date_range(df: pd.DataFrame, col='日期',
                        start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START):
    """Restrict rows to [pad_start, end].

    *pad_start* keeps a runway before the configured start so the first
    resampled period has a previous close. Note: *start* is accepted for
    interface compatibility; the lower bound actually applied is *pad_start*.
    """
    if df.empty or col not in df.columns:
        return df
    in_range = df[col].between(pad_start, end)
    return df.loc[in_range].copy()
# [Fixed and vectorized]
def _mark_resume_flags_with_ghost(df_orig: pd.DataFrame) -> pd.Series:
    """Mark resume-from-suspension days on a frame that still CONTAINS ghost rows.

    A "ghost" row is a zero-volume day whose four OHLC prices are equal within
    EPS_EQ (placeholder rows emitted during a halt). Two signals flag a row:
      A) the calendar gap to the previous *valid* (non-ghost) trading day is
         >= LONG_GAP_DAYS and the open jumps >= RESUME_JUMP_THRESHOLD versus
         the previous valid close;
      B) a ghost run of >= GHOST_MIN_LEN days is preceded, within
         PRE_RESUME_DAYS rows, by a non-ghost day with such an opening jump.
    Returns a boolean Series aligned to df_orig's index (True = resume day).
    """
    if df_orig.empty:
        return pd.Series(False, index=df_orig.index)
    df = df_orig.copy()
    # Vectorized four-price-equality check.
    o, h, l, c = df['開盤'].to_numpy(), df['最高'].to_numpy(), df['最低'].to_numpy(), df['收盤'].to_numpy()
    # Require finite values AND max-min within tolerance.
    eq_ohlc_vec = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \
                  ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)
    ghost_mask = (df['成交量'].fillna(0) == 0) & eq_ohlc_vec
    # Forward-fill the last valid (non-ghost) close/date so every row knows
    # its most recent real trading reference.
    prev_valid_close = df['收盤'].where(~ghost_mask).shift(1).ffill()
    prev_valid_date = df['日期'].where(~ghost_mask).shift(1).ffill()
    gap_days = (df['日期'] - prev_valid_date).dt.days
    open_to_prev = (df['開盤'] / prev_valid_close) - 1
    # Signal A: long calendar gap + large opening jump.
    cond_A = (gap_days >= LONG_GAP_DAYS) & (open_to_prev.abs() >= RESUME_JUMP_THRESHOLD)
    g = ghost_mask.to_numpy(); n = len(df)
    resume_idx = set(np.where(cond_A)[0].tolist())
    # Signal B: scan each ghost run; for runs of sufficient length, look back
    # up to PRE_RESUME_DAYS rows for a jump day to treat as the resume head.
    i = 0
    while i < n:
        if g[i]:
            j = i
            while j < n and g[j]:
                j += 1
            seg_len = j - i
            if seg_len >= GHOST_MIN_LEN:
                for back in range(1, PRE_RESUME_DAYS + 1):
                    k = i - back
                    if k < 0: break
                    if (not g[k]) and (abs(open_to_prev.iloc[k]) >= RESUME_JUMP_THRESHOLD):
                        resume_idx.add(k)
            i = j
        else:
            i += 1
    s = pd.Series(False, index=df.index)
    if resume_idx:
        s.iloc[sorted(list(resume_idx))] = True
    return s
# [Fixed and vectorized]
def _load_day_clean_full(path: str) -> pd.DataFrame:
    """Load one day-K CSV and fully clean it.

    Pipeline: normalize column names -> mark resume days -> drop ghost rows
    -> basic price sanity checks -> extreme-return filter.

    Raises ValueError when required columns are missing or when cleaning
    leaves the frame empty.
    """
    raw = _read_csv_fast(path)
    dc = _pick(raw, DATE_COLS)
    oc = _pick(raw, OPEN_COLS)
    hc = _pick(raw, HIGH_COLS)
    lc = _pick(raw, LOW_COLS)
    cc = _pick(raw, CLOSE_COLS)
    vc = _pick(raw, VOL_COLS)
    sc = _pick(raw, SPLIT_COLS)  # may be absent
    if not all([dc, oc, hc, lc, cc]):
        raise ValueError(f"缺必要欄位:{Path(path).name}")
    if vc is None:
        # No volume column: synthesize a NaN one so downstream code is uniform.
        raw['__vol__'] = np.nan
        vc = '__vol__'
    df0 = raw[[dc, oc, hc, lc, cc, vc]].copy()
    df0.columns = ['日期','開盤','最高','最低','收盤','成交量']
    for c in ['開盤','最高','最低','收盤','成交量']:
        df0[c] = pd.to_numeric(df0[c], errors='coerce')
    # [Fix 1] Timezone boundary: convert tz-aware timestamps to Asia/Taipei
    # local time before stripping the tz, so dates do not shift across midnight.
    df0['日期'] = pd.to_datetime(df0['日期'], errors='coerce')
    if df0['日期'].dt.tz is not None:
        df0['日期'] = df0['日期'].dt.tz_convert('Asia/Taipei').dt.tz_localize(None)
    else:
        df0['日期'] = df0['日期'].dt.tz_localize(None)
    # [End fix 1]
    resume_flag = _mark_resume_flags_with_ghost(df0)
    # Vectorized ghost detection (mirrors _mark_resume_flags_with_ghost).
    o, h, l, c = df0['開盤'].to_numpy(), df0['最高'].to_numpy(), df0['最低'].to_numpy(), df0['收盤'].to_numpy()
    eq_ohlc = np.isfinite(o) & np.isfinite(h) & np.isfinite(l) & np.isfinite(c) & \
              ((np.maximum.reduce([o, h, l, c]) - np.minimum.reduce([o, h, l, c])) <= EPS_EQ)
    ghost_mask = (df0['成交量'].fillna(0) == 0) & eq_ohlc
    if sc is not None and sc in raw.columns:
        # Zero-volume rows carrying a split record are also placeholder rows.
        splits = pd.to_numeric(raw[sc], errors='coerce').fillna(0)
        ghost_mask = ghost_mask | ((df0['成交量'].fillna(0) == 0) & (splits > 0))
    df = df0.loc[~ghost_mask].copy()
    df['resume_flag'] = resume_flag.loc[df.index]
    # === [Fix B] Strengthen daily resume marking: also flag extreme
    # close-to-close jumps the ghost logic could not catch. ===
    DAILY_JUMP_THRESHOLD = 1.00  # 100% daily return
    prev_c_day = df['收盤'].shift(1).replace(0, np.nan)
    daily_ret = (df['收盤'] / prev_c_day) - 1
    extreme_daily_jump_mask = daily_ret.abs().gt(DAILY_JUMP_THRESHOLD).fillna(False)
    df['resume_flag'] = df['resume_flag'] | extreme_daily_jump_mask
    # ====================================================================================
    df = _basic_price_checks(df)
    if df.empty:
        raise ValueError("清洗後為空")
    df = _extreme_return_filter_day(df)
    if df.empty:
        raise ValueError("極端報酬過濾後為空")
    return df
def _resample_ohlc_with_flags(df: pd.DataFrame, rule: str) -> pd.DataFrame:
    """Resample daily bars to weekly ('W', Friday-anchored), month-end ('M')
    or year-end ('Y') OHLCV bars.

    Besides OHLCV, each output period carries:
      - Cur{Week|Period}_Days: trading-day count within the period
      - Prev{Week|Period}_Days: previous period's count (0 for the first)
      - HasResume: whether any source row carried a resume flag (if present)
      - ISO_Week (weekly only): 'YYYY-WW' ISO calendar label

    Raises ValueError for any other *rule*.
    """
    df = df.copy()
    # Timezone stripping already happened upstream; only parse here.
    df['日期'] = pd.to_datetime(df['日期'], errors='coerce')
    g = df.set_index('日期').sort_index()
    if rule == 'W':  # settle on Friday (matches the WantGoo convention)
        rule_code, label, closed = 'W-FRI', 'right', 'right'
        period_name = 'Week'
    elif rule == 'M':
        rule_code, label, closed = 'ME', 'right', 'right'
        period_name = 'Period'
    elif rule == 'Y':
        rule_code, label, closed = 'YE', 'right', 'right'
        period_name = 'Period'
    else:
        raise ValueError("rule must be 'W', 'M', or 'Y'.")
    o = g['開盤'].resample(rule_code, label=label, closed=closed).first()
    h = g['最高'].resample(rule_code, label=label, closed=closed).max()
    l = g['最低'].resample(rule_code, label=label, closed=closed).min()
    c = g['收盤'].resample(rule_code, label=label, closed=closed).last()
    v = g['成交量'].resample(rule_code, label=label, closed=closed).sum(min_count=1)
    # Trading days inside each period.
    n = g['收盤'].resample(rule_code, label=label, closed=closed).count().rename(f'Cur{period_name}_Days')
    # Whether a resume flag occurred anywhere inside the period.
    has_resume = None
    if 'resume_flag' in g.columns:
        has_resume = g['resume_flag'].resample(rule_code, label=label, closed=closed).max().rename('HasResume')
    parts = [o, h, l, c, v, n]
    if has_resume is not None:
        parts.append(has_resume)
    out = pd.concat(parts, axis=1).dropna(subset=['開盤','收盤'])
    cols = ['開盤','最高','最低','收盤','成交量', f'Cur{period_name}_Days']
    if has_resume is not None:
        cols.append('HasResume')
    out.columns = cols
    # Previous period's trading-day count (first period defaults to 0).
    out[f'Prev{period_name}_Days'] = out[f'Cur{period_name}_Days'].shift(1).fillna(0).astype(int)
    if rule == 'W':
        iso = out.index.to_series().dt.isocalendar()
        out['ISO_Week'] = iso.year.astype(str) + '-' + iso.week.astype(str).str.zfill(2)
        # (removed: a dead no-op rename that mapped CurWeek_Days/PrevWeek_Days
        # onto their own names)
    out = out.reset_index().rename(columns={'index':'日期'})
    return _basic_price_checks(out)
def _add_period_returns(df: pd.DataFrame, freq_code: str) -> pd.DataFrame:
    """Compute period returns and apply the per-frequency filtering rules.

    *freq_code* is 'W', 'M' or 'Y'. Adds the Ret_* family of columns suffixed
    with the frequency, plus an IsFiltered_QA flag. Rows matching the filter
    rules get Ret_Trad/Ret_Max_H/Ret_Min_L blanked to NaN; OHLCV is kept.
    """
    df = df.sort_values('日期').reset_index(drop=True).copy()
    prev_c = df['收盤'].shift(1).replace(0, np.nan)
    o = df['開盤'].replace(0, np.nan)
    h = df['最高'].replace(0, np.nan)
    l = df['最低'].replace(0, np.nan)
    c = df['收盤'].replace(0, np.nan)
    # Return definitions (unchanged from the original implementation).
    df[f'PrevC_{freq_code}'] = prev_c
    df[f'Ret_Gap_{freq_code}'] = (o / prev_c) - 1
    df[f'Ret_Trad_{freq_code}'] = (c - prev_c) / prev_c
    df[f'Ret_C_{freq_code}'] = (c - o) / o
    df[f'Ret_Max_H_{freq_code}'] = (h - prev_c) / prev_c
    df[f'Ret_Min_L_{freq_code}'] = (l - prev_c) / prev_c
    df[f'Range_{freq_code}'] = (h - l) / o
    df[f'Ret_Max_H_Pos_{freq_code}'] = df[f'Ret_Max_H_{freq_code}'].clip(lower=0)
    df[f'Ret_H_from_O_{freq_code}'] = (h - o) / o
    df[f'Ret_L_from_O_{freq_code}'] = (l - o) / o
    df[f'Ret_End_RelH_{freq_code}'] = (c - h) / h
    df[f'Ret_End_RelL_{freq_code}'] = (c - l) / l
    # --- Filtering logic ---
    final_clear_mask = pd.Series(False, index=df.index)
    cols_to_nan = [f'Ret_Trad_{freq_code}', f'Ret_Max_H_{freq_code}', f'Ret_Min_L_{freq_code}']
    # IsFiltered_QA is pre-created as 0 only for M/Y.
    if freq_code in ('M', 'Y'):
        df['IsFiltered_QA'] = 0
    if freq_code == 'W' and 'PrevWeek_Days' in df.columns:
        # 1. Weekly WantGoo-convention filter: previous week had no trades and
        #    this week opens with a big gap.
        big_jump = (df[f'PrevC_W'] > 0) & (df['開盤'] > 0) & ((df['開盤'] / df[f'PrevC_W'] - 1).abs() > STOPWEEK_JUMP_THRESHOLD)
        prev_w_no_trade = (df['PrevWeek_Days'] == 0)
        jump_mask = big_jump & prev_w_no_trade
        final_clear_mask = final_clear_mask | jump_mask
        # 2. Weekly HasResume + extreme-event filter ([Fix C]).
        EXTREME_RET_USER_REQ = 1.00  # 100%
        BIG_RET_THRESHOLD = 0.50
        has_resume_mask = df.get('HasResume', pd.Series(False, index=df.index)).fillna(0).gt(0)
        is_extreme_event_current_week = has_resume_mask & df[f'Ret_Trad_W'].abs().gt(EXTREME_RET_USER_REQ)
        # The week immediately AFTER an extreme event is also skipped, since
        # its previous close is tainted by the event.
        skip_current_week_from_previous_event = is_extreme_event_current_week.shift(1).fillna(False)
        mask_thin_or_large_jump = has_resume_mask & (df['CurWeek_Days'].lt(3) | df[f'Ret_Trad_W'].abs().gt(BIG_RET_THRESHOLD))
        final_clear_mask = final_clear_mask | is_extreme_event_current_week | skip_current_week_from_previous_event | mask_thin_or_large_jump
    elif freq_code in ('M', 'Y') and 'HasResume' in df.columns:
        # 3. Monthly/yearly robust filter: thin samples + halt/resume + big gaps.
        period_name = 'Period'
        # [Fix 3] Resolve the Days column names defensively.
        days_col = 'CurPeriod_Days' if 'CurPeriod_Days' in df.columns else f'Cur{period_name}_Days'
        prev_days_col = 'PrevPeriod_Days' if 'PrevPeriod_Days' in df.columns else f'Prev{period_name}_Days'
        min_days = MIN_DAYS_MONTH if freq_code == 'M' else MIN_DAYS_YEAR
        # Condition A: insufficient coverage (new listing or partial year).
        small_coverage_mask = df[days_col].fillna(0).astype(int) < min_days
        # Condition B: extreme values around a halt/resume period.
        has_resume = df['HasResume'].fillna(0).gt(0)
        thin_mask = df[days_col].fillna(0).astype(int) < 3  # extremely thin sample (< 3 days)
        big_gap_mask = df[f'Ret_Gap_{freq_code}'].abs() > GAP_BIG_MONTH_YEAR
        big_ret_mask = df[f'Ret_Trad_{freq_code}'].abs() > BIG_RET_MONTH_YEAR
        resume_extreme_mask = has_resume & (thin_mask | big_ret_mask | big_gap_mask)
        # Condition C: previous period had no trades + big gap this period
        # (analogous to the weekly WantGoo rule).
        stop_jump_mask = (df[prev_days_col].fillna(0).astype(int) == 0) & (df[f'Ret_Gap_{freq_code}'].abs() > STOPWEEK_JUMP_THRESHOLD)
        final_clear_mask = final_clear_mask | small_coverage_mask | stop_jump_mask | resume_extreme_mask
    # [QA statistics] mark filtered rows. NOTE(review): for 'W' this assignment
    # creates IsFiltered_QA lazily, leaving unfiltered weekly rows as NaN;
    # ParquetStreamer later fills those with 0.
    df.loc[final_clear_mask, 'IsFiltered_QA'] = 1
    # Apply the filter: blank the affected return columns.
    if final_clear_mask.any():
        df.loc[final_clear_mask, cols_to_nan] = np.nan
    return df
class ParquetStreamer:
    """Incrementally append DataFrames to a single parquet file.

    The parquet schema is frozen from the first appended batch; later batches
    are reindexed/cast to match it. Call close() when finished (safe to call
    more than once).
    """
    def __init__(self, path: str, keep_cols=None, row_group_size: int = ROW_GROUP_SIZE):
        self.path = path
        self.writer = None          # lazily created on first append
        self.schema = None          # frozen from the first batch
        self.rows = 0               # total rows written so far
        self.keep_cols = keep_cols  # optional fixed column order/selection
        self.row_group_size = row_group_size

    def _ensure_writer(self, table: pa.Table):
        # Create the writer (and parent directory) on first use, freezing the schema.
        if self.writer is None:
            Path(self.path).parent.mkdir(parents=True, exist_ok=True)
            self.schema = table.schema
            self.writer = pq.ParquetWriter(self.path, self.schema, compression='snappy')

    def append_df(self, df: pd.DataFrame):
        """Normalize dtypes and write *df* as one (or more) row groups."""
        if df is None or df.empty:
            return
        df = df.copy()
        if self.keep_cols is not None:
            # Guarantee a stable column set: add missing columns as NaN.
            for c in self.keep_cols:
                if c not in df.columns:
                    df[c] = np.nan
            df = df.reindex(columns=self.keep_cols)
        if '日期' in df.columns:
            df['日期'] = pd.to_datetime(df['日期'], errors='coerce').dt.tz_localize(None)
        # Keep the QA flag as a proper integer column.
        if 'IsFiltered_QA' in df.columns:
            df['IsFiltered_QA'] = pd.to_numeric(df['IsFiltered_QA'], errors='coerce').fillna(0).astype('int32')
        # Normalize all remaining numeric columns to float64 so every batch
        # matches the frozen schema.
        for c in df.columns:
            if df[c].dtype in ['int64','float64','int32','float32'] and c not in ['ISO_Week', 'IsFiltered_QA']:
                df[c] = pd.to_numeric(df[c], errors='coerce').astype(np.float64)
        table = pa.Table.from_pandas(df, preserve_index=False)
        self._ensure_writer(table)
        if table.schema != self.schema:
            # A later batch drifted from the frozen schema: realign columns and cast.
            table = pa.Table.from_pandas(df.reindex(columns=[f.name for f in self.schema]), preserve_index=False).cast(self.schema, safe=False)
        self.writer.write_table(table, row_group_size=self.row_group_size)
        self.rows += len(df)

    def close(self):
        """Close the underlying writer; safe to call multiple times."""
        if self.writer is not None:
            try:
                self.writer.close()
            except Exception:  # fixed: was a bare `except:` that also swallowed SystemExit/KeyboardInterrupt
                pass
            finally:
                self.writer = None  # make close() idempotent











