前言 Fork/Join框架是一个用于并行执行任务的框架,核心是:把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果。 体现了分而治之
的思想。类似于算法中的归并排序
。该框架之所以有更好的并发性能,是因为其充分利用了所有线程的工作能力,避免空闲线程,充分发挥多核并行的处理能力。
使用 预备知识点 在java的官方文档介绍Fork/Join 中,可以知道Fork/Join框架的基本使用的伪代码为:
1 2 3 4 5 if (my portion of the work is small enough) do the work directly else split my work into two pieces invoke the two pieces and wait for the results
在使用的时候, 将这些伪代码封装到ForkJoinTask
的子类中。通常使用的ForkJoinTask
的子类是:RecursiveTask(有返回结果)
和 RecursiveAction(无返回结果)
。我们只要根据需要,继承这两个类中的一个,并实现其中的方法,即可实现自己定义的任务类。然后在使用ForkJoinPool
去执行任务。
使用实例:排序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 public class SortTest { private static int MAX = 1000 ; private static int inits[] = new int [MAX]; static { Random random = new Random(); for (int i = 0 ; i < MAX; i++) { inits[i] = random.nextInt(1000000 ); } } public static void main (String[] args) { int a[] = inits.clone(); SortTask task = new SortTask(a); ForkJoinPool pool = new ForkJoinPool(); System.out.println("排序前:" + Arrays.toString(a)); pool.submit(task); do { try { TimeUnit.MILLISECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone()); System.out.println("排序后:" + Arrays.toString(a)); } static class SortTask extends RecursiveAction { static final int THRESHOLD = 10 ; final int [] array; final int lo, hi; SortTask(int [] array, int lo, int hi) { this .array = array; this .lo = lo; this .hi = hi; } SortTask(int [] array) { this (array, 0 , array.length); } @Override protected void compute () { if (hi - lo < THRESHOLD) { sortSequentially(lo, hi); } else { int mid = (lo + hi) >>> 1 ; invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi)); merge(lo, mid, hi); } } void sortSequentially (int lo, int hi) { Arrays.sort(array, lo, hi); } void merge (int lo, int mid, int hi) { int [] buf = Arrays.copyOfRange(array, lo, mid); for (int i = 0 , j = lo, k = mid; i < buf.length; j++) { array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++]; } } } }
源码解析 由上面的使用实例可知,Fork/Join框架的使用,分为三个步骤:
创建任务
创建执行任务的线程池ForkJoinPool对象。
执行任务
首先我们分析ForkJoinTask
源码
ForkJoinTask源码分析 内部数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public abstract class ForkJoinTask <V > implements Future <V >, Serializable { volatile int status; static final int DONE_MASK = 0xf0000000 ; static final int NORMAL = 0xf0000000 ; static final int CANCELLED = 0xc0000000 ; static final int EXCEPTIONAL = 0x80000000 ; static final int SIGNAL = 0x00010000 ; static final int SMASK = 0x0000ffff ; }
ForkJoinTask类继承了Future
类。该类内部主要是有个属性,记录任务的执行状态。由使用实例可知,ForkJoinTask类的主要方法是:fork,join,invokeAll等,下面进行主要方法分析
fork源码 1 2 3 4 5 6 7 8 9 10 11 12 public final ForkJoinTask<V> fork () { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this ); else ForkJoinPool.common.externalPush(this ); return this ; }
该方法的主要功能是:将任务进行拆分
。在内部实现中,如果当前线程为ForkJoinWorkerThread
线程时,则直接将该任务添加进当前线程的工作队列中;否则,则通过线程池ForkJoinPool
将任务添加到池中的随机一个工作队列中。具体逻辑,后文再分析。
join源码 1 2 3 4 5 6 7 8 public final V join () { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
该方法的功能是:执行子任务并合并子任务的结果集
。内部实现中,会调用doJoin()
方法,执行任务,如果执行成功,则调用抽象方法getRawResult()
返回执行结果;否则,执行reportException()
方法处理异常。
doJoin源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private int doJoin () { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this ) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this , 0L ) : externalAwaitDone(); }
实现逻辑:
判断任务的执行状态status。如果status < 0
,则直接返回;否则执行第2步
判断当前的任务是否可以执行,如果可以执行则调用doExec方法,如果不能执行则加入等待的队列。具体逻辑为:
如果当前线程为ForkJoinWorkerThread
线程,则从当前线程的工作队列中弹出顶部任务,并执行该任务。如果执行成功,则返回;否则,如果执行失败,则将任务加入到等待的队列。(帮助其他线程处理任务,即窃取任务 或者阻塞直到任务完成或超时)
如果不是ForkJoinWorkerThread
线程fork/join线程 阻塞
在ForkJoinPool类有关WorkQueue
介绍如下:
1 2 3 4 5 6 7 8 9 10 11 The pop operation (always performed by owner) is: if ((base != top) and (the task at top slot is not null ) and (CAS slot to null ) ) decrement top and return task ;And the poll operation (usually by a stealer) is if ((base != top) and (the task at base slot is not null ) and (base has not changed) and (CAS slot to null ) ) increment base and return task ;
可知,在线程处理自己内部的工作队列时,是从头部开始,即pop();但是在窃取工作帮助其他线程处理任务时,是从尾部开始,即poll()。
所以在doJoin()
中,如果为ForkJoinWorkerThread
线程时,会调用tryUnpush()
方法从头部获取待处理任务。
doExec源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final int doExec () { int s; boolean completed; if ((s = status) >= 0 ) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
该方法的功能是:执行任务。在之前前会判断任务状态,如果任务可执行,则调用exec()
方法,由继承关系可知,实际调用的是ForkJoinTask
子类的相应实现。在RecursiveAction
和 RecursiveTask
中,都是调用了compute()
方法。即调用了开发者自己的实现逻辑。在执行成功后,会通过setCompletion()方法设置任务的状态。
externalAwaitDone源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private int externalAwaitDone () { int s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this , 0 ) : ForkJoinPool.common.tryExternalUnpush(this ) ? doExec() : 0 ); if (s >= 0 && (s = status) >= 0 ) { boolean interrupted = false ; do { if (U.compareAndSwapInt(this , STATUS, s, s | SIGNAL)) { synchronized (this ) { if (status >= 0 ) { try { wait(0L ); } catch (InterruptedException ie) { interrupted = true ; } } else notifyAll(); } } } while ((s = status) >= 0 ); if (interrupted) Thread.currentThread().interrupt(); } return s; }
invoke源码 1 2 3 4 5 6 7 public final V invoke () { int s; if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
该方法主要是:执行任务并返回执行结果
。其内部实现与join()
方法类似,但是在实现任务执行的逻辑时其内部调用的是doInvoke()
方法。doInvoke()
方法与doJoin()
方法内部实现逻辑也很类似。
doInvoke源码 1 2 3 4 5 6 7 8 9 10 private int doInvoke () { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this , 0L ) : externalAwaitDone(); }
该方法功能和doJoin()
方法也很类似。区别是:doInvoke()方法直接调用doExec()
方法执行任务。执行成功,直接返回;否则,判断是否为ForkJoinWorkerThread
线程。是,调用awaitJoin()
方法将任务添加到等待的队列;否则调用externalAwaitDone()
阻塞任务。
invokeAll源码 1 2 3 4 5 6 7 8 9 10 11 12 public static void invokeAll (ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) t1.reportException(s1); if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) t2.reportException(s2); }
invokeAll()方法将处理两个子任务。由内部实现可知,实现逻辑是:现将子任务2放进工作队列,然后执行子任务1,在子任务1处理完后,执行子任务2。该方法等同于:
1 2 3 4 5 6 7 t2.fork(); t1.fork(); t1.join(); t2.join();
但需要注意的是,子任务t1,t2调用join()
方法的顺序要与调用fork()
方法的顺序相反。因为在后面分析ForkJoinPool
的入队逻辑时,我们可以知道,t1会放置在工作队列的top位置,而在join()
时,会从工作队列的top位置取出任务并执行,如果执行的任务不是top位置的任务的话,线程最终只能挂起阻塞,等待通知。有关信息可以 点击查看
ForkJoinTask重要实现类 RecursiveTask源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public abstract class RecursiveTask <V > extends ForkJoinTask <V > { private static final long serialVersionUID = 5232453952276485270L ; V result; protected abstract V compute () ; public final V getRawResult () { return result; } protected final void setRawResult (V value) { result = value; } protected final boolean exec () { result = compute(); return true ; } }
RecursiveTask 类是ForkJoinTask 的子类,该类会有返回值。在并发任务中,如果需要处理结果有返回值的话,继承该类并实现compute()
即可。
RecursiveAction源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public abstract class RecursiveAction extends ForkJoinTask <Void > { private static final long serialVersionUID = 5232453952276485070L ; protected abstract void compute () ; public final Void getRawResult () { return null ; } protected final void setRawResult (Void mustBeNull) { } protected final boolean exec () { compute(); return true ; } }
RecursiveAction 类与RecursiveTask 类非常类似。唯一的区别时 RecursiveAction
类不会返回执行结果。用法与RecursiveTask
类一样。
至此,有关ForkJoinTask
类大致分析完,下篇文章接着分析 ForkJoinPool
类
参考文章 Fork/Join
多任务处理(12)——Fork/Join框架(基本使用)
多任务处理(13)——Fork/Join框架(解决排序问题)
多任务处理(14)——Fork/Join框架(要点1)
多任务处理(15)——Fork/Join框架(要点2)
聊聊并发(八)——Fork/Join框架介绍
Fork/Join框架使用与分析
Java源码分析 - ForkJoinTask篇
Fork and Join: Java Can Excel at Painless Parallel Programming Too!
Doug Lea –《A Java Fork/Join Framework》
ForkJoinPool解读