python单例同步调用redis
Python 2025/3/23 13:44:30 点击:不统计
<网f站o学a习s制p作.cn>
python 调用同步调用redis ,并实现 事务操作, 代码采用python3.12 低于3.12版本 需要修改,修改方法在最下面
# coding: utf-8
"""同步操作单例模式,原子事务判断"""
# author: forasp.cn
# description: redis 同步操作单例模式,原子事务判断
import redis
from typing import Any, Optional
class RedisHandler:
_instance = None # 单例实例
_redis = None # Redis 连接对象
def __new__(
cls,
host: str = "localhost",
port: int = 6379,
password: Optional[str] = None,
username: Optional[str] = None
):
if not cls._instance:
auth_part = f"{username}:{password}@" if username and password else f":{password}@" if password else ""
cls._instance = super(RedisHandler, cls).__new__(cls)
cls._instance.redis_url = f"redis://{auth_part}{host}:{port}"
else:
print("单例模式")
return cls._instance
def connect(self):
"""初始化 Redis 连接,支持自动重连"""
if self._redis is None:
try:
self._redis = redis.from_url(
self.redis_url, decode_responses=True)
except Exception as e:
print(f"Redis 连接失败: {e}")
self._redis = None
else:
try:
# 通过 ping() 检查连接状态
self._redis.ping()
except Exception:
print("Redis 连接已断开,尝试重新连接...")
self._redis = redis.from_url(
self.redis_url, decode_responses=True)
def set_value(self, key: str, value: Any) -> bool:
"""原子性写入键值对"""
self.connect()
if self._redis:
return self._redis.set(key, value)
return False
def get_value(self, key: str) -> Optional[str]:
"""读取键对应的值"""
self.connect()
if self._redis:
return self._redis.get(key)
return None
def delete_key(self, key: str) -> int:
"""删除指定键"""
self.connect()
if self._redis:
return self._redis.delete(key)
return 0
def compare_and_set(self, key: str, expected_value: str, new_value: str) -> bool:
"""如果键的值等于 expected_value,则将其原子性地更新为 new_value"""
self.connect()
if self._redis:
while True:
try:
with self._redis.pipeline() as pipe:
pipe.watch(key)
current_value = pipe.get(key)
if current_value == expected_value:
print("原始值没有变动")
pipe.multi()
pipe.set(key, new_value)
pipe.execute()
return True
return False
except Exception as e:
print("wait,可能发生改变 try again", str(e))
continue
def close(self):
"""关闭 Redis 连接"""
if self._redis:
self._redis.close()
self._redis = None
# 示例使用
# 这里时同步调用方法
def main():
redis_handler = RedisHandler(password="123456")
redis_handler2 = RedisHandler(password="123456")
print("是否时单例模式:", redis_handler is redis_handler2)
redis_handler.set_value("test_key", "initial_value")
success = redis_handler.compare_and_set(
"test_key", "initial_value", "new value")
print(success) # 如果原值匹配,则返回 True,否则返回 False
value = redis_handler.get_value("test_key")
print(value) # 输出: new value
redis_handler.close()
if __name__ == "__main__":
main()
低于python3.12 版本 的
# 原来调用:
import aioredis
self._redis = await aioredis.from_url(self._redis_url, decode_responses=True)
# python 3.12以下版本 更改为:
import redis.asyncio as redis
self._redis = await redis.from_url(self._redis_url, decode_responses=True)
%77w%77%2E%66%6F%72%61%73%70%2E%63%6E
·上一篇:python单例异步调用redis >> ·下一篇:网站制作学习网