【技術補充】如何將雜訊轉為金礦?台股大數據的資料清洗(Data Cleaning)實作-2

更新 發佈閱讀 32 分鐘
投資理財內容聲明





def detect_pingpong_patterns(day_df: pd.DataFrame, price_col='收盤', threshold=0.4) -> pd.DataFrame:

    # ... (函式內容保持不變) ...

    df = day_df.sort_values('日期').reset_index(drop=True).copy()

    if price_col not in df.columns:

        return pd.DataFrame()

    df['pct_change'] = df[price_col].pct_change()

    rows = []

    for i in range(1, len(df)-1):

        prev_change = df.loc[i, 'pct_change']

        next_change = df.loc[i+1, 'pct_change']

        if pd.notna(prev_change) and pd.notna(next_change):

            if abs(prev_change) > threshold and abs(next_change) > threshold and prev_change * next_change < 0:

                rows.append({

                    '日期(異常日)': df.loc[i, '日期'],

                    '前日收盤': df.loc[i-1, price_col],

                    '當日收盤': df.loc[i, price_col],

                    '次日收盤': df.loc[i+1, price_col],

                    '前日變動(%)': round(prev_change * 100, 2),

                    '次日變動(%)': round(next_change * 100, 2),

                })

    return pd.DataFrame(rows)



def _process_one_file(path: str):

    try:

        df_day = _load_day_clean_full(path)

        # 僅保留 20242025(含起始緩衝)

        df_day = _clip_by_date_range(df_day, col='日期',

                                     start=DATE_START, end=DATE_END, pad_start=DATE_PAD_START)



        stem = Path(path).stem

        raw_id, _ = _parse_id_name(stem)

        cid = _canonical_id(raw_id) # StockID 必須在這裡定義



        if df_day.empty or df_day['日期'].max() < DATE_PAD_START:

            return "SKIP", {'StockID': cid, 'file': Path(path).name,

                            'count': 0, 'reason': 'out_of_range'}, f"{Path(path).name}: out_of_range"



        if SKIP_SUSPECT_STOCK:

            suspects = detect_pingpong_patterns(df_day, threshold=PINGPONG_THRESHOLD)

            if not suspects.empty:

                msg = f"{Path(path).name}: [SKIP_PINGPONG] 命中疑似除權錯位 {len(suspects)} 筆"

                # 將 cid 傳入 payload

                return "SKIP", {'StockID': cid, 'file': Path(path).name, 'count': len(suspects), 'reason': 'suspect_pingpong'}, msg



        # 週//K 轉換

        dfw = _resample_ohlc_with_flags(df_day, 'W'); dfw = _add_period_returns(dfw, 'W'); dfw['成交量'] = dfw['成交量'].astype('float64'); dfw['StockID'] = cid

        dfm = _resample_ohlc_with_flags(df_day, 'M'); dfm = _add_period_returns(dfm, 'M'); dfm['成交量'] = dfm['成交量'].astype('float64'); dfm['StockID'] = cid

        dfy = _resample_ohlc_with_flags(df_day, 'Y'); dfy = _add_period_returns(dfy, 'Y'); dfy['成交量'] = dfy['成交量'].astype('float64'); dfy['StockID'] = cid



        # 週//K 只留 20242025(不需要 pad)

        for _df in (dfw, dfm, dfy):

            _df.dropna(subset=['日期'], inplace=True)

            _df['日期'] = pd.to_datetime(_df['日期']).dt.tz_localize(None)

        dfw = dfw[(dfw['日期'] >= DATE_START) & (dfw['日期'] <= DATE_END)]

        dfm = dfm[(dfm['日期'] >= DATE_START) & (dfm['日期'] <= DATE_END)]

        dfy = dfy[(dfy['日期'] >= DATE_START) & (dfy['日期'] <= DATE_END)]



        return True, (dfw, dfm, dfy), None

    except Exception as e:

        return False, None, f"{Path(path).name}: {e}"



def _split_into_shards(items, shard_size):

    return [items[i:i+shard_size] for i in range(0, len(items), shard_size)] if shard_size > 0 else [items]



