Notice
Recent Posts
Recent Comments
올해는 머신러닝이다.
[RxJava2]Backpressure 의 함수들을 알아보자 본문
만약 Backpressure
없는 Flowable
을 사용시 어떤 함수를 사용하면 좋은지 알아보자.
우선 onBackpressureBuffer()
을 알아보자.
복습을 하자면
Flowable.interval( 1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.println("Received MyItem : " + i);
});
을 사용시 배출이 생성하는 것을 못 따라주기 때문에 오류가 난다.
Received MyItem : 21
Received MyItem : 22
Received MyItem : 23
Received MyItem : 24
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:76)
그러기 때문에 배압(Backpressure
)을 넣어줘야 한다.
Flowable.interval( 1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.println("Received MyItem : " + i);
});
...........
....
Received MyItem : 47
Received MyItem : 48
Received MyItem : 49
Received MyItem : 50
Received MyItem : 51
Received MyItem : 52
Received MyItem : 53
Received MyItem : 54
Received MyItem : 55
Received MyItem : 56
Received MyItem : 57
Received MyItem : 58
....
정상적으로 작동이 된다. onBackpressureBuffer
의 경우 배압이 될 만한 부분을 버퍼에 넣어서 한꺼번에 배출시키는 역할을 한다.
인자로 주는 경우 주로 많이 쓰이는 내용(BackpressOverflowStrategy) 를 알아보자. 오버플로가 될 경우 발생된다.
ERROR | 용량이 가득차면 간단하게 오류를 뱉는다. |
---|---|
DROP_OLDEST | 최신껄 위해 가장 오랜된 걸 먼저 비운다. |
DROP_LATEST | 최신껄 삭제해서 사용되지 않은 오래된 값을 우선순위를 지정한다. |
그럼 DROP_LATEST 를 사용해보자.
Flowable.interval( 1, TimeUnit.MILLISECONDS)
//long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy
//Action은 오버플로우 발생시 콜백을 지정할 수 있다.
.onBackpressureBuffer(10 , ()-> System.out.println("overflow") , BackpressureOverflowStrategy.DROP_LATEST)
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.println("Received MyItem : " + i);
});
.................
Received MyItem : 135
overflow
overflow
overflow
overflow
overflow
Received MyItem : 136 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
Received MyItem : 489 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
overflow
Received MyItem : 490
넘칠 경우 overflow
출력이 되고 있다. DROP_LAST 를 사용시 136 에서 489 사이에 숫자들이 생략된게 보인다.
이유는 이미 가득차서 최신껄 삭제를 해서 다시 넣는 과정을 거친다.
onBackpressureLatest()
onBackpressureBuffer() 의 약간 변형이다. 최신 값을 제공하며 그 시간동안 방출된 이전 값은 모두 제거한다.
예를 보자.
Flowable.interval( 1, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 125
Received MyItem : 126
Received MyItem : 127 <------
Received MyItem : 485 <------
Received MyItem : 486
Received MyItem : 487
숫자가 갑자기 점프를 하였다. 이유는 이미 안은 차서 최신껄 제공하고 이전껀 제거를 하고 있기 때문이다.
onBackPressureDrop()
오버플로 발생시 단순히 해당 아이템을 드랍함으로써 질서를 유지한다.
예를 보자.
Flowable.interval( 1, TimeUnit.MILLISECONDS)
.onBackpressureDrop(i -> System.out.println("Dropping " + i))
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 1583
Dropping 2013
Dropping 2014
Dropping 2015
Dropping 2016
Received MyItem : 1584
Dropping 2017
Received MyItem : 1585
Received MyItem : 1586
Received MyItem : 1587
Received MyItem : 1588
오버 플로우시 Drop 을 하고 있다.
다음 장에선 Flowable.generate()
을 살펴볼 예정이다.
만약 우리가 배압을 직접 컨트롤 하고 싶을때 사용하는 함수이다.
'링크모음 > rxjava' 카테고리의 다른 글
RxJava1->2로 이동시 유의해야할 내용 링크 (0) | 2017.11.27 |
---|---|
하나의 아이템이 배출되면 다른 모든게 배출되도록 하기 (0) | 2017.11.20 |
RxJava 를 활용한 효과적인 네트워크 체크 풀링 (0) | 2017.09.27 |
RxJava2 함수 간단 예제 모음 (0) | 2017.09.15 |
Rxjava2 Collection 예제 (0) | 2017.09.15 |