ForkJoinPool 的 Javadoc 中写道:ForkJoinPool 是一种用于运行 ForkJoinTask 的 ExecutorService(An ExecutorService for running ForkJoinTasks),即 ForkJoinPool 本身也是一种 ExecutorService。但由于采用了工作窃取(work-stealing)机制,该类与其他 ExecutorService 又不太一样。
A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).
// Main pool control word.
volatile long ctl;                   // main pool control

// Pool run state, expressed as a combination of the bit flags below.
// The value is negative once the SHUTDOWN bit is set.
volatile int runState;               // lockable status

// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int RSLOCK     = 1;
private static final int RSIGNAL    = 1 << 1;
private static final int STARTED    = 1 << 2;
private static final int STOP       = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN   = 1 << 31; // sign bit, hence negative
//默认工作线程工厂 publicstaticfinal ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; //静态方法commonPool()会初始化该值 staticfinal ForkJoinPool common; volatile WorkQueue[] workQueues; // main registry final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; // per-worker UEH //内部工作队列 staticfinalclassWorkQueue{} //工作队列线程创建工厂类,该类用于创建工作线程 publicstaticinterfaceForkJoinWorkerThreadFactory{ /** * Returns a new worker thread operating in the given pool. * * @param pool the pool this thread works in * @return the new worker thread * @throws NullPointerException if the pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); }
publicclassForkJoinWorkerThreadextendsThread{ /* * ForkJoinWorkerThreads are managed by ForkJoinPools and perform * ForkJoinTasks. For explanation, see the internal documentation * of class ForkJoinPool. * * This class just maintains links to its pool and WorkQueue. The * pool field is set immediately upon construction, but the * workQueue field is not set until a call to registerWorker * completes. This leads to a visibility race, that is tolerated * by requiring that the workQueue field is only accessed by the * owning thread. * * Support for (non-public) subclass InnocuousForkJoinWorkerThread * requires that we break quite a lot of encapsulation (via Unsafe) * both here and in the subclass to access and set Thread fields. */ final ForkJoinPool pool; // the pool this thread works in final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics }
/**
 * Creates a ForkJoinWorkerThread operating in the given pool.
 *
 * @param pool the pool this thread works in
 * @throws NullPointerException if pool is null
 */
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    // Registering with the pool yields the work queue owned by this thread.
    this.workQueue = pool.registerWorker(this);
}
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// Instance fields
volatile int scanState;    // versioned, <0: inactive; odd:scanning
int stackPred;             // pool stack (ctl) predecessor
int nsteals;               // number of steals
int hint;                  // randomization and stealer index hint
int config;                // pool index and mode

// Queue lock/state word.
volatile int qlock;        // 1: locked, < 0: terminate; else 0

// Index of the next element to be taken from the queue.
volatile int base;         // index of next slot for poll

// Index of the slot where the next pushed element will be stored.
int top;                   // index of next slot for push

// The queue's elements are stored in this array.
ForkJoinTask<?>[] array;   // the elements (initially unallocated)

// The ForkJoinPool this queue belongs to (may be null).
final ForkJoinPool pool;   // the containing pool (may be null)

// The worker thread that owns this queue. A queue may also be shared,
// in which case it has no owning worker thread.
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker;    // == owner during call to park; else null

// The task currently being waited on via join.
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin

// The task currently being run after having been stolen.
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
/**
 * Creates a work queue for the given pool and owner. The element array
 * is not allocated here; only the indices are initialized.
 *
 * @param pool  the containing pool
 * @param owner the owning worker thread
 */
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
    this.pool = pool;
    this.owner = owner;
    // Place indices in the center of array (that is not yet allocated)
    base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
