# Basic modules (基本模块)
import queue
import sys
from contextlib import contextmanager
from urllib.parse import quote_plus

import pymongo
from bson.objectid import ObjectId
from pymongo import UpdateOne
# Connection pool (连接池)
class MongoConnectionPool(object):
    """Fixed-size pool of pymongo ``Database`` handles.

    Every pooled item is ``pymongo.MongoClient(uri)[db]``, i.e. a database
    handle backed by its own client. All ``pool_size`` handles are created
    eagerly by the constructor and stored in a ``queue.Queue``.

    Recognised ``params`` keys:
        db: database name (required).
        host: server host, default ``'localhost'``.
        port: server port, default ``'27017'``.
        username / password: raw (unescaped) credentials, default ``''``.
            They are only used when both are non-empty.
        pool_size: number of handles to create, default ``5``.

    Raises:
        Exception: if ``params`` has no ``'db'`` key.
    """

    def __init__(self, params):
        if 'db' not in params:
            raise Exception('未指定数据库')
        self.db = params['db']
        self.host = params.get('host', 'localhost')
        self.port = params.get('port', '27017')
        self.username = params.get('username', '')
        self.password = params.get('password', '')
        self.pool_size = params.get('pool_size', 5)
        self.conn_queue = queue.Queue(maxsize=self.pool_size)
        for _ in range(self.pool_size):
            self.conn_queue.put(self.__create_new_conn())

    def _build_uri(self):
        """Return the MongoDB connection URI for this pool's settings."""
        credentials = ''
        if self.username and self.password:
            # Reserved characters such as '@', ':' and '/' inside raw
            # credentials would otherwise corrupt the URI, so escape them.
            user = quote_plus(str(self.username))
            secret = quote_plus(str(self.password))
            credentials = f'{user}:{secret}@'
        return f'mongodb://{credentials}{self.host}:{self.port}/{self.db}'

    def __create_new_conn(self):
        """Open a new client and return its handle for ``self.db``."""
        return pymongo.MongoClient(self._build_uri())[self.db]

    def get_conn(self):
        """Take a handle out of the pool, waiting until one is available.

        A ``None`` entry found in the queue is replaced by a freshly created
        handle.
        """
        conn = self.conn_queue.get()
        if conn is None:
            conn = self.__create_new_conn()
        return conn

    def put_conn(self, conn):
        """Give a handle obtained from ``get_conn`` back to the pool."""
        self.conn_queue.put(conn)
# CRUD wrapper (CRUD封装)
class MongoHelper:
    """Thin CRUD wrapper around a pymongo ``Database`` handle.

    The helper works in one of two modes:

    * pooled: ``pool`` is given; each operation borrows a handle with
      ``pool.get_conn()`` and always returns it with ``pool.put_conn()``,
      even when the operation raises.
    * standalone: ``params`` is given; a single handle is created and kept
      in ``self.client``.

    Recognised ``params`` keys: ``db`` (required), ``host`` (default
    ``'localhost'``), ``port`` (default ``'27017'``), ``username`` and
    ``password`` (raw, unescaped; used only when both are non-empty).

    Raises:
        Exception: if no pool is given and ``params`` is missing or has no
            ``'db'`` key.
    """

    def __init__(self, params: dict = None, pool: "MongoConnectionPool" = None):
        # Both attributes are always defined because every method inspects
        # ``self.pool`` to decide which mode it is running in.
        self.pool = None
        self.client = None
        if pool:
            self.pool = pool
            return
        if not params or 'db' not in params:
            raise Exception('未指定数据库')
        db = params['db']
        host = params.get('host', 'localhost')
        port = params.get('port', '27017')
        username = params.get('username', '')
        password = params.get('password', '')
        credentials = ''
        if username and password:
            # Escape reserved characters ('@', ':', '/', ...) so that raw
            # credentials cannot corrupt the URI.
            credentials = f'{quote_plus(str(username))}:{quote_plus(str(password))}@'
        url = f'mongodb://{credentials}{host}:{port}/{db}'
        self.client = pymongo.MongoClient(url)[db]

    @contextmanager
    def _database(self):
        """Yield a database handle, returning pooled handles on exit."""
        if self.pool is None:
            yield self.client
            return
        db = self.pool.get_conn()
        try:
            yield db
        finally:
            self.pool.put_conn(db)

    def insert_init(self, collection, size=500):
        """Prepare buffered upserts into ``collection``.

        Must be called before ``bucket`` is used. ``size`` is the number of
        buffered items that triggers a flush.
        """
        self.collection = collection
        self.bucket_ = []
        self.bulk_size = size
        self.bulk_total = 0

    @property
    def bucket(self):
        """Return the pending-upsert buffer, flushing it first when due.

        Callers append ``(_id, fields)`` pairs to the returned list. On
        access, the buffer is flushed when it holds at least ``bulk_size``
        items or when its last element is ``None`` (an explicit flush
        marker). Each flushed item becomes
        ``UpdateOne({'_id': _id}, {'$set': fields}, upsert=True)`` and
        ``bulk_total`` grows by the number of items written.
        """
        pending = self.bucket_
        if not pending:
            return pending
        flush_requested = pending[-1] is None
        if flush_requested:
            # Drop only the marker; a size-triggered flush must keep every
            # real item.
            pending.pop()
        if flush_requested or len(pending) >= self.bulk_size:
            requests = [
                UpdateOne({'_id': item[0]}, {'$set': item[1]}, upsert=True)
                for item in pending
            ]
            if requests:
                self.bulk_write(self.collection, requests)
                self.bulk_total += len(requests)
            self.bucket_ = []
        return self.bucket_

    def insert_one(self, collection, doc):
        """Insert ``doc`` and return its ``inserted_id``."""
        with self._database() as db:
            return db[collection].insert_one(doc).inserted_id

    def insert_many(self, collection, docs=None, update=True):
        """Insert ``docs`` and return their ``inserted_ids``.

        ``update`` is accepted for backward compatibility and is not used.
        """
        with self._database() as db:
            return db[collection].insert_many(docs).inserted_ids

    def delete_one(self, collection, dsl):
        """Run ``delete_one`` with filter ``dsl`` and return its result."""
        with self._database() as db:
            return db[collection].delete_one(dsl)

    def delete_many(self, collection, dsl):
        """Run ``delete_many`` with filter ``dsl`` and return its result."""
        with self._database() as db:
            return db[collection].delete_many(dsl)

    def update_one(self, collection, dsl, col, upsert=False):
        """Run ``update_one(dsl, col, upsert=upsert)`` and return its result."""
        with self._database() as db:
            return db[collection].update_one(dsl, col, upsert=upsert)

    def update_many(self, collection, dsl, col):
        """Run ``update_many(dsl, col)`` and return its result."""
        with self._database() as db:
            return db[collection].update_many(dsl, col)

    def find_one(self, collection: str, dsl: dict = None, ret: dict = None,):
        """Return the first document matching ``dsl``.

        ``ret`` is passed as the projection only when it is truthy, so an
        empty dict behaves like no projection at all.
        """
        with self._database() as db:
            return db[collection].find_one(dsl, ret or None)

    def find(self, collection, *args, **kwargs):
        """Run ``find(*args, **kwargs)`` and return the results as a list.

        ``sort`` and ``limit`` keyword arguments are forwarded to ``find``
        itself. The cursor is read completely before a pooled handle is
        handed back.
        """
        with self._database() as db:
            return list(db[collection].find(*args, **kwargs))

    def bulk_write(self, collection, requests, ordered=True):
        """Run ``bulk_write(requests, ordered=ordered)`` and return its result."""
        # TODO: insert_many? bulk_write?
        with self._database() as db:
            return db[collection].bulk_write(requests, ordered=ordered)

    def drop(self, collection):
        """Drop ``collection`` and return whatever ``drop()`` returns."""
        with self._database() as db:
            return db[collection].drop()

    def list_collections(self):
        """Return the result of ``list_collections()`` on the database."""
        with self._database() as db:
            return db.list_collections()

    def close(self):
        """Close the underlying MongoDB client(s).

        Standalone mode closes the client behind ``self.client``. Pooled mode
        closes the client of every handle currently idle in the pool and
        leaves the pool empty, so the pool must not be used afterwards.
        """
        if self.pool is None:
            # ``self.client`` is a Database; the closable object is its client.
            self.client.client.close()
            return
        while True:
            try:
                db = self.pool.conn_queue.get_nowait()
            except queue.Empty:
                break
            db.client.close()
# Usage example (使用实例)
# Usage example: build a one-connection pool against a local server, wrap it
# in a helper, then fetch up to three documents of one gang from the 'data'
# collection, newest '_id' first.
mongo_params = dict(
    db='bad_gang',
    host='localhost',
    port=20182,
    pool_size=1,
)
mongo = MongoHelper(pool=MongoConnectionPool(mongo_params))
v = mongo.find(
    'data',
    {"gang_id": "37b955f0-47a2-4139-83de-1cf597a9b842"},
    # Projection: keep only the first three 'websites' entries and drop the
    # 'total' and 'ct' fields.
    {"websites": {"$slice": [0, 3]}, "total": 0, "ct": 0},
    limit=3,
    sort=[('_id', -1)],
)
print(v)
# Blog page footer residue, not part of the module: "0 评论" (0 comments),
# "大哥整点话呗~" (comment-box placeholder).