OkHttp源码详解之二完结篇

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: OkHttp源码详解之二完结篇

1. 请大家思考几个问题



在开始本文之前,请大家思考如下几个问题。并请大家带着这几个问题,去本文寻找答案。如果你对下面几个问题的答案了如指掌那本文可以略过不看


在浏览器中输入一个网址,按回车后发生了什么?


Okhttp的TCP连接建立发生在什么时候?


Okhttp的Request请求发生在什么时候?


Okio与Okhttp是在什么时候发生关联的?


Okhttp的Interceptor和Chain是怎么串联起来的?


Okhttp同步请求和异步请求分别是怎么实现的。异步请求有限制吗?


本文将围绕以上6个问题对Okhttp做一个简单的讲解,如有遗漏的知识点,欢迎在评论区指出,一起探讨成长。


2. Http请求的流程



首先来回答第一个问题“在浏览器中输入一个网址,按回车后发生了什么?”。Http权威指南一书给出的答案是发生了7件事情


浏览器从url中解析出主机名


浏览器将服务器的主机名转换成服务器的IP地址


浏览器将端口号从url中解析出来


浏览器建立一条与Web服务器的TCP连接


浏览器向服务器发送一条Http请求报文


服务器向浏览器回送一条Http响应报文


关闭连接,浏览器显示文档


以上七步是每一个Http请求必须要做的事情,Okhttp库要实现Http请求也不例外。这七个步骤的每一步,在Okhttp中都有体现。


HttpUrl类负责步骤1和步骤3的主机名和端口解析


Dns接口的具体实现负责步骤2的实现


RealConnection类就是步骤4中的那条TCP连接


CallServerInterceptor拦截器的intercept方法负责步骤5和步骤6的发送报文和接收报文

ConnectPool连接池中提供了关闭TCP连接的方法


Okio在哪操作?当然是往Socket的OutputStream写请求报文和从Socket的InputStram读取响应报文了


至此文章开头提出的6个问题,前5个都已经有了一个简单的回答。至于第六个问题,异步在Okhttp中用的缓存线程池,理论上缓存线程池,是当有任务到来,就会从线程池中取一个空闲的线程或者新建线程来处理任务,而且缓存线程池的线程数是Integer.MAX_VALUE。理论上是没有限制的。但是Dispatcher类在线程池的基础上,做了强制限制,最多可以同时处理的网络请求数64个,对于同一个主机名,最多可以同时处理5个网络请求。接下来我带大家从源码的角度来寻找这几个问题的答案


3.初识Okhttp


首先我们来写两个简单的例子来使用Okhttp完成get和post请求。


同步get请求

OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
  Request request = new Request.Builder()
      .url(url)
      .build();
  Response response = client.newCall(request).execute();
  return response.body().string();
}

同步post请求

public static final MediaType JSON
    = MediaType.parse("application/json; charset=utf-8");
OkHttpClient client = new OkHttpClient();
String post(String url, String json) throws IOException {
  RequestBody body = RequestBody.create(JSON, json);
  Request request = new Request.Builder()
      .url(url)
      .post(body)
      .build();
  Response response = client.newCall(request).execute();
  return response.body().string();
}

异步get请求

OkHttpClient client = new OkHttpClient();
void run(String url) throws IOException {
    Request request = new Request.Builder()
            .url(url)
            .build();
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            System.out.println(response.body().string());
        }
    });
}

异步post请求

public static final MediaType JSON
            = MediaType.parse("application/json; charset=utf-8");
    OkHttpClient client = new OkHttpClient();
    void post(String url, String json) throws IOException {
        RequestBody body = RequestBody.create(JSON, json);
        Request request = new Request.Builder()
                .url(url)
                .post(body)
                .build();
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });
    }


Okhttp完成一次网络请求,过程如下


创建一个OkHttpClient对象client


创建一个Request对象request,并设置url


调用client.newCall(request)生成一个Call对象call


调用call.execute()[同步调用]或者call.enqueue(callback)[异步调用]完成网络请求


4. 同步和异步网络请求过程



同步调用过程如下

OkHttpClient.newCall(Request r)  => RealCall.execute() => RealCall.getResponseWithInterceptorChain()

异步调用过程如下

