# Basic modules (imports)
import sys
import os
import time
import queue
import requests
import ujson
from pprint import pprint
from traceback import format_exc
from elasticsearch import Elasticsearch, helpers
from elasticsearch import exceptions as es_exceptions
# Connection pool
class EsConnectionPool(object):
    """A simple blocking pool of Elasticsearch client connections.

    All connections are created eagerly at construction time and handed
    out through an internal queue; callers must give them back with
    ``put_conn()`` when finished.
    """

    def __init__(self, **kwargs):
        # nodes: list of ES endpoints; pool_size: number of pooled clients;
        # timeout: per-request timeout passed to each client.
        default_hosts = ['http://127.0.0.1:9200']
        self.nodes = kwargs.get('nodes', default_hosts)
        self.pool_size = kwargs.get('pool_size', 5)
        self.timeout = kwargs.get('timeout', 10)
        self.conn_queue = queue.Queue(maxsize=self.pool_size)
        for _ in range(self.pool_size):
            self.conn_queue.put(self.__create_new_conn())

    def __create_new_conn(self):
        """Create one client and verify the cluster is reachable."""
        conn = Elasticsearch(self.nodes, timeout=self.timeout)
        if not conn.ping():
            raise Exception('无法与es主机建立连接')
        return conn

    def get_conn(self):
        """Borrow a connection; blocks until one is available."""
        conn = self.conn_queue.get()
        if conn is None:
            # Defensive: replace a dead slot if one was ever enqueued.
            conn = self.__create_new_conn()
        return conn

    def put_conn(self, conn):
        """Return a borrowed connection to the pool."""
        self.conn_queue.put(conn)

    def __del__(self):
        # Best-effort cleanup: drain the queue and close every connection
        # that is still pooled.
        try:
            while True:
                conn = self.conn_queue.get_nowait()
                if conn:
                    conn.close()
        except queue.Empty:
            pass
