Elasticsearch的Python客户端封装

基本模块

import sys
import os
import time
import queue
import requests
import ujson
from pprint import pprint
from traceback import format_exc
from elasticsearch import Elasticsearch, helpers
from elasticsearch import exceptions as es_exceptions

 

连接池

class EsConnectionPool(object):
    def __init__(self, **kwargs):
        default_hosts = ['http://127.0.0.1:9200']
        self.nodes = kwargs.get('nodes', default_hosts)
        self.pool_size = kwargs.get('pool_size', 5)
        self.timeout = kwargs.get('timeout', 10)
        self.conn_queue = queue.Queue(maxsize=self.pool_size)
        for _ in range(self.pool_size):
            self.conn_queue.put(self.__create_new_conn())

    def __create_new_conn(self):
        conn = Elasticsearch(self.nodes, timeout=self.timeout)
        if conn.ping():
            return conn
        else:
            raise Exception('无法与es主机建立连接') 

    def get_conn(self):
        conn = self.conn_queue.get()
        if conn is None:
            conn = self.__create_new_conn()
        return conn

    def put_conn(self, conn):
        self.conn_queue.put(conn)

    def __del__(self):
        try:
            while True:
                conn = self.conn_queue.get_nowait()
                if conn:
                    conn.close()
        except queue.Empty:
            pass

 

常用搜索模块

class EsSearch:
    def __init__(self, client):
        self.client = client

    def dsl(
            self, index:str, body:dict, 
            request_timeout=100, scroll=False,
            raw_result=False, only_id = False, score_and_id=True,
            *args, **kwargs
        ):
        def process_reponse(response):
            total = 'None'
            if 'total' in response['hits']:
                total = response['hits']['total']['value']

            if only_id:
                ret_data = {}

                for item in response['hits']['hits']:
                    ret_data.update({item['_id']:item['_score']})

            else:
                ret_data = self.clean_response(response, score_and_id)

            if '_scroll_id' in response:
                return response['_scroll_id'], ret_data
            else:
                return total, ret_data

        if scroll:
            if 'scroll_id' not in body:
                response = self.client.search(
                            index=index,
                            body=body,
                            request_timeout=request_timeout,
                            scroll='1m',
                            *args, **kwargs,
                        )
            else:
                response = self.client.scroll(
                            body=body,
                            request_timeout=10,
                        )
        else:
            response = self.client.search(
                        index=index,
                        body=body,
                        request_timeout=request_timeout,
                        *args, **kwargs,
                    )

        if raw_result:
            return response
        else:
            return process_reponse(response)

    def keywords(self, index:str, field:str, value:str, size=0, source=[]):
        dsl = {
            "from": 0,
            "size": 3 if not size else size,
            'query':{
                'term':{
                    field: value
                }
            }
        }

        if source:
            dsl.update(_source=source)

        response = self.client.search(index=index, body=dsl)

        pprint(response["hits"]["hits"])

    def match(self, index:str, field:str, text:str, size=1, source=[]):
        dsl = {
            "from": 0,
            "size": 3 if not size else size,
            'query':{
                "match":{
                    field:text
                }
            }
        }

        if source:
            dsl.update(_source=source)

        response = self.client.search(index=index, body=dsl)

        pprint(response["hits"]["hits"])

    def tokenizer(self, text:str, index:str=None, analyzer:str=None, field:str=None):
        if index:
            if not field:
                print('通过具体索引的字段分词,您已选择了索引,还没有指定字段(field)。')
                return
            else:
                url = f'http://localhost:9200/{index}/_analyze?pretty'
                data = {
                    'text':text, 
                    'field': field
                }

        else:
            if not analyzer:
                print('没有选择分词器(analyzer)')
                return
            else:
                url = f'http://localhost:9200/_analyze?pretty'
                data = {
                    'text':text, 
                    'analyzer': analyzer
                }

        headers = {'Content-Type': 'application/json'}

        r = requests.post(url, data=ujson.dumps(data), headers=headers)

        data = r.json()
        if 'tokens' in data:
            return data['tokens']
        else:
            return False

    def clean_response(self, response, flag:bool=True):
        data = []

        for item in response["hits"]["hits"]:
            if flag:
                item["_source"]["es_score"] = item["_score"]
                item["_source"]["es_id"] = item["_id"]
            data.append(item["_source"])

        return data

 

运维相关

