Springboot整合Elasticsearch、搭建Logstash同步数据( 二 )
.postTags(postTg))
.withSort(sortBuilder)
.withPageable(pageable)
.build;
AggregatedPagemerchants=elasticsearchTemplate.queryForPage(query,Merchant.class,newSearchResultMapper{@OverridepublicAggregatedPagemapResults(SearchResponseresponse,ClassaClass,Pageablepageable){
SearchHitshits=response.getHits;
ListstuList=newArrayList<>;for(SearchHithit:hits){
HighlightFieldhighlightField=hit.getHighlightFields.get("mob");
Stringmob=highlightField.getFragments[0].toString;
ObjectmerchantId=hit.getSourceAsMap.get("merchantId");
Stringname=(String)hit.getSourceAsMap.get("name");
Stringdescr=(String)hit.getSourceAsMap.get("descr");
Stringaddress=(String)hit.getSourceAsMap.get("address");
MerchantmerchantHL=newMerchant;
merchantHL.setDescr(descr);
merchantHL.setMerchantId(Long.valueOf(merchantId.toString));
merchantHL.setName(name);
merchantHL.setMob(mob);
merchantHL.setAddress(address);
【Springboot整合Elasticsearch、搭建Logstash同步数据】stuList.add(merchantHL);
}if(!stuList.isEmpty){returnnewAggregatedPageImpl<>((List)stuList);
}returnnull;
}
});
log.info("分页总数:{}",merchants.getTotalPages);
Listcontent=merchants.getContent;for(Merchantmerchant:content){
log.info("分页文档数据:{}",merchant);
}
}
删除索引
@TestpublicvoiddeleteDelMerchantIndex{
elasticsearchTemplate.deleteIndex(Merchant.class);
}
小结
不建议使用ElasticsearchTemplate对索引进行管理(创建索引、更新映射、删除索引) , 因为就像使用Mysql , 不会通过java代码去改变表结构 , ES也是如此 , 更多是用来进行CRUD操作 。
Logstash
概念
作为数据采集的工具;
以id或update_time作为同步边界;
以id同步:初次同步的时候会将所有数据同步过来 , 之后logstash的定时任务回去检查 , 比如上次同步到id为2000的数据 , 那么这次就同步2000之后的数据 , 使用id作为同步有很大的弊端 , 就只只能新增数据 , 无法更新 。
以update_time同步:初次同步就将所有的数据同步过来 , 之后如果有新增或更新的操作 , 那么就把以上一次更新时间为界 , 之后的数据全部做一次新增或更新 。
使用logstash-input-jdbd插件同步;
使用logstash需要跟ES的版本保持一致 , 比如两者都要是6.4.3版本;
同步数据时 , 需要事先创建索引 。
安装配置
上传并解压 , 再上传mysql的驱动jar包 , 以及jdk(logstash需要jdk)
文章图片
cd进入logstash目录之后 , 创建文件夹
mkdirsync
进入sync目录 , 创建配置文件
#先创建同步配置文件vimlogstash-db-sync.conf#拷贝数据库驱动到该路径下cp/usr/local/mysql-connector-java-8.0.13.jar.
修改logstash-db-sync.conf
input{
jdbc{#设置Mysql/MariaDB数据库url以及数据库名称jdbc_connection_string=>"jdbc:mysql://192.168.1.6:3306/xxg?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"#数据库用户名密码jdbc_user=>"root"jdbc_password=>"123456"#数据库驱动所在位置 , 可以是绝对路径或相对路径#jdbc_driver_library=>"/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.49.jar"jdbc_driver_library=>"/usr/local/logstash-6.4.3/sync/mysql-connector-java-8.0.13.jar"#驱动类名#jdbc_driver_class=>"com.mysql.jdbc.Driver"jdbc_driver_class=>"com.mysql.cj.jdbc.Driver"#开启分页jdbc_paging_enabled=>"true"#分页每页数量 , 可以自定义jdbc_page_size=>"10000"#执行的sql文件路径statement_filepath=>"/usr/local/logstash-6.4.3/sync/product.sql"#设置定时任务间隔schedule=>"*****"#索引类型type=>"_doc"#是否开启记录上次追踪的结构 , 也就是上次更新的实际 , 这个会记录到last_run_metadata_path的文件use_column_value=https://pcff.toutiao.jxnews.com.cn/p/20210904/>true#记录上一次追踪的结果值last_run_metadata_path=>"/usr/local/logstash-6.4.3/sync/track_time"#如果use_column_value为true , 配置本参数 , 追踪的colume名 , 可以是自增id或者时间tracking_column=>"update_time"#tracking_column对应字段的类型tracking_column_type=>"timestamp"#是否清除last_run_metadata_path的记录 , true则每次都从头开始查询所有clean_run=>false#数据库字段名称大写转小写lowercase_column_names=>false
- 算法|侃侃而谈| 为什么视频网站必然走向兼并整合?
- meta|Facebook内部通告:将重新整合即时通讯功能
- 驾驶舱|江苏扬州“人才科创驾驶舱” 整合资源推动三链深度融合
- 整合“长城、比亚迪”!吉利全新RADAR品牌将发
- 追漫神器来了!整合全网资源,安卓苹果两端都能免费用
- 36氪|36氪首发 |「乐胶网」完成Pre-A轮融资,整合包装耗材行业上下游资源,打造包装行业产业集群
- 苹果|新一代Apple CarPlay发布,深度整合数位仪表板!15家车企支持搭载
- 图灵奖|为什么数据整合很难?图灵奖得主迈克尔·斯通布雷克这样“解答”
- 华为|罗永浩:除了华为真的有技术外,其他都是方案整合商,各位不要装!
- 对于微软来说|微软更新android应用与windows11整合
