什么是bio,nio,aio 以及优缺点
什么是netty
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的
它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架Avro就使用了Netty作为底层通信框架,其他还有业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。
为什么选择netty
通过对Netty的分析,我们将它的优点总结如下。
◎ API使用简单,开发门槛低;
◎ 功能强大,预置了多种编解码功能,支持多种主流协议;
◎ 定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;
◎ 性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;
◎ 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;
◎ 社区活跃,版本迭代周期短,发现的 BUG 可以被及时修复,同时,更多的新功能会加入;
◎ 经历了大规模的商业应用考验,质量得到验证。Netty 在互联网、大数据、网络游戏、企业应用、电信软件等众多行业已经得到了成功商用,证明它已经完全能够满足不同行业的商业应用了。
正是因为这些优点,Netty逐渐成为了Java NIO编程的首选框架。
为什么要基于netty封装net-framework框架
虽然上面说了netty很多优点,但它本身就是一面像所有场景的网络框架,提供了无限的可能,我们使用的时候还是需要前后加很多铺垫逻辑的
我们多数使用的时候一般都是客户端和服务端的交互场景,因此我们希望开发时候直接就可以关心自己的业务场景,直接编写
服务端和业务端的的业务代码,数据读取,写入等逻辑不想重复编写,解耦数据和对应处理逻辑,直接可以针对某个数据编写对应的业务逻辑
基于以上思考,我业余时间开发了net-framework 框架,当前数据第一版提交,还很基础,也会有很多情况考虑不到,敬请谅解,也可能会有一些问题,后续会不断扩充完善
net-framework框架介绍
整体介绍
net-framework是一款基于netty开发的网络通讯框架,封装了数据读写和业务定义的逻辑
将所有服务端和业务端传输抽象成了signle(信号),一共有两种形式信号,一种是msgSignal(消息信号),一种是actionSignal(动作信号),
服务端和客户端只需要继承扩展这两种消息类型即可
msg消息一般就是我们的读取,比如系统消息,谁上线了的消息,聊天的公开消息等消息
action消息一般就是我们的动作,比如登陆,设置,文件传输等消息
将所有服务端和业务端对msgSignal和actionSignal的处理封装了共通的handler去处理这些消息,服务端和客户端只需要继承handler,并实现针对msgSignal 和actionSignal 的处理即可
net-framework适用场景
1.聊天室
2.远程控制
3.分布式计算
4.文件传输
5.其他交互场景
net-framework框架处理流程
框架流程处理流程描述
1.使用方添加各种自定义signal 到框架共通
2.在服务端和客户端创建对应signal 的处理handler 并注册入框架
3.服务端或者客户端发送自定义signal,服务端或者客户端接受
4.服务端或者客户端从框架获取到对应的处理handler 并处理
net-framework 源码部分
整体目录
共通端signal相关类结构和相关关键源码说明
共通端signal类图
Signal.java
/** * @Author alan.wang * @description: signal 信号顶层父类 */ public class Signal implements Serializable { public static final Integer SIGNAL_MSG = 1; public static final Integer SIGNAL_ACTION = 2; public Signal(){} public Signal(int type,int subType){ this.type = type; this.subType = subType; } protected int type ; protected int subType ; public int getType() { return type; } public void setType(int type) { this.type = type; } public boolean isTypeOf(int type){ return this.type == type; } public int getSubType() { return subType; } public void setSubType(int subType) { this.subType = subType; } }
ActionSignal.java
/** * @Author alan.wang * @description: actionSignal 动作信号父类 */ public class ActionSignal extends Signal { public static final Integer ACTION_TYPE_SET_NAME = 1; private String msg; public ActionSignal(){}; public ActionSignal(Integer actionType ,String msg){ super(Signal.SIGNAL_ACTION,actionType); this.msg = msg; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }
MsgSignal.java
/** * @Author alan.wang * @description: MsgSignal 消息信号父类 */ public class MsgSignal extends Signal { public static final Integer MSG_TYPE_MSG_PUBLIC = 3; private String msg; public MsgSignal(){} public MsgSignal(Integer actionType ,String msg){ super(Signal.SIGNAL_MSG,actionType); this.msg = msg; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }
PublicNormalMsgSignal.java
/** * @Author alan.wang * @description: SetNameActionSignal 修改名称动作信号类 */ public class SetNameActionSignal extends ActionSignal { public SetNameActionSignal(){ super(); } public SetNameActionSignal(Integer actionType, String msg) { super(actionType, msg); } }
共通端handler相关类结构以及源码说明
AbstractSignalHandler.java
/** * @Author alan.wang * @description: 对ActionSignal 动作信息的处理抽象类 */ public abstract class AbstractSignalHandler { public abstract boolean isHandlerof(int type,int actionType); public abstract void handle(String signalStr, ChannelHandlerContext ctx,String ip); }
AbstractMsgSignalHandler.java
/** * @Author alan.wang * @description: 对MsgSignal 信息信号的处理抽象类,根据业务自定义扩展方法即可 */ public abstract class AbstractMsgSignalHandler extends AbstractSignalHandler{ }
AbstractActionSignalHandler.java
/** * @Author alan.wang * @description: 对ActionSignal 动作信息的处理抽象类,根据业务自定义扩展方法即可 */ public abstract class AbstractActionSignalHandler extends AbstractSignalHandler { }
MsgSignalPublicNormalHandler.java
/** * @Author alan.wang * @description: 对 AbstractActionSignalHandler 的扩展,用来处理PublicNormalMsgSignal 信号 */ public class MsgSignalPublicNormalHandler extends AbstractMsgSignalHandler { @Override public boolean isHandlerof(int type, int subType) { return type == Signal.SIGNAL_MSG && subType == MsgSignal.MSG_TYPE_MSG_PUBLIC; } @Override public void handle(String signalStr, ChannelHandlerContext ctx, String ip) { PublicNormalMsgSignal publicNormalMsgSignal = JSONUtil.toBean(signalStr, PublicNormalMsgSignal.class); System.out.println("接到公共消息:"+publicNormalMsgSignal.getMsg()); } }
ActionSignalSetNameHandler.java
/** * @Author alan.wang * @description: 对 AbstractActionSignalHandler 的扩展,用来处理SetNameActionSignal 信号 */ public class ActionSignalSetNameHandler extends AbstractActionSignalHandler { @Override public boolean isHandlerof(int type, int subType) { return type == Signal.SIGNAL_ACTION && subType == ActionSignal.ACTION_TYPE_SET_NAME; } @Override public void handle(String signalStr, ChannelHandlerContext ctx,String ip) { SetNameActionSignal setNameActionSignal = JSONUtil.toBean(signalStr, SetNameActionSignal.class); SocketChannelInfo socketChannelInfo = SocketChannelInfoQuene.getScFromQuene(ip); socketChannelInfo.setName(setNameActionSignal.getMsg()); //从queue 中获取所有客户端连接并循环发送 List<SocketChannelInfo> socketChannelInfos = SocketChannelInfoQuene.quene(); socketChannelInfos.forEach(scinfo->{ PublicNormalMsgSignal publicNormalMsgSignal = new PublicNormalMsgSignal(MsgSignal.MSG_TYPE_MSG_PUBLIC,"["+setNameActionSignal.getMsg()+"]"+"上线"); byte[] bytes = JSONUtil.toJsonPrettyStr(publicNormalMsgSignal).getBytes(); ByteBuf byteBuf = Unpooled.buffer(bytes.length); byteBuf.writeBytes(bytes); scinfo.getNioSocketChannel().writeAndFlush(byteBuf); }); } }
共通端dispatcher相关类结构以及相源码码说明
AbstractSignalHandlerDispatcher.java
/** * @Author alan.wang * @description: 对signal 信号类型派发处理handler 的抽象类 */ public abstract class AbstractSignalHandlerDispatcher { public abstract List<AbstractSignalHandler> getSignalHandlers(); public AbstractSignalHandler dispatch(Signal signal) { //判断是否是某个自定义handler 处理的方法,如果是则返回 return getSignalHandlers().stream().filter(handler->handler.isHandlerof(signal.getType(),signal.getSubType())).findFirst().get(); } }
MsgSignalPublicNormalHandler.java
/** * @Author alan.wang * @description: 对 AbstractActionSignalHandler 的扩展,用来处理PublicNormalMsgSignal 信号 */ public class MsgSignalPublicNormalHandler extends AbstractMsgSignalHandler { @Override public boolean isHandlerof(int type, int subType) { return type == Signal.SIGNAL_MSG && subType == MsgSignal.MSG_TYPE_MSG_PUBLIC; } @Override public void handle(String signalStr, ChannelHandlerContext ctx, String ip) { PublicNormalMsgSignal publicNormalMsgSignal = JSONUtil.toBean(signalStr, PublicNormalMsgSignal.class); System.out.println("接到公共消息:"+publicNormalMsgSignal.getMsg()); } }
ServerSignalHandlerDispatcher.java
/** * @Author alan.wang * @description: 对 AbstractSignalHandlerDispatcher 的扩展,只需要stream 里面不断添加自定义handler即可 */ public class ServerSignalHandlerDispatcher extends AbstractSignalHandlerDispatcher { //在这里添加自己的服务端业务处理handler即可 private static List<AbstractSignalHandler> signalHandlers = Stream.of(new ActionSignalSetNameHandler()).collect(Collectors.toList()); @Override public List<AbstractSignalHandler> getSignalHandlers() { return signalHandlers; } }
quene 服务端所有连接客户端channel 相关类结构以及源码说明
SocketChannelInfoQuene.java
/** * @Author alan.wang * @description: 保存了所有客户端连接channel 连接的quene,可以用来获取所有客户端连接,应加上lock 操作,后续会添加 */ public class SocketChannelInfoQuene { private static Map<String, SocketChannelInfo> scMap = new HashMap<>(); public static void putScInQuene(String ip,SocketChannelInfo socketChannelInfo){ scMap.put(ip,socketChannelInfo); } public static boolean scIsQuene(String ip){ return scMap.containsKey(ip); } public static void removeScFromQuene(String ip){ scMap.remove(ip); } public static SocketChannelInfo getScFromQuene(String ip){ return scMap.get(ip); } public static List<SocketChannelInfo> quene() { return new ArrayList<>(scMap.values()); } }
SocketChannelInfo.java
/** * @Author alan.wang * @description: 保存了所有客户端连接channel 的ip和名称信息,可以自定义扩展 */ public class SocketChannelInfo { private String name ; private String ip; private NioSocketChannel nioSocketChannel; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public NioSocketChannel getNioSocketChannel() { return nioSocketChannel; } public void setNioSocketChannel(NioSocketChannel nioSocketChannel) { this.nioSocketChannel = nioSocketChannel; } public static SocketChannelInfo build(String ip ,NioSocketChannel nioSocketChannel){ SocketChannelInfo socketChannelInfo = new SocketChannelInfo(); socketChannelInfo.setIp(ip); socketChannelInfo.setNioSocketChannel(nioSocketChannel); return socketChannelInfo; } }
服务端和客户端启动类拷贝并启动
Server.java
** * @Author alan.wang * @description: 服务端启动类,仅供参考,可以自己扩充 */ public class Server { public static void main(String[] args) throws InterruptedException { //设置主进程线程 EventLoopGroup mainGroup = new NioEventLoopGroup(); //设置工作线程 EventLoopGroup workGroup = new NioEventLoopGroup(); try { //获取启动参数并设置,目前只有port ,比如 --port 8080 等 ArgsUtil.RunArgs runArgs = ArgsUtil.init(args); ServerBootstrap serverBootstrap = new ServerBootstrap(); //设置channel 类型为NioServerSocketChannel 并设置操作参数 serverBootstrap.group(mainGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler()); ChannelFuture channelFuture; if (runArgs.hasPort()) { //设置启动端口,端口从启动参数获取 channelFuture = serverBootstrap.bind(runArgs.getPort()).sync(); } else { //设置启动端口,外部没有设定则写死9898 channelFuture = serverBootstrap.bind(9898).sync(); } //设置阻塞主进程 channelFuture.channel().closeFuture().sync(); }finally { //优雅退出并关闭主进程线程 mainGroup.shutdownGracefully(); // workGroup.shutdownGracefully(); } } }
client.java
/** * @Author alan.wang * @description: 客户端启动类,仅供参考,可以自己扩充 */ public class Client { public static void main(String[] args) throws InterruptedException { //注册主进程线程 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); //配置cannel 为NioSocketChannel 并配置参数 bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientChannelHandler()); } }); //连接到服务端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9898).sync(); //阻塞主进程 channelFuture.channel().closeFuture().sync(); }finally { eventLoopGroup.shutdownGracefully(); } } }
如何使用net-framework框架
步骤如下
1.扩展msgSignal和actionSignal
2.扩展dispatcher
3.扩展handler
4.common 部分打包或拷贝到自己项目中
5.服务端和客户端启动类拷贝并启动
1.扩展msgSignal或actionSignal
2.扩展dispatcher
可直接使用源码中的两个dispatcher:
ClientSignalHandlerDispatcher
ServerSignalHandlerDispatcher
3.扩展handler
参考MsgSignalPublicNormalHandler 和 ActionSignalSetNameHandler
需要加进各自的dispacther中
4.common 部分打包或拷贝到自己项目中
以上common模块可以直接拷贝或者打包到自己业务工程内
5.服务端和客户端启动类拷贝并启动
直接拷贝源码中的服务端或者客户端启动类到自己项目或者自定义启动类
使用 demo示例
场景 :服务端和客户端连接并发送上线消息demo示例
源码本身的客户端修改名称以及服务端发送某某上线通知
服务端接到改名signal 并发送通知signal 到所有客户端