OkHttpClient.newCall(Request r) => RealCall.enqueue(Callback) => Dispatcher.enqueue(AsyncCall)
    => 线程池执行AsyncCall =>AsyncCall.execute() => RealCall.getResponseWithInterceptorChain()

从上图可以看出异步调用比同步调用步骤更长,也更复杂。在这里我把整个调用过程分成两个阶段,RealCall.getResponseWithInterceptorChain()称作“网络请求执行阶段”,其余部分称作“网络请求准备阶段”。由于“网络请求执行阶段”涉及到链式(Chain)调用以及各种拦截器的执行比“网络请求准备阶段”复杂太多。所以我们先来看“网络请求准备阶段”,这个阶段也需要分成同步和异步两种方式来讲解


首先我们来看下准备阶段的公共调用部分OkHttpClient.newCall(Request r)

 @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

Call是什么,在我看来Call可以理解成是对网络请求封装,它可以执行网络请求,也可以取消网络请求。我们看下Call的类定义

public interface Call extends Cloneable {
  Request request();
  Response execute() throws IOException;
  void enqueue(Callback responseCallback);
  void cancel();
  boolean isExecuted();
  boolean isCanceled();
  Call clone();
  interface Factory {
    Call newCall(Request request);
  }
}

request()返回的Request对象


execute()同步执行网络操作


enqueue(Callback responseCallback)异步执行网络操作


cancel()取消网络操作


RealCall是Call的实现类。我们重点来看下execute()和enqueue(Callback responseCallback)


execute()方法

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      //这里只是把call存到Dispatcher的列表中
      client.dispatcher().executed(this);
      //真正执行网络操作
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

enqueue(Callback callback)

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

我们注意到AsyncCall类,首先它是RealCall类的非静态内部类,它持有RealCall的对象,它可以做任何RealCall能做的事情。同时它继承自NamedRunnable。NamedRunnable是Ruannable的子类。它主要有两个功能,1.可以被线程池执行 2.修改当前执行线程的线程名(方便debug)。由于这几个类都比较简单,而且篇幅有限,这里就不贴代码了。请自行查阅


当然到这里异步执行的准备阶段并没有结束,它是怎么被子线程执行的呢。这里我们就需要在Dispatcher类中寻找答案了。Dispatcher是干嘛用的,它的功能就是负责分发用户创建的网络请求,以及控制网络请求的个数,以及上一个网络请求结束后,要不要执行等待中的网络请求。下面我们来看下Dispatcher都有哪些成员变量,以及这些成员变量的作用

public final class Dispatcher {
  //最多可以同时请求数量
  private int maxRequests = 64;
  //每个host最大同时请求数量
  private int maxRequestsPerHost = 5;
  //当没有任务执行的回调 比如说执行10个任务,10个任务都执行完了会回调该接口
  private Runnable idleCallback;
  /** Executes calls. Created lazily. */
  //线程池 用的是缓存线程池(提高吞吐量,并且能及时回收无用的线程)
  private ExecutorService executorService;
  /** Ready async calls in the order they'll be run. */
  //等待的异步任务队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //正在执行的异步任务队列 这个放的是Runnable
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //正在执行的同步任务队列 这个存储的是RealCall
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }
  public Dispatcher() {
  }
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
 }

总结下几个比较重要的概念


executorService 线程池并且是缓存线程池,最最大程度提高吞吐量,并且能回收空闲的线程


Deque readyAsyncCalls异步请求等待队列,当异步请求数超过64个,或者单个主机名请求超过5个,网络任务(AsyncCall)会放到该队列里。当任务执行完毕,会调用promoteCalls()把满足条件的网络任务(AsyncCall)放到线程池中执行


Deque runningAsyncCalls 正在执行的异步任务队列


Deque runningSyncCalls 正在执行的同步任务队列


紧接着我们来看下Dispatcher的enqueue(AsyncCall call)

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
     //如果请求没有超过限制 用线程池执行网络请求
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      //如果超过了限制,把请求放到异步等待队列中去,什么时候被唤醒?finished后
      readyAsyncCalls.add(call);
    }
  }

通过线程池执行AsyncCall 最终调用的是AsyncCall的run(),而run()中调用的是AsyncCall的execute()

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

在execute()中调用了getResponseWithInterceptorChain(),也就是我们前面说的“网络请求执行阶段”。至此,同步和异步网络请求的“网络请求准备阶段”就讲解完了,接下来我们讲解“网络请求执行阶段”


