AsyncSubject

AsyncSubject는 소스 Observable로부터 배출된 마지막 값(만)을 배출하고 소스 Observable의 동작이 완료된 후에야 동작한다. (만약, 소스 Observable이 아무 값도 배출하지 않으면AsyncSubject역시 아무 값도 배출하지 않는다.)

즉 쉽게 말하자면 onCompleted를 하기전 이벤트만 배출이 된다는 것이다. onCompleted가 호출이 되지 않으면 배출이 되지 않는다.

io.reactivex.subjects.Subject<String> subject = AsyncSubject.create();

subject.subscribe( v -> System.out.println("Observer 1: " + v) ,
        Throwable::printStackTrace,
        () -> System.out.println("Observer 1 done!!")
        );

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma"); <-- 이 값만 배출이 될것이다. 
//subject.onComplete(); <-- onCompleted 호출 

subject.subscribe( v -> System.out.println("Observer 2: " + v) ,
        Throwable::printStackTrace,
        () -> System.out.println("Observer 2 done!!")
);

그리고 내부적으로 Observable에 대해서 takeLast(1).replay(1) 을 사용하고 있다는 점을 유의해야 한다.

이 Subject는 java 8 에서 CompletableFuture과 유사합니다. 계산을을 수행해서 완료를 관찰하면서 값을 얻는 형식입니다.

ReplaySubject

이전 시간에 공부한 replay() , cache() 기능을 기억하는가.

이전에 배출한 모든 걸 캐싱해서 새로운 Observer등록시 캐싱된걸 다시 배출시키는 역할이다.

ReplaySubject는 그 기능을 가진 Subject이다.

즉 다시 말하자면 PublishSubject후에 .cache() 을 붙였다고 보면 된다.

//ReplaySubject 선언
io.reactivex.subjects.Subject<String> subject = ReplaySubject.create();

subject.subscribe( v -> System.out.println("Observer 1: " + v));

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

subject.subscribe( v -> System.out.println("Observer 2: " + v));
subject.subscribe( v -> System.out.println("Observer 3: " + v));
...........
Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 2: Alpha <-- 처음부터 캐싱된 걸 보여준다.
Observer 2: Beta
Observer 2: Gamma
Observer 3: Alpha <-- 3번째도 캐싱된걸 보여주고 있다. 
Observer 3: Beta
Observer 3: Gamma


BehaviorSubject

주로 많이 쓰이는 subject중 하나이다.

기능은 최근 마지막껄 캐싱해서 보여주는 subject이다.

이전 시간에 배운 replay() , autoConnect() 을 기억하는가?

이 BehaviorSubject는 이 2가지 기능들을 통합해놓은 거라고 보면된다.

즉 PublishSubject후에 replay(1).autoConnect() 를 통합한 Subject 이다.

그럼 예제를 한번 보자.

//BehaviorSubject 선언
io.reactivex.subjects.Subject<String> subject = BehaviorSubject.create();

subject.subscribe( v -> System.out.println("Observer 1: " + v));

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");

subject.subscribe( v -> System.out.println("Observer 2: " + v));
subject.subscribe( v -> System.out.println("Observer 3: " + v));
...........
Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 2: Gamma <-- 캐싱되서 1개만 보여주고 있다. 
Observer 3: Gamma <-- 캐싱되서 1개만 보여주고 있다.

만약 마지막 1개의 이벤트를 배출하고 싶다면 BehaviorSubject 을 사용하자.


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]AsyncSubject  (0) 2017.10.22
[RxJava2]ReplaySubject  (0) 2017.10.22
[Rxjava2]예제로 이해하는 PublishSubject  (0) 2017.10.22
[RxJava2] Replaying , Caching 을 알아보자.  (0) 2017.10.22
[RxJava2]refCount 와 share 기본 개념  (0) 2017.10.22

PublishSubject

통상 일반적인 subject 형태이다.

onNext() , onComplete() , onError() 를 호출해서 사용이 가능하다.

그럼 예로 보자.

io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.map(String::length) // 문자길이를 리턴하고 있다. 
        .subscribe(System.out::println);

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

...........
5
4
5

간단한 기본 형태라 어렵지는 않다.