# Common search helpers
class EsSearch:
    """Convenience wrappers around common Elasticsearch query patterns."""

    def __init__(self, client):
        # client: an Elasticsearch connection (anything exposing the
        # search()/scroll() API used below).
        self.client = client

    def dsl(
        self, index: str, body: dict,
        request_timeout=100, scroll=False,
        raw_result=False, only_id=False, score_and_id=True,
        *args, **kwargs
    ):
        """Execute a raw DSL query, optionally as a scroll search.

        Args:
            index: target index name.
            body: the query DSL; for scroll continuation pass
                ``{'scroll': ..., 'scroll_id': ...}``.
            scroll: when True, use the scroll API (first page keeps the
                context alive for 1 minute).
            raw_result: return the untouched ES response.
            only_id: return ``{_id: _score}`` instead of documents.
            score_and_id: inject ``es_score``/``es_id`` into each source doc.

        Returns:
            The raw response when ``raw_result``; otherwise
            ``(scroll_id, data)`` for scroll searches or ``(total, data)``.
        """
        def process_response(response):
            total = 'None'
            if 'total' in response['hits']:
                total = response['hits']['total']['value']
            if only_id:
                # Only a mapping of document id -> relevance score.
                ret_data = {}
                for item in response['hits']['hits']:
                    ret_data.update({item['_id']: item['_score']})
            else:
                ret_data = self.clean_response(response, score_and_id)
            if '_scroll_id' in response:
                return response['_scroll_id'], ret_data
            return total, ret_data

        if scroll:
            if 'scroll_id' not in body:
                # First page: open a scroll context.
                response = self.client.search(
                    index=index,
                    body=body,
                    request_timeout=request_timeout,
                    scroll='1m',
                    *args, **kwargs,
                )
            else:
                # Continuation page: body already carries the scroll_id.
                response = self.client.scroll(
                    body=body,
                    request_timeout=10,
                )
        else:
            response = self.client.search(
                index=index,
                body=body,
                request_timeout=request_timeout,
                *args, **kwargs,
            )
        if raw_result:
            return response
        return process_response(response)

    def keywords(self, index: str, field: str, value: str, size=0, source=None):
        """Exact-match (term) query; pretty-prints the hits.

        ``size=0`` (the default) is treated as "use 3".
        ``source`` defaults to None instead of a shared mutable list.
        """
        dsl = {
            "from": 0,
            "size": 3 if not size else size,
            'query': {
                'term': {
                    field: value
                }
            }
        }
        if source:
            dsl.update(_source=source)
        response = self.client.search(index=index, body=dsl)
        pprint(response["hits"]["hits"])

    def match(self, index: str, field: str, text: str, size=1, source=None):
        """Full-text (match) query; pretty-prints the hits.

        A falsy ``size`` falls back to 3; the literal default of 1 is
        used as-is. ``source`` defaults to None (was a mutable default).
        """
        dsl = {
            "from": 0,
            "size": 3 if not size else size,
            'query': {
                "match": {
                    field: text
                }
            }
        }
        if source:
            dsl.update(_source=source)
        response = self.client.search(index=index, body=dsl)
        pprint(response["hits"]["hits"])

    def tokenizer(self, text: str, index: str = None, analyzer: str = None, field: str = None):
        """Inspect how text is tokenized via the ``_analyze`` endpoint.

        Either pass ``index`` + ``field`` (use that field's analyzer) or
        ``analyzer`` alone. Returns the token list, or False on failure.

        NOTE(review): the host is hardcoded to localhost:9200 here, unlike
        the injected ``self.client`` — confirm this is intentional.
        """
        if index:
            if not field:
                print('通过具体索引的字段分词,您已选择了索引,还没有指定字段(field)。')
                return
            url = f'http://localhost:9200/{index}/_analyze?pretty'
            data = {
                'text': text,
                'field': field
            }
        else:
            if not analyzer:
                print('没有选择分词器(analyzer)')
                return
            url = f'http://localhost:9200/_analyze?pretty'
            data = {
                'text': text,
                'analyzer': analyzer
            }
        headers = {'Content-Type': 'application/json'}
        r = requests.post(url, data=ujson.dumps(data), headers=headers)
        data = r.json()
        if 'tokens' in data:
            return data['tokens']
        return False

    def clean_response(self, response, flag: bool = True):
        """Flatten a search response to a list of ``_source`` documents.

        When ``flag`` is True, each document is enriched in place with
        ``es_score`` and ``es_id`` taken from the hit metadata.
        """
        data = []
        for item in response["hits"]["hits"]:
            if flag:
                item["_source"]["es_score"] = item["_score"]
                item["_source"]["es_id"] = item["_id"]
            data.append(item["_source"])
        return data
