Appearance
Java 基础:并发
1. 初识线程
启动一个线程的简单示例:
Java
Runnable r = () -> {
try {
for (int i = 0; i < STEPS; i++) {
double amount = MAX_AMOUNT * Math.random();
bank.transfer(0, 1, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
} catch (InterruptedException e) { } // the sleep method threatens to throw.
};
var t = new Thread(r);
t.start(); // note: not run method1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
2. 线程属性
2.1. 线程的状态
线程可以有如下 6 种状态:
New
当用
new操作符创建一个新线程时,如new Thread(r),这个线程还没有开始运行;Runnable
一旦调用
start方法,线程就处于可运行(runnable)状态。在任何给定时刻,一个可运行的线程可能正在运行也可能没有运行(正是因为这样,这个状态称为 “可运行” 而不是 “运行”);Blocked
当一个线程试图获取一个内部的对象锁,而这个锁目前被其他线程占有,该线程就会被阻塞。当所有其他线程都释放了这个锁,并且线程调度器允许该线程持有这个锁时,它将变成非阻塞状态;
Waiting
当线程等待另一个线程通知调度器出现一个条件时,这个线程会进入等待状态。调用
Object.wait方法或Thread.join方法,或者是等待java.util.concurrent库中的Lock或Condition时,就会出现这种情况。实际上,阻塞状态与等待状态并没有太大区别;Timed waiting
有几个方法有超时参数,调用这些方法会让线程进入计时等待状态。这一状态将一直保持到超时期满或者接收到适当的通知。带有超时参数的方法有
Thread.sleep和计时版的Object.wait、Thread.join、Lock.tryLock以及Condition.await;Terminated
线程会由于以下两个原因之一而终止:
run方法正常退出,线程自然终止;- 因为一个没有捕获的异常终止了
run方法,使线程意外终止;
具体来说,可以调用线程的
stop方法杀死一个线程。该方法抛出一个ThreadDeath错误对象,这会杀死线程。不过,stop方法已经废弃,不要在你自己的代码中调用这个方法。
要确定一个线程的当前状态,只需要调用 getState 方法。
2.2. 中断线程
除了已经废弃的 stop 方法,没有办法可以强制线程终止。不过,interrupt 方法可以用来请求终止一个线程。当对一个线程调用 interrupt 方法时,就会设置线程的中断状态。这是每个线程都有的标志。每个线程都应该不时地检查这个标志,以判断线程是否被中断。
要想知道是否设置了中断状态,首先调用静态的 Thread.currentThread 方法获得当前线程,然后调用 isInterrupted 方法。但是,如果线程被阻塞,就无法检查中断状态。这里就要引入 InterruptedException 异常。当在一个被 sleep 或 wait 调用阻塞的线程上调用 interrupt 方法时,那个阻塞调用(即 sleep 或 wait 调用)将被一个 InterruptedException 异常中断(有一些阻塞 I/O 调用不能被中断,对此应该考虑选择可中断的调用)。
通常线程希望将中断解释为一个终止请求,这种线程的 run 方法具有如下形式:
Java
Runnable r = () -> {
try {
// ...
while (!Thread.currentThread().isInterrupted() && more work to do) {
// do more work
}
} catch (InterruptedException e) {
// thread was interrupted during sleep or wait
} finally {
// cleanup, if required
}
// exiting the run method terminates the thread
};1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
如果在每次工作迭代之后都调用 sleep 方法(或者其他可中断方法),isInterrupted 检查既没有必要也没有用处。如果设置了中断状态,此时倘若调用 sleep 方法,它不会休眠。实际上,它会清除中断状态并抛出 InterruptedException。因此,如果你的循环调用了 sleep,不要检测中断状态,而应当捕获 InterruptedException 异常,如下所示:
Java
Runnable r = () -> {
try {
// ...
while (more work to do) {
// do more work
Thread.sleep(delay);
}
} catch (InterruptedException e) {
// thread was interrupted during sleep or wait
} finally {
// cleanup, if required
}
// exiting the run method terminates the thread
};1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
Note:有两个非常类似的方法,
interrupted和isInterrupted。interrupted方法是一个静态方法,它检查当前线程是否被中断。而且,调用interrupted方法会清除该线程的中断状态。另一方面,isInterrupted方法是一个实例方法,可以用来检查是否有线程被中断状态。
2.3. 守护线程
可以通过调用:
Java
t.setDaemon(true);将一个线程转换为守护线程(daemon thread)。这样一个线程并没有什么魔力。守护线程的唯一用途是为其他线程提供服务。计时器线程就是一个例子,它定时地发送 “计时器嘀嗒” 信号给其他线程,另外清空过时缓存项的线程也是守护线程。当只剩下守护线程时,虚拟机就会退出。因为如果只剩下守护线程,就没必要继续运行程序了。
2.4. 未捕获异常的处理器
线程中抛出的异常会传递到一个用于处理未捕获异常的处理器。这个处理器必须属于一个实现了 Thread.UncaughtExceptionHandler 接口的类。这个接口只有一个方法:
Java
void uncaughtException(Thread t, Throwable e)可以用 setUncaughtExceptionHandler 方法为任何线程安装一个处理器。也可以用 Thread 类的静态方法 setDefaultUncaughtExceptionHandler 为所有线程安装一个默认的处理器。如果没有为单个线程安装处理器,那么处理器就是该线程的 ThreadGroup 对象。
Note:线程组是可以一起管理的线程的集合。默认情况下,创建的所有线程都属于同一个线程组,但是也可以建立其他的组。由于现在引入了更好的特性来处理线程集合,所以建议不要在你自己的程序中使用线程组。
ThreadGroup 类实现了 Thread.UncaughtExceptionHandler 接口。它的 uncaughtException 方法执行以下操作:
- 如果该线程组有父线程组,那么调用父线程组的
uncaughtException方法; - 否则,如果
Thread.getDefaultUncaughtExceptionHandler方法返回一个非null的处理器,则调用该处理器; - 否则,如果
Throwable是ThreadDeath的一个实例,什么都不做; - 否则,将线程的名字以及
Throwable的栈轨迹输出到system.err;
2.5. 线程优先级
在 Java 程序设计语言中,每一个线程有一个优先级。默认情况下,一个线程会继承构造它的那个线程的优先级。可以用 setPriority 方法提高或降低任何一个线程的优先级。可以将优先级设置为 MIN_PRIORITY(在 Thread 类中定义为 1)与 MAX_PRIORITY(定义为 10)之间的任何值。NORM_PRIORITY 定义为 5。
每当线程调度器有机会选择新线程时,它首先选择具有较高优先级的线程。但是,线程优先级高度依赖于系统。当虚拟机依赖于宿主机平台的线程实现时,Java 线程的优先级会映射到宿主机平台的优先级,平台的线程优先级别可能比上述的 10 个级别多,也可能更少。例如,Windows 有 7 个优先级别。Java 的一些优先级会映射到同一个操作系统优先级。在 Oracle 为 Linux 提供的 Java 虚拟机中,会完全忽略线程优先级 —— 所有线程都有相同的优先级。
Warning:在没有使用操作系统线程的 Java 早期版本中,线程优先级可能很有用。不过现在不要使用线程优先级了。
3. 同步
3.1. 锁对象
Java 语言提供了两种机制防止并发访问代码块:
synchronized关键字;- Java 5 引入的
ReentrantLock类;
用 ReentrantLock 保护代码块的基本结构如下:
Java
myLock.lock(); // a ReentrantLock object
try {
// critical section
} finally {
myLock.unlock(); // make sure the lock is unlocked even if an exception is thrown
}1
2
3
4
5
6
2
3
4
5
6
ReentrantLock 锁称为重入(reentrant)锁,因为线程可以反复获得已拥有的锁。锁有一个持有计数(hold count)跟踪对 lock 方法的嵌套调用。线程每一次调用 lock 后都要调用 unlock 来释放锁。由于这个特性,被一个锁保护的代码可以调用另一个使用相同锁的方法。
3.2. 条件对象
通常,线程进入临界区后却发现只有满足了某个条件之后它才能执行。可以使用一个条件对象来管理那些已经获得了一个锁却不能做有用工作的线程。由于历史原因,条件对象经常又被称为条件变量(conditional variable)。
一个锁对象可以有一个或多个相关联的条件对象。可以用 newCondition 方法获得一个条件对象:
Java
class Bank {
private var bankLock = new ReentrantLock();
private Condition sufficientFunds;
// ...
public Bank() {
// ...
sufficientFunds = bankLock.newCondition();
}
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
调用 await 方法时当前线程暂停并放弃锁:
Java
sufficientFunds.await();直到另外一个线程在同一条件上调用 signalAll/signal 方法:
Java
sufficientFunds.signalAll();signalAll 会重新激活等待这个条件的所有线程。当这些线程从等待集中移出时,它们再次成为可运行的线程,调度器最终将再次将它们激活。同时,它们会尝试重新进入该对象。一旦锁可用,它们中的某个线程将从 await 调用返回,得到这个锁,并从之前暂停的地方继续执行。
Tip
注意
signalAll调用不会立即激活一个等待的线程。它只是解除等待线程的阻塞,使这些线程可以在当前线程释放锁之后竞争访问对象。另一个方法
signal只是随机选择等待集中的一个线程,并解除这个线程的阻塞状态。这比解除所有线程的阻塞更高效,但也存在危险。如果随机选择的线程发现自己仍然不能运行,它就会再次阻塞。如果没有其他线程再次调用signal,系统就会进入死锁。
Warning:只有当线程拥有一个条件的锁时,它才能在这个条件上调用
await、signalAll或signal方法。
此时,线程应当再次测试条件。不能保证现在一定满足条件 —— signalAll 方法仅仅是通知等待的线程。通常,await 调用应该放在如以下形式的循环中:
Java
while (!(OK to proceed))
condition.await();1
2
2
3.3. synchronized 关键字
从 1.0 版开始,Java 中的每个对象都有一个内部锁。如果一个方法声明时有 synchronized 关键字,那么对象的锁将保护整个方法。也就是说:
Java
public synchronized void method() {
// method body
}1
2
3
2
3
等价于:
Java
public void method() {
this.intrinsicLock.lock();
try {
// method body
} finally { this.intrinsicLock.unlock(); }
}1
2
3
4
5
6
2
3
4
5
6
使用 synchronized 可以让我们的代码保持简洁,但也会存在一些限制条件:
- 无法中断一个正在尝试获得锁的线程;
- 不能指定尝试获得锁时的超时时间;
- 内部对象锁只有一个关联条件;
wait 方法将一个线程增加到等待集中,notifyAll/notify 方法可以解除等待线程的阻塞。换句话说,调用 wait 或 notifyAll 等价于:
Java
intrinsicCondition.await();
intrinsicCondition.signalAll();1
2
2
Note:没错这里是 “notify”、“notifyAll” 方法而不是 “signal”、“signalAll”。
将静态方法声明为同步也是合法的。如果调用这样一个方法,它会获得相关类对象的内部锁。例如,如果 Bank 类有一个静态同步方法,那么当调用这个方法时,Bank.class 对象的锁会锁定。因此,没有其他线程可以调用这个类的该方法或任何其他同步静态方法。
除了可以将方法声明为 synchronized,我们也可以将一段代码标记为 synchronized,即同步块:
Java
synchronized (obj) { // this is the syntax for a synchronized
// critical section
}1
2
3
2
3
它会获得 obj 的锁。有时我们会特地声明这样一个对象:
Java
public class Bank {
private double[] accounts;
private var lock = new Object();
// ...
public void transfer(int from, int to, int amount) {
synchronized (lock) { // an ad-hoc lock
accounts[from] -= amount;
accounts[to] += amount;
}
System.out.println(...);
}
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
使用一个对象的锁来实现额外的原子操作,这种做法称为客户端锁定(client-side locking)。例如,Vector 的方法是同步的,但在一个地方调用 get 方法之后,仍然可能会有另外一个线程紧随其后在该位置写入另外一个值。幸运的是,我们可以通过 synchronized 截获这个锁:
Java
public void transfer(Vector<Double> accounts, int from, int to, int amount) {
synchronized (accounts) {
accounts.set(from, accounts.get(from) - amount);
accounts.set(to, accounts.get(to) + amount);
}
System.out.println(...);
}1
2
3
4
5
6
7
2
3
4
5
6
7
这要求 Vector 类对自己的所有更改方法都使用该内部锁。不过,Vector 类的文档没有给出这样的承诺。所以,客户端锁定是非常脆弱的,通常不推荐使用。
3.4. volatile 字段
如果一个字段可以同时被多个线程读写,那么该字段的值被修改后可能不会立即对其它线程可见,因为:
- 有多处理器的计算机能够暂时在寄存器或本地内存缓存中保存内存值。其结果是,运行在不同处理器上的线程可能看到同一个内存位置有不同的值;
- 编译器可以改变指令执行的顺序以使吞吐量最大化。编译器不会选择可能改变代码语义的顺序,但是编译器有一个假定,认为内存值只在代码中有显式的修改指令时才会改变。然而,内存值有可能被另一个线程改变;
通过锁机制可以避免这个问题:
Java
private boolean done;
public synchronized boolean isDone() { return done; }
public synchronized void setDone() { done = true; }1
2
3
2
3
因为,编译器被要求在必要的时候刷新本地缓存来支持锁,而且不能不相应地重新排列指令顺序。详细的解释见 JSR 133 的 Java 内存模型和线程规范(参看 https://www.jcp.org/en/jsr/detail?id=133)。该规范的大部分内容都很复杂而且技术性很强,但是文档中也包含了很多解释得很清晰的例子。https://www.ibm.com/developerworks/library/jjtp02244 提供了 Brian Goetz 写的一个更易懂的概述文章。
不过,在这种情况下,将字段声明为 volatile 更合适:
Java
private volatile boolean done;
public boolean isDone() { return done; }
public void setDone() { done = true; }1
2
3
2
3
编译器会插入适当的代码,以确保如果一个线程对 done 变量做了修改,这个修改对读取这个变量的所有其他线程都可见。
3.5. final 变量
通过上面我们可以了解到,除非使用锁或 volatile 修饰符,否则无法从多个线程安全地读取一个字段。不过,还有一种情况可以安全地访问一个共享字段,即这个字段声明为 final 时:
Java
final var accounts = new HashMap<String, Double>();其他线程会在构造器完成构造之后才看到这个 accounts 变量。如果不使用 final,就不能保证其他线程看到的是更新后的值,它们可能都只是看到 null,而不是新构造的 HashMap。
3.6. 原子性
java.util.concurrent.atomic 包中有很多类使用了很高效的机器级指令(而没有使用锁)来保证其他操作的原子性。例如,AtomicInteger 类提供了方法 incrementAndGet 和 decrementAndGet,它们分别以原子方式将一个整数进行自增或自减。例如,可以安全地生成一个数值序列,如下所示:
Java
public static AtomicLong nextNumber = new AtomicLong();
// in some thread ...
long id = nextNumber.incrementAndGet();1
2
3
2
3
如果需要完成一些更复杂的更新,就可能需要使用到 compareAndSet、updateAndGet 或 accumulateAndGet 等方法:
Java
largest.updateAndGet(x -> Math.max(x, observed));
// or
largest.accumulateAndGet(observed, Math::max);1
2
3
2
3
Note:
compareAndSet是原生机器级指令就支持的,而updateAndGet、accumulateAndGet内部做了一些简单的封装,采用了乐观更新的方式进行实现的,参数中的函数式接口有可能会被调用多次。
如果有大量线程要调用上述类似的方法,性能会大幅下降,因为乐观更新需要太多次重试。LongAdder 和 LongAccumulator 类解决了这个问题。LongAdder 包括多个变量(加数),其总和为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。通常情况下,只有当所有工作都完成之后才需要总和的值,对于这种情况,这种方法会很高效,性能会有显著的提升。如果预期可能存在大量竞争,只需要使用 LongAdder 而不是 AtomicLong。方法名稍有区别。要调用 increment 让计数器自增,或者调用 add 来增加一个量,另外调用 sum 来获取总和:
Java
var adder = new LongAdder();
for (...)
pool.submit(() -> {
while (...) {
...
if (...) adder.increment();
}
});
...
long total = adder.sum();1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
3.7. 线程局部变量
可以使用 ThreadLocal 辅助类为各个线程提供各自的实例。例如,SimpleDateFormat 类不是线程安全的:
Java
public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
...
String dateStamp = dateFormat.format(new Date()); // in multi-threaded environment1
2
3
2
3
其输出结果可能很混乱,因为 dateFormat 使用的内部数据结构可能会被并发的访问所破坏。可以使用同步,但开销很大。或者可以在每次需要的时候都构造一个局部的 SimpleDateFormat 对象,不过太浪费了。如果我们为每个线程构造一个实例就再合适不过了:
Java
public static final ThreadLocal<SimpleDateFormat> dateFormat
= ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));1
2
2
调用:
Java
String dateStamp = dateFormat.get().format(new Date());在一个给定线程中首次调用时,会调用构造器中的 lambda 表达式。在此之后,get 方法会返回属于当前线程的那个实例。
Note
虽然
java.util.Random也是线程安全的,但有时出于效率方面的考虑,也可以使用ThreadLocal辅助类为各个线程提供一个单独的实例。不过在 Java 7 中,提供了一个更为便利的类:Javaint random = ThreadLocalRandom.current().nextInt(upperBound);
4. 任务和线程池
构造一个新的线程开销有些大,因为这涉及与操作系统的交互。如果你的程序中创建了大量的生命期很短的线程,那么不应该把每个任务映射到一个单独的线程,而应该使用线程池(thread pool)。线程池中包含许多准备运行的线程。为线程池提供一个 Runnable,就会有一个线程调用 run 方法。当 run 方法退出时,这个线程不会死亡,而是留在池中准备为下一个请求提供服务。
4.1. Callable 与 Future
Runnable 封装一个异步运行的任务,可以把它想象成一个没有参数和返回值的异步方法。Callable 与 Runnable 类似,但是有返回值。Callable 接口是一个参数化的类型,只有一个方法 call:
Java
public interface Callable<V> {
V call() throws Exception;
}1
2
3
2
3
Future 保存异步计算的结果。可以启动一个计算,将 Future 对象交给某个线程,然后忘掉它。这个 Future 对象的所有者在结果计算好之后就可以获得结果。Future<V> 接口有下面的方法:
Java
V get()
V get(long timeout, TimeUnit unit)
void cancel(boolean mayInterrupt)
boolean isCancelled()
boolean isDone()1
2
3
4
5
2
3
4
5
第一个 get 方法的调用会阻塞,直到计算完成。第二个 get 方法也会阻塞,不过如果在计算完成之前调用超时,会抛出一个 TimeoutException 异常。如果运行该计算的线程被中断,这两个方法都将抛出 InterruptedException。如果计算已经完成,那么 get 方法立即返回。如果计算还在进行,isDone 方法返回 false; 如果已经完成,则返回 true。可以用 cancel 方法取消计算。如果计算还没有开始,它会被取消而且不再开始。如果计算正在进行,那么如果参数为 true,它就会被中断。
Tip:取消一个任务涉及两个步骤。必须找到并中断底层线程。另外任务实现(在
call方法中)必须感知到中断,并放弃它的工作。如果一个Future对象不知道任务在哪个线程中执行,或者如果任务没有监视执行该任务的线程的中断状态,那么取消任务没有任何效果。
执行 Callable 的一种方法是使用 FutureTask,它实现了 Future 和 Runnable 接口,所以可以构造一个线程来运行这个任务:
Java
Callable<Integer> task = ...;
var futureTask = new FutureTask<Integer>(task);
var t = new Thread(futureTask); // it's a Runnable
t.start();
// ...
Integer result = task.get(); // it's a Future1
2
3
4
5
6
2
3
4
5
6
不过,更常见的情况是,可以将一个 Callable 传递到一个执行器。
4.2. 执行器
执行器(Executors)类有许多静态工厂方法,用来构造线程池,表 4.1 中对这些方法进行了汇总:
| 方法 | 描述 |
|---|---|
newCachedThreadPool | 必要时创建新线程;空闲线程会保留 60 秒 |
newFixedThreadPool | 池中包含固定数目的线程;空闲线程会一直保留 |
newWorkStealingPool | 一种适合 “fork-join” 任务的线程池,其中复杂的任务会分解为更简单的任务,空闲线程会 “密取” 较简单的任务 |
newSingleThreadExecutor | 只有一个线程的 “池”,会顺序地执行所提交的任务 |
newScheduledThreadPool | 用于调度执行的固定线程池 |
newSingleThreadScheduledExecutor | 用于调度执行的单线程 “池” |
newCachedThreadPool 方法构造一个线程池,会立即执行各个任务,如果有空闲线程可用,就使用现有空闲线程执行任务;如果没有可用的空闲线程,则创建一个新线程。
newFixedThreadPool 方法构造一个具有固定大小的线程池。如果提交的任务数多于空闲线程数,就把未得到服务的任务放到队列中。当其他任务完成以后再运行这些排队的任务。
newSingleThreadExecutor 是一个退化了的大小为 1 的线程池,由一个线程顺序地执行所提交的任务(一个接着一个执行)。
这 3 个方法返回实现了 ExecutorService 接口的 ThreadPoolExecutor 类的对象。
如果线程生存期很短,或者大量时间都在阻塞,那么可以使用一个缓存线程池(newCachedThreadPool)。不过,如果线程工作量很大而且并不阻塞,你肯定不希望运行太多线程。为了得到最优的运行速度,并发线程数等于处理器内核数。在这种情况下,就应当使用固定线程池(newFixedThreadPool),即并发线程总数有一个上限。
单线程执行器(newSingleThreadExecutor)对于性能分析很有帮助。如果临时用一个单线程池替换缓存或固定线程池,就能测量不使用并发的情况下应用的运行速度会慢多少。
可用下面的方法之一将 Runnable 或 Callable 对象提交给 ExecutorService:
Java
Future<T> submit(Callable<T> task)
Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result)1
2
3
2
3
使用完一个线程池时,调用 shutdown 这个方法启动线程池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成时,线程池中的线程死亡。另一种方法是调用 shutdownNow 线程池会取消所有尚未开始的任务。
ScheduledExecutorService 接口为调度执行或重复执行任务提供了一些方法。这是对支持建立线程池的 java.util.Timer 的泛化。Executors 类的 newScheduledThreadPool 和 newSingleThreadScheduledExecutor 方法返回实现 ScheduledExecutorService 接口的对象。可以调度 Runnable 或 Callable 在一个初始延迟之后运行一次。也可以调度 Runnable 定期运行。有关详细内容参见 API 注释。
4.3. 控制任务组
invokeAny 方法提交一个 Callable 对象集合中的所有对象,并返回某个已完成任务的结果我们不知道返回的究竟是哪个任务的结果,这往往是最快完成的那个任务。对于搜索问题,如果我们愿意接受任何一种答案,就可以使用这个方法。例如,假定需要对一个大整数进行因数分解,这是 RSA 解码时需要完成的一种计算。可以提交很多任务,每个任务尝试对不同范围内的数进行分解。只要其中一个任务得到了答案,计算就可以停止了。
invokeAll 方法提交一个 Callable 对象集合中的所有对象,这个方法会阻塞,直到所有任务都完成,并返回表示所有任务答案的一个 Future 对象列表。得到计算结果后,还可以像下面这样对结果进行处理:
Java
List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results)
processFurther(result.get());1
2
3
4
2
3
4
不过,如果想要按计算出结果的顺序得到这些结果,可以利用 ExecutorCompletionService 来管理。首先以通常的方式得到一个执行器。然后构造一个 ExecutorCompletionService。将任务提交到这个完成服务。该服务会管理 Future 对象的一个阻塞队列,其中包含所提交任务的结果(一旦结果可用,就会放入队列)。因此,要完成之前的计算,以下组织更为高效:
Java
var service = new ExecutorCompletionService<T>(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i++)
processFurther(service.take().get());1
2
3
4
2
3
4
程序清单 4.1 中的程序展示了如何使用 Callable 和执行器。在第一个计算中,我们统计了一个目录树中包含一个给定单词的文件的个数。为每个文件建立了一个单独的任务:
Java
Set<Path> files = descendants(Path.of(start));
var tasks = new ArrayList<Callable<Long>>();
for (Path file : files) {
Callable<Long> task = () -> occurrences(word, file);
tasks.add(task);
}1
2
3
4
5
6
2
3
4
5
6
然后把这些任务传递到一个执行器服务:
Java
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Long>> results = executor.invokeAll(tasks);1
2
2
为了得到组合后的统计结果,要将所有结果相加,这个工作会阻塞,直到所有结果都可用:
Java
long total = 0;
for (Future<Long> result : results)
total += result.get();1
2
3
2
3
这个程序还会显示搜索过程所花费的时间。
在程序的第二部分,要搜索包含指定单词的第一个文件。我们使用 invokeAny 来并行化这个搜索。在这里,更要注意任务的建立。一旦有任务返回,invokeAny 方法就会终止。所以不能让搜索任务返回一个 boolean 来指示成功或失败。我们不希望一个任务失败时就停止搜索。实际上,失败的任务要抛出一个 NoSuchElementException 异常。另外,当一个任务成功时,其他任务就要取消。因此,我们要监视中断状态。如果底层线程被中断,搜索任务在终止之前要打印一个消息,使我们能看到其他任务确实已经取消。
Java
public static Callable<Path> searchForTask(String word, Path path) {
return () -> {
try (var in = new Scanner(path)) {
while (in.hasNext()) {
if (in.next().equals(word)) return path;
if (Thread.currentThread().isInterrupted()) {
System.out.println("Search in " + path + " canceled.");
return null;
}
}
throw new NoSuchElementException();
}
};
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
为了提供更多信息,这个程序会打印执行期间线程池的最大大小。这个信息无法由 ExecutorService 接口提供。出于这个原因,我们必须把线程池对象强制转换为 ThreadPoolExecutor 类。
Java
package executors;
import java.io.*;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
/**
* This program demonstrates the Callable interface and executors.
* @version 1.0 2018-01-04
* @author Cay Horstmann
*/
public class ExecutorDemo {
/**
* Counts occurrences of a given word in a file.
* @return the number of times the word occurs in the given word
*/
public static long occurrences(String word, Path path) {
try (var in = new Scanner(path)) {
int count = 0;
while (in.hasNext())
if (in.next().equals(word)) count++;
return count;
} catch (IOException ex) {
return 0;
}
}
/**
* Returns all descendants of a given directory
* @param rootDir the root directory
* @return a set of all descendants of the root directory
*/
public static Set<Path> descendants(Path rootDir) throws IOException {
try (Stream<Path> entries = Files.walk(rootDir)) {
return entries.filter(Files::isRegularFile)
.collect(Collectors.toSet());
}
}
/**
* Yields a task that searches for a word in a file.
* @param word the word to search
* @param path the file in which to search
* @return the search task that yields the path upon success
*/
public static Callable<Path> searchForTask(String word, Path path) {
return () -> {
try (var in = new Scanner(path)) {
while (in.hasNext()) {
if (in.next().equals(word)) return path;
if (Thread.currentThread().isInterrupted()) {
System.out.println("Search in " + path + " canceled.");
return null;
}
}
throw new NoSuchElementException();
}
};
}
public static void main(String[] args)
throws InterruptedException, ExecutionException, IOException {
try (var in = new Scanner(System.in)) {
System.out.print("Enter base directory (e.g. /opt/jdk-9-src): ");
String start = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String word = in.nextLine();
Set<Path> files = descendants(Path.of(start));
var tasks = new ArrayList<Callable<Long>>();
for (Path file : files) {
Callable<Long> task = () -> occurrences(word, file);
tasks.add(task);
}
ExecutorService executor = Executors.newCachedThreadPool();
// use a single thread executor instead to see if multiple threads
// speed up the search
// ExecutorService executor = Executors.newSingleThreadExecutor();
Instant startTime = Instant.now();
List<Future<Long>> results = executor.invokeAll(tasks);
long total = 0;
for (Future<Long> result : results)
total += result.get();
Instant endTime = Instant.now();
System.out.println("Occurrences of " + word + ": " + total);
System.out.println("Time elapsed: "
+ Duration.between(startTime, endTime).toMillis() + " ms");
var searchTasks = new ArrayList<Callable<Path>>();
for (Path file : files)
searchTasks.add(searchForTask(word, file));
Path found = executor.invokeAny(searchTasks);
System.out.println(word + " occurs in: " + found);
if (executor instanceof ThreadPoolExecutor) // the single thread executor isn't
System.out.println("Largest pool size: "
+ ((ThreadPoolExecutor) executor).getLargestPoolSize());
executor.shutdown();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
4.4. fork-join 框架
fork-join 框架是 Java 7 中引入的一个并发编程工具,用于处理递归式的分治任务。它旨在充分利用多核处理器的计算能力,提高并行计算性能。fork-join 框架主要基于 “分而治之”(Divide and Conquer)的思想,将大任务拆分成小任务,并将子任务的计算结果进行合并,从而实现高效的并行计算。
fork-join 框架的核心组件是 ForkJoinPool,它是一种特殊的线程池,用于管理并执行 fork-join 任务。在 fork-join 框架中,任务通常继承自 RecursiveTask<T>(用于有返回结果的任务)或 RecursiveAction(用于无返回结果的任务)。这两个类都是 ForkJoinTask<T> 的子类。
以下是 fork-join 框架的基本概念和工作原理:
- 分解任务:fork-join 框架将大任务递归地分解成更小的子任务,直到子任务足够小而可以直接计算得出结果;
- 执行任务:当一个线程执行一个 fork-join 任务时,它会检查任务是否足够小以直接执行,如果是,则进行计算。如果任务过大,线程会将任务划分成更小的子任务,然后将子任务交给其他线程执行。这个过程是递归的,直到所有的子任务都被分配到不同的线程执行;
- 合并结果:当所有的子任务执行完成后,线程会等待所有子任务返回结果,并将这些结果进行合并,得到最终的计算结果;
例如,想统计一个数组中有多少个元素满足某个特定的属性。可以将这个数组一分为二,分别对这两部分进行统计,再将结果相加:
Java
package forkJoin;
import java.util.concurrent.*;
import java.util.function.*;
/**
* This program demonstrates the fork-join framework.
* @version 1.01 2015-06-21
* @author Cay Horstmann
*/
public class ForkJoinTest {
public static void main(String[] args) {
final int SIZE = 10000000;
var numbers = new double[SIZE];
for (int i = 0; i < SIZE; i++) numbers[i] = Math.random();
var counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
var pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}
class Counter extends RecursiveTask<Integer> {
public static final int THRESHOLD = 1000;
private double[] values;
private int from;
private int to;
private DoublePredicate filter;
public Counter(double[] values, int from, int to, DoublePredicate filter) {
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
protected Integer compute() {
if (to - from < THRESHOLD) {
int count = 0;
for (int i = from; i < to; i++) {
if (filter.test(values[i])) count++;
}
return count;
} else {
int mid = (from + to) / 2;
var first = new Counter(values, from, mid, filter);
var second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
在这里,如果任务大小小于阈值(THRESHOLD),则直接计算和返回结果。否则,将任务拆分成两个子任务并调用 invokeAll 方法,阻塞直到所有这些任务全部完成。最后再对每个子任务应用 join 返回其总和。
在后台,fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列(deque)来完成任务。一个工作线程将子任务压入其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个双端队列的队尾 “密取” 一个任务。由于大的子任务都在队尾,这种密取很少出现。
Tip:fork-join 池是针对非阻塞工作负载优化的。如果向一个 fork-join 池增加很多阻塞任务,会让它无法有效工作。可以让任务实现
ForkJoinPool.ManagedBlocker接口来解决这个问题,不过这是一种高级技术,在这里不作讨论。
5. 异步计算
5.1. 可完成 Future
当有一个 Future 对象时,需要调用 get 来获得值,这个方法会阻塞,直到值可用。CompletableFuture 类实现了 Future 接口,它提供了获得结果的另一种机制。你要注册一个回调,一旦结果可用,就会(在某个线程中)利用该结果调用这个回调。
Java
CompletableFuture<String> f = ...;
f.thenAccept(s -> Process the result string s);1
2
2
如果能有方法生成一个现成的 CompletableFuture 就好了,不过,大多数情况下,你都需要建立自己的 CompletableFuture。要想异步运行任务并得到 CompletableFuture,不要把它直接提交给执行器服务,而应当调用静态方法 CompletableFuture.supplyAsync。例如:
Java
public CompletableFuture<String> readPage(URL url) {
// Actually, using the HttpClient class can make it more
// convenient to obtain CompletableFuture objects.
return CompletableFuture.supplyAsync(() -> {
try {
return new String(url.openStream().readAllBytes(), "UTF-8");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, executor);
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
如果省略执行器,任务会在一个默认执行器(具体就是 ForkJoinPool.commonPool() 返回的执行器)上运行。通常你可能并不希望这么做。
Warning:注意
supplyAsync方法的第一个参数是一个Supplier<T>,而不是Callable<T>。这两个接口都描述了无参数而且返回值类型为T的函数,不过Supplier函数不能抛出检查型异常。
CompletableFuture 可以采用两种方式完成:得到一个结果,或者有一个未捕获的异常。要处理这两种情况,可以使用 whenComplete 方法:
Java
f.whenComplete((s, t) -> {
if (t == null) { Process the result s; }
else { Process the Throwable t; }
});1
2
3
4
2
3
4
CompletableFuture 之所以被称为是可完成的,是因为你可以手动地设置一个完成值。在其他并发库中,这样的对象称为承诺(promise)。当然,用 supplyAsync 创建一个 CompletableFuture 时,任务完成时就会隐式地设置完成值。不过,显式地设置结果可以提供更大的灵活性。例如,两个任务可以同时计算一个答案:
Java
var f = new CompletableFuture<Integer>();
executor.execute(() -> {
int n = workHard(arg);
f.complete(n);
});
executor.execute(() -> {
int n = workSmart(arg);
f.complete(n);
});1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
要对一个异常完成 future,需要调用:
Java
Throwable t = ...;
f.completeExceptionally(t);1
2
2
Tip:可以在多个线程中在同一个 future 上安全地调用
complete或completeExceptionally。如果这个 future 已经完成,这些调用没有任何作用。
isDone 方法指出一个 Future 对象是否已经完成(正常完成或者产生一个异常)。在前面的例子中,如果结果已经由另一个方法得出,workHard 和 workSmart 方法可以使用这个信息停止工作。
Warning:与普通的
Future不同,调用cancel方法时,CompletableFuture的计算不会中断。取消只会把这个Future对象设置为以异常方式完成(有一个CancellationException异常)。一般来讲,这是有道理的,因为CompletableFuture可能没有一个线程负责它的完成。不过,这个限制也适用于supplyAsync等方法返回的CompletableFuture实例,而这些对象原则上讲是可以中断的。
5.2. 组合 Future
| 方法 | 参数 | 返回类型变量 |
|---|---|---|
thenApply | Function<? super T, ? extends U> fn | <U> |
thenAccept | Consumer<? super T> action | <Void> |
thenCompose | Function<? super T, ? extends CompletionStage<U>> fn | <U> |
handle | BiFunction<? super T, Throwable, ? extends U> fn | <U> |
whenComplete | BiConsumer<? super T, ? super Throwable> action | <T> |
exceptionally | Function<Throwable, ? extends T> fn | <T> |
completeOnTimeout | T value, long timeout, TimeUnit unit | <T> |
orTimeout | long timeout, TimeUnit unit | <T> |
thenRun | Runnable action | <Void> |
其中 thenApply、thenAccept、thenCompose、handle、whenComplete、thenRun 方法还有相应的两个 Async 的形式,其中一种形式使用一个共享 ForkJoinPool,另一种形式有一个 Executor 参数。普通形式回调方法的执行线程是不确定的,有可能是在调用 thenApply 等方法的线程上执行回调,也有可能是在执行 CompletableFuture.complete 方法的线程上执行的。
当出现一个异常时,可以通过 exceptionally 方法计算一个假值。
completeOnTimeout 方法指定一个超时时间和默认值,如果在超时时间之前未完成,则使用该值完成此 CompletableFuture。或者,也可以使用 orTimeout 方法,在超时时使用 TimeoutException 异常来完成此 CompletableFuture。需要注意的是,超时后原 CompletableFuture 的计算不会中断(详见这里)。
还可以组合多个 future 的方法:
| 方法 | 参数 | 返回类型变量 |
|---|---|---|
thenCombine | CompletionStage<? extends U> otherBiFunction<? super T, ? super U, ? extends V> fn | <V> |
thenAcceptBoth | CompletionStage<? extends U> otherBiConsumer<? super T, ? super U> action | <Void> |
runAfterBoth | CompletionStage<?> otherRunnable action | <Void> |
applyToEither | CompletionStage<? extends T> otherFunction<? super T, U> fn | <U> |
acceptEither | CompletionStage<? extends T> otherConsumer<? super T> action | <Void> |
runAfterEither | CompletionStage<?> otherRunnable action | <Void> |
static allOf | CompletableFuture<?>... cfs | <Void> |
static anyOf | CompletableFuture<?>... cfs | <Object> |
Note:
CompletionStage接口描述了如何组合异步计算,而Future接口强调的是计算的结果。CompletableFuture既是CompletionStage,也是Future。
6. 进程
有时你还需要执行另一个程序。为此,可以使用 ProcessBuilder 和 Process 类。Process 类在一个单独的操作系统进程中执行一个命令,允许我们与标准输入、输出和错误流交互。ProcessBuilder 类则允许我们配置 Process 对象。
Note:
ProcessBuilder类可以取代Runtime.exec调用,而且更为灵活。
6.1. 建立一个进程
首先指定你想要执行的命令。可以提供一个 List<String>,或者直接提供命令字符串。
Java
var builder = new ProcessBuilder("gcc", "myapp.c");Note:第一个字符串必须是一个可执行的命令,而不是一个 shell 内置命令。例如,要在 Windows 中运行
dir命令,就需要提供字符串"cmd.exe"、"/C"和"dir"来建立进程。
每个进程都有一个工作目录,用来解析相对目录名。默认情况下,进程的工作目录与虚拟机相同,通常是启动 java 程序的那个目录。可以用 directory 方法改变工作目录:
Java
builder = builder.directory(path.toFile());Note:配置
ProcessBuilder的各个方法都返回其自身,所以可以把命令串起来。最终会调用:Process p = new ProcessBuilder(command).directory(file). ... .start();
接下来,要指定如何处理进程的标准输入、输出和错误流。默认情况下,它们分别是一个管道,可以用以下方法访问:
Java
OutputStream processIn = p.getOutputStream();
InputStream processOut = p.getInputStream();
InputStream processErr = p.getErrorStream();1
2
3
2
3
注意,进程的输入流是 JVM 的一个输出流!我们会写入这个流,而我们写的内容会成为进程的输入。与之相反,我们会读取进程写入输出和错误流的内容。对我们来说,它们都是输入流。
可以指定新进程的输入、输出和错误流与 JVM 相同。如果用户在一个控制台运行 JVM,所有用户输入都会转发到进程,而进程的输出将显示在控制台上。可以调用:
Java
builder.redirectIO()如果你只想继承某些流,可以把值 ProcessBuilder.Redirect.INHERIT 传入 redirectInput、redirectOutput 或 redirectError 方法,例如:
Java
builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);通过提供 File 对象,可以将进程流重定向到文件:
Java
builder.redirectInput(inputFile)
.redirectOutput(outputFile)
.redirectError(errorFile)1
2
3
2
3
进程启动时,会创建或删除输出和错误文件。要追加到现有的文件,可以使用:
Java
builder.redirectOutput(ProcessBuilder.Redirect.appendTo(outputFile));合并输出和错误流通常很有用,这样就能按进程生成这些消息的顺序显示输出和错误消息。可以调用 builder.redirectErrorStream(true) 启用合并。如果这样做,就不能再在 ProcessBuilder 上调用 redirectError,也不能在 Process 上调用 getErrorStream。
你可能还想修改进程的环境变量。你需要得到构建器的环境(由运行 JVM 的那个进程的环境变量初始化),然后加入或删除环境变量条目:
Java
Map<String, String> env = builder.environment();
env.put("LANG", "fr_FR");
env.remove("JAVA_HOME");
Process p = builder.start();1
2
3
4
2
3
4
如果希望利用管道将一个进程的输出作为另一个进程的输入(类似于 shell 中的 | 操作符),Java 9 提供了一个 startPipeline 方法。可以传入一个进程构建器列表,并从最后一个进程读取结果。下面给出一个例子,这里会枚举一个目录树中的各个扩展:
Java
List<Process> processes = ProcessBuilder.startPipeline(List.of(
new ProcessBuilder("find", "/opt/jdk-9"),
new ProcessBuilder("grep", "-o", "\\.[^./]*$"),
new ProcessBuilder("sort"),
new ProcessBuilder("uniq")
));
Process last = processes.get(processes.size() - 1);
var result = new String(last.getInputStream().readAllBytes());1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
6.2. 运行一个进程
配置了构建器之后,要调用它的 start 方法启动进程。如果把输入、输出和错误流配置为管道,现在可以写输入流,并读取输出和错误流。例如:
Java
Process process = new ProcessBuilder("/bin/ls", "-l")
.directory(Path.of("/tmp").toFile())
.start();
try (var in = new Scanner(process.getInputStream())) {
while (in.hasNextLine())
System.out.println(in.nextLine());
}1
2
3
4
5
6
7
2
3
4
5
6
7
Note:进程流的缓冲空间是有限的。不能写入太多输入,而且要及时读取输出。如果有大量输入和输出,可能需要在单独的线程中生产和消费这些输入输出。
要等待进程完成,可以调用:
Java
int result = process.waitFor();或者,如果不想无限期地等待,可以这样做:
Java
long delay = ...;
if (process.waitfor(delay, TimeUnit.SECONDS)) {
int result = process.exitValue();
// ...
} else {
process.destroyForcibly();
}1
2
3
4
5
6
7
2
3
4
5
6
7
你可能并不会等待进程结束,而只是让它继续运行,不时调用 isAlive 来查看进程是否仍存活。要杀死这个进程,可以调用 destroy 或 destroyForcibly。这两个调用之间的区别取决于平台。在 UNIX 上,前者会以 SIGTERM 终止进程,后者会以 SIGKILL 终止进程。(如果 destroy 方法可以正常终止进程,supportsNormalTermination 方法将返回 true。)
最后会在进程完成时接收到一个异步通知。调用 process.onExit() 会得到一个 CompletableFuture<Process>,可以用来调度任何动作。
Java
process.onExit().thenAccept(
p -> System.out.println("Exit value: " + p.exitValue()));1
2
2
6.3. 进程句柄
要获得程序启动的一个进程的更多信息,或者想更多地了解你的计算机上正在运行的任何其他进程,可以使用 ProcessHandle 接口。可以用 4 种方式得到一个 ProcessHandle:
- 给定一个
Process对象p,p.toHandle()会生成它的ProcessHandle; - 给定一个
long类型的操作系统进程 ID,ProcessHandle.of(id)可以生成这个进程的句柄; Process.current()是运行这个 Java 虚拟机的进程的句柄;ProcessHandle.allProcesses()可以生成对当前进程可见的所有操作系统进程的Stream<ProcessHandle>;
给定一个进程句柄,可以得到它的进程 ID、父进程、子进程和后代进程:
Java
long pid = handle.pid();
Optional<ProcessHandle> parent = handle.parent();
Stream<ProcessHandle> children = handle.children();
Stream<ProcessHandle> descendants = handle.descendants();1
2
3
4
2
3
4
Note:
allProcesses、children和descendants方法返回的Stream<ProcessHandle>实例只是当时的快照。流中的任何进程在你看到它们的时候可能已经终止了,而且可能已经启动了其他进程,而那些新启动的进程不在流中。
info 方法可以生成一个 ProcessHandle.Info 对象,它提供了一些方法来获得进程的有关信息。
Java
Optional<String[]> arguments()
Optional<String> command()
Optional<String> commandLine()
Optional<String> startInstant()
Optional<String> totalCpuDuration()
Optional<String> user()1
2
3
4
5
6
2
3
4
5
6
所有这些方法都返回 Optional 值,因为可能某个特定的操作系统不能报告这个信息。
要监视或强制进程终止,与 Process 类一样,ProcessHandle 接口也有 isAlive、supportsNormalTermination、destroy、destroyForcibly 和 onExit 方法。不过,没有对应 waitFor 的方法。