2024-11-24|閱讀時間 ‧ 約 21 分鐘

技術筆記-socket 我來了,進到「即時通訊」的領域

這一趟越走越興奮的旅程,從 測試富邦新一代 api 開始,很自然的開始 用 python 造出自己的 gui 程式,本來也不想學太多有的沒的,頂多三五分鐘用 api 監控一次市場行情,就可以玩出很多策略了,沒想到 api 一個小小的不完美,迫使我把手伸進 socket 領域了。


原來我想監控的重要數據,是市場的「即時買賣五檔掛單」,這個在股票類的 Quote Api 是可以取得的 (asks / bids),但在相對應的期貨 Api 卻沒有這項資訊!還好有富果的社群服務人員親切解說,雖然不可能立刻請求官方改版,但目前既有的 socket 連線服務可以取得這項資訊!所以呢?誰怕誰,那就來寫 socket 程式吧。看著範例程式好像很簡單,沒想到摸下去水還有點深勒,被迫把 python class 的 instance method / class method / static method,還有 multi-thread 相關議題,都研究了一遍,滿滿學習成長的快感。為免日後再度踩坑,特此筆記重點。


首先跟著範例建立最基本的程式,不同於 web api 每呼叫一次才會回應一次,socket 連線建立之後,就等著源源不絕的資料不斷湧出:

from fubon_neo.sdk import FubonSDK, Mode
import os
from dotenv import load_dotenv

def handle_message(message):
print(message)

def handle_connect():
print('connected')

def handle_error(message):
print(message)

sdk = FubonSDK()
load_dotenv()
id = os.getenv('id')
pwd = os.getenv('pwd')
accounts = sdk.login(id, pwd, f"{id}.pfx" , f"{pwd}")

sdk.init_realtime(Mode.Normal) # Speed

socket_client = sdk.marketdata.websocket_client.futopt
socket_client.on('message', handle_message)
socket_client.on("connect", handle_connect)
socket_client.on("error", handle_error)
socket_client.connect()
socket_client.subscribe({
'channel': 'books',
'symbol': 'TMFL4', # TXFL4: 台指期 2024/12
'afterHours' : True # 夜盤行情
})
----------------------------------------------
# 以下為輸出,源源不絕一直冒出來
connected
{"event":"authenticated","data":{"message":"Authenticated successfully"}}
{"event":"pong","data":{"time":1732380011049826,"state":""}}
{"event":"subscribed","data":{"id":"YA6BZOn2ZMsXWM384XY9I8gDvKr3NDFqL0rVzRmGuPRBownX0","channel":"books","symbol":"TMFL4","afterHours":true}}
{"event":"snapshot","data":{"symbol":"TMFL4","type":"FUTURE_AH","exchange":"TAIFEX","bids":[{"price":23048,"size":5},{"price":23047,"size":3},{"price":23046,"size":2},{"price":23045,"size":11},{"price":23044,"size":1}],"asks":[{"price":23055,"size":23},{"price":23056,"size":1},{"price":23058,"size":5},{"price":23059,"size":1},{"price":23060,"size":6}],"time":1732309199945000},"id":"YA6BZOn2ZMsXWM384XY9I8gDvKr3NDFqL0rVzRmGuPRBownX0","channel":"books"}
{"event":"heartbeat","data":{"time":1732380030001084}}

以上這段,放在一個 jupyter notebook 的格子就能跑,當該格子執行完成之後,資料持續冒出,那代表它其實是執行在另一個執行緒中。所以要在另一個格子執行 socket_client.disconnect() 才可以結束它。


測試成功是很高興,但旅程才剛開始,離真正應用還有坑坑巴巴的路要走。首先是需要把此通訊功能,包到一個 class 裡面,因為一個應用絕對不止看著資料吐到 console 就結束的,需要把資料取出,顯示在圖表或其他 UI,與其他資料整合,再做出下單動作等等。若有人土炮硬幹,想把所有程式寫在同一個檔案,也很難逃避 ui 顯示的需求,終究要面對程式拆分和系統架構的議題,所以 ... 好好學吧。以下先用一個 class 把所有功能包起來:

