并发执行引擎

java.util.concurrent.ThreadPoolExecutor 源码学习

先贴一段 java.util.concurrent 包里 ThreadPoolExecutor 的实现代码。其中有一个值得深究的点:ctl 是一个 32 位的 AtomicInteger,同时记录了线程池的运行状态和线程池中的线程数。
ctl 的高 3 位用于记录线程池的运行状态,低 29 位用于记录线程数。运行状态共有五种:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED,具体定义见下文。
其中 runStateOf(int c) 取的是 ctl 的高 3 位(运行状态),workerCountOf(int c) 取的是 ctl 的低 29 位(线程数)。

线程池状态

RUNNING:接受新任务并且处理阻塞队列里的任务
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
TIDYING:所有任务都已终止(包含阻塞队列里面的任务),当前线程池活动线程数为0,将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成以后的状态

线程池状态转换

1
2
3
4
5
6
7
8
9
10
RUNNING -> SHUTDOWN
显式调用shutdown()方法, 或者隐式调用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
显式调用shutdownNow()方法
SHUTDOWN -> TIDYING
当线程池和任务队列都为空的时候
STOP -> TIDYING
当线程池为空的时候
TIDYING -> TERMINATED
当 terminated() hook 方法执行完成时候

线程池状态标记源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//Packs the pool run state and the worker count into one int (32 bits: the top 3 bits hold the run state, the low 29 bits hold the worker count); starts as RUNNING with 0 workers
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Number of bits used for the worker count: 29 (Integer.SIZE = 32)
private static final int COUNT_BITS = Integer.SIZE - 3;
//Capacity, also the worker-count mask: 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

//Run states, stored in the top 3 bits. Numerically RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
//(RUNNING is negative), so lifecycle checks can use comparisons such as rs >= SHUTDOWN
//Running: 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//Shutdown: 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//Stop: 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//Tidying: 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//Terminated: 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;

//Extract the run state: keep the top 3 bits by clearing the low 29 count bits
private static int runStateOf(int c) { return c & ~CAPACITY; }
//Extract the worker count: keep only the low 29 bits
private static int workerCountOf(int c) { return c & CAPACITY; }
//Combine a run state (top 3 bits) and a worker count (low 29 bits) into one ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池构造函数

参数 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 存活时间
unit TimeUnit 时间单位
workQueue BlockingQueue<Runnable> 存放待执行任务的阻塞队列
threadFactory ThreadFactory 创建线程的工厂
handler RejectedExecutionHandler 拒绝策略处理器:任务无法被接收执行时(如线程池已关闭,或队列已满且线程数已达maximumPoolSize)调用

