CompletableFuture

Future 介面

Future 介面(JDK 1.5)定義了操作非同步任務執行的方法:

  • 獲取非同步任務執行結果
  • 取消非同步任務執行
  • 判斷非同步任務是否被取消
  • 判斷非同步任務是否執行完畢

FutureTask

FutureTask 結合了 Callable 和 Runnable 的特性。

  classDiagram
	class FutureTask{
		Callable
	}
	class Callable
	<<interface>> Callable
	class Runnable
	<<interface>> Runnable

	Callable --o FutureTask
	Runnable <|.. FutureTask

FutureTask 的缺點

方法問題
get()容易造成阻塞,建議放在程序後面
isDone()Polling 方式,容易耗費 CPU
FutureTaskBlock.java
public static void main(String[] args) throws Exception {
    FutureTask<String> ft1 = new FutureTask<>(() -> {
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Thread Name: " + Thread.currentThread().getName());
        return "Task1 Over";
    });
    Thread t1 = new Thread(ft1, "t1");
    t1.start();
    System.out.println("Future Task Done = " + ft1.get()); // 阻塞

    System.out.println(Thread.currentThread().getName() + ": Compute other tasks");
}
// Output:
// Thread Name: t1
// Future Task Done = Task1 Over
// main: Compute other tasks
FutureTaskTimeout.java
public static void main(String[] args) throws Exception {
    FutureTask<String> ft1 = new FutureTask<>(() -> {
        System.out.println("Thread Name: " + Thread.currentThread().getName() + " Start");
        TimeUnit.SECONDS.sleep(7);
        return "Task1 Over";
    });
    Thread t1 = new Thread(ft1, "t1");
    t1.start();
    System.out.println(Thread.currentThread().getName() + ": Compute other tasks");

    // 超過 3 秒會拋出 TimeoutException
    System.out.println("Future Task Done = " + ft1.get(3, TimeUnit.SECONDS));
}
// Output:
// main: Compute other tasks
// Thread Name: t1 Start
// Exception: java.util.concurrent.TimeoutException

CompletableFuture

CompletableFuture(JDK 8)是對 FutureTask 的改良:

  • Callback:支援 whenComplete 等回調方法
  • 非同步任務組合:多個非同步任務可以組合一起處理
  • 選擇最快完成的任務applyToEither 等方法
  • Observer Pattern:可以讓 Task 執行完成後通知監聽方

CompletionStage

CompletionStage 代表非同步計算過程的某個階段,一個階段完成後可能會觸發另一個階段。類似 Linux Pipe (|) 符號。

CompletionStageChain.java
stage.thenApply(x -> square(x))
    .thenAccept(x -> System.out.println(x))
    .thenRun(() -> System.out.println("Done"))

創建非同步任務

方法返回值說明
runAsync(Runnable)CompletableFuture<Void>無返回值
supplyAsync(Supplier)CompletableFuture<T>有返回值
預設使用 ForkJoinPool.commonPool,也可以指定自訂的 Executor Thread Pool。

取得結果

方法說明
get()阻塞等待結果
join()阻塞等待結果(不拋出 checked exception)
getNow(T valueIfAbsent)立即返回結果或預設值
complete(T value)是否中斷 get() 立即返回設定預設值

結果處理方法

thenApply

Task A 執行完畢,執行 Task B,並且 Task B 需要 A 的結果,同時 B 有返回值。

ThenApplyExample.java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World");

thenAccept

Task A 執行完畢,執行 Task B,並且 Task B 需要 A 的結果,但 B 無返回值。

ThenAcceptExample.java
CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(System.out::println);

thenRun

Task A 執行完畢,執行 Task B,並且 Task B 不需要 A 的結果。

ThenRunExample.java
CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("Done"));

異常處理

方法說明
exceptionally類似 try/catch
handle + whenComplete類似 try/finally

whenComplete

取得前一個 Thread 結果並消費,不影響上一個 Thread 的返回值。

exceptionally

前面 Thread 異常時執行,捕獲異常範圍包括前面所有非同步執行緒。

handle

相當於 whenComplete() + exceptionally(),根據是否產生異常進行 if-else 分支處理。

多任務組合

等待兩個任務完成

方法返回值說明
thenCombine()兩個 Threads 都有返回值,等待都結束後,結果合併轉換
thenAcceptBoth()兩個 Threads 都有返回值,等待都結束後,結果合併消費
runAfterBoth()兩個 Threads 都結束後,執行其他邏輯

任一任務完成

方法返回值說明
applyToEither()等待任一先結束,轉換其結果
acceptEither()等待任一先結束,消費其結果
runAfterEither()等待任一先結束,執行其他邏輯

多任務等待

MultiTaskWait.java
// 任一執行完畢即返回
Object result = CompletableFuture.anyOf(cf1, cf2, cf3).join();

// 全部執行完畢返回
CompletableFuture.allOf(cf1, cf2, cf3).join();

執行緒池注意事項

  • 預設 Thread Pool 會自動關閉
  • 自訂 Thread Pool 記得手動關閉

thenRun vs thenRunAsync

  • thenRun:使用前一個任務的執行緒
  • thenRunAsync:可以指定新的執行緒池

參考資源