Executor フレームワーク

ExecutorService

ExecutorService は Java スレッドプールの核心かくしんインターフェースで、非同期ひどうきタスクの実行じっこう管理かんり制御せいぎょするメソッドを提供ていきょうします。

execute() vs submit()

メソッドもど例外れいがい処理しょり
execute(Runnable)void例外れいがい取得しゅとくできない
submit(Callable)FutureFuture 経由けいゆ例外れいがい取得しゅとく可能かのう

スレッドプールの正しいシャットダウン

スレッドプールをただしくじるための重要じゅうようなポイント:

  1. shutdown + awaitTermination
  2. shutdownNow + awaitTermination
メソッド動作どうさ
shutdown()あたらしいタスクの送信そうしん拒否きょひ待機中たいきちゅうのタスクはキャンセルされない;実行中じっこうちゅうのタスクは完了かんりょうまで継続けいぞく
shutdownNow()あたらしいタスクを拒否きょひ待機中たいきちゅうのタスクをキャンセル;実行中じっこうちゅうのタスクの中断ちゅうだん試行しこう
awaitTermination()スレッドプールの終了しゅうりょう待機たいき(TERMINATED 状態じょうたいまで)、タイムアウト時間じかんパラメータあり

ベストプラクティス

ShutdownExample.java
executorService.shutdown();
try {
    if (!executorService.awaitTermination(3000, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

フローの説明せつめい

  1. shutdown()し、あたらしいタスクの送信そうしんをブロックし、待機たいきキューないのタスクを完了かんりょうさせる
  2. awaitTermination()し、待機たいきキューないのタスクが指定してい時間じかんない完了かんりょうすることを確保かくほ
  3. awaitTermination()待機たいきもタスクがのこっている場合ばあいは、shutdownNow()強制きょうせい終了しゅうりょう
  4. awaitTermination() 実行中じっこうちゅうのスレッドも中断ちゅうだんされる可能性かのうせいがあるため、InterruptedException を catch

ThreadPoolExecutor

Alibaba 開発規範

一般的いっぱんてきExecutors使用しようせず、ThreadPoolExecutor でカスタム作成さくせいすることで、スレッドプールの動作どうさルールを明確めいかくにし、リソース枯渇こかつリスク(OOM)を回避かいひできます。

Executors がかえすスレッドプールオブジェクトの欠点けってん

種類しゅるい問題もんだい
FixedThreadPool / SingleThreadPoolリクエストキューのながさが Integer.MAX_VALUE まで許可きょかされ、大量たいりょうのリクエストが蓄積ちくせきして OOM の原因げんいんになる可能性かのうせい
CachedThreadPool / ScheduledThreadPoolスレッド作成さくせいすうInteger.MAX_VALUE まで許可きょかされ、大量たいりょうのスレッドが作成さくせいされて OOM の原因げんいんになる可能性かのうせい

7つのパラメータ

ThreadPoolExecutorParams.java
new ThreadPoolExecutor(
    int corePoolSize,           // 常駐(コア)スレッド数
    int maximumPoolSize,        // 最大スレッド数
    long keepAliveTime,         // スレッド生存時間
    TimeUnit unit,              // 時間単位
    BlockingQueue<Runnable> workQueue,  // ブロッキングキュー
    ThreadFactory threadFactory,        // スレッドファクトリ
    RejectedExecutionHandler handler    // 拒否ポリシー
);
パラメータ説明せつめい
corePoolSize常駐じょうちゅう(コア)スレッドすう
maximumPoolSize最大さいだいスレッドすう
keepAliveTime + unitスレッド生存せいぞん時間じかん;コアスレッドすうえたぶんは、一定いってい時間じかん使用しようされないと回収かいしゅう
workQueueブロッキングキュー
threadFactoryスレッドファクトリ
handler拒否きょひポリシー

内部動作原理

ThreadPoolExecutor

拒否ポリシー (Reject Strategy)

ポリシー説明せつめい
AbortPolicy (デフォルト)RejectedExecutionException をスローしてシステムの正常せいじょう動作どうさ阻止そし
CallerRunsPolicyタスクをもともどして実行じっこうし、あたらしいタスクの流量りゅうりょう低減ていげん
DiscardOldestPolicyキューないもっとなが待機たいきしているタスクを破棄はきし、現在げんざいのタスクの再送信さいそうしん試行しこう
DiscardPolicy処理しょりできないタスクをしずかに破棄はきなに処理しょりせず例外れいがいもスローしない

一般的なスレッドプール

ExecutorTypes.java
// 固定数スレッドプール
ExecutorService fixed = Executors.newFixedThreadPool(int nThreads);

// 単一スレッドプール
ExecutorService single = Executors.newSingleThreadExecutor();

// 拡張可能スレッドプール
ExecutorService cached = Executors.newCachedThreadPool();
種類しゅるい説明せつめい
newFixedThreadPool(n)1つのプール / Nのスレッド
newSingleThreadExecutor()1つのプール / 1のスレッド、タスクは順番じゅんばん実行じっこう
newCachedThreadPool()需要じゅようおうじてスレッドを作成さくせい拡張かくちょう可能かのう

スレッドプールサイズの設定

JCIP 公式

スレッド数=利用可能なコア数×(1+待機時間サービス時間) \text{スレッド数} = \text{利用可能なコア数} \times \left(1 + \frac{\text{待機時間}}{\text{サービス時間}}\right)

Little’s Law

L=λ×W L = \lambda \times W
記号きごう説明せつめい
L同時どうじ処理しょりされるリクエストすう
λ長期ちょうき平均へいきん到着とうちゃくりつ (RPS)
Wリクエスト処理しょり平均へいきん時間じかん(レイテンシ)

ForkJoinPool

Fork/Join フレームワーク:

  • Fork複雑ふくざつなタスクを分割ぶんかつおおきな仕事しごとちいさく
  • Join分割ぶんかつしたタスクの結果けっか統合とうごう

参考リソース