5. Interceptor和Chain


Response getResponseWithInterceptorChain() throws IOException {
    List<Interceptor> interceptors = new ArrayList<>();
    //用户自定义的拦截器
    interceptors.addAll(client.interceptors());
    //重试和重定向拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接拦截器
    interceptors.add(new ConnectInterceptor(client));
    //用户自定义的网络拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //真正请求服务器的拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //用Chain把List中的interceptors串起来执行
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

要理解上面的代码。我们需要搞清楚两个概念 Interceptor和Chain。我们先来看下它们的定义

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;
  interface Chain {
    Request request();
    Response proceed(Request request) throws IOException;
    Connection connection();
  }
}

可以举一个比较形象的例子来解释它们的关系。就以我们IT行业来讲。老板说想要一个APP,然后把需求转化成任务流转给产品经理,产品经理构思了下,然后把任务流转给设计师,设计师一通设计之后,把设计图流转给码农,码农接到任务后就开始加班加点的编码,最终写出了APP,然后把APP交给设计师,让设计师查看界面是否美观,设计师再把APP流转给产品经理,产品经理觉得很满意,最终把APP交付给老板。老板看了很满意,说大家伙晚上加个鸡腿。虽然实际生产中并不是这样的一个流程,想了很久觉得还是这样讲,更好理解。对应到这个例子中,产品经理、设计师、程序员他们都是真正干活的家伙,他们对应的是Interceptor。Chain是什么,是老板吗?不是!!Chain只是一套规则,对应的就是例子里的流转流程。

链和Interceptors.png

链式调用Interceptor.png

对照图片,“网络请求执行阶段”会依次执行Interceptors里的Interceptor。Interceptor执行分成三步。第一步:处理请求 第二步:执行下个Interceptor 第三步:处理响应

 List<Interceptor> interceptors = new ArrayList<>();
    //用户自定义的拦截器
    interceptors.addAll(client.interceptors());
    //重试和重定向拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接拦截器
    interceptors.add(new ConnectInterceptor(client));
    //用户自定义的网络拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //真正请求服务器的拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

首先执行的就是retryAndFollowUpInterceptor

RetryAndFollowUpInterceptor

//第一部分 根据url解析出主机名 解析端口
Request request = chain.request();
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);
....省略
while(true){
    ....
    //第二部分处理下一个Interceptor
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    releaseConnection = false;
    ....
    //第三部分 处理响应,如果是重定向,用while循环,重新处理下一个Interceptor
     if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }
      closeQuietly(response.body());
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
}

BridgeInterceptor和CacheInterceptor请读者自行分析


ConnectInterceptor中会解析主机DNS并且建立TCP连接,Socket的输入输出流通过Source和Sink处理


public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;
  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //这里会解析DNS并且建立TCP连接,Socket的输入输出流会和Okio的Source和Sink关联
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

CallServerInterceptor 通过httpCodec的sink向socket发送请求,并且通过httpCodec的openResponseBody把socket的输入流写入到Response中

@Override public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();
    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);
    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      // Write the request body, unless an "Expect: 100-continue" expectation failed.
      if (responseBuilder == null) {
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    }
    httpCodec.finishRequest();
    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))//真正把body写进去
          .build();
    }
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
    return response;
  }

6. 结束语



OkHttp库还是挺庞大的,涉及到很多Http的基础知识。这里只是讲解了OkHttp的一小部分。很多细节的东西也没有深入讲解。比如说Socket的建立,连接池的管理,HttpCodec如何解析输入输出流,路由的细节。希望有机会可以再深入的讲解一番






