前言 在Java中,使用线程来异步执行任务。但是Java线程的创建与销毁需要一定的开销,因此我们可能会考虑使用线程池来复用线程以达到较高的性能。使用线程池的好处 :
降低资源消耗。
提高响应速度。
提高线程的可管理性。
由于线程池的以上好处,JDK1.5中的Executors框架就因此而问世。
Java线程既是工作单元,也是执行单元。从JDK1.5开始,把工作单元与执行机制分离开
来。工作单元包括Runnable 和 Callable,而执行机制由Executor框架提供。
Executors框架类结构图
简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public class ExecutorsTest { static class Task implements Callable <String > { private int seed; public Task (int seed) { this .seed = seed; } @Override public String call () throws Exception { TimeUnit.SECONDS.sleep(5 ); return "线程 " + Thread.currentThread().getName() + " 获取种子数: " + seed; } } static class TestRunnable implements Runnable { @Override public void run () { System.out.println(Thread.currentThread().getName() + "线程被调用了。" ); } } public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 200 ,0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); Random random = new Random(10000 ); List<Future> list = new ArrayList<>(); for (int index = 0 ; index < 1 ; index++) { Future<String> future = executor.submit(new Task(random.nextInt())); list.add(future); } executor.shutdown(); for (Future<String> future : list) { while (!future.isDone()) { System.out.println("Future返回如果没有完成,则一直循环等待,直到Future返回完成" ); } System.out.println(future.get()); } ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); for (int i = 0 ; i < 200 ; i++){ executorService.execute(new TestRunnable()); } executorService.shutdown(); while (! executorService.isTerminated()) { System.out.println("任务执行中。。。。" ); } System.out.println("完成" ); } }
Executors源码 Java API还提供的工具类Executors,可以帮我们针对不同的应用场景创建不同的Executor实现类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool (int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null , true ); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ScheduledExecutorService newSingleThreadScheduledExecutor () { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1 )); } public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
由工具类Executors获取具体执行器后,会调用相应的方法将任务添加到执行队列中。其中 ForkJoinPool 我们已经学习过,本章已最常用的ThreadPoolExecutor
进行分析。关于ScheduledThreadPoolExecutor
类日后再说。
ThreadPoolExecutor源码分析 数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile int corePoolSize; private volatile int maximumPoolSize; private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... } }
在 ThreadPoolExecutor 类内部结构中,定义的属性有
线程池的各种状态
1 2 3 4 5 RUNNING: 能接受新提交的任务,并且也能处理阻塞队列中的任务 SHUTDOWN: 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。 STOP: 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。 IDYING: 如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。 TERMINATED: 在terminated() 方法执行完后进入该状态。
线程池的状态转换过程:
主池控制状态ctl
ctl相关的计算的方法:
1 2 3 4 private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; }
等待执行的任务的阻塞队列。在线程池中的线程数不小于corePoolSize
时,会将新提交的任务丢进阻塞队列中。
可重入ReentrantLock对象锁
工作线程集合。用于维护线程池中的所有工作线程。工作线程会封装在内部类Worker
中。仅仅在获取可重入锁后才能访问。
饱和策略RejectedExecutionHandler类。在阻塞队列也满后,会使用RejectedExecutionHandler类对新提交任务进行处理。
其他通过构造函数传递的一些基本参数。如:corePoolSize, maximumPoolSize,threadFactory等。
execute源码 由上面实例可知,在获取到执行器 ExecutorService
后,会直接调用其execute()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
在代码注释中,也已说明,该方法分三步处理:
如果线程运行的数量少于corePoolSize,则尝试将给定任务作为第一个任务来创建新线程。添加新线程成功,则方法返回;否则重新获取ctl值,执行第下步。
如果线程池处于运行状态且任务成功添加到阻塞队列后,需要再次检查线程池是否已关闭。已关闭则将已入队的任务移除,使用饱和策略处理新提交任务。没有关闭,则判断是否需要添加一个工作线程
如果线程池为非运行状态或阻塞队列已满,则尝试新加工作线程,如果失败,则使用饱和策略处理新提交任务。
addWorker源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
功能逻辑:
进入外部for循环。
获取线程池的运行状态rs
如果线程池的运行状态为非运行状态,只有在状态为 SHUTDOWN
且任务队列不为空的情况下,执行下一步;否则,直接返回FALSE。(因为在池状态为SHUTDOWN
时,虽不能接收新任务,但是却可以处理已提交任务,所以可以往下执行。其他非运行状态,则不允许往下执行)。
进入内部for循环
获取活动线程数
如果活跃线程数超过容量CAPACITY=2^29-1
,则不创建新线程,返回FALSE。否则执行下一步。
增加工作线程数。成功,跳出外部for循环,执行第2步;失败,重新获取ctl值,如果线程池状态发生改变,则从外部for循环重新开始处理。
根据新提交任务,创建工作线程。
工作线程不为空的情况下,如果线程池处于运行状态或者(线程池已经SHUTDOWN且头任务为空),则将新创建的工作线程添加到线程集合workers
中,然后启动工作线程。
如果工作线程启动失败后,则调用addWorkerFailed()
方法将新创建的工作线程从 线程集合workers
中移除。
addWorkerFailed源码 1 2 3 4 5 6 7 8 9 10 11 12 13 private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
该方法的主要功能是:在新建工作线程启动失败后,将新建工作线程从线程集合workers
中移除,并将工作线程数减一,然后尝试中断线程池。
tryTerminate源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
功能逻辑:
判断是否需要中断。当前线程池的状态为以下几种情况时,方法直接返回结束。
1 2 3 1 . RUNNING,因为还在运行中,不能停止;2 . TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;3 . SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
如果需要中断且工作线程数不为0,则调用 interruptIdleWorkers()
尝试中断一个线程,然后返回结束。
如果工作线程数为0,则尝试将线程池状态修改为 TIDYING
,成功后,调用terminated方法中断线程池。然后设置状态为 TERMINATED
。最后唤醒所以沉睡线程。
interruptIdleWorkers源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void interruptIdleWorkers () { interruptIdleWorkers(false ); } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
从代码实现逻辑可知,interruptIdleWorkers方法主要就是循环工作集合,然后将工作线程全部中断。
Worker源码解析 在 addWorker()
方法中,添加工作线程时,会出现 Worker
类,该类封装了工作线程和任务。在上面的ThreadPoolExecutor
的数据结构中,我们已知ThreadPool维护的其实就是一组Worker对象。线程池中的每一个线程被封装成一个Worker对象。
Worker数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } }
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。所以在addWorker()
方法中 t.start();
一句会调用 Worker类
的 run()
方法。
run源码 1 2 3 4 public void run () { runWorker(this ); }
在Worker类中的run方法调用了runWorker方法来执行任务。
runWorker源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
该方法的功能就是:重复获取任务,然后执行。先执行当前工作线程所拥有的首任务,如果没有,则从任务列表中获取,依次执行。代码逻辑:
在首任务不为空或者调用方法getTask()
获取的任务不为空,执行下一步;否则,执行执行processWorkerExit()方法。
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
调用task.run()执行任务;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
getTask源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
getTask方法用来从阻塞队列中取任务。代码逻辑:
进入for无限循环。
获取线程池的运行状态。
如果线程池的运行状态为非运行状态(SHUTDOWN,STOP,TIDYING,TERMINATED)
且任务队列为空,则直接返回null;如果线程池的运行状态为SHUTDOWN且任务队列不为空
,则往下执行。
备注:在线程池状态为SHUTDOWN时,不能接收新任务,但是可以执行已提交任务,所以getTask会获取任务返回;如果线程池状态为STOP,TIDYING,TERMINATED,则既不能接收任务,也不能处理已提交任务,所以getTask会直接返回null
获取工作线程的数量
控制线程池的有效线程数量。
从阻塞队列中获取任务
如果任务不为空,则返回任务,方法结束;否则继续for循环。
processWorkerExit源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
该方法的主要功能是:销毁工作线程,尝试中断线程池,并维护维护池中有效线程。代码逻辑:
从工作集合中清除指定工作线程
尝试中断线程池
如果线程池的状态为可执行状态(SHUTDOWN 或 RUNNING),则维护池中有效工作线程。
整体处理流程
shutdown源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
该方法将线程池的状态改为SHUTDOWN
,然后中断工作集合中的一个线程。
shutdownNow源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow方法与shutdown方法类似,不同的地方在于:
设置状态为STOP;
中断所有工作线程,无论是否是空闲的;
取出阻塞队列中没有被执行的任务并返回。
shutdownNow方法执行完之后调用tryTerminate方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为TERMINATED。
参考文章 Executor框架使用与分析一
Executor框架使用与分析二
Executor框架使用与分析三
【Java多线程】Executor框架的详解
深入理解 Java 线程池:ThreadPoolExecutor