1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
class AsyncDistributedSingleFlight:
    """Coalesce duplicate async work per key across coroutines AND processes.

    For a given ``key``, at most one caller (cluster-wide) executes the
    underlying coroutine; everyone else waits and receives the cached result.
    Layers, fastest first:

    1. in-process ``TTLCache`` lookup,
    2. per-key in-process ``asyncio.Lock`` (stops coroutines in the same
       process from all racing for the distributed lock),
    3. shared result cache in Redis,
    4. distributed lock in Redis (``SET NX EX``) guarding actual execution.

    Results must be JSON-serializable (they are stored via ``json.dumps``).
    NOTE(review): a result of ``None``/JSON ``null`` is indistinguishable
    from "no result yet" — avoid caching ``None`` results.
    """

    #: Sentinel meaning "the lock holder died without publishing a result;
    #: retry the whole acquire/wait cycle".
    _RETRY = object()

    def __init__(self, redis: Redis, local_cache: TTLCache):
        self.redis = redis
        self.local_cache = local_cache
        self.lock_prefix = "singleflight:lock:"
        self.result_prefix = "singleflight:result:"
        # Per-key in-process async locks.
        # NOTE(review): grows unboundedly with distinct keys — consider
        # pruning if the key space is large.
        self._local_locks: Dict[str, asyncio.Lock] = {}

    async def _get_local_lock(self, key: str) -> asyncio.Lock:
        """Return (creating on first use) the in-process lock for *key*.

        Safe without extra synchronization: asyncio runs callbacks on one
        thread and there is no ``await`` between the check and the insert.
        """
        if key not in self._local_locks:
            self._local_locks[key] = asyncio.Lock()
        return self._local_locks[key]

    @asynccontextmanager
    async def _distributed_lock(self, key: str, timeout: int = 10):
        """Redis-backed distributed lock; yields ``(acquired, identifier)``.

        Acquisition is ``SET key id NX EX timeout`` so a crashed holder
        cannot deadlock other processes — the lock self-expires.  Release
        only deletes the key if it still holds *our* identifier, so a lock
        that expired and was taken over by someone else is never deleted.
        """
        lock_key = f"{self.lock_prefix}{key}"
        identifier = str(uuid.uuid4())
        acquired = await self.redis.set(
            lock_key, identifier, nx=True, ex=timeout
        )
        try:
            yield acquired, identifier
        finally:
            if acquired:
                current = await self.redis.get(lock_key)
                # Without decode_responses=True the client returns bytes;
                # normalize before comparing, otherwise the comparison never
                # matches and the lock only ever dies by TTL (bug fix).
                if isinstance(current, bytes):
                    current = current.decode()
                if current == identifier:
                    await self.redis.delete(lock_key)
                # NOTE(review): get-then-delete is not atomic; a Lua script
                # (compare-and-delete) would close the remaining tiny race.

    async def _get_result(self, key: str) -> Optional[Any]:
        """Fetch the cached result for *key* from Redis, or ``None``."""
        result_key = f"{self.result_prefix}{key}"
        data = await self.redis.get(result_key)
        return json.loads(data) if data else None

    async def _set_result(self, key: str, result: Any, ttl: int = 60):
        """Publish *result* to Redis (with TTL) and to the local cache."""
        result_key = f"{self.result_prefix}{key}"
        await self.redis.set(result_key, json.dumps(result), ex=ttl)
        self.local_cache[key] = result  # keep the in-process cache warm

    async def do(
        self,
        key: str,
        func: Callable,
        *args,
        result_ttl: int = 60,
        lock_timeout: int = 10,
        retry_interval: float = 0.1,
        wait_timeout: Optional[float] = None,
        **kwargs
    ) -> Any:
        """Run *func* once per *key* cluster-wide; others get the cached result.

        :param key: task identity; callers with the same key are coalesced.
        :param func: coroutine function (``async def``) to execute.
        :param result_ttl: Redis result cache TTL, seconds.
        :param lock_timeout: distributed-lock expiry, seconds.
        :param retry_interval: polling interval while waiting for a result.
        :param wait_timeout: max seconds a non-holder waits for the result;
            ``None`` (default) waits indefinitely, matching prior behavior.
        :raises TimeoutError: waiting exceeded *wait_timeout*.
        :raises Exception: whatever *func* raises (failures are not cached).
        """
        # 1. Local cache first — no cross-process round trip.
        if key in self.local_cache:
            return self.local_cache[key]

        local_lock = await self._get_local_lock(key)
        while True:
            async with local_lock:
                outcome = await self._attempt(
                    key, func, args, kwargs,
                    result_ttl, lock_timeout, retry_interval, wait_timeout,
                )
            if outcome is not AsyncDistributedSingleFlight._RETRY:
                return outcome
            # Lock holder died without publishing a result — start over.

    async def _attempt(
        self,
        key: str,
        func: Callable,
        args: tuple,
        kwargs: dict,
        result_ttl: int,
        lock_timeout: int,
        retry_interval: float,
        wait_timeout: Optional[float],
    ) -> Any:
        """One acquire-or-wait cycle; returns a result or ``_RETRY``."""
        # 2. Double-check caches now that we hold the local lock (another
        #    coroutine may have filled them while we waited on it).
        if key in self.local_cache:
            return self.local_cache[key]
        cached_result = await self._get_result(key)
        if cached_result is not None:
            self.local_cache[key] = cached_result
            return cached_result

        # 3. Compete for the distributed lock.
        async with self._distributed_lock(key, lock_timeout) as (acquired, _):
            if acquired:
                # We are the single executor. Failures are deliberately not
                # cached — the exception propagates to the caller.
                result = await func(*args, **kwargs)
                await self._set_result(key, result, result_ttl)
                return result

            # 4. Someone else is executing: poll for the published result.
            lock_key = f"{self.lock_prefix}{key}"
            loop = asyncio.get_running_loop()
            deadline = (
                None if wait_timeout is None else loop.time() + wait_timeout
            )
            while True:
                result = await self._get_result(key)
                if result is not None:
                    self.local_cache[key] = result
                    return result
                # Holder publishes the result BEFORE releasing the lock, so
                # "lock gone + no result" means it raised or crashed; retry
                # instead of spinning forever (bug fix: the old loop hung).
                if not await self.redis.exists(lock_key):
                    return AsyncDistributedSingleFlight._RETRY
                if deadline is not None and loop.time() >= deadline:
                    raise TimeoutError(
                        f"singleflight: timed out waiting for result of {key!r}"
                    )
                await asyncio.sleep(retry_interval)  # yield the event loop

    def wrap(
        self,
        key_func: Optional[Callable] = None,
        result_ttl: int = 60,
        lock_timeout: int = 10
    ) -> Callable:
        """Decorator form for async functions.

        :param key_func: optional ``(*args, **kwargs) -> str`` key builder;
            defaults to module + function name + arguments.
        """
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            async def wrapper(*args, **kwargs) -> Any:
                if key_func:
                    key = key_func(*args, **kwargs)
                else:
                    key = (
                        f"{func.__module__}:{func.__name__}:"
                        f"{args}:{tuple(kwargs.items())}"
                    )
                # Bug fix: key/func must be passed positionally — the old
                # self.do(key=key, func=func, *args, ...) raised
                # "got multiple values for argument 'key'" whenever *args
                # was non-empty.
                return await self.do(
                    key,
                    func,
                    *args,
                    result_ttl=result_ttl,
                    lock_timeout=lock_timeout,
                    **kwargs
                )
            return wrapper
        return decorator
|