1.问题
一个好的通信框架是怎样的?同时如何使用netty?
一个好的通信框架,必然是支持双工通信的,同时它能够对半包黏包进行处理,方便高效的编解码、序列化,拥有心跳超时机制,同时支持大量连接并发,方便调用。而这个通信的过程,我始终是觉得它的起源是三次握手和四次挥手。它们影响着消息中间件和通信框架以及SOA框架的发展。
2.netty中的echo例子
netty最简单的是它的EchoServer和EchoClient,两者有同时有自己对应的处理器EchoServerHandler、EchoClientHandler。
在EchoServer中,通常需要创建两个线程boss线程组和worker线程组。而在于创建线程组NioEventLoop的过程中,会执行openSelector()方法。此时会开启多路复用器。因此这个时候联想到如果使用java的nio的时候,必然需要首先创建需要连接的对象,然后根据连接对象,打开多路复用选择器。然后执行读写方法。而Netty的思想就是这样的。首先创建好连接,然后根据连接执行,然后执行读事件。而这个过程的执行都会经过processSelectedKeys处理选中的keys,也即事件。当如果写满了,没法写的时候,此时就会注册写事件,当可以写的时候,然后会执行写操作。
读写是在I/O 事件由 ChannelInboundHandler 或ChannelOutboundHandler}处理并通过调用中定义的事件传播方法转发到ChannelHandlerContext,例如 ChannelHandlerContext#fireChannelRead(Object)和ChannelHandlerContext#write(Object),进行数据的处理和写操作。
入站InBound数据通常是通过事件输入操作从远程读取的,如:SocketChannel#read(Buffer)。
出站OutBound处理程序通常会生产或者转换出站数据,如写入数据,I/O线程经常执行实际的输出操作,如SocketChannel#write(ByteBuffer)
Netty中的相关事件ops:
OP_READ=1<<0; 读事件1OP_WRITE=1<<2; 写事件4OP_CONNECT=1<<3; 连接事件8OP_ACCEPT=1<<4; accpet事件16
3.业务中使用Netty
这里不对netty整合spring,同时设置编解码、相应的请求code和响应code,这里就不进行展开了。
在血透业务中,对接测温仪,首先需要采集人脸图片信息,方便人脸识别来测定人体温的时候,存储起来。
业务处理整合Netty
为了将体温仪嵌入到我们的业务中,使用了Netty来保证高性能的信息通信和传输。自然我们需要考虑的首先是netty如何加入到血透系统中。
考虑到设备方需要进行人脸识别的对比,因此首先需要采集人脸数据,然后对人脸数据进行保存。然后后期以此进行对比。比对识别成功,则可以进行体温的测量,否则需要先进行人脸采集,然后进行体温的测量。
首先启动netty的过程中需要考虑加入到服务中,然后在启动nettyServer,而在启动的过程中,需要考虑传输过程中的编解码问题,这里需要根据传入信息的大小,进行内存的分配,为了合理的使用,分配一个新的接收缓冲区,其容量可能足够大以读取所有入站数据,而又足够小不要浪费其空间。
//设置bossGroup组配置serverBootstrap=serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, newAdaptiveRecvByteBufAllocator(64, 10496, 1048576)); // 设置workerGroup组配置serverBootstrap=serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, newAdaptiveRecvByteBufAllocator(64, 10496, 1048576)); // 设置处理事件链 进行编解码的设置以及适配器,考虑后期的扩展serverBootstrap=serverBootstrap.childHandler(newNettyChannelInitializer<SocketChannel>());
而适配器中,是我们进行业务处理的关键,执行入站操作。为什么是入站?
在netty中,入站是消息进入接收缓冲区,而出站是消息从发送缓冲区中刷出。也即入站操作主要是指读取数据的操作;而出站操作主要是指写入数据的操作。由于我们需要从设备方中读取数据,因此是入站操作。
那么这个过程首先需要请求设备方,然后根据设备方的响应,从而执行具体业务逻辑的处理。因为业务是基于人脸来来识别的,因此第一个信息是基于人脸的,服务端如果识别人脸,则基于深度学习根据人脸识别特征匹配的人脸,则可以根据人脸信息的数据信息,来记录人的数据信息推送到客户端,而客户端可以根据服务端的信息,进行记录。在这个过程中,同时支持进行离线识别上传的情况。
4.具体业务处理代码
这里省略编解码、序列化,只截取逻辑上的一部分:
publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) throwsException { super.channelRead(ctx, msg); Messagemessage= (Message) msg; MessageDatamessageData=JSONObject.parseObject(message.getData(), MessageData.class); //创建响应,设置响应信息Responseresponse=newResponse(); response.setResult(CmdConstant.resultStatus); response.setDesc(CmdConstant.descStr); response.setMessageId(messageData.getMessageId()); log.debug("客户端消息:"+JSONObject.toJSONString(message)); autowired(); switch (message.getCmd()) { caseCmdConstant.loginReq: // 设备登录(login) 客户端请求服务端log.info("设备登录"); ChannelMap.channels.add(ctx.channel()); response.setMessage(CmdConstant.loginRspStr); //将信息推送到设备中PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.loginRsp, response); break; caseCmdConstant.registerReq: // 设备注册(register)log.info("设备注册"); RegisterRespregisterResp=newRegisterResp(); registerResp.setMachId("123456778888"); registerResp.setPasswd("123456"); response.setData(registerResp); response.setMessage(CmdConstant.registerRspStr); PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.registerRsp, response); break; caseCmdConstant.heartBeatReq: // 设备心跳log.debug("设备心跳"); HeartBeatRespheartBeatResp=newHeartBeatResp(); response.setData(heartBeatResp); response.setMessage(CmdConstant.heartbeatRspStr); PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response); break; caseCmdConstant.getUserReq: log.info("获取人员信息"); UserRequserReq=getUserReq(messageData); response.setData(userReq); response.setMessage(CmdConstant.getUserRspStr); PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.getUserRsp, response); break; // 服务端传过来的,进行解析,然后入库caseCmdConstant.snapshotFaceReq: log.info("上传识别记录"); SnapshotFacesnapshotFace=JSONObject.parseObject(messageData.getData(), SnapshotFace.class); if (snapshotFace!=null) { LongfkPatientId=PushCmdUtils.intToPatientId(Integer.valueOf(snapshotFace.getUserId()), sysTenantService.getDefaultId()); HdPatienthdPatient=hdPatientManager.selectByPrimaryKey(fkPatientId); BigDecimaltemperature=BigDecimal.valueOf(Double.parseDouble(snapshotFace.getTemperature())); if (Objects.nonNull(hdPatient)) { //更新患者透析体温数据,此时保存体温信息hdDrBaseService.saveTemperature(fkPatientId, temperature); } } PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response); break; ... //省略部分逻辑//离线上传和识别,通过系统 caseCmdConstant.offlineFaceReq: log.info("离线识别记录上传(带图片)"); PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response); break; caseCmdConstant.offlineNoFaceReq: log.info("离线识别记录上传(无图片)"); PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.snapshotFaceRsp, response); break; default: log.info("客户端消息:"+JSONObject.toJSONString(message)); response.setData(newHashMap<>()); PushCmdUtils.pushCmdToServer(ctx.channel(), CmdConstant.heartBeatRsp, response); log.info("无法识别命令"); break; } }
而服务端会根据客户端请求的reqCode来进行对应的处理,使用处理器进行处理。而这种处理方式在使用Netty的RocketMQ和sofa-bolt中是可以看到的。那么你是不是很好奇netty的应用,同时方便netty的使用。
下一篇来看sofa-bolt是如何封装netty来给我们带来方便使用的。