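First, let's look at onBackpressureBuffer().
Without any backpressure operator, the pipeline below fails as soon as the subscriber falls behind the source: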
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
    });
Received MyItem : 21
Received MyItem : 22
Received MyItem : 23
Received MyItem : 24
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:76)
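The snippets in this post call `sleep(5)` inside the subscriber, but its definition is not shown. The sketch below is one plausible way to write it, assuming it simply wraps `Thread.sleep` so that the checked `InterruptedException` does not have to be handled inside the lambda. The class name `SleepHelper` is hypothetical and not from the original post.

```java
// Hypothetical helper: the original snippets use sleep(...) without defining it.
class SleepHelper {
    // Blocks the calling thread for roughly the given number of milliseconds.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        sleep(5);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("slept for about " + elapsedMillis + " ms");
    }
}
```

Restoring the interrupt flag lets whatever runs the subscriber still notice that the thread was asked to stop.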
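With onBackpressureBuffer() added, values the subscriber has not yet requested are buffered (without a size limit by default), so every value arrives in order and no error is raised:
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
    });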
...........
....
Received MyItem : 47
Received MyItem : 48
Received MyItem : 49
Received MyItem : 50
Received MyItem : 51
Received MyItem : 52
Received MyItem : 53
Received MyItem : 54
Received MyItem : 55
Received MyItem : 56
Received MyItem : 57
Received MyItem : 58
....
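When the buffer has a fixed capacity, a BackpressureOverflowStrategy decides what happens once it is full:

| BackpressureOverflowStrategy | Description |
|---|---|
| ERROR | Signals an error as soon as the buffer is full. |
| DROP_OLDEST | Drops the oldest value in the buffer to make room for the newest one. |
| DROP_LATEST | Drops the most recent value in the buffer, giving priority to older values that have not been consumed yet. |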
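The difference between the strategies in the table is easier to see on a tiny buffer. The following is a simplified, single-threaded model built on `ArrayDeque`, not RxJava's actual implementation; the names `OverflowModel`, `Policy`, and `offerAll` are made up for this illustration, and it assumes nothing is consumed while the values are being offered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a bounded buffer with three overflow policies.
// All names here are hypothetical and exist only for illustration.
class OverflowModel {
    enum Policy { ERROR, DROP_OLDEST, DROP_LATEST }

    // Offers the values 0..count-1 to a buffer of the given capacity while
    // nothing is consumed, and returns what remains in the buffer afterwards.
    static List<Integer> offerAll(Policy policy, int capacity, int count) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int value = 0; value < count; value++) {
            if (buffer.size() < capacity) {
                buffer.addLast(value);
                continue;
            }
            switch (policy) {
                case ERROR:
                    throw new IllegalStateException("buffer is full");
                case DROP_OLDEST:
                    buffer.pollFirst(); // discard the oldest buffered value
                    buffer.addLast(value);
                    break;
                case DROP_LATEST:
                    buffer.pollLast();  // discard the most recently buffered value
                    buffer.addLast(value);
                    break;
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(offerAll(Policy.DROP_OLDEST, 3, 6)); // prints [3, 4, 5]
        System.out.println(offerAll(Policy.DROP_LATEST, 3, 6)); // prints [0, 1, 5]
    }
}
```

With DROP_OLDEST the buffer slides forward and always holds the newest values; with DROP_LATEST the older values stay put and only the last slot keeps being replaced.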
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(10, () -> System.out.println("overflow"), BackpressureOverflowStrategy.DROP_LATEST)
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
    });
.................
Received MyItem : 135
overflow
overflow
overflow
overflow
overflow
Received MyItem : 136 <----- note this line
overflow
overflow
overflow
overflow
Received MyItem : 489 <----- note this line
overflow
overflow
overflow
overflow
overflow
Received MyItem : 490
onBackpressureLatest()
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
    });
.........
Received MyItem : 125
Received MyItem : 126
Received MyItem : 127 <------
Received MyItem : 485 <------
Received MyItem : 486
Received MyItem : 487
onBackpressureDrop()
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop(i -> System.out.println("Dropping " + i))
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
    });
.........
Received MyItem : 1583
Dropping 2013
Dropping 2014
Dropping 2015
Dropping 2016
Received MyItem : 1584
Dropping 2017
Received MyItem : 1585
Received MyItem : 1586
Received MyItem : 1587
Received MyItem : 1588
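onBackpressureLatest() and onBackpressureDrop() both discard values, so it can help to see where they differ: when the consumer asks for more, the "latest" variant still has the most recent value on hand, while the "drop" variant has to wait for the next emission. The sketch below is a rough single-threaded model of that idea, not RxJava code; `DropVsLatest`, `replay`, and the event script format are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model; DropVsLatest and replay are hypothetical names, not RxJava API.
class DropVsLatest {
    // Replays a script of events: 'e' = the source emits the next integer,
    // 'r' = the consumer requests one more item.
    // Returns the values the consumer ends up receiving.
    static List<Integer> replay(boolean keepLatest, String events) {
        List<Integer> received = new ArrayList<>();
        int next = 0;        // next value the source will emit
        int demand = 0;      // outstanding requests from the consumer
        Integer held = null; // most recent undelivered value (keepLatest only)
        for (char event : events.toCharArray()) {
            if (event == 'e') {
                int value = next++;
                if (demand > 0) {
                    demand--;
                    received.add(value);
                } else if (keepLatest) {
                    held = value; // overwrite whatever was held before
                }
                // otherwise the value is simply dropped
            } else if (event == 'r') {
                if (keepLatest && held != null) {
                    received.add(held); // deliver the retained value immediately
                    held = null;
                } else {
                    demand++;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(replay(false, "reeere")); // drop:   prints [0, 3]
        System.out.println(replay(true, "reeere"));  // latest: prints [0, 2]
    }
}
```

In both runs the values emitted while the consumer was busy are mostly lost; the only difference is whether the last of them (2) or the next fresh one (3) is delivered once the consumer is ready again.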