elasticsearch小tips

时间:Dec. 9, 2019 分类:

目录:

reindex

reindex使用的情况更多的是修改索引的mapping,因为索引一旦创建就只能添加字段而不能修改字段

创建别名

PUT _alias
{
  "actions": [
    {
      "add": {
        "index": "nginx_www.whysdomain.com-2019.08.07",
        "alias": "nginx_www.whysdomain.com-2019.08.07_latest"
      }
    }
  ]
}

然后确认存储空间(reindex期间索引存储加倍)

创建新索引

PUT /nginx_www.whysdomain.com-2019.08.07_new
{
    "mappings" : {
      "properties" : {
        "@timestamp" : {
          "type" : "date"
        },
        "@version" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "agent" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "bytes" : {
          "type" : "long"
        },
        ...省略部分
        "zone" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "4",
        "number_of_replicas" : "0"
      }
    }
}

重建索引

POST _reindex?wait_for_completion=false
{
    "source": {
        "index": "nginx_www.whysdomain.com-2019.08.07"
    },
    "dest": {
        "index": "nginx_www.whysdomain.com-2019.08.07_new"
    }
}
  • wait_for_completion=false:调用reindex接口,接口将会在reindex结束后返回,而接口返回超时只有30秒,如果reindex时间过长,建议加上wait_for_completion=false的参数条件,这样reindex 直接返回taskId

等待重建索引执行完成

将别名指向新的索引,同时解绑旧索引

PUT _alias { "actions": [ { "add": { "index": "nginx_www.whysdomain.com-2019.08.07_new", "alias": "nginx_www.whysdomain.com-2019.08.07_latest" }, "remove": { "index": "nginx_www.whysdomain.com-2019.08.07", "alias": "nginx_www.whysdomain.com-2019.08.07_latest" }

}

] }

删掉旧索引

索引生命周期

冷热存储

在elasticsearch.yml中定义冷热角色

node.attr.box_type: hot

还可以配置warm和cold

索引生命周期策略

{
    "policy": {
        "phases": {
            "hot": {
                "min_age": "0ms",
                "actions": {
                    "set_priority": {
                        "priority": 100
                    }
                }
            },
            "warm": {
                "min_age": "7d",
                "actions": {
                    "allocate": {
                        "number_of_replicas": 0,
                        "include": {},
                        "exclude": {},
                        "require": {
                            "box_type": "warm"
                        }
                    },
                    "set_priority": {
                        "priority": 70
                    }
                }
            },
            "cold": {
                "min_age": "15d",
                "actions": {
                    "allocate": {
                        "number_of_replicas": 0,
                        "include": {},
                        "exclude": {},
                        "require": {
                            "box_type": "warm"
                        }
                    },
                    "freeze": {},
                    "set_priority": {
                        "priority": 30
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}

也可以直接在kibana上可视化添加

索引绑定生命周期

{
  "settings": {
    "index": {
      "lifecycle": {
        "name": "log_ilm_policy",
        "rollover_alias": "log_alias"
      }
    }
  }
}

也可以在template添加

使用geoip

版本参考https://github.com/maxmind/geoipupdate/releases

安装

$ wget https://github.com/maxmind/geoipupdate/releases/download/v4.0.0/geoipupdate_4.0.0_linux_amd64.tar.gz
$ tar xf geoipupdate_4.0.0_linux_amd64.tar.gz
$ cd geoipupdate_4.0.0_linux_amd64
$ geoipupdate -f /home/geoipupdate_4.0.0_linux_amd64/GeoIP.conf -d /data/geoip-database/

定时更新数据库任务

crontab -e
# bywhy update geoip database
59 2 * * * /disk1/geoipupdate_4.0.0_linux_amd64/geoipupdate -f /disk1/geoipupdate_4.0.0_linux_amd64/GeoIP.conf -d /shards/geoip-database/ >/dev/null 2>&1

logstash配置


       geoip {
           source => "remote_addr"
           target => "geoip"
           database => "/shards/geoip-database/GeoLite2-City.mmdb"
           # fields => ["city_name", "country_code2", "country_name", "region_name", "location"]
       }

elasticsearch添加template


curl -X PUT "http://192.168.31.10:9200/_template/geoip_location_format" -H 'Content-Type: application/json' -d '{
  "order": 0,
  "template": "nginx_*",
  "mappings": {
     "properties": {
        "geoip": {
           "properties": {
             "location" : {
                "type" : "geo_point"
             }
           }
         }
      }
   }
}'

elasticdump

文档参考https://www.npmjs.com/package/elasticdump

安装

npm install elasticdump -g
ln -s /root/node_modules/.staging/elasticdump-36ae1a76/bin/elasticdump  /usr/bin/

同步数据脚本

for i in book3_pay_back_trade_item book3_toc_order book3_toc_team book3order book3posticket book3 book3table
do
    elasticdump --input=http://123.123.123.123:9200/$i --output=http://127.0.0.1:9200/$i --type=settings
    elasticdump --input=http://123.123.123.123:9200/$i --output=http://127.0.0.1:9200/$i --type=mapping
    elasticdump --input=http://123.123.123.123:9200/$i --output=http://127.0.0.1:9200/$i --type=data
done

扩集群查询

参考远程集群

配置生命周期导致会有节点失效

$ _cluster/stats
{"_nodes":{"total":21,"successful":20,"failed":1,"failures":[
{"type":"failed_node_exception",
"reason":"Failed node [FcEUqS2LQf-M5n77SYkl3g]",
"node_id":"FcEUqS2LQf-M5n77SYkl3g",
"caused_by":{"type":"circuit_breaking_exception",
"reason":
    "[parent] Data too large, data for [<transport_request>] would be [29388015860/27.3gb], which is larger than the limit of [28561532518/26.5gb], real usage: [29388009392/27.3gb], new bytes reserved: [6468/6.3kb]",
"bytes_wanted":29388015860,
"bytes_limit":28561532518,
"durability":"PERMANENT"}}]},
"cluster_name":"why-bj","cluster_uuid":"ED-ZSmF9RamximutF4uBnw","timestamp":1568622292925,"status":"green","indices":{"count":4429,"shards":{"total":6541,"primaries":5301,"replication":0.23391812865497075,"index":{"shards":{"min":1,"max":10,"avg":1.4768570783472568},"primaries":{"min":1,"max":5,"avg":1.1968841724994355},"replication":{"min":0.0,"max":1.0,"avg":0.23346127794084442}}},"docs":{"count":53627455925,"deleted":5994321},"store":{"size_in_bytes":29026662802764},"fielddata":{"memory_size_in_bytes":1962291224,"evictions":0},"query_cache":{"memory_size_in_bytes":2862455028,"total_count":153136289,"hit_count":8626374,"miss_count":144509915,"cache_size":35665,"cache_count":218486,"evictions":182821},"completion":{"size_in_bytes":0},"segments":{"count":99073,"memory_in_bytes":50946810520,"terms_memory_in_bytes":36993803714,"stored_fields_memory_in_bytes":12277057296,"term_vectors_memory_in_bytes":0,"norms_memory_in_bytes":164739840,"points_memory_in_bytes":1441502478,"doc_values_memory_in_bytes":69707192,"index_writer_memory_in_bytes":2908918754,"version_map_memory_in_bytes":17534093,"fixed_bit_set_memory_in_bytes":169255776,"max_unsafe_auto_id_timestamp":1568622215614,"file_sizes":{}}},"nodes":{"count":{"total":20,"data":17,"coordinating_only":3,"master":13,"ingest":10},"versions":["7.0.0"],"os":{"available_processors":592,"allocated_processors":592,"names":[{"name":"Linux","count":20}],"pretty_names":[{"pretty_name":"CentOS Linux 7 (Core)","count":20}],"mem":{"total_in_bytes":2495182647296,"free_in_bytes":90912325632,"used_in_bytes":2404270321664,"free_percent":4,"used_percent":96}},"process":{"cpu":{"percent":93},"open_file_descriptors":{"min":1090,"max":13568,"avg":7102}},"jvm":{"max_uptime_in_millis":5463078804,"versions":[{"version":"12","vm_name":"OpenJDK 64-Bit Server VM","vm_version":"12+33","vm_vendor":"Oracle Corporation","bundled_jdk":true,"using_bundled_jdk":true,"count":20}],"mem":{"heap_used_in_bytes":304006773272,"heap_max_in_bytes":562640715776},"threads":6294},"fs":{"total_in_bytes":58417623912448,"free_in_bytes":35921416740864,"available_in_bytes":35921416740864},"plugins":[{"name":"repository-hdfs","version":"7.0.0","elasticsearch_version":"7.0.0","java_version":"1.8","description":"The HDFS repository plugin adds support for Hadoop Distributed File-System (HDFS) repositories.","classname":"org.elasticsearch.repositories.hdfs.HdfsPlugin","extended_plugins":[],"has_native_controller":false}],"network_types":{"transport_types":{"netty4":20},"http_types":{"netty4":20}},"discovery_types":{"zen":20}}}

保证索引分片大小不超过26.5gb

es下节点

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cluster.html#cluster-shard-allocation-filtering

步骤1: 将节点从集群路由策略中排除

curl -XPUT http://0.0.0.0:9200/_cluster/settings -d '{"transient": {"cluster.routing.allocation.exclude._ip": "172.0.0.249"}}'

多个IP用,隔开

如果一个node有多个节点

PUT _cluster/settings
{
  "transient" : {
    "cluster.routing.allocation.exclude._name" : "es-09,es-11,es-04,es-12,es-05"
  }
}

步骤2:等待节点上分片全部被迁移

检查集群状态,若出现pening_tasks,当pending_tasks的等级>=HIGH时,存在集群无法新建索引的风险

curl http://0.0.0.0:9200/_cluster/health?pretty 

查看集群健康状态

curl http://0.0.0.0:9200/_cat/shards?v |grep ">"

查看当前正在迁移的服务

curl http://0.0.0.0:9200/_cluster/pending_tasks?pretty

查看当前pending_tasks状态

若集群中出现UNASSIGNED shards,检查原因,查看是否是分配策略导致无法迁移分片

curl http://0.0.0.0:9200/_cluster/allocation/explain?pretty

步骤3:下线节点

步骤4:取消节点禁用策略

curl -XPUT http://0.0.0.0:9200/_cluster/settings?pretty -d '{"transient": {"cluster.routing.allocation.exclude._ip": null}}'  

创建template

https://www.elastic.co/guide/en/elasticsearch/reference/7.7/indices-templates.html

在kibana7.7版本已经可以在kibana界面上修改和添加

磁盘数据量感知

  • disk.watermark.low代表着磁盘使用率的低水位线,默认85%,这个配置意味着,es不会将分片分配给超过这个值的节点,此设置对新创建的索引的主分片没有影响,但是会阻止分配它们的副本。
  • disk.watermark.high同理存在高水位线配置,默认为90% ,这意味着Elasticsearch将尝试将分片从磁盘使用率超过90%的节点上分离出来,这个配置同样影响集群的平衡。
  • disk.watermark.flood_stage,默认值95%,防止节点用完磁盘空间的最后手段的配置,采用强制只读的方式来保护集群和主机。

机架感知

  • /bin/elasticsearch -Enode.attr.rack_id=rack_one` 启动时指定。
  • cluster.routing.allocation.awareness.attributes: rack_id 配置文件中指定。

elasticsearch数据

shard的大小在10~20GB

  • elasticsearch是通过shard来实现负载均衡,过大影响迁移
  • 每个shard都是一个lucene实例,对应一个java线程,所以也不易过多

堆内存不超过32GB,大内存机器可以部署多个实例

java -Xmx32766m -XX:+PrintFlagsFinal 2> /dev/null | grep UseCompressedOops

检查最大堆内存jvm是否开启指针压缩

参考官方文档

关闭HeapDumpOnOutOfMemoryError

JVM的设置,默认会开启HeapDumpOnOutOfMemoryError,当堆内存溢出时,或者JVM被OOM时,会自动生成DUMP文件

ES当负载比较高的时候,实际内存有可能会超过设置的最大堆内存,如果开启此设置,JVM会锁住其内存空间进行DUMP操作。在dump的过程中,无法对集群中的其它heartbeat进行相应,会被其它节点认为此节点已经掉线,Master会将其从节点中移除,继而又会触发shard的迁移。因此建议关闭此参数

可以之间使用Jinfo关闭此参数

jinfo -flag -HeapDumpOnOutOfMemoryError  <pid>

关闭长查询任务

# coding=utf8
#!/usr/bin/python
#############################################
# 扫描出ES中, 运行时间超过10分钟的task
# 并且把他们cancel
#############################################
import requests
import os
import logging
from logging import Logger

log_conf = {
    "level": logging.INFO,
    "log_dir": "."
}

def init_logger(logger_name='all'):
    if logger_name not in Logger.manager.loggerDict:
        logger = logging.getLogger(logger_name)
        logger.setLevel(log_conf['level'])
        # file
        fmt = '%(asctime)s - %(process)s - %(levelname)s: - %(message)s'
        formatter = logging.Formatter(fmt)

        # all file
        log_file = os.path.join(log_conf['log_dir'], logger_name + '.log')
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        # error file
        log_file = os.path.join(log_conf['log_dir'], logger_name + '.error.log')
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.ERROR)
        logger.addHandler(file_handler)

    logger = logging.getLogger(logger_name)
    return logger

logger = init_logger("all")


def main():
    logger.info('[start]kill long task')
    # 1. scan
    wait2cancel_set = set()
    url = "http://localhost:9200/_tasks?actions=*search&detailed"
    res = requests.get(url)
    dd = res.json()
    for value in dd['nodes'].values():
        for task_id, task_info in value['tasks'].items():
            # 注意这里是纳秒
            run_secs = task_info['running_time_in_nanos']/1000/1000/1000
            # 10 min
            if run_secs > 60 * 10:
                wait2cancel_set.add(task_id)

    logger.info('wait2cancel_list:%s, count:%s', wait2cancel_set, len(wait2cancel_set))
    # 2. cancel
    for task_id in wait2cancel_set:
        # 请自行修改ES的地址
        url = "http://localhost:9200/_tasks/%s/_cancel" % (task_id)
        res = requests.post(url)
        logger.info("cancel task, task_id:%s, result:%s", task_id, res.content)

    logger.info('[end]kill long task')

if __name__ == '__main__':
    print '--------start-----------'
    main()
    print '--------end-----------'

副本调整为0

curl -XPUT  -H "Content-Type: application/json"   0.0.0.0:9200/nginx_whysdomain.com-2020.05.20/_settings  -d '{"number_of_replicas":"0"}'

分片数量控制

  • Shrink
  • Rollover

存储数据统计

curl 127.0.0.1:9200/_cat/indices > /tmp/index

egrep 'nginx' /tmp/index | grep 'gb$' | awk '{print $9}' | awk -F 'gb' '{sum+=$1}END{print sum}'

手动迁移分片

关闭集群自动索引分片分配

PUT _cluster/settings
{ 
  "persistent": { 
    "cluster.routing.allocation.enable": "none"
  }
}

允许迁移到同一主机

PUT _cluster/settings 
{ 
  "persistent": { 
    "cluster.routing.allocation.same_shard.host": "false"
  }
}

迁移分片

POST /_cluster/reroute
{
    "commands" : [ {
        "move" : 
            {
              "index" : "s_online-restaurant-2020.07.24",
              "shard" : 0, 
              "from_node" : "ulp-sj02-pos-h1", 
              "to_node" : "ulp-sj02-pos-w1"
            }
        }
    ]
}

不允许迁移到同一主机

PUT _cluster/settings 
{ 
  "persistent": { 
    "cluster.routing.allocation.same_shard.host": "true"
  }
}

开启集群自动索引分片分配

PUT _cluster/settings
{ 
  "persistent": { 
    "cluster.routing.allocation.enable": "all"
  }
}