教你用纯Java实现一个即时通讯系统(附源码)(下)

简介: 教你用纯Java实现一个即时通讯系统(附源码)(下)

RocketMq的服务生产者Bean配置


package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 11:05 上午 2021/5/10
 */
@Configuration
@Slf4j
@EnableConfigurationProperties({MqProducerConfig.class})
public class MqProducerAutoConfig {
    @Resource
    private MqProducerConfig mqProducerConfig;
    @Bean
    @ConditionalOnMissingBean
    //意味着DefaultMQProducer的配置可以被覆盖
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());
        producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());
        //没有则自动创建topic的key
//        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());
        producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());
        producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());
        try {
            producer.start();
            log.info("【 MqProducerAutoConfig 】mq producer is started!");
        } catch (Exception e) {
            log.error("[MqProducerAutoConfig] start fail, e is ", e);
        }
        return producer;
    }
}


然后是对RocketMq内部发送消息事件的一层函数封装


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.idea.web.socket.config.MqProducerConfig;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
 * 消息广播发送端
 *
 * @Author linhao
 * @Date created in 10:43 下午 2021/5/9
 */
@Component
@Slf4j
public class BroadcastMqProducer {
    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private MqProducerConfig mqProducerConfig;
    private static String TOPIC = "ws-topic";
    private static String TAGS = "ws-tag";
    public static Integer ALL_USER_RECEIVE_TYPE = 1;
    public static Integer ONE_USER_RECEIVE_TYPE = 2;
    /**
     * 点对点之间的消息发送
     *
     * @param destSessionKey
     * @param msg
     * @return
     */
    public SendResult sendWebSocketToUser(String destSessionKey,String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketToUser] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            broadcastMqDTO.setSessionKey(destSessionKey);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
    /**
     * 广播消息发送
     *
     * @param msg
     * @return
     */
    public SendResult sendWebSocketBroadcastMsg(String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketBroadcastMsg] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
}


对消息的订阅模块实现代码如下:


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import com.oracle.tools.packager.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.List;
import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
/**
 * @Author linhao
 * @Date created in 10:59 上午 2021/5/10
 */
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {
    @Resource
    private SocketManager socketManager;
    @Resource
    private SimpMessagingTemplate template;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            Log.info("receive empty msg");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        byte[] bytes = messageExt.getBody();
        String json = new String(bytes);
        BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
        log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);
        if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:" + broadcastMqDTO);
            template.convertAndSend("/topic/sendTopic", broadcastMqDTO);
        } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            String sessionKey = broadcastMqDTO.getSessionKey();
            WebSocketSession webSocketSession = socketManager.get(sessionKey);
            if (webSocketSession != null) {
                template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
                log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:" + broadcastMqDTO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}


整体设计结构如下图:


image.png


于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。


业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑,如果当前机器存有用户token对应的session变量,那么就单独针对那个Session进行WebSocket的发送通知。


image.png


设计弊端一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入到分布式缓存的redis中保证数据可靠存储,但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。


遇到的问题点用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡则需要在nginx中配置一些额外信息。


项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载:


https://gitee.com/IdeaHome_admin/socket-framework


推荐好文


>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能


>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
32 7
|
18天前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
28 4
|
24天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
64 2
|
21天前
|
监控 Java API
如何使用Java语言快速开发一套智慧工地系统
使用Java开发智慧工地系统,采用Spring Cloud微服务架构和前后端分离设计,结合MySQL、MongoDB数据库及RESTful API,集成人脸识别、视频监控、设备与环境监测等功能模块,运用Spark/Flink处理大数据,ECharts/AntV G2实现数据可视化,确保系统安全与性能,采用敏捷开发模式,提供详尽文档与用户培训,支持云部署与容器化管理,快速构建高效、灵活的智慧工地解决方案。
|
12天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
42 12
|
6天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
12天前
|
设计模式 消息中间件 搜索推荐
Java 设计模式——观察者模式:从优衣库不使用新疆棉事件看系统的动态响应
【11月更文挑战第17天】观察者模式是一种行为设计模式,定义了一对多的依赖关系,使多个观察者对象能直接监听并响应某一主题对象的状态变化。本文介绍了观察者模式的基本概念、商业系统中的应用实例,如优衣库事件中各相关方的动态响应,以及模式的优势和实际系统设计中的应用建议,包括事件驱动架构和消息队列的使用。
|
28天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
36 3
|
8天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
1月前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
79 3