构造方法参数 1.构造方法就不在此赘述,重点关注构造方法种的参数。
 
参数名 
作用 
 
 
corePoolSize 
核心线程池大小 
 
maximumPoolSize 
最大线程池大小 
 
keepAliveTime 
线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 
 
TimeUnit 
keepAliveTime时间单位 
 
workQueue 
阻塞任务队列 
 
threadFactory 
新建线程工厂 
 
RejectedExecutionHandler 
当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理 
 
2.重点理解:  corePoolSize  ,  maximumPoolSize ,workQueue这三者之间的联系
a.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线 程。     b.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 c.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 d.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 e.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 f.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭 
核心变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private  final  AtomicInteger ctl = new  AtomicInteger(ctlOf(RUNNING,0 ));       private  static  final  int  COUNT_BITS = Integer.SIZE - 3 ;      private  static  final  int  CAPACITY = (1  << COUNT_BITS) - 1 ;      private  static  final  int  RUNNING = -1  << COUNT_BITS;private  static  final  int  SHUTDOWN =  0  << COUNT_BITS;private  static  final  int  STOP   =  1  << COUNT_BITS;private  static  final  int  TIDYING  =  2  << COUNT_BITS;private  static  final  int  TERMINATED =  3  << COUNT_BITSprivate  final  BlockingQueue<Runnable> workQueue;private  final  ReentrantLock mainLock = new  ReentrantLock();private  final  HashSet<Worker> workers = new  HashSet<Worker>();private  volatile  boolean  allowCoreThreadTimeOut;private  int  largestPoolSize;private  long  completedTaskCount
 
状态 
 
 
 
RUNNING 
运行态,可处理新任务并执行队列中的任务 
 
SHUTDOW 
关闭态,不接受新任务,但处理队列中的任务 
 
STOP 
停止态,不接受新任务,不处理队列中任务,且打断运行中任务 
 
TIDYING 
整理态,所有任务已经结束,workerCount = 0,将执行terminated()方法 
 
TERMINATED 
结束态,terminated() 方法已完成 
 
状态转换: 
RUNNING -> SHUTDOWN :手动调用shutdown方法,或者ThreadPoolExecutor要被GC回收的时候调用finalize方法,finalize方法内部也会调用shutdown方法
(RUNNING or SHUTDOWN) -> STOP :调用shutdownNow方法
SHUTDOWN -> TIDYING :当队列和线程池都为空的时候
STOP -> TIDYING :当线程池为空的时候
TIDYING -> TERMINATED :terminated方法调用完成之后
ThreadPoolExecutor演示状态图 
摘自:https://www.jianshu.com/p/f8a73cb0983a 
 
核心方法 runStateOf(int c)  计算线程池当前的状态 1 2 3 4 5 6 7 8 9 10 private  static  int  runStateOf (int  c)   {    return  c & ~CAPACITY; } 
 
workerCountOf(int c) 计算线程数量 1 2 3 4 5 6 7 8 private  static  int  workerCountOf (int  c)   {    return  c & CAPACITY; } 
 
ctlOf(int rs, int wc) 1 2 3 4 5 6 7   private  static  int  workerCountOf (int  c)   {    return  c & CAPACITY; } 
 
execute(Runnable command)  execute 是开启ThreadPoolExecutor工作的方法。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public  void  execute (Runnable command)   {        if  (command == null )             throw  new  NullPointerException();         int  c = ctl.get();                  if  (workerCountOf(c) < corePoolSize) {                      if  (addWorker(command, true ))                 return ;                       c = ctl.get();         }                   if  (isRunning(c) && workQueue.offer(command)) {                       int  recheck = ctl.get();                       if  (!isRunning(recheck) && remove(command))                                  reject(command);                                               else  if  (workerCountOf(recheck) == 0 )                 addWorker(null , false );         }                           else  if  (!addWorker(command, false ))             reject(command);     } 
 
   其实从以上execute 可以看出,再执行该方法时,会根据线程池的状态等进行不用的操作 a,直接创建线程池  b,添加到阻塞队列  c.  基于maximumPoolSize创建work
