疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

java线程池源码浅析


 

java线程池源码浅析

 

  最近工作不算太忙,抽时间学习了下java线程池底层源码,废话不多说,马上“去片”!

  Executors类是java线程池的工具类,此类位于java.util.concurrent包下。在日常项目开发中,我们使用得比较多的主要有CachedThreadPool、FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool这4个线程池,这些线程池底层均调用new ThreadPoolExecutor()方法来新建一个线程池。

现在简单介绍这4个池:

CachedThreadPool

CachedThreadPool默认最大线程数量为int的最大值,狭义来说已经可以说是不限制数量了。默认的空闲线程终止时间60秒。一般在项目开发中如果不明确线程池中运行任务的数量不建议使用此线程池,因为如果大量的并发任务运行时,会大量占用CPU资源,导致应用程序崩溃甚至服务器崩溃等严重问题。

FixedThreadPool

FixedThreadPool需要指定线程数量,工作任务超过线程池总线程数后会进入等待状态 ,直到已经在工作的线程完成任务后释放后再进行另一任务的工作。此线程池本人在日常开发中使用得较多,可以有效控制线程的数量,防止大量占用CPU资源,导致应用程序崩溃甚至服务器崩溃等问题的发生。

SingleThreadExecutor

SingleThreadExecutor根据命名就可以猜这是一个单线程线程池,如果任务超过一个时,则会排队等待线程池唯一的线程完成任务再运行其它任务。

ScheduledThreadPool

ScheduledThreadPool为一个延时任务线程池。(在此不作详细介绍,此线程池将作为另一篇文章单独讲解)

以上简单介绍了java常用的4个线程池,下面将深入分析线程池创建后,是如何执行任务及线程池的底层是如何实现的。

刚刚也提到了所有线程池都要使用new ThreadPoolExecutor()方法来新建,让我们先看一下JDK的源码。

复制代码
 1     public ThreadPoolExecutor(int corePoolSize,  2                               int maximumPoolSize,  3                               long keepAliveTime,  4                               TimeUnit unit,  5                               BlockingQueue<Runnable> workQueue,  6                               ThreadFactory threadFactory,  7                               RejectedExecutionHandler handler) {  8         if (corePoolSize < 0 ||  9             maximumPoolSize <= 0 || 10             maximumPoolSize < corePoolSize || 11             keepAliveTime < 0) 12             throw new IllegalArgumentException(); 13         if (workQueue == null || threadFactory == null || handler == null) 14             throw new NullPointerException(); 15         this.corePoolSize = corePoolSize; 16         this.maximumPoolSize = maximumPoolSize; 17         this.workQueue = workQueue; 18         this.keepAliveTime = unit.toNanos(keepAliveTime); 19         this.threadFactory = threadFactory; 20         this.handler = handler; 21     }
复制代码

 

先看一下需要传递的参数:

corePoolSize 线程池核心线程数,如果当前运行线程数量小于核心线程数量的话,处理新任务的时候,线程池还是会新建一个线程执行任务,直到当前运行线程数量等于核心线程数量;

maximumPoolSize: 线程池最大线程数,当运行线程数量等于线程池最大线程数时,再来新的任务会触发异常;

keepAliveTime: 空闲线程终止时间,线程执行完任务空闲时间超过指定时间后,线程将会被终止;

Unit: 空闲线程终止时间单位;

workQueue: 线程工作任务队列,线程主要使用ArrayBlockingQueue,DelayQueue,DelayedWorkQueue,LinkedBlockingDeque,PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue这些队列,这里不作详细介绍,有兴趣的童鞋可以自行谷歌学习;

threadFactory: 线程工厂;

Handler: 线程任务拒绝策略处理器,默认使用AbortPolicy策略,当任务超过最大线程时,会抛出异常。除此之外还有另外几种异常处理策略,CallerRunsPolicy使用调用者本身的线程来执行任务,DiscardPolicy丢弃当前任务,DiscardOldestPolicy丢弃最旧的任务,执行当前任务;

 

介绍完新建线程池的基本参数传递,现在到了本文最核心部分,线程池调用 execute方法时,内部究竟是如何实现的。

我们先看execute方法的源码:

