Android 网络编程(8): 源码解析 OkHttp 中篇[复用连接池]

1.引子

在了解OkHttp的复用连接池之前,我们首先要了解几个概念。

TCP三次握手

通常我们进行HTTP连接网络的时候我们会进行TCP的三次握手,然后传输数据,然后再释放连接。

TCP三次握手的过程为:

  • 第一次握手:建立连接。客户端发送连接请求报文段,将SYN位置为1,Sequence Number为x;然后,客户端进入SYN_SEND状态,等待服务器的确认;
  • 第二次握手:服务器收到客户端的SYN报文段,需要对这个SYN报文段进行确认,设置Acknowledgment Number为x+1(Sequence Number+1);同时,自己自己还要发送SYN请求信息,将SYN位置为1,Sequence Number为y;服务器端将上述所有信息放到一个报文段(即SYN+ACK报文段)中,一并发送给客户端,此时服务器进入SYN_RECV状态;
  • 第三次握手:客户端收到服务器的SYN+ACK报文段。然后将Acknowledgment Number设置为y+1,向服务器发送ACK报文段,这个报文段发送完毕以后,客户端和服务器端都进入ESTABLISHED状态,完成TCP三次握手。

TCP四次分手

当客户端和服务器通过三次握手建立了TCP连接以后,当数据传送完毕,断开连接就需要进行TCP四次分手:

  • 第一次分手:主机1(可以使客户端,也可以是服务器端),设置Sequence Number和Acknowledgment
    Number,向主机2发送一个FIN报文段;此时,主机1进入FIN_WAIT_1状态;这表示主机1没有数据要发送给主机2了;
  • 第二次分手:主机2收到了主机1发送的FIN报文段,向主机1回一个ACK报文段,Acknowledgment Number为Sequence
  • 第三次分手:主机2向主机1发送FIN报文段,请求关闭连接,同时主机2进入LAST_ACK状态;
  • 第四次分手:主机1收到主机2发送的FIN报文段,向主机2发送ACK报文段,然后主机1进入TIME_WAIT状态;主机2收到主机1的ACK报文段以后,就关闭连接;此时,主机1等待2MSL后依然没有收到回复,则证明Server端已正常关闭,那好,主机1也可以关闭连接了。

来看下面的图加强下理解:

keepalive connections

当然大量的连接每次连接关闭都要三次握手四次分手的很显然会造成性能低下,因此http有一种叫做keepalive connections的机制,它可以在传输数据后仍然保持连接,当客户端需要再次获取数据时,直接使用刚刚空闲下来的连接而不需要再次握手。

Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间)。

2.连接池(ConnectionPool)分析

引用计数

在okhttp中,在高层代码的调用中,使用了类似于引用计数的方式跟踪Socket流的调用,这里的计数对象是StreamAllocation,它被反复执行aquire与release操作,这两个函数其实是在改变RealConnection中的List<Reference<StreamAllocation>> 的大小。(StreamAllocation.Java)


public void acquire(RealConnection connection) {
    connection.allocations.add(new WeakReference<>(this));
  }


 private void release(RealConnection connection) {
    for (int i = 0, size = connection.allocations.size(); i < size; i++) {
      Reference reference = connection.allocations.get(i);
      if (reference.get() == this) {
        connection.allocations.remove(i);
        return;
      }
    }
    throw new IllegalStateException();
  }

RealConnection是socket物理连接的包装,它里面维护了List<Reference<StreamAllocation>>的引用。List中StreamAllocation的数量也就是socket被引用的计数,如果计数为0的话,说明此连接没有被使用就是空闲的,需要通过下文的算法实现回收;如果计数不为0,则表示上层代码仍然引用,就不需要关闭连接。

主要变量

连接池的类位于okhttp3.ConnectionPool:


private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  //空闲的socket最大连接数
  private final int maxIdleConnections;
  //socket的keepAlive时间
  private final long keepAliveDurationNs;
  // 双向队列
  private final Deque connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

主要的变量有必要说明一下:

  • executor线程池,类似于CachedThreadPool,需要注意的是这种线程池的工作队列采用了没有容量的SynchronousQueue,不了解它的请查看Java并发编程(六)阻塞队列这篇文章。
  • Deque<RealConnection>,双向队列,双端队列同时具有队列和栈性质,经常在缓存中被使用,里面维护了RealConnection也就是socket物理连接的包装。
  • RouteDatabase,它用来记录连接失败的Route的黑名单,当连接失败的时候就会把失败的线路加进去。

