Spring Reactive

Project Reactor

アーキテクチャ

Reactive Stream

仕様

Flux & Mono

オペレーター

  • 作成(さくせい)
  flowchart TB
    F([Flux])
    Cr[Create]
    c[create]
    j[just]
    f[from]
    fI[fromIterable]
    err[error]
    et[empty]
    n[never]
    F --> Cr
    Cr --> c
    Cr --> j
    Cr --> f
    Cr --> fI
    Cr --> err
    Cr --> et
    Cr --> n
  • リトライ
  flowchart TB
    F([Flux])
    R[Retry]
    r[retry]
    rW[retryWhen]
    rB[retryBackoff]

    F --> R
    R --> r
    R --> rW
    R --> rB
  • イベント処理(しょり)
  flowchart TB
    F([Flux])
    EH[Event Handling]
    dON[doOnNext]
    dOE[doOnError]
    dOC[doOnComplete]
    h[handler]
    etc[...etc.]

    F --> EH
    EH --> dON
    EH --> dOE
    EH --> dOC
    EH --> h
    EH --> etc
  • 時間管理(じかんかんり)
  flowchart TB
    F([Flux])
    T[Time]
    t[timeout]
    i[internal]
    df[defer]
    dl[delay]

    F --> T
    T --> t
    T --> i
    T --> df
    T --> dl
  • 変換(へんかん)結合(けつごう)
  flowchart LR
    F([Flux])
    TC[Transform and Compose]
    m[map]
    fM[flatMap]
    cM[concatMap]
    mW[mergeWith]
    zW[zipWith]
    rd[reduce]
    bf[buffer]
    gr[group]
    cl[collect]

    F --> TC
    TC --> m
    TC --> fM
    TC --> cM
    TC --> mW
    TC --> zW
    TC --> rd
    TC --> bf
    TC --> gr
    TC --> cl
  • スレッドとスケジューラ

    1. parallel():CPU コア(すう)(もと) づく。
    2. fromExecutorService()独自(どくりつ) のプール。
    3. boundedElastic():タスク(すう)(おう) じてスレッドを調整(ちょうせい)
    4. single():シングルスレッド。
    5. immediate()現在(げんざい) のスレッド。
  flowchart LR
    F([Flux])
    TS[Threading and Schedulers]
    p[parallel]
    sO[subscribeOn]
    pO[publishOn]
    Sd[Schedulers]

    F --> TS
    TS --> p
    p --> runOn
    p --> sequential

    TS --> sO
    TS --> pO

    TS --> Sd
    Sd --> paralle
    Sd --> fromExecutorService
    Sd --> elastic
    Sd --> single
    Sd --> immediate

Cold vs Hot Stream

バックプレッシャー

スケジューラ

テスト

Spring WebFlux

WebClient


用語集


アーキテクチャ

モデル

バックプレッシャー

バックプレッシャー とは、発行者(はっこうしゃ)送信(そうしん) するデータ(りょう)購読者(こうどくしゃ)処理能力(しょりのうりょく)() える状況(じょうきょう) のことです。これを制御(せいぎょ) することでデータ損失(そんしつ)(ふせ) ぎます。

Reactive Stream

  • Project Reactor
    • Reactive Stream 仕様(しよう)実装(じっそう)
    • Spring WebFlux で採用(さいよう) されています。

フロー

I

II

仕様

Publisher

public interface Publisher<T> {
	void subscribe(Subscriber<? super T> var1);
}

Subscriber

public interface Subscriber<T> {
	void onSubscribe(Subscription var1);
	void onNext(T var1);
	void onError(Throwable var1);
	void onComplete();
}

Subscription

  • 発行者(はっこうしゃ)購読者(こうどくしゃ)接続(せつぞく) します
public interface Subscription {
	void request(long var1);
	void cancel();
}

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Flux & Mono

  • Reactive Streams 仕様(しよう)実装(じっそう) したリアクティブタイプ。
  • Flux:0 から N ()要素(ようそ)
  • Mono:0 から 1 ()要素(ようそ)

メソッド

just()

シーケンスを作成(さくせい) し、データストリームを宣言(せんげん) します。

subscribe()

シーケンスをサブスクライブします。これによりデータストリームが開始(かいし) されます。

fromArray(), fromIterable(), fromStream()

配列(はいれつ) や Iterable からシーケンスを作成(さくせい) します。

Flux 専用

range()

interval()

Mono 専用

fromSupplier()

fromCallable()

fromFuture()

fromRunnable()

スケジューラ

実行戦略

  • immediate()現在(げんざい) のスレッド。
  • single():シングルスレッド。
  • boundedElastic():ブロッキング処理用(しょりよう)
  • parallel()並列計算用(へいれつけいさんよう)
  • publishOnsubscribeOn:実行コンテキストの()() え。

Cold vs Hot Stream

Cold Publisher

サブスクライブされるまで送信(そうしん)開始(かいし) しません。新規購読(しんきこうどく) ごとにデータが生成(せいせい) されます。

Hot Publisher

購読者の有無(うむ)(かか) わらずデータを送信(そうしん) する可能性(かのうせい) があります。

refCount()

パブリッシャーは(ほか) の購読者を待機(たいき) します。

cache()

パブリッシャーが送信(そうしん) したデータをキャッシュします。

WebFlux - WebClient

メソッド

retrieve()

レスポンスボディを直接取得(ちょくせつしゅとく) します。

exchangeToMono()

HTTP レスポンス(ステータス、ヘッダー、ボディ)を詳細(しょうさい)制御(せいぎょ) します。

onErrorResume()

onErrorReturn()

リトライ

  • リトライ 3 (かい)遅延(ちえん) 2 (びょう) 、ジッター(Jitter)を適用(てきよう) します。
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));