当前位置: 首页 > news >正文

数据同步-Mysql同步到ElasticSearch

Mysql同步到ElasticSearch

  • 数据同步
    • 1、定时任务
    • 2、双写
    • 3、MQ异步写入
    • 4、Logstash
    • 5、Canal

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

数据同步包含:全量同步 (首次) + 增量同步(新数据)。

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本。

public class FullSyncPostToEs implements CommandLineRunner {@Resourceprivate PostService postService;@Resourceprivate PostEsDao postEsDao;@Overridepublic void run(String... args) {List<Post> postList = postService.list();if (CollectionUtils.isEmpty(postList)) {return;}List<PostEsDTO> postEsDTOList = postList.stream().map(PostEsDTO::objToDto).collect(Collectors.toList());final int pageSize = 500;int total = postEsDTOList.size();log.info("FullSyncPostToEs start, total {}", total);for (int i = 0; i < total; i += pageSize) {int end = Math.min(i + pageSize, total);log.info("sync from {} to {}", i, end);postEsDao.saveAll(postEsDTOList.subList(i, end));}log.info("FullSyncPostToEs end, total {}", total);}
}

增量同步有五种方式:

1、定时任务

  • 定时任务:比如1 分钟 1 次,找到 MySQL 中过去几分钟内(至少是定时周期的 2 倍)发生改变的数据,然后更新到 ES。
public class IncSyncPostToEs {@Resourceprivate PostMapper postMapper;@Resourceprivate PostEsDao postEsDao;/*** 每分钟执行一次*/@Scheduled(fixedRate = 60 * 1000)public void run() {// 查询近 5 分钟内的数据Date fiveMinutesAgoDate = new Date(new Date().getTime() - 5 * 60 * 1000L);List<Post> postList = postMapper.listPostWithDelete(fiveMinutesAgoDate);if (CollectionUtils.isEmpty(postList)) {log.info("no inc post");return;}List<PostEsDTO> postEsDTOList = postList.stream().map(PostEsDTO::objToDto).collect(Collectors.toList());final int pageSize = 500;int total = postEsDTOList.size();log.info("IncSyncPostToEs start, total {}", total);for (int i = 0; i < total; i += pageSize) {int end = Math.min(i + pageSize, total);log.info("sync from {} to {}", i, end);postEsDao.saveAll(postEsDTOList.subList(i, end));}log.info("IncSyncPostToEs end, total {}", total);}
}

优点:简单易懂、占用资源少、不用引入第三方中间件;
缺点:有时间差;
应用场景:数据时间内不同步影响不大、或者数据几乎不发生修改;

2、双写

  • 双写:写数据的时候,必须去写入到ES,更新、删除都需要操作ES(加事务:可能存在写入某一方出现失败,形成脏数据)。

3、MQ异步写入

  • MQ异步写入:在写入数据库时,通过MQ异步写入ES,同样可能存在数据写入不一致问题。

4、Logstash

  • ES的Logstash数据同步管道:Logstash 事件处理管道有三个阶段:输入过滤器输出

下载地址:https://www.elastic.co/guide/en/logstash/7.17/installing-logstash.html
inputs 模块负责收集数据,filters 模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs 模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。

inputs 和 outputs 支持编解码器,使您能够在数据进入或离开管道时对数据进行编码或解码,而无需使用单独的过滤器。
在这里插入图片描述
启动Logstash,添加一个conf配置文件,便可完成同步任务。