构造函数


 public ConnectionPool() {
  //默认空闲的socket最大连接数为5个,socket的keepAlive时间为5秒
    this(5, 5, TimeUnit.MINUTES);
  }
  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

通过构造函数可以看出ConnectionPool默认的空闲的socket最大连接数为5个,socket的keepAlive时间为5秒。

实例化

ConnectionPool实例化是在OkHttpClient实例化时进行的:


public OkHttpClient() {
    this(new Builder());
  }

在OkHttpClient的构造函数中调用了new Builder():


 public Builder() {
      dispatcher = new Dispatcher();
     ...省略
      connectionPool = new ConnectionPool();
     ...省略
    }

缓存操作

ConnectionPool提供对Deque<RealConnection>进行操作的方法分别为put、get、connectionBecameIdle和evictAll几个操作。分别对应放入连接、获取连接、移除连接和移除所有连接操作,这里我们举例put和get操作。

put操作


void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

在添加到Deque<RealConnection>之前首先要清理空闲的线程,这个后面会讲到。

get操作


RealConnection get(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.allocations.size() < connection.allocationLimit
          && address.equals(connection.route().address)
          && !connection.noNewStreams) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }

遍历connections缓存列表,当某个连接计数的次数小于限制的大小并且request的地址和缓存列表中此连接的地址完全匹配。则直接复用缓存列表中的connection作为request的连接。

自动回收连接

okhttp是根据StreamAllocation引用计数是否为0来实现自动回收连接的。我们在put操作前首先要调用executor.execute(cleanupRunnable)来清理闲置的线程。我们来看看cleanupRunnable到底做了什么:


private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

线程不断的调用cleanup来进行清理,并返回下次需要清理的间隔时间,然后调用wait进行等待以释放锁与时间片,当等待时间到了后,再次进行清理,并返回下次要清理的间隔时间,如此循环下去,接下来看看cleanup方法:


long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
    //遍历连接
      for (Iterator i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        //查询此连接的StreamAllocation的引用数量,如果大于0则inUseConnectionCount数量加1,否则idleConnectionCount加1
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        idleConnectionCount++;
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
      //如果空闲连接keepAlive时间超过5分钟,或者空闲连接数超过5个,则从Deque中移除此连接
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
       //如果空闲连接大于0,则返回此连接即将到期的时间
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
        //如果没有空闲连接,并且活跃连接大于0则返回5分钟
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
      } else {
      //如果没有任何连接则跳出循环
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

cleanup所做的简单总结就是根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记出空闲的连接,如果空闲连接keepAlive时间超过5分钟,或者空闲连接数超过5个,则从Deque中移除此连接。接下来根据空闲连接或者活跃连接来返回下次需要清理的时间数:如果空闲连接大于0则返回此连接即将到期的时间,如果都是活跃连接并且大于0则返回默认的keepAlive时间5分钟,如果没有任何连接则跳出循环并返回-1。在上述代码中的第13行,通过pruneAndGetAllocationCount方法来判断连接是否闲置的,如果pruneAndGetAllocationCount方法返回值大于0则是空闲连接,否则就是活跃连接,让我们来看看pruneAndGetAllocationCount方法:


private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List> references = connection.allocations;
    //遍历弱引用列表
    for (int i = 0; i < references.size(); ) {
      Reference reference = references.get(i);
      //若StreamAllocation被使用则接着循环
      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      Internal.logger.warning("A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?");
      //若StreamAllocation未被使用则移除引用
      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      //如果列表为空则说明此连接没有被引用了,则返回0,表示此连接是空闲连接
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    //否则返回非0的数,表示此连接是活跃连接
    return references.size();
  }

pruneAndGetAllocationCount方法首先遍历传进来的RealConnection的StreamAllocation列表,如果StreamAllocation被使用则接着遍历下一个StreamAllocation,如果StreamAllocation未被使用则从列表中移除。如果列表为空则说明此连接没有引用了,则返回0,表示此连接是空闲连接,否则就返回非0的数表示此连接是活跃连接。

总结

可以看出连接池复用的核心就是用Deque<RealConnection>来存储连接,通过put、get、connectionBecameIdle和evictAll几个操作来对Deque进行操作,另外通过判断连接中的计数对象StreamAllocation来进行自动回收连接。

参考资料
okhttp3源码
简析TCP的三次握手与四次分手
TCP三次握手过程
短连接、长连接与keep-alive
OkHttp3源码分析[复用连接池]
okhttp连接池复用机制