关于es routing的介绍,网上已经有很多介绍,这里不在赘述,只介绍使用过程中的一些注意事项及测试情况。
一、服务器概况
硬件平台:x86_64
机器硬件名:x86_64
系统处理器的体系结构:x86_64
内核版本:#1 SMP Fri May 8 10:59:10 UTC 2020
操作系统:GNU/Linux
操作系统的发行版号:4.18.0-193.el8.x86_64
Linux系统:CentOS Linux release 8.2.2004 (Core)
物理CPU个数:1
每个物理CPU核数:20
逻辑CPU个数:40
内存:32GB
彼时服务器资源占用情况:
二、导入数据
2.1 降低数据倾斜程度
注意这里说的是降低,在默认情况下 es 使用docid
作为 routing key 将文档均匀分散在所有分片。
# 默认路由算法
shard = hash(routing) % number_of_primary_shards
而路由则是使用用户定义的routing key
,具有相同routing key
的文档,将被分配到同一个分片。
# 引入该参数以后路由算法
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
在实际开发中,可能存在某些routing key
的文档数量很多,导致集群数据不平衡。
使用index.routing_partition_size
可以将数据分配到分片的子集而不是单个分片。
# settings
{
"index": {
"number_of_shards": "30",
"routing_partition_size": 10,
"number_of_replicas": "0",
}
}
这样做虽然可以使数据更均匀,但是搜索的分片也更多,所以请根据业务设置子集的大小。
2.2 文档 id 不再全局唯一
es 的每个分片都是一个倒排索引,索引 id 是唯一的。而 es 默认使用docid
作为routing key
,所以 es 只需要根据docid
计算就能得知数据在哪个分片上。这也是为什么索引定义之后,分片数不能修改的原因。
而使用自定义路由,则可能存在docid
一样,但是routing key
不一样。例如某个索引将文档类型作为routing key
,在首次插入时,routing key
等于 A,被分配到了分片 1。在第二次插入时,routing key
被修改等于 B,被分配到了分片 2。
这时如果不指定routing key
查询,将会查出具有相同docid
的两条数据。同样的,增加删除修改数据,都可能出现问题。所以使用自定义路由,只能用户自己保证docid
的全局唯一性。
一种方法是,强制所有的增删改查操作都必须带有**routing**
参数
# mappings
{
"_routing": {
"required": True
}
}
携带routing
查询:
curl -XGET "http://localhost:9200/aiip.route_test/_search?routing=scenic&pretty" -H "Content-Type: application/json" -d '{"track_total_hits":true, "query":{"bool":{"filter":[{"match":{"title":"国家公园"}}]}}, "size": 3}'
routing key
可以是多个,用逗号隔开即可。
2.3 Python客户端导入数据
使用elasticsearch.helpers.bulk
提供的批量操作接口。
因业务不同,这里只贴出简化后的核心代码:
class EsHelper:
def __init__(self, nodes:list=['http://127.0.0.1:9200'], pool=None, logger=None):
self.pool = pool
self.logger = logger
if not self.pool:
assert isinstance(nodes, list)
self.conn = Elasticsearch(nodes, timeout=10)
if not self.conn.ping():
raise Exception('无法与es集群建立连接')
def insert_init(self, size:int=500):
self.bucket_ = []
self.bulk_size = size
self.bulk_total = 0
def bulk(self, data):
try:
if self.pool:
conn = self.pool.get_conn()
else:
conn = self.conn
success, failed = helpers.bulk(
conn, data,
stats_only=True, chunk_size=500, request_timeout=30
)
self.bulk_total += len(data)
except:
print(format_exc())
if self.logger:
self.logger.error(format_exc())
finally:
if self.pool:
self.pool.put_conn(conn)
@property
def bucket(self):
if self.bucket_:
if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
self.bucket_.pop()
if self.bucket_:
self.bulk(self.bucket_)
self.bucket_ = []
return self.bucket_
es_conn = EsHelper(pool=esconn_pool)
es_conn.insert_init()
for msg in file_or_kafka:
_routing = msg['type'] # 根据业务设计 routing key
action = {
"_index": 'route_test'
"_id": md5(msg["website"].strip().encode()).hexdigest(), # 自定义 id
"_routing": _routing, # 多了这个字段
"_source": msg
}
try:
es_conn.bucket.append(action)
except:
from traceback import format_exc
logger.error(format_exc())
es_conn.bucket.append(None)
三、测试
NO | 总时间 | ES 查询时间 | MongoDB 查询时间 | 查询效果 | 备注 |
---|---|---|---|---|---|
1 | 0.3753 | 0.365214 | 0.010013 | 正常 | es查询100次,mongo查询一次取12条数据,单位:秒/次。下同。 |
2 | 0.122578 | 0.113337 | 0.009176 | 正常 | |
3 | 0.189266 | 0.180636 | 0.008565 | 正常 |
0 评论
大哥整点话呗~