OkHttp 源码

OkHttpClient

构造函数

1
OkHttpClient okHttpClient = new OkHttpClient();
1
2
3
4
// 其内部实现了 Builder()
public OkHttpClient() {
this(new Builder());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 实现了一些默认的配置
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
// 初始化连接池
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final Dispatcher dispatcher;  // 调度器
final @Nullable Proxy proxy; // 代理
final List<Protocol> protocols; // 协议
final List<ConnectionSpec> connectionSpecs; // 传输层版本和连接协议
final List<Interceptor> interceptors; // 拦截器
final List<Interceptor> networkInterceptors; // 网络拦截器
final EventListener.Factory eventListenerFactory;
final ProxySelector proxySelector; // 代理选择器
final CookieJar cookieJar; // cookie
final @Nullable Cache cache; // 缓存
final @Nullable InternalCache internalCache; // 内部缓存
final SocketFactory socketFactory; // socket 工厂
final SSLSocketFactory sslSocketFactory; // 安全套层 socket 工厂,用于 https
final CertificateChainCleaner certificateChainCleaner;//验证确认响应书,适用https请求连接的主机名
final HostnameVerifier hostnameVerifier; // 主机名字确认
final CertificatePinner certificatePinner; // 证书链
final Authenticator proxyAuthenticator; // 代理身份验证
final Authenticator authenticator; // 本地身份验证
final ConnectionPool connectionPool; // 链接池,复用链接
final Dns dns; // 域名
final boolean followSslRedirects; // 安全套接层重定向
final boolean followRedirects; // 本地重定向
final boolean retryOnConnectionFailure; // 重试连接失败
final int callTimeout; //
final int connectTimeout; // 连接超时
final int readTimeout; // 读取超时
final int writeTimeout; // 写入超时
final int pingInterval; //

请求网络

1
2
// 无论是同步请求还是异步请求,都是由 Call 来调用。
Call call = okHttpClient.newCall(request);
1
2
3
4
5
// 真正的请求实现由 RealCall 完成
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}

异步请求

1
2
3
4
5
6
7
8
9
10
11
12
13
// RealCall 类内部
@Override
public void enqueue(Callback responseCallback) {
// 确保每个 Call 只能被执行一次,不能重复执行。
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 此行代码有版本差异
transmitter.callStart();
// 利用 dispatcher 调度器,来进行实际的执行。
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

Dispatcher 调度器

Dispatcher 主要用于控制并发的请求

它有两个构造方法,可使用自定义的线程池,或默认线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}

public Dispatcher() {
}

