# 基本模块 (basic modules)
import queue
import json
import happybase
from traceback import format_exc
from thriftpy2.transport.base import TTransportException
# 连接池 (connection pool)
class HbaseConnectionPool():
    """Fixed-size pool of happybase connections held in a ``queue.Queue``.

    All connections are opened eagerly in ``__init__``. Callers borrow one
    with ``get_conn()`` and hand it back with ``put_conn()``.

    Keyword Args:
        hosts: Host passed to ``happybase.Connection`` (default ``'localhost'``).
        port: Port passed to ``happybase.Connection`` (default ``9090``).
        table_prefix: Table prefix given to every connection (default ``''``).
        pool_size: Number of connections to create (default ``3``).
    """

    def __init__(self, **kwargs):
        self.hosts = kwargs.get('hosts', 'localhost')
        self.port = kwargs.get('port', 9090)
        self.table_prefix = kwargs.get('table_prefix', '')
        self.pool_size = kwargs.get('pool_size', 3)
        self.conn_queue = queue.Queue(maxsize=self.pool_size)
        for _ in range(self.pool_size):
            self.conn_queue.put(self._create_new_conn())

    def _create_new_conn(self):
        """Open and return a new connection using the pool's settings."""
        return happybase.Connection(
            host=self.hosts,
            port=self.port,
            table_prefix=self.table_prefix,
            table_prefix_separator=b'_',
            protocol='compact',
            transport='framed'
        )

    def get_conn(self):
        """Borrow a connection, blocking until one is available.

        A ``None`` entry found in the queue is replaced by a freshly opened
        connection.
        """
        conn = self.conn_queue.get()
        if conn is None:
            conn = self._create_new_conn()
        return conn

    def put_conn(self, conn):
        """Return ``conn`` to the pool (blocks while the queue is full)."""
        self.conn_queue.put(conn)

    def close(self):
        """Close every connection currently idle in the pool.

        Safe to call more than once, and on an instance whose ``__init__``
        never got as far as creating the queue.
        """
        idle = getattr(self, 'conn_queue', None)
        if idle is None:
            return
        while True:
            try:
                conn = idle.get_nowait()
            except queue.Empty:
                break
            if not conn:
                continue
            try:
                conn.close()
            except Exception:
                # Best-effort teardown: one connection failing to close must
                # not prevent the remaining ones from being closed.
                pass

    def __del__(self):
        self.close()