def build_wmy_parquets(market_key: str):

    base_dir = LOCAL_BASE if WRITE_MODE in ('A','B') else DRIVE_BASE

    market_abbr = MARKET_ABBR_MAP.get(market_key, market_key.replace('-share','').upper())



    # 1) 本地快取

    local_dayk_dir = _prepare_local_dayk(market_key)

    if not local_dayk_dir:

        return None, None, None



    # 2) 輸出路徑

    out_week  = f"{base_dir}/{market_key}/weekK_{market_abbr}.parquet"

    out_month = f"{base_dir}/{market_key}/monthK_{market_abbr}.parquet"

    out_year  = f"{base_dir}/{market_key}/yearK_{market_abbr}.parquet"

    Path(f"{base_dir}/{market_key}").mkdir(parents=True, exist_ok=True)



    # 3) 檔案清單

    files = sorted(glob.glob(f"{local_dayk_dir}/*.csv"))

    if not files:

        print(f"❌ 快取路徑 {local_dayk_dir} 內無檔案,程式終止。")

        return None, None, None



    shards = _split_into_shards(files, SHARD_SIZE)

    print(f"\n📦 [{market_key}] 共 {len(files)} 檔,分 {len(shards)} 批處理")



    # 調整欄位列表以包含 IsFiltered_QA

    base_front = ['StockID','日期','開盤','最高','最低','收盤','成交量']

    base_common_wmy = ['CurPeriod_Days','PrevPeriod_Days', 'HasResume', 'IsFiltered_QA']

    base_common_w = ['CurWeek_Days','PrevWeek_Days', 'HasResume']



    ret_cols = [f'PrevC_{f}' for f in ['W','M','Y']] + [f'Ret_Gap_{f}' for f in ['W','M','Y']] + \

               [f'Ret_Trad_{f}' for f in ['W','M','Y']] + [f'Ret_C_{f}' for f in ['W','M','Y']] + \

               [f'Ret_Max_H_{f}' for f in ['W','M','Y']] + [f'Ret_Min_L_{f}' for f in ['W','M','Y']] + \

               [f'Range_{f}' for f in ['W','M','Y']] + [f'Ret_Max_H_Pos_{f}' for f in ['W','M','Y']] + \

               [f'Ret_H_from_O_{f}' for f in ['W','M','Y']] + [f'Ret_L_from_O_{f}' for f in ['W','M','Y']] + \

               [f'Ret_End_RelH_{f}' for f in ['W','M','Y']] + [f'Ret_End_RelL_{f}' for f in ['W','M','Y']]

    ret_cols = sorted(list(set(ret_cols)))



    wk_cols = base_front + base_common_w + [c for c in ret_cols if c.endswith('_W')] + ['ISO_Week']

    mo_cols = base_front + base_common_wmy + [c for c in ret_cols if c.endswith('_M')]

    yr_cols = base_front + base_common_wmy + [c for c in ret_cols if c.endswith('_Y')]



    # 移除重複的 Period/Week 欄位

    mo_cols = [c for c in mo_cols if not c.startswith('CurWeek') and not c.startswith('PrevWeek')]

    yr_cols = [c for c in yr_cols if not c.startswith('CurWeek') and not c.startswith('PrevWeek')]

    wk_cols = [c for c in wk_cols if not c.startswith('CurPeriod') and not c.startswith('PrevPeriod') and c != 'IsFiltered_QA']



    w_writer = ParquetStreamer(out_week,  keep_cols=wk_cols)

    m_writer = ParquetStreamer(out_month, keep_cols=mo_cols)

    y_writer = ParquetStreamer(out_year,  keep_cols=yr_cols)



    total_ok = total_fail = 0

    skip_records = []



    t0 = time.time()

    for si, shard in enumerate(shards, 1):

        print(f"🧩 Shard {si}/{len(shards)}: {len(shard)} 檔")

        shard_w, shard_m, shard_y = [], [], []



        # [***修正點: 將 futs 和 as_completed 移入 with 區塊***]

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:

            # 建立 futures 字典,用於追蹤檔案路徑

            futs = {ex.submit(_process_one_file, p): p for p in shard}



            # 遍歷 futures

            for f in as_completed(futs):

                path = futs[f]  # 從字典中獲取當前檔案路徑

                try:

                    status, payload, err = f.result()

                except Exception as e:

                    # 處理執行緒層級的極端錯誤

                    total_fail += 1

                    err = f"{Path(path).name}: Thread execution failed ({e})"

                    if total_fail <= 20:

                        print(" ❌", err)

                    continue



                if status is True:

                    dfw, dfm, dfy = payload

                    shard_w.append(dfw); shard_m.append(dfm); shard_y.append(dfy)

                    total_ok += 1

                elif status == "SKIP":

                    info = payload or {}

                    # 從 payload 中安全獲取 StockID

                    stock_id = info.get('StockID', Path(path).stem.split("_")[0])

                    skip_records.append({'StockID': stock_id, 'file': Path(path).name,

                                         'reason': info.get('reason', 'suspect_pingpong'), 'count': info.get('count', 0)})

                    if total_fail + total_ok + len(skip_records) <= 20 and err:

                        print(" ⏭️", err)

                else:

                    total_fail += 1

                    if total_fail <= 20 and err:

                        print(" ⚠️", err)



        if shard_w: w_writer.append_df(pd.concat(shard_w, ignore_index=True))

        if shard_m: m_writer.append_df(pd.concat(shard_m, ignore_index=True))

        if shard_y: y_writer.append_df(pd.concat(shard_y, ignore_index=True))

        time.sleep(0.5 + random.random()*0.5)



    w_writer.close(); m_writer.close(); y_writer.close()

    print(f"⏱️ 批次耗時:{time.time() - t0:.1f} 秒")



    if skip_records:

        qa_dir = Path(f"{DRIVE_BASE}/{market_key}/_qa")

        qa_dir.mkdir(parents=True, exist_ok=True)

        pd.DataFrame(skip_records).to_csv(qa_dir / "skip_pingpong.csv", index=False, encoding='utf-8-sig')

        print(f"🧹 已輸出略過名單:{qa_dir / 'skip_pingpong.csv'}({len(skip_records)} 檔)")



    market_abbr = MARKET_ABBR_MAP.get(market_key, market_key.replace('-share','').upper())

    final_week  = f"{DRIVE_BASE}/{market_key}/weekK_{market_abbr}.parquet"

    final_month = f"{DRIVE_BASE}/{market_key}/monthK_{market_abbr}.parquet"

    final_year  = f"{DRIVE_BASE}/{market_key}/yearK_{market_abbr}.parquet"



    if WRITE_MODE == 'A':

        for src, dst in [(out_week, final_week), (out_month, final_month), (out_year, final_year)]:

            if Path(src).exists():

                Path(dst).parent.mkdir(parents=True, exist_ok=True)

                Path(dst).unlink(missing_ok=True)

                Path(src).replace(dst)

        print("📤 A 模式:已將本機 Parquet 移動到 Google Drive")



    elif WRITE_MODE == 'B':

        for src, dst in [(out_week, final_week), (out_month, final_month), (out_year, final_year)]:

            if Path(src).exists():

                Path(dst).parent.mkdir(parents=True, exist_ok=True)

                copy2(str(src), str(dst))

        print("📤 B 模式:已將本機 Parquet 複製到 Google Drive(本機仍保留)")



    else:   # WRITE_MODE == 'C'

        final_week, final_month, final_year = out_week, out_month, out_year

        print("☁️ C 模式:直接寫入 Google Drive")



    return final_week, final_month, final_year