from fubon_neo.sdk import FubonSDK, Mode
import os
from dotenv import load_dotenv

class nRealTime:
def __init__(self):
self.sdk = FubonSDK()
load_dotenv()
id = os.getenv('id')
pwd = os.getenv('pwd')
dir = os.path.dirname(os.path.abspath(__file__))
accounts = self.sdk.login(id, pwd, f"{dir}/{id}.pfx" , f"{pwd}")
print(accounts)

def handle_message(self, message):
print(message)

def handle_error(self, message):
print(message)

def connect(self):
self.sdk.init_realtime(Mode.Normal) # Speed / Normal
futopt = self.sdk.marketdata.websocket_client.futopt
futopt.on('message', self.handle_message)
futopt.on("error", self.handle_error)
futopt.connect()

futopt.subscribe({
'channel': 'books',
'symbol': 'TMFA5', # MFK4: 微台指 2024/11, TXFK4: 台指期 2024/11
'afterHours' : True # 夜盤行情
})

if __name__ == "__main__":
nRT = nRealTime()
nRT.connect()

第一個改變注意到了,要用「dir = os.path.dirname(os.path.abspath(__file__)) 」取得程式當下執行的路徑,否則連憑證檔都找不到;這也是增加程式可攜性的基本作為。改好之後,並未連線成功,注意到 login 是成功的,為何連線卻失敗?這是第一個坑:

這個錯誤訊息誤導性太強,因為並非認證 (authrntication) 問題,若朝認證問題去搜尋一定會浪費很多時間,這是回呼函數的問題。若回呼函數是某物件的 instance method,依照 python 預設的機制,第一個參數必定是 self,但 socket 元件並不知悉啊! 所以就掛了。

解法朝向「消除 self 參數」思考,第一種是把 method 變成 static,也就是在一個類別中跨實體共用,這樣就不需要也不允許 self 參數,只要加一個裝飾詞就可 @staticmethod,回呼函數的 self 改成 class name:

# 以上程式碼,修改的部分
@staticmethod
def handle_message(message):
print(message)

@staticmethod
def handle_error(message):
print(message)

def connect(self):
self.sdk.init_realtime(Mode.Normal) # Speed / Normal
futopt = self.sdk.marketdata.websocket_client.futopt

futopt.on('message', nRealTime.handle_message)
futopt.on("error", nRealTime.handle_error)
futopt.connect()

futopt.subscribe({
'channel': 'books',
'symbol': 'TMFA5', # TX423000K4: 23000買權11月第4, TMFK4: 微台指 2024/11, TXFK4: 台指期 2024/11
'afterHours' : True # 夜盤行情
})

果然可以正確連線了。那就進一步來整合到我的 ui 吧。把下列程式碼加到 ui 程式:

@staticmethod
def callback_message(message):
print(message) # 這一行會成功
# 但以下這行會失敗!因為 static method 無法存取 instance 相關資源
self.st_message.insert("1.0", message + '\n')

def socket_connect(self):
nt.sdk.init_realtime(Mode.Normal)
self.socket_client = nt.sdk.marketdata.websocket_client.futopt
self.socket_client.on('message', AppInterface.callback_message)

self.socket_client.connect()

self.socket_client.subscribe({
'channel': 'books',
'symbol': 'TMFA5', # TX423000K4: 23000買權11月第4, TMFK4: 微台指 2024/11, TXFK4: 台指期 2024/11
'afterHours' : True # 夜盤行情
})

問題來了,前述的 static method 無法存取 instance 相關的資源,而整個應用程式的主視窗也是程式啟動後動態產生的,想要把回傳訊息顯示在 ui 上完全沒有機會,實在不知道怎麼把整個 ui 元件都變成 static,這影響太大了,因此尋求第二種解決方案:用 lambda 運算子把回呼函數包起來,隱藏 self 參數:

def callback_message(self, message):
self.st_message.insert("1.0", message + '\n')

def socket_connect(self):
nt.sdk.init_realtime(Mode.Normal)
self.socket_client = nt.sdk.marketdata.websocket_client.futopt
callback_message = lambda message: self.callback_message(message)
self.socket_client.on('message', callback_message)

self.socket_client.connect()

self.socket_client.subscribe({
'channel': 'books',
'symbol': 'TMFA5', # TX423000K4: 23000買權11月第4, TMFK4: 微台指 2024/11, TXFK4: 台指期 2024/11
'afterHours' : True # 夜盤行情
})
-------------------------------------------
# 結果 ui 成功顯示一比回傳資料,但僅僅只有一筆,之後又回報與前次一樣的「誤導性錯誤訊息」
# 明明就認證成功,卻又報 Exception: authentication timeout
{"event":"authenticated","data":{"message":"Authenticated successfully"}}

這就是第二個坑。經過一番掙扎,終於搞清楚,socket connection 不知道是使用新的 thread 還是 ui 的 main thread,不管怎樣一定會出問題的。因為若是同一個 thread,會導致 ui 卡頓,若是不同的 thread,則會導致資源衝突,就是 socket thread and main thread 同時是更新 ui,ui 該何去何從?這種問題在語言設計的底層一定就排除掉了,不會允許發生的,只是報出的錯誤訊息不會詳述這些原因。因此,正規的方法就是,好好用程式控制,明確的產生另一個 thread 供 socket 使用。這需要 import threading 套件:

import threading
def socket_connect(self):
nt.sdk.init_realtime(Mode.Normal) # Speed / Normal
if self.socket_thread:
self.socket_disconnect()
self.socket_thread = threading.Thread(target = self.run_socket_connect, daemon=True)
self.socket_thread.start()

# 必須負責任的釋放 thread 相關資源
def socket_disconnect(self):
self.socket_client.disconnect()
self.socket_thread.join()
self.root.after(0, self.append_socket_message, 'disconnected')

def run_socket_connect(self):
self.socket_client = nt.sdk.marketdata.websocket_client.futopt
callback_message = lambda message: self.callback_message(message)
self.socket_client.on('message', callback_message)
callback_error = lambda message: self.callback_error(message)
self.socket_client.on("error", callback_error)
callback_connect = lambda: self.callback_connect()
self.socket_client.on("connect", callback_connect)

self.socket_client.connect()

self.socket_client.subscribe({
'channel': 'books',
'symbol': 'TMFA5', # TX423000K4: 23000買權11月第4, TMFK4: 微台指 2024/11, TXFK4: 台指期 2024/11
'afterHours' : True # 夜盤行情
})

另外,回呼函數也不能很粗暴的直接更新 ui,必須等待 main thread 有空的時候,按照順序工作,注意使用 after() 函式:

def callback_message(self, message):
# for socket safe, use after
self.root.after(0, self.append_socket_message, message)

def append_socket_message(self, message):
self.st_message.insert("1.0", message + '\n')

終於把 socket 訊息正確顯示在 ui 上了:

走到這一步,訊息流已經都串通了,想要的 bids and asks 也正確顯示了,技術性問題已經全部克服,程式交易這件事,從來沒有感覺到,似乎就快要登堂入室了。相較於從前依附在特定平台上的程式交易,掌握度和自由度都更高了。


最終的挑戰還是「交易策略」的研發!這種事情別想要有簡單的答案,也別指望 ai 了,好好潛心修練吧。當擁有更高檔的「工具」或「兵器」,操作的人自己要升級才是。


Newman 2024/11/24

導覽頁:紐曼的技術筆記-索引













分享至
成為作者繼續創作的動力吧!
© 2024 vocus All rights reserved.