 C:\Windows\system32> cd C:\logstash-7.17.23\C:\logstash-7.17.23> .\bin\logstash.bat -f .\config\syslog.conf

syslog.conf:数据同步的配置文件。

举个例子:

输入事件:

input {jdbc {jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"  //数据库驱动jdbc_driver_class => "com.mysql.jdbc.Driver"                 //连接数据库jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "mysql"jdbc_password => "mysql"statement => "SELECT * from songs where artist = :favorite_artist"  //执行sql语句parameters => { "favorite_artist" => "Beethoven" }   //预编译  schedule => "* * * * *"    //corn表达式,多久进行同步}
}

:sql_last_value 可以设置每次查询结果中updatetime为最后的时间,作为下次增量同步的开始时间(需要对时间进行排序才能保证最后一条数据为时间最大的)。

input {jdbc {statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE updatetime > :sql_last_value order  by updatetime desc"use_column_value => truetracking_column => "updatetime "# ... other configuration bits}
}

输出事件:

output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "127.0.0,1:9200"  //写入到ES
index => "post_v1"        //ES对应的索引
document_id => "%{id)"	    //取数据库查询出的id作为ES中的唯一id
}
}

过滤事件:

filter {
mutate {
rename => {
"updatetime" =>"updateTime"    //给字段重命名
"userid"     => "userId"
"createtime" =>"createTime"
"isdelete"   =>"isDelete"
remove_field =>["thumbnm""favournum"]   //移除不需要同步到ES中的字段

更多参数,可参考官方文档进行配置:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html。

5、Canal

  • Canal

优点:实时同步,实时性非常强;
原理:数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理;
canal: 帮你监听 binlog,并解析 binlog 为你可以理解的内容,它伪装成了 mysql 的从节点,获取主节点给的 binlog。

在这里插入图片描述

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

后记
👉👉💕💕美好的一天,到此结束,下次继续努力!欲知后续,请看下回分解,写作不易,感谢大家的支持!! 🌹🌹🌹

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • perl的学习记录——仿真regression
  • 【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn
  • Jeremy Howard对创业,AI产品,技术趋势,社区的看法
  • 万龙觉醒免费辅助:VMOS云手机辅助巴克尔阵容搭配攻略!
  • jQuery基础2-css的操作-事件-属性-Ajax-DOM操作
  • Redis访问工具
  • 【系统架构设计师】建造者模式(Builder Pattern)
  • 苹果手机铃声怎么设置自己的歌?3个方法自定义手机铃声
  • Baumer工业相机堡盟工业相机如何通过NEOAPI SDK获取相机当前数据吞吐量(Python)
  • MySQL里面的日期字符串如何转成日期做比较运算,获取两个日期之间的所有日期(包括起始日期)
  • 【计算机方向】中科院一区TOP!国人发文占比50%,录用比例容易,晋升宝刊!
  • #C++ 智能指针 std::unique_ptr 、std::shared_ptr 和 std::weak_ptr
  • 双目标定测距C++代码记录
  • 影刀RPA实战:自动化同步商品库存至各大电商平台(二)
  • 【基础篇】深度学习面试题指南【1】面试必备!
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • Docker 1.12实践:Docker Service、Stack与分布式应用捆绑包
  • es的写入过程
  • HTML-表单
  • HTTP--网络协议分层,http历史(二)
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • Js基础知识(四) - js运行原理与机制
  • laravel 用artisan创建自己的模板
  • Node 版本管理
  • v-if和v-for连用出现的问题
  • windows-nginx-https-本地配置
  • 罗辑思维在全链路压测方面的实践和工作笔记
  • 排序算法之--选择排序
  • 学习使用ExpressJS 4.0中的新Router
  • Java总结 - String - 这篇请使劲喷我
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • 蚂蚁金服CTO程立:真正的技术革命才刚刚开始
  • ​比特币大跌的 2 个原因
  • #include到底该写在哪
  • #laravel部署安装报错loadFactoriesFrom是undefined method #
  • $(this) 和 this 关键字在 jQuery 中有何不同?
  • (04)Hive的相关概念——order by 、sort by、distribute by 、cluster by
  • (145)光线追踪距离场柔和阴影
  • (C)一些题4
  • (LeetCode) T14. Longest Common Prefix
  • (Redis使用系列) Springboot 使用redis的List数据结构实现简单的排队功能场景 九
  • (笔试题)合法字符串
  • (二刷)代码随想录第16天|104.二叉树的最大深度 559.n叉树的最大深度● 111.二叉树的最小深度● 222.完全二叉树的节点个数
  • (附源码)springboot 基于HTML5的个人网页的网站设计与实现 毕业设计 031623
  • (一)Dubbo快速入门、介绍、使用
  • (转)h264中avc和flv数据的解析
  • (转)用.Net的File控件上传文件的解决方案
  • .md即markdown文件的基本常用编写语法
  • .Net Core 微服务之Consul(三)-KV存储分布式锁
  • .NET业务框架的构建
  • .xml 下拉列表_RecyclerView嵌套recyclerview实现二级下拉列表,包含自定义IOS对话框...
  • @GetMapping和@RequestMapping的区别
  • [100天算法】-实现 strStr()(day 52)
  • [2015][note]基于薄向列液晶层的可调谐THz fishnet超材料快速开关——
  • [⑧ADRV902x]: Digital Pre-Distortion (DPD)学习笔记