搞懂Druid之连接获取和归还

图片[1] - 搞懂Druid之连接获取和归还 - MaxSSL

前言

Druid是阿里开源的数据库连接池,是阿里监控系统Dragoon的副产品,提供了强大的可监控性和基于Filter-Chain的可扩展性。

本篇文章将对Druid数据库连接池的连接获取归还连接泄漏检测进行分析。分析Druid数据库连接池的源码前,需要明确几个概念。

  1. Druid数据库连接池中可用的连接存放在一个数组connections中;
  2. Druid数据库连接池做并发控制,主要靠一把可重入锁以及和这把锁关联的两个Condition对象;
public DruidAbstractDataSource(boolean lockFair) { lock = new ReentrantLock(lockFair); notEmpty = lock.newCondition(); empty = lock.newCondition();}复制代码
  1. 连接池没有可用连接时,应用线程会在notEmpty上等待,连接池已满时,生产连接的线程会在empty上等待;
  2. 对连接保活,就是每间隔一定时间,对达到了保活间隔周期的连接进行有效性校验,可以将无效连接销毁,也可以防止连接长时间不与数据库服务端通信。

Druid版本:1.2.11

正文

一. DruidDataSource连接获取

DruidDataSource获取连接的入口方法是DruidDataSource#getConnection方法,实现如下。

