准备
1、NettyServer
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1、创建bossGroup线程组:处理网络连接事件。默认线程数:2*处理器线程数
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 2、创建workGroup线程组:处理网络read/write事件。 默认线程数:2*处理器线程数
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 3、创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 4、服务端启动助手,设置线程组
serverBootstrap.group(bossGroup,workerGroup)
// 5、设置服务端Channel实现类
.channel(NioServerSocketChannel.class)
// 6、设置bossGroup线程队列中等待连接个数
.option(ChannelOption.SO_BACKLOG,128)
// 7、设置workerGroup中线程活跃状态
.childOption(ChannelOption.SO_KEEPALIVE,true)
// 使用channelInitializer 可以配置多个handler
.childHandler(new ChannelInitializer<SocketChannel>() {// 8、设置一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 9、向pipeline中添加自定义的channelHandler, 处理socketChannel传送的数据
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 10、服务端启动并绑定端口
ChannelFuture future = serverBootstrap.bind(9999).sync();
// 给服务器启动绑定结果,对结果进行监听,触发回调
future.addListener((ChannelFuture channelFuture)-> {
if(channelFuture.isSuccess()){
System.out.println("服务器启动成功");
}else {
System.out.println("服务器启动失败");
}
});
// 11、关闭监听通道和连接池,将异步改同步
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
2、NettyServerHandler
/**
* 自定义的channelHandler处理器
*
* 事件触发,触发相应函数
*/
public class NettyServerHandler implements ChannelInboundHandler {
/**
* 通道读取事件
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuffer = (ByteBuf)msg;
System.out.println("客户端:"+byteBuffer.toString(CharsetUtil.UTF_8));
}
/**
* 通道数据读取完毕事件
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
TimeUnit.SECONDS.sleep(2);
ctx.writeAndFlush(Unpooled.copiedBuffer("叫我靓仔!!!".getBytes()));
}
/**
* 发生异常捕获事件
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 通道就绪事件
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
3、NettyClient
/**
* nettyClient
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1、创建线程组
NioEventLoopGroup group = new NioEventLoopGroup();
// 2、创建客户端启动助手bootstrap
Bootstrap bootstrap = new Bootstrap();
// 3、配置线程组
bootstrap.group(group)
// 4、定义socketChannel的实现类
.channel(NioSocketChannel.class)
// 5、定义channelHandler, 处理socketChannel的数据
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//6、向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandler());
}
});
// 7、启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect(new InetSocketAddress(9999)).sync();
// 8、关闭通道和关闭连接池
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
4、NettyClientHandler
/**
* 自定义的channelHandler处理器
* <p>
* 事件触发,触发相应函数
*/
public class NettyClientHandler implements ChannelInboundHandler {
/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端:" +
byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 通道数据读取完毕事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("不行,不行啊!!!".getBytes()));
}
/**
* 发生异常捕获事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好哇 小客客!!!".getBytes()));
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
NioEventLoopGroup创建流程
1、定义线程数
如果创建线程组的时候没有指定线程数,那么默认线程数将通过指定系统参数或者CPU逻辑处理核数*2来定义。`Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));`
2、获取执行器
ThreadPerTaskExecutor本质上就要线程工厂创建新的线程执行任务,这里包装了一层。
Thread 使用的是FastThreadLocalThread,优化ThreadLocal在哪?
3、创建n个线程的NioEventLoop
3.1、创建任务队列TaskQueue
3.2、获取SelectorProvider提供器
selectNow 以非阻塞的方式获取感兴趣的事件,感兴趣事件指:SocketChannel注册到Selector上要求监听的事件
3.3、绑定SelectStrategy
SelectStrategy 实现类为 DefaultSelectStrategy,执行逻辑,判断任务队列中是否有任务,最终返回一个int值,返回SELECT = -1 为阻塞当前线程
4、为每个NioEventLoop绑定一个中断监听器
总结
NioEventLoopGroup内部结构
执行图