logo头像
Snippet 博客主题

java线程池

构造方法

从ThreadPoolExecutor的构造方法入手

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

3 个最重要的参数:

corePoolSize : 核心线程数,定义了最小可以同时运行的线程数量。
maximumPoolSize : 线程池允许同时运行的最大线程数。当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,就会被存放在队列中。

常见参数:

keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
unit : keepAliveTime 参数的时间单位。
threadFactory :executor 创建新线程的时候会用到。
handler :饱和策略。

饱和策略

ThreadPoolExecutor.AbortPolicy: 拒绝请求,同时抛出RejectedExecutionException。默认为此策略。
ThreadPoolExecutor.CallerRunsPolicy: 直接在execute方法中运行新任务
ThreadPoolExecutor.DiscardOldestPolicy: 丢掉最早未执行的任务
ThreadPoolExecutor.DiscardPolicy: 直接丢掉新任务

使用示例

public class T implements Runnable {

    private String s;

    public T(String s) {
        this.s = s;
    }

    @Override
    public void run() {
        System.out.println("thread name:" + Thread.currentThread().getName() + " | " + s + " running " + LocalTime.now());
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
//        System.out.println("thread name:" + Thread.currentThread().getName() + " | " + s + " close " + LocalTime.now());
    }
}
public class Tests {
    public static void main(String[] args) throws IOException, InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10,
                0, TimeUnit.SECONDS,
                queue);


        for (int i = 1; i <= 7; i++) {
            executor.execute(new T("" + i));
        }

        System.out.println("activeCount = " + executor.getActiveCount());
        TimeUnit.SECONDS.sleep(31);
        System.out.println(executor.getActiveCount());
        executor.shutdown();
        System.exit(1);
    }
}

当前线程池配置:
corePoolSize = 2
maximumPoolSize = 10
workQueue 大小 =5

  1. 当启动7个线程的时候:

    activeCount = 2
    thread name:pool-1-thread-2 | 2 running 17:27:35.362
    thread name:pool-1-thread-1 | 1 running 17:27:35.362
    thread name:pool-1-thread-2 | 4 running 17:27:45.362
    thread name:pool-1-thread-1 | 3 running 17:27:45.362
    thread name:pool-1-thread-2 | 5 running 17:27:55.374
    thread name:pool-1-thread-1 | 6 running 17:27:55.374
    

    可以看出,线程池每次会同时执行2个任务,并没有按最大线程数10来执行,原因是什么呢?

  2. 我们把线程数调到8个再看下:

    activeCount = 3
    thread name:pool-1-thread-3 | 8 running 17:28:44.389
    thread name:pool-1-thread-1 | 1 running 17:28:44.389
    thread name:pool-1-thread-2 | 2 running 17:28:44.389
    thread name:pool-1-thread-1 | 3 running 17:28:54.392
    thread name:pool-1-thread-3 | 4 running 17:28:54.392
    thread name:pool-1-thread-2 | 5 running 17:28:54.392
    

    这个就和workQueue有关系了,当workQueue没有满的时候,线程池的最大线程数=corePoolSize,当workQueue被提交满的时候,maximumPoolSize就生效了,最大线程数=maximumPoolSize。
    当前情况下,第7个任务提交后,线程池活跃线程=corePoolSize=2,队列为5,刚好填满队列,当第8个任务提交的时候,线程池就开辟一个新线程,来执行任务8了。

  3. 再把线程数调到16个再看下:

    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.wekri.abroad.T@2077d4de rejected from java.util.concurrent.ThreadPoolExecutor@7591083d[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
     at com.wekri.abroad.Tests.main(Tests.java:22)
    thread name:pool-1-thread-3 | 8 running 17:32:59.186
    thread name:pool-1-thread-10 | 15 running 17:32:59.186
    thread name:pool-1-thread-8 | 13 running 17:32:59.187
    thread name:pool-1-thread-6 | 11 running 17:32:59.188
    thread name:pool-1-thread-2 | 2 running 17:32:59.187
    thread name:pool-1-thread-1 | 1 running 17:32:59.188
    thread name:pool-1-thread-9 | 14 running 17:32:59.189
    thread name:pool-1-thread-5 | 10 running 17:32:59.186
    thread name:pool-1-thread-4 | 9 running 17:32:59.189
    thread name:pool-1-thread-7 | 12 running 17:32:59.187
    

    当前情况下,第15个任务提交的后,线程池活跃线程=maximumPoolSize=10,队列为5,刚好填满队列,当第16个任务提交的时候,就按照饱和策略处理第16个任务了。
    总结:当线程池中的活跃线程到达maximumPoolSize的时候,并且workQueue满了,就开始使用饱和策略了。

分析

我们看下源码中的execute方法:

/**
* 在将来的某个时间执行给定任务。任务会在新的线程中执行,或者在已经存在于线程池的线程执行。
* 
* 如果任务不能正常提交,是因为executor已经shutdown,或者已达到其容量,任务会被当前的RejectedExecutionHandler处理。
*             
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
*         {@code RejectedExecutionHandler}, if the task
*         cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
    //任务为空就抛出异常
    if (command == null)
        throw new NullPointerException();
    //按一下3步执行:
    int c = ctl.get();
    //1. 如果运行线程数量小于corePoolSize,
    if (workerCountOf(c) < corePoolSize) {
        //添加一个新的线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2. 如果新的任务可以被添加到队列(队列没有满)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 如果队列满了,创建一个新的线程
    else if (!addWorker(command, false))
        // 走饱和逻辑
        reject(command);
}

通过下图可以更好的对上面这 3 步做一个展示(来源:互联网)

生命周期

状态 描述
RUNNING 接受新任务并处理排队的任务
SHUTDOWN 不接受新任务,但是处理排队的任务
STOP 不接受新任务,不处理排队的任务,并中断正在进行的任务
TIDYING 所有任务已终止,workerCount为零,线程过渡到状态TIDYING将运行terminated()方法
TERMINATED terminated() 方法执行完成
微信打赏

赞赏是不耍流氓的鼓励