本文档基于 centos 7,elasticsearch 7.7.0。其它系统或es版本不保证本文档可适用。
拉取镜像
# 搜索所有的Docker镜像
docker search elasticsearch
# 拉取镜像,“7.7.0”为本文档使用es版本,其它版本请自行更改
docker pull docker.io/elasticsearch:7.7.0
# 查看镜像
docker images
运行容器
创建所需文件
es的配置、数据、日志、插件等文件需做保留,我的保存路径是 /mnt/data/app/elasticsearch,需要对此文件进行权限设置(只能设置777)。
mkdir -p /mnt/data/app/elasticsearch/config
touch /mnt/data/app/elasticsearch/config/es.yml
mkdir -p /mnt/data/app/elasticsearch/data
mkdir -p /mnt/data/app/elasticsearch/logs
mkdir -p /mnt/data/app/elasticsearch/plugins
chmod -R 777 /mnt/data/app/elasticsearch/
修改配置文件
vi /mnt/data/app/elasticsearch/config/es.yml
注意:每个设置项的冒号后需要留一个空格
# ---------------------------------- cluster -----------------------------------
# 集群名称,用于定义哪些elasticsearch节点属同一个集群。
cluster.name: otiwms
# 节点名称,用于唯一标识节点,不可重名
node.name: node1
# 让节点成为主节点,且存储任何数据
node.master: true
node.data: true
# 以下列出了三种集群拓扑模式,如下:
# 1、如果想让节点不具备选举主节点的资格,只用来做数据存储节点。
# node.master: false
# node.data: true
# 2、如果想让节点成为主节点,且不存储任何数据,只作为集群协调者。
# node.master: true
# node.data: false
# 3、如果想让节点既不成为主节点,又不成为数据节点,
# 那么可将他作为搜索器,从节点中获取数据,生成搜索结果等
# node.master: false
# node.data: false
# 这个配置限制了单机上可以开启的ES存储实例的个数
# 当我们需要单机多实例,则需要把这个配置赋值2,或者更高。
# node.max_local_storage_nodes: 1
# ----------------------------------- Paths ------------------------------------
# 数据存储路径,可以设置多个路径用逗号分隔,有助于提高IO。
path.data: /usr/share/elasticsearch/data/data01,/usr/share/elasticsearch/data/data02
# 日志文件路径
path.logs: /usr/share/elasticsearch/logs
# ----------------------------------- Memory -------------------------------------
# circuit检查
# indices.breaker.type: none
# 确保 ES_MIN_MEM 和 ES_MAX_MEM 环境变量设置为相同的值,以及机器有足够的内存分配给Elasticsearch
# 注意:内存也不是越大越好,一般64位机器,最大分配内存别才超过32G
# 当JVM开始写入交换空间时(swapping)ElasticSearch性能会低下,你应该保证它不会写入交换空间
# 设置这个属性为true来锁定内存,同时也要允许elasticsearch的进程可以锁住内存,
# linux下可以通过 `ulimit -l unlimited` 命令
bootstrap.memory_lock: true
# 节点用于 fielddata 的最大内存,如果 fielddata 达到该阈值,就会把旧数据交换出去。
# 该参数可以设置百分比或者绝对值。默认设置是不限制,所以强烈建议设置该值。
indices.fielddata.cache.size: 20%
# 默认值是JVM堆内存的60%,注意为了让设置正常生效,一定要确保 indices.breaker.fielddata.limit
# 的值大于 indices.fielddata.cache.size 的值。否则的话,fielddata 大小一达到 limit 阈值就报错,
# 就永远到不了 size 阈值,无法触发对旧数据的交换任务了。
indices.breaker.fielddata.limit: 60%
# 断路器估算需要完成其他请求部分的结构大小,例如创建一个聚合,默认限制是堆内存的 40%。
indices.breaker.request.limit: 40%
# 结合 request 和 fielddata 断路器保证两者组合起来不会使用超过堆内存的 70%(默认值)。
indices.breaker.total.limit: 70%
#------------------------------------ Network And HTTP -----------------------------
# 设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0
network.bind_host: 0.0.0.0
# 设置其它节点和该节点通信的ip地址,如果不设置它会自动设置,值必须是个真实的ip地址
network.publish_host: 192.168.1.88
# 同时设置bind_host和publish_host上面两个参数
network.host: 192.168.1.88
# 设置集群中节点间通信的tcp端口,默认是9300
transport.tcp.port: 9300
# 设置是否压缩tcp传输时的数据,默认为false,不压缩
transport.tcp.compress: false
# 设置对外服务的http端口,默认为9200
http.port: 9200
# 设置请求内容的最大容量,默认100mb
http.max_content_length: 100mb
# --------------------------------- Discovery --------------------------------------
# 非单例模式时需要注释掉
discovery.type: single-mode
# 这个参数决定了要选举一个Master至少需要多少个节点,默认值是1,推荐设置为 N/2 + 1
# N是集群中节点的数量,这样可以有效避免脑裂
# discovery.zen.minimum_master_nodes: 1
# 在java里面GC (Garbage Collection 垃圾收集)是很常见的,但在GC时间比较长的时候,
# 在默认配置下,节点会频繁失联。节点的失联又会导致数据频繁重传,甚至会导致整个集群基本不可用。
# discovery参数是用来做集群之间节点通信的,默认超时时间是比较小的。我们把参数适当调大,
# 避免集群GC时间较长导致节点的丢失、失联。
# discovery.zen.ping.timeout: 200s
# discovery.zen.fd.ping_timeout: 200s
# discovery.zen.fd.ping.interval: 30s
# discovery.zen.fd.ping.retries: 6
# 设置集群中节点的探测列表,新加入集群的节点需要加入列表中才能被探测到。
# discovery.zen.ping.unicast.hosts: ["192.168.1.88:9300"]
# 是否打开广播自动发现节点,默认为true
# discovery.zen.ping.multicast.enabled: true
使用分词器插件
analysis-ik.tar.gz
elasticsearch-jieba-plugin-7.7.0.tar.gz
将解压的分词器插件放在 /mnt/data/app/elasticsearch/plugins 目录下
注意:结巴分词的目录名称必须是 __elasticsearch-jieba-plugin-es版本号
运行容器
docker run --name es -p 9200:9200 -p 9300:9300 \
--privileged=true\
--ulimit memlock=-1:-1 \
-e ES_JAVA_OPTS="-Xms3g -Xmx3g" \
-v /mnt/data/app/elasticsearch/config/es.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mnt/data/app/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mnt/data/app/elasticsearch/logs:/usr/share/elasticsearch/logs \
-v /mnt/data/app/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.7.0
参数说明
--name 容器名称
-p 主机与容器的端口映射关系
--privileged root权限启动(必须)
--ulimit “memlock=-1:-1”设置容器系统的最大锁定内存地址空间
-v 文件挂载
-d 后台启动
-e 设置环境变量
ES_JAVA_OPTS 设置分配给es 的堆内存大小
检查
- 查看容器是否成功启动,
docker ps -a
- 确认启动成功后,更新容器启动参数,设置自动重启
docker update es --restart=always
导入数据
创建索引
from elasticsearch import Elasticsearch
index_name = "es_test"
nodes = ["http://localhost:9200/"]
es = Elasticsearch(nodes)
es.indices.delete(index=index_name, ignore=[400, 404])
es.indices.create(index=index_name, ignore=400)
mappings = {
"properties": {
"name": {
"type": "text"
},
"age": {
"type": "integer"
}
}
}
es.indices.put_mapping(index=index_name, body=mapping)
es.close()
导入数据
import sys
import time
import traceback
from elasticsearch import Elasticsearch, helpers
index_name = "es_test"
nodes = ["http://localhost:9200/"]
es = Elasticsearch(nodes)
setting_body = {
"index": {
"refresh_interval": "30s",
"number_of_shards": "3",
"number_of_replicas": "0",
"translog": {
"flush_threshold_size": "1g",
"sync_interval": "60s",
"durability": "async" # request async
},
"merge": {
"scheduler": {
"max_thread_count": "4"
}
}
}
}
es.indices.put_settings(index=index_name, body=setting_body)
fin = "将导入es的数据文件路径"
i = 0
action_size = 1000
actions = []
start = time.time()
with open(fin, 'rb') as f:
for idx, line in enumerate(f):
try:
msg = json.loads(line.decode('utf-8').strip()) # 据文件格式修改
action = {
"_index": self.es_index,
"_source": msg
}
actions.append(action)
i += 1
if not i % action_size:
success, failed = helpers.bulk(es, actions,
stats_only=True, chunk_size=500, request_timeout=30)
print(f"{pname} {i} {time.time()-start:.4}")
if failed:
print("bulk faild: ", failed)
actions = []
start = time.time()
except Exception:
# TODO
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
sys.exit()
else:
pass
if actions:
success, failed = helpers.bulk(es, actions,
stats_only=True, chunk_size=500, request_timeout=30)
if failed:
print("bulk faild: ", failed)
print(f"final {pname} {i} {time.time()-start:.4}")
setting_body = {
"index": {
"refresh_interval": "1s",
# "number_of_replicas": "1",
"translog": {
"flush_threshold_size": "512mb",
"sync_interval": "5s",
"durability": "request" # request async
}
}
}
es.indices.put_settings(index=index_name, body=setting_body)
查询
查询集群状态
curl -XGET 'localhost:9200/_cluster/health?pretty=true'
pretty=true 格式化输出
level=indices 显示索引状态
level=shards 显示分片信息
集群系统信息,包括CPU JVM等
curl -XGET 'localhost:9200/_cluster/stats?pretty'
集群的详细信息。包括节点、分片等
curl -XGET 'localhost:9200/_cluster/state?pretty'
获取集群堆积的任务
curl -XGET 'localhost:9200/_cluster/pending_tasks?pretty=true'
查看每个节点上所有segment占用的memory总和
curl -XGET 'http://localhost:9200/_cat/nodes?v&h=segments.count,segments.memory,segments.index_writer_memory,segments.version_map_memory,segments.fixed_bitset_memory'
查看每个索引上所有segment的memory占用情况
curl -XGET 'http://localhost:9200/_cat/segments?v'
强制合并每个分片上索引只有一个segment
当查询速度慢时,排查方式之一可查看索引segment数量,每个segment就是一个倒排索引,更多的segment意味着更多的文件句柄,也即更长的IO花销。强制合并将减少体积较小的segment,每个分片的每个segment的体积大小建议40~50G。
合并操作,根据数据量级不同,耗时较长,需等待。
curl -s -XPOST 'http://localhost:9200/index_name/_forcemerge?max_num_segments=1'
查询某个索引的文档个数
curl -H "Content-Type: application/json" -XGET "http://localhost:9200/index_name/_search?pretty" -d '{"track_total_hits":true,"query":{"match_all":{}}, "size": 3}'
{
"track_total_hits":true,
"query":{
"match_all":{}
}
}
基本match条件查询并高亮
{
"track_total_hits":true,
"size":3,
"query":{
"match":{
"title":{
"query":"特朗普煽动支持者,现代版“进京勤王”上演"
}
}
},
"highlight":{
"pre_tags": "<em>",
"post_tags": "</em>",
"fields": {
"title": {}
}
}
}
嵌套查询
{
"track_total_hits":true,
"size":10,
"from":0,
"query":{
"bool":{
"must":[
{
"match":{
"title":{
"query":"灌篮高手"
}
}
}
],
"should":[
{
"match_phrase":{
"title":{
"query":"灌篮高手",
"slop": 5,
"boost":5
}
}
},
{
"match":{
"title":{
"query":"灌篮",
"boost":3
}
}
},
{
"match":{
"title":{
"query":"高手",
"boost":1
}
}
}
]
}
}
}
2 评论
阿添 2023-03-23 18:22:59
test