# --------------------------------------------------------------------------------

# Colab Cell 4: QA 稽核函式 (Part 4) - QA 僅作為定義,不會被呼叫

# --------------------------------------------------------------------------------



def _geom_chain_return(day_df: pd.DataFrame, start, end) -> float:

    # ... (函式內容保持不變) ...

    sub = day_df[(day_df['日期'] >= start) & (day_df['日期'] <= end)].copy().sort_values('日期')

    c = sub['收盤'].to_numpy(dtype='float64')

    if len(c) < 2 or not np.all(np.isfinite(c)) or np.any(c <= 0):

        return np.nan

    return float(np.prod(c[1:] / c[:-1]) - 1.0)



def _week_bins():

    # ... (函式內容保持不變) ...

    edges = [-1000, -200, -100, -90, -80, -70, -60, -50, -40, -30, -20, -10, 0,

             10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 1000, 10000]

    labels = [f"{edges[i]}~{edges[i+1]}%" for i in range(len(edges)-1)]

    return edges, labels



def audit_weekly_parquet(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_vs_daily(market_key: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_high_consistency(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_high_tail(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_weekly_nan_filter(market_key: str, week_path: str):

    # 此函式不會被呼叫

    pass



def audit_monthly_yearly_filter(market_key: str, month_path: str, year_path: str):

    # 此函式不會被呼叫

    pass



# --------------------------------------------------------------------------------

# Colab Cell 5: 主流程執行 (Part 5) - 已移除 QA 呼叫

# --------------------------------------------------------------------------------



if __name__ == "__main__":

    print(f"⏳ 僅處理期間:{DATE_START_STR} ~ {DATE_END_STR}(含起始緩衝 {PAD_DAYS} 天)")

    print(f"⚠️ 月K/年K 報酬過濾已啟用:小於 {MIN_DAYS_MONTH} / {MIN_DAYS_YEAR} 交易日,或遇停牌/極端跳空報酬,該期報酬將被設為 NaN。")

    print(f"🛠️ [時區修正已啟用] 修正月份邊界錯誤(如 9/1 誤算進 8 月)。")



    for MK in MARKET_LIST:

        print(f"\n{'='*20} 處理市場:{MK}(W-FRI + 玩股口徑 + 本地快取 + 僅核心清洗) {'='*20}")



        # 1. 生成新的 Parquet 檔案 (核心轉換)

        w, m, y = build_wmy_parquets(MK)



        if w:

            print(f"\n✅ 市場 {MK} 核心轉換成功!Parquet 檔案已輸出至 Google Drive (B 模式)。")



            # --- QA 呼叫已移除以節省時間 ---

            # audit_weekly_parquet(MK, w)

            # audit_weekly_vs_daily(MK)

            # ... (所有 audit_ 呼叫都已刪除)

            # --------------------------------



        qa_dir = Path(f"{DRIVE_BASE}/{MK}/_qa")

        if qa_dir.exists():

            print("\n🌟 核心 QA 報表輸出確認 (僅 Pingpong 略過清單):")

            # 僅確認 skip_pingpong.csv 是否存在

            f = qa_dir / "skip_pingpong.csv"

            status = "✅" if f.exists() and f.stat().st_size > 0 else ("✅ (空表)" if f.exists() else "❌")

            print(f"   {status} skip_pingpong.csv")



        print(f"\n🗃️ 本地日K快取保留:/content/_wmy_tmp/dayK_cache")
留言
avatar-img
留言分享你的想法!
avatar-img
《炒股不看周月年K漲幅機率就是耍流氓》
5會員
246內容數
普通上班族,用 AI 與 Python 將炒股量化。我的數據宣言是:《炒股不做量化,都是在耍流氓》。
2025/11/12
我在跑台股 yearK 統計時,發現一筆堪稱「驚悚」的異常: 某檔股票的收盤價出現六位數(30 萬元)以上! 這篇不只要解剖這筆異常,更要講清楚—— 其實,這不是孤例。 任何減資、拆股、反向分割的股票,都可能出現同樣錯位。 我們會一起看真實案例、驗證證據, 並展示一整套能
Thumbnail
2025/11/12
我在跑台股 yearK 統計時,發現一筆堪稱「驚悚」的異常: 某檔股票的收盤價出現六位數(30 萬元)以上! 這篇不只要解剖這筆異常,更要講清楚—— 其實,這不是孤例。 任何減資、拆股、反向分割的股票,都可能出現同樣錯位。 我們會一起看真實案例、驗證證據, 並展示一整套能
Thumbnail
2025/11/09
簡單來說,我們用自己寫的程式把日K資料轉成月K,然後篩出2020-2025年間所有月漲幅超過100%的股票清單。 為什麼要做這個檢查 因為轉資料的程式很複雜: 要處理除權息 要過濾異常值 要偵測停牌復牌 要處理時區問題 怕哪個環節出錯,把沒漲100%的股票也算進來,或者該算的沒算到
2025/11/09
簡單來說,我們用自己寫的程式把日K資料轉成月K,然後篩出2020-2025年間所有月漲幅超過100%的股票清單。 為什麼要做這個檢查 因為轉資料的程式很複雜: 要處理除權息 要過濾異常值 要偵測停牌復牌 要處理時區問題 怕哪個環節出錯,把沒漲100%的股票也算進來,或者該算的沒算到
看更多