构造方法参数 1.构造方法就不在此赘述,重点关注构造方法种的参数。
参数名
作用
corePoolSize
核心线程池大小
maximumPoolSize
最大线程池大小
keepAliveTime
线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit
keepAliveTime时间单位
workQueue
阻塞任务队列
threadFactory
新建线程工厂
RejectedExecutionHandler
当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
2.重点理解: corePoolSize , maximumPoolSize ,workQueue这三者之间的联系
a.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线 程。 b.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 c.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 d.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 e.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 f.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
核心变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITSprivate final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();private volatile boolean allowCoreThreadTimeOut;private int largestPoolSize;private long completedTaskCount
状态
RUNNING
运行态,可处理新任务并执行队列中的任务
SHUTDOW
关闭态,不接受新任务,但处理队列中的任务
STOP
停止态,不接受新任务,不处理队列中任务,且打断运行中任务
TIDYING
整理态,所有任务已经结束,workerCount = 0,将执行terminated()方法
TERMINATED
结束态,terminated() 方法已完成
状态转换:
RUNNING -> SHUTDOWN :手动调用shutdown方法,或者ThreadPoolExecutor要被GC回收的时候调用finalize方法,finalize方法内部也会调用shutdown方法
(RUNNING or SHUTDOWN) -> STOP :调用shutdownNow方法
SHUTDOWN -> TIDYING :当队列和线程池都为空的时候
STOP -> TIDYING :当线程池为空的时候
TIDYING -> TERMINATED :terminated方法调用完成之后
ThreadPoolExecutor演示状态图
摘自:https://www.jianshu.com/p/f8a73cb0983a
核心方法 runStateOf(int c) 计算线程池当前的状态 1 2 3 4 5 6 7 8 9 10 private static int runStateOf (int c) { return c & ~CAPACITY; }
workerCountOf(int c) 计算线程数量 1 2 3 4 5 6 7 8 private static int workerCountOf (int c) { return c & CAPACITY; }
ctlOf(int rs, int wc) 1 2 3 4 5 6 7 private static int workerCountOf (int c) { return c & CAPACITY; }
execute(Runnable command) execute 是开启ThreadPoolExecutor工作的方法。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
其实从以上execute 可以看出,再执行该方法时,会根据线程池的状态等进行不用的操作 a,直接创建线程池 b,添加到阻塞队列 c. 基于maximumPoolSize创建work
addWorker(Runnable firstTask, boolean core) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false ; for (; ; ) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; ThreadPoolExecutor.Worker w = null ; try { w = new ThreadPoolExecutor.Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
在 addWork方法种,我们可以看到,ThreadPoolExecutor为了更高效的运行,只在必要的地方加锁,而其他操作采用CAS 操作来解决并发问题,关于CAS操作,请浏览本文最后一节。
Worker类 程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);
来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中,这时就不能中断该线程。
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
之所以设置为不可重入,是因为不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。下面是setCorePoolSize方法,interruptIdleWorkers请接着往下看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void setCorePoolSize (int corePoolSize) { if (corePoolSize < 0 ) throw new IllegalArgumentException(); int delta = corePoolSize - this .corePoolSize; this .corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0 ) { int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null , true )) { if (workQueue.isEmpty()) break ; } } }
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
在构造方法中setState(-1);
,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
1 2 3 4 5 6 7 8 9 10 public boolean tryLock () { return tryAcquire(1 ); } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ;
runWork () 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 final void runWorker (ThreadPoolExecutorBak.Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
getTask() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private Runnable getTask () { boolean timedOut = false ; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
getActiveCount() 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public int getActiveCount () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int n = 0 ; for (ThreadPoolExecutorBak.Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } public boolean isLocked () { return isHeldExclusively(); } protected boolean isHeldExclusively () { return getState() != 0 ; public void unlock () { release(1 ); } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
之前一直在疑惑 ThreadPoolExecutor.getActiveCount()为什么不准确,在work开始运行时,会执行一次unlock,也就是说会将 state 设置为0 ,而线程还未开始运行,所以当调用w.isLocked 时,只是判断 state != 0 ,那这是 isHeldExclusively 肯定返回false ,而 getActiveCount的值就不是最新,最准确的值。
shutdown() 与 shutdownNow() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } private void advanceRunState (int targetState) { for (; ; ) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break ; } } public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (ThreadPoolExecutorBak.Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } private List<Runnable> drainQueue () { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0 ])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
上面程序的解释,可以明显看出shutdownNow 和 shutdown的区别。shutdown只会将线程池状态修改,也会中断线程,但是会的等所有正在运行的线程运行完成后中断但;shutdownNow 会直接结束正在运行的线程和清除队列中的线程。中断方法接着往下看。
tryTerminate() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 final void tryTerminate () { for (; ; ) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
tryTerminate这个方法会在多处调用,比如processWorkerExit、addWorkerFailed、shutdown、shutdownNow、remove、purge 中调用,为什么会多处调用? 在这个方法中调用了 interruptIdleWorkers方法,请接着往下看。
interruptIdleWorkers(boolean onlyOne) 和 interruptIdleWorkers() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void interruptIdleWorkers () { interruptIdleWorkers(false ); } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (ThreadPoolExecutorBak.Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
processWorkerExit(ThreadPoolExecutorBak.Worker w, boolean completedAbruptly) 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private void processWorkerExit (ThreadPoolExecutorBak.Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
processWorkerExit方法是在runWork方法中的 finally 中调用,也就是说,执行线程不管结果如何都会调用该方法,该方法就是统计线程执行完成数量、维护works、尝试终止线程池、以及如果线程执行异常,会创建工作线程去执行队列中的用户自定义线程,其实也可以认为是“收尾工作”
为什么在shutdown 和 shutdownNow中都会采用不同的方式中断线程,因为在这两个方法中,已经将线程池的状态改SHUTDOWN,而这时如果workQueue为空,那肯定有线程在getTask方法中阻塞,而用户已经无法提交线程到线程池,这时线程池就无法关闭。
这也可以解释shutdown,shutdownNow 的不同,因为 shutdownNow 不会关心线程处于什么状态就直接中断,而shutdown会等所有线程执行完成而再进行中断。
附录 1. CAS 锁机制存在以下问题:
(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止
乐观锁用的机制就是CAS ,Compare and Swap
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
在Java中可以使用锁和循环CAS来进行原子操作,自旋CAS的基本思路就是循环进行CAS操作,直到成功为止。在JDK1.5之后,JDK的并发包里提供了一些类来支持原子操作,如AtomicBoolean,AtomicInteger,AtomicLong都是用原子的方式来更新指定类型的值。例如AtomicInteger的用法如下:
1 2 3 AtomicInteger atomicInteger=new AtomicInteger(0 ); int i=atomicInteger.get();atomicInteger.compareAndSet(i,i++);
CAS还存在以下三个问题:
(1)ABA问题。如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时发现它的值没有发生变 化,但实际上却发生了。ABA的解决思路就是使用版本号,在变量前面追加版本号,那么A——B——A 就变成了1A——2B——3A。 (2)循环时间长开销大
(3)只能保证一个共性变量的原子操作。也就是多个共享变量操作时,循环CAS就无法保证操作的原子性了。但从JDK1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。