如何解决 Netty Channel.isWritable 返回 false

简介: 在 Netty 里,有4个方法用来查询 Channel 的状态:isOpen,isRegistered,isActive,isWritable,其中,isWritable 在并发量很高时会返回很多 false。 isWritable 是什么含义? isWritable:Returns true if and only if the I/O thread will perform the req

在 Netty 里,有4个方法用来查询 Channel 的状态:isOpen,isRegistered,isActive,isWritable,其中,isWritable 在并发量很高时会返回很多 false。

isWritable 是什么含义?

isWritable:Returns true if and only if the I/O thread will perform the requested write operation immediately. Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests.

为什么 isWritable 会返回 false?而且这个问题在 stackoverflow,netty 社区也被很多人问到。

i.e.

if (channel.isWritable()) {
    channel.writeAndFlush(data);
}

 

首先,说下 netty 处理 writeAndFlush 的原理:

1、业务线程调用 writeAndFlush 发送消息,会生成 WriteAndFlushTask,交由 IO 线程处理,write 操作将消息写入 ChannelOutboundBuffer(不会写到 socket),flush 操作将 ChannelOutboundBuffer 写 入socket 的发送缓冲区;(这里注意,writeAndFlush 它只是一个语法糖,意味着这不是原子操作,因此在此方法执行的中间,可能在多个线程之间进行了上下文切换。)

2、ChannelOutboundBuffer 它配置一个高水位线和低水位线,当 buffer 的大小超过高水位线的时候对应 channel 的 isWritable 就会变成 false,当 buffer 的大小低于低水位线的时候,isWritable 就会变成 true。

其中,高水位线和低水位线是字节数,默认高水位是64K,低水位是32K,通过以下方式进行设置:

.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024)

 

如何解决这个问题?

ChannelOutboundBuffer 的容量过高或过低时都会触发 fireChannelWritabilityChanged 方法,因此可通过重写 channelWritabilityChanged 方法调整消息产生速度。

在常用的中间件里,我们看看它们是如何处理的:

1、Notify 在给订阅组投递消息时,先检查此订阅组的 Channel 是否超过最高位,如果是,则此次不投递,如果不是,继续投递。

2、Flink 核心发送方法中如果 Channel 不可写,则会跳过发送,当 Channel 再次可写后,Netty 会调用该 Handle 的 ChannelWritabilityChanged 方法,从而重新触发发送函数。

3、Duboo 发送请求时,判断是否已经关闭的 Channel,如果是,不再放入连接池,重新申请连接。

总之,在使用 Channel 写数据之前,建议使用 isWritable 方法来判断一下当前 ChannelOutboundBuffer 里的写缓存水位,防止 OOM 发生。

目录
相关文章
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
前端开发 算法 Java
Netty组件Future、Promise、Handler、Pipline、ByteBuf
Netty组件Future、Promise、Handler、Pipline、ByteBuf
141 0
|
前端开发 Java 数据处理
【Netty】Netty 异步任务模型 及 Future-Listener 机制
【Netty】Netty 异步任务模型 及 Future-Listener 机制
759 0
【Netty】Netty 异步任务模型 及 Future-Listener 机制
|
缓存 移动开发 网络协议
第 8 章 Netty 编解码器和 Handler 调用机制
第 8 章 Netty 编解码器和 Handler 调用机制
163 0
|
网络协议 前端开发 UED
Netty之服务端channel的初始化
Netty之服务端channel的初始化
106 0
|
网络协议 API 调度
Netty「源码分析」之 Idle 检测
Netty「源码分析」之 Idle 检测
204 0
|
网络协议 Java 程序员
Netty源码分析(一) backlog 参数
Netty源码分析(一) backlog 参数
284 0
|
设计模式 缓存 网络协议
Netty4 Channel 概述(通道篇)
Netty4 Channel 概述(通道篇)
Netty4 Channel 概述(通道篇)
|
网络协议 Java API
Java NIO 概述(Channel、Buffer、Selector)
Java NIO (New IO 或 Non Bloking IO) 是从 Java 1.4 版本开始引入一个新的 IO API, 可以替代标准的 Java IO API。NIO 全面支持面向缓冲区的、基于通道的 IO 操作。NIO 将以高效的方式进行文件的读写操作。
185 0
Java NIO 概述(Channel、Buffer、Selector)