onComplete() 를 하는 순간 더이상 Observer 가 없다고 알려줄 수 있다.

Subject를 언제 쓰면 유용할까?

나도 지금까지 많이 쓰면서 늘 이 질문을 했었다. 이럴때 과연 쓰는게 맞는 걸까?

우선 기본적인 사용처는 Multicasting상황에서 아주 유용하다. (예를 들어 이벤트 버스)

그리고 다른 방법으로 여러개의 Observable을 Merge할 때 매우 유용하다. ( Observable.merge() 대신)

그럼 예를 들어 보자.

//1초 간격으로 배출시킨다.
Observable<String> source1 = Observable.interval( 1, TimeUnit.SECONDS).map(v -> (v + 1) + " seconds");
// 0.3 초 간격으로 배출시킨다.
Observable<String> source2 = Observable.interval(300 , TimeUnit.MILLISECONDS).map(v -> (v + 1) + " milliseconds");

//선언
Subject<String> subject = PublishSubject.create();

//값 출력
subject.subscribe(System.out::println);

source1.subscribe( subject );
source2.subscribe( subject );

sleep(3000); //3초동안 유지

........
1 milliseconds <- source 1 실행
2 milliseconds
3 milliseconds
1 seconds  <- source 2 실행
4 milliseconds
5 milliseconds
6 milliseconds
2 seconds
7 milliseconds
8 milliseconds
9 milliseconds
10 milliseconds
3 seconds

보다 시피 2개의 Observer를 등록해서 머지 해서 사용할 수 있다. 이러한 방법이 주는 장점은 모듈화 및 캡슐화 되어 있는 모델방식에서 유용하다고 한다.

하지만 Subject 사용시 조심해야 할 부분이 있다.

onNext() , onComplete() 사용시 Observable + Observer 정의가 되어있다고 생각하고 호출해야 한다.

안그러면 생각했던 결과가 나오지 않는다.

이전 예제를 다시 보면

//선언
io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.map(String::length)
        .subscribe(System.out::println);

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

........................................
5
4
5

이러한 결과가 나오지만..

//선언
io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

// 후에 등록시 문제가 발생된다. 
subject.map(String::length)
        .subscribe(System.out::println);
.........
출력안됨
.........

이런 경우 이전에 배운 publish() 또는 replay() 등으로 Hot Hot 하게 만들어서 Multicasting을 제어할수 있다.

그리고 여러번의 subject.onNext() 남발로 인해 배출들이 꼬일수 있다.

그럴 경우를 대비해서 직렬화라는 기능을 제공한다.

Subject<String> subject = PublishSubject.<String>create().toSerialized();


Replay와 Caching

멀티 캐스팅을 사용하면 여러 Observer 들 사이에서 공유되는 값을 캐시 할 수 있습니다.

우선 Replay를 보자.

이미 많은 예제사이트들을 보셨을 수도 있지만 이전에 배출한 값을 캐싱해서 가지고 있다가 새롭게 배출시 캐싱한 걸 같이 먼저 배출해준다.

예제를 보자.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
    .replay()
    .autoConnect();

//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

.............
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 0 <- 2번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 2: 1
Observer 2: 2 
Observer 1 : 3
Observer 2: 3
Observer 1 : 4 
Observer 2: 4 
Observer 3: 0 <- 3번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 3: 1
Observer 3: 2
Observer 3: 3
Observer 3: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7

위 내용을 보면 2,3번째 Observer가 배출시 1에서 먼저 배출되었던 3번의 값을 캐싱해서 같이 내보내고 있다.

하지만 캐싱작업으로 인해서 메모리 오버플로우 가 발생될 수 있으니 인자값을 통해서 버퍼수를 정해주는 방법이 좋다.

.replay(2)

.......
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 1 <-- 처음 0은 제외된 걸 볼수 있다. 
Observer 2: 2
Observer 1 : 3
Observer 2: 3
Observer 1 : 4
Observer 2: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 4 <-- 3부터는 캐싱작업이 되지 않는 다... Observer 2번 에서 캐싱하고 비워버림
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7
Observer 1 : 8
Observer 2: 8
Observer 3: 8

위에서 보다시피 2번째 Observer에서는 처음 0을 제외하고 1,2 가 배출되고 있는 걸 볼 수 있다.

