python单例异步调用redis
Python 2025/3/23 13:30:56 点击:不统计
http://%77w%77%2E%66%6F%72%61%73%70%2E%63n网站制作学习
python 单例模式实现异步操作读取redis ,并通过事务判断模式 实现多步骤原子操作,代码时基于python3.12 版本,低于该版本 的修改方式在最下面
# coding: utf-8
"""异步操作单例模式,原子事务判断"""
# author: forasp.cn
# description: redis 异步操作单例模式,原子事务判断
import asyncio
import redis.asyncio as redis
from typing import Any, Optional
class AsyncRedisHandler:
_instance = None # 单例实例
_redis = None # Redis 连接对象
def __new__(
cls,
host: str = "localhost",
port: int = 6379,
password: Optional[str] = None,
username: Optional[str] = None
):
if not cls._instance:
auth_part = f"{username}:{password}@" if username and password else f":{password}@" if password else ""
cls._instance = super(AsyncRedisHandler, cls).__new__(cls)
cls._instance.redis_url = f"redis://{auth_part}{host}:{port}"
else:
print("单例模式")
return cls._instance
async def connect(self):
"""初始化 Redis 连接,支持自动重连"""
if self._redis is None:
try:
self._redis = await redis.from_url(self.redis_url, decode_responses=True)
except Exception as e:
print(f"Redis 连接失败: {e}")
self._redis = None
else:
try:
# 通过 ping() 检查连接状态
await self._redis.ping()
except Exception:
print("Redis 连接已断开,尝试重新连接...")
self._redis = await redis.from_url(self.redis_url, decode_responses=True)
async def set_value(self, key: str, value: Any) -> bool:
"""原子性写入键值对"""
await self.connect()
if self._redis:
return await self._redis.set(key, value)
return False
async def get_value(self, key: str) -> Optional[str]:
"""读取键对应的值"""
await self.connect()
if self._redis:
return await self._redis.get(key)
return None
async def delete_key(self, key: str) -> int:
"""删除指定键"""
await self.connect()
if self._redis:
return await self._redis.delete(key)
return 0
async def compare_and_set(self, key: str, expected_value: str, new_value: str) -> bool:
"""如果键的值等于 expected_value,则将其原子性地更新为 new_value"""
await self.connect()
if self._redis:
async with self._redis.pipeline() as pipe:
while True:
try:
await pipe.watch(key)
current_value = await pipe.get(key)
if current_value == expected_value:
print("原始值没有变动")
pipe.multi()
pipe.set(key, new_value)
await pipe.execute()
return True
return False
except Exception as e:
print("wait,可能发生改变 try again", str(e))
continue
async def close(self):
"""关闭 Redis 连接"""
if self._redis:
await self._redis.aclose()
self._redis = None
# 示例使用
# 这里时异步调用方法
async def main():
redis_handler = AsyncRedisHandler(password="123456")
redis_handler2 = AsyncRedisHandler(password="123456")
print("是否时单例模式:", redis_handler is redis_handler2)
await redis_handler.set_value("test_key", "initial_value")
success = await redis_handler.compare_and_set("test_key", "initial_value", "new value")
print(success) # 如果原值匹配,则返回 True,否则返回 False
value = await redis_handler.get_value("test_key")
print(value) # 输出: new value
await redis_handler.close()
asyncio.run(main())
低于python3.12 版本
原来调用:
import aioredis
self._redis = await aioredis.from_url(self._redis_url, decode_responses=True)
python 3.12以下版本 更改为:
import redis.asyncio as redis
self._redis = await redis.from_url(self._redis_url, decode_responses=True)
http://%77%77%77%2E%66网站制作%6F学习网%72%61%73%70%2E%63%6E