Netty(一) SpringBoot 整合长连接心跳机制(上)

简介: Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。

前言


Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。


最终能达到的效果:


  • 客户端每隔 N 秒检测是否需要发送心跳。


  • 服务端也每隔 N 秒检测是否需要发送心跳。


  • 服务端可以主动 push 消息到客户端。


  • 基于 SpringBoot 监控,可以查看实时连接以及各种应用信息。


效果如下:



IdleStateHandler


Netty 可以使用 IdleStateHandler 来实现连接管理,当连接空闲时间太长(没有发送、接收消息)时则会触发一个事件,我们便可在该事件中实现心跳机制。


客户端心跳


当客户端空闲了 N 秒没有给服务端发送消息时会自动发送一个心跳来维持连接。


核心代码代码如下:


public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
    private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE){
                LOGGER.info("已经 10 秒没有发送信息!");
                //向服务端发送消息
                CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class);
                ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
        //从服务端收到消息时被调用
        LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
    }
}    


实现非常简单,只需要在事件回调中发送一个消息即可。


由于整合了 SpringBoot ,所以发送的心跳信息是一个单例的 Bean。


@Configuration
public class HeartBeatConfig {
    @Value("${channel.id}")
    private long id ;
    @Bean(value = "heartBeat")
    public CustomProtocol heartBeat(){
        return new CustomProtocol(id,"ping") ;
    }
}


这里涉及到了自定义协议的内容,请继续查看下文。


当然少不了启动引导:


@Component
public class HeartbeatClient {
    private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class);
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.server.port}")
    private int nettyPort;
    @Value("${netty.server.host}")
    private String host;
    private SocketChannel channel;
    @PostConstruct
    public void start() throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new CustomerHandleInitializer())
        ;
        ChannelFuture future = bootstrap.connect(host, nettyPort).sync();
        if (future.isSuccess()) {
            LOGGER.info("启动 Netty 成功");
        }
        channel = (SocketChannel) future.channel();
    }
}
public class CustomerHandleInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                //10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
                .addLast(new IdleStateHandler(0, 10, 0))
                .addLast(new HeartbeatEncode())
                .addLast(new EchoClientHandle())
        ;
    }
}    


所以当应用启动每隔 10 秒会检测是否发送过消息,不然就会发送心跳信息。



服务端心跳


服务器端的心跳其实也是类似,也需要在 ChannelPipeline 中添加一个 IdleStateHandler 。


public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomProtocol> {
    private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class);
    private static final ByteBuf HEART_BEAT =  Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new CustomProtocol(123456L,"pong").toString(),CharsetUtil.UTF_8));
    /**
     * 取消绑定
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettySocketHolder.remove((NioSocketChannel) ctx.channel());
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
            if (idleStateEvent.state() == IdleState.READER_IDLE){
                LOGGER.info("已经5秒没有收到信息!");
                //向客户端发送消息
                ctx.writeAndFlush(HEART_BEAT).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception {
        LOGGER.info("收到customProtocol={}", customProtocol);
        //保存客户端与 Channel 之间的关系
        NettySocketHolder.put(customProtocol.getId(),(NioSocketChannel)ctx.channel()) ;
    }
}


相关文章
|
5月前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
1118 1
|
3月前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
197 14
|
5月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
116 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
5月前
|
架构师 Java 开发者
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
在40岁老架构师尼恩的读者交流群中,近期多位读者成功获得了知名互联网企业的面试机会,如得物、阿里、滴滴等。然而,面对“Spring Boot自动装配机制”等核心面试题,部分读者因准备不足而未能顺利通过。为此,尼恩团队将系统化梳理和总结这一主题,帮助大家全面提升技术水平,让面试官“爱到不能自已”。
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
|
7月前
|
Java Spring 监控
Spring Boot Actuator:守护你的应用心跳,让监控变得触手可及!
【8月更文挑战第31天】Spring Boot Actuator 是 Spring Boot 框架的核心模块之一,提供了生产就绪的特性,用于监控和管理 Spring Boot 应用程序。通过 Actuator,开发者可以轻松访问应用内部状态、执行健康检查、收集度量指标等。启用 Actuator 需在 `pom.xml` 中添加 `spring-boot-starter-actuator` 依赖,并通过配置文件调整端点暴露和安全性。Actuator 还支持与外部监控工具(如 Prometheus)集成,实现全面的应用性能监控。正确配置 Actuator 可显著提升应用的稳定性和安全性。
232 0
|
7月前
|
安全 Java UED
掌握SpringBoot单点登录精髓,单点登录是一种身份认证机制
【8月更文挑战第31天】单点登录(Single Sign-On,简称SSO)是一种身份认证机制,它允许用户只需在多个相互信任的应用系统中登录一次,即可访问所有系统,而无需重复输入用户名和密码。在微服务架构日益盛行的今天,SSO成为提升用户体验和系统安全性的重要手段。本文将详细介绍如何在SpringBoot中实现SSO,并附上示例代码。
143 0
|
7月前
|
消息中间件 Java Kafka
深入SpringBoot的心脏地带:掌握其核心机制的全方位指南
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型,具有高可靠性及稳定性;RocketMQ 则是由阿里巴巴开源的高性能分布式消息队列,支持事务消息等多种特性;而 Kafka 是 LinkedIn 开源的分布式流处理平台,以其高吞吐量和良好的可扩展性著称。文中还提供了使用这三种消息队列产品的示例代码。总之,这三款产品各有优势,适用于不同场景。
28 0
|
7月前
|
消息中间件 Java Kafka
SpringBoot大揭秘:如何轻松掌握其核心机制?
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型,具有高可靠性及稳定性;RocketMQ 则是由阿里巴巴开源的高性能分布式消息队列,支持事务消息等多种特性;而 Kafka 是 LinkedIn 开源的分布式流处理平台,以其高吞吐量和良好的可扩展性著称。文中还提供了使用这三种消息队列产品的示例代码。
29 0
|
7月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
57 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13587 1