教你用纯Java实现一个即时通讯系统(附源码)(上)

简介: 教你用纯Java实现一个即时通讯系统(附源码)(上)

项目背景


和各位读者大致介绍下具体场景,线上的小程序中开放一些语音麦克风的房间,让用户进入房间之后可以互相通过语音聊天的方式进行互动。


image.png


这里分享一下相关的技术设计方案。这款系统的核心点设计在于如何能让一个用户发出的语音通知到其他用户上边。语音数据在客户端同事的处理下最终变成了io数据流请求到了后端,后端只需要将这些数据流传达给各个不同的终端即可达到广播通知的效果。


单机版架构


最初期上线的时候,为了赶速度,快速试错,所以简单地采用了单机版架构去设计。结合技术栈为 SpringBoot,WebSocket,MySQL技术。


线上一间语音房间的同时在线人数并不会特别多,大概在15-50人的区间段内,系统核心代码是通过SpringBoot内部的WebSocket技术去进行数据的主动推送。


设计思路


整体的设计图比较简单,基本就是一台服务器存储WebSocket连接,如下图所示:


image.png


用户进行WebSocket初始化连接的时候需要一个连接分配和存储的过程:


image.png


早期的存储是存放在了服务器本地的一个Map集合中。


image.png


当WebSocket进行连接的时候就会往内存中写入一条数据信息,当链接断开的时候,就将内存中的数据移除。然后进行语音广播的时候需要结合WebSocket内部的广播发送功能进行通知


image.png


看似设计比较简单,但是在后期业务变得庞大的时候出现了瓶颈。因为随着参加语音活动用户的增加,越来越多的WebSocketSession对象需要被存储到内存当中,这种有状态性的存储对于单机扩容不灵活。


设计缺陷


1.假设原先的服务器扩容到了A,B两台机器,A用户在A机器上边建立了WebSocketSession,B用户在B机器上边建立的WebSocketSession连接。此时如果A想要和B进行对话发送,需要先查找到具体WebSocketSession存放在哪台机器上边。


2.当用户出现了网络异常,临时断开连接进行重连的时候,也可能会出现1所说的问题。


集群架构


设计思路


一旦出现需要发送语音通知的时候,发送一条广播的mq消息,每个机器都接收到消息之后,触发自己的广播操作即可。


RocketMq的接入系统设计里面mq采用的是广播模式,这和我们通常使用的集群模式有一定的区别。


消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:



  • 集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。


  • 集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。


  • 广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。


集群消费模式适用场景 适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。


image.png


注意事项


  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。


  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。


广播消费模式适用场景 适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。


image.png


注意事项


  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同订阅逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
  • 广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。


这里面的应用场景需要对集群内部对每个消费者都对服务器内存中的socket连接进行session是否存在对判断,因此需要采用mq的广播模式。


关于mq部分的接入代码


Consumer模块的配置:


package org.idea.web.socket.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * @Author linhao
 * @Date created in 10:30 上午 2021/5/10
 */
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MqConsumerConfig {
    private boolean isOn;
    private String groupName;
    private String nameSrvAddr;
    private String topics;
    private Integer consumeThreadMin;
    private Integer consumeThreadMax;
    private Integer consumeMessageBatchMaxSize;
    /**
     getter 和 setter部分省略
    **/
}


Producer模块的配置展示:


package org.idea.web.socket.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * @Author linhao
 * @Date created in 10:26 上午 2021/5/10
 */
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MqProducerConfig {
    private boolean isOn;
    private String groupName;
    private String nameSrvAddr;
    private Integer maxMessageSize;
    private Integer sendMsgTimeout;
    private Integer retryTimesWhenSendFailed;
    /**
     getter 和 setter部分省略
    **/
}


RocketMq内部的消费端Bean配置


package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.idea.web.socket.config.MqConsumerConfig;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 10:34 上午 2021/5/10
 */
@Configuration
@Slf4j
@EnableConfigurationProperties({MqConsumerConfig.class})
public class MqConsumerAutoConfig {
    @Resource
    private MqConsumerConfig mqConsumerConfig;
    @Resource
    //这个接口需要手动实现顺序消费的逻辑 每次获取到消息队列的第一条数据
    private MessageListenerHandler messageListenerConcurrently;
    @Bean
    @ConditionalOnMissingBean
    public DefaultMQPushConsumer defaultMQPushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());
        consumer.setConsumerGroup(mqConsumerConfig.getGroupName());
        consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());
        consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());
        consumer.registerMessageListener(messageListenerConcurrently);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //消费模型是什么?
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //默认一次拉取一条消费
        consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());
        //*表示订阅所有的tag
        try {
            consumer.subscribe(mqConsumerConfig.getTopics(), "*");
            consumer.start();
            log.info("【 MqConsumerAutoConfig 】mq consumer is started!");
        } catch (Exception e) {
            log.error("mq start fail,e is ", e);
        }
        return consumer;
    }
}



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
32 7
|
18天前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
28 4
|
24天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
64 2
|
21天前
|
监控 Java API
如何使用Java语言快速开发一套智慧工地系统
使用Java开发智慧工地系统,采用Spring Cloud微服务架构和前后端分离设计,结合MySQL、MongoDB数据库及RESTful API,集成人脸识别、视频监控、设备与环境监测等功能模块,运用Spark/Flink处理大数据,ECharts/AntV G2实现数据可视化,确保系统安全与性能,采用敏捷开发模式,提供详尽文档与用户培训,支持云部署与容器化管理,快速构建高效、灵活的智慧工地解决方案。
|
12天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
42 12
|
6天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
12天前
|
设计模式 消息中间件 搜索推荐
Java 设计模式——观察者模式:从优衣库不使用新疆棉事件看系统的动态响应
【11月更文挑战第17天】观察者模式是一种行为设计模式,定义了一对多的依赖关系,使多个观察者对象能直接监听并响应某一主题对象的状态变化。本文介绍了观察者模式的基本概念、商业系统中的应用实例,如优衣库事件中各相关方的动态响应,以及模式的优势和实际系统设计中的应用建议,包括事件驱动架构和消息队列的使用。
|
28天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
36 3
|
8天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
1月前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
79 3