docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中(三)

简介: docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

9.2 修改记录信息测试

同时,我们可以再测试修改,我们将用户头像路径进行修改,看看 es 是否同步了新的数据:

UPDATE `user` SET icon='https:///langlang' WHERE id=1001

查看 es 信息,使用 apipost 发送请求:【GET】http://192.168.65.133:9200/es_demo_collect/_search

// 以下是该请求需要携带的json数据,表示查询es_demo_collect索引中的全部文档数据
{
    "query": {
        "match_all": {}
    }
}

10.实战开发-后端代码

以下只展示我认为比较与本文相关的比较重要的文件,完整源码的获取链接我会放在文章的最后。

10.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fox</groupId>
    <artifactId>elasticsearch-canal-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.7</version>
        </dependency>
    </dependencies>
</project>

10.2 application.yml配置

server:
  # 服务端口
  port: 9999
elasticsearch:
  # es访问ip
  hostname: 192.168.65.133
  # es访问port
  port: 9200
  blog:
    # 访问索引
    index: es_demo_collect
    # 搜索返回字段
    source_fields: userId,title,username,userIcon,introduce,createTime,updateTime

10.3 ElasticsearchConfig.java配置类

package com.fox.es.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 17:51
 */
@Configuration
public class ElasticsearchConfig {
    @Value("${elasticsearch.hostname}")
    private String hostname;
    @Value("${elasticsearch.port}")
    private Integer port;
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, port, "http")
        );
        return new RestHighLevelClient(builder);
    }
}

10.4 ⭐测试是否连接 es 成功

package com.fox.es.controller;
import com.fox.es.entity.Result;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 18:33
 */
@RestController
public class TestController {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    /**
     * 用于测试是否连接 es 成功
     *
     * @return 返回 es 的基本信息,等价于访问:http://127.0.0.1:9200
     * @throws IOException 异常信息
     */
    @GetMapping("/getEsInfo")
    public Result getEsInfo() throws IOException {
        MainResponse info = restHighLevelClient.info(RequestOptions.DEFAULT);
        return Result.ok(info);
    }
}

浏览器访问:http://localhost:9999/getEsInfo

10.5 ⭐搜索服务

10.5.1 controller层

package com.fox.es_canal.controller;
import com.fox.es_canal.constant.BlogConstants;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:16
 */
@RestController
@RequestMapping("/blog")
public class BlogController {
    @Resource
    private BlogService blogService;
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo   页码
     * @return 数据列表,按照相关性从高到低进行排序
     */
    @GetMapping("/list")
    public Result list(@RequestParam("keyWords") String keyWords,
                       @RequestParam("pageNo") Integer pageNo) {
        // BlogConstants是我写的一个常量类,里面定义了一个变量 SEARCH_PAGE_NUM = 15
        return blogService.list(keyWords, pageNo, BlogConstants.SEARCH_PAGE_NUM);
    }
}

10.5.2 service接口层

package com.fox.es_canal.service;
import com.fox.es_canal.entity.Result;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
public interface BlogService {
    /**
     * 通过关键词获取数据列表
     *
     * @param keyWords 关键词
     * @param pageNo 页码
     * @param pageSize 每页大小
     * @return 数据列表,按照相关性从高到低进行排序
     */
    Result list(String keyWords, int pageNo, int pageSize);
}

10.5.3 service实现层

package com.fox.es_canal.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fox.es_canal.dto.BlogSimpleInfoDTO;
import com.fox.es_canal.entity.Result;
import com.fox.es_canal.service.BlogService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author 狐狸半面添
 * @create 2023-03-22 20:18
 */