addWorker(Runnable firstTask, boolean core) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 private  boolean  addWorker (Runnable firstTask, boolean  core)   {               retry:        for  (; ; ) {            int  c = ctl.get();            int  rs = runStateOf(c);                                    if  (rs >= SHUTDOWN &&                    !(rs == SHUTDOWN &&                            firstTask == null  &&                            !workQueue.isEmpty()))                return  false ;            for  (; ; ) {                int  wc = workerCountOf(c);                                if  (wc >= CAPACITY ||                        wc >= (core ? corePoolSize : maximumPoolSize))                    return  false ;                                if  (compareAndIncrementWorkerCount(c))                    break  retry;                                  c = ctl.get();                                                                 if  (runStateOf(c) != rs)                    continue  retry;                            }        }        boolean  workerStarted = false ;        boolean  workerAdded = false ;        ThreadPoolExecutor.Worker w = null ;        try  {                                    w = new  ThreadPoolExecutor.Worker(firstTask);                                   final  Thread t = w.thread;            if  (t != null ) {                final  ReentrantLock mainLock = this .mainLock;                                 mainLock.lock();                try  {                                        int  rs = runStateOf(ctl.get());                     if  (rs < SHUTDOWN ||                            (rs == SHUTDOWN && firstTask == null )) {                                                if  (t.isAlive())                             throw  new  IllegalThreadStateException();                                                workers.add(w);                                   int  s = workers.size();                                                if  (s > largestPoolSize)                              largestPoolSize = s;                        workerAdded = true ;                    }                } finally  {                    mainLock.unlock();                }                                if  (workerAdded) {                    t.start();                    workerStarted = true ;                }            }        } finally  {                        if  (!workerStarted)                addWorkerFailed(w);        }        return  workerStarted;    } 
 
   在 addWork方法种,我们可以看到,ThreadPoolExecutor为了更高效的运行,只在必要的地方加锁,而其他操作采用CAS 操作来解决并发问题,关于CAS操作,请浏览本文最后一节。
Worker类    程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 private  final  class  Worker           extends  AbstractQueuedSynchronizer            implements  Runnable   {      private  static  final  long  serialVersionUID = 6138294804551838833L ;       final  Thread thread;            Runnable firstTask;       volatile  long  completedTasks;       Worker(Runnable firstTask) {           setState(-1 );            this .firstTask = firstTask;           this .thread = getThreadFactory().newThread(this );       }       public  void  run ()   {           runWorker(this );       }                                   protected  boolean  isHeldExclusively ()   {           return  getState() != 0 ;       }       protected  boolean  tryAcquire (int  unused)   {           if  (compareAndSetState(0 , 1 )) {               setExclusiveOwnerThread(Thread.currentThread());               return  true ;           }           return  false ;       }       protected  boolean  tryRelease (int  unused)   {           setExclusiveOwnerThread(null );           setState(0 );           return  true ;       }       public  void  lock ()   {           acquire(1 );       }       public  boolean  tryLock ()   {           return  tryAcquire(1 );       }       public  void  unlock ()   {           release(1 );       }       public  boolean  isLocked ()   {           return  isHeldExclusively();       }       void  interruptIfStarted ()   {           Thread t;           if  (getState() >= 0  && (t = thread) != null  && !t.isInterrupted()) {               try  {                   t.interrupt();               } catch  (SecurityException ignore) {               }           }       }   } 
 
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中,这时就不能中断该线程。
 
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
 
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
 
之所以设置为不可重入,是因为不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。下面是setCorePoolSize方法,interruptIdleWorkers请接着往下看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public  void  setCorePoolSize (int  corePoolSize)   {       if  (corePoolSize < 0 )            throw  new  IllegalArgumentException();        int  delta = corePoolSize - this .corePoolSize;        this .corePoolSize = corePoolSize;        if  (workerCountOf(ctl.get()) > corePoolSize)                        interruptIdleWorkers();        else  if  (delta > 0 ) {            int  k = Math.min(delta, workQueue.size());            while  (k-- > 0  && addWorker(null , true )) {                if  (workQueue.isEmpty())                    break ;            }        }    } 
 
 
 
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
   在构造方法中setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
