Observable은 주로 푸시를 기반으로 하는 성격을 지닌다.

예를 들어 하나의 쓰레드에서 1에서 999,999,999까지 숫자를 내보내는 경우 이상없는 지 체크 해보자.

우선 옵저버를 만들고 푸시(배출)을 진행해보자.

public static void main(String[] args){
    Observable.range( 1, 999_999_999)
            .map(MyItem::new)
            .subscribe(myItem -> {
                sleep(50);
                System.out.println("Received MyItem : " + myItem.id);
            });
}

static void sleep(long millseconds) {
    try{
        Thread.sleep(millseconds);
    }catch (InterruptedException ex){
        ex.printStackTrace();
    }
}

static final class MyItem {
    final int id;

    MyItem(int id){
        this.id = id;
        System.out.println("Constructing MyItem " + id);
    }
}
=============================================================================
Constructing MyItem 1
Received MyItem : 1
Constructing MyItem 2
Received MyItem : 2
Constructing MyItem 3
Received MyItem : 3
Constructing MyItem 4
Received MyItem : 4
Constructing MyItem 5
Received MyItem : 5
Constructing MyItem 6
Received MyItem : 6
.....

결과를 보다시피 일관된 흐름으로 배출되는 걸 볼수 있다. .

하지만 만약 비동기적으로 호출시 어떻게 될까?

아래 예제는observeOn (다운 스트림 쓰레드) 를 적용했다.

public static void main(String[] args){
        Observable.range( 1, 999_999_999)
                .map(MyItem::new)
                .observeOn(Schedulers.io())
                .subscribe(myItem -> {
                    sleep(50);
                    System.out.println("Received MyItem : " + myItem.id);
                });

        sleep(Integer.MAX_VALUE);
}

그럼 결과가 간혹 이런문제가 생긴다.

...
 Constructing MyItem 1001899
 Constructing MyItem 1001900
 Constructing MyItem 1001901
 Constructing MyItem 1001902
 Received MyItem 38
 Constructing MyItem 1001903
 Constructing MyItem 1001904
 Constructing MyItem 1001905
 Constructing MyItem 1001906
 Constructing MyItem 1001907
 ..

결과를 보면 생성이 배출보다 많은 걸 볼수 있다.

이런 경우 배압(Backpressure)이 필요하다. 이 부분을 무시하면 OutOfMemoryError가 발생될수 있다.

+ Recent posts