執行器框架

ExecutorService

ExecutorService 是 Java 執行緒池的核心介面,提供管理和控制非同步任務執行的方法。

execute() vs submit()

方法返回值異常處理
execute(Runnable)void無法獲取異常
submit(Callable)Future可通過 Future 獲取異常

正確關閉執行緒池

正確關閉執行緒池的關鍵:

  1. shutdown + awaitTermination
  2. shutdownNow + awaitTermination
方法行為
shutdown()拒絕新任務提交;待執行的任務不會取消;正在執行的任務繼續執行直到結束
shutdownNow()拒絕新任務;取消待執行的任務;嘗試中斷執行中的任務
awaitTermination()等待執行緒池關閉(直到 TERMINATED 狀態),有逾時時間參數

最佳實踐

ShutdownExample.java
executorService.shutdown();
try {
    if (!executorService.awaitTermination(3000, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

流程說明:

  1. 呼叫 shutdown(),阻止新提交任務,並讓等待佇列中的任務執行完成
  2. 呼叫 awaitTermination(),確保等待佇列中的任務最多執行指定時間,以防止執行任務時間太長或被阻塞
  3. awaitTermination() 等待完畢後,若 ExecutorService 中還有任務沒執行完,則呼叫 shutdownNow() 強行終止
  4. 執行 awaitTermination() 時所在的執行緒也有可能被中斷,因此需要 catch InterruptedException

ThreadPoolExecutor

Alibaba 開發規範

一般而言,不會使用 Executors 去創建,而是通過 ThreadPoolExecutor 自定義創建的方式,這樣處理可以更加明確 Thread Pool 運行規則,避免資源耗盡風險 (OOM)。

Executors 返回的執行緒池物件的缺點:

類型問題
FixedThreadPool / SingleThreadPool允許 Request Queue 長度為 Integer.MAX_VALUE,可能堆積大量請求導致 OOM
CachedThreadPool / ScheduledThreadPool允許創建 Thread 數量為 Integer.MAX_VALUE,可能大量創建 Threads 導致 OOM

7 大參數

ThreadPoolExecutorParams.java
new ThreadPoolExecutor(
    int corePoolSize,           // 常駐(核心) Thread 數量
    int maximumPoolSize,        // 最大 Thread 數量
    long keepAliveTime,         // Thread 存活時間
    TimeUnit unit,              // 時間單位
    BlockingQueue<Runnable> workQueue,  // 阻塞隊列
    ThreadFactory threadFactory,        // Thread 工廠
    RejectedExecutionHandler handler    // 拒絕策略
);
參數說明
corePoolSize常駐(核心)Thread 數量
maximumPoolSize最大 Thread 數量
keepAliveTime + unitThread 存活時間;超過常駐 Thread 數量,一段時間不使用,則將超過數量回收
workQueue阻塞隊列
threadFactoryThread 工廠
handler拒絕策略

內部工作原理

ThreadPoolExecutor

拒絕策略 (Reject Strategy)

策略說明
AbortPolicy (預設)拋出 RejectedExecutionException 阻止系統正常運行
CallerRunsPolicy將任務退回呼叫者執行,降低新任務的流量
DiscardOldestPolicy拋棄隊列中等待最久的任務,然後嘗試再次提交當前任務
DiscardPolicy默默地丟棄無法處理的任務,不作任何處理與拋出異常

常見執行緒池

ExecutorTypes.java
// 固定數量執行緒池
ExecutorService fixed = Executors.newFixedThreadPool(int nThreads);

// 單一執行緒池
ExecutorService single = Executors.newSingleThreadExecutor();

// 可擴充執行緒池
ExecutorService cached = Executors.newCachedThreadPool();
類型說明
newFixedThreadPool(n)1 個 Pool / N 個 Threads
newSingleThreadExecutor()1 個 Pool / 1 個 Thread,任務一個接一個執行
newCachedThreadPool()Thread Pool 根據需求創建 Thread,可擴充

執行緒池大小設定

JCIP 公式

Number of Threads=Number of Available Cores×(1+Wait TimeService Time) \text{Number of Threads} = \text{Number of Available Cores} \times \left(1 + \frac{\text{Wait Time}}{\text{Service Time}}\right)

Little’s Law

L=λ×W L = \lambda \times W
符號說明
L同時處理的請求數量
λ長期平均到達率 (RPS)
W處理請求的平均時間(延遲)

ForkJoinPool

Fork/Join Framework:

  • Fork:把一個複雜任務進行拆分,大事化小
  • Join:把分拆的任務結果進行合併

參考資源