使用Python操作Redis

  • 分类: Python 数据库
  • 发表日期:2021-07-30 16:36:00
  • 最后修改:2021-09-03 18:09:00

直接上代码:

import redis
from typing import Optional


class RedisHelper:
    HOST = "localhost"
    PORT = 6379
    DB = 0
    MAXCON = 3

    @classmethod
    def create_pool(self):
        return redis.ConnectionPool(
                      host=self.HOST,
                      port=self.PORT,
                      db=self.DB,
                      max_connections=self.MAXCON,
                      decode_responses=True)

    @property
    def ok(self):
        try:
            ok = redis.Redis(host=self.HOST, port=self.PORT).ping()
        except:
            ok = False
        return ok


class XRedis(redis.StrictRedis):
    _instance = None

    def __new__(cls, *args, **kw):
        if cls._instance is None:
            cls._instance = super(XRedis, cls).__new__(cls)
        return cls._instance

    def __init__(
            self, host='127.0.0.1', port=6379, db=7, pool:Optional[redis.ConnectionPool]=None,
        ):
        self.host, self.port, self.db = host, port, db
        self.pool = pool

        if pool:
            super(XRedis, self).__init__(connection_pool=pool)
        else:
            super(XRedis, self).__init__(host=host, port=port, db=db)

    @property
    def ok(self):
        try:
            return self.ping()
        except:
            return False

    def flushdb(self):
        ans = input(f'Delete all data from the current database ({self.db})? (y/n): ')
        if ans.upper() == 'Y':
            super().flushdb()
            print('delete\n')

    def flushall(self):
        ans = input(f'Delete all database data? (y/n): ')
        if ans.upper() == 'Y':
            super().flushall()
            print('deleted\n')


r = XRedis(pool=RedisHelper.create_pool())

如上代码建议拷贝到新建文件,作为模块引入。

如果不需要使用连接池,那么直接从需要的地方导入XRedis类,填入连接参数实例化即可。

如果需要使用连接池,建议模块中实例化(68行),再从需要的地方导入该实例对象(r)

 

extra:

redis.ConnectionPool实例化的时候其实并没有建立真实的redis连接,只是初始化了连接池的相关参数而已。

如果是使用StrictRedis,那么即使没有使用连接池,它自身也会创建。源码:

class Redis(object):
    def __init__(...):
        if not connection_pool:
            ...
            connection_pool = ConnectionPool(**kwargs)
        ...

StrictRedis = Redis

真正的连接操作其实是在执行动作的时候,此时redis将必然创建连接(连接池为空),在动作执行完成后,再将连接放入连接池,再下一次动作执行时,会优先从连接池中取已经创建好的连接。

def ping(self, message=None):
    """
    Ping the Redis server
    """
    message = '' if message is None else message
    return self.execute_command('PING', message)
def execute_command(self, *args, **options):
    "Execute a command and return a parsed response"
    pool = self.connection_pool
    command_name = args[0]
    conn = self.connection or pool.get_connection(command_name, **options)
    try:
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
        conn.disconnect()
        if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
            raise
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)
    finally:
        if not self.connection:
            pool.release(conn)
def get_connection(self, command_name, *keys, **options):
    "Get a connection from the pool"
    self._checkpid()
    try:
        connection = self._available_connections.pop()
    except IndexError:
        connection = self.make_connection()
    self._in_use_connections.add(connection)
    return connection

ConnectionPool通过管理可用连接池列表(_available_connections)正在使用的连接列表(_in_use_connections)从而实现连接池管理。

 

post
2020年12月26日 19:59 原创
post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年3月25日 22:09 原创
post
2021年4月15日 20:59 原创
post
2021年4月18日 16:32 原创
post
2021年6月17日 16:16 原创
post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月13日 09:36 原创
post
2021年7月30日 12:01 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:09 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月25日 16:11
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27
post
2021年11月24日 16:45 原创

0 评论

大哥整点话呗~