# Operations / maintenance helpers
class EsHelper:
    """Operations helper: index management, bulk insert, export, reindex."""

    def __init__(self, nodes: list = None, pool=None, logger=None):
        """Use a supplied connection pool, or open a direct connection.

        Args:
            nodes: list of ES endpoints (default: local node). Was a
                mutable default argument; now built per call.
            pool: optional EsConnectionPool-style object.
            logger: optional logger used for bulk-insert errors.
        """
        self.pool = pool
        self.logger = logger
        self.nodes = nodes if nodes is not None else ['http://127.0.0.1:9200']
        if not self.pool:
            assert isinstance(self.nodes, list)
            self.conn = Elasticsearch(self.nodes, timeout=10)
            if not self.conn.ping():
                raise Exception('无法与es集群建立连接')

    def create_rollover_index(self, settings: dict, mappings: dict, index: str, index_patterns: str, index_aliases: str):
        """Create an index template plus the first rollover index + write alias.

        Returns True on success (or when the user chooses to continue with
        an already-existing index), False otherwise.
        """
        headers = {"Content-Type": "application/json"}
        # Step 1: create/overwrite the index template.
        url = f"{self.nodes[0]}/_template/template.{index}?pretty"
        data = {
            "index_patterns": index_patterns,
            "settings": settings,
            "mappings": mappings
        }
        response = requests.put(url, data=ujson.dumps(data), headers=headers).json()
        if 'acknowledged' in response:
            if response['acknowledged']:
                print('模板创建成功')
            else:
                # BUG FIX: `response` is already .json()-decoded, so it has
                # no `.text` attribute — print the dict itself.
                print(f'模板创建失败\n{response}')
                return False
        # Step 2: create the first index <index-{now/d}-000001> (URL-encoded
        # date-math name) with the write alias attached.
        url = f"{self.nodes[0]}/%3C{index}-%7Bnow%2Fd%7D-000001%3E/?pretty"
        data = {
            "aliases": {
                f"{index_aliases}": {
                    "is_write_index": True
                }
            }
        }
        response = requests.put(url, data=ujson.dumps(data), headers=headers).json()
        if 'acknowledged' in response and response['acknowledged']:
            print('别名设置成功')
            return True
        if response['error']['root_cause'][0]['type'] != "resource_already_exists_exception":
            # BUG FIX: the original print lacked the f prefix and emitted
            # the literal text "{response}".
            print(f'别名设置失败\n{response}')
            return False
        index = response['error']['root_cause'][0]['index']
        ans = input(f'索引({index})已存在,是否继续导入数据(y/n):')
        return ans.upper() == 'Y'

    def create_es_index(self, index_name, settings, mappings, aliases=None):
        """Create an index (ignoring 409-style 400s) and optionally alias it.

        Returns 1 on success, 0 on failure.
        """
        success = 1
        if self.pool:
            conn = self.pool.get_conn()
        else:
            conn = self.conn
        body = {
            "settings": settings,
            "mappings": mappings
        }
        res = conn.indices.create(index=index_name, body=body, ignore=400)
        if "acknowledged" in res:
            print(f"已创建索引“{index_name}”\n")
            if aliases:
                # NOTE(review): alias is set over raw HTTP against
                # localhost, not via `conn` — confirm intended.
                url = 'http://localhost:9200/_aliases'
                headers = {'Content-Type': 'application/json'}
                data = {'actions': [{'add': {'index': index_name, 'alias': aliases}}]}
                r = requests.post(url, data=ujson.dumps(data), headers=headers)
                print(f'{index_name}成功设置别名:{aliases}\n')
        else:
            pprint(res)
            print(f"无法创建索引“{index_name}”,请检查。\n")
            success = 0
        return success

    def insert_init(self, size: int = 500):
        """Reset the bulk-insert buffer and counters."""
        self.bucket_ = []        # pending documents awaiting a flush
        self.bulk_size = size    # flush threshold
        self.bulk_total = 0      # documents handed to helpers.bulk so far

    def bulk_insert(self, data):
        """Bulk-index `data`; errors are logged, never raised."""
        try:
            if self.pool:
                conn = self.pool.get_conn()
            else:
                conn = self.conn
            success, failed = helpers.bulk(
                conn, data,
                stats_only=True, chunk_size=500, request_timeout=30
            )
            self.bulk_total += len(data)
        except Exception:
            # Was a bare `except:`; keep the best-effort behavior but stop
            # swallowing SystemExit/KeyboardInterrupt.
            print(format_exc())
            if self.logger:
                self.logger.error(format_exc())
        finally:
            if self.pool:
                self.pool.put_conn(conn)

    @property
    def bucket(self):
        """Flush the buffer when full or when a trailing None sentinel is seen.

        NOTE(review): the pop() also discards the newest real item when the
        flush is triggered purely by size — this matches the original code
        but looks suspicious; verify against the caller's append protocol.
        """
        if self.bucket_:
            if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
                self.bucket_.pop()
                if self.bucket_:
                    self.bulk_insert(self.bucket_)
                self.bucket_ = []
        return self.bucket_

    @property
    def xsearch(self):
        """Return an EsSearch bound to a connection.

        NOTE(review): when pooled, the connection is returned to the pool
        in `finally` while EsSearch still holds it — only safe with the
        current single-consumer usage; confirm before sharing the pool.
        """
        try:
            if self.pool:
                conn = self.pool.get_conn()
            else:
                conn = self.conn
            return EsSearch(conn)
        except Exception:
            raise Exception('es查询出错')
        finally:
            if self.pool:
                self.pool.put_conn(conn)

    def export_data(self, index: str, save_file_path: str = None, dsl: dict = None):
        """Scroll through `index` and dump one JSON document per line.

        Each document's `_id` is rewritten as the md5 of its "website"
        field (site-specific hack kept from the original).
        """
        import ujson
        from hashlib import md5
        if not save_file_path:
            # Project-local helper; only used when no path is given.
            from a import get_file_handle
            f, save_file_path = get_file_handle(1, f'{index}.txt')
        else:
            # BUG FIX: was os.path.splitext, which created a spurious
            # directory named after the file (minus extension); create the
            # parent directory instead.
            dir_ = os.path.dirname(save_file_path)
            if dir_ and not os.path.exists(dir_):
                os.makedirs(dir_)
            f = open(save_file_path, mode='w', encoding='utf-8')
        if not dsl:
            dsl = {
                'size': 1000,
                'query': {
                    'match_all': {}
                }
            }
        print(f'导出的数据保存在:{save_file_path}')
        msg_cnt = 0
        while 1:
            scroll_id, data = self.xsearch.dsl(index, dsl, scroll=True, score_and_id=False)
            dsl = {'scroll': '5m', 'scroll_id': scroll_id}
            if len(data) == 0:
                break
            for msg in data:
                # Site-specific id rewrite — adjust here for other indices.
                msg['_id'] = md5(msg["website"].strip().encode()).hexdigest()
                f.write(f'{ujson.dumps(msg)}\n')
            msg_cnt += len(data)
            print(msg_cnt, end='\r')
        f.close()

    def reindex(self, src_index: str, dest_index: str):
        """Kick off an async, sliced server-side reindex and print the task info."""
        url = 'http://localhost:9200/_reindex?wait_for_completion=false&slices=6'
        headers = {'Content-Type': 'application/json'}
        data = {
            "source": {
                "index": src_index,
                "size": 2000
            },
            "dest": {
                "index": dest_index
            }
        }
        r = requests.post(url, data=ujson.dumps(data), headers=headers)
        print(r.text)