/** * Pops the given task only if it is at the current top. * (A shared version is available only via FJP.tryExternalUnpush) */ //当指定的任务为当前队列顶部时,弹出 finalbooleantryUnpush(ForkJoinTask<?> t){ ForkJoinTask<?>[] a; int s; if ((a = array) != null && (s = top) != base && //待处理任务列表不为空且有元素 U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { //将任务出队 U.putOrderedInt(this, QTOP, s); returntrue; } returnfalse; }
/** * Executes the given task and any remaining local tasks. */ finalvoidrunTask(ForkJoinTask<?> task){ if (task != null) { scanState &= ~SCANNING; //标识该线程处于繁忙状态 mark as busy (currentSteal = task).doExec(); //执行偷取的Task U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
//调用execLocalTasks对线程所属的WorkQueue内的任务进行LIFO执行 execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
/** * Removes and executes all local tasks. If LIFO, invokes * pollAndExecAll. Otherwise implements a specialized pop loop * to exec until empty. */ //删除并执行所有本地任务。 finalvoidexecLocalTasks(){ int b = base, m, s; //获取当前线程内部任务 ForkJoinTask<?>[] a = array;
if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //有任务
if ((config & FIFO_QUEUE) == 0) { //如果是先进先出(FIFO)队列,则从top位置依次取出任务并执行 for (ForkJoinTask<?> t;;) {
/** * If present, removes from queue and executes the given task, * or any other cancelled task. Used only by awaitJoin. * * @return true if queue empty and task not known to be done */ finalbooleantryRemoveAndExec(ForkJoinTask<?> task){ ForkJoinTask<?>[] a; int m, s, b, n;
//遍历 找到task,尝试进行执行和删除操作 while ((n = (s = top) - (b = base)) > 0) { for (ForkJoinTask<?> t;;) { // traverse from s to b //从索引s处开始遍历 long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) return s + 1 == top; // shorter than expected elseif (t == task) { //找到任务 boolean removed = false;
//移除任务 if (s + 1 == top) { // pop if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); removed = true; } } elseif (base == b) // replace with proxy removed = U.compareAndSwapObject( a, j, task, new EmptyTask());
//如果任务被移除,执行task任务 if (removed) task.doExec(); break; } elseif (t.status < 0 && s + 1 == top) { //遍历到的任务t被取消,则移除任务t,并重新设置top值,然后重新遍历 if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); break; // was cancelled } if (--n == 0) //没有带处理任务 returnfalse; } if (task.status < 0) returnfalse; } } returntrue; }
/** * Submits a ForkJoinTask for execution. * * @param task the task to submit * @param <T> the type of the task's result * @return the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ //提交一个ForkJoinTask执行 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task){ if (task == null) thrownew NullPointerException(); externalPush(task); return task; }
/** * Tries to add the given task to a submission queue at * submitter's current queue. Only the (vastly) most common path * is directly handled in this method, while screening for need * for externalSubmit. * * @param task the task. Caller must ensure non-null. */ //该方法是在当前线程为非工作线程插入任务时调用。如果当前线程为任务线程, // 则直接将任务插入当前线程中的工作队列中;如果当前线程不是任务线程,则需要从池中先随机定位到一个工作线程,然后将任务插入。 finalvoidexternalPush(ForkJoinTask<?> task){ WorkQueue[] ws; WorkQueue q; int m; //取得一个随机探查数,可能为0也可能为其它数 int r = ThreadLocalRandom.getProbe();
/** * Full version of externalPush, handling uncommon cases, as well * as performing secondary initialization upon the first * submission of the first task to the pool. It also detects * first submission by an external thread and creates a new shared * queue if the one at index if empty or contended. * * @param task the task. Caller must ensure non-null. */ privatevoidexternalSubmit(ForkJoinTask<?> task){ int r; // initialize caller's probe // 初始化探测器 if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); }
for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false;
/** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. */ //该方法会初始化或者进行队列2倍扩容 final ForkJoinTask<?>[] growArray() { ForkJoinTask<?>[] oldA = array;
/** * Tries to create or activate a worker if too few are active. * * @param ws the worker array to use to find signallees * @param q a WorkQueue --if non-null, don't retry if now empty */ //如果工作者太少活跃的话,则尝试新建或激活工作者 finalvoidsignalWork(WorkQueue[] ws, WorkQueue q){ long c; int sp, i; WorkQueue v; Thread p; //这里初始化后的ctl是个long型负数,但是转换成int后,变成0 while ((c = ctl) < 0L) { // too few active if ((sp = (int)c) == 0) { // no idle workers if ((c & ADD_WORKER) != 0L) // too few workers //初始状态,会调用这个方法,调用完退出 tryAddWorker(c); break; } if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } }
/** * Tries to construct and start one worker. Assumes that total * count has already been incremented as a reservation. Invokes * deregisterWorker on any failure. * * @return true if successful */ privatebooleancreateWorker(){ ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); returntrue; } } catch (Throwable rex) { ex = rex; } deregisterWorker(wt, ex); returnfalse; }
/** * Top-level runloop for workers, called by ForkJoinWorkerThread.run. */ finalvoidrunWorker(WorkQueue w){ //分配工作队列,初始化或者扩容 w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint //准备偷取的队列索引 int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) { if ((t = scan(w, r)) != null) //调用scan尝试去偷取一个任务,然后调用runTask或者awaitWork w.runTask(t); elseif (!awaitWork(w, r)) //如果任务中断,返回false break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
/** * Scans for and tries to steal a top-level task. Scans start at a * random location, randomly moving on apparent contention, * otherwise continuing linearly until reaching two consecutive * empty passes over all queues with the same checksum (summing * each base index of each queue, that moves on each steal), at * which point the worker tries to inactivate and then re-scans, * attempting to re-activate (itself or some other worker) if * finding a task; otherwise returning null to await work. Scans * otherwise touch as little memory as possible, to reduce * disruption on other scanning threads. * * @param w the worker (via its WorkQueue) * @param r a random seed * @return a task, or null if none found */ //扫描并尝试窃取顶级任务。 private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //任务队列列表不为空 int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; if ((q = ws[k]) != null) { if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty 有任务
//获取索引。在扫描任务时,从工作队列的尾部进行扫描 long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { //指定索引处的任务不空且没有其他线程并行操作 if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; } } elseif (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; }
//如果我们遍历了一圈(((k = (k + 1) & m) == origin))都没有偷到,我们就认为当前的active 线程过剩了,我们准备将当前的线程(即owner)挂起 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } returnnull; }
/** * Callback from ForkJoinWorkerThread constructor to establish and * record its WorkQueue. * * @param wt the worker thread * @return the worker's queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt){ UncaughtExceptionHandler handler; //将工作线程设置为后台线程 wt.setDaemon(true); // configure thread
//设置异常处理类 if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler);
//创建工作队列 WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index //确定工作队列的类型 int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array
//池中有工作队列列表时,将新创建的工作队列添加进去。在添加的过程中,如果池中有工作队列列表容量不够,则需要进行2倍扩容操作 if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }