«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2]Flowable에서의 Backpressure 본문

스터디/RxJava2

[RxJava2]Flowable에서의 Backpressure

행복한 수지아빠 2017. 10. 22. 12:33

Flowable.range()Flowable.just() , Flowable.fromIterable()Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.

하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.

배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.

예를 들어보자.

public static void main(String[] args){
    Flowable.interval(1, TimeUnit.MICROSECONDS)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe( v->System.out.println("emit : " + v) , Throwable::printStackTrace);

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    int time = ThreadLocalRandom.current().nextInt(3000);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

.................
o.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
    at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:96)
    at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
    ..............

위와 같이 푸시는 계속되고 있지만 순간적으로 배출이 되는 순간이 생긴다.

이렬 경우 MissingBackpressureException이 발생된다.

Flowable.interval() 을 제외한 나머지는 배압이 제대로 될 것이지만 시간관련된 이 함수는 오류가 생길수 있다.

이런 경우 onBackpressureDrop() 또는 onBackpressureBuffer() 을 사용해서 처리가 가능하다.

  1. onBackpressureDrop : Flowable(Observable)에서 항목을 보냈을때 바로 처리하지 못하는 데이터는 무시합니다.
  2. onBackpressureBuffer : Flowable(Observable)에서 보낸 항목을 큐에 쌓아 놓고 항목을 처리하는 쪽에서 해당항목을 나중에 처리할 수 있게 해줍니다. 인자값으로 (int) 를 넣을 경우 큐에 쌓아둘수 있는 수를 제한 할 수 있습니다.
.onBackpressureDrop() <- 위치는 여기에 해줘야 정상 동작된다.
.observeOn(Schedulers.io())

그럼 다음장에서는 Subscriber 에 대해서 알아보자.