# Usage example
if __name__ == '__main__':
    # Demo script — requires a running Elasticsearch on localhost:9200.
    # Guarded so importing this module no longer opens connections.
    es = EsHelper(
        pool=EsConnectionPool(
            nodes=['http://localhost:9200'],
            pool_size=1,
            timeout=10
        )
    )
    analysis = {"filter": {}, "analyzer": {}}
    settings = {"analysis": analysis, "index": {}}
    mappings = {"properties": {}}
    # Rollover: needs a companion rollover-trigger script (not shown here).
    # BUG FIX: the original call omitted the `index` argument and would
    # raise TypeError (5 parameters, 4 arguments passed).
    index = 'example'
    index_patterns = 'example-*'
    aliases = 'example.ready'
    res = es.create_rollover_index(settings, mappings, index, index_patterns, aliases)
    # Reindex
    src_index = 'src_index_name'
    dest_index = 'dest_index_name'
    es.reindex(src_index, dest_index)
    # Inspect tokenization (BUG FIX: query_text was previously undefined)
    query_text = '你好世界'
    es.xsearch.tokenizer(query_text, analyzer='ik_smart')
    # DSL query (BUG FIX: dsl was previously undefined)
    dsl = {'query': {'match_all': {}}}
    es.xsearch.dsl('index_name', dsl, only_id=True)
    # Export index data
    es.export_data('index_name')
# (scraped blog-page footer removed: comment-count widget text, not code)