public synchronized ExecutorService executorService() {
if (executorService == null) {
// 类似于 CachedThreadPool,比较适合执行大量的耗时比较少的任务。
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private int maxRequests = 64;  // 最大并发请求数
private int maxRequestsPerHost = 5; // 同时请求的相同 HOST 的最大个数
private @Nullable Runnable idleCallback;

/** Executes calls. Created lazily. */
// 消费者线程池
private @Nullable ExecutorService executorService;

/** Ready async calls in the order they'll be run. */
// 异步等待队列。。双端队列,支持首尾两端 双向开口可进可出,方便移除。
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
// 正在运行的异步请求队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
// 正在运行的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 当调用 RealCall 的 enqueue(),实际上是调用了 Dispatcher 的 enqueue()。
// 异步请求
void enqueue(AsyncCall call) {
synchronized (this) {
// 有版本差异,此前会判断当正在运行的异步请求队列中的数量小于 64 并且正在运行的请求主机数小于 5 时,把请求加载到 runningAsyncCalls 中并在线程池中执行,否则就加入到 readyAsyncCalls 中进行缓存等待。

// 加入等候队列,AsyncCall 是 RealCall 的内部类,其内部也实现了 execute()。
readyAsyncCalls.add(call);

// okhttp 会使用共享主机,即地址相同的会共享 socket。
if (!call.get().forWebSocket) {
// findExistingCallWithHost() 通过循环判断相同的 host
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 将符合条件的调用从{@link #readyAsyncCalls}提升到{@link #runningAsyncCalls},并在executor服务上运* 行它们。不能同步调用,因为执行的调用可以调用用户代码。
* @return 如果调度程序当前正在运行调用,则为true。
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
// 从 readyAsyncCalls 取出下一个请求,加入 runningAsyncCalls 并交由线程池处理。
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
// 同时请求不能超过并发数(64,可配置调度器调整).
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
// 最大的 DNS 解析,默认值 5。同一个 host 最多允许 5 条线程通知执行请求。
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
// 加入运行队列,并交给线程池执行。
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}

for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}

return isRunning;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
// AsyncCall 是一个 runnable,放到线程池中去执行。
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}

AsyncCall 的实现

1
final class AsyncCall extends NamedRunnable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// NamedRunnable 是一个 Runnable ,所以 AsyncCall 是一个 Runnable。
// 线程池实际上是执行了 execute()。
public abstract class NamedRunnable implements Runnable {
protected final String name;

public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}

@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}

protected abstract void execute();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override 
protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
// 责任链模式
// 拦截器链 真正执行请求网络
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
// 回调结果,将 response 返回给用户
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 无论请求结果如何,通过调度器移除队列
client.dispatcher().finished(this);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
// 移除队列,将此次请求从 runningAsyncCalls 移除。
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
// 这里不再是调用 promoteCalls(),而是调用了 promoteAndExecute()。
boolean isRunning = promoteAndExecute();
// 闲置调用
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}

Interceptor 拦截器

拦截器是一种能够监控,重写,重试调用的机制。通常情况下,拦截器用来添加,移除,转换请求和响应的头部信息。比如将域名替换为 IP 地址,在请求头中添加 host 属性,也可以添加应用中的一些公共参数,比如设备 id,版本号等等。

通过拦截链的设计,让请求分成5个拦截器去处理,拦截器各司其职,扩展性非常高。拦截链是从自定义的拦截器开始,然后再到默认的5个拦截器。一般情况下想打印网络请求日志,可以自定义Log拦截器,如果要给所有请求添加Header,同样可以自定义Header拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/*
* RealCall 的 getResponseWithInterceptorChain()
* 真正执行网络请求和返回响应结果
* 实现了Interceptor接口,采用责任链的模式来使每个功能分开,每个Interceptor自行完成自己的任务,并且将不属于* 自己的任务交给下一个,简化了各自的责任和逻辑。
*
* 执行流程:
* 自定义的 interceptor
* RetryAndFollowUpInterceptor 失败重试、重定向拦截器。
* BridgeInterceptor(client.cookieJar()) 桥拦截器:主要是添加和删除一些header
* CacheInterceptor(client.internalCache()) 缓存拦截器:根据缓存策略,如果缓存可用,直接返回缓存数据。
* ConnectInterceptor(client) 连接池拦截器:连接池会缓存http链接,连接池的好处是复用连接,少了3次握手,所* 以请求会更快
* client.networkInterceptors() --如果有设置的话
* CallServerInterceptor(forWebSocket) 真正访问网络的拦截器:内部使用okio去发请求
*/
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
// 责任链
List<Interceptor> interceptors = new ArrayList<>();
// 在配置 okhttpClient 时设置的intercept 由用户自己设置
interceptors.addAll(client.interceptors());
// 负责处理失败后的重试与重定向
interceptors.add(new RetryAndFollowUpInterceptor(client));
// 桥拦截器,从应用程序代码到网络代码的桥梁。负责把用户构造的请求转换为发送到服务器的请求,把服务器返回的响应转换为用户友好的响应,处理、配置请求头等信息。
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
// 设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
// 可配置用户自己设置的缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
// 连接服务器 负责和服务器建立连接 这里才是真正的请求网络
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 配置 okhttpClient 时设置的 networkInterceptors
// 返回观察单个网络请求和响应的不可变拦截器列表。
interceptors.addAll(client.networkInterceptors());
}
// 执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
// 进行http请求报文的封装与请求报文的解析
interceptors.add(new CallServerInterceptor(forWebSocket));
// 创建责任链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

boolean calledNoMoreExchanges = false;
try {
// 执行责任链,使用责任链模式开启链式调用。
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* RealInterceptorChain 实现了 Interceptor.Chain 接口,是一个负责管理拦截链的类,每个拦截器调用 * chain.proceed(request),就会走到下一个拦截器的 intercept方法。
*
* 拦截链最底部的拦截器是 CallServerInterceptor,用 okio 请求网络返回了 Response,然后 Response 往这个* 拦截链回传,上一个拦截器通过 response = chain.proceed(request);就获取到response了。
*/
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {

...

// Call the next interceptor in the chain.
// 创建新的拦截链,链中的拦截器集合index+1。
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
// 执行当前的拦截器,如果在配置 okhttpClient 时没有设置 intercept。默认是先执行;RetryAndFollowUpInterceptor 拦截器
Interceptor interceptor = interceptors.get(index);
// 顺序执行拦截器
Response response = interceptor.intercept(next);

...

return response;
}

RetryAndFollowUpInterceptor

失败重试和重定向拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Response intercept(Chain chain) throws IOException {
while (true) {
...
response = realChain.proceed(request, streamAllocation, null, null);
...
// followUpRequest 判断响应中是否有失败或者重定向(Location)标志,失败的话返回response.request,重定向的话构造新的Request返回
Request followUp = followUpRequest(response, streamAllocation.route());
if (followUp == null) {
//如果没有失败或重定向,返回response
return response;
}
...
// 用新重定向的request继续while走拦截链
request = followUp;
priorResponse = response;
}

}
}

BridgeInterceptor

桥拦截器,添加或者移除一些 header

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public Response intercept(Chain chain) throws IOException {
...
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
// contentType 不空就加Content-Type这个header
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
//contentLength 不是-1就加Content-Length这个header
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
//移除Content-Length 这个header
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
...
//后面还有响应的header
}

CacheInterceptor

缓存拦截器:根据缓存策略,判断是否返回缓存数据,响应的数据是否要缓存起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 上半部,没有网络时的处理
*/
@Override
public Response intercept(Chain chain) throws IOException {
// 如果用户自己配置了缓存拦截器,cacheCandidate = cache.Response 获取用户自己存储的Response,否则 cacheCandidate = null;同时从CacheStrategy 获取cacheResponse 和 networkRequest
// cacheCandidate 是上次与服务器交互时缓存的 Response,可以读取缓存 Header 的 Respone。这里的缓存均基于 Map。Key 是请求中 url 的 md5,value 是文件中查询到的缓存,页面置换基于 LRU 算法
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();
// 执行响应缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
// 网络请求,如果networkRequest == null 则说明不使用网络请求
Request networkRequest = strategy.networkRequest;
// 缓存的响应,获取缓存中(CacheStrategy)的Response
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
// 记录当前请求是网络发起还是缓存发起
cache.trackResponse(strategy);
}
// 如果cacheCandidate != null 而 cacheResponse == null 说明缓存无效,清除cacheCandidate缓存。
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
// 不进行网络请求并且缓存不存在或过期,则返回 504 错误,责任链此时也就终止,不会在往下继续执行。
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// If we don't need the network, we're done.
// 不使用网络请求且存在缓存,则直接返回缓存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
// 其他情况则请求网络
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 下半部,有网络时的处理
*/
@Override
public Response intercept(Chain chain) throws IOException {
// 执行下一个拦截器,也就是请求网络。
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 责任链执行完毕后,会返回最终响应数据,如果缓存存在更新缓存,如果缓存不存在加入到缓存中去。
// 责任链的好处,当责任链执行完毕,如果拦截器想要拿到最终的数据做其他的逻辑处理等,这样就不用在做其他的调用方法逻辑了,直接在当前的拦截器就可以拿到最终的数据。
// If we have a cache response too, then we're doing a conditional get.
// 如果有缓存并可用,则用缓存的数据并更新缓存,否则就用网络请求返回的数据。
if (cacheResponse != null) {
// 304响应码,则缓存有效,说明自从上次请求后,请求需要响应的内容未发生改变,就用当前缓存的 Response,关闭网络连接,释放连接。
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// 缓存Response
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

ConnectInterceptor

连接池拦截器:主要是从连接池去取连接,http请求要先3次握手才能建立连接,复用连接可以免去握手的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override 
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();

// 以前的版本:StreamAllocation streamAllocation = realChain.streamAllocation();
Transmitter transmitter = realChain.transmitter();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

return realChain.proceed(request, transmitter, exchange);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) {
throw new IllegalStateException("released");
}
if (exchange != null) {
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
}
}

ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();

try {
// 找一个连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
trackFailure();
throw e;
} catch (IOException e) {
trackFailure();
throw new RouteException(e);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
// 获取连接的方法
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);

// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}

// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}

return candidate;
}
}

CallServerInterceptor

发送和接收数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// class:CallServerInterceptor
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Exchange exchange = realChain.exchange();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();

exchange.writeRequestHeaders(request);

boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
responseBuilder = exchange.readResponseHeaders(true);
}

if (responseBuilder == null) {
if (request.body().isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest();
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, true));
request.body().writeTo(bufferedRequestBody);
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection();
}
}
} else {
exchange.noRequestBody();
}

if (request.body() == null || !request.body().isDuplex()) {
exchange.finishRequest();
}

if (!responseHeadersStarted) {
exchange.responseHeadersStart();
}

if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false);
}

Response response = responseBuilder
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

exchange.responseHeadersEnd(response);

if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// 最终获取 response 方法 exchange.openResponseBody(response)
response = response.newBuilder()
.body(exchange.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
// 拦截链已经到底部,直接返回 response,将response回传给上一个拦截器
return response;
}
1
2
3
4
5
6
7
8
9
// class:Exchange
public ResponseBody openResponseBody(Response response) throws IOException {
...

//内部就是通过Okio.buffer去请求网络
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));

...
}

模拟 OkHttp 责任链

1
2
3
4
5
6
7
8
9
public interface Interceptor {

String interceptor(Chain chain);

interface Chain {
String request();
String proceed(String request);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class BridgeInterceptor implements Interceptor{

@Override
public String interceptor(Chain chain) {
System.out.println("执行 BridgeInterceptor 拦截器之前代码");
String proceed = chain.proceed(chain.request());
System.out.println("执行 BridgeInterceptor 拦截器之后代码 得到最终数据:"+proceed);
return proceed;
}
}

public class RetryAndFollowInterceptor implements Interceptor {
@Override
public String interceptor(Chain chain) {
System.out.println("执行 RetryAndFollowInterceptor 拦截器之前代码");
String proceed = chain.proceed(chain.request());
System.out.println("执行 RetryAndFollowInterceptor 拦截器之后代码 得到最终数据:" + proceed);
return proceed;
}
}

public class CacheInterceptor implements Interceptor {
@Override
public String interceptor(Chain chain) {
System.out.println("执行 CacheInterceptor 最后一个拦截器 返回最终数据");
return "success";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RealInterceptorChain implements Interceptor.Chain {

private List<Interceptor> interceptors;
private int index;
private String request;

public RealInterceptorChain(List<Interceptor> interceptors,int index,String request){
this.interceptors = interceptors;
this.index = index;
this.request = request;
}

@Override
public String request() {
return request;
}

@Override
public String proceed(String request) {
if (index >= interceptors.size())
return null;
// 获取下一个责任链
RealInterceptorChain next = new RealInterceptorChain(interceptors,index+1,request);
// 执行当前的拦截器
Interceptor interceptor = interceptors.get(index);

return interceptor.interceptor(next);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
List<Interceptor> interceptors = new ArrayList<>();
interceptors.add(new BridgeInterceptor());
interceptors.add(new RetryAndFollowInterceptor());
interceptors.add(new CacheInterceptor());
RealInterceptorChain realInterceptorChain = new RealInterceptorChain(interceptors,0,"request");
realInterceptorChain.proceed("request");

// 运行结果:
// 执行 BridgeInterceptor 拦截器之前代码
// 执行 RetryAndFollowInterceptor 拦截器之前代码
// 执行 CacheInterceptor 最后一个拦截器 返回最终数据
// 执行 RetryAndFollowInterceptor 拦截器之后代码 得到最终数据:success
// 执行 BridgeInterceptor 拦截器之后代码 得到最终数据:success

同步请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 同步请求,直接返回一个请求的结果。
// 在同步请求的方法中,涉及到dispatcher 只是告知了执行状态,开始执行了(调用 executed),执行完毕了(调用 finished)其他的并没有涉及到。dispatcher 更多的是服务异步请求。
@Override
public Response execute() throws IOException {
// 避免重复执行
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
// 交给调度器去执行,实际上调度器只是将 call 加入到了同步执行队列中。
// 将 RealCall 加到队列 Deque<RealCall> runningSyncCalls
client.dispatcher().executed(this);
// 通过一系列的拦截器请求处理和响应处理得到最终的返回结果
return getResponseWithInterceptorChain();
} finally {
// 执行调度器的完成方法,移除队列
client.dispatcher().finished(this);
}
}

简述请求执行流程

1、OkhttpClient 实现了 Call.Fctory,负责为 Request 创建 Call;

2、RealCall 为 Call 的具体实现,其 enqueue() 异步请求接口通过 Dispatcher() 调度器利用 ExcutorService 实现,而最终进行网络请求时和同步的 execute() 接口一致,都是通过 getResponseWithInterceptorChain() 函数实现。

3、getResponseWithInterceptorChain() 中利用 Interceptor 链条,责任链模式 分层实现缓存、透明压缩、网络 IO 等功能,最终将响应数据返回给用户。

复用连接池

连接池位于 okhttp3.ConnectionPool:

连接池复用的核心就是用 Deque 来存储连接,通过 put 等操作来对 Deque 进行操作,另外通过判断连接中的计数对象 Transmitter 来进行自动回收连接。

okhttp3.ConnectionPool 在 OkHttpClient 实例化时创建,它的构造方法如下:

1
2
3
4
5
6
7
8
// 默认空闲的 socket 最大连接数为 5 个,socket 的 keepAlive 时间为 5 分钟。
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
}

它的主要变量位于 okhttp3.internal.connection.RealConnectionPool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
// 线程池,类似于 CachedThreadPool,工作队列采用了没有容量的 SynchronousQueue。
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));

/** The maximum number of idle connections for each address. */
// 空闲的 socket 最大连接数
private final int maxIdleConnections;
// socket 的 keepAlive 时间
private final long keepAliveDurationNs;
// 双向队列,双端队列同时具有队列和栈性质,经常在缓存中被使用,里面维护了 RealConnection 也就是 socket物理连接的包装。
private final Deque<RealConnection> connections = new ArrayDeque<>();
// 用来记录连接失败的路线名单,当连接失败的时候就会把失败的线路加进去。
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;

缓存操作

RealConnectionPool 提供了对 Deque 的一些操作,比如放入连接,获取连接,移除连接,移除所有连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Attempts to acquire a recycled connection to {@code address} for {@code transmitter}. Returns
* true if a connection was acquired.
*
* <p>If {@code routes} is non-null these are the resolved routes (ie. IP addresses) for the
* connection. This is used to coalesce related domains to the same HTTP/2 connection, such as
* {@code square.com} and {@code square.ca}.
*/
// 替换了以前版本的 get()
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed) {
assert (Thread.holdsLock(this));
// 遍历 connections 缓存列表。
for (RealConnection connection : connections) {
if (requireMultiplexed && !connection.isMultiplexed()) continue;
if (!connection.isEligible(address, routes)) continue;
transmitter.acquireConnectionNoEvents(connection);
return true;
}
return false;
}
// 添加到 Deque 之前首先要清理空闲的线程
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}

/**
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
if (connection.noNewExchanges || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}

public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.transmitters.isEmpty()) {
connection.noNewExchanges = true;
evictedConnections.add(connection);
i.remove();
}
}
}

for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}

自动回收连接

以前版本是根据 StreamAllocation 引用计数是否为 0 来实现自动回收连接,而现在是通过 Transmitter 类完成.

RealConnection 是 socket 物理连接的包装,里面维护了 List<Reference> 的引用。List 中 Transmitter 的数量也就是 socket 被引用的计数。如果计数为 0,则说明此连接没有被使用,也就是空闲的,需要通过算法实现回收:如果计数不为 0,则表示上层代码仍然在引用,就无需关闭连接。

在 put 操作时,会调用 executor.execute(cleanupRunnable); 来清理闲置的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final Runnable cleanupRunnable = () -> {
while (true) {
// 线程不断地调用 cleanup() 进行清理,并返回下次需要清理的间隔时间,然后调用 wait() 进行等待以释放锁与时间片。
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
// 根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记出空闲的连接。
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;

// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

// If the connection is in use, keep searching.
// 判断连接是否闲置。
if (pruneAndGetAllocationCount(connection, now) > 0) {
// 如果返回值大于 0 则是活跃连接,否则就是空闲连接。
inUseConnectionCount++;
continue;
}

idleConnectionCount++;

// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
// 如果空闲连接 keepAlive 时间超过 5 分钟,或者空闲连接数超过 5 个,则从 Deque 中移除此连接。接下来根据空闲连接或者活跃连接来返回下次需要清理的时间数:如果空闲连接大于 0 ,则返回此连接即将到期的时间:如果都是活跃连接并且大于 0,则返回默认的 keepAlive 时间 5 分钟。
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
// 如果没有任何连接,则跳出循环并返回 -1.
return -1;
}
}

closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Prunes any leaked transmitters and then returns the number of remaining live transmitters on
* {@code connection}. Transmitters are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<Transmitter>> references = connection.transmitters;
// 遍历传进来的 Transmitter,而不是以前版本的 RealConnection 的 StreamAllocation。
// 如果 Transmitter 被使用,则接着遍历下一个 Transmitter,如果未被使用,则从列表中移除。
for (int i = 0; i < references.size(); ) {
Reference<Transmitter> reference = references.get(i);

if (reference.get() != null) {
i++;
continue;
}

// We've discovered a leaked transmitter. This is an application bug.
TransmitterReference transmitterRef = (TransmitterReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);

references.remove(i);
connection.noNewExchanges = true;

// If this was the last allocation, the connection is eligible for immediate eviction.
// 如果列表为空,则说明此连接没有引用了,返回 0,表示此连接是空闲连接:
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
// 否则就返回非 0 的数,表示此连接是活跃连接.
return references.size();
}

引用计数

使用类似于引用计数的方式跟踪 socket 流的调用.计数对象是 Transmitter,会被反复执行这两个方法,这其实是在改变 List<Reference> 的大小.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void acquireConnectionNoEvents(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));

if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}

/**
* Remove the transmitter from the connection's list of allocations. Returns a socket that the
* caller should close.
*/
@Nullable Socket releaseConnectionNoEvents() {
assert (Thread.holdsLock(connectionPool));

int index = -1;
for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
Reference<Transmitter> reference = this.connection.transmitters.get(i);
if (reference.get() == this) {
index = i;
break;
}
}

if (index == -1) throw new IllegalStateException();

RealConnection released = this.connection;
released.transmitters.remove(index);
this.connection = null;

if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime();
if (connectionPool.connectionBecameIdle(released)) {
return released.socket();
}
}

return null;
}

优势

1、支持 SPDY,允许连接同一主机的所有请求分享一个socket。 如果SPDY不可用,会使用连接池减少请求延迟。
2、使用GZIP压缩下载内容,且压缩操作对用户是透明的。
3、利用响应缓存来避免重复的网络请求。
4、失败重试。如果服务端有多个IP地址,当第一个地址连接失败时,OKHttp会尝试连接其他备用地址,这对IPV4和IPV6以及寄宿在多个数据中心的服务而言,是非常有必要的。
5、对 http 协议的封装,比较底层,因此拓展性强,便于封装。
6、OKhttp 使用非阻塞 I/O 模型 OKio (算是nio吧,和一般的nio不太一致,一般的nio定时去检查是否有数据到来,有的话就读,没有就返回,但是okio的实现是定时去检查是否已经读写完成,没完成就认为超时,close掉该socket),该I/O框架的内存表现也很好(mars使用epoll)。

链接

参考资料:
彻底理解OkHttp
Android 进阶之光