Notice
Recent Posts
Recent Comments
올해는 머신러닝이다.
[RxJava2]Flowable에서의 Backpressure 본문
Flowable.range()
, Flowable.just()
, Flowable.fromIterable()
, Flowable.interval()
의 경우 배압(Backpressure
) 이 설계가 되어있는 편이다.
하지만 Interval()
의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.
배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.
예를 들어보자.
public static void main(String[] args){
Flowable.interval(1, TimeUnit.MICROSECONDS)
.doOnNext( v -> System.out.println("push -> " + v))
.observeOn(Schedulers.io())
.map( i -> intenseCalculation(i))
.subscribe( v->System.out.println("emit : " + v) , Throwable::printStackTrace);
sleep( Long.MAX_VALUE );
}
public static <T> T intenseCalculation(T value){
int time = ThreadLocalRandom.current().nextInt(3000);
System.out.println("sleep time : " + time);
sleep( time );
return value;
}
public static void sleep(long mills) {
try{
Thread.sleep( mills );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
.................
o.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
..............
위와 같이 푸시는 계속되고 있지만 순간적으로 배출이 되는 순간이 생긴다.
이렬 경우 MissingBackpressureException
이 발생된다.
Flowable.interval() 을 제외한 나머지는 배압이 제대로 될 것이지만 시간관련된 이 함수는 오류가 생길수 있다.
이런 경우 onBackpressureDrop()
또는 onBackpressureBuffer()
을 사용해서 처리가 가능하다.
onBackpressureDrop
: Flowable(Observable)에서 항목을 보냈을때 바로 처리하지 못하는 데이터는 무시합니다.onBackpressureBuffer
: Flowable(Observable)에서 보낸 항목을 큐에 쌓아 놓고 항목을 처리하는 쪽에서 해당항목을 나중에 처리할 수 있게 해줍니다. 인자값으로 (int) 를 넣을 경우 큐에 쌓아둘수 있는 수를 제한 할 수 있습니다.
.onBackpressureDrop() <- 위치는 여기에 해줘야 정상 동작된다.
.observeOn(Schedulers.io())
그럼 다음장에서는 Subscriber 에 대해서 알아보자.
'스터디 > RxJava2' 카테고리의 다른 글
[RxJava2]Backpressure와 함께 Flowable를 만들어 보자 (0) | 2017.10.22 |
---|---|
[RxJava2]Subscriber 란 무엇인가? (0) | 2017.10.22 |
[RxJava2] Flowable에 대해서 알아보자 (0) | 2017.10.22 |
[RxJava2] 배압(Backpressure) 이란 무엇인가. (0) | 2017.10.22 |
[RxJava2]Backpressure 들어가기 전 (0) | 2017.10.22 |