그런데 3부터는 앞에 2번의 캐싱이 발생되지 않는다.. refCount(2) 라고 정의시 2번째에서 캐싱하고 비워버리고 있다.

그럼 이 걸 방지 하기 위해선 어떻게 해야 될까?

캐싱 시간을 지정해서 유지하도록 할 수 있다.

//0.3 초 간격으로 도는 걸 확인
Observable<Long> threeRandoms = Observable.interval(300,TimeUnit.MILLISECONDS)
        //출력값 계산
        .map(v -> (v + 1) * 300) // map to elapsed milliseconds
        //캐싱 시간을 지정
        .replay(2 , TimeUnit.SECONDS)
        .autoConnect();

//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

..............
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 2: 1200 <- 1초동안 캐싱된 걸 보여주고 있다. 
Observer 2: 1500
Observer 2: 1800
Observer 2: 2100
Observer 2: 2400
Observer 2: 2700
Observer 1 : 3000
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 4200 <- 1초동안 캐싱된 걸 보여주고 있다. 
Observer 3: 4500
Observer 3: 4800
Observer 3: 5100
Observer 3: 5400
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000

위에서 보다 시피 시간단위로 캐싱된 걸 배출 시킬 수 있다.

하지만 내가 원한건 앞에 2개만 유지시키는 거였다. 그럼 다시 인자값을 변경해보자.

.replay(2 , 1, TimeUnit.SECONDS) // buffersize , time 

....................
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 1 : 3000
Observer 2: 2700 <- 앞에 2개만 캐싱된 걸 볼수 있다.
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 5400 <- 앞에 2개만 캐싱된 걸 볼 수 있다. 
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000
Observer 1 : 6300
Observer 2: 6300
Observer 3: 6300

이제 내가 원한 형태로 캐싱되서 볼수 있다.

정리를 하자면 ConnectableObservable에서 들어오는 Observer를 모두 배출 시키고 원하면 autoConnect를 사용하면 된다.

그리고 무한 캐싱작업을 원하면 cache()또는 publish().refcount() 을 이용하면 된다.

추가적으로 캐싱작업시 예상되는 요소 수를 미리 지정하고 버퍼를 최적화를 할 수 있습니다.

Observable<Integer> cachingTestObservable =
        Observable.just(6, 2, 5, 7, 1, 4, 9, 8, 3) 
            .scan(0, (total,next) -> total + next)
            .cacheWithInitialCapacity(9); <- 9개의 요소를 미리 지정해서 버퍼를 최적화 시킴

주의 사항으로 cache를 폐기 할 계획이 없으면 사용하지 말자. 메모리를 계속 잡고 있어서 문제가 생길수 있기 때문이다.

대안으로 replay() 으로 캐시 크기등을 지정해서 제어하는 방향을 모색하는 게 바람직하다.

refCount() 는 autoConnect(1) 와 유사하다.

그럼 autoConnect와 refCount와 차이점은 무엇일까?

차이점은 더이상 배출할 Observer가 없을시 refCount는 자동으로 자신을 해지를 하고 다시 새로운 Observer 이 오면 처음부터 자동으로 시작을 한다.

autoConnect는 동작이 끝나면 dispose 가 되지만 다시 새로운게 오면 시작을 하진 않는다.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
    .publish()
    .refCount();

//Observable 1 배출
threeRandoms.take(5).subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observable 2 배출
threeRandoms.take(2).subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기
....................
Observable 1 : 0
Observable 1 : 1
Observable 1 : 2
Observable 1 : 3
Observable 2: 3
Observable 1 : 4
Observable 2: 4 <-- 배출이 완료가 되고 dispose가 됐다. 
Observable 3: 0 <-- 새로운 Observer 왔기 때문에 시작된다. 
Observable 3: 1

그리고 publish().refCount() 을 줄여서 share() 라고 사용가능하다.

Observable.interval(1,TimeUnit.SECONDS).share(); // publish().refCount() 와 같은 의미


앞에서 본 예제처럼 connect() 를 통해 동시 실행할 수 있다.

하지만 자동으로 connect를 할수도 있다.

