CompletableFuture

Future インターフェース

Future インターフェース(JDK 1.5)は非同期ひどうきタスク実行じっこう操作そうさするメソッドを定義ていぎする:

  • 非同期ひどうきタスク実行じっこう結果けっか取得しゅとく
  • 非同期ひどうきタスク実行じっこう取消とりけ
  • 非同期ひどうきタスクが取消とりけされたかの判定はんてい
  • 非同期ひどうきタスクが完了かんりょうしたかの判定はんてい

FutureTask

FutureTask は Callable と Runnable の特性とくせいわせている。

  classDiagram
	class FutureTask{
		Callable
	}
	class Callable
	<<interface>> Callable
	class Runnable
	<<interface>> Runnable

	Callable --o FutureTask
	Runnable <|.. FutureTask

FutureTask の欠点

メソッド問題もんだい
get()ブロッキングをこしやすい、プログラムの後半こうはん配置はいちすることを推奨すいしょう
isDone()ポーリング方式ほうしき、CPU を消費しょうひしやすい
FutureTaskBlock.java
public static void main(String[] args) throws Exception {
    FutureTask<String> ft1 = new FutureTask<>(() -> {
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Thread Name: " + Thread.currentThread().getName());
        return "Task1 Over";
    });
    Thread t1 = new Thread(ft1, "t1");
    t1.start();
    System.out.println("Future Task Done = " + ft1.get()); // ブロッキング

    System.out.println(Thread.currentThread().getName() + ": Compute other tasks");
}
// Output:
// Thread Name: t1
// Future Task Done = Task1 Over
// main: Compute other tasks
FutureTaskTimeout.java
public static void main(String[] args) throws Exception {
    FutureTask<String> ft1 = new FutureTask<>(() -> {
        System.out.println("Thread Name: " + Thread.currentThread().getName() + " Start");
        TimeUnit.SECONDS.sleep(7);
        return "Task1 Over";
    });
    Thread t1 = new Thread(ft1, "t1");
    t1.start();
    System.out.println(Thread.currentThread().getName() + ": Compute other tasks");

    // 3秒を超えると TimeoutException がスローされる
    System.out.println("Future Task Done = " + ft1.get(3, TimeUnit.SECONDS));
}
// Output:
// main: Compute other tasks
// Thread Name: t1 Start
// Exception: java.util.concurrent.TimeoutException

CompletableFuture

CompletableFuture(JDK 8)は FutureTask の改良版かいりょうばん

  • CallbackwhenComplete などのコールバックメソッドを対応たいおう
  • 非同期ひどうきタスクのわせ複数ふくすう非同期ひどうきタスクをわせて処理しょりできる
  • 最速さいそくタスクの選択せんたくapplyToEither などのメソッド
  • Observer Pattern:タスク完了かんりょうにリスナーに通知つうちできる

CompletionStage

CompletionStage は非同期ひどうき計算けいさん過程かてい特定とくてい段階だんかいあらわし、ひとつの段階だんかい完了かんりょうするとつぎ段階だんかいがトリガーされる可能性かのうせいがある。Linux の Pipe (|) シンボルに類似るいじ

CompletionStageChain.java
stage.thenApply(x -> square(x))
    .thenAccept(x -> System.out.println(x))
    .thenRun(() -> System.out.println("Done"))

非同期タスクの作成

メソッドもど説明せつめい
runAsync(Runnable)CompletableFuture<Void>もどなし
supplyAsync(Supplier)CompletableFuture<T>もどあり
デフォルトでは ForkJoinPool.commonPool使用しよう。カスタム Executor Thread Pool も指定してい可能かのう

結果の取得

メソッド説明せつめい
get()結果けっかをブロッキング待機たいき
join()結果けっかをブロッキング待機たいき(checked exception をスローしない)
getNow(T valueIfAbsent)即座そくざ結果けっかまたはデフォルトかえ
complete(T value)get()中断ちゅうだんしてデフォルト即座そくざかえすかどうか

結果処理メソッド

thenApply

タスク A が完了かんりょうしたら、タスク B を実行じっこう。タスク B は A の結果けっか必要ひつようとし、B にはもどがある。

ThenApplyExample.java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World");

thenAccept

タスク A が完了かんりょうしたら、タスク B を実行じっこう。タスク B は A の結果けっか必要ひつようとするが、B にはもどがない。

ThenAcceptExample.java
CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(System.out::println);

thenRun

タスク A が完了かんりょうしたら、タスク B を実行じっこう。タスク B は A の結果けっか必要ひつようとしない。

ThenRunExample.java
CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("Done"));

例外処理

メソッド説明せつめい
exceptionallytry/catch類似るいじ
handle + whenCompletetry/finally類似るいじ

whenComplete

まえのスレッドの結果けっか取得しゅとくして消費しょうひし、まえのスレッドのもど影響えいきょうしない。

exceptionally

まえのスレッドで例外れいがい発生はっせいしたとき実行じっこう例外れいがいキャプチャ範囲はんいまえすべての非同期ひどうきスレッドをふくむ。

handle

whenComplete() + exceptionally()相当そうとう例外れいがい有無うむおうじて if-else 分岐ぶんき処理しょりおこなう。

マルチタスク組み合わせ

二つのタスク完了待ち

メソッドもど説明せつめい
thenCombine()あり両方りょうほうのスレッドが完了かんりょう結果けっか統合とうごう変換へんかん
thenAcceptBoth()なし両方りょうほうのスレッドが完了かんりょう結果けっか統合とうごう消費しょうひ
runAfterBoth()なし両方りょうほうのスレッドが完了かんりょうほかのロジックを実行じっこう

いずれかのタスク完了

メソッドもど説明せつめい
applyToEither()ありいずれかさき完了かんりょうした結果けっか変換へんかん
acceptEither()なしいずれかさき完了かんりょうした結果けっか消費しょうひ
runAfterEither()なしいずれかさき完了かんりょうほかのロジックを実行じっこう

マルチタスク待機

MultiTaskWait.java
// いずれか完了で即返却
Object result = CompletableFuture.anyOf(cf1, cf2, cf3).join();

// 全完了で返却
CompletableFuture.allOf(cf1, cf2, cf3).join();

スレッドプールの注意事項

  • デフォルトの Thread Pool は自動じどう終了しゅうりょうする
  • カスタム Thread Pool は手動しゅどう終了しゅうりょうすること

thenRun vs thenRunAsync

  • thenRunまえのタスクのスレッドを使用しよう
  • thenRunAsyncあたらしいスレッドプールを指定してい可能かのう

参考リソース