public DruidPooledConnection getConnection() throws SQLException {// maxWait表示获取连接时最大等待时间,单位毫秒,默认值为-1return getConnection(maxWait);}public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {// 首次获取连接时触发数据库连接池初始化init();if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(this);return filterChain.dataSource_connect(this, maxWaitMillis);} else {// 直接获取连接return getConnectionDirect(maxWaitMillis);}}复制代码

DruidDataSource#getConnection方法会调用到DruidDataSource#getConnectionDirect方法来获取连接,实现如下所示。

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {int notFullTimeoutRetryCnt = 0;for (; ; ) {DruidPooledConnection poolableConnection;try {// 从连接池拿到连接poolableConnection = getConnectionInternal(maxWaitMillis);} catch (GetConnectionTimeoutException ex) {// 拿连接时有异常,可以重试// 重试次数由notFullTimeoutRetryCount指定if (notFullTimeoutRetryCnt  lastActiveTimeMillis) {lastActiveTimeMillis = lastKeepTimeMillis;}// 计算空闲时间long idleMillis = currentTimeMillis - lastActiveTimeMillis;// testWhileIdle为true时的判断时间间隔long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;if (timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis|| idleMillis < 0) {boolean validate = testConnectionInternal(poolableConnection.holder,poolableConnection.conn);if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");}discardConnection(poolableConnection.holder);continue;}}}}// 如果设置removeAbandoned为true// 则将连接放到activeConnections活跃连接map中if (removeAbandoned) {StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();poolableConnection.connectStackTrace = stackTrace;poolableConnection.setConnectedTimeNano();poolableConnection.traceEnable = true;activeConnectionLock.lock();try {activeConnections.put(poolableConnection, PRESENT);} finally {activeConnectionLock.unlock();}}if (!this.defaultAutoCommit) {poolableConnection.setAutoCommit(false);}return poolableConnection;}}复制代码

DruidDataSource#getConnectionDirect方法中会先调用getConnectionInternal()方法从连接池中拿连接,然后如果开启了testOnBorrow,则校验一下连接的有效性,如果无效则重新调用getConnectionInternal()方法拿连接,直到拿到的连接通过校验。如果没有开启testOnBorrow但是开启了testWhileIdle,则会判断连接的空闲时间是否大于等于timeBetweenEvictionRunsMillis参数,如果满足则校验一下连接的有效性,若没有通过校验,那么需要重新调用getConnectionInternal()方法拿连接,直到拿到的连接通过校验或者连接的空闲时间小于timeBetweenEvictionRunsMillis

下面看一下实际从连接池拿连接的getConnectionInternal()方法的实现,如下所示。

private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {// 省略final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);final int maxWaitThreadCount = this.maxWaitThreadCount;DruidConnectionHolder holder;// 在死循环中从连接池拿连接// 一开始createDirect为false,表示先从池子中拿for (boolean createDirect = false; ; ) {if (createDirect) {// createDirect为true表示直接创建连接createStartNanosUpdater.set(this, System.nanoTime());// creatingCount为0表示当前没有其它连接正在被创建if (creatingCountUpdater.compareAndSet(this, 0, 1)) {// 创建物理连接PhysicalConnectionInfo pyConnInfo= DruidDataSource.this.createPhysicalConnection();holder = new DruidConnectionHolder(this, pyConnInfo);holder.lastActiveTimeMillis = System.currentTimeMillis();creatingCountUpdater.decrementAndGet(this);directCreateCountUpdater.incrementAndGet(this);// 省略boolean discard;lock.lock();try {// 如果当前正在使用的连接数未达到最大连接数// 则当前正在使用的连接数加1// 否则销毁刚刚创建出来的连接if (activeCount  activePeak) {activePeak = activeCount;activePeakTime = System.currentTimeMillis();}break;} else {discard = true;}} finally {lock.unlock();}if (discard) {JdbcUtils.close(pyConnInfo.getPhysicalConnection());}}}// 上锁try {lock.lockInterruptibly();} catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);throw new SQLException("interrupt", e);}try {// maxWaitThreadCount表示允许的最大等待连接的应用线程数// notEmptyWaitThreadCount表示正在等待连接的应用线程数// 等待连接的应用线程数达到最大值时,抛出异常if (maxWaitThreadCount > 0&& notEmptyWaitThreadCount >= maxWaitThreadCount) {connectErrorCountUpdater.incrementAndGet(this);throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "+ lock.getQueueLength());}// 发生了致命错误,且设置了致命错误数最大值大于0,且正在使用的连接数大于等于致命错误数最大值if (onFatalError&& onFatalErrorMaxActive > 0&& activeCount >= onFatalErrorMaxActive) {// 拼接异常并抛出// 省略throw new SQLException(errorMsg.toString(), lastFatalError);}connectCount++;// 如果配置的创建连接的线程池是一个定时线程池// 且连接池已经没有可用连接,// 且当前借出的连接数未达到允许的最大连接数// 且当前没有其它线程(应用线程,创建连接的线程,创建连接的线程池里的线程)在创建连接// 此时将createDirect置为true,让当前应用线程直接创建连接if (createScheduler != null&& poolingCount == 0&& activeCount  0) {createDirect = true;continue;}}if (maxWait > 0) {// 如果设置了等待连接的最大等待时间,则调用pollLast()方法来拿连接// pollLast()方法执行时如果池中没有连接,则应用线程会在notEmpty上最多等待maxWait的时间holder = pollLast(nanos);} else {// 调用takeLast()方法拿连接时,如果池中没有连接,则会在notEmpty上一直等待,直到池中有连接holder = takeLast();}if (holder != null) {if (holder.discard) {continue;}// 正在使用的连接数加1activeCount++;holder.active = true;if (activeCount > activePeak) {activePeak = activeCount;activePeakTime = System.currentTimeMillis();}}} catch (InterruptedException e) {connectErrorCountUpdater.incrementAndGet(this);throw new SQLException(e.getMessage(), e);} catch (SQLException e) {connectErrorCountUpdater.incrementAndGet(this);throw e;} finally {lock.unlock();}break;}// 如果拿到的连接为null,说明拿连接时等待超时了// 此时抛出连接超时异常if (holder == null) {// 省略final Throwable createError;try {lock.lock();// 省略createError = this.createError;} finally {lock.unlock();}// 省略if (createError != null) {throw new GetConnectionTimeoutException(errorMessage, createError);} else {throw new GetConnectionTimeoutException(errorMessage);}}holder.incrementUseCount();DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);return poolalbeConnection;}复制代码

getConnectionInternal()方法中拿到连接的方式有三种,如下所示。

  1. 直接创建连接。需要满足配置的创建连接的线程池是一个定时线程池,且连接池已经没有可用连接,且当前借出的连接数未达到允许的最大连接数,且当前没有其它线程在创建连接;
  2. 从池中拿连接,并最多等待maxWait的时间。需要设置了maxWait
  3. 从池中拿连接,并一直等待直到拿到连接。

下面最后看一下超时等待拿连接的DruidDataSource#pollLast方法的实现。

private DruidConnectionHolder pollLast(long nanos)throws InterruptedException, SQLException {long estimate = nanos;for (; ; ) {if (poolingCount == 0) {// 如果池中已经没有连接,则唤醒在empty上等待的创建连接线程来创建连接emptySignal();if (failFast && isFailContinuous()) {throw new DataSourceNotAvailableException(createError);}// 等待时间耗尽,返回nullif (estimate  notEmptyWaitThreadPeak) {notEmptyWaitThreadPeak = notEmptyWaitThreadCount;}try {long startEstimate = estimate;// 应用线程在notEmpty上等待// 有连接被创建或者被归还时,会唤醒在notEmpty上等待的应用线程estimate = notEmpty.awaitNanos(estimate);notEmptyWaitCount++;notEmptyWaitNanos += (startEstimate - estimate);if (!enable) {connectErrorCountUpdater.incrementAndGet(this);if (disableException != null) {throw disableException;}throw new DataSourceDisableException();}} catch (InterruptedException ie) {notEmpty.signal();notEmptySignalCount++;throw ie;} finally {notEmptyWaitThreadCount--;}if (poolingCount == 0) {if (estimate > 0) {// 若唤醒后池中还是没有连接,且此时等待时间还有剩余// 则重新在notEmpty上等待continue;}waitNanosLocal.set(nanos - estimate);return null;}}// poolingCount--decrementPoolingCount();// 从池中拿到连接DruidConnectionHolder last = connections[poolingCount];connections[poolingCount] = null;long waitNanos = nanos - estimate;last.setLastNotEmptyWaitNanos(waitNanos);return last;}}复制代码

二. DruidDataSource连接归还

Druid数据库连接池中,每一个物理连接都会被包装成DruidConnectionHolder,在提供给应用线程前,还会将DruidConnectionHolder包装成DruidPooledConnection,类图如下所示。

图片[2] - 搞懂Druid之连接获取和归还 - MaxSSL

应用线程中使用连接完毕后,会调用DruidPooledConnectionclose()方法来归还连接,也就是将连接放回连接池。DruidPooledConnection#close方法如下所示。

public void close() throws SQLException {if (this.disable) {return;}DruidConnectionHolder holder = this.holder;if (holder == null) {if (dupCloseLogEnable) {LOG.error("dup close");}return;}DruidAbstractDataSource dataSource = holder.getDataSource();// 判断归还连接的线程和获取连接的线程是否是同一个线程boolean isSameThread = this.getOwnerThread() == Thread.currentThread();// 如果不是同一个线程,则设置asyncCloseConnectionEnable为trueif (!isSameThread) {dataSource.setAsyncCloseConnectionEnable(true);}// 如果开启了removeAbandoned机制// 或者asyncCloseConnectionEnable为true// 则调用syncClose()方法来归还连接// syncClose()方法中会先加锁,然后调用recycle()方法来回收连接if (dataSource.isAsyncCloseConnectionEnable()) {syncClose();return;}if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {return;}try {for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {listener.connectionClosed(new ConnectionEvent(this));}List filters = dataSource.getProxyFilters();if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(dataSource);filterChain.dataSource_recycle(this);} else {// 回收连接recycle();}} finally {CLOSING_UPDATER.set(this, 0);}this.disable = true;}复制代码

DruidPooledConnection#close方法中,会先判断本次归还连接的线程和获取连接的线程是否是同一个线程,如果不是,则先加锁然后再调用recycle()方法来回收连接,如果是则直接调用recycle()方法来回收连接。当开启了removeAbandoned机制时,就可能会出现归还连接的线程和获取连接的线程不是同一个线程的情况,这是因为一旦开启了removeAbandoned机制,那么每一个被借出的连接都会被放到activeConnections活跃连接map中,并且在销毁连接的线程DestroyConnectionThread中会每间隔timeBetweenEvictionRunsMillis的时间就遍历一次activeConnections活跃连接map,一旦有活跃连接被借出的时间大于了removeAbandonedTimeoutMillis,那么销毁连接的线程DestroyConnectionThread就会主动去回收这个连接,以防止连接泄漏

下面看一下DruidPooledConnection#recycle方法的实现。

public void recycle() throws SQLException {if (this.disable) {return;}DruidConnectionHolder holder = this.holder;if (holder == null) {if (dupCloseLogEnable) {LOG.error("dup close");}return;}if (!this.abandoned) {DruidAbstractDataSource dataSource = holder.getDataSource();// 调用DruidAbstractDataSource#recycle回收当前连接dataSource.recycle(this);}this.holder = null;conn = null;transactionInfo = null;closed = true;}复制代码

DruidPooledConnection#recycle方法中会调用到DruidDataSource#recycle方法来回收连接。DruidDataSource#recycle方法实现如下所示。

protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {final DruidConnectionHolder holder = pooledConnection.holder;// 省略final boolean isAutoCommit = holder.underlyingAutoCommit;final boolean isReadOnly = holder.underlyingReadOnly;final boolean testOnReturn = this.testOnReturn;try {// 如果是非自动提交且存在事务// 则回滚事务if ((!isAutoCommit) && (!isReadOnly)) {pooledConnection.rollback();}// 重置连接信息(配置还原为默认值,关闭Statement,清除连接的Warnings等)boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();if (!isSameThread) {final ReentrantLock lock = pooledConnection.lock;lock.lock();try {holder.reset();} finally {lock.unlock();}} else {holder.reset();}// 省略// 开启了testOnReturn机制,则校验连接有效性if (testOnReturn) {boolean validate = testConnectionInternal(holder, physicalConnection);// 校验不通过则关闭物理连接if (!validate) {JdbcUtils.close(physicalConnection);destroyCountUpdater.incrementAndGet(this);lock.lock();try {if (holder.active) {activeCount--;holder.active = false;}closeCount++;} finally {lock.unlock();}return;}}// 省略lock.lock();try {// 连接即将放回连接池,需要将active设置为falseif (holder.active) {activeCount--;holder.active = false;}closeCount++;// 将连接放到connections数组的poolingCount位置// 然后poolingCount加1// 然后唤醒在notEmpty上等待连接的一个应用线程result = putLast(holder, currentTimeMillis);recycleCount++;} finally {lock.unlock();}if (!result) {JdbcUtils.close(holder.conn);LOG.info("connection recyle failed.");}} catch (Throwable e) {// 省略}}复制代码

DruidDataSource#recycle方法中会先重置连接信息,即将连接的一些配置重置为默认值,然后关闭连接的StatementWarnings,如果开启了testOnReturn机制,则还需要校验一下连接的有效性,校验不通过则直接关闭物理连接,最后,将连接放回到connections数组的poolingCount位置,然后唤醒一个在notEmpty上等待连接的应用线程。

三. removeAbandoned机制

Druid数据库连接池提供了removeAbandoned机制来防止连接泄漏。要开启removeAbandoned机制,需要设置如下参数。

参数说明
removeAbandoned发生连接泄漏时,是否需要回收泄漏的连接。默认为false,表示不回收。
removeAbandonedTimeoutMillis判断发生连接泄漏的超时时间。默认为300秒。
logAbandoned是否打印堆栈。默认为false,包是不打印。

下面将对开启removeAbandoned机制后,如何回收发生了泄漏的连接进行说明。当应用线程从连接池获取到一个连接后,如果开启了removeAbandoned机制,那么会将这个连接放到activeConnections活跃连接map中,对应的方法为DruidDataSource#getConnectionDirect,源码片段如下所示。

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {int notFullTimeoutRetryCnt = 0;for (; ; ) {DruidPooledConnection poolableConnection;// 省略if (removeAbandoned) {StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();poolableConnection.connectStackTrace = stackTrace;// 设置connectedTimeNano,用于后续判断连接借出时间是否大于removeAbandonedTimeoutMillispoolableConnection.setConnectedTimeNano();poolableConnection.traceEnable = true;activeConnectionLock.lock();try {// 将从连接池获取到的连接放到activeConnections中activeConnections.put(poolableConnection, PRESENT);} finally {activeConnectionLock.unlock();}}if (!this.defaultAutoCommit) {poolableConnection.setAutoCommit(false);}return poolableConnection;}}复制代码

又已知Druid数据库连接池有一个销毁连接的线程会每间隔timeBetweenEvictionRunsMillis执行一次DestroyTask#run方法来销毁连接,DestroyTask#run方法如下所示。

public void run() {shrink(true, keepAlive);// 如果开启了removeAbandoned机制// 则执行removeAbandoned()方法来检测发生了泄漏的连接并回收if (isRemoveAbandoned()) {removeAbandoned();}}复制代码

DestroyTask#run方法的最后会判断是否开启了removeAbandoned机制,如果开启了则会执行DruidDataSource#removeAbandoned方法来检测哪些连接发生了泄漏,并主动回收这些连接。DruidDataSource#removeAbandoned方法如下所示。

public int removeAbandoned() {int removeCount = 0;long currrentNanos = System.nanoTime();List abandonedList = new ArrayList();activeConnectionLock.lock();try {Iterator iter = activeConnections.keySet().iterator();for (; iter.hasNext(); ) {DruidPooledConnection pooledConnection = iter.next();// 运行中的连接不会被判定为发生了泄漏if (pooledConnection.isRunning()) {continue;}long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);// 判断连接借出时间是否达到连接泄漏的超时时间if (timeMillis >= removeAbandonedTimeoutMillis) {// 将发生了泄漏的连接从activeConnections中移除iter.remove();pooledConnection.setTraceEnable(false);// 将发生了泄露的连接添加到abandonedList集合中abandonedList.add(pooledConnection);}}} finally {activeConnectionLock.unlock();}if (abandonedList.size() > 0) {// 遍历abandonedList集合// 主动调用每个发生了泄漏的DruidPooledConnection的close()方法来回收连接for (DruidPooledConnection pooledConnection : abandonedList) {final ReentrantLock lock = pooledConnection.lock;lock.lock();try {if (pooledConnection.isDisable()) {continue;}} finally {lock.unlock();}JdbcUtils.close(pooledConnection);pooledConnection.abandond();removeAbandonedCount++;removeCount++;// 省略}}return removeCount;}复制代码

DruidDataSource#removeAbandoned方法中主要完成的事情就是将每个发生了泄漏的连接从activeConnections中移动到abandonedList中,然后遍历abandonedList中的每个连接并调用DruidPooledConnection#close方法,最终完成泄漏连接的回收。

总结

结合搞懂Druid之连接池初始化和搞懂Druid之连接创建和销毁,对Druid的一个基本原理进行如下总结。

Druid数据库连接池中,应用线程向连接池获取连接时,如果池中没有连接,则应用线程会在notEmpty上等待,同时Druid数据库连接池中有一个创建连接的线程,会持续的向连接池创建连接,如果连接池已满,则创建连接的线程会在empty上等待。

当有连接被生产,或者有连接被归还,会唤醒在notEmpty上等待的应用线程,同理有连接被销毁时,会唤醒在empty上等待的生产连接的线程。

Druid数据库连接池中还有一个销毁连接的线程,会每间隔timeBetweenEvictionRunsMillis的时间执行一次DestroyTask任务来销毁连接,这些被销毁的连接可以是存活时间达到最大值的连接,也可以是空闲时间达到指定值的连接。如果还开启了保活机制,那么空闲时间大于keepAliveBetweenTimeMillis的连接都会被校验一次有效性,校验不通过的连接会被销毁。

最后,Druid数据库连接池提供了removeAbandoned机制来防止连接泄漏,当开启了removeAbandoned机制时,每一个被应用线程获取的连接都会被添加到activeConnections活跃连接map中,如果这个连接在应用线程中使用完毕后没有被关闭,那么Druid数据库连接池会从activeConnections中将其识别出来并主动回收。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享