«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2]Backpressure 의 함수들을 알아보자 본문

링크모음/rxjava

[RxJava2]Backpressure 의 함수들을 알아보자

행복한 수지아빠 2017. 10. 22. 12:36

만약 Backpressure없는 Flowable을 사용시 어떤 함수를 사용하면 좋은지 알아보자.

우선 onBackpressureBuffer() 을 알아보자.

복습을 하자면

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
        });

을 사용시 배출이 생성하는 것을 못 따라주기 때문에 오류가 난다.

Received MyItem : 21
Received MyItem : 22
Received MyItem : 23
Received MyItem : 24
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:76)

그러기 때문에 배압(Backpressure)을 넣어줘야 한다.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
});
...........
....
Received MyItem : 47
Received MyItem : 48
Received MyItem : 49
Received MyItem : 50
Received MyItem : 51
Received MyItem : 52
Received MyItem : 53
Received MyItem : 54
Received MyItem : 55
Received MyItem : 56
Received MyItem : 57
Received MyItem : 58
....

정상적으로 작동이 된다. onBackpressureBuffer 의 경우 배압이 될 만한 부분을 버퍼에 넣어서 한꺼번에 배출시키는 역할을 한다.

인자로 주는 경우 주로 많이 쓰이는 내용(BackpressOverflowStrategy) 를 알아보자. 오버플로가 될 경우 발생된다.

ERROR용량이 가득차면 간단하게 오류를 뱉는다.
DROP_OLDEST최신껄 위해 가장 오랜된 걸 먼저 비운다.
DROP_LATEST최신껄 삭제해서 사용되지 않은 오래된 값을 우선순위를 지정한다.

그럼 DROP_LATEST 를 사용해보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        //long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy
        //Action은 오버플로우 발생시 콜백을 지정할 수 있다.
        .onBackpressureBuffer(10 , ()-> System.out.println("overflow") , BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
});
.................
Received MyItem : 135
overflow
overflow
overflow
overflow
overflow
Received MyItem : 136 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
Received MyItem : 489 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
overflow
Received MyItem : 490

넘칠 경우 overflow 출력이 되고 있다. DROP_LAST 를 사용시 136 에서 489 사이에 숫자들이 생략된게 보인다.

이유는 이미 가득차서 최신껄 삭제를 해서 다시 넣는 과정을 거친다.

onBackpressureLatest()

onBackpressureBuffer() 의 약간 변형이다. 최신 값을 제공하며 그 시간동안 방출된 이전 값은 모두 제거한다.

예를 보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 125
Received MyItem : 126
Received MyItem : 127 <------
Received MyItem : 485 <------
Received MyItem : 486
Received MyItem : 487

숫자가 갑자기 점프를 하였다. 이유는 이미 안은 차서 최신껄 제공하고 이전껀 제거를 하고 있기 때문이다.

onBackPressureDrop()

오버플로 발생시 단순히 해당 아이템을 드랍함으로써 질서를 유지한다.

예를 보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop(i -> System.out.println("Dropping " + i))
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 1583
Dropping 2013
Dropping 2014
Dropping 2015
Dropping 2016
Received MyItem : 1584
Dropping 2017
Received MyItem : 1585
Received MyItem : 1586
Received MyItem : 1587
Received MyItem : 1588

오버 플로우시 Drop 을 하고 있다.

다음 장에선 Flowable.generate() 을 살펴볼 예정이다.

만약 우리가 배압을 직접 컨트롤 하고 싶을때 사용하는 함수이다.