CompletableFuture
Future 介面
Future 介面(JDK 1.5)定義了操作非同步任務執行的方法:
- 獲取非同步任務執行結果
- 取消非同步任務執行
- 判斷非同步任務是否被取消
- 判斷非同步任務是否執行完畢
FutureTask
FutureTask 結合了 Callable 和 Runnable 的特性。
classDiagram
class FutureTask{
Callable
}
class Callable
<<interface>> Callable
class Runnable
<<interface>> Runnable
Callable --o FutureTask
Runnable <|.. FutureTask
FutureTask 的缺點
| 方法 | 問題 |
|---|---|
get() | 容易造成阻塞,建議放在程序後面 |
isDone() | Polling 方式,容易耗費 CPU |
FutureTaskBlock.java
public static void main(String[] args) throws Exception {
FutureTask<String> ft1 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread Name: " + Thread.currentThread().getName());
return "Task1 Over";
});
Thread t1 = new Thread(ft1, "t1");
t1.start();
System.out.println("Future Task Done = " + ft1.get()); // 阻塞
System.out.println(Thread.currentThread().getName() + ": Compute other tasks");
}
// Output:
// Thread Name: t1
// Future Task Done = Task1 Over
// main: Compute other tasksFutureTaskTimeout.java
public static void main(String[] args) throws Exception {
FutureTask<String> ft1 = new FutureTask<>(() -> {
System.out.println("Thread Name: " + Thread.currentThread().getName() + " Start");
TimeUnit.SECONDS.sleep(7);
return "Task1 Over";
});
Thread t1 = new Thread(ft1, "t1");
t1.start();
System.out.println(Thread.currentThread().getName() + ": Compute other tasks");
// 超過 3 秒會拋出 TimeoutException
System.out.println("Future Task Done = " + ft1.get(3, TimeUnit.SECONDS));
}
// Output:
// main: Compute other tasks
// Thread Name: t1 Start
// Exception: java.util.concurrent.TimeoutExceptionCompletableFuture
CompletableFuture(JDK 8)是對 FutureTask 的改良:
- Callback:支援
whenComplete等回調方法 - 非同步任務組合:多個非同步任務可以組合一起處理
- 選擇最快完成的任務:
applyToEither等方法 - Observer Pattern:可以讓 Task 執行完成後通知監聽方
CompletionStage
CompletionStage 代表非同步計算過程的某個階段,一個階段完成後可能會觸發另一個階段。類似 Linux Pipe (|) 符號。
CompletionStageChain.java
stage.thenApply(x -> square(x))
.thenAccept(x -> System.out.println(x))
.thenRun(() -> System.out.println("Done"))創建非同步任務
| 方法 | 返回值 | 說明 |
|---|---|---|
runAsync(Runnable) | CompletableFuture<Void> | 無返回值 |
supplyAsync(Supplier) | CompletableFuture<T> | 有返回值 |
預設使用
ForkJoinPool.commonPool,也可以指定自訂的 Executor Thread Pool。取得結果
| 方法 | 說明 |
|---|---|
get() | 阻塞等待結果 |
join() | 阻塞等待結果(不拋出 checked exception) |
getNow(T valueIfAbsent) | 立即返回結果或預設值 |
complete(T value) | 是否中斷 get() 立即返回設定預設值 |
結果處理方法
thenApply
Task A 執行完畢,執行 Task B,並且 Task B 需要 A 的結果,同時 B 有返回值。
ThenApplyExample.java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World");thenAccept
Task A 執行完畢,執行 Task B,並且 Task B 需要 A 的結果,但 B 無返回值。
ThenAcceptExample.java
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(System.out::println);thenRun
Task A 執行完畢,執行 Task B,並且 Task B 不需要 A 的結果。
ThenRunExample.java
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Done"));異常處理
| 方法 | 說明 |
|---|---|
exceptionally | 類似 try/catch |
handle + whenComplete | 類似 try/finally |
whenComplete
取得前一個 Thread 結果並消費,不影響上一個 Thread 的返回值。
exceptionally
前面 Thread 異常時執行,捕獲異常範圍包括前面所有非同步執行緒。
handle
相當於 whenComplete() + exceptionally(),根據是否產生異常進行 if-else 分支處理。
多任務組合
等待兩個任務完成
| 方法 | 返回值 | 說明 |
|---|---|---|
thenCombine() | 有 | 兩個 Threads 都有返回值,等待都結束後,結果合併轉換 |
thenAcceptBoth() | 無 | 兩個 Threads 都有返回值,等待都結束後,結果合併消費 |
runAfterBoth() | 無 | 兩個 Threads 都結束後,執行其他邏輯 |
任一任務完成
| 方法 | 返回值 | 說明 |
|---|---|---|
applyToEither() | 有 | 等待任一先結束,轉換其結果 |
acceptEither() | 無 | 等待任一先結束,消費其結果 |
runAfterEither() | 無 | 等待任一先結束,執行其他邏輯 |
多任務等待
MultiTaskWait.java
// 任一執行完畢即返回
Object result = CompletableFuture.anyOf(cf1, cf2, cf3).join();
// 全部執行完畢返回
CompletableFuture.allOf(cf1, cf2, cf3).join();執行緒池注意事項
- 預設 Thread Pool 會自動關閉
- 自訂 Thread Pool 記得手動關閉
thenRun vs thenRunAsync
thenRun:使用前一個任務的執行緒thenRunAsync:可以指定新的執行緒池