在 PyQt 中,信號與槽(Signal & Slot)機制是用來實現物件間通信的核心機制。
當信號被發射時,槽函數(Slot)根據預先連接的規則被調用。這一過程有時候會呈現出「排隊」的現象,即信號並非立即執行,而是先放入事件隊列,等待事件循環(Event Loop)逐一處理。本文將介紹其原理、驗證方法以及如何處理排隊現象的方法。1. 信號與槽的連接方式
1.1 直接連接 (Direct Connection)
- 情況:當信號與槽位於同一個執行緒中時,預設使用直接連接。
- 行為:信號發射後,會立即調用槽函數,不通過事件隊列。
- 特點:執行效率高,但如果槽函數耗時,可能會阻塞信號發射者的程式流程。
1.2 排隊連接 (Queued Connection)
- 情況:當信號與槽不在同一個執行緒中,或者明確指定使用 Queued Connection。
- 行為:信號被發射後,會先放入事件隊列,等待事件循環處理時,再調用槽函數。
- 特點:這種方式保證信號發射是非阻塞的,可以避免長時間運算影響主執行緒,但會產生「排隊」效果,即信號會依次等待執行。
提示: PyQt 會根據信號和槽所在的執行緒自動選擇使用 Direct Connection 或 Queued Connection,也可以在連接時指定連接方式。
2. 信號排隊的原理與驗證
當一個信號被發射時:
- 如果使用 Queued Connection,該信號會被放入事件隊列中,等到事件循環處理到該信號時,槽函數才會執行。
- 這樣可以避免信號發射時因槽函數執行耗時而阻塞當前程式邏輯。
2.1 實驗程式驗證信號排隊
下面是一個實驗程式,用來驗證當信號發射速度快於槽函數處理速度時,信號如何排隊處理。
範例程式
from PyQt5.QtCore import QObject, pyqtSignal, QThread, QCoreApplication
import time
class Worker(QObject):
long_task_signal = pyqtSignal(int) # 定義信號,帶整數參數
def long_task(self):
for i in range(5):
print(f"Emitting signal {i+1}")
self.long_task_signal.emit(i + 1)
time.sleep(0.5) # 模擬一個耗時任務
def handle_signal(value):
print(f"Handling signal with value: {value}")
time.sleep(1) # 模擬槽函數執行時間
app = QCoreApplication([])
worker = Worker()
worker.long_task_signal.connect(handle_signal)
# 在一個子執行緒中執行 long_task
thread = QThread()
worker.moveToThread(thread)
thread.started.connect(worker.long_task)
thread.start()
app.exec_()
輸出結果

Emitting signal 1
Handling signal with value: 1
Emitting signal 2
Emitting signal 3
Handling signal with value: 2
Emitting signal 4
Emitting signal 5
Handling signal with value: 3
Handling signal with value: 4
Handling signal with value: 5
分析:
- 當 Worker 發射信號 1 時,槽函數 handle_signal 被觸發,開始執行並睡眠 1 秒。
- 在這段期間,Worker 繼續以每 0.5 秒發射後續信號(如信號 2 和 3)。
- 由於 handle_signal 還沒完成,這些信號被排入事件隊列,等待前面的槽處理完成後依次執行。
- 最終各個信號依次被處理,這就展示了信號的排隊行為。
3. 處理信號排隊的情況
如果在實際應用中,發現信號排隊導致槽函數處理延遲,可能需要採取一些措施來解決或緩解這一問題。以下提供三種常見方法:
3.1 使用多執行緒處理信號
將槽函數移至獨立的執行緒,可以使得長時間運算不會阻塞信號發射者。
例如,將處理信號的 TaskHandler 移到一個獨立的 QThread:
import sys
import time
from PyQt5.QtCore import QObject, pyqtSignal, QThread
from PyQt5.QtWidgets import QApplication
class Worker(QObject):
long_task_signal = pyqtSignal(int)
def long_task(self):
for i in range(5):
print(f"Emitting signal {i+1}")
self.long_task_signal.emit(i + 1)
time.sleep(0.5)
class TaskHandler(QObject):
@staticmethod
def handle_signal(value):
print(f"Handling signal with value: {value}")
time.sleep(1)
if __name__ == '__main__':
app = QApplication(sys.argv)
worker = Worker()
handler = TaskHandler()
# 建立子執行緒並將 handler 移動到該執行緒
thread = QThread()
handler.moveToThread(thread)
worker.long_task_signal.connect(handler.handle_signal)
thread.start()
worker.long_task()
sys.exit(app.exec_())
結果:

Emitting signal 1
Handling signal with value: 1
Emitting signal 2
Handling signal with value: 2
Emitting signal 3
Handling signal with value: 3
Emitting signal 4
Handling signal with value: 4
Emitting signal 5
Handling signal with value: 5
說明:
透過多執行緒方式,槽函數在獨立執行緒中運行,不會阻塞主執行緒,同時排隊的信號也能更快處理。
2. 調整信號發射的節奏
如果信號發射太頻繁,可以考慮調整信號的發射速率,避免過多信號同時進入事件循環。
import time
from PyQt5.QtCore import QObject, pyqtSignal
class Worker(QObject):
progress_signal = pyqtSignal(int)
def long_task(self):
for i in range(5):
self.progress_signal.emit(i + 1)
time.sleep(1) # 延遲發射信號的時間
3.2 調整信號發射的節奏
如果信號發射太頻繁,可以在發射信號之間加入適當的延遲,從而降低事件隊列的壓力。
class Worker(QObject):
progress_signal = pyqtSignal(int)
def long_task(self):
for i in range(5):
self.progress_signal.emit(i + 1)
time.sleep(1) # 延長發射間隔
這樣可以避免信號過多堆積在隊列中,從而使槽函數能夠逐一平穩處理。
3.3 使用緩衝機制
如果某些應用中信號發射頻率非常高,可能需要設置緩衝機制來對多餘的信號進行合併或丟棄。這通常需要根據具體應用場景自行設計邏輯,
例如:
- 設置計數器,在一定時間內只處理最新的信號;
- 將連續多個信號合併成一個統計結果再傳遞給槽函數。
4. 小結
- 信號與槽機制:
PyQt 中的信號與槽使得物件之間的通信變得靈活且異步。根據執行緒的不同,連接方式可能是直接連接(即時調用)或排隊連接(通過事件隊列)。 - 排隊現象:
當槽函數耗時較長時,信號發射者不會等待槽函數完成,而是將信號放入事件隊列,依次等待處理,這就導致了信號排隊的現象。 - 處理方法:
- 利用多執行緒將耗時的槽函數移到獨立的執行緒中;
- 調整信號發射節奏,減少過快發射;
- 設計合適的緩衝或合併機制來減少信號過載。
透過上述理論解釋與實際範例,學員可以更深入地理解 PyQt 信號與槽的工作原理及如何應對排隊現象,從而在開發中更靈活、高效地使用這一機制。