«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2]Backpressure와 함께 Flowable를 만들어 보자 본문

스터디/RxJava2

[RxJava2]Backpressure와 함께 Flowable를 만들어 보자

행복한 수지아빠 2017. 10. 22. 12:35

Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.

예제를 한번 보자.

Flowable<Integer> source = Flowable.create(emitter -> {
    for (int i=0;i<1000;i++) {
        if(emitter.isCancelled())
            return;

        emitter.onNext(i);
    }

    emitter.onComplete();    
}  , BackpressureStrategy.BUFFER); //BackpressureStrategy.BUFFER 을 인자로 넣어서 생성이 가능하다. 

source.observeOn(Schedulers.io())
        .subscribe(System.out::println);

sleep( 1000 );
..........
0
1
2
3
4
...

인자로 넣을 수 있는 건 5가지가 있다.

BackpressureStrategy설명
Missing배압을 적용안한다. 후에 onBackpressureXXX()등으로 컨트롤이 가능하다.
ErrorMissingBackpressureException 발생시 에러를 발생시킨다.
BufferonBackpressureBuffer() 와 같은 형태
Lastest다운스트림이 받을때까지 마지막꺼만 유지를 한다.

Observable -> Flowable 로 변경시에도 인자로 넣는게 가능하다.

Observable<Integer> source = Observable.range(1,1000);
source.toFlowable(BackpressureStrategy.BUFFER)

하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.

Flowable -> Observable 변경시에도 toObserable()으로 가능하다.

Flowable<Integer> integers =
                 Flowable.range(1, 1000)
                         .subscribeOn(Schedulers.computation());
        Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
                 .flatMap(s -> integers.map(i -> i + "-" + s)
                 //위로는 Backpressure 지원함                 
                 .toObservable())           
                 //아래로는 지원안함..만약 request를 할때에도 적용안되니 주의..(subscriber 참조)      
                 .subscribe(System.out::println);