execute函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void execute(Runnable command) {
//A null task is rejected with a NullPointerException
if (command == null)
throw new NullPointerException();

//Read the combined run-state + worker-count value
int c = ctl.get();
/**
* Three steps
*/
//1. If fewer than corePoolSize workers exist, call addWorker to start a new core thread with this Runnable as its first task.
//If addWorker returns true the task has been handed off, so return; otherwise re-read ctl and fall through
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

//2. If the pool is RUNNING, try to put the task into the work queue
if (isRunning(c) && workQueue.offer(command)) {

//Re-check, because the state may have changed after the offer
int recheck = ctl.get();
//If the pool is no longer RUNNING and the task can still be removed from the queue, apply the rejection policy
if (! isRunning(recheck) && remove(command))
reject(command);

//Otherwise, if there are no worker threads at all, add one (with no first task) so the queue is not left without a worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. Otherwise (not RUNNING, or the queue refused the task) try to add a non-core worker, bounded by maximumPoolSize; if that fails, apply the rejection policy
else if (!addWorker(command, false))
reject(command);
}

addWorker函数

addWorker方法首先检查线程池是否允许新增线程,包括线程池状态检查以及线程数量检查;通过检查之后,会新建一个Worker(以传入的Runnable作为它的第一个任务),将其放入workers集合并启动它的线程
1) 原子增加线程池个数
内层循环的作用是使用CAS增加线程个数:如果线程个数超限则返回false,否则进行CAS;CAS成功则退出双层循环,CAS失败则要看当前线程池的状态是否变化了,如果变了,则回到外层循环重新获取线程池状态,否则在内层循环中继续进行CAS尝试。
2) 将任务添加至Worker里执行:真正创建线程是在new Worker里面,之后会加锁将Worker对象放入workers集合,添加成功之后,再调用Thread.start()启动线程
到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行。这里使用全局的独占锁(mainLock)来控制向workers里面添加Worker,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动线程执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Fail if the pool is SHUTDOWN, STOP, TIDYING or TERMINATED,
// unless it is exactly SHUTDOWN, no first task is given and the queue is not empty
// (in that case a worker may still be needed to process the remaining queued tasks)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//Inner loop: CAS-increment the worker count
for (;;) {
int wc = workerCountOf(c);
//Fail if the count has reached CAPACITY or the applicable bound (corePoolSize when core is true, otherwise maximumPoolSize)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS-increment the count; on success leave both loops
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS failed: re-read ctl. If the run state changed, restart the outer loop to re-check the state;
//otherwise only the count changed, so retry the CAS in the inner loop
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//The worker count has been incremented by the CAS
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//Create the worker; its thread is read from w.thread (may be null, hence the check)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//Hold mainLock while mutating workers and largestPoolSize
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {

//Re-check the run state while holding the lock,
//backing out on ThreadFactory failure or if the pool was shut down before the lock was acquired
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable (must not already be running)
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//If the worker was added, start its thread and mark it as started
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//If the thread was not started (including when an exception was thrown above), call addWorkerFailed
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker对象

Worker是定义在ThreadPoolExecutor中的final内部类,它继承了AbstractQueuedSynchronizer类并实现了Runnable接口,其中的run方法如下

1
2
3
//Runnable entry point of the worker thread: delegates to runWorker, passing this Worker
public void run() {
runWorker(this);
}

关闭线程池

调用shutdown方法之后,线程池不再接受新的任务,但会继续执行先前已提交到队列中的任务。注意shutdown方法本身不会等待这些任务执行完成,如需等待请使用awaitTermination

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Security check: is the caller allowed to shut the pool down
checkShutdownAccess();
// First advance the run state to SHUTDOWN
advanceRunState(SHUTDOWN);
// Interrupt idle workers only (busy workers keep running their current task)
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// Called after releasing mainLock; presumably attempts the transition toward TERMINATED (see tryTerminate)
tryTerminate();
}

停止所有任务

shutdownNow 将线程池状态置为STOP并中断工作线程,以尝试立即停止所有正在执行的任务,同时将任务队列中尚未执行的任务取出并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//Moves the pool to STOP, interrupts worker threads and returns the tasks drained from the queue
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Security check: is the caller allowed to shut the pool down
checkShutdownAccess();
// Advance the run state to STOP
advanceRunState(STOP);
// Interrupt worker threads
interruptWorkers();
// Drain the queued (not yet run) tasks; they are returned to the caller
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// Called after releasing mainLock; presumably attempts the transition toward TERMINATED (see tryTerminate)
tryTerminate();
return tasks;
}

线程分配的原则

根据任务类型分配:如果是CPU密集型任务,线程数一般最多分配为与CPU核数相当的大小;IO密集型任务由于线程大部分时间在等待IO,可以分配较多的线程数。

并发执行引擎封装实例

在公用线程池执行多个任务的单元,由execute发布任务到执行引擎并且执行

基础接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * ParallelEngine executes tasks in a common thread pool.
 *
 * <p>Results are delivered through {@link CruzeCompletableFuture}, whose blocking
 * {@code get} methods report failures as unchecked exceptions.
 */
public interface ParallelEngine {

    /**
     * Submits a task to the parallel engine and executes it.
     *
     * @param taskName name identifying the task
     * @param callable the work to run
     * @param <T> result type of the task
     * @return a future holding the task's result, or the exception the task threw
     */
    <T> CruzeCompletableFuture<T> execute(String taskName, Callable<T> callable);

    /**
     * {@link CompletableFuture} whose blocking {@code get} methods throw an unchecked
     * {@link RuntimeException} (with the original exception as its cause) instead of
     * the checked exceptions declared by {@link CompletableFuture}.
     */
    class CruzeCompletableFuture<T> extends CompletableFuture<T> {

        /**
         * Waits for and returns the result.
         *
         * @return the result
         * @throws RuntimeException if waiting fails (interrupted, cancelled, or the task
         *     failed); the cause is the original exception. When the cause is an
         *     {@link InterruptedException} the thread's interrupt status is restored.
         */
        @Override
        public T get() {
            try {
                return super.get();
            } catch (Exception e) {
                throw wrap(e);
            }
        }

        /**
         * Waits at most {@code timeout} for the result and returns it.
         *
         * @param timeout maximum time to wait
         * @param unit unit of {@code timeout}
         * @return the result
         * @throws RuntimeException if waiting fails or times out; the cause is the original
         *     exception (e.g. {@link java.util.concurrent.TimeoutException}). When the cause is
         *     an {@link InterruptedException} the thread's interrupt status is restored.
         */
        @Override
        public T get(long timeout, TimeUnit unit) {
            try {
                return super.get(timeout, unit);
            } catch (Exception e) {
                throw wrap(e);
            }
        }

        /**
         * Wraps a failure raised by {@code get}. Throwing InterruptedException clears the
         * thread's interrupt status, so it is re-asserted here; otherwise the interrupt would
         * be silently lost once the exception is converted to an unchecked one.
         */
        private static RuntimeException wrap(Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return new RuntimeException("failed to get result from ParallelEngine", e);
        }
    }
}

上层实现类

上层实现类 ParallelEngineImpl:任务在这里分流,根据config选择串行执行还是并行执行——并行时交给ThreadPoolEngine的线程池执行,串行时交给SerialEngine在调用线程中直接执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Entry-point engine: routes each task either to the thread-pool engine or to the
 * serial engine, depending on the ParallelEngineConfig that is current at call time.
 */
public class ParallelEngineImpl implements ParallelEngine {

    private Logger logger = LoggerFactory.getLogger(getClass());
    // Replaced wholesale whenever the watched config key changes.
    private volatile ParallelEngineConfig config;
    // Created only once a parallel config has been seen; stays null otherwise.
    private volatile ThreadPoolEngine threadPoolEngine;
    private SerialEngine serialEngine;

    /**
     * Loads the initial config stored in ConfigCache under {@code path}, then registers a
     * change listener that reloads it whenever that key changes. A config that fails to
     * load or apply is logged and the previous config stays in effect.
     *
     * @param path ConfigCache key holding the engine config
     */
    public ParallelEngineImpl(String path) {
        this(ParallelEngineConfig.load(ConfigCache.getInstance().getProperty(path)));
        ConfigCache.getInstance().addChange((key, value) -> {
            if (!path.equals(key)) {
                return;
            }
            try {
                ParallelEngineConfig latest = ParallelEngineConfig.load(value);
                // Prepare the pool (create or resize) before publishing the new config,
                // so a parallel config is never visible without a matching pool.
                if (latest.isParallel()) {
                    if (threadPoolEngine != null) {
                        threadPoolEngine.update(latest);
                    } else {
                        threadPoolEngine = new ThreadPoolEngine(latest);
                    }
                }
                this.config = latest;
            } catch (Exception e) {
                logger.error("failed to parse ParallelEngine config", e);
            }
        });
    }

    /**
     * Builds the engine from an already-loaded config. The thread-pool engine is only
     * created when the config is parallel; the serial engine is always available.
     *
     * @param config initial engine config
     */
    public ParallelEngineImpl(ParallelEngineConfig config) {
        this.config = config;
        if (config.isParallel()) {
            this.threadPoolEngine = new ThreadPoolEngine(config);
        }
        this.serialEngine = new SerialEngine();
    }

    /**
     * Runs the task on the thread pool when the current config is parallel, otherwise
     * runs it serially.
     */
    @Override
    @CheckInterrupt
    public <T> CruzeCompletableFuture<T> execute(String taskName, Callable<T> callable) {
        ParallelEngine target = config.isParallel() ? threadPoolEngine : serialEngine;
        return target.execute(taskName, callable);
    }
}

并发线程池

线程池引擎,实际上就是对ThreadPoolExecutor做了封装,包含了线程池参数设置,更新,以及execute,换言之也是整个并行执行引擎的最底部,所有其他上层的并行执行引擎最终都由 ThreadPoolEngine 执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Lowest layer of the parallel engine: wraps a ThreadPoolExecutor configured from
 * ParallelEngineConfig and runs every task inside a forked Cat transaction.
 */
public class ThreadPoolEngine implements ParallelEngine {

    // Backing thread pool; the reference never changes after construction (only its sizes do).
    private final ThreadPoolExecutor executor;

    /**
     * Creates the pool with the configured core size, maximum size, keep-alive time (in
     * seconds), a bounded ArrayBlockingQueue of {@code config.getCapacity()} slots, and
     * threads named {@code parallel-engine-N}. No rejection handler is passed, so the
     * ThreadPoolExecutor default applies.
     *
     * @param config pool settings
     */
    public ThreadPoolEngine(ParallelEngineConfig config) {
        this.executor = new ThreadPoolExecutor(
                config.getCorePoolSize(),
                config.getMaximumPoolSize(),
                config.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(config.getCapacity()),
                new ThreadFactoryBuilder().setNameFormat("parallel-engine-%d").build()
        );
    }

    /**
     * Applies a new core size, maximum size and keep-alive time to the running pool.
     * The queue capacity is fixed at construction and is not changed here.
     *
     * @param config new pool settings
     * @throws IllegalArgumentException if the new sizes are invalid (e.g. core > max)
     */
    public void update(ParallelEngineConfig config) {
        int newCore = config.getCorePoolSize();
        int newMax = config.getMaximumPoolSize();
        // ThreadPoolExecutor rejects core > max in setMaximumPoolSize, and since JDK 9 also in
        // setCorePoolSize, so the setter order matters: when the maximum grows, raise it first;
        // when it shrinks, lower the core first. Either way no intermediate state has core > max.
        if (newMax >= executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(newMax);
            executor.setCorePoolSize(newCore);
        } else {
            executor.setCorePoolSize(newCore);
            executor.setMaximumPoolSize(newMax);
        }
        executor.setKeepAliveTime(config.getKeepAliveTime(), TimeUnit.SECONDS);
    }

    /**
     * Submits the task to the pool. The task runs inside a forked Cat transaction named
     * {@code <flow group name>.<taskName>}, inheriting the caller's discardable/hit-sample
     * flags; its result or thrown exception completes the returned future.
     *
     * <p>NOTE(review): with the default rejection policy, {@code executor.execute} throws
     * RejectedExecutionException when the queue is full and all threads are busy; the
     * forked transaction created below is then never completed — confirm whether Cat
     * requires that.
     */
    @Override
    public <T> CruzeCompletableFuture<T> execute(String taskName, Callable<T> callable) {
        // check whether main thread is sampled by Cat
        boolean isDiscardable = Cat.getManager().getThreadLocalMessageTree().isDiscardable();
        boolean isHitSample = Cat.getManager().getThreadLocalMessageTree().isHitSample();
        String transactionName = RecGlobalContextUtils.getFlowGroupName() + "." + taskName;
        ForkedTransaction transaction = Cat.newForkedTransaction("Parallel", transactionName);

        CruzeCompletableFuture<T> future = new CruzeCompletableFuture<>();
        executor.execute(() -> {
            try {
                // create forked transaction and join main thread
                transaction.fork();
                Cat.getManager().getThreadLocalMessageTree().setDiscardable(isDiscardable);
                Cat.getManager().getThreadLocalMessageTree().setHitSample(isHitSample);
                // execute the callable
                T result = callable.call();
                // set status and result
                transaction.setStatus(Message.SUCCESS);
                future.complete(result);
            } catch (Throwable throwable) {
                transaction.setStatus(throwable);
                future.completeExceptionally(throwable);
            } finally {
                transaction.complete();
            }
        });
        return future;
    }
}

串行执行引擎

串行备份引擎,实现了execute方法:在调用线程中同步执行callable,并将结果或异常封装进future返回。当配置为非并行(config.isParallel()为false)时,ParallelEngineImpl会使用它来执行任务,作为兜底的串行执行方式

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Serial fallback engine: runs the task synchronously on the calling thread and returns
 * an already-completed future.
 */
public class SerialEngine implements ParallelEngine {

    /**
     * Calls {@code callable} immediately on the current thread.
     *
     * @return a future completed with the callable's result, or completed exceptionally
     *     with whatever the callable threw
     */
    @Override
    public <T> CruzeCompletableFuture<T> execute(String taskName, Callable<T> callable) {
        CruzeCompletableFuture<T> outcome = new CruzeCompletableFuture<>();
        T value;
        try {
            value = callable.call();
        } catch (Throwable failure) {
            outcome.completeExceptionally(failure);
            return outcome;
        }
        outcome.complete(value);
        return outcome;
    }
}