Redis 的分佈式鎖(Distributed Lock)是一種利用 Redis 作為儲存媒介來控制多個應用實例之間對共享資源的訪問權限的機制, 它常用於避免多個服務同時修改同一筆資料,進而產生資料不一致的情況。
🔧 基本概念

🔐 加鎖
import uuid
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
lock_key = "my_lock"
lock_value = str(uuid.uuid4()) # 每次加鎖都生成唯一值
lock_ttl = 10 # seconds
# 嘗試加鎖
acquired = r.set(lock_key, lock_value, nx=True, ex=lock_ttl)
if acquired:
print("✅ 獲得鎖")
else:
print("❌ 鎖被佔用")
🔓 釋放鎖(要確保是自己持有的才釋放)
unlock_script = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
r.eval(unlock_script, 1, lock_key, lock_value)
print("🔓 鎖已釋放")
📦 搭配第三方套件更簡單
我們可以使用 Redlock-py 這個套件來讓我們使用鎖更加的方便, 它將我們複雜的細節進行封裝, 只需要簡單的使用 「🔐 加鎖和釋放」。
from redlock import Redlock
dlm = Redlock([
{"host": "localhost", "port": 6379, "db": 0},
# 可加入多個 Redis 節點來提高可靠性
])
# 嘗試取得鎖
lock = dlm.lock("resource_key", 10000) # 10 秒
if lock:
print("✅ 獲得分佈式鎖")
# 做你的事情...
dlm.unlock(lock) # 解鎖
print("🔓 鎖已釋放")
else:
print("❌ 鎖被佔用")
🧊 適時的封裝讓複雜的程式更加簡易
import uuid
import redis
class RedisLock:
def __init__(self, redis_client, lock_key, ttl=10):
self.redis = redis_client
self.lock_key = lock_key
self.ttl = ttl
self.lock_value = str(uuid.uuid4())
def acquire(self):
return self.redis.set(self.lock_key, self.lock_value, nx=True, ex=self.ttl)
def release(self):
unlock_script = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
return self.redis.eval(unlock_script, 1, self.lock_key, self.lock_value)
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"Failed to acquire lock: {self.lock_key}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
🧪 使用範例
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
with RedisLock(r, "my_lock", ttl=10):
print("✅ 取得鎖,執行關鍵區塊")
# 在這裡做你的事,比如寫入資料庫、處理任務等等
print("🔓 鎖已釋放")
結語
不管是資料庫、快取伺服器, 只要牽涉到多機, 相信大部分都會有「鎖」的機制, 避免因為大量併發而導致資料異常, 當然我們也可以使用隊列來解決這個問題, 但實作上相對複雜許多, 對於小量場景的應用情境來說, 簡易的「鎖」已經能夠滿足, 當然我們還有一些更深入的使用技巧, 之後有遇到適合的場景也會在留言版進行分享, 與大家共同學習成長。