Redian新闻
>
跨机房 ES 同步实战

跨机房 ES 同步实战

公众号新闻

来源 | OSCHINA 社区

作者 | 京东云开发者-谢泽华

原文链接:https://my.oschina.net/u/4090830/blog/5606003


背景
众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用户体验,降低单点问题对业务造成的潜在损失显得尤为重要。
同城双活,对于生产的高可用保障,重大的意义和价值是不可言喻的。表面上同城双活只是简单的部署了一套生产环境而已,但是在架构上,这个改变的影响是巨大的,无状态应用的高可用管理、请求流量的管理、版本发布的管理、网络架构的管理等,其提升的架构复杂度巨大。
结合真实的协同办公产品:京办(为北京市政府提供协同办公服务的综合性平台)生产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给大家介绍下京办持续改进、分阶段演进过程中的一些思考和实践经验的总结。本文仅针对 ES 集群在跨机房同步过程中的方案和经验进行介绍和总结。

架构

1. 部署 Logstash 在金山云机房上,Logstash 启动多个实例(按不同的类型分类,提高同步效率),并且和金山云机房的 ES 集群在相同的 VPC
2. Logstash 需要配置大网访问权限,保证 Logstash 和 ES 原集群和目标集群互通。
3. 数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移后续的增加数据选择增量迁移。
4. 增量迁移需要改造增加识别的增量数据的标识,具体方法后续进行介绍。
原理

Logstash 工作原理


 
Logstash 分为三个部分 input 、filter、ouput:
1. input 处理接收数据,数据可以来源 ES,日志文件,kafka 等通道.
2. filter 对数据进行过滤,清洗。
3. ouput 输出数据到目标设备,可以输出到 ES,kafka,文件等。

增量同步原理

1. 对于 T 时刻的数据,先使用 Logstash 将 T 以前的所有数据迁移到有孚机房京东云 ES,假设用时∆T
2. 对于 T 到 T+∆T 的增量数据,再次使用 logstash 将数据导入到有孚机房京东云的 ES 集群
3. 重复上述步骤 2,直到∆T 足够小,此时将业务切换到华为云,最后完成新增数据的迁移
适用范围:ES 的数据中带有时间戳或者其他能够区分新旧数据的标签

流程




准备工作

1. 创建 ECS 和安装 JDK 忽略,自行安装即可
2. 下载对应版本的 Logstash,尽量选择与 Elasticsearch 版本一致,或接近的版本安装即可
https://www.elastic.co/cn/downloads/logstash

1) 源码下载直接解压安装包,开箱即用

2)修改对内存使用,logstash 默认的堆内存是 1G,根据 ECS 集群选择合适的内存,可以加快集群数据的迁移效率。

 3. 迁移索引

Logstash 会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。

以下提供创建索引的 python 脚本,用户可以使用该脚本创建需要的索引。

create_mapping.py 文件是同步索引的 python 脚本,config.yaml 是集群地址配置文件。

注:使用该脚本需要安装相关依赖

yum install -y PyYAML
yum install -y python-requests

拷贝以下代码保存为 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

defhelp():
print
"""
usage:
-h/--help print this help.
-c/--config config file path, default is config.yaml

example:
python create_mapping.py -c config.yaml
"""

defprocess_mapping(index_mapping, dest_index):
print(index_mapping)
# remove unnecessary keys
del index_mapping["settings"]["index"]["provided_name"]
del index_mapping["settings"]["index"]["uuid"]
del index_mapping["settings"]["index"]["creation_date"]
del index_mapping["settings"]["index"]["version"]

# check alias
aliases
= index_mapping["aliases"]
for alias inlist(aliases.keys()):
if alias == dest_index:
print(
"source index "+ dest_index +" alias "+ alias +" is the same as dest_index name, will remove this alias.")
del index_mapping["aliases"][alias]
if index_mapping["settings"]["index"].has_key("lifecycle"):
lifecycle
= index_mapping["settings"]["index"]["lifecycle"]
opendistro
={"opendistro":{"index_state_management":
{"policy_id": lifecycle["name"],
"rollover_alias": lifecycle["rollover_alias"]}}}
index_mapping
["settings"].update(opendistro)
# index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
del index_mapping["settings"]["index"]["lifecycle"]
print(index_mapping)
return index_mapping
defput_mapping_to_target(url, mapping, source_index, dest_auth=None):
headers
={'Content-Type':'application/json'}
create_resp
= requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
if create_resp.status_code !=200:
print(
"create index "+ url +" failed with response: "+str(create_resp)+", source index is "+ source_index)
print(create_resp.text)
withopen(source_index +".json","w")as f:
json
.dump(mapping, f)
defmain():
config_yaml
="config.yaml"
opts
, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config='])
for opt_name, opt_value in opts:
if opt_name in('-h','--help'):
help()
exit
()
if opt_name in('-c','--config'):
config_yaml
= opt_value