# CRUD封装 (CRUD wrapper)
class HbaseHelper:
    """Helper for bucketed bulk inserts and multi-row reads through happybase.

    The helper either owns one private connection (built from ``hosts`` and
    ``port``) or borrows connections from ``pool``.

    Args:
        hosts: Host for the private connection (ignored when ``pool`` is given).
        port: Port for the private connection (ignored when ``pool`` is given).
        table_prefix: Prefix that ``insert_init`` joins to the table name
            with ``_``.
        pool: Optional pool object providing ``get_conn()``, ``put_conn()``
            and ``_create_new_conn()``. When given, no private connection is
            opened.
        logger: Optional logger; ``bulk_insert`` reports failures through
            ``logger.error``.

    Typical insert flow: ``insert_init(table)``, then repeatedly
    ``helper.bucket.append((rowkey, data))``, then ``insert_done()``.
    """

    def __init__(self, hosts='localhost', port=9090, table_prefix='', pool=None, logger=None):
        self.pool = None
        self.conn = None
        # Remembered so a dropped private connection can be rebuilt.
        self.hosts = hosts
        self.port = port
        self.logger = logger
        self.batch_total = 0
        self.table_prefix = table_prefix
        # Insert state; insert_init() sets the real values.
        self.bucket_ = []
        self.bulk_size = 500
        self.batch = None
        if pool:
            self.pool = pool
        else:
            self.conn = self._create_new_conn()

    def _create_new_conn(self):
        """Open a new connection via the pool's factory, or privately if unpooled."""
        if self.pool:
            return self.pool._create_new_conn()
        return happybase.Connection(
            host=self.hosts,
            port=self.port,
            table_prefix=self.table_prefix,
            table_prefix_separator=b'_',
            protocol='compact',
            transport='framed'
        )

    def _replace_conn(self, broken):
        """Close ``broken`` best-effort and return a freshly opened connection."""
        try:
            broken.close()
        except Exception:
            # The transport is already unusable; closing is only a courtesy.
            pass
        fresh = self._create_new_conn()
        if not self.pool:
            self.conn = fresh
        return fresh

    def insert_init(self, table, size: int = 500):
        """Start a bulk insert into ``<table_prefix>_<table>``.

        Args:
            table: Table name without the prefix.
            size: Number of bucketed rows that triggers a flush.
        """
        self.bucket_ = []
        self.bulk_size = size
        if self.pool:
            self.conn = self.pool.get_conn()
        # The full name is built here, so the connection's own prefix is skipped.
        target = self.conn.table(f"{self.table_prefix}_{table}", use_prefix=False)
        self.batch = target.batch()

    def insert_done(self):
        """Hand the connection borrowed by ``insert_init`` back to the pool.

        Does nothing when the helper has no pool or holds no connection.
        Rows still sitting in the bucket are NOT flushed here; callers must
        pass them to ``bulk_insert`` themselves.
        """
        if self.pool and self.conn is not None:
            self.pool.put_conn(self.conn)
            # Forget it so a second call cannot return the same connection twice.
            self.conn = None

    def bulk_insert(self, data):
        """Put every ``(rowkey, columns)`` item of ``data`` and send the batch.

        Failures are logged through ``self.logger`` (when set) and otherwise
        swallowed; ``batch_total`` only grows after a successful send.
        """
        try:
            sent = 0
            for row in data:
                self.batch.put(row[0], row[1])
                sent += 1
            self.batch.send()
            # Count what was actually sent; a final partial bucket is smaller
            # than bulk_size.
            self.batch_total += sent
        except Exception:
            if self.logger:
                self.logger.error(format_exc())

    @property
    def bucket(self):
        """Return the pending-row list, flushing it first once it is full."""
        # >= rather than ==: a caller that extends the list by several rows
        # could otherwise step over the threshold and never flush.
        if len(self.bucket_) >= self.bulk_size:
            self.bulk_insert(self.bucket_)
            self.bucket_ = []
        return self.bucket_

    def gets(self, table, rowkeys, columns=None):
        """Fetch several rows and decode them.

        Args:
            table: Table name, used as given.
                NOTE(review): unlike ``insert_init`` no prefix is added here;
                confirm that callers pass the full name.
            rowkeys: Row keys to fetch.
            columns: Optional column filter passed to ``Table.rows``.

        Returns:
            ``{rowkey: {qualifier: value}}`` where keys are UTF-8 decoded,
            the column family is dropped, and each cell is parsed as JSON.

        On ``TTransportException`` the connection is replaced and the read
        is retried once. A pooled connection always goes back to the pool,
        even when the read or decoding fails.
        """
        # Use a local so a connection held for an in-progress insert is not
        # clobbered by a read.
        conn = self.pool.get_conn() if self.pool else self.conn
        try:
            try:
                rows = conn.table(table, use_prefix=False).rows(rowkeys, columns=columns)
            except TTransportException:
                conn = self._replace_conn(conn)
                rows = conn.table(table, use_prefix=False).rows(rowkeys, columns=columns)
            return {
                rowkey.decode("utf-8"): self._decode_cells(cells)
                for rowkey, cells in rows
            }
        finally:
            if self.pool:
                self.pool.put_conn(conn)

    @staticmethod
    def _decode_cells(cells):
        """Map ``b'family:qualifier'`` to JSON bytes into ``{qualifier: value}``."""
        # Split once: only the first colon separates family from qualifier.
        return {
            column.decode("utf-8").split(':', 1)[1]: json.loads(raw.decode("utf-8"))
            for column, raw in cells.items()
        }
0 评论
大哥整点话呗~