UnicastSubject

PublishSubject 와 굉장한 유사하지만 하나의 큰 차이점이 있다.

Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.

예를 보자.

public static void main(String[] args) {

io.reactivex.subjects.Subject<String> subject = UnicastSubject.create();

System.out.println(currentTime() + "옵저버 등록시작");
Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map( l -> (( l + 1) * 300) + " milliseconds")
        .subscribe(subject);

sleep(2000);

subject.subscribe( s -> System.out.println( currentTime() + " -- Observer 1 : " + s));

sleep(2000);

subject.subscribe( s -> System.out.println( currentTime() + " -- Observer 2 : " + s));

sleep(2000);

}

public static String currentTime(){
        Date date = new Date(System.currentTimeMillis());
        DateFormat formatter = new SimpleDateFormat("HH시 mm분 ss초");
        return formatter.format(date);
}

.............
58초옵저버 등록시작
00초 -- Observer 1 : 300 milliseconds <-- 2초 sleep 후 실행 (subscribe 후 시작됨) , 그리고 한꺼번에 캐쉬내용을 출력함
00초 -- Observer 1 : 600 milliseconds
00초 -- Observer 1 : 900 milliseconds
00초 -- Observer 1 : 1200 milliseconds
00초 -- Observer 1 : 1500 milliseconds
00초 -- Observer 1 : 1800 milliseconds
00초 -- Observer 1 : 2100 milliseconds
01초 -- Observer 1 : 2400 milliseconds
01초 -- Observer 1 : 2700 milliseconds
01초 -- Observer 1 : 3000 milliseconds
01초 -- Observer 1 : 3300 milliseconds
02초 -- Observer 1 : 3600 milliseconds
02초 -- Observer 1 : 3900 milliseconds

.......
io.reactivex.exceptions.OnErrorNotImplementedException: Only a single observer allowed.
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)

위에 보면 58초에 시작해서

기다리고 있다가 subscribe되는 순간 7개를 한꺼번에 배출 시킨다. 내부적으로 버퍼에 넣어두고 캐싱으로 사용한다.

배출함으로써 캐싱은 지워지게 된다.

그리고 하나의 subscribe 만 사용가능하며 두개이상일 경우 위와 같이 오류가 발생한다.

만약 두개이상 사용하기를 원하면 ReplaySubject를 고려하길 바란다.

그래도 굳이 해야하는 경우라면 다음과 같이 proxy 로 속여서 만들수 있습니다.

io.reactivex.subjects.Subject<String> subject = UnicastSubject.create();

System.out.println(currentTime() + " 옵저버 등록시작");
Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map( l -> (( l + 1) * 300) + " milliseconds")
        .subscribe(subject);

sleep(2000);

//multicasting
Observable<String> multicast = subject.publish().autoConnect(); //자동연결로 만들기

multicast.subscribe( s -> System.out.println( currentTime() + " -- Observer 1 : " + s));
sleep(2000);

multicast.subscribe( s -> System.out.println( currentTime() + " -- Observer 2 : " + s));
sleep(2000);
.......
31초 옵저버 등록시작
33초 -- Observer 1 : 300 milliseconds <-- subscribe되는 동시에 한꺼번에 배출이 시작된다..
33초 -- Observer 1 : 600 milliseconds
33초 -- Observer 1 : 900 milliseconds
33초 -- Observer 1 : 1200 milliseconds
33초 -- Observer 1 : 1500 milliseconds
33초 -- Observer 1 : 1800 milliseconds
33초 -- Observer 1 : 2100 milliseconds
34초 -- Observer 1 : 2400 milliseconds
34초 -- Observer 1 : 2700 milliseconds
34초 -- Observer 1 : 3000 milliseconds
35초 -- Observer 1 : 3300 milliseconds
35초 -- Observer 1 : 3600 milliseconds
35초 -- Observer 1 : 3900 milliseconds
36초 -- Observer 1 : 4200 milliseconds
36초 -- Observer 2 : 4200 milliseconds
36초 -- Observer 1 : 4500 milliseconds
36초 -- Observer 2 : 4500 milliseconds
36초 -- Observer 1 : 4800 milliseconds
36초 -- Observer 2 : 4800 milliseconds
36초 -- Observer 1 : 5100 milliseconds
36초 -- Observer 2 : 5100 milliseconds
37초 -- Observer 1 : 5400 milliseconds
37초 -- Observer 2 : 5400 milliseconds
37초 -- Observer 1 : 5700 milliseconds
37초 -- Observer 2 : 5700 milliseconds
37초 -- Observer 1 : 6000 milliseconds
37초 -- Observer 2 : 6000 milliseconds


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]Windowing을 알아보자.  (0) 2017.10.22
[RxJava2]Buffering 에 대해서 알아보자.  (0) 2017.10.22
[Rxjava2]AsyncSubject  (0) 2017.10.22
[RxJava2]ReplaySubject  (0) 2017.10.22
[Rxjava2]BehaviorSubject 에 대해서 알아보자.  (0) 2017.10.22

+ Recent posts