直接上代码:
import redis
from typing import Optional
class RedisHelper:
HOST = "localhost"
PORT = 6379
DB = 0
MAXCON = 3
@classmethod
def create_pool(self):
return redis.ConnectionPool(
host=self.HOST,
port=self.PORT,
db=self.DB,
max_connections=self.MAXCON,
decode_responses=True)
@property
def ok(self):
try:
ok = redis.Redis(host=self.HOST, port=self.PORT).ping()
except:
ok = False
return ok
class XRedis(redis.StrictRedis):
_instance = None
def __new__(cls, *args, **kw):
if cls._instance is None:
cls._instance = super(XRedis, cls).__new__(cls)
return cls._instance
def __init__(
self, host='127.0.0.1', port=6379, db=7, pool:Optional[redis.ConnectionPool]=None,
):
self.host, self.port, self.db = host, port, db
self.pool = pool
if pool:
super(XRedis, self).__init__(connection_pool=pool)
else:
super(XRedis, self).__init__(host=host, port=port, db=db)
@property
def ok(self):
try:
return self.ping()
except:
return False
def flushdb(self):
ans = input(f'Delete all data from the current database ({self.db})? (y/n): ')
if ans.upper() == 'Y':
super().flushdb()
print('delete\n')
def flushall(self):
ans = input(f'Delete all database data? (y/n): ')
if ans.upper() == 'Y':
super().flushall()
print('deleted\n')
r = XRedis(pool=RedisHelper.create_pool())
如上代码建议拷贝到新建文件,作为模块引入。
如果不需要使用连接池,那么直接从需要的地方导入XRedis类,填入连接参数实例化即可。
如果需要使用连接池,建议模块中实例化(68行),再从需要的地方导入该实例对象(r)
extra:
redis.ConnectionPool实例化的时候其实并没有建立真实的redis连接,只是初始化了连接池的相关参数而已。
如果是使用StrictRedis,那么即使没有使用连接池,它自身也会创建。源码:
class Redis(object):
def __init__(...):
if not connection_pool:
...
connection_pool = ConnectionPool(**kwargs)
...
StrictRedis = Redis
真正的连接操作其实是在执行动作的时候,此时redis将必然创建连接(连接池为空),在动作执行完成后,再将连接放入连接池,再下一次动作执行时,会优先从连接池中取已经创建好的连接。
def ping(self, message=None):
"""
Ping the Redis server
"""
message = '' if message is None else message
return self.execute_command('PING', message)
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
conn = self.connection or pool.get_connection(command_name, **options)
try:
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
except (ConnectionError, TimeoutError) as e:
conn.disconnect()
if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
raise
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
finally:
if not self.connection:
pool.release(conn)
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection
ConnectionPool通过管理可用连接池列表(_available_connections)和正在使用的连接列表(_in_use_connections)从而实现连接池管理。
0 评论
大哥整点话呗~