因开发需要,我们大数据组,基于Apache Common-Pool,对 InfluxDB的封装了一个连接池,现就主要逻辑涉及到的源代码(与Apache Common-Pool交叉)进行梳理。

我们将从以下四个方面进行梳理:

  • 1、连接池初始化的操作:
  • 2、出借连接对象时的逻辑。
  • 3、归还连接对象时的逻辑。
  • 4、主要的异常信息 列举(与Apache Common-Pool交叉),

1、连接池初始化的操作。

创建连接池对象时,由代码可见:

就是把各种配置信息初始化到连接池对象中,但是并不创建任何连接对象。
只是在获取连接的时候,才会做判断。

/**
     * Create a new <code>GenericObjectPool</code> using a specific
     * configuration.
     *
     * @param factory   The object factory to be used to create object instances
     *                  used by this pool
     * @param config    The configuration to use for this pool instance. The
     *                  configuration is used by value. Subsequent changes to
     *                  the configuration object will not be reflected in the
     *                  pool.
     */
    public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;

        idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());

        setConfig(config);

        startEvictor(getTimeBetweenEvictionRunsMillis());
    }

2、出借连接对象时的逻辑。

2.1 出借连接对象,主要是 下面 getResource 方法。

  public T getResource() {
    try {
      return internalPool.borrowObject();   //获取连接对象,
    } catch (NoSuchElementException nse) {
      if (null == nse.getCause()) { // The exception was caused by an exhausted pool
        throw new PydInfluxDBExhaustedPoolException(
            "Could not get a resource since the pool is exhausted", nse);
      }
      // Otherwise, the exception was caused by the implemented activateObject() or ValidateObject()
      throw new PydInfluxDBException("Could not get a resource from the pool", nse);
    } catch (Exception e) {
      throw new PydInfluxDBConnectionException("Could not get a resource from the pool", e);
    }
  }

2.2 然后再调用borrowObject这个方法

出借连接对象的逻辑,主要都在下面GenericObjectPool 类的这个方法中(见代码行注释):

 public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
     assertOpen();

     final AbandonedConfig ac = this.abandonedConfig;
     if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
             (getNumIdle() < 2) &&
             (getNumActive() > getMaxTotal() - 3) ) {
         removeAbandoned(ac);
     }

     PooledObject<T> p = null;

     // Get local copy of current config so it is consistent for entire
     // method execution
     final boolean blockWhenExhausted = getBlockWhenExhausted();

     boolean create;
     final long waitTime = System.currentTimeMillis();

     while (p == null) {
         create = false;
         p = idleObjects.pollFirst();  // 1、先 从 队列中获取连接对象
         if (p == null) {  
             p = create();   // 2、如果没有从队列中获取到,则创建对象
             if (p != null) {
                 create = true;
             }
         }
         if (blockWhenExhausted) {  // 3、如果无法创建(可能连接被耗尽),则要进入耗尽等待阻塞状态。
             if (p == null) {
                 if (borrowMaxWaitMillis < 0) {
                     p = idleObjects.takeFirst();
                 } else {
                     p = idleObjects.pollFirst(borrowMaxWaitMillis,TimeUnit.MILLISECONDS);  // 4、基于等待时间 等待。
                 }
             }
             if (p == null) {
                 throw new NoSuchElementException(
                         "Timeout waiting for idle object");
             }
         } else {
             if (p == null) {
                 throw new NoSuchElementException("Pool exhausted");
             }
         }
         if (!p.allocate()) {  //5、 获取成功后,将连接对象 从 Idle 状态,修改为 allocate 状态。
             p = null;
         }

         if (p != null) {
             try {
                 factory.activateObject(p);  // 6、获取成功后,可以 开始做激活操作
             } catch (final Exception e) {
                 try {
                     destroy(p);
                 } catch (final Exception e1) {
                     // Ignore - activation failure is more important
                 }
                 p = null;
                 if (create) {
                     final NoSuchElementException nsee = new NoSuchElementException(
                             "Unable to activate object");
                     nsee.initCause(e);
                     throw nsee;
                 }
             }
             if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                 boolean validate = false;
                 Throwable validationThrowable = null;
                 try {
                     validate = factory.validateObject(p);   // 7、获取成功后,可以 开始做  测试校验 操作
                 } catch (final Throwable t) {
                     PoolUtils.checkRethrow(t);
                     validationThrowable = t;
                 }
                 if (!validate) {
                     try {
                         destroy(p);
                         destroyedByBorrowValidationCount.incrementAndGet();
                     } catch (final Exception e) {
                         // Ignore - validation failure is more important
                     }
                     p = null;
                     if (create) {
                         final NoSuchElementException nsee = new NoSuchElementException(
                                 "Unable to validate object");
                         nsee.initCause(validationThrowable);
                         throw nsee;
                     }
                 }
             }
         }
     }

     updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

     return p.getObject();   // 8、最终 返回 要 借出的 连接对象。
 }

3、有关借出连接的几个细节

3.1首先会调用工厂类,创建连接对象。

org.apache.commons.pool2.impl.GenericObjectPool

