spark2.1.0之源码分析——RPC服务端引导程序TransportServerBootstrap

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81867045 提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81867045

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》
  6. spark2.1.0之源码分析——RPC传输管道处理器详解
  7. spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解

通过《spark2.1.0之源码分析——RPC服务器TransportServer》一文的介绍,我们知道TransportServer的构造器中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。TransportServerBootstrap的定义见代码清单1。

代码清单1         TransportServerBootstrap的定义 

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

TransportServerBootstrap的doBootstrap方法将对服务端的RpcHandler进行代理,接收客户端的请求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap两个实现类。为了更清楚的说明TransportServerBootstrap的意义,我们以SaslServerBootstrap为例,来讲解其实现(见代码清单2)。

代码清单2         SaslServerBootstrap的doBootstrap实现

  public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
    return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
  }

根据代码清单2,我们知道SaslServerBootstrap的doBootstrap方法实际创建了SaslRpcHandler,SaslRpcHandler负责对管道进行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler本身也继承了RpcHandler,所以我们重点来看其receive方法的实现,见代码清单3。

代码清单3        SaslRpcHandler的receive方法

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    if (isComplete) {
      // 将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回
      delegate.receive(client, message, callback);
      return;
    }

    ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
    SaslMessage saslMessage;
    try {
      saslMessage = SaslMessage.decode(nettyBuf);// 对客户端发送的消息进行SASL解密
    } finally {
      nettyBuf.release();
    }

    if (saslServer == null) {
      // 如果saslServer还未创建,则需要创建SparkSaslServer
      client.setClientId(saslMessage.appId);
      saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
        conf.saslServerAlwaysEncrypt());
    }

    byte[] response;
    try {
      response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer处理已解密的消息
        saslMessage.body().nioByteBuffer()));
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
    callback.onSuccess(ByteBuffer.wrap(response));

    if (saslServer.isComplete()) {
      logger.debug("SASL authentication successful for channel {}", client);
      isComplete = true;// SASL认证交换已经完成
      if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
        logger.debug("Enabling encryption for channel {}", client);
        // 对管道进行SASL加密
        SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
        saslServer = null;
      } else {
        saslServer.dispose();
        saslServer = null;
      }
    }
  }

根据代码清单3,SaslRpcHandler处理客户端消息的步骤如下:

  1. 如果SASL认证交换已经完成(isComplete等于true),则将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回。
  2. 如果SASL认证交换未完成(isComplete等于false),则对客户端发送的消息进行SASL解密。
  3. 如果saslServer还未创建,则需要创建SparkSaslServer。当SaslRpcHandler接收到客户端的第一条消息时会做此操作。
  4. 使用saslServer处理已解密的消息,并将处理结果通过RpcResponseCallback的回调方法返回给客户端。
  5. 如果SASL认证交换已经完成,则将isComplete置为true。
  6. 对管道进行SASL加密。

SaslServerBootstrap是通过SaslRpcHandler对下游RpcHandler进行代理的一种TransportServerBootstrap。EncryptionCheckerBootstrap是另一种TransportServerBootstrap的实现,它通过将自身加入Netty的管道中实现引导,EncryptionCheckerBootstrap的doBootstrap方法的实现见代码清单4。

代码清单4         EncryptionCheckerBootstrap的doBootstrap实现

    @Override
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
      channel.pipeline().addFirst("encryptionChecker", this);
      return rpcHandler;
    }

在详细介绍了TransportChannelHandler之后我们就可以对《spark2.1.0之源码分析——RPC管道初始化》文中的图1进行扩展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的处理流程增加进来,如下图所示。

RPC框架服务端处理请求、响应流程图

                                                                          RPC框架服务端处理请求、响应流程图

有读者可能会问,上图中并未见TransportServerBootstrap的身影。根据对TransportServerBootstrap的两种实现的举例,我们知道TransportServerBootstrap将可能存在于图中任何两个组件的箭头连线中间,起到引导、包装、代理的作用。

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关文章
|
4月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
11月前
|
Go API 数据库
Go 微服务框架 go-micro 使用客户端 RPC 调用服务端方法返回 408 怎么解决?
Go 微服务框架 go-micro 使用客户端 RPC 调用服务端方法返回 408 怎么解决?
67 0
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
207 0
|
JSON 网络协议 安全
gRPC(八)生态 grpc-gateway 应用:同一个服务端支持Rpc和Restful Api
版权声明:本文为博主原创文章,未经博主允许不得转载。https://blog.csdn.net/weixin_46618592/article/details/127776709?spm=1001.2014.3001.5501
476 0
gRPC(八)生态 grpc-gateway 应用:同一个服务端支持Rpc和Restful Api
|
网络协议 Linux 网络安全
AnolisOS8.6做NFS服务端,挂载失败 mount: RPC: Unable to receive; errno = Connection refused
anolis8.6安装nfs服务端,在显示共享目录时,始终报错
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
436 0
RocketMQ源码分析-Rpc通信模块(remoting)二
|
消息中间件 编解码 网络协议
RocketMQ源码分析-Rpc通信模块(remoting)一
上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
516 0
RocketMQ源码分析-Rpc通信模块(remoting)一
|
编解码 JSON 网络协议
透视RPC协议:SOFA-BOLT协议源码分析
最近在看Netty相关的资料,刚好SOFA-BOLT是一个比较成熟的Netty自定义协议栈实现,于是决定研读SOFA-BOLT的源码,详细分析其协议的组成,简单分析其客户端和服务端的源码实现。当前阅读的源码是2021-08左右的SOFA-BOLT仓库的master分支源码。
321 0
透视RPC协议:SOFA-BOLT协议源码分析
RPC框架(7 - 实现服务端自动注册服务)
RPC框架(7 - 实现服务端自动注册服务)
|
存储 Java 编译器
RPC框架(1 - 实现服务端注册一个服务)
RPC框架(1 - 实现服务端注册一个服务)