本文共 8150 字,大约阅读时间需要 27 分钟。
这篇文章是我在阅读源码时整理的一些笔记,对源码的关键点进行了比较详细的注释,然后加上一些自己对线程池机制的理解。最终目的是要弄清楚下面这些问题:
首先需要介绍一下线程池的两个重要成员:
AtomInteger 类型。高3位存储线程池状态,低29位存储当前线程数量。workerCountOf(c) 返回当前线程数量。runStateOf(c) 返回当前线程池状态。 线程池有如下状态:
这个线程在线程池中的包装类。一个 Worker 代表一个线程。线程池用一个 HashSet 管理这些线程。
需要注意的是,Worker 本身并不区分核心线程和非核心线程,核心线程只是概念模型上的叫法,特性是依靠对线程数量的判断来实现的 Worker 特性如下:
submit 返回一个 Future 对象,我们可以调用其 get 方法获取任务执行的结果。代码很简单,就是将 Runnable 包装成 FutureTask 而已。可以看到,最终还是调用 Execute 方法:
public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask;}复制代码
FutureTask 的代码就不贴了,简述一下原理:
这个机制大家应该都很熟了,再简述一遍:
具体的代码分析如下:
int c = ctl.get();if (workerCountOf(c) < corePoolSize) { //小于核心线程数 if (addWorker(command, true)) //启动核心线程并执行任务 return; c = ctl.get(); //执行失败时重新获取值}if (isRunning(c) && workQueue.offer(command)) { //检查运行状态并将任务添加到队列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //重新检查,防止状态有变化。如果有,移出队列并拒绝任务 reject(command); else if (workerCountOf(recheck) == 0) //如果线程数为0,创建非核心线程,第一个参数为空时会从队列中取任务执行 addWorker(null, false);}else if (!addWorker(command, false)) //添加到队列失败,说明队列已满,创建非核心线程执行任务 reject(command); //执行失败说明达到最大线程数,拒绝任务复制代码
线程池使用 addWorker 方法新建线程,第一个参数代表要执行的任务,线程会将这个任务执行完毕后再从队列取任务执行。第二参数是核心线程的标志,它并不是 Worker 本身的属性,在这里只用来判断工作线程数量是否超标。
这个方法可以分成两部分,第一部分进行一些前置判断,并使用循环 CAS 结构将线程数量加1。代码如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: //这个语法不常用,用于给外层 for 循环命名。方便嵌套 for 循环中,break 和 continue 指定是外层还是内层循环 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // firstTask 不为空代表这个方法用于添加任务,为空代表新建线程。SHUTDOWN 状态下不接受新任务,但处理队列中的任务。这就是第二个判断的逻辑。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 使用循环 CAS 自旋,增加线程数量直到成功为止 for (;;) { int wc = workerCountOf(c); //判断是否超过线程容量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //使用 CAS 将线程数量加1 if (compareAndIncrementWorkerCount(c)) break retry; //修改不成功说明线程数量有变化 //重新判断线程池状态,有变化时跳到外层循环重新获取线程池状态 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; //到这里说明状态没有变化,重新尝试增加线程数量 } } ... ...}复制代码
第二部分负责新建并启动线程,并将 Worker 添加至 Hashset 中。代码很简单,没什么好注释的,用了 ReentrantLock 确保线程安全。
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //这个参数是测试用的,不用管它 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } }} finally { if (! workerStarted) addWorkerFailed(w); //添加失败时移除 Worker 并将线程数量减 1}return workerStarted;}复制代码
在 addWorker 方法中,线程会被启动。新建线程时,Worker 将自身传入,所以线程启动后会执行 Worker 的 run 方法,这个方法调用了 ThreadPoolExecutor 的 runWorker 方法执行任务,runWorker 中会循环取任务执行,执行逻辑如下:
具体代码分析如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; //task 为我们传给 execute 的任务。task 为空时从队列中取任务执行 try { while (task != null || (task = getTask()) != null) { w.lock(); //这段逻辑非常绕。实际上它实现了以下逻辑: //1.如果线程池已停止且线程未中断,条件成立,中断线程 //2.如果线程池未停止,线程为中断状态,将线程状态重置,并重新进行1的判断 //3.如果线程池未停止,线程不为中断状态,条件不成立 //Thread.interrupted() 会重置中断状态,保证 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); //beforeExecute 和 afterExecute 为空方法,交给子类实现 try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //执行到这里时说明线程执行完毕,此方法将线程从 HashSet 中移出。线程终止且没有引用,会被自动回收。 processWorkerExit(w, completedAbruptly); }}复制代码
在 runWorker 方法中 getTask 方法返回 null 之后会导致线程执行完毕,被移除出 HashSet,从而被系统销毁。 线程的超时机制也是在这个方法实现的,借助于 BlockingQueue 的 poll 和 take 方法。
超时机制实现原理如下:
具体代码如下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 允许核心线程超时或者线程数大于核心线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // timed && timedOut 这两个参数结合起来控制超时机制 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 队列为空时,poll 方法会阻塞等待,超过 keepAliveTime 时返回空值。take 方法会直接返回异常。 // 当 allowCoreThreadTimeOut 为 true 时,核心线程和非核心线程没有区别,一律调用poll方法 // 当 allowCoreThreadTimeOut 为 false 时,线程数量超过核心线程数才会进入超时机制,如果不超过,则将当前线程当作核心线程处理,调用 take,抛出异常后进入下一次循环。如果队列为空,此处会一直循环。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}复制代码
转载地址:http://jebfa.baihongyu.com/