try {
           p = factory.makeObject();
       } catch (final Exception e) {
           createCount.decrementAndGet();
           throw e;
} finally {

3.2 自定义实现了接口的工厂类,创建了连接对象,会被装包到默认池对象类中。

PydInfluxDBFactory
   @Override
   public PooledObject<PydInfluxDB> makeObject() throws Exception {
       PydInfluxDB pydInfluxDB =new PydInfluxDBImpl(this.url, this.username, this.password, new OkHttpClient.Builder());
       //可以在此处做一些判断、校验类的操作。
       return new DefaultPooledObject<>(pydInfluxDB);
   }

3.3 默认的池对象,会缺省设置几个描述连接对象状态的属性值。

public class DefaultPooledObject<T> implements PooledObject<T> {

   private final T object;
   private PooledObjectState state = PooledObjectState.IDLE; // @GuardedBy("this") to ensure transitions are valid
   private final long createTime = System.currentTimeMillis();
   private volatile long lastBorrowTime = createTime;
   private volatile long lastUseTime = createTime;
   private volatile long lastReturnTime = createTime;
   private volatile boolean logAbandoned = false;
   private final CallStack borrowedBy = CallStackUtils.newCallStack("'Pooled object created' " +
       "yyyy-MM-dd HH:mm:ss Z 'by the following code has not been returned to the pool:'", true);
   private final CallStack usedBy = CallStackUtils.newCallStack("The last code to use this object was:",
       false);
   private volatile long borrowedCount = 0;

   /**
    * Create a new instance that wraps the provided object so that the pool can
    * track the state of the pooled object.
    *
    * @param object The object to wrap
    */
   public DefaultPooledObject(final T object) {
       this.object = object;
   }

重点 如 private PooledObjectState state = PooledObjectState.IDLE;
这一句,池对象的状态。是一个枚举类,有多个对象的状态取值。重点的是下面这两个。

public enum PooledObjectState {
   /**
    * In the queue, not in use.
    */
   IDLE,  // 此种状态的连接对象才会被借出。且 借出后修改状态为ALLOCATED。

   /**
    * In use.
    */
   ALLOCATED,     // 此种状态的连接对象才会被归还。且 归还后修改状态为 IDLE  。
}

3.4 借出连接时,会修改的标识信息。

在 章节1 中, 主要逻辑代码中,有如下 代码语句:p.allocate() 方法的内容和逻辑如下:

首先,判断连接对象状态是否为Idle;
然后,会修改 其标识信息为Alllcate

   /**
    * Allocates the object.
    *
    * @return {@code true} if the original state was {@link PooledObjectState#IDLE IDLE}
    */
   @Override
   public synchronized boolean allocate() {
       if (state == PooledObjectState.IDLE) {
           state = PooledObjectState.ALLOCATED;
           lastBorrowTime = System.currentTimeMillis();
           lastUseTime = lastBorrowTime;
           borrowedCount++;
           if (logAbandoned) {
               borrowedBy.fillInStackTrace();
           }
           return true;
       } else if (state == PooledObjectState.EVICTION) {
           // TODO Allocate anyway and ignore eviction test
           state = PooledObjectState.EVICTION_RETURN_TO_HEAD;
           return false;
       }
       // TODO if validating and testOnBorrow == true then pre-allocate for
       // performance
       return false;
   }

3.5 分析:当借连接的等待时间 超时,或者 未设置时,执行的逻辑。

           if (blockWhenExhausted) {  //连接池资源耗尽时,会等待、阻塞。
               if (p == null) {
                   if (borrowMaxWaitMillis < 0) {
                       p = idleObjects.takeFirst();  // 不设置等待超时,则直接获取
                   } else {    // 设置了等待超时,会进行判断。
                       p = idleObjects.pollFirst(borrowMaxWaitMillis,TimeUnit.MILLISECONDS);
                   }
               }
               if (p == null) {  // 等待超时后,仍旧没有获取连接,则会抛出下面的异常信息。
                   throw new NoSuchElementException("Timeout waiting for idle object");
               }
           } else {
               if (p == null) {  
                   throw new NoSuchElementException("Pool exhausted");
               }
           }

4、归还连接对象时的逻辑。

有关 归还连接的 逻辑代码, 都在主要都在下面GenericObjectPool 类 的这个方法中:
详见 代码 行注释。

@Override
   public void returnObject(final T obj) {
       final PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));

       if (p == null) {
           if (!isAbandonedConfig()) {
               throw new IllegalStateException(
                       "Returned object not currently part of this pool");
           }
           return; // Object was abandoned and removed
       }

       synchronized(p) {
           final PooledObjectState state = p.getState();
           if (state != PooledObjectState.ALLOCATED) {
               throw new IllegalStateException(
                       "Object has already been returned to this pool or is invalid");
           }
           p.markReturning(); // Keep from being marked abandoned    // 1、此句代码修改了连接的状态,下面会进一步分析内部代码。

       }

       final long activeTime = p.getActiveTimeMillis();

       if (getTestOnReturn()) {  // 2、归还时,可以先测试一下。需要配置。
           if (!factory.validateObject(p)) {
               try {
                   destroy(p);
               } catch (final Exception e) {
                   swallowException(e);
               }
               try {
                   ensureIdle(1, false);
               } catch (final Exception e) {
                   swallowException(e);
               }
               updateStatsReturn(activeTime);
               return;
           }
       }

       try {
           factory.passivateObject(p);
       } catch (final Exception e1) {
           swallowException(e1);
           try {
               destroy(p);
           } catch (final Exception e) {
               swallowException(e);
           }
           try {
               ensureIdle(1, false);
           } catch (final Exception e) {
               swallowException(e);
           }
           updateStatsReturn(activeTime);
           return;
       }

       if (!p.deallocate()) {   // 3、此句代码修改了连接的状态,下面会进一步分析内部代码。
           throw new IllegalStateException(
                   "Object has already been returned to this pool or is invalid");
       }

       final int maxIdleSave = getMaxIdle();
       if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
           try {
               destroy(p);
           } catch (final Exception e) {
               swallowException(e);
           }
       } else {
           if (getLifo()) {
               idleObjects.addFirst(p);
           } else {
               idleObjects.addLast(p);
           }
           if (isClosed()) {
               // Pool closed while object was being added to idle objects.
               // Make sure the returned object is destroyed rather than left
               // in the idle object pool (which would effectively be a leak)
               clear();
           }
       }
       updateStatsReturn(activeTime);
   }

5、有关归还连接的细节

在上面的“归还连接”代码中,有一句:p.deallocate() ,这是关键代码,会更新连接的状态:

   /**
    * Deallocates the object and sets it {@link PooledObjectState#IDLE IDLE}
    * if it is currently {@link PooledObjectState#ALLOCATED ALLOCATED}.
    *
    * @return {@code true} if the state was {@link PooledObjectState#ALLOCATED ALLOCATED}
    */
   @Override
   public synchronized boolean deallocate() {
       if (state == PooledObjectState.ALLOCATED ||  state == PooledObjectState.RETURNING) {
           state = PooledObjectState.IDLE;   // 此句将连接的状态,由ALLOCATED修改为IDLE。
           lastReturnTime = System.currentTimeMillis();
           borrowedBy.clear();
           return true;
       }

       return false;
   }

6、主要的异常信息 列举

下面列举出,几处重要的异常信息,便于在日志中搜索。

6.1 借出连接,设置激活失败时,报出的异常信息。

               try {
                   factory.activateObject(p);
               } catch (final Exception e) {
                   try {
                       destroy(p);
                   } catch (final Exception e1) {
                       // Ignore - activation failure is more important
                   }
                   p = null;
                   if (create) {
                       final NoSuchElementException nsee = new NoSuchElementException(
                               "Unable to activate object");
                       nsee.initCause(e);
                       throw nsee;
                   }
               }

6.2 借出连接,进行校验操作失败时,报出的异常信息。

if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                   boolean validate = false;
                   Throwable validationThrowable = null;
                   try {
                       validate = factory.validateObject(p);
                   } catch (final Throwable t) {
                       PoolUtils.checkRethrow(t);
                       validationThrowable = t;
                   }
                   if (!validate) {
                       try {
                           destroy(p);
                           destroyedByBorrowValidationCount.incrementAndGet();
                       } catch (final Exception e) {
                           // Ignore - validation failure is more important
                       }
                       p = null;
                       if (create) {
                           final NoSuchElementException nsee = new NoSuchElementException(
                                   "Unable to validate object");
                           nsee.initCause(validationThrowable);
                           throw nsee;
                       }
                   }
               }

6.3 获取资源失败时,可能会报出的异常信息。

 public T getResource() {
   try {
     return internalPool.borrowObject();
   } catch (NoSuchElementException nse) {
     if (null == nse.getCause()) { // The exception was caused by an exhausted pool
       throw new PydInfluxDBExhaustedPoolException(
           "Could not get a resource since the pool is exhausted", nse);
     }
     // Otherwise, the exception was caused by the implemented activateObject() or ValidateObject()
     throw new PydInfluxDBException("Could not get a resource from the pool", nse);
   } catch (Exception e) {
     throw new PydInfluxDBConnectionException("Could not get a resource from the pool", e);
   }
 }

6.4 当借连接的等待时间 超时,或者 未设置时,执行的逻辑。

           if (blockWhenExhausted) {
               if (p == null) {
                   if (borrowMaxWaitMillis < 0) {
                       p = idleObjects.takeFirst();
                   } else {
                       p = idleObjects.pollFirst(borrowMaxWaitMillis,
                               TimeUnit.MILLISECONDS);
                   }
               }
               if (p == null) {
                   throw new NoSuchElementException(
                           "Timeout waiting for idle object");
               }
           } else {
               if (p == null) {
                   throw new NoSuchElementException("Pool exhausted");
               }
           }

以上就是 对于InfluxDBPool 开发代码 重要逻辑的梳理,大部分的代码都是 Apache Common-Pool 中的源码,大家可以参见。
谢谢!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