Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.

예제를 한번 보자.

Flowable<Integer> source = Flowable.create(emitter -> {
    for (int i=0;i<1000;i++) {
        if(emitter.isCancelled())
            return;

        emitter.onNext(i);
    }

    emitter.onComplete();    
}  , BackpressureStrategy.BUFFER); //BackpressureStrategy.BUFFER 을 인자로 넣어서 생성이 가능하다. 

source.observeOn(Schedulers.io())
        .subscribe(System.out::println);

sleep( 1000 );
..........
0
1
2
3
4
...

인자로 넣을 수 있는 건 5가지가 있다.

BackpressureStrategy설명
Missing배압을 적용안한다. 후에 onBackpressureXXX()등으로 컨트롤이 가능하다.
ErrorMissingBackpressureException 발생시 에러를 발생시킨다.
BufferonBackpressureBuffer() 와 같은 형태
Lastest다운스트림이 받을때까지 마지막꺼만 유지를 한다.

Observable -> Flowable 로 변경시에도 인자로 넣는게 가능하다.

Observable<Integer> source = Observable.range(1,1000);
source.toFlowable(BackpressureStrategy.BUFFER)

하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.

Flowable -> Observable 변경시에도 toObserable()으로 가능하다.

Flowable<Integer> integers =
                 Flowable.range(1, 1000)
                         .subscribeOn(Schedulers.computation());
        Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
                 .flatMap(s -> integers.map(i -> i + "-" + s)
                 //위로는 Backpressure 지원함                 
                 .toObservable())           
                 //아래로는 지원안함..만약 request를 할때에도 적용안되니 주의..(subscriber 참조)      
                 .subscribe(System.out::println);


+ Recent posts