@Slf4j
@Service
public class BlogServiceImpl implements BlogService {
    @Resource
    private RestHighLevelClient restHighLevelClient;
    @Value("${elasticsearch.blog.index}")
    private String blogIndexStore;
    @Value("${elasticsearch.blog.source_fields}")
    private String blogFields;
    public Result list(String keyWords, int pageNo, int pageSize) {
        // 1.设置索引 - blog
        SearchRequest searchRequest = new SearchRequest(blogIndexStore);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 2.source源字段过虑
        String[] sourceFieldsArray = blogFields.split(",");
        searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
        // 3.关键字
        if (StringUtils.hasText(keyWords)) {
            // 哪些字段匹配关键字
            MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keyWords, "title", "tags", "username", "introduce", "content");
            // 设置匹配占比(表示最少匹配的子句个数,例如有五个可选子句,最少的匹配个数为5*70%=3.5.向下取整为3,这就表示五个子句最少要匹配其中三个才能查到)
            multiMatchQueryBuilder.minimumShouldMatch("70%");
            // 提升字段的Boost值
            multiMatchQueryBuilder.field("title", 15);
            multiMatchQueryBuilder.field("tags", 10);
            multiMatchQueryBuilder.field("introduce", 7);
            multiMatchQueryBuilder.field("content", 3);
            multiMatchQueryBuilder.field("username", 3);
            boolQueryBuilder.must(multiMatchQueryBuilder);
        }
        // 4.分页
        int start = (pageNo - 1) * pageSize;
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(pageSize);
        // 布尔查询
        searchSourceBuilder.query(boolQueryBuilder);
        // 6.高亮设置
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags("<font color='red'>");
        highlightBuilder.postTags("</font>");
        // 设置高亮字段
        ArrayList<HighlightBuilder.Field> fields = new ArrayList<>();
        fields.add(new HighlightBuilder.Field("title"));
        fields.add(new HighlightBuilder.Field("introduce"));
        fields.add(new HighlightBuilder.Field("username"));
        highlightBuilder.fields().addAll(fields);
        searchSourceBuilder.highlighter(highlightBuilder);
        // 请求搜索
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("博客搜索异常:{}", e.getMessage());
            return Result.error(e.getMessage());
        }
        // 结果集处理
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        // 记录总数
        long totalHitsCount = hits.getTotalHits().value;
        // 数据列表
        List<BlogSimpleInfoDTO> list = new ArrayList<>();
        for (SearchHit hit : searchHits) {
            JSONObject jsonObject = JSONObject.parseObject(hit.getSourceAsString());
            BlogSimpleInfoDTO blog = new BlogSimpleInfoDTO();
            blog.setId(Integer.parseInt(hit.getId()));
            blog.setUsername(jsonObject.getString("username"));
            blog.setTitle(jsonObject.getString("title"));
            blog.setUserId(Long.parseLong(jsonObject.getString("userId")));
            blog.setUserIcon(jsonObject.getString("userIcon"));
            blog.setIntroduce(jsonObject.getString("introduce"));
            blog.setCreateTime(LocalDateTime.parse(jsonObject.getString("createTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            blog.setUpdateTime(LocalDateTime.parse(jsonObject.getString("updateTime"), DateTimeFormatter.ISO_OFFSET_DATE_TIME));
            // 取出高亮字段内容
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if (highlightFields != null) {
                blog.setTitle(parseHighlightStr(blog.getTitle(), highlightFields.get("title")));
                blog.setIntroduce(parseHighlightStr(blog.getIntroduce(), highlightFields.get("introduce")));
                blog.setUsername(parseHighlightStr(blog.getUsername(), highlightFields.get("username")));
            }
            list.add(blog);
        }
        // 封装信息返回前端
        HashMap<String, Object> resultMap = new HashMap<>(4);
        // 页码
        resultMap.put("pageNo", pageNo);
        // 每页记录数量
        resultMap.put("pageSize", pageSize);
        // 总记录数
        resultMap.put("total", totalHitsCount);
        // 该页信息
        resultMap.put("items", list);
        return Result.ok(resultMap);
    }
    public String parseHighlightStr(String text, HighlightField field) {
        if (field != null) {
            Text[] fragments = field.getFragments();
            StringBuilder stringBuilder = new StringBuilder();
            for (Text str : fragments) {
                stringBuilder.append(str.string());
            }
            return stringBuilder.toString();
        } else {
            return text;
        }
    }
}

10.5.4 效果测试

这里我们使用 apipost7 或浏览器 进行测试:

11.源码获取

Java源码地址:Mr-Write/SpringbootDemo: 各种demo案例 (github.com)

对应的是 elasticsearch-canal-demo 包模块。

12.其它说明

当我们在Java中写出往MySQL数据库添加、删除、修改博客记录的操作接口时,会同时通过 Canal 同步到es中,因为 canal 同步的本质还是去读 MySQL的 binlog 日志。由于比较简单,在这里就不做演示了。

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
相关文章
|
2月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
472 5
|
2月前
|
存储 关系型数据库 MySQL
MySQL Docker 容器化部署全指南
MySQL是一款开源关系型数据库,广泛用于Web及企业应用。Docker容器化部署可解决环境不一致、依赖冲突问题,实现高效、隔离、轻量的MySQL服务运行,支持数据持久化与快速迁移,适用于开发、测试及生产环境。
507 4
|
3月前
|
SQL 运维 关系型数据库
深入探讨MySQL的二进制日志(binlog)选项
总结而言,对MySQL binlogs深度理解并妥善配置对数据库运维管理至关重要;它不仅关系到系统性能优化也是实现高可靠性架构设计必须考虑因素之一。通过精心规划与周密部署可以使得该机能充分发挥作用而避免潜在风险带来影响。
136 6
|
4月前
|
存储 SQL 关系型数据库
MySQL中binlog、redolog与undolog的不同之处解析
每个都扮演回答回溯与错误修正机构角色: BinLog像历史记载员详细记载每件大大小小事件; RedoLog则像紧急救援队伍遇见突發情況追踪最后活动轨迹尽力补救; UndoLog就类似时间机器可倒带历史让一切归位原始样貌同时兼具平行宇宙观察能让多人同时看见各自期望看见历程而互不干扰.
239 9
|
4月前
|
关系型数据库 MySQL 数据库
为什么 MySQL 不推荐用 Docker 部署?
本文探讨了MySQL是否适合容器化的问题,分析了Docker容器在数据安全、性能瓶颈、状态管理及资源隔离等方面的挑战,并指出目前主流分布式数据库如TDSQL和OceanBase仍倾向于部署在物理机或KVM上。
272 0
|
8月前
|
安全 Java Linux
Linux安装Elasticsearch详细教程
Linux安装Elasticsearch详细教程
1443 64
|
7月前
|
JSON 安全 数据可视化
Elasticsearch(es)在Windows系统上的安装与部署(含Kibana)
Kibana 是 Elastic Stack(原 ELK Stack)中的核心数据可视化工具,主要与 Elasticsearch 配合使用,提供强大的数据探索、分析和展示功能。elasticsearch安装在windows上一般是zip文件,解压到对应目录。文件,elasticsearch8.x以上版本是自动开启安全认证的。kibana安装在windows上一般是zip文件,解压到对应目录。elasticsearch的默认端口是9200,访问。默认用户是elastic,密码需要重置。
3591 0
|
存储 安全 数据管理
如何在 Rocky Linux 8 上安装和配置 Elasticsearch
本文详细介绍了在 Rocky Linux 8 上安装和配置 Elasticsearch 的步骤,包括添加仓库、安装 Elasticsearch、配置文件修改、设置内存和文件描述符、启动和验证 Elasticsearch,以及常见问题的解决方法。通过这些步骤,你可以快速搭建起这个强大的分布式搜索和分析引擎。
469 5
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
1237 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。

热门文章

最新文章