今天是实战篇,没有理论,只是记录一下使用Netty搭建一个本地服务,实现客户端数据发送到服务端数据Demo的教程,理论篇以及更深层次学习Netty可以点个关注,保证后续更新不错过!!!
搭建Netty服务端
服务端
使用main方法的形式构建,启动在本地8888
端口
package com.zuiyu.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author @醉鱼 * @link https://github.com/TianPuJun * @ClassName NettyService * @Description * @Date 22:05 2022/6/28 **/ public class NettyService { public static void main(String[] args) throws InterruptedException { // 用于接收客户端链接的线程工作组 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); // 用于接收客户端链接读写操作的线程组 EventLoopGroup workGroup = new NioEventLoopGroup(); // 辅助类,帮助创建netty服务 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap // 绑定两个工作组 .group(eventLoopGroup,workGroup) // 设置nio 模式 .channel(NioServerSocketChannel.class) // option 针对服务器端配置,childOption 针对客户端链接通道配置 // 设置tcp 缓冲区 .option(ChannelOption.SO_BACKLOG,1024) // 设置发送数据的缓存大小 .childOption(ChannelOption.SO_SNDBUF,32*1024) // 设置读取数据的缓存大小 .childOption(ChannelOption.SO_RCVBUF,32*1024) // 设置保持长链接 .childOption(ChannelOption.SO_KEEPALIVE,true) // 初始化绑定服务通道 .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 为通道进行初始化,数据传输过来的适合会进行拦截和执行(可以有多个拦截器) socketChannel.pipeline().addLast(new ServerHandler()); } }); ChannelFuture sync = serverBootstrap.bind(8888).sync(); // 释放链接 sync.channel().closeFuture().sync(); workGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully(); } }
服务端监听
新建ServerHandler
监听数据
package com.zuiyu.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import java.util.logging.Logger; /** * @author @醉鱼 * @link https://github.com/TianPuJun * @ClassName ServerHandler * @Description 服务回调 * @Date 22:15 2022/6/28 **/ public class ServerHandler extends ChannelInboundHandlerAdapter { public static final Logger LOGGER = Logger.getLogger("ServerHandler"); public ServerHandler() { super(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("======服务端通道激活======"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } /** * 当通道有数据读取时的监听 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 传输buffer 对象 ByteBuf buf = (ByteBuf)msg; // 定义byte数组 byte[] req =new byte[buf.readableBytes()]; // 从缓冲区获取数据到req buf.readBytes(req); // 读取数据转为字符串 String body = new String(req,"utf-8"); LOGGER.info("服务端读取的数据:"+body); // 响应给客户端的数据 ctx.writeAndFlush(Unpooled.copiedBuffer("netty server response data: you are beautiful".getBytes())) // 添加addListener可以触发关闭通道监听事件,客户端短链接场景使用 // .addListener(ChannelFutureListener.CLOSE); ; }catch (Exception e){ e.printStackTrace(); }finally { ReferenceCountUtil.release(msg); } } /** * 当我们读取完成数据时触发的操作 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { LOGGER.info("====== 服务端数据读取完成 ======"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } /** * 当我们读取数据异常的时候触发的监听 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOGGER.info("====== 服务端数据读取异常 ======"); LOGGER.info(cause.getMessage()); ctx.close(); } }
搭建Netty客户端
客户端
同样适用main方法的形式搭建测试,发送数据到本地8888
端口
package com.zuiyu.netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author @醉鱼 * @link https://github.com/TianPuJun * @ClassName NettyClient * @Description * @Date 22:27 2022/6/28 **/ public class NettyClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); ChannelFuture channelFuture = b.connect("127.0.0.1", 8888).syncUninterruptibly(); channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("netty 客户端请求数据:zuiyu netty 服务发送数据测试".getBytes())); // 释放链接 channelFuture.channel().closeFuture().sync(); } }
客户端监听
新建客户端监听ClientHandler
,发送数据时打印数据
package com.zuiyu.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import java.util.logging.Logger; /** * @author @醉鱼 * @link https://github.com/TianPuJun * @ClassName ClientHandler * @Description * @Date 22:32 2022/6/28 **/ public class ClientHandler extends ChannelInboundHandlerAdapter { public static final Logger LOGGER = Logger.getLogger("ClientHandler"); public ClientHandler() { super(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("====== 客户端通道激活======"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 传输buffer 对象 ByteBuf buf = (ByteBuf)msg; // 定义byte数组 byte[] req =new byte[buf.readableBytes()]; // 从缓冲区获取数据到req buf.readBytes(req); // 读取数据转为字符串 String body = new String(req,"utf-8"); LOGGER.info("客户端读取的数据:"+body); }catch (Exception e){ e.printStackTrace(); }finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { LOGGER.info("====== 客户端读取数据完毕======"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOGGER.info("====== 客户端读取数据异常======"); ctx.close(); } }
验证打印输出
- 先启动
NettyServer#main
- 再启动
NettyClient#main
- 结果如下
Demo 教程到此结束,只为记录过程,代码详细位置NettyServer
中都有注释,觉得有帮助动动小手点个关注再走哦,点关注不迷路
Demo 地址 :
https://github.com/TianPuJun/spring-notes/tree/master/netty
本文由 mdnice 多平台发布