欢迎来到我的博客,代码的世界里,每一行都是一个故事
前言
在物联网时代,设备之间的通信变得愈发重要。本文将带你踏上一场关于如何用Spring Boot和Netty搭建TCP服务端的冒险之旅。无论是智能家居、工业自动化还是其他物联网应用,构建一个稳健的通信桥梁将成为连接未来的关键。
功能目标
- 实现springboot+netty整合TCP服务端(基础)
- 实现消息回复功能
- 实现消息太长导致的粘包问题(比如发送一个base64的图片信息)
- 实现在自定义Handler中注入spring的bean
- 保证完成任务,哈哈哈哈哈
项目实现
maven坐标
<!-- netty 这里你也可以引入全部--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.79.Final</version> </dependency>
构建自定义Handler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; /** * I/O数据读写处理类 * * @author xiaobo */ @Slf4j public class CarTcpNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { /** * 从客户端收到新的数据时,这个方法会在收到消息时被调用 * * @param ctx * @param msg */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { // 这里是在前面的DelimiterBasedFrameDecoder转为了ByteBuf,验证是否是ByteBuf if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; try { String receivedData = byteBuf.toString(CharsetUtil.UTF_8); // 接收完整数据 handleReceivedData(receivedData); } finally { // 释放 ByteBuf 占用的资源 byteBuf.release(); // 回复消息 ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8)); } } } private void handleReceivedData(String receivedData) { // 数据处理 // 这里如果想实现spring中bean的注入,可以用geBean的方式获取 } /** * 从客户端收到新的数据、读取完成时调用 * * @param ctx */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { log.info("channelReadComplete"); ctx.flush(); } /** * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException { cause.printStackTrace(); ctx.close();// 抛出异常,断开与客户端的连接 } /** * 客户端与服务端第一次建立连接时 执行 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelActive(ctx); ctx.channel().read(); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); // 此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接 System.out.println("channelActive:" + clientIp + ctx.name()); // 这里是向客户端发送回应 ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8)); } /** * 客户端与服务端 断连时 执行 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelInactive(ctx); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); // 断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机 ctx.close(); log.info("channelInactive:{}", clientIp); } /** * 服务端当read超时, 会调用这个方法 * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { super.userEventTriggered(ctx, evt); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); ctx.close();// 超时时断开连接 log.info("userEventTriggered:" + clientIp); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { log.info("channelWritabilityChanged"); } }
ChannelInitializer实现
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.DelimiterBasedFrameDecoder; /** * description: <h1>通道初始化</h1> * * @author bo * @version 1.0 * @date 2024/2/27 16:13 */ public class CarTcpNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delemiter = Unpooled.buffer(); delemiter.writeBytes("$".getBytes()); // 这里就是解决数据过长问题,而且数据是以$结尾的 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(907200, true, true, delemiter)); // 自定义ChannelInboundHandlerAdapter ch.pipeline().addLast(new CarTcpNettyChannelInboundHandlerAdapter()); } }
server实现
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; /** * description: <h1>netty创建的TCP</h1> * * @author bo * @version 1.0 * @date 2024/2/27 16:25 */ @Slf4j public class CarTcpNettyServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 // NioEventLoopGroup 是用来处理I/O操作的Reactor线程组 // bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写, // bossGroup接收到连接后就会把连接信息注册到workerGroup // workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费 最小 初始化 最大 (根据生产环境实际情况来定) // 使用对象池,重用缓冲区 .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)) // 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息 .childHandler(new CarTcpNettyChannelInitializer<SocketChannel>()); log.info("<===========netty server start success!==============>"); // 绑定端口,同步等待成功 ChannelFuture f = serverBootstrap.bind(port).sync(); // 等待服务器监听端口关闭 f.channel().closeFuture().sync(); } finally { // 退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }