/** Ready async calls in the order they'll be run. */ // 异步等待队列。。双端队列,支持首尾两端 双向开口可进可出,方便移除。 privatefinal Deque<AsyncCall> readyAsyncCalls = newArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ // 正在运行的异步请求队列 privatefinal Deque<AsyncCall> runningAsyncCalls = newArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */ // 正在运行的同步请求队列 privatefinal Deque<RealCall> runningSyncCalls = newArrayDeque<>();
// If we're forbidden from using the network and the cache is insufficient, fail. // 不进行网络请求并且缓存不存在或过期,则返回 504 错误,责任链此时也就终止,不会在往下继续执行。 if (networkRequest == null && cacheResponse == null) { returnnewResponse.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); }
// If we don't need the network, we're done. // 不使用网络请求且存在缓存,则直接返回缓存 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } // 其他情况则请求网络 }
/** * 下半部,有网络时的处理 */ @Override public Response intercept(Chain chain)throws IOException { // 执行下一个拦截器,也就是请求网络。 ResponsenetworkResponse=null; try { networkResponse = chain.proceed(networkRequest); } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } } // 责任链执行完毕后,会返回最终响应数据,如果缓存存在更新缓存,如果缓存不存在加入到缓存中去。 // 责任链的好处,当责任链执行完毕,如果拦截器想要拿到最终的数据做其他的逻辑处理等,这样就不用在做其他的调用方法逻辑了,直接在当前的拦截器就可以拿到最终的数据。 // If we have a cache response too, then we're doing a conditional get. // 如果有缓存并可用,则用缓存的数据并更新缓存,否则就用网络请求返回的数据。 if (cacheResponse != null) { // 304响应码,则缓存有效,说明自从上次请求后,请求需要响应的内容未发生改变,就用当前缓存的 Response,关闭网络连接,释放连接。 if (networkResponse.code() == HTTP_NOT_MODIFIED) { Responseresponse= cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close();
// Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache.trackConditionalCacheHit(); cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } } // 缓存Response Responseresponse= networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build();
if (cache != null) { if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. CacheRequestcacheRequest= cache.put(response); return cacheWritingResponse(cacheRequest, response); }
if (HttpMethod.invalidatesCache(networkRequest.method())) { try { cache.remove(networkRequest); } catch (IOException ignored) { // The cache cannot be written. } } }
// We need the network to satisfy this request. Possibly for validating a conditional GET. booleandoExtensiveHealthChecks= !request.method().equals("GET"); Exchangeexchange= transmitter.newExchange(chain, doExtensiveHealthChecks);
/** Returns a new exchange to carry a new request and response. */ Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) { synchronized (connectionPool) { if (noMoreExchanges) { thrownewIllegalStateException("released"); } if (exchange != null) { thrownewIllegalStateException("cannot make a new request because the previous response " + "is still open: please call response.close()"); } }
/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. */ // 获取连接的方法 private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)throws IOException { while (true) { RealConnectioncandidate= findConnection(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks. synchronized (connectionPool) { if (candidate.successCount == 0) { return candidate; } }
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it // isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { candidate.noNewExchanges(); continue; }
booleanresponseHeadersStarted=false; Response.BuilderresponseBuilder=null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); }
if (responseBuilder == null) { if (request.body().isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest(); BufferedSinkbufferedRequestBody= Okio.buffer( exchange.createRequestBody(request, true)); request.body().writeTo(bufferedRequestBody); } else { // Write the request body if the "Expect: 100-continue" expectation was met. BufferedSinkbufferedRequestBody= Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection(); } } } else { exchange.noRequestBody(); }
if (request.body() == null || !request.body().isDuplex()) { exchange.finishRequest(); }
if (!responseHeadersStarted) { exchange.responseHeadersStart(); }
if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false); }
intcode= response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build();
code = response.code(); }
exchange.responseHeadersEnd(response);
if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { // 最终获取 response 方法 exchange.openResponseBody(response) response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); }
if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); }
/** * Background threads are used to cleanup expired connections. There will be at most a single * thread running per connection pool. The thread pool executor permits the pool itself to be * garbage collected. */ // 线程池,类似于 CachedThreadPool,工作队列采用了没有容量的 SynchronousQueue。 privatestaticfinalExecutorexecutor=newThreadPoolExecutor(0/* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L/* keepAliveTime */, TimeUnit.SECONDS, newSynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */ // 空闲的 socket 最大连接数 privatefinalint maxIdleConnections; // socket 的 keepAlive 时间 privatefinallong keepAliveDurationNs; // 双向队列,双端队列同时具有队列和栈性质,经常在缓存中被使用,里面维护了 RealConnection 也就是 socket物理连接的包装。 privatefinal Deque<RealConnection> connections = newArrayDeque<>(); // 用来记录连接失败的路线名单,当连接失败的时候就会把失败的线路加进去。 finalRouteDatabaserouteDatabase=newRouteDatabase(); boolean cleanupRunning;
/** * Attempts to acquire a recycled connection to {@code address} for {@code transmitter}. Returns * true if a connection was acquired. * * <p>If {@code routes} is non-null these are the resolved routes (ie. IP addresses) for the * connection. This is used to coalesce related domains to the same HTTP/2 connection, such as * {@code square.com} and {@code square.ca}. */ // 替换了以前版本的 get() booleantransmitterAcquirePooledConnection(Address address, Transmitter transmitter, @Nullable List<Route> routes, boolean requireMultiplexed) { assert (Thread.holdsLock(this)); // 遍历 connections 缓存列表。 for (RealConnection connection : connections) { if (requireMultiplexed && !connection.isMultiplexed()) continue; if (!connection.isEligible(address, routes)) continue; transmitter.acquireConnectionNoEvents(connection); returntrue; } returnfalse; } // 添加到 Deque 之前首先要清理空闲的线程 voidput(RealConnection connection) { assert (Thread.holdsLock(this)); if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } connections.add(connection); }
/** * Notify this pool that {@code connection} has become idle. Returns true if the connection has * been removed from the pool and should be closed. */ booleanconnectionBecameIdle(RealConnection connection) { assert (Thread.holdsLock(this)); if (connection.noNewExchanges || maxIdleConnections == 0) { connections.remove(connection); returntrue; } else { notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit. returnfalse; } }
publicvoidevictAll() { List<RealConnection> evictedConnections = newArrayList<>(); synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnectionconnection= i.next(); if (connection.transmitters.isEmpty()) { connection.noNewExchanges = true; evictedConnections.add(connection); i.remove(); } } }
for (RealConnection connection : evictedConnections) { closeQuietly(connection.socket()); } }
/** * Performs maintenance on this pool, evicting the connection that has been idle the longest if * either it has exceeded the keep alive limit or the idle connections limit. * * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns * -1 if no further cleanups are required. */ // 根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记出空闲的连接。 longcleanup(long now) { intinUseConnectionCount=0; intidleConnectionCount=0; RealConnectionlongestIdleConnection=null; longlongestIdleDurationNs= Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due. synchronized (this) { for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) { RealConnectionconnection= i.next();
// If the connection is in use, keep searching. // 判断连接是否闲置。 if (pruneAndGetAllocationCount(connection, now) > 0) { // 如果返回值大于 0 则是活跃连接,否则就是空闲连接。 inUseConnectionCount++; continue; }
idleConnectionCount++;
// If the connection is ready to be evicted, we're done. longidleDurationNs= now - connection.idleAtNanos; if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; } } // 如果空闲连接 keepAlive 时间超过 5 分钟,或者空闲连接数超过 5 个,则从 Deque 中移除此连接。接下来根据空闲连接或者活跃连接来返回下次需要清理的时间数:如果空闲连接大于 0 ,则返回此连接即将到期的时间:如果都是活跃连接并且大于 0,则返回默认的 keepAlive 时间 5 分钟。 if (longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections) { // We've found a connection to evict. Remove it from the list, then close it below (outside // of the synchronized block). connections.remove(longestIdleConnection); } elseif (idleConnectionCount > 0) { // A connection will be ready to evict soon. return keepAliveDurationNs - longestIdleDurationNs; } elseif (inUseConnectionCount > 0) { // All connections are in use. It'll be at least the keep alive duration 'til we run again. return keepAliveDurationNs; } else { // No connections, idle or in use. cleanupRunning = false; // 如果没有任何连接,则跳出循环并返回 -1. return -1; } }
/** * Prunes any leaked transmitters and then returns the number of remaining live transmitters on * {@code connection}. Transmitters are leaked if the connection is tracking them but the * application code has abandoned them. Leak detection is imprecise and relies on garbage * collection. */ privateintpruneAndGetAllocationCount(RealConnection connection, long now) { List<Reference<Transmitter>> references = connection.transmitters; // 遍历传进来的 Transmitter,而不是以前版本的 RealConnection 的 StreamAllocation。 // 如果 Transmitter 被使用,则接着遍历下一个 Transmitter,如果未被使用,则从列表中移除。 for (inti=0; i < references.size(); ) { Reference<Transmitter> reference = references.get(i);
if (reference.get() != null) { i++; continue; }
// We've discovered a leaked transmitter. This is an application bug. TransmitterReferencetransmitterRef= (TransmitterReference) reference; Stringmessage="A connection to " + connection.route().address().url() + " was leaked. Did you forget to close a response body?"; Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
// If this was the last allocation, the connection is eligible for immediate eviction. // 如果列表为空,则说明此连接没有引用了,返回 0,表示此连接是空闲连接: if (references.isEmpty()) { connection.idleAtNanos = now - keepAliveDurationNs; return0; } } // 否则就返回非 0 的数,表示此连接是活跃连接. return references.size(); }
/** * Remove the transmitter from the connection's list of allocations. Returns a socket that the * caller should close. */ @Nullable Socket releaseConnectionNoEvents() { assert (Thread.holdsLock(connectionPool));
intindex= -1; for (inti=0, size = this.connection.transmitters.size(); i < size; i++) { Reference<Transmitter> reference = this.connection.transmitters.get(i); if (reference.get() == this) { index = i; break; } }