Netty的分隔符和定长解码器应用

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010741376/article/details/46355755

TCP以流的方式进行数据传输,上层的应用协议为了对消息进行分区,往往采用下面4种方式:

    (1)消息长度固定,累计读取到长度总和为定长LEN的报文后,就以为读取到了一个完整的消息;将计数器置位,重新开始读取下一个数据报;

     (2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;

     (3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符;

      (4)通过在消息头中定义长度字段来标识消息的总长度。

我们先来看看通过DelimiterBasedFrameDecoder的使用,我们可以自动完成以分隔符作为码流结束标识的消息的解码;通过一个实例来演示,EchoServer接受到EchoClient的请求 消息后,将其打印出来,然后将原始消息返回给客户端,消息以"$_"作为分隔符。

    EchoServer服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EchoServer {
      public void bind(int port) throws InterruptedException{
    	  //配置服务端的NIO线程组
    	  EventLoopGroup bossGroup=new NioEventLoopGroup();
    	  EventLoopGroup workerGroup=new NioEventLoopGroup();
    	  
    	  try {
			ServerBootstrap b=new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 100)
			.handler(new LoggingHandler(LogLevel.INFO))
			.childHandler(new ChannelInitializer<SocketChannel>(){

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//创建分隔符缓冲对象,使用$_作为分隔符
				  ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
				  //创建DelimiterBasedFrameDecoder对象,加入到ChannelPipeline中,它有两个参数,第一个参数表示单条消息的
				  //最大长度,当达到该长度的后仍然没有查找到分隔符,就抛出TooLongFrameException异常,防止由于异常码流缺失分隔符导致的内存溢出
				  //这是Netty解码器的可靠性保护;第二个参数就是分隔符缓冲对象
					ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
					ch.pipeline().addLast(new StringDecoder());
					ch.pipeline().addLast(new EchoServerHandler());
				}
				 
			});
			//绑定端口,同步等待成功
			ChannelFuture f=b.bind(port).sync();
			//等待服务端监听端口关闭
			f.channel().closeFuture().sync();
			
			
		} finally{
			//优雅退出,释放线程资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
 
      }
      public static void main(String[] args) throws InterruptedException {
		int port=8088;
		if(args!=null&&args.length>0){
			 try {
				 port=Integer.valueOf(args[0]);
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
    	  new EchoServer().bind(port);
    	  
	}
      
      
}

EchoServerHandler服务端EchoServerHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoServerHandler extends ChannelHandlerAdapter{
      int counter=0;
      
      public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
    	  String body=(String)msg;
    	  System.out.println("this is "+ ++counter+" time receive client:["+body+"]");
    	  //由于我们设置DelimiterBasedFrameDecoder过滤掉了分隔符,所以,返回给客户端时需要在请求消息尾拼接分隔符“$_”
    	  //最后创建ByteBuf,将原始消息重新返回给客户端
    	  body+= "$_";
    	  ByteBuf echo=Unpooled.copiedBuffer(body.getBytes());
    	  ctx.writeAndFlush(echo);
    	  
      }
      public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
    	  cause.printStackTrace();
    	  ctx.close();
      }
      
}
EchoClient客户端EchoClient:

 

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class EchoClient {
       public void connect(int port,String host)throws Exception{
    	   //配置客户端NIO线程组
    	   EventLoopGroup group=new NioEventLoopGroup();
    	   try {
			 Bootstrap b=new Bootstrap();
			 b.group(group).channel(NioSocketChannel.class)
			 .option(ChannelOption.TCP_NODELAY, true)
			 .handler(new ChannelInitializer<SocketChannel>(){

				@Override
				protected void initChannel(SocketChannel arg0) throws Exception {
				  ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
					arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
					arg0.pipeline().addLast(new StringDecoder());
					arg0.pipeline().addLast(new EchoClientHandler());
				}
				  
			 });
    		 //发起异步链接操作
			ChannelFuture f=b.connect(host,port).sync();
			 //等待客户端链路关闭
			f.channel().closeFuture().sync();
			 
		} finally{
			//优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
 
       }
       public static void main(String[] args) throws Exception {
		  int port=8088;
		  if(args!=null&&args.length>0){
			  try {
				port=Integer.valueOf(args[0]);
			} catch (Exception e) {
				// TODO: handle exception
			}
		  }
    	   new EchoClient().connect(port, "127.0.0.1");   
    	   
	}
       
}

EchoClient客户端EchoClientHandler:

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoClientHandler extends ChannelHandlerAdapter{
      private int counter;
      static final String ECHO_REQ="hi,Mr yang Welcome to Netty.$_";
      
      public EchoClientHandler(){
    	  
      }
      public void channelActive(ChannelHandlerContext ctx){
    	  for(int i=0;i<10;i++){
    		  ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    	  }
    	  
      }
      public void channelRead(ChannelHandlerContext ctx,Object msg){
    	  System.out.println("this is "+ ++counter+ "time receive server:["+msg+"]");
    	  
      }
      public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
    	  ctx.flush();
      }
      public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
    	  cause.printStackTrace();
    	  ctx.close();
      }
      
      
      
}
服务端运行结果:

客户端运行结果:


还有一个FixedLengthFrameDecoder解码器,它是一个固定长度解码器.

只要在服务端的ChannelPipeline中新增FixedLengthFrameDecoder,设置一个长度,然后再添加字符串解码器和处理事件就可以了,用法都差不多。

相关文章
|
移动开发 编解码 Java
Netty编码器和解码器
Netty从底层Java通道读到ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder解码器去完成。在出站处理过程中,业务处理后的结果需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层 Java通道发送到对端。在编码过程中,需要用到Netty的Encoder编码器去完成数据的编码工作。
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
812 2
|
前端开发 Java 数据处理
使用Netty构建高性能的网络应用
使用Netty构建高性能的网络应用
|
前端开发 Java 数据处理
使用Netty构建高性能的网络应用
使用Netty构建高性能的网络应用
|
移动开发 网络协议 Java
通信密码学:探秘Netty中解码器的神奇力量
通信密码学:探秘Netty中解码器的神奇力量
374 0
|
Java Maven
Netty | 属于你的第一款Netty应用程序 🚀
Netty | 属于你的第一款Netty应用程序 🚀
201 0
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
281 0
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
360 0
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
412 0
|
存储 编解码 Java
Netty使用篇:自定义编解码器
Netty使用篇:自定义编解码器