Hbase的Python客户端封装

  • 分类: Python 数据库
  • 发表日期:2021-07-30 15:07:00
  • 最后修改:2021-07-30 15:07:00

基本模块

import queue
import json
import happybase
from traceback import format_exc
from thriftpy2.transport.base import TTransportException

 

 

连接池

class HbaseConnectionPool():
    def __init__(self, **kwargs):
        self.hosts = kwargs.get('hosts', 'localhost')
        self.port = kwargs.get('port', 9090)
        self.table_prefix = kwargs.get('table_prefix', '')
        self.pool_size = kwargs.get('pool_size', 3)
        self.conn_queue = queue.Queue(maxsize=self.pool_size)
        for _ in range(self.pool_size):
            self.conn_queue.put(self._create_new_conn())

    def _create_new_conn(self):
        conn = happybase.Connection(
            host=self.hosts,
            port=self.port,
            table_prefix=self.table_prefix,
            table_prefix_separator=b'_',
            protocol='compact', 
            transport='framed'
        )
        return conn

    def get_conn(self):
        conn = self.conn_queue.get()
        if conn is None:
            conn = self._create_new_conn()
        return conn

    def put_conn(self, conn):
        self.conn_queue.put(conn)

    def __del__(self):
        try:
            while True:
                conn = self.conn_queue.get_nowait()
                if conn:
                    conn.close()
        except queue.Empty:
            pass

 

CRUD封装

class HbaseHelper:
    def __init__(self, hosts='localhost', port=9090, table_prefix='', pool=None, logger=None):
        self.pool = None
        self.conn = None
        hosts
        self.logger = logger
        self.batch_total = 0
        self.table_prefix = table_prefix
        if pool:
            self.pool = pool
        else:
            self.conn = happybase.Connection(
                host=hosts,
                port=port,
                table_prefix=self.table_prefix,
                table_prefix_separator=b'_',
                protocol='compact', 
                transport='framed'
            )

    def insert_init(self, table, size:int=500):
        self.bucket_ = []
        self.bulk_size = size
        if self.pool:
            self.conn = self.pool.get_conn()
        self.batch = happybase.Table(f"{self.table_prefix}_{table}", self.conn).batch()

    def insert_done(self):
        self.pool.put_conn(self.conn)

    def bulk_insert(self, data):
        try:
            for d in data:
                self.batch.put(d[0], d[1])
            self.batch.send()
            self.batch_total += self.bulk_size
        except:
            if self.logger:
                self.logger.error(format_exc())

    @property
    def bucket(self):
        if len(self.bucket_) == self.bulk_size:
            self.bulk_insert(self.bucket_)
            self.bucket_ = []
        return self.bucket_

    def gets(self, table, rowkeys, columns=None):
        contents = {}

        if self.pool:
            self.conn = self.pool.get_conn()
        try:
            table_opr = happybase.Table(table, self.conn)
            res = table_opr.rows(rowkeys, columns=columns)
        except TTransportException:
            self.conn = self.pool._create_new_conn()
            table_opr = happybase.Table(table, self.conn)
            res = table_opr.rows(rowkeys, columns=columns)

        for r in res:
            rowkey, value = r[0], r[1]
            rowkey = str(rowkey, encoding="utf-8")
            value = {str(k, encoding="utf-8").split(':')[1]:json.loads(str(v, encoding="utf-8")) for k,v in value.items()}
            contents[rowkey] = value

        if self.pool:
            self.pool.put_conn(self.conn)

        return contents

 

post
2020年12月26日 19:59 原创
post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年3月25日 22:09 原创
post
2021年4月15日 20:59 原创
post
2021年4月18日 16:32 原创
post
2021年6月17日 16:16 原创
post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月13日 09:36 原创
post
2021年7月30日 12:01 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:09 原创
post
2021年7月30日 16:02 原创
post
2021年7月30日 16:02 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月25日 16:11
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27
post
2021年11月24日 16:45 原创
post
2023年8月18日 20:39 原创 未公开

本博客部署教程

post
2024年6月13日 09:47 原创

0 评论

大哥整点话呗~