주의 할 내용은 자동 연결(Auto connection) 은 사용할 때 상당한 주의를 요한다. 뜻하지 않게 배출시 제대로 되지 않는 경우가 발생될 수 있기 때문이다.

그럼 예제를 살펴보자 .

이전장에서 했던 2가지 서로 다른 배출 방법에 자동 연결만 추가한 부분이다.

Observable<Integer> threeRandoms = Observable.range(1,3)
    .map(i->randomInt())
    .publish()
    .autoConnect(2); //2번에 대해서 자동 연결을 하겠다는 뜻이다.!!


//Observable 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

//Observable 2 배출
threeRandoms.reduce( 0 ,(total , next) -> total + next)
.subscribe( i -> System.out.println("Observable 2: " + i));

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

...........
Observable 1 : 44005
Observable 1 : 633061
Observable 1 : 464828
Observable 2: 1141894

예제에서 보면 2번만 연결하겠다고 선언했다.(기본값은 1이다.).

그래서 3번째 Observer무시되고 있다.

autoConnect() 을 사용시 중간에 들어오는 Observable 도 연결이 가능하다.

위에 말은 무슨 뜻일까?

예를 들어 Observer1개를 자동연결하는 과정에서 시간흐름에 따른(IntervalObserver배출이 달라질 수 있다는 것이다.

말로 설명하니 어려우니 우선 예제를 보자.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
                .publish()
                .autoConnect(); //기본값 1

//Observable 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observable 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

................
Observable 1 : 0
Observable 1 : 1
Observable 1 : 2
Observable 1 : 3
Observable 2: 3 <- 추가적으로 연결된다. 
Observable 1 : 4 <- 다른 Observable 이 연결되더라도 이전 껀 사라지지 않는다. 
Observable 2: 4
Observable 1 : 5
Observable 2: 5
Observable 1 : 6
Observable 2: 6
Observable 3: 6
Observable 1 : 7
Observable 2: 7
Observable 3: 7 <- 다시 새로운 Observable 이 연결된다..
Observable 1 : 8
Observable 2: 8
Observable 3: 8

위 예제를 보면 새로운 Observer 이 연결되더라도 이전 배출도 계속 이루어지는 걸 볼수 있다.

이 기능은 어떠한 Observer 를 기다리지 않고 즉시 배출들을 시작할 수 있다는 장점이 있다.

그럼 refCount() 와 share() 기능에 대해서 다음장에서 살펴보자.

기본적인 Observable 생성 후 배출은 다음과 같습니다.

Observable<Integer> threeIntegers = Observable.range(1 , 3);
threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i));
threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i));
--------------------------
Observer 1 : 1
Observer 1 : 2
Observer 1 : 3
Observer 2 : 1
Observer 2 : 2
Observer 2 : 3

보다시피 Observable 1 이 끝나고Observable 2가 다시 실행되는 구조로 되어있다.

동시에 배출 하고 싶을땐 어떻게 할까.

그럴때 ConnectableObservable 을 사용할 수 있습니다.

사용법은 publish -> connect 로 연결해서 배출을 동시에 시킬수 있다 .

그럼 기본적인 사용예를 보자.

ConnectableObservable<Integer> threeIntegers = Observable.range(1 , 3).publish();
threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i));
threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i));

//여기까지 아무런 배출이 이루어지지 않는다.
threeIntegers.connect(); //이제 동시에 배출을 시킨다.

-----------------
Observer 1 : 1
Observer 2 : 1
Observer 1 : 2
Observer 2 : 2
Observer 1 : 3
Observer 2 : 3

조금 더 깊이 들어가보자.

다시 일반적인cold Observable 을 보자

public static void main(String[] args){

    Observable<Integer> threeRandoms = Observable.range(1 , 3)
                                        .map( i-> randomInt());

    threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
    threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

}

public static int randomInt(){
    return ThreadLocalRandom.current().nextInt(100000);
}
---------------------------------
Observer 1 : 25134
Observer 1 : 66916
Observer 1 : 81
Observer 2 : 54593
Observer 2 : 95417
Observer 2 : 25863

생각했던 기본 형태인 첫번째 Observer 배출이 다 되고 두번째가 실행된다.

그럼 이 구조의 형태는 다음과 같다고 본다.

