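1 Overview of Dubbo Thread Pools

The thread pool discussed here is the business thread pool that the Dubbo server side uses under certain thread models (such as the all model). ThreadPool is an SPI extension interface.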

@SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK)
public interface ThreadPool {

    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

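Dubbo provides several implementations of this interface, all built on the JDK's standard ThreadPoolExecutor (for details, see "Explaining ThreadPoolExecutor"). They are listed below.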

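  • FixedThreadPool: a thread pool with a fixed number of threads. Threads are created as tasks arrive until the configured count is reached, and idle threads are never reclaimed.
  • LimitedThreadPool: the number of threads grows on demand but never exceeds the configured threshold. Idle threads are never reclaimed.
  • EagerThreadPool: the number of threads grows on demand. By default the upper bound is Integer.MAX_VALUE (about 2.147 billion), and a thread is reclaimed after it has been idle for 1 minute.
  • CachedThreadPool: the number of threads grows on demand. By default the upper bound is Integer.MAX_VALUE, and a thread is reclaimed after it has been idle for 1 minute. In these respects it resembles EagerThreadPool.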

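Dubbo uses the fixed-size thread pool (FixedThreadPool) by default. Under the default configuration (the queues parameter defaults to 0), none of the pools above places a request in the task queue, even when no thread is idle and the maximum thread count has been reached.

2 Source Code Analysis

2.1 FixedThreadPool

A thread pool with a fixed number of threads, in which idle threads are never reclaimed. The source code is shown below.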

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread name prefix; defaults to "Dubbo"
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        // Number of threads; defaults to 200
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        // Capacity of the pool's blocking queue; defaults to 0
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

        BlockingQueue<Runnable> blockingQueue;

        if (queues == 0) {
            blockingQueue = new SynchronousQueue<>();
        } else if (queues < 0) {
            blockingQueue = new MemorySafeLinkedBlockingQueue<>();
        } else {
            blockingQueue = new LinkedBlockingQueue<>(queues);
        }
        
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, blockingQueue,
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

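As shown above, the core thread count equals the maximum thread count, so the pool has a fixed size, and its threads are not reclaimed even when they are idle.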

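By default the task queue is a SynchronousQueue: a submitted task is never actually stored in the queue but is handed directly to a thread for execution (for details on the various task queues, see "Explaining the Task Queues in Thread Pools").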
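
The effect of the queues parameter can be tried out with the JDK alone. The following is a minimal sketch, not Dubbo code: chooseQueue mirrors the three-way branch above, a plain unbounded LinkedBlockingQueue stands in for MemorySafeLinkedBlockingQueue, and the class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoiceDemo {

    // Mirrors the branch on "queues". Assumption: an unbounded LinkedBlockingQueue
    // is used here in place of Dubbo's MemorySafeLinkedBlockingQueue.
    static BlockingQueue<Runnable> chooseQueue(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();
        } else if (queues < 0) {
            return new LinkedBlockingQueue<>();
        }
        return new LinkedBlockingQueue<>(queues);
    }

    // Fills a fixed pool with blocking tasks, submits one more,
    // and reports whether that extra task was rejected.
    static boolean extraTaskRejected(int threads, int queues) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                chooseQueue(queues), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(blocking); // each call starts one core thread
            }
            pool.execute(blocking); // no thread left: queued or rejected
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queues=0  rejected: " + extraTaskRejected(2, 0));
        System.out.println("queues=1  rejected: " + extraTaskRejected(2, 1));
        System.out.println("queues=-1 rejected: " + extraTaskRejected(2, -1));
    }
}
```

With queues set to 0 the extra task has nowhere to wait, so it goes straight to the rejection policy. A positive or negative value gives it a bounded or unbounded queue to wait in.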

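The rejection policy is AbortPolicyWithReport, which extends the JDK's ThreadPoolExecutor.AbortPolicy. When every thread in the pool is busy and the queue is full, a new task is not executed and a RejectedExecutionException is thrown. The implementation is shown below.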

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    String msg = String.format("Thread pool is EXHAUSTED!" +
            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d)," +
            " Task: %d (completed: %d)," +
            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
        threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
        e.getLargestPoolSize(),
        e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
        url.getProtocol(), url.getIp(), url.getPort());

    // 0-1 - Thread pool is EXHAUSTED!
    logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);

    // Dump the thread stack traces of the current JVM process
    // Relies on java.lang.management.ThreadMXBean to access JVM thread information
    if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, "true"))) {
        dumpJStack();
    }

    dispatchThreadPoolExhaustedEvent(msg);

    throw new RejectedExecutionException(msg);
}
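
The report-then-abort pattern can be reproduced in a few lines of plain JDK code. This is a simplified sketch, not the Dubbo class: ReportingAbortPolicy and its message format are hypothetical, and the thread dump and event dispatch are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ReportingAbortPolicyDemo {

    // Hypothetical, simplified policy: describe the pool state, then reject.
    static class ReportingAbortPolicy extends ThreadPoolExecutor.AbortPolicy {
        private final String threadName;

        ReportingAbortPolicy(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format(
                    "Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (core: %d, max: %d), Task: %d",
                    threadName, e.getPoolSize(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getTaskCount());
            System.err.println(msg); // a real policy would log and dump thread stacks here
            throw new RejectedExecutionException(msg);
        }
    }

    // Saturates a one-thread pool and returns the message carried by the rejection.
    static String rejectionMessage() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ReportingAbortPolicy("Demo"));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocking); // occupies the only thread
            pool.execute(blocking); // cannot be queued or run, so it is rejected
            return null;
        } catch (RejectedExecutionException e) {
            return e.getMessage();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectionMessage());
    }
}
```

Because the exception carries the pool statistics, the caller's logs show how saturated the pool was at the moment of rejection.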

2.2 LimitedThreadPool

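The number of threads grows on demand but never exceeds the configured threshold. Idle threads are never reclaimed and stay alive indefinitely. The source code is shown below.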

/**
 * Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
 * automatically.
 */
public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        // Defaults to 0
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        // Defaults to 200
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        // Defaults to 0
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
                    : new LinkedBlockingQueue<Runnable>(queues)),
            new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

As shown above, the keep-alive time for idle threads is Long.MAX_VALUE, so a thread is never reclaimed once it has been created.
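
The effect of the keep-alive value can be observed with a plain ThreadPoolExecutor. The sketch below is JDK-only, with hypothetical names and demo-sized numbers. It runs a burst of concurrent tasks, lets the pool sit idle, then reads the pool size.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Runs "max" tasks concurrently on a pool with 0 core threads, waits
    // idleWaitMillis after they finish, and returns the remaining pool size.
    static int poolSizeAfterBurst(int max, long keepAliveMillis, long idleWaitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, max, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        CountDownLatch allRunning = new CountDownLatch(max);
        try {
            for (int i = 0; i < max; i++) {
                pool.execute(() -> {
                    allRunning.countDown();
                    try {
                        allRunning.await(); // hold this thread until every task has started
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allRunning.await();
            Thread.sleep(idleWaitMillis); // the pool is idle from here on
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Keep-alive of Long.MAX_VALUE, as in LimitedThreadPool: threads stay.
        System.out.println("limited-style: " + poolSizeAfterBurst(3, Long.MAX_VALUE, 300));
        // Short keep-alive (50 ms here instead of one minute): threads are reclaimed.
        System.out.println("cached-style:  " + poolSizeAfterBurst(3, 50, 1000));
    }
}
```

The only difference between the two calls is the keep-alive argument, which is also the main difference between LimitedThreadPool and CachedThreadPool in the sources shown in this section.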

2.3 EagerThreadPool

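The number of threads grows on demand. By default the upper bound is Integer.MAX_VALUE (about 2.147 billion), and a thread is reclaimed after it has been idle for 1 minute. The source code is shown below.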

/**
 * EagerThreadPool
 * When the core threads are all in busy,
 * create new thread instead of putting task into blocking queue.
 */
public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        // Defaults to 0
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        // Defaults to 0
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // Defaults to 60000
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        // init queue and executor
        // TaskQueue extends LinkedBlockingQueue; the default queue capacity is 1
        TaskQueue<Runnable> taskQueue = new TaskQueue<>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}

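As shown above, the maximum thread count defaults to Integer.MAX_VALUE (about 2.147 billion).

As the class comment states, when all core threads are busy the pool creates a new thread instead of putting the task into the blocking queue. The task queue created here also has a capacity of only 1 by default, so tasks are rarely held in the queue.

In addition, by default a thread is reclaimed after it has been idle for 1 minute.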
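
The TaskQueue source is not shown in this article, so the following is only a sketch of how a queue can make an executor eager. EagerQueue is a hypothetical, simplified stand-in: its offer method refuses a task while the pool can still grow, which makes ThreadPoolExecutor add a thread instead of queuing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class EagerQueueDemo {

    // Hypothetical simplified queue; an assumption for illustration, not Dubbo's TaskQueue.
    static final class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private volatile ThreadPoolExecutor executor;

        EagerQueue(int capacity) {
            super(capacity);
        }

        void setExecutor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        @Override
        public boolean offer(Runnable task) {
            int poolSize = executor.getPoolSize();
            if (executor.getActiveCount() < poolSize) {
                return super.offer(task); // an idle worker can take it from the queue
            }
            if (poolSize < executor.getMaximumPoolSize()) {
                return false; // refusing the task makes the executor start a new thread
            }
            return super.offer(task); // pool is at its maximum: queue as usual
        }
    }

    // Submits three blocking tasks to a pool with core=1, max=3 and returns the pool size.
    static int poolSizeWithBusyWorkers(boolean eager) {
        EagerQueue eagerQueue = eager ? new EagerQueue(10) : null;
        BlockingQueue<Runnable> queue = eager ? eagerQueue : new LinkedBlockingQueue<>(10);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, queue);
        if (eagerQueue != null) {
            eagerQueue.setExecutor(pool);
        }
        Semaphore started = new Semaphore(0);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    started.release();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                // Wait until the task is running; times out if it was queued instead.
                started.tryAcquire(300, TimeUnit.MILLISECONDS);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("eager queue: " + poolSizeWithBusyWorkers(true));
        System.out.println("plain queue: " + poolSizeWithBusyWorkers(false));
    }
}
```

A standard ThreadPoolExecutor with a bounded LinkedBlockingQueue only grows beyond its core size once the queue is full. The eager variant reverses that order: it grows first and queues only at the maximum.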

2.4 CachedThreadPool

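The number of threads grows on demand. By default the upper bound is Integer.MAX_VALUE, and a thread is reclaimed after it has been idle for 1 minute. In these respects it resembles EagerThreadPool. The source code is shown below.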

/**
 * This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for
 * the upcoming request.
 *
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 */
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        // Defaults to 0
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        // Defaults to 0
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // Defaults to 60000
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

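3 When the Thread Pool Is Determined

When is the ThreadPool SPI extension used by a thread model actually loaded? The following uses AllDispatcher as an example. In the connected method, the business thread pool is obtained through getSharedExecutorService, and the specified pool is ultimately created by DefaultExecutorRepository#createExecutor().

In other words, when a connection request reaches the provider and no thread pool exists yet, the pool is created. Later connection requests reuse that pool instead of creating a new one. The source code is shown below.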

// Connection-established event, handed to the business thread pool
@Override
public void connected(Channel channel) throws RemotingException {
    // Obtain the business thread pool
    ExecutorService executor = getSharedExecutorService();
    try {
        // Execute the connection event
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    }
}


/**
 * get the shared executor for current Server or Client
 *
 * @return
 */
public ExecutorService getSharedExecutorService() {
    // Application may be destroyed before channel disconnected, avoid create new application model
    // see https://github.com/apache/dubbo/issues/9127
    if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {
        return GlobalResourcesRepository.getGlobalExecutorService();
    }

    // note: url.getOrDefaultApplicationModel() may create new application model
    ApplicationModel applicationModel = url.getOrDefaultApplicationModel();

    ExecutorRepository executorRepository = ExecutorRepository.getInstance(applicationModel);

    ExecutorService executor = executorRepository.getExecutor(url);

    if (executor == null) {
        // Get or create the thread pool
        executor = executorRepository.createExecutorIfAbsent(url);
    }

    return executor;
}


/**
 * Get called when the server or client instance initiating.
 *
 * @param url
 * @return
 */
@Override
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
    String executorKey = getExecutorKey(url);
    ConcurrentMap<String, ExecutorService> executors = ConcurrentHashMapUtils.computeIfAbsent(data, executorKey, k -> new ConcurrentHashMap<>());

    String executorCacheKey = getExecutorSecondKey(url);

    url = setThreadNameIfAbsent(url, executorCacheKey);

    URL finalUrl = url;
    ExecutorService executor = ConcurrentHashMapUtils.computeIfAbsent(executors, executorCacheKey, k -> createExecutor(finalUrl));
    // If executor has been shut down, create a new one
    if (executor.isShutdown() || executor.isTerminated()) {
        executors.remove(executorCacheKey);
        // Create the thread pool
        executor = createExecutor(url);
        executors.put(executorCacheKey, executor);
    }
    dataStore.put(executorKey, executorCacheKey, executor);
    return executor;
}


// Create the specified thread pool
protected ExecutorService createExecutor(URL url) {
    return (ExecutorService) extensionAccessor.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
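
The create-once-then-reuse behavior can be reduced to a small JDK-only sketch. The class below is hypothetical: it keys pools by a plain string (for example a port number) and uses Executors.newFixedThreadPool as a stand-in for the SPI lookup.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorCacheDemo {

    private final ConcurrentMap<String, ExecutorService> executors = new ConcurrentHashMap<>();

    // Returns the pool cached under the key, creating it on first use
    // and replacing it if it has been shut down in the meantime.
    synchronized ExecutorService createExecutorIfAbsent(String key) {
        ExecutorService executor = executors.computeIfAbsent(key, k -> createExecutor());
        if (executor.isShutdown()) {
            executor = createExecutor();
            executors.put(key, executor);
        }
        return executor;
    }

    // Assumption: a fixed JDK pool stands in for the adaptive ThreadPool extension.
    private ExecutorService createExecutor() {
        return Executors.newFixedThreadPool(2);
    }

    public static void main(String[] args) {
        ExecutorCacheDemo repository = new ExecutorCacheDemo();
        ExecutorService first = repository.createExecutorIfAbsent("20880");
        ExecutorService again = repository.createExecutorIfAbsent("20880");
        System.out.println("reused: " + (first == again));

        first.shutdown();
        ExecutorService replacement = repository.createExecutorIfAbsent("20880");
        System.out.println("replaced after shutdown: " + (replacement != first));
        replacement.shutdown();
    }
}
```

Only the first call for a key pays the cost of building a pool. Later calls get the cached instance unless it has been shut down.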

4 Custom Thread Pool Strategy

4.1 Create a Custom Thread Pool Class

Create a custom thread pool class that implements the extension interface ThreadPool (org.apache.dubbo.common.threadpool.ThreadPool).

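package org.apache.dubbo.common.threadpool.support; // matches the class name registered in section 4.2

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREADS;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREAD_NAME;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

public class MyThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread name prefix; defaults to "Dubbo"
        String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
        // Number of threads; defaults to 200
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);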

        BlockingQueue<Runnable> blockingQueue = new SynchronousQueue<>();
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, blockingQueue,
            new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

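4.2 Configuration

Under the resources directory, create a META-INF/dubbo directory, and in it create a file named org.apache.dubbo.common.threadpool.ThreadPool. Register the custom thread pool class in this file, with the extension name as the key and the fully qualified class name as the value.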

myThreadPool=org.apache.dubbo.common.threadpool.support.MyThreadPool

For reference, the entries that ship with Dubbo are shown below.

fixed=org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
eager=org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
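
To make the name-to-class mapping concrete, here is a minimal sketch that parses entries in this key=value format. It only illustrates the file layout and is not Dubbo's extension loader. The class name is hypothetical, and the handling of '#' comments is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExtensionFileDemo {

    // Parses "name=fully.qualified.ClassName" lines into a map,
    // skipping blank lines and anything after a '#'.
    static Map<String, String> parse(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String raw : content.split("\\R")) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            int eq = line.indexOf('=');
            if (line.isEmpty() || eq <= 0) {
                continue; // not a name=class entry
            }
            result.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        String content = String.join("\n",
                "# built-in entry",
                "fixed=org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool",
                "# custom entry",
                "myThreadPool=org.apache.dubbo.common.threadpool.support.MyThreadPool");
        Map<String, String> extensions = parse(content);
        // The threadpool value configured on the protocol selects an entry by name.
        System.out.println(extensions.get("myThreadPool"));
    }
}
```

The key is the name later referenced by the threadpool setting, which is why the custom entry must use a name that does not clash with the built-in ones.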

4.3 Usage

On the provider side, specify the thread pool to use along with settings such as the maximum number of threads. For example:

<dubbo:protocol name="dubbo" threadpool="myThreadPool" threads="100"/>
