Flowable Backpressure Support - Flowableの最も包括的で詳細な説明

背圧

データストリームの送信、処理、および応答はそれぞれのスレッドで独立して実行されてもよく、上流のデータが送信される場合、下流の処理が完了したかどうかは分からず、データストリームが送信される前に下流の処理が終了する。
このようにして、上流の立ち上げが非常に高速で、下流の処理が遅い場合はどうなりますか?
キャッシュプール内のデータが処理されていない場合は、キャッシュプール内のデータが処理されず、最終的には、Rxjavaの背圧問題であるメモリオーバーフローが多く発生します。
たとえば、次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

Observableオブジェクトの作成ObervableはObserverがSchedulers.newThread()の別のスレッドで5秒ごとにデータを受信して​​いる間、Schedulers.newThread()()のスレッドでデータを継続的に送信し、その後、メモリ使用量をチェックします次のように:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

アップストリームとダウンストリームはそれぞれのスレッドで別々にデータを処理するため(アップストリームとダウンストリームが同じスレッド内にある場合、データのダウンストリーム処理はアップストリームデータ送信をブロックし、アップストリームはデータを送信してダウンストリーム処理を待ち、アップストリームデータレートはデータを受信するダウンストリームよりもはるかに高速で、上流と下流の流量が不均一になるため、データが蓄積され、メモリオーバーフローが発生します。

流動性のある

Flowableは、背圧の問題に対する解決策であり、Observableに基づくオプティマイザは、Observableと同じオブザーバパターンのメンバーではありません。Flowableは、この一連のオブザーバパターン、PublisherおよびSubscriber ObservableはObservableSource / Observerオブザーバパターンの典型的なインプリメンテーションです。
したがって、Flowableを使用する場合、オブザーバブルオブザーバブルはもはやオブザーバブルではなく、Flowableです。オブザーバはもはやオブザーバではなく、サブスクライバです。 フローティングとサブスクライバはまだアソシエーションによって購読しています()。
FlowableはObservableに基づく最適化の成果であるためObservableが解決できる問題はFlowableがObservableをFlowableだけで残さないという問題を解決するということです。 実際、これは絶対に欠かせないものです。彼らはそれぞれ独自の長所と短所を持っています。
逆流圧力のサポートは、Flowableベースのデータストリームに追加され、データ処理のための演算子に追加され、Observableよりもはるかに効率的に動作する余分なロジックを追加します。
アップストリームとダウンストリームだけが独自のスレッドで実行され、アップストリームの送信データ速度はデータ処理速度を受信するダウンストリームよりも大きいため、バックプレッシャの問題が発生します。
したがって、同じスレッドで上流側と下流側の両方、または異なるスレッドで上流側と下流側で作業することができれば、下流の処理データは上流の送信データよりも速く、背圧の問題はなく、使用する必要はありません流動性があり、性能に影響しないようにする。
Flowableを介してデータフローを処理するための基本的なコードは次のとおりです。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

実装の結果は次のとおりです。

見積もり

System.out:launch ----> 1
System.out:launch ----> 2
System.out:launch ----> 3
System.out:launch ----> done
System.out:Receive ----> 1
System.out:Receive ----> 2
System.out:Receive ----> 3
System.out:Receive ----> Finish

私たちは実行結果とObserverableの間に違いはないが、上流と下流に加えて、独自の実行スレッドを指定するコードに加えて、3つの異なるものがあり、より多くのBackpressureStrategyタイプのパラメータを作成します。
第2に、onSubscribeコールバックパラメータはDisposableではなく、Subscription、more line code:

  s.request(Long.MAX_VALUE); 

第3に、Flowableトランスミッタデータ、ObservableEmitter FlowableEmitterの代わりにトランスミッタが使用されます。

BackpressureStrategy背圧戦略

Flowableの作成方法に基づいて、より多くのBackpressureStrategyタイプのパラメータを作成し、
BackpressureStrategyは列挙型です。ソースは次のとおりです。

公開列挙型BackpressureStrategy {
   エラー、バッファ、ドロップ、最新、ミッシング
 }

その役割は何ですか?
フロー可​​能な非同期キャッシュプールはObservable Observable Observable非同期キャッシュプールとは異なり、サイズ制限はなく、内部に無制限のデータを追加できます。また、OOM、およびFlowable非同期キャッシュプールには128の固定容量があります。
BackpressureStrategyの目的は、非同期キャッシュプールを介してデータを格納するFlowableの戦略を設定することです。

エラー

このポリシーでは、Flowableに配置された非同期キャッシュプール内のデータが制限を超えると、MissingBackpressureExceptionがスローされます。
次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

フローティング129のデータを起動すると、サブスクライバは10秒後にスリープして受信を開始し、コンソールを実行すると次の印刷例外が検出されます。

見積もり

W / System.err:io.reactivex.exceptions.MissingBackpressureException:create:要求の不足のために値を出力できませんでした
W / System.err:io.reactivex.internal.operators.flowable.FlowableCreate $ ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
システム管理者:
W / System.err:at net.fbi.rxjava2.RxJava2Demo $ 6.subscribe(RxJava2Demo.java:103)
システム管理者:
W / System.err:at io.reactivex.Flowable.subscribe(Flowable.java:12218)
システム管理者:
W / System.err:at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
W / System.err:at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
W / System.err:java.util.concurrent.FutureTask.run(FutureTask.java:237)
W / System.err:java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201(ScheduledThreadPoolExecutor.java:154)
W / System.err:java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
W / System.err:java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
W / System.err:java.util.concurrent.ThreadPoolExecutorで$ Worker.run(ThreadPoolExecutor.java:588)
W / System.err:java.lang.Thread.run(Thread.java:818)

Flowableが128に送信するデータの数を変更すると、この例外は発生しません。

DROP

このポリシーでは、Flowableの非同期キャッシュプールがいっぱいになると、キャッシュプールにドロップされるデータは破棄されます。
次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

上記のコードでは、Flowableを作成して500データを作成し、100ミリ秒ごとに送信し、送信開始時刻と送信終了時刻を記録します。ダウンストリームは300ミリ秒ごとにデータを受信します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

ログを通して
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

128番目のデータを受信した後、加入者はすでに288回受信しており、バッファプールが満杯であるために60個のデータが破棄されることがわかります。
さて、Flowableが129番目のデータを送信すると、Subscriberは42個のデータを受信し、129番目のデータがキャッシュプールにない理由は何ですか? ログは次のとおりです。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

これは、キャッシュプール内のデータが1つを受信し、1つをクリーンアップするためのサブスクライバではなく、95のクリーンアップまで蓄積されているためです。 Flowableデータを入れることができた後、サブスクライバは96番目のデータを受信し、キャッシュプールはデータをクリーンアップし始めました。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

ログが見つかるかどうかを確認し、加入者はデータの後に最初の96を受信し、Flowable 288番目のデータ送信を受信します。 キャッシュフル状態の128と288の間のデータは破棄されたので、128番目のデータを受信した後、サブスクライバは129番目のデータの代わりに288番目のデータを受信します。

最新

Dropポリシーと同様に、LATESTは、キャッシュプールが満杯で、キャッシュプールに置かれるデータが破棄された場合、キャッシュプールの状態に関係なく、最後のデータをキャッシュプールに強制します。
上記のコードのDROPポリシーをLATESTに変更します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

実行後のログの比較は次のようになります。
DROP:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

最新:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

最新のポリシーFlowableによって受信されたデータの完了前に受信されたサブスクライバは、送信された最後のデータでDropポリシーでは送信されません。

バッファ

この戦略では、ObservableのようなFlowableの非同期キャッシュプールには固定サイズがなく、MissingBackpressureExceptionをスローせずにOOMを引き起こすことなく無制限のデータを追加できます。
次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

メモリ使用量を表示:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

Observalbeを使用した場合と同じことが判明し、メモリサージにつながり、最終的にOOMにつながります。その違いは、Flowableメモリの使用がはるかに遅いことです。これは、Flowableベースのデータフローとデータ処理操作バックプレッシャサポートが追加され、余分なロジックが追加され、Observableよりもはるかに効率が低下します。

ミッシング

このメソッドは、Createメソッドで作成されたFlowableがBackpressureポリシーを指定せず、OnNextを介して送信されたデータをキャッシュまたは破棄しないことを意味します(backBackureBuffer()/ onBackpressureDrop()/ onBackpressureLatest指定された背圧戦略。

onBackpressureXXX背圧オペレータ

Flowableは、createを使用して作成されたときにバックプレッシャポリシーを指定することに加えて、バックプレッシャ演算子がjust、fromArrayなどの他の作成演算子を使用して作成された後で背圧ポリシーを指定することもできます。
onBackpressureBuffer()はBackpressureStrategy.BUFFERに対応します。
onBackpressureDrop()はBackpressureStrategy.DROPに対応します。
onBackpressureLatest()はBackpressureStrategy.LATESTに対応します
コード例
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

コードに相当する:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

購読

SubscriptionとDisposableは、ObserverとObververable、Subscriber間のサブスクリプション関係を解除するDisposableのdispose()メソッドと同様に、ObserverとObserverの両方がサブスクリプションステータスにサブスクライブした後にオブザーバとオブザーバによって返されるパラメータです。流動的な購読関係。
相違点は、上記のコードのように、複数のメソッドリクエスト(long n)のインターフェイスSubscriptionです。

  s.request(Long.MAX_VALUE); 

このメソッドの役割は何ですか、このメソッドを削除すると効果がありますか?
次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

結果は次のとおりです。

見積もり

System.out:launch ----> 1
System.out:launch ----> 2
System.out:launch ----> 3
System.out:launch ----> done

私たちは、Flowableが通常のようにデータを送信し、サブスクライバはデータを受信しないことを発見しました。
これは、Flowableがデータのダウンストリーム要求を設定するための新しい考え方のレスポンスプルを使用するためです。アップストリームは、ダウンストリームの要求に応じてオンデマンドでデータを送信できます。
コール要求を表示しない場合、デフォルトのダウンストリームデマンドはゼロです。上のコードを実行すると、アップストリームのフローデータ送信はダウンストリームサブスクライバに渡されません。
次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

結果は次のとおりです。

 System.out:launch ----> 1
 System.out:launch ----> 2
 System.out:launch ----> 3
 System.out:launch ----> Finish System.out:Receive ----> 1
 System.out:Receive ----> 2

私たちはs.request(2)によってそれを発見しました;サブスクライバは2のためのデータ要求を設定しましたが、要求データの範囲を超えて受信されませんでした。
繰り返された通話要求により、どのような結果が生成されますか?
次のコードを実行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

onSubscribe(Subscription)メソッドの2回の呼び出しで10個のデータをFlowableで起動すると、結果は次のようになります。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

我々は、加入者が番号の後の2つの要件の合計である7つのデータの合計を受け取ったことがわかった。

ログを見ると、上流側は実際の下流側のニーズに応じてデータを送信しないことがわかりましたが、下流側の必要性にかかわらず、どれくらい送ることができるか、どれだけ送ることができます。
下流の需要データを超えても、依然として非同期キャッシュプールに配置されます。 この点を確認するには、次のコードを使用します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

Flowableを介して130個のデータが送信され、s.request(1)によってダウンストリームデータ要求の数が1に設定されます。
実行後、ログは次のようになります。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

例外が長時間発生したため、下流の要件外のデータは依然として非同期キャッシュ・プールに置かれ、オーバーフローしました。

では、データを送信するダウンストリーム要求の数に応じて、どのようにアップストリームを行うことができますか?
リクエストはリクエストの数の下流に設定できますが、アップストリームではこの番号を取得できませんでしたが、どうすれば入手できますか?
これには、FlowableとObservableの第3の違いが必要です。Flowable Emitter FlowableEmitter

FlowableEmitter

フロー可​​能なエミッタFlowableEmitterと観測可能なエミッタObservableEmitterは、ともにEmitterから継承されています(Emitterは既にレッスン2で述べています)
2つのソースを比較することができます。

パブリックインタフェースObservableEmitter <T> Emitter <T> {

     void setDisposable(使い捨て可能なd);

     void setCancellable(Cancellable c);

     isDisposed();

     ObservableEmitter <T> serialize();
 }

パブリックインターフェイスFlowableEmitter <T>はEmitter <T> {

     void setDisposable(Disposable s);

     void setCancellable(Cancellable c);

     long requested();

     isCancelled();

     FlowableEmitter <T> serialize();
 }

Interface FlowableEmitterのもう1つのメソッド

  long requested(); 

このメソッドを使用すると、未処理のリクエストの現在の数を取得できます。
私たちは、同期状態の下でFlowableの使用が、今度は我々は同期の状態を見なければならないと言いましたが、次のコードを実行し、今回我々は原則を失う必要があります。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

次のようにログを出力します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

ログを通して、e.requested()によって得られた値は動的な値であり、これは下流で受信されたデータの量とともに減少することがわかります。
上記のコードでは、同じスレッド内で上流と下流で実行される上流と下流のスレッドを指定しませんでした。
これは先に述べたことに反し、Flowableを同期させて使用していません。 これは、非同期の場合のe.requested()の値が複雑すぎるため、同期の状況を遷移させることによって明確にする必要があるためです。
上記のコードでは、上流と下流に別々のスレッドを指定するために、コードは次のとおりです
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

実行後に次のようにログに記録します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

ダウンストリームデータ要求を3と指定しましたが、未処理要求の数が上流になると、3ではなく128になります。 未処理の未処理要求が最小限に抑えられていますか? ダウンストリームセットのデータ要求が128未満である限り、すべての128へのアップストリームアクセスは可能ですか?
この質問では、ダウンストリームデータ要求が500、状況が128以上の場合に試行します。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

操作ログは以下の通りです
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

結果はまだ128です。
実際、s.request()によって設定されたダウンストリーム要求の数に関係なく、アップストリームで取得した最初の未処理要求の数は128です。
これは何ですか?
私たちが言いましたように、Flowableには非同期キャッシュプールがあります。上流に送信されたデータは最初に非同期キャッシュプールに格納され、次に非同期キャッシュプールによって下流に転送されます。 したがって、上流にデータを送信する場合、まず下流のデータ要求の量ではなく、キャッシュプールを置くことはできません。それ以外の場合、キャッシュプールはデータ損失またはバックプレッシャー異常を引き起こします。 キャッシュ・プールを置くことができれば、ダウンストリーム・データ要件を超えているかどうかについて、バッファ・プール内のデータをダウンストリームに転送し、超えていなければ、バッファ・プール内のデータをダウンストリームに渡すことができます超えた場合、それは渡されません。
データのダウンストリームの需要がキャッシュプールのサイズを超えており、アップストリームが最大需要を得ることができる場合は128、アップストリームの需要以上の128それを取得する方法ですか?
この質問で、我々は、次のコードを実行し、上流は150のデータを送信し、下流にも150のデータが必要です。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

ログのインターセプトされた部分は次のとおりです。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

e.requested()によって取得された上流の未完了要求の数は常に減少しませんが、33に減少すると128に戻ります。96の下流データが受信された後に時間がかかる。 前にも述べたように、非同期キャッシュプールのデータはダウンストリームではクリーンアップされませんが、95に累積されるたびにクリーンアップされます。 e.requested()で得られた値は、非同期キャッシュプールのクリーニングによって取得されます。 つまり、非同期キャッシュプールがクリーンアップされるたびに、残りのスペースが未処理の未処理要求の数を増加させ、異常なバックプレッシャーやデータ損失を引き起こさないようにします。
上流側では、データを送信する際に考慮する必要がある下流の必要性を考慮する必要はありませんが、非同期キャッシュプールが行かないようにするかどうかを検討するだけで済みます。 したがって、e.requested()で取得される値は実際のダウンストリームデータ要求数ではなく、非同期キャッシュプールに入れることができるデータ量です。 キャッシュプールにデータを格納し、その後、ダウンストリームデータ要求量に従ってバッファプールによって、終了データが95に蓄積されるまでクリアして、新しいデータストレージのためのスペースを確保します。 下流の処理データが遅い場合、下流のデータ転送のバッファプールも対応する低速であり、データの転送が行われない場合、e.requested()を介して新しいデータを格納するための十分な領域がない場合、これは0になり、この時点でデータが送信され、BackpressureStrategyのバックプレッシャー戦略に従ってMissingBackpressureExceptionの異常が発生するか、このデータが失われます。
したがって、アップストリームは0に等しいe.requested()を必要とし、起動データの一時停止は、背圧の問題を解決することができます。
最終プラン

次のコードを実行するには、元の質問に戻りましょう:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

下流の処理データ速度(Thread.sleep(50))が上流のデータ転送速度を維持できないため、背圧の問題が発生します。
実行後にメモリ使用量を確認すると、次のようになります。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

すぐにメモリサージ、OOM
次に、Flowableを使用していくつかの改善を行い、背圧の問題を起こさないように、異常やデータの消失を引き起こさないようにします。
コードは次のとおりです。
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

ダウンストリーム処理データ速度Thread.sleep(50)は、アップストリームデータ伝送の速度を維持することができず、
違いは、受信したデータごとにonNext(整数整数)メソッドでリクエストを追加し、

 mSubscription.request(1)

上流にコードを追加する

  if(e.requested()== 0)continue; 

上流側が必要に応じてデータを送信するようにします。
実行してメモリを表示した後:
Flowable Backpressure Support  -  Flowableの最も包括的で詳細な説明

メモリは、データを送信する下流の需要に厳密に従って、かなり穏やかであり、アップストリームはMissingBackpressureException異常、またはデータ損失を持っていません。

カテゴリ:モバイル開発 時間:2017-12-14 人気:99
この記事では、

関連記事

Copyright (C) socapnw.com, All Rights Reserved.

Socapnw All Rights Reserved.

processed in 3.593 (s). 11 q(s)