각각의 나누어진 형태로 결과를 배출 하고 있다. 어려운게 없다.

자 그럼 publish 사용시 어떻게 바뀌는 지 보자.

ConnectableObservable<Integer> threeInts = Observable.range(1, 3).publish(); //publish 붙는 위치가 중요하다. 

Observable<Integer> threeRandoms = threeInts.map( i-> randomInt());

threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

threeInts.connect();
---------------------------------------------------
Observer 1 : 12581
Observer 2 : 65620
Observer 1 : 85896
Observer 2 : 43633
Observer 1 : 77427
Observer 2 : 82409

이런식으로 이루어질텐데.. 문제가 뭘까?

결과가 보면 Observable 1과 Observable 2가 전혀 값이 똑같지 않다.

이유는 map에서 랜덤 숫자를 생성하는데 그 전에 publish을 호출했기 때문이다.

그러면 위치를 바꿔보자.

//publish 붙는 위치가 중요하다.

ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3).map( i-> randomInt()).publish(); 

threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

threeRandoms.connect();
--------------------------
Observer 1 : 74650
Observer 2 : 74650
Observer 1 : 33655
Observer 2 : 33655
Observer 1 : 99136
Observer 2 : 99136

값이 동일하게 이루어진 걸 볼수 있다.

그럼 흐름은 다음과 같다.

그림으로 보니 쉽게 이해가 된다.

그럼 언제 Multicast을 해야 할까?

일반적으로 중복되는 형태 , 재활용되는 Observable 형태를 사용시 자주 쓰인다.

그럼 재활용해서 다시 가공하는 예제를 보자.

ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3).map( i-> randomInt()).publish(); //publish 붙는 위치가 중요하다.

//첫번째 배출 시작
threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));

//두번째는 재활용해서 다른 형태로 가공
//랜덤으로 들어오는 숫자들을 합해서(누진계) 마지막에 한번 배출 한다. 
threeRandoms.reduce(0,(total , next) -> total + next).subscribe( i -> System.out.println("Observer 2 : " + i));

threeRandoms.connect();
--------------------------
Observer 1 : 82613
Observer 1 : 96148
Observer 1 : 27865
Observer 2 : 206626 <- 한번 배출된걸 보면 된다.

그림 형태는 다음과 같을것이다.

일반적으로 Observable들은 cold상태인것들이다.

RxJava를 하다보면 많이 듣는 게 cold와 hot Observable이다.

도대체 이것들이 뭔가..

Cold Observable

  • 일반적인 옵저버형태를 말한다.
  • 누가 구독해주지 않으면 데이터를 배출을 하지 않는다.
  • 일반적인 웹 요청 , 데이터베이스 쿼리등이 사용되며 내가 요청하면 결과를 받는 과정을 거친다.
  • 처음부터 발행하는 걸 기본으로 한다.

Hot Observable

  • 구독자의 존재 여부와 상관없이 데이터를 배출하는 Observable 이다.
  • 그래서 여러 구독자에 선택적으로 고려가능하다.
  • 구독시점으로부터 발행하는 값을 받는 걸 기본으로 한다.
  • 마우스 이벤트 , 키보드 이벤트 , 시스템 이벤트등이 주로 사용된다.
  • 멀티캐스팅도 포함된다.

데이터 발행자와 수신자도 이참에 정리해보자.

데이터 발행자데이터 수신자
ObservableSubscriber
SingleObserver
MaybeConsumer
Subject
Completable
  • Subscriber (구독자) : Observable 을 연결할 시 subscribe 함수를 사용하는데 이 과정에서 구독이 된다고 하여 구독자라고 부른다.
  • Observer (옵저버) : 스타의 옵저버가 아니다. RxJava는 옵저버 패턴을 사용해서 늘 관찰하고 작업을 수행합니다. 발신자를 Observable 이 되고 데이터 수신자를 옵저버라고 한다.
  • Consumer : Rxjava2 부터 소비자 패턴을 이용해서 소비를 시킨다. 이 패턴에 대해서 더 자세히 알아볼려면 클릭

일반덕으로 Observable 은 Code 성격이며 특별히 스트림을 Hot으로 변환하지 않으면 Code 상태이다

+ Recent posts