相关文章
|
5月前
|
Dubbo Java Nacos
【实战攻略】破解Dubbo+Nacos+Spring Boot 3 Native打包后运行异常的终极秘籍——从零开始彻底攻克那些让你头疼不已的技术难题!
【8月更文挑战第15天】Nacos作为微服务注册与配置中心受到欢迎,但使用Dubbo+Nacos+Spring Boot 3进行GraalVM native打包后常遇运行异常。本文剖析此问题及其解决策略:确认GraalVM版本兼容性;配置反射列表以支持必要类和方法;采用静态代理替代动态代理;检查并调整配置文件;禁用不支持的功能;利用日志和GraalVM诊断工具定位问题;根据诊断结果调整GraalVM配置。通过系统排查方法,能有效解决此类问题,确保服务稳定运行。
137 0
|
6月前
|
设计模式 Java 测试技术
《手把手教你》系列基础篇(九十四)-java+ selenium自动化测试-框架设计基础-POM设计模式实现-下篇(详解教程)
【7月更文挑战第12天】在本文中,作者宏哥介绍了如何在不使用PageFactory的情况下,用Java和Selenium实现Page Object Model (POM)。文章通过一个百度首页登录的实战例子来说明。首先,创建了一个名为`BaiduHomePage1`的页面对象类,其中包含了页面元素的定位和相关操作方法。接着,创建了测试类`TestWithPOM1`,在测试类中初始化WebDriver,设置驱动路径,最大化窗口,并调用页面对象类的方法进行登录操作。这样,测试脚本保持简洁,遵循了POM模式的高可读性和可维护性原则。
61 2
|
8月前
|
Dubbo 中间件 应用服务中间件
【想进大厂还不会阅读源码】ShenYu源码-支持motan协议
ShenYu源码阅读📚。原来的插件只支持 motan2 协议,并且是硬编码的,本次修改使MotanRpcExt 得到增强。我们可以通过以上的线索来思考我们本次的阅读线索,贡献者是做了什么实现了增强motan插件、这个motan的插件的功能是什么。
|
设计模式 缓存 Java
一步步带你读懂 Okhttp 源码
一步步带你读懂 Okhttp 源码
|
安全 Java Spring
Java开发 - Spring框架初体验(二)
Java开发 - Spring框架初体验
121 0
Java开发 - Spring框架初体验(二)
|
负载均衡 Java uml
再谈openfeign,聊聊它的源代码
再谈openfeign,聊聊它的源代码
149 0
再谈openfeign,聊聊它的源代码
带你吃透Servlet核心编程下篇(完整图文教程)(下)
文章目录 1 Http协议 1.1 什么是 HTTP 协议 1.2 GET请求与POST请求 1.3 响应的HTTP协议格式 1.4 MIME数据类型 2 HttpServletRequest类 2.1 HttpServletRequest说明及常用方法 2.2 HttpServletRequest类演示 2.3 获取请求表单中的参数值(POST请求) 2.4 解决post请求中的中文乱码问题 3 请求转发 4 HttpServletResponse类 4.1 两个输出流 4.2 如何回传客户端数据 5 请求重定向 5.1 什么是请求重定向 5.2 请求重定向演示
带你吃透Servlet核心编程下篇(完整图文教程)(下)
|
XML 应用服务中间件 数据格式
带你吃透Servlet核心编程下篇(完整图文教程)(中)
文章目录 1 Http协议 1.1 什么是 HTTP 协议 1.2 GET请求与POST请求 1.3 响应的HTTP协议格式 1.4 MIME数据类型 2 HttpServletRequest类 2.1 HttpServletRequest说明及常用方法 2.2 HttpServletRequest类演示 2.3 获取请求表单中的参数值(POST请求) 2.4 解决post请求中的中文乱码问题 3 请求转发 4 HttpServletResponse类 4.1 两个输出流 4.2 如何回传客户端数据 5 请求重定向 5.1 什么是请求重定向 5.2 请求重定向演示
带你吃透Servlet核心编程下篇(完整图文教程)(中)
|
XML 前端开发 JavaScript
带你吃透Servlet核心编程下篇(完整图文教程)(上)
文章目录 1 Http协议 1.1 什么是 HTTP 协议 1.2 GET请求与POST请求 1.3 响应的HTTP协议格式 1.4 MIME数据类型 2 HttpServletRequest类 2.1 HttpServletRequest说明及常用方法 2.2 HttpServletRequest类演示 2.3 获取请求表单中的参数值(POST请求) 2.4 解决post请求中的中文乱码问题 3 请求转发 4 HttpServletResponse类 4.1 两个输出流 4.2 如何回传客户端数据 5 请求重定向 5.1 什么是请求重定向 5.2 请求重定向演示
带你吃透Servlet核心编程下篇(完整图文教程)(上)
|
存储 缓存 网络协议
源码阅读 | Okhttp
源码阅读 | Okhttp