class EsHelper:
    def __init__(self, nodes:list=['http://127.0.0.1:9200'], pool=None, logger=None):
        self.pool = pool
        self.logger = logger
        self.nodes = nodes

        if not self.pool:
            assert isinstance(nodes, list)
            self.conn = Elasticsearch(self.nodes, timeout=10)

            if not self.conn.ping():
                raise Exception('无法与es集群建立连接')

    def create_rollover_index(self, settings:dict, mappings:dict, index:str, index_patterns:str, index_aliases:str):

        headers = {"Content-Type": "application/json"}

        url = f"{self.nodes[0]}/_template/template.{index}?pretty"
        data = {
            "index_patterns": index_patterns,
            "settings": settings,
            "mappings": mappings
        }
        response = requests.put(url, data=ujson.dumps(data), headers=headers).json()
        if 'acknowledged' in response:
            if response['acknowledged']:
                print('模板创建成功')
        else:
            print(f'模板创建失败\n{response.text}')
            return False

        url = f"{self.nodes[0]}/%3C{index}-%7Bnow%2Fd%7D-000001%3E/?pretty" 
        data = {
            "aliases": {
                f"{index_aliases}": {
                    "is_write_index": True
                    }
                }
            }
        response = requests.put(url, data=ujson.dumps(data), headers=headers).json()

        if 'acknowledged' in response:
            if response['acknowledged']:
                print('别名设置成功')
                return True
        else:
            if response['error']['root_cause'][0]['type'] != "resource_already_exists_exception":
                print('别名设置失败\n{response}')
                return False
            else:
                index = response['error']['root_cause'][0]['index']
                ans = input(f'索引({index})已存在,是否继续导入数据(y/n):')
                if ans.upper() == 'Y':
                    return True
                else:
                    return False

    def create_es_index(self, index_name, settings, mappings, aliases=None):
        success = 1
        if self.pool:
            conn = self.pool.get_conn()
        else:
            conn = self.conn

        body = {
            "settings": settings,
            "mappings": mappings
        }

        res = conn.indices.create(index=index_name, body=body, ignore=400)

        if "acknowledged" in res:
            print(f"已创建索引“{index_name}”\n")

            if aliases:
                url = 'http://localhost:9200/_aliases'
                headers = {'Content-Type': 'application/json'}
                data = {'actions': [{'add':{ 'index': index_name, 'alias': aliases}}]}
                r = requests.post(url, data=ujson.dumps(data), headers=headers)
                print(f'{index_name}成功设置别名:{aliases}\n')

        else:
            from pprint import pprint
            pprint(res)
            print(f"无法创建索引“{index_name}”,请检查。\n")
            success = 0

        return success

    def insert_init(self, size:int=500):
        self.bucket_ = []
        self.bulk_size = size
        self.bulk_total = 0

    def bulk_insert(self, data):
        try:
            if self.pool:
                conn = self.pool.get_conn()
            else:
                conn = self.conn

            success, failed = helpers.bulk(
                conn, data, 
                stats_only=True, chunk_size=500, request_timeout=30
            )
            self.bulk_total += len(data)

        except:
            print(format_exc())
            if self.logger:
                self.logger.error(format_exc())

        finally:
            if self.pool:
                self.pool.put_conn(conn)

    @property
    def bucket(self):
        if self.bucket_:
            if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
                self.bucket_.pop()
                if self.bucket_:
                    self.bulk_insert(self.bucket_)
                    self.bucket_ = []

        return self.bucket_

    @property
    def xsearch(self):
        try:
            if self.pool:
                conn = self.pool.get_conn()

            else:
                conn = self.conn

            return EsSearch(conn)

        except:
            raise Exception('es查询出错')

        finally:
            if self.pool:
                self.pool.put_conn(conn)

    def export_data(self, index:str, save_file_path:str=None, dsl:dict=None):
        import ujson
        from hashlib import md5

        if not save_file_path:
            from a import get_file_handle
            f, save_file_path = get_file_handle(1, f'{index}.txt')
        else:
            dir_ = os.path.splitext(save_file_path)[0]

            if not os.path.exists(dir_):
                os.makedirs(dir_)

            f = open(save_file_path, mode='w', encoding='utf-8')

        if not dsl:
            dsl = {
                'size': 1000,
                'query': {
                    'match_all':{}
                }
            }

        print(f'导出的数据保存在:{save_file_path}')

        msg_cnt = 0

        while 1:
            scroll_id, data = self.xsearch.dsl(index, dsl, scroll=True, score_and_id=False)
            dsl = {'scroll': '5m', 'scroll_id': scroll_id}

            if len(data) == 0:
                break

            for msg in data:
                # ----------- 修改这 -----------
                msg['_id'] = md5(msg["website"].strip().encode()).hexdigest()
                # -----------------------------

                f.write(f'{ujson.dumps(msg)}\n')

            msg_cnt += len(data)
            print(msg_cnt, end='\r')

        f.close()

    def reindex(self, src_index:str, dest_index:str):
        url = 'http://localhost:9200/_reindex?wait_for_completion=false&slices=6'

        headers = {'Content-Type': 'application/json'}

        data = {
            "source": {
                "index": src_index,
                "size": 2000
            },
            "dest": {
                "index": dest_index
            }
        }
        # try:
        r = requests.post(url, data=ujson.dumps(data), headers=headers)
        print(r.text)
        # except

 

使用示例

es = EsHelper(
    pool=EsConnectionPool(
        nodes=['http://localhost:9200'],
        pool_size=1,
        timeout=10
    )
)

analysis = {"filter": {}, "analyzer": {}}
settings = {"analysis": analysis, "index":{}}
mappings = {"properties": {}}

# rollover,需配合另外一个脚本,有时间再更新上来
index_patterns = f'example-*'
aliases = 'example.ready'
res = es.create_rollover_index(settings, mappings, index_patterns, aliases)

# reindex
src_index = 'src_index_name'
dest_index = 'dest_index_name'
es.reindex(src_index, dest_index)

# 查看分词
es.xsearch.tokenizer(query_text, analyzer='ik_smart')

# dsl查询
es.xsearch.dsl('index_name', dsl, only_id=True)

# 导出索引数据
es.export_data('index_name')

 

post
2020年12月26日 19:59 原创
post
2020年12月26日 19:59 原创
post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年2月24日 18:06 原创 草稿

记一次elasticsearch项目的部署

post
2021年3月25日 22:09 原创
post
2021年4月15日 20:59 原创
post
2021年4月18日 16:32 原创
post
2021年6月17日 16:16 原创
post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月13日 09:36 原创
post
2021年7月30日 12:01 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:09 原创
post
2021年7月30日 16:02 原创
post
2021年7月30日 16:02 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月25日 16:11
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27
post
2021年11月24日 16:45 原创
post
2023年8月18日 20:39 原创 未公开

本博客部署教程

post
2024年6月13日 09:47 原创
post
2024年7月4日 08:54 原创

0 评论

大哥整点话呗~