前言 Fork/Join框架是一个用于并行执行任务的框架,核心是:把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果。 体现了分而治之的思想。类似于算法中的归并排序。该框架之所以有更好的并发性能,是因为其充分利用了所有线程的工作能力,避免空闲线程,充分发挥多核并行的处理能力。
使用 预备知识点 在java的官方文档介绍Fork/Join 中,可以知道Fork/Join框架的基本使用的伪代码为:
1 2 3 4 5 if (my portion of the work is small enough) do the work directly else split my work into two pieces invoke the two pieces and wait for the results
在使用的时候, 将这些伪代码封装到ForkJoinTask的子类中。通常使用的ForkJoinTask的子类是:RecursiveTask(有返回结果) 和 RecursiveAction(无返回结果) 。我们只要根据需要,继承这两个类中的一个,并实现其中的方法,即可实现自己定义的任务类。然后在使用ForkJoinPool去执行任务。
使用实例:排序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 public class SortTest { private static int MAX = 1000 ; private static int inits[] = new int [MAX]; static { Random random = new Random(); for (int i = 0 ; i < MAX; i++) { inits[i] = random.nextInt(1000000 ); } } public static void main (String[] args) { int a[] = inits.clone(); SortTask task = new SortTask(a); ForkJoinPool pool = new ForkJoinPool(); System.out.println("排序前:" + Arrays.toString(a)); pool.submit(task); do { try { TimeUnit.MILLISECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone()); System.out.println("排序后:" + Arrays.toString(a)); } static class SortTask extends RecursiveAction { static final int THRESHOLD = 10 ; final int [] array; final int lo, hi; SortTask(int [] array, int lo, int hi) { this .array = array; this .lo = lo; this .hi = hi; } SortTask(int [] array) { this (array, 0 , array.length); } @Override protected void compute () { if (hi - lo < THRESHOLD) { sortSequentially(lo, hi); } else { int mid = (lo + hi) >>> 1 ; invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi)); merge(lo, mid, hi); } } void sortSequentially (int lo, int hi) { Arrays.sort(array, lo, hi); } void merge (int lo, int mid, int hi) { int [] buf = Arrays.copyOfRange(array, lo, mid); for (int i = 0 , j = lo, k = mid; i < buf.length; j++) { array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++]; } } } }
源码解析 由上面的使用实例可知,Fork/Join框架的使用,分为三个步骤:
创建任务
创建执行任务的线程池ForkJoinPool对象。
执行任务
首先我们分析ForkJoinTask源码
ForkJoinTask源码分析 内部数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public abstract class ForkJoinTask <V > implements Future <V >, Serializable { volatile int status; static final int DONE_MASK = 0xf0000000 ; static final int NORMAL = 0xf0000000 ; static final int CANCELLED = 0xc0000000 ; static final int EXCEPTIONAL = 0x80000000 ; static final int SIGNAL = 0x00010000 ; static final int SMASK = 0x0000ffff ; }
ForkJoinTask类继承了Future类。该类内部主要是有个属性,记录任务的执行状态。由使用实例可知,ForkJoinTask类的主要方法是:fork,join,invokeAll等,下面进行主要方法分析
fork源码 1 2 3 4 5 6 7 8 9 10 11 12 public final ForkJoinTask<V> fork () { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this ); else ForkJoinPool.common.externalPush(this ); return this ; }
该方法的主要功能是:将任务进行拆分。在内部实现中,如果当前线程为ForkJoinWorkerThread线程时,则直接将该任务添加进当前线程的工作队列中;否则,则通过线程池ForkJoinPool将任务添加到池中的随机一个工作队列中。具体逻辑,后文再分析。
join源码 1 2 3 4 5 6 7 8 public final V join () { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
该方法的功能是:执行子任务并合并子任务的结果集。内部实现中,会调用doJoin()方法,执行任务,如果执行成功,则调用抽象方法getRawResult()返回执行结果;否则,执行reportException()方法处理异常。
doJoin源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private int doJoin () { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this ) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this , 0L ) : externalAwaitDone(); }
实现逻辑:
判断任务的执行状态status。如果status < 0,则直接返回;否则执行第2步
判断当前的任务是否可以执行,如果可以执行则调用doExec方法,如果不能执行则加入等待的队列。具体逻辑为:
如果当前线程为ForkJoinWorkerThread线程,则从当前线程的工作队列中弹出顶部任务,并执行该任务。如果执行成功,则返回;否则,如果执行失败,则将任务加入到等待的队列。(帮助其他线程处理任务,即窃取任务 或者阻塞直到任务完成或超时)
如果不是ForkJoinWorkerThread线程fork/join线程 阻塞
在ForkJoinPool类有关WorkQueue介绍如下:
1 2 3 4 5 6 7 8 9 10 11 The pop operation (always performed by owner) is: if ((base != top) and (the task at top slot is not null ) and (CAS slot to null ) ) decrement top and return task ;And the poll operation (usually by a stealer) is if ((base != top) and (the task at base slot is not null ) and (base has not changed) and (CAS slot to null ) ) increment base and return task ;
可知,在线程处理自己内部的工作队列时,是从头部开始,即pop();但是在窃取工作帮助其他线程处理任务时,是从尾部开始,即poll()。
所以在doJoin()中,如果为ForkJoinWorkerThread线程时,会调用tryUnpush()方法从头部获取待处理任务。
doExec源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final int doExec () { int s; boolean completed; if ((s = status) >= 0 ) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
该方法的功能是:执行任务。在之前前会判断任务状态,如果任务可执行,则调用exec()方法,由继承关系可知,实际调用的是ForkJoinTask子类的相应实现。在RecursiveAction 和 RecursiveTask中,都是调用了compute()方法。即调用了开发者自己的实现逻辑。在执行成功后,会通过setCompletion()方法设置任务的状态。
externalAwaitDone源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private int externalAwaitDone () { int s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this , 0 ) : ForkJoinPool.common.tryExternalUnpush(this ) ? doExec() : 0 ); if (s >= 0 && (s = status) >= 0 ) { boolean interrupted = false ; do { if (U.compareAndSwapInt(this , STATUS, s, s | SIGNAL)) { synchronized (this ) { if (status >= 0 ) { try { wait(0L ); } catch (InterruptedException ie) { interrupted = true ; } } else notifyAll(); } } } while ((s = status) >= 0 ); if (interrupted) Thread.currentThread().interrupt(); } return s; }
invoke源码 1 2 3 4 5 6 7 public final V invoke () { int s; if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
该方法主要是:执行任务并返回执行结果。其内部实现与join()方法类似,但是在实现任务执行的逻辑时其内部调用的是doInvoke()方法。doInvoke()方法与doJoin()方法内部实现逻辑也很类似。
doInvoke源码 1 2 3 4 5 6 7 8 9 10 private int doInvoke () { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this , 0L ) : externalAwaitDone(); }
该方法功能和doJoin()方法也很类似。区别是:doInvoke()方法直接调用doExec()方法执行任务。执行成功,直接返回;否则,判断是否为ForkJoinWorkerThread线程。是,调用awaitJoin()方法将任务添加到等待的队列;否则调用externalAwaitDone()阻塞任务。
invokeAll源码 1 2 3 4 5 6 7 8 9 10 11 12 public static void invokeAll (ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) t1.reportException(s1); if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) t2.reportException(s2); }
invokeAll()方法将处理两个子任务。由内部实现可知,实现逻辑是:现将子任务2放进工作队列,然后执行子任务1,在子任务1处理完后,执行子任务2。该方法等同于:
1 2 3 4 5 6 7 t2.fork(); t1.fork(); t1.join(); t2.join();
但需要注意的是,子任务t1,t2调用join()方法的顺序要与调用fork()方法的顺序相反。因为在后面分析ForkJoinPool的入队逻辑时,我们可以知道,t1会放置在工作队列的top位置,而在join()时,会从工作队列的top位置取出任务并执行,如果执行的任务不是top位置的任务的话,线程最终只能挂起阻塞,等待通知。有关信息可以 点击查看
ForkJoinTask重要实现类 RecursiveTask源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public abstract class RecursiveTask <V > extends ForkJoinTask <V > { private static final long serialVersionUID = 5232453952276485270L ; V result; protected abstract V compute () ; public final V getRawResult () { return result; } protected final void setRawResult (V value) { result = value; } protected final boolean exec () { result = compute(); return true ; } }
RecursiveTask 类是ForkJoinTask 的子类,该类会有返回值。在并发任务中,如果需要处理结果有返回值的话,继承该类并实现compute()即可。
RecursiveAction源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public abstract class RecursiveAction extends ForkJoinTask <Void > { private static final long serialVersionUID = 5232453952276485070L ; protected abstract void compute () ; public final Void getRawResult () { return null ; } protected final void setRawResult (Void mustBeNull) { } protected final boolean exec () { compute(); return true ; } }
RecursiveAction 类与RecursiveTask 类非常类似。唯一的区别时 RecursiveAction类不会返回执行结果。用法与RecursiveTask 类一样。
至此,有关ForkJoinTask 类大致分析完,下篇文章接着分析 ForkJoinPool 类
参考文章 Fork/Join
多任务处理(12)——Fork/Join框架(基本使用)
多任务处理(13)——Fork/Join框架(解决排序问题)
多任务处理(14)——Fork/Join框架(要点1)
多任务处理(15)——Fork/Join框架(要点2)
聊聊并发(八)——Fork/Join框架介绍
Fork/Join框架使用与分析
Java源码分析 - ForkJoinTask篇
Fork and Join: Java Can Excel at Painless Parallel Programming Too!
Doug Lea –《A Java Fork/Join Framework》
ForkJoinPool解读