config_file
=open(config_yaml)
config
= yaml.load(config_file)
source
= config["source"]
source_user
= config["source_user"]
source_passwd
= config["source_passwd"]
source_auth
=None
if source_user !="":
source_auth
=(source_user, source_passwd)
dest
= config["destination"]
dest_user
= config["destination_user"]
dest_passwd
= config["destination_passwd"]
dest_auth
=None
if dest_user !="":
dest_auth
=(dest_user, dest_passwd)
print(source_auth)
print(dest_auth)

# only deal with mapping list
if config["only_mapping"]:
for source_index, dest_index in config["mapping"].iteritems():
print("start to process source index"+ source_index +", target index: "+ dest_index)
source_url
= source +"/"+ source_index
response
= requests.get(source_url, auth=source_auth)
if response.status_code !=200:
print("*** get ElasticSearch message failed. resp statusCode:"+str(
response
.status_code)+" response is "+ response.text)
continue
mapping
= response.json()
index_mapping
= process_mapping(mapping[source_index], dest_index)

dest_url
= dest +"/"+ dest_index
put_mapping_to_target
(dest_url, index_mapping, source_index, dest_auth)
print("process source index "+ source_index +" to target index "+ dest_index +" successed.")
else:
# get all indices
response
= requests.get(source +"/_alias", auth=source_auth)
if response.status_code !=200:
print("*** get all index failed. resp statusCode:"+str(
response
.status_code)+" response is "+ response.text)
exit
()
all_index
= response.json()
for index inlist(all_index.keys()):
if"."in index:
continue
print("start to process source index"+ index)
source_url
= source +"/"+ index
index_response
= requests.get(source_url, auth=source_auth)
if index_response.status_code !=200:
print("*** get ElasticSearch message failed. resp statusCode:"+str(
index_response
.status_code)+" response is "+ index_response.text)
continue
mapping
= index_response.json()

dest_index
= index
if index in config["mapping"].keys():
dest_index
= config["mapping"][index]
index_mapping
= process_mapping(mapping[index], dest_index)

dest_url
= dest +"/"+ dest_index
put_mapping_to_target
(dest_url, index_mapping, index, dest_auth)
print("process source index "+ index +" to target index "+ dest_index +" successed.")

if __name__ =='__main__':
main
()

配置文件保存为 config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只处理这个文件中mapping地址的索引
# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)
# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
# 如果匹配不到,则使用源端原始的索引名称
only_mapping: true

# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
mapping:
source_index: dest_index

以上代码和配置文件准备完成,直接执行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目标集群的 kibana 上查看或者执行 curl 查看索引迁移情况:

GET _cat/indices?v



全量迁移
Logstash 配置位于 config 目录下。
用户可以参考配置修改 Logstash 配置文件,为了保证迁移数据的准确性,一般建议建立多组 Logstash,分批次迁移数据,每个 Logstash 迁移部分数据。
配置集群间迁移配置参考:



input{
elasticsearch{
# 源端地址
hosts => ["ip1:port1","ip2:port2"]
# 安全集群配置登录用户名密码
user => "username"
password => "password"
# 需要迁移的索引列表,以逗号分隔,支持通配符
index => "a_*,b_*"
# 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
docinfo=>true
slices => 10
size => 2000
scroll => "60m"
}
}

filter {
# 去掉一些logstash自己加的字段
mutate {
remove_field => ["@timestamp", "@version"]
}
}

