java.util.concurrent.ThreadPoolExecutor 源码学习
先贴一段java.util.concurrent 包里的ThreadPoolExecutor实现方法,有一个值得深究的点,ct1 为AtomicInteger 32位,记录线程池中的线程数
ct1的高三位用于记录线程池状态,低29位用于记录线程数,高三位用于记录当前线程池的状态,具体定义如下Running ShutDown Stop
其中runStateOf(int c) 换言之是取ctl 的高三位, int workerCountOf(int c) 是取ctl的低29位 用于计数
线程状态
RUNNING:接受新任务并且处理阻塞队列里的任务
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成以后的状态
线程池状态转换
1 | RUNNING -> SHUTDOWN |
线程池状态标记源码
1 | //记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量) |
线程池构造函数
参数 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maximumPoolSize | int | 最大线程数 |
keepAliveTime | long | 存活时间 |
unit | TimeUnit | 时间单位 |
workQueue | BlockingQuene | 存放线程的队列 |
threadFactory | ThreadFactory | 创建线程的工厂 |
handler | RejectedExecutionHandler | 多余的线程处理器 |
execute函数
1 | public void execute(Runnable command) { |
addwork函数
addWorker方法主要先查询线程池是否能够新增,包括线程池状态检查,以及线程池数量检查,通过检查之后,会将当前的Runnable 扔入workers队列并执行
1) 原子增加线程池个数
内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。
2) 将任务添加至workder里执行 真正的创建线程在 new Worker里面,并且会加锁江Workers对象扔进Workers里,添加成功之后,在调用Thread.start启动
到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
Worker对象
Worker是定义在ThreadPoolExecutor中的finnal类,其中继承了AbstractQueuedSynchronizer类和实现Runnable接口,其中的run方法如下
1 | public void run() { |
关闭线程池
调用shutdown方法之后,线程池不会再接受新的任务,然后会将先前队列中的任务执行完成
1 | /** |
停止所有任务
shutdownNow 立即停止所有的执行任务,并将任务队列中的任务返回
1 | public List<Runnable> shutdownNow() { |
线程分配的原则
根据任务类型分配,如果CPU密集型任务,线程数最多分配为CPU个数相当的大小,IO密集型的任务可以较多的分配线程数。
并发执行引擎封装实例
在公用线程池执行多个任务的单元,由execute发布任务到执行引擎并且执行
基础接口
1 | /** |
上层实现类
上层 ParallelEngineImpl 任务到这里分流,根据config选择是串行执行还是并行执行,扔到不同的线程池执行
1 | public class ParallelEngineImpl implements ParallelEngine { |
并发线程池
线程池引擎,实际上就是对ThreadPoolExecutor做了封装,包含了线程池参数设置,更新,以及execute,换言之也是整个并行执行引擎的最底部,所有其他上层的并行执行引擎最终都由 ThreadPoolEngine 执行
1 | public class ThreadPoolEngine implements ParallelEngine { |
串行线程池
串行备份引擎,重写了execute方法, 如果并行执行返回false ,加入到串行备份引擎执行队列,串行执行作为兜底
1 | public class SerialEngine implements ParallelEngine { |