CompletableFuture
Future インターフェース
Future インターフェース(JDK 1.5)は非同期タスク実行を操作するメソッドを定義する:
- 非同期タスク実行結果の取得
- 非同期タスク実行の取消し
- 非同期タスクが取消されたかの判定
- 非同期タスクが完了したかの判定
FutureTask
FutureTask は Callable と Runnable の特性を組み合わせている。
classDiagram
class FutureTask{
Callable
}
class Callable
<<interface>> Callable
class Runnable
<<interface>> Runnable
Callable --o FutureTask
Runnable <|.. FutureTask
FutureTask の欠点
| メソッド | 問題 |
|---|---|
get() | ブロッキングを引き起こしやすい、プログラムの後半に配置することを推奨 |
isDone() | ポーリング方式、CPU を消費しやすい |
FutureTaskBlock.java
public static void main(String[] args) throws Exception {
FutureTask<String> ft1 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread Name: " + Thread.currentThread().getName());
return "Task1 Over";
});
Thread t1 = new Thread(ft1, "t1");
t1.start();
System.out.println("Future Task Done = " + ft1.get()); // ブロッキング
System.out.println(Thread.currentThread().getName() + ": Compute other tasks");
}
// Output:
// Thread Name: t1
// Future Task Done = Task1 Over
// main: Compute other tasksFutureTaskTimeout.java
public static void main(String[] args) throws Exception {
FutureTask<String> ft1 = new FutureTask<>(() -> {
System.out.println("Thread Name: " + Thread.currentThread().getName() + " Start");
TimeUnit.SECONDS.sleep(7);
return "Task1 Over";
});
Thread t1 = new Thread(ft1, "t1");
t1.start();
System.out.println(Thread.currentThread().getName() + ": Compute other tasks");
// 3秒を超えると TimeoutException がスローされる
System.out.println("Future Task Done = " + ft1.get(3, TimeUnit.SECONDS));
}
// Output:
// main: Compute other tasks
// Thread Name: t1 Start
// Exception: java.util.concurrent.TimeoutExceptionCompletableFuture
CompletableFuture(JDK 8)は FutureTask の改良版:
- Callback:
whenCompleteなどのコールバックメソッドを対応 - 非同期タスクの組み合わせ:複数の非同期タスクを組み合わせて処理できる
- 最速タスクの選択:
applyToEitherなどのメソッド - Observer Pattern:タスク完了後にリスナーに通知できる
CompletionStage
CompletionStage は非同期計算過程の特定の段階を表し、一つの段階が完了すると次の段階がトリガーされる可能性がある。Linux の Pipe (|) シンボルに類似。
CompletionStageChain.java
stage.thenApply(x -> square(x))
.thenAccept(x -> System.out.println(x))
.thenRun(() -> System.out.println("Done"))非同期タスクの作成
| メソッド | 戻り値 | 説明 |
|---|---|---|
runAsync(Runnable) | CompletableFuture<Void> | 戻り値なし |
supplyAsync(Supplier) | CompletableFuture<T> | 戻り値あり |
デフォルトでは
ForkJoinPool.commonPool を使用。カスタム Executor Thread Pool も指定可能。結果の取得
| メソッド | 説明 |
|---|---|
get() | 結果をブロッキング待機 |
join() | 結果をブロッキング待機(checked exception をスローしない) |
getNow(T valueIfAbsent) | 即座に結果またはデフォルト値を返す |
complete(T value) | get() を中断してデフォルト値を即座に返すかどうか |
結果処理メソッド
thenApply
タスク A が完了したら、タスク B を実行。タスク B は A の結果を必要とし、B には戻り値がある。
ThenApplyExample.java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World");thenAccept
タスク A が完了したら、タスク B を実行。タスク B は A の結果を必要とするが、B には戻り値がない。
ThenAcceptExample.java
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(System.out::println);thenRun
タスク A が完了したら、タスク B を実行。タスク B は A の結果を必要としない。
ThenRunExample.java
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Done"));例外処理
| メソッド | 説明 |
|---|---|
exceptionally | try/catch に類似 |
handle + whenComplete | try/finally に類似 |
whenComplete
前のスレッドの結果を取得して消費し、前のスレッドの戻り値に影響しない。
exceptionally
前のスレッドで例外が発生した時に実行。例外キャプチャ範囲は前の全ての非同期スレッドを含む。
handle
whenComplete() + exceptionally() に相当。例外の有無に応じて if-else 分岐処理を行う。
マルチタスク組み合わせ
二つのタスク完了待ち
| メソッド | 戻り値 | 説明 |
|---|---|---|
thenCombine() | あり | 両方のスレッドが完了後、結果を統合変換 |
thenAcceptBoth() | なし | 両方のスレッドが完了後、結果を統合消費 |
runAfterBoth() | なし | 両方のスレッドが完了後、他のロジックを実行 |
いずれかのタスク完了
| メソッド | 戻り値 | 説明 |
|---|---|---|
applyToEither() | あり | いずれか先に完了した結果を変換 |
acceptEither() | なし | いずれか先に完了した結果を消費 |
runAfterEither() | なし | いずれか先に完了後、他のロジックを実行 |
マルチタスク待機
MultiTaskWait.java
// いずれか完了で即返却
Object result = CompletableFuture.anyOf(cf1, cf2, cf3).join();
// 全完了で返却
CompletableFuture.allOf(cf1, cf2, cf3).join();スレッドプールの注意事項
- デフォルトの Thread Pool は自動終了する
- カスタム Thread Pool は手動で終了すること
thenRun vs thenRunAsync
thenRun:前のタスクのスレッドを使用thenRunAsync:新しいスレッドプールを指定可能