1 2 3 4 5 6 7 8 9 10  public  boolean  tryLock ()   {           return  tryAcquire(1 );    } protected  boolean  tryAcquire (int  unused)   {          if  (compareAndSetState(0 , 1 )) {               setExclusiveOwnerThread(Thread.currentThread());               return  true ;           }           return  false ;    
 
runWork () 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 final  void  runWorker (ThreadPoolExecutorBak.Worker w)   {       Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null ;        w.unlock();         boolean  completedAbruptly = true ;        try  {                                                while  (task != null  || (task = getTask()) != null ) {                w.lock();                                                                                                                                                if  ((runStateAtLeast(ctl.get(), STOP) ||                        (Thread.interrupted() &&                                runStateAtLeast(ctl.get(), STOP))) &&                        !wt.isInterrupted())                    wt.interrupt();                try  {                                        beforeExecute(wt, task);                    Throwable thrown = null ;                    try  {                                                task.run();                    } catch  (RuntimeException x) {                        thrown = x;                        throw  x;                    } catch  (Error x) {                        thrown = x;                        throw  x;                    } catch  (Throwable x) {                        thrown = x;                        throw  new  Error(x);                    } finally  {                                                afterExecute(task, thrown);                    }                } finally  {                    task = null ;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false ;        } finally  {                        processWorkerExit(w, completedAbruptly);        }    } 
 
getTask() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private  Runnable getTask ()   {    boolean  timedOut = false ;           for  (; ; ) {         int  c = ctl.get();         int  rs = runStateOf(c);                           if  (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {             decrementWorkerCount();             return  null ;         }         int  wc = workerCountOf(c);                  boolean  timed = allowCoreThreadTimeOut || wc > corePoolSize;                  if  ((wc > maximumPoolSize || (timed && timedOut))                 && (wc > 1  || workQueue.isEmpty())) {             if  (compareAndDecrementWorkerCount(c))                 return  null ;             continue ;         }         try  {                          Runnable r = timed ?                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                     workQueue.take();             if  (r != null )                 return  r;             timedOut = true ;         } catch  (InterruptedException retry) {             timedOut = false ;         }     } } 
 
   
getActiveCount() 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public  int  getActiveCount ()   {    final  ReentrantLock mainLock = this .mainLock;     mainLock.lock();     try  {         int  n = 0 ;                  for  (ThreadPoolExecutorBak.Worker w : workers)             if  (w.isLocked())                 ++n;         return  n;     } finally  {         mainLock.unlock();     } }  public  boolean  isLocked ()   {        return  isHeldExclusively();  }  protected  boolean  isHeldExclusively ()   {        return  getState() != 0 ;  public  void  unlock ()   {             release(1 );  }  protected  boolean  tryRelease (int  unused)   {             setExclusiveOwnerThread(null );             setState(0 );             return  true ;   }  public  final  boolean  release (int  arg)   {         if  (tryRelease(arg)) {             Node h = head;             if  (h != null  && h.waitStatus != 0 )                 unparkSuccessor(h);             return  true ;         }         return  false ;  } 
 
   之前一直在疑惑 ThreadPoolExecutor.getActiveCount()为什么不准确,在work开始运行时,会执行一次unlock,也就是说会将 state 设置为0 ,而线程还未开始运行,所以当调用w.isLocked 时,只是判断 state != 0 ,那这是 isHeldExclusively 肯定返回false ,而  getActiveCount的值就不是最新,最准确的值。
shutdown() 与 shutdownNow() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86      public  void  shutdown ()   {         final  ReentrantLock mainLock = this .mainLock;         mainLock.lock();         try  {                          checkShutdownAccess();                          advanceRunState(SHUTDOWN);                          interruptIdleWorkers();             onShutdown();          } finally  {             mainLock.unlock();         }         tryTerminate();     }     private  void  advanceRunState (int  targetState)   {         for  (; ; ) {             int  c = ctl.get();                                       if  (runStateAtLeast(c, targetState) ||                     ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))                 break ;         }     }    public  List<Runnable> shutdownNow ()   {         List<Runnable> tasks;         final  ReentrantLock mainLock = this .mainLock;         mainLock.lock();         try  {             checkShutdownAccess();                          advanceRunState(STOP);                          interruptWorkers();                          tasks = drainQueue();         } finally  {             mainLock.unlock();         }         tryTerminate();         return  tasks;   }      private  void  interruptWorkers ()   {         final  ReentrantLock mainLock = this .mainLock;         mainLock.lock();         try  {             for  (ThreadPoolExecutorBak.Worker w : workers)                 w.interruptIfStarted();         } finally  {             mainLock.unlock();         }   }     void  interruptIfStarted ()   {             Thread t;             if  (getState() >= 0  && (t = thread) != null  && !t.isInterrupted()) {                 try  {                     t.interrupt();                 } catch  (SecurityException ignore) {                 }             }         }       private  List<Runnable> drainQueue ()   {         BlockingQueue<Runnable> q = workQueue;         ArrayList<Runnable> taskList = new  ArrayList<Runnable>();                                    q.drainTo(taskList);                  if  (!q.isEmpty()) {             for  (Runnable r : q.toArray(new  Runnable[0 ])) {                 if  (q.remove(r))                     taskList.add(r);             }         }         return  taskList;     } 
 
   上面程序的解释,可以明显看出shutdownNow 和 shutdown的区别。shutdown只会将线程池状态修改,也会中断线程,但是会的等所有正在运行的线程运行完成后中断但;shutdownNow 会直接结束正在运行的线程和清除队列中的线程。中断方法接着往下看。
tryTerminate() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 final  void  tryTerminate ()   {    for  (; ; ) {                                             int  c = ctl.get();         if  (isRunning(c) ||                 runStateAtLeast(c, TIDYING) ||                 (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))             return ;                           if  (workerCountOf(c) != 0 ) {                                                     interruptIdleWorkers(ONLY_ONE);             return ;         }         final  ReentrantLock mainLock = this .mainLock;         mainLock.lock();         try  {                          if  (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) {                 try  {                     terminated();                 } finally  {                     ctl.set(ctlOf(TERMINATED, 0 ));                                          termination.signalAll();                 }                 return ;             }         } finally  {             mainLock.unlock();         }              } } 
 
   tryTerminate这个方法会在多处调用,比如processWorkerExit、addWorkerFailed、shutdown、shutdownNow、remove、purge 中调用,为什么会多处调用? 在这个方法中调用了 interruptIdleWorkers方法,请接着往下看。
interruptIdleWorkers(boolean onlyOne) 和 interruptIdleWorkers() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private  void  interruptIdleWorkers ()   {    interruptIdleWorkers(false ); }  private  void  interruptIdleWorkers (boolean  onlyOne)   {                 final  ReentrantLock mainLock = this .mainLock;         mainLock.lock();         try  {             for  (ThreadPoolExecutorBak.Worker w : workers) {                 Thread t = w.thread;                                                   if  (!t.isInterrupted() && w.tryLock()) {                     try  {                         t.interrupt();                     } catch  (SecurityException ignore) {                     } finally  {                         w.unlock();                     }                 }                 if  (onlyOne)                     break ;             }         } finally  {             mainLock.unlock();         }     } 
 
processWorkerExit(ThreadPoolExecutorBak.Worker w, boolean completedAbruptly) 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private  void  processWorkerExit (ThreadPoolExecutorBak.Worker w, boolean  completedAbruptly)   {         if  (completedAbruptly)          decrementWorkerCount();     final  ReentrantLock mainLock = this .mainLock;     mainLock.lock();     try  {                  completedTaskCount += w.completedTasks;                  workers.remove(w);     } finally  {         mainLock.unlock();     }          tryTerminate();     int  c = ctl.get();     if  (runStateLessThan(c, STOP)) {                  if  (!completedAbruptly) {                                             int  min = allowCoreThreadTimeOut ? 0  : corePoolSize;             if  (min == 0  && !workQueue.isEmpty())                 min = 1 ;             if  (workerCountOf(c) >= min)                 return ;          }                  addWorker(null , false );     } } 
 
   processWorkerExit方法是在runWork方法中的 finally 中调用,也就是说,执行线程不管结果如何都会调用该方法,该方法就是统计线程执行完成数量、维护works、尝试终止线程池、以及如果线程执行异常,会创建工作线程去执行队列中的用户自定义线程,其实也可以认为是“收尾工作”
   为什么在shutdown 和 shutdownNow中都会采用不同的方式中断线程,因为在这两个方法中,已经将线程池的状态改SHUTDOWN,而这时如果workQueue为空,那肯定有线程在getTask方法中阻塞,而用户已经无法提交线程到线程池,这时线程池就无法关闭。
   这也可以解释shutdown,shutdownNow 的不同,因为 shutdownNow 不会关心线程处于什么状态就直接中断,而shutdown会等所有线程执行完成而再进行中断。
附录 1. CAS 锁机制存在以下问题:
(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
   独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止 
   乐观锁用的机制就是CAS ,Compare and Swap 
   CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
        在Java中可以使用锁和循环CAS来进行原子操作,自旋CAS的基本思路就是循环进行CAS操作,直到成功为止。在JDK1.5之后,JDK的并发包里提供了一些类来支持原子操作,如AtomicBoolean,AtomicInteger,AtomicLong都是用原子的方式来更新指定类型的值。例如AtomicInteger的用法如下:
1 2 3 AtomicInteger atomicInteger=new  AtomicInteger(0 ); int  i=atomicInteger.get();atomicInteger.compareAndSet(i,i++); 
 
CAS还存在以下三个问题: 
(1)ABA问题。如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时发现它的值没有发生变        化,但实际上却发生了。ABA的解决思路就是使用版本号,在变量前面追加版本号,那么A——B——A 就变成了1A——2B——3A。 (2)循环时间长开销大 
(3)只能保证一个共性变量的原子操作。也就是多个共享变量操作时,循环CAS就无法保证操作的原子性了。但从JDK1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。