output{
elasticsearch{
# 目的端es地址
hosts => ["http://ip:port"]
# 安全集群配置登录用户名密码
user => "username"
password => "password"
# 目的端索引名称,以下配置为和源端保持一致
index => "%{[@metadata][_index]}"
# 目的端索引type,以下配置为和源端保持一致
document_type => "%{[@metadata][_type]}"
# 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}

# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata => true }}
}

增量迁移

预处理:

1. @timestamp 在 elasticsearch2.0.0beta 版本后弃用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

2. 本次对于京办从金山云机房迁移到京东有孚机房,所涉及到的业务领域多,各个业务线中所代表新增记录的时间戳字段不统一,所涉及到的兼容工作量大,于是考虑通过 elasticsearch 中预处理功能 pipeline 进行预处理添加统一增量标记字段:gmt_created_at,以减少迁移工作的复杂度(各自业务线可自行评估是否需要此步骤)。

PUT _ingest/pipeline/gmt_created_at
{
"description":"Adds gmt_created_at timestamp to documents",
"processors":[
{
"set":{
"field":"_source.gmt_created_at",
"value":"{{_ingest.timestamp}}"
}
}
]
}

3. 检查 pipeline 是否生效

GET _ingest/pipeline/*

4. 各个 index 设置对应 settings 增加 pipeline 为默认预处理

PUT index_xxxx/_settings
{
"settings": {
"index.default_pipeline": "gmt_created_at"
}
}

5. 检查新增 settings 是否生效

GET index_xxxx/_settings



增量迁移脚本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的 DSL,统一 gmt_create_at 为增量同步的特殊标记

schedule: 每分钟同步一把,"* * * * *"

input {
elasticsearch
{
hosts
=>["ip:port"]
# 安全集群配置登录用户名密码
user
=>"username"
password
=>"password"
index
=>"index_*"
query
=>'{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
size
=>5000
scroll
=>"5m"
docinfo
=>true
schedule
=>"* * * * *"
}
}
filter
{
mutate
{
remove_field
=>["source", "@version"]
}
}
output
{
elasticsearch
{
# 目的端es地址
hosts
=>["http://ip:port"]
# 安全集群配置登录用户名密码
user
=>"username"
password
=>"password"
index
=>"%{[@metadata][_index]}"
document_type
=>"%{[@metadata][_type]}"
document_id
=>"%{[@metadata][_id]}"
ilm_enabled
=>false
manage_template
=>false
}

# 调试信息,正式迁移去掉
stdout
{ codec => rubydebug { metadata =>true}}
}

问题:

mapping 中存在 join 父子类型的字段,直接迁移报 400 异常


[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>],
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400,
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse",
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解决方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140 https://github.com/elastic/elasticsearch/issues/26183

结合业务特征,通过在 filter 中加入小量的 ruby 代码,将_routing 的值取出来,放回 logstah event 中,由此问题得以解决。

示例:


END



开源白嫖不提倡?



这里有最新开源资讯、软件更新、技术干货等内容

点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦~

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
黑五之后记得用 BrandClub 同步订单,可能再赚好几百!【注册奖励 $21】重新定义“秒”:潘建伟团队打破长距离时钟同步纪录A Japanese Man’s 30-Year Quest to Green a Chinese Desert依赖重、扩展差,字节跳动是如何优化Apache Atlas 实时消息同步的?kaKao今年有多惨?机房着火网民怒骂,马娘被怼,子公司取消上市中国力量!厦门力量!Chinese rescuers save lives in quake-hit Turkiye宇宙人(1139期)飓风来袭NASA月球火箭再次推迟发射;DARPA“地球同步卫星机器人服务”即将迎来发射【哲学门】“雅典学园” 学习群同步文章:胡塞尔专题加国“禁塑令”逐步实施 餐饮业团体狠批成本增125%!iptables实战-DNAT、SNAT和负载均衡深秋恩赐的味觉(3)山珍 野木瓜(AKEBI)【新春走基层】电信机房来了一名“新员工”学员小档案 | 新南威尔士大学-商科 6门课程同步提高2022年前三季度中企上市热度回升,VC/PE机构IPO成绩单同步出炉!阿斯利康邀您参加北京智慧医疗科技创新论坛,路演项目同步招募中Mary and Jesus in the Eyes of Chinese PaintersChinese University Fires Professor Accused of Sexual Harassment重定义“秒”:中国团队打破长距离时钟同步纪录Hunan Shuts Access to Forest Areas to Prevent More WildfiresChinese Cities Allow Spring Festival Fireworks Amid COVID Blues早秋2021,红色天地的萍水相逢(1)大家聊巨变|上海,探索在高质量发展中逐步实现共同富裕直播预约 | 胡恺健:Privacy by Design理论架构与技术实战 | DPOHUB何谈第9期超美!多伦多市政广场圣诞树今晚亮灯!同步开放52个室外冰场!从亨廷顿的预言到特朗普的MAGA(三)《最后的诗歌》:9: 栗子树甩掉它的火炬《国际经济法》课程群虚拟教研室同步课堂正式开讲——上海与新疆两地四校共享优质教学资源当科技住宅业主打开新风机房敦泰大裁员,高管同步降薪吉林工商学院:颠覆传统机房?选锐捷VDI云桌面就赢了As Couriers Fall Sick, Chinese Cities Ask Residents to Fill In【便民】车行道、人行道建设同步实施,宝山这一道路新建工程设计方案正在公示→User Sues Chinese Streaming Giant iQiyi for Curbing Access报道称一些苹果AirPods Pro第二代耳机存在音频漂移和同步问题终于找到这个漏了。。
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。