复制代码
 1 public void execute(Runnable command) {  2 //先判断任务是否为空,如果为空则抛出异常;    
 3 
 4 if (command == null)  5         throw new NullPointerException();  6 
 7 //获取当前运行线程数,与线程池核心线程数作对比,如果当前运行线程数小于核心线程数,则调用addWorker方法将任务加入队列执行任务。这是一个任务加入线程池运行最基本的过程。
 8     int c = ctl.get();  9     if (workerCountOf(c) < corePoolSize) { 10         if (addWorker(command, true)) 11             return; 12         c = ctl.get(); 13     } 14     if (isRunning(c) && workQueue.offer(command)) { 15         int recheck = ctl.get(); 16         if (! isRunning(recheck) && remove(command)) 17             reject(command); 18         else if (workerCountOf(recheck) == 0) 19             addWorker(null, false); 20     } 21     else if (!addWorker(command, false)) 22         reject(command); 23 }
复制代码

 

后面一段源码我们待会再分析,先看addWorker方法做了什么,以下为源码:

复制代码
 1 private boolean addWorker(Runnable firstTask, boolean core) {  2     //首先看到这是一个无限循环
 3     retry:  4     for (;;) {  5         int c = ctl.get();  6         int rs = runStateOf(c);  7 
 8         //检查任务和队列是否为空,如果为空则返回false
 9         if (rs >= SHUTDOWN &&
10             ! (rs == SHUTDOWN &&
11                firstTask == null &&
12                ! workQueue.isEmpty())) 13             return false; 14 
15         for (;;) { 16 
17 //此处检查当前运行的任务数,参数core在此起作用,如果true则比较当前任务数是否大于等于线程池核心线程数,false则比较当前任务数是否大于等于线程池总线程数。
18             int wc = workerCountOf(c); 19             if (wc >= CAPACITY ||
20                 wc >= (core ? corePoolSize : maximumPoolSize)) 21                 return false; 22 
23 //此处使用AtomicInteger类compareAndSet方法(此方法为原子性)将当前运行线程数加1,成功加1则退出循环
24             if (compareAndIncrementWorkerCount(c)) 25                 break retry; 26 
27 //由于此循环代码段没有加锁,为防止多个线程并发引起数据问题,这里再次检查当前运行任务数
28             c = ctl.get();   29             if (runStateOf(c) != rs) 30                 continue retry; 31                     } 32     } 33 
34     boolean workerStarted = false; 35     boolean workerAdded = false; 36     Worker w = null; 37     try { 38         final ReentrantLock mainLock = this.mainLock; 39         //此处新建一个Worker,Worker继承了Runnable接口,用作执行实际任务用
40 
41 w = new Worker(firstTask); 42         final Thread t = w.thread; 43         if (t != null) { 44 
45 //此处加锁,防止并发
46             mainLock.lock(); 47             try { 48                 int c = ctl.get(); 49                 int rs = runStateOf(c); 50      //检查任务是否为空
51                 if (rs < SHUTDOWN ||
52                     (rs == SHUTDOWN && firstTask == null)) { 53 
54 //检查线程是否已经启动,如果是则抛出异常
55                     if (t.isAlive()) 56 
57                         throw new IllegalThreadStateException(); 58                     workers.add(w); 59                     int s = workers.size(); 60                     if (s > largestPoolSize) 61                         largestPoolSize = s; 62                     workerAdded = true; 63                 } 64             } finally { 65                 mainLock.unlock(); 66             } 67 
68   //最后启动线程,执行任务
69             if (workerAdded) { 70                 t.start(); 71                 workerStarted = true; 72             } 73         } 74     } finally { 75         if (! workerStarted) 76             addWorkerFailed(w); 77     } 78     return workerStarted; 79 }
复制代码

 

以上就是线程池新建一个线程,成功执行一个任务的过程。那当线程池中的线程已经达到最大数量了,线程池又会是如何工作,并触发任务拒绝策略的呢?请看以下源码:

复制代码
 1 //当前工作任务数如果大于等于核心线程数的话,将会执行以下代码
 2 
 3 if (isRunning(c) && workQueue.offer(command)) {  4         int recheck = ctl.get();  5 
 6 //如果任务不是正在运行并且能将任务从队列移除,执行reject方法,根据任务拒绝策略进行处理
 7         if (! isRunning(recheck) && remove(command))  8             reject(command);  9 
10 //如果当前运行的任务为0,传入一个空的任务
11         else if (workerCountOf(recheck) == 0) 12             addWorker(null, false); 13     } 14 
15 //如果添加任务到队列失败,执行reject方法,根据任务拒绝策略进行处理
16     else if (!addWorker(command, false)) 17 
18      reject(command); 19 }