一、应用背景:

1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。

2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;重新修改mapping也可以考虑尝试使用Reindex。

索引settings分类:
1、静态配置:只有在创建的时候设置或者部分配置可以在索引库关闭的状态下修改。
  • index.number_of_shards (关闭索引库也无法修改)
  • index.sort.field
2、动态配置:在索引使用的时候就可以修改
  • index.number_of_replicas
  • index.refresh_interval

官网: Index modules | Elasticsearch Guide [8.10] | Elastic

二、数据迁移步骤:Reindex API | Elasticsearch Guide [8.10] | Elastic

2.1、创建新的索引

并修改了number_of_replicas和refresh_interval。

  1. 设置number_of_replicas为0防止我们迁移文档的同时又发送到副本节点,影响性能
  2. 设置refresh_interval为-1是限制其刷新。默认是1秒

当我们数据迁移完成再把上面两个值进行修改即可。

2.2、复制数据
1)同一个集群迁移
POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index"
  }
}

利用命令:curl _XPOST 'ES数据库请求地址:9200/_reindex' -d{"source":{"index":"old_index"},"dest":{"index":"new_index"}}

2) 扩集群迁移
  • 和同集群数据迁移基本一样,就是多了一个设置白名单而已。(如果是A集群 --> B集群,就需要在B中的elasticsearch.yml 设置A地址为白名单)reindex.remote.whitelist: 172.16.1.236:9200
  • 设置好索引、number_of_replicas: 0、refresh_interval: -1
  • 在remote中设置远程集群的地址与账号密码(如果配置了的话)。
  • 也可以添加query属性,只查询符号条件的。
POST /_reindex
{
  "source": {
    "index": "test_v1",
    "remote": {
      "host": "http://destHost:9200",
      "username": "username",
      "password": "password"
    },
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "test_remote"
  }
}

完成之后记得重新配置远程集群索引的number_of_replicas、refresh_interval

3) 修改索引名
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "testindex-old",
        "alias": "testindex-new"
      }
    }
  ]
}
4)reindex其他功能 

1、指定部分字段进行reindex
POST _reindex?refresh
{
  "source": {
    "index": "product_index",
    "_source":["productId","productName","updateTime","amuontTotal"]
  },
  "dest": {
    "index": "new_product_index"
  }
}
适用场景:如果只需要把源索引的部分字段进行 reindex 到目标索引,在请求体的 source 中设置 _source 参数指定这些字段即可。

2、使用 script 进行 reindex
POST _reindex?refresh
{
  "source": {
    "index": "product_index"
  },
  "dest": {
    "index": "new_product_index"
  },
  "script": {
    "source": "ctx._source.lastupdatetime = ctx._source.remove(\"updataTime\")"
  }
}
适用场景:ES script 是一个强大的存在,可以轻松帮我们实现很多对文档修改的需求,比如,把文档中的 updataTime 字段名称改为 lastupdatetime ;又比如,在文档中新增一个字段并赋默认值等。

3、多个源索引进行 reindex 到一个目标索引
POST _reindex?refresh
{
  "source": {
    "index": ["product_index","product_index_1","product_index_2"]
  },
  "dest": {
    "index": "new_product_index"
  }
}
适用场景:多个源索引向同一个目标索引进行 reindex,但需要注意多个源索引的文档id有可能是一样的,reindex 到目标索引时无法保证是哪个源索引的文档id,最终覆盖只保留一个文档id。
 

三、数据迁移效率

ES提供了_reindex这个API。相对于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求。

但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢。

数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?

原因分析:

reindex的核心做跨索引、跨集群的数据迁移。
慢的原因及优化思路无非包括:
    1)批量大小值可能太小。需要结合堆内存、线程池调整大小;
    2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
    3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。

3.1、提升批量写入大小值

默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。

POST _reindex
{
  "source": {
    "index": "source",
    "size": 5000
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

批量大小设置的依据:
1、使用批量索引请求以获得最佳性能。
批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。
注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:
1)每个1kb的1000个文档是1mb。
2)每个100kb的1000个文档是100 MB。
这些是完全不同的体积大小。
2、逐步递增文档容量大小的方式调优。
1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。
2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。
要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。

3.2、借助scroll的sliced提升写入效率

Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

sliced原理(from medcl)

1)用过Scroll接口吧,很慢?如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。
2)每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。

slicing的设定分为两种方式:手动设置分片、自动设置分片。

POST _reindex?slices=5&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

slices大小设置注意事项:

1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。

3.3、自动覆盖防止冲突(第一次导入失败的时候设置)

但如果新的index中有数据,并且可能发生冲突,那么可以设置version_type"version_type": "internal"或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容

POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index",
    "version_type": "internal"
  }
}
3.4、异步任务防止超时

POST _reindex?slices=3&refresh&wait_for_completion=false
{
  "source": {
    "index": "old-index"
  },
  "dest": {
    "index": "new-index"
  }
}

查看异步任务:

// 指定任务查看

GET /_tasks/cbKrtP8gShq6Ii15mfZkFg:7879651

//模糊匹配

GET _tasks?detailed=true&actions=*reindex

// 中途取消

POST _tasks/task_id:xxx/_cancel

总结:

reindex 是一个很耗时的操作,当 ES 索引的文档数量很大时,不得不去面对和思考效率的问题了,有以下几个方面可以参考:

创建目标索引:

  • settings.index.refresh_interval:刷新间隔时间,最好设置为-1,即不刷新,等 reindex 结束后可以重新设置该参数;
  • settings.index.number_of_replicas:副本数量,设置为0,副本在索引创建之后也是可以动态调整的,reindex 没必要设置;

调用 Reindex API:

  • 请求体使用参数 size 并根据当前文档的情况评估一个合理的批量处理的数值;
  • 请求参数使用自动切片方式,即 slices=auto;
  • 请求体使用参数 conflicts 并修改为 proceed,因为 conflicts 默认的是 abort,即默认遇到版本冲突的时候会退出 reindex;(不如自动覆盖防止冲突)
  • 异步任务防止超时
  • 请求参数适当限制处理速度,避免ES集群出现崩溃,即 requests_per_second;(默认即可)
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