MongoDB的Python客户端封装

  • 分类: Python 数据库
  • 发表日期:2021-07-30 15:24:00
  • 最后修改:2021-07-30 15:24:00

基本模块

import pymongo
import sys
import queue
from bson.objectid import ObjectId
from pymongo import UpdateOne

 

连接池

class MongoConnectionPool(object):
    def __init__(self, params):
        if 'db' not in params:
            raise Exception('未指定数据库')

        self.db = params['db']
        self.host = 'localhost' if 'host' not in params else params['host']
        self.port = '27017' if 'port' not in params else params['port']
        self.username = '' if 'username' not in params else params['username']
        self.password = '' if 'password' not in params else params['password']
        self.pool_size = 5 if 'pool_size' not in params else params['pool_size']

        self.conn_queue = queue.Queue(maxsize=self.pool_size)
        for _ in range(self.pool_size):
            self.conn_queue.put(self.__create_new_conn())

    def __create_new_conn(self):
        if self.username and self.password:
            url = f'mongodb://{self.username}:{self.password}@{self.host}:{self.port}/{self.db}'
        else:
            url = f'mongodb://{self.host}:{self.port}/{self.db}'

        return pymongo.MongoClient(url)[self.db]

    def get_conn(self):
        conn = self.conn_queue.get()
        if conn is None:
            conn = self.__create_new_conn()
        return conn

    def put_conn(self, conn):
        self.conn_queue.put(conn)

 

CRUD封装

class MongoHelper:
    def __init__(self, params:dict=None, pool:MongoConnectionPool=None):
        if pool:
            self.pool = pool

        else:
            if 'db' not in params:
                raise Exception('未指定数据库')

            db = params['db']
            host = 'localhost' if 'host' not in params else params['host']
            port = '27017' if 'port' not in params else params['port']
            username = '' if 'username' not in params else params['username']
            password = '' if 'password' not in params else params['password']

            if username and password:
                url = f'mongodb://{username}:{password}@{host}:{port}/{db}'
                self.client = pymongo.MongoClient(url)[db]
                # __url = f'mongodb://{username}:{password}@{host}:{port}/'
                # self.client = pymongo.MongoClient(url)[db]
            else:
                url = f'mongodb://{host}:{port}/{db}'
                self.client = pymongo.MongoClient(url)[db]

    def insert_init(self, collection, size=500):
        self.collection = collection
        self.bucket_ = []
        self.bulk_size = size
        self.bulk_total = 0

    @property
    def bucket(self):
        if self.bucket_:
            if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
                self.bucket_.pop()
                data = []
                for i in self.bucket_:
                    data.append(
                        UpdateOne(
                            {'_id': i[0]}, 
                            {'$set': i[1]}, 
                            upsert=True
                        )
                    )
                if data:
                    self.bulk_write(self.collection, data)
                    self.bulk_total += len(data)
                    self.bucket_ = []

        return self.bucket_

    def insert_one(self, collection, doc):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].insert_one(doc).inserted_id
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].insert_one(doc).inserted_id

    def insert_many(self, collection, docs=None, update=True):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].insert_many(docs).inserted_ids
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].insert_many(docs).inserted_ids

    def delete_one(self, collection, dsl):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].delete_one(dsl)
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].delete_one(dsl)

    def delete_many(self, collection, dsl):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].delete_many(dsl)
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].delete_many(dsl)

    def update_one(self, collection, dsl, col, upsert=False):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].update_one(dsl, col, upsert=upsert)
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].update_one(dsl, col, upsert=upsert)

    def update_many(self, collection, dsl, col):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].update_many(dsl, col)
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].update_many(dsl, col)

    def find_one(self, collection:str, dsl:dict=None, ret:dict=None,):
        if self.pool:
            client = self.pool.get_conn()

            if ret:
                data = client[collection].find_one(dsl, ret)
            else:
                data = client[collection].find_one(dsl)

            self.pool.put_conn(client)

            return data

        else:
            if ret:
                return self.client[collection].find_one(dsl, ret)
            else:
                return self.client[collection].find_one(dsl)

    def find(self, collection, *args, **kwargs):
        if self.pool:
            client = self.pool.get_conn()
        else:
            client = self.client

        res = client[collection].find(*args, **kwargs)

        if 'sort' in kwargs:
            res = res.sort(kwargs['sort'])

        if 'limit' in kwargs:
            res = res.limit(kwargs['limit'])

        if self.pool:
            self.pool.put_conn(client)

        return [i for i in res]

    def bulk_write(self, collection, requests, ordered=True):
        # TODO: insert_many? bulk_write?
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].bulk_write(requests, ordered=ordered)
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].bulk_write(requests, ordered=ordered)

    def drop(self, collection):
        if self.pool:
            client = self.pool.get_conn()
            data = client[collection].drop()
            self.pool.put_conn(client)
            return data
        else:
            return self.client[collection].drop()

    def list_collections(self):
        if self.pool:
            client = self.pool.get_conn()
            data = client.list_collections()
            self.pool.put_conn(client)
            return data
        else:
            return self.client.list_collections()

    def close(self):
        if self.pool:
            client = self.pool.get_conn()
            return client.close()
        else:
            return self.client.close()

 

使用实例

mongo_params = {
        'db': 'bad_gang',
        'host': 'localhost',
        'port': 20182,
        'pool_size': 1
    }

    mongo = MongoHelper(
        pool=MongoConnectionPool(mongo_params)
    )

    v = mongo.find('data', {"gang_id":"37b955f0-47a2-4139-83de-1cf597a9b842"}, {"websites":{"$slice":[0,3]}, "total":0, "ct":0}, limit=3, sort=[('_id', -1)])
    print(v)

 

post
2020年12月26日 19:59 原创
post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年3月25日 22:09 原创
post
2021年4月15日 20:59 原创
post
2021年4月18日 16:32 原创
post
2021年6月17日 16:16 原创
post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月13日 09:36 原创
post
2021年7月30日 12:01 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 16:09 原创
post
2021年7月30日 16:02 原创
post
2021年7月30日 16:02 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月25日 16:11
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27
post
2021年11月24日 16:45 原创
post
2023年8月18日 20:39 原创 未公开

本博客部署教程

post
2024年6月13日 09:47 原创

0 评论

大哥整点话呗~