Notice
Recent Posts
Recent Comments
올해는 머신러닝이다.
[Rxjava2] 예제로 배우는 UnicastSubject 본문
UnicastSubject
PublishSubject
와 굉장한 유사하지만 하나의 큰 차이점이 있다.
Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.
예를 보자.
public static void main(String[] args) {
io.reactivex.subjects.Subject<String> subject = UnicastSubject.create();
System.out.println(currentTime() + "옵저버 등록시작");
Observable.interval(300 , TimeUnit.MILLISECONDS)
.map( l -> (( l + 1) * 300) + " milliseconds")
.subscribe(subject);
sleep(2000);
subject.subscribe( s -> System.out.println( currentTime() + " -- Observer 1 : " + s));
sleep(2000);
subject.subscribe( s -> System.out.println( currentTime() + " -- Observer 2 : " + s));
sleep(2000);
}
public static String currentTime(){
Date date = new Date(System.currentTimeMillis());
DateFormat formatter = new SimpleDateFormat("HH시 mm분 ss초");
return formatter.format(date);
}
.............
58초옵저버 등록시작
00초 -- Observer 1 : 300 milliseconds <-- 2초 sleep 후 실행 (subscribe 후 시작됨) , 그리고 한꺼번에 캐쉬내용을 출력함
00초 -- Observer 1 : 600 milliseconds
00초 -- Observer 1 : 900 milliseconds
00초 -- Observer 1 : 1200 milliseconds
00초 -- Observer 1 : 1500 milliseconds
00초 -- Observer 1 : 1800 milliseconds
00초 -- Observer 1 : 2100 milliseconds
01초 -- Observer 1 : 2400 milliseconds
01초 -- Observer 1 : 2700 milliseconds
01초 -- Observer 1 : 3000 milliseconds
01초 -- Observer 1 : 3300 milliseconds
02초 -- Observer 1 : 3600 milliseconds
02초 -- Observer 1 : 3900 milliseconds
.......
io.reactivex.exceptions.OnErrorNotImplementedException: Only a single observer allowed.
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
위에 보면 58초에 시작해서
기다리고 있다가 subscribe
되는 순간 7개를 한꺼번에 배출 시킨다. 내부적으로 버퍼에 넣어두고 캐싱으로 사용한다.
배출함으로써 캐싱은 지워지게 된다.
그리고 하나의 subscribe
만 사용가능하며 두개이상일 경우 위와 같이 오류가 발생한다.
만약 두개이상 사용하기를 원하면 ReplaySubject
를 고려하길 바란다.
그래도 굳이 해야하는 경우라면 다음과 같이 proxy 로 속여서 만들수 있습니다.
io.reactivex.subjects.Subject<String> subject = UnicastSubject.create();
System.out.println(currentTime() + " 옵저버 등록시작");
Observable.interval(300 , TimeUnit.MILLISECONDS)
.map( l -> (( l + 1) * 300) + " milliseconds")
.subscribe(subject);
sleep(2000);
//multicasting
Observable<String> multicast = subject.publish().autoConnect(); //자동연결로 만들기
multicast.subscribe( s -> System.out.println( currentTime() + " -- Observer 1 : " + s));
sleep(2000);
multicast.subscribe( s -> System.out.println( currentTime() + " -- Observer 2 : " + s));
sleep(2000);
.......
31초 옵저버 등록시작
33초 -- Observer 1 : 300 milliseconds <-- subscribe되는 동시에 한꺼번에 배출이 시작된다..
33초 -- Observer 1 : 600 milliseconds
33초 -- Observer 1 : 900 milliseconds
33초 -- Observer 1 : 1200 milliseconds
33초 -- Observer 1 : 1500 milliseconds
33초 -- Observer 1 : 1800 milliseconds
33초 -- Observer 1 : 2100 milliseconds
34초 -- Observer 1 : 2400 milliseconds
34초 -- Observer 1 : 2700 milliseconds
34초 -- Observer 1 : 3000 milliseconds
35초 -- Observer 1 : 3300 milliseconds
35초 -- Observer 1 : 3600 milliseconds
35초 -- Observer 1 : 3900 milliseconds
36초 -- Observer 1 : 4200 milliseconds
36초 -- Observer 2 : 4200 milliseconds
36초 -- Observer 1 : 4500 milliseconds
36초 -- Observer 2 : 4500 milliseconds
36초 -- Observer 1 : 4800 milliseconds
36초 -- Observer 2 : 4800 milliseconds
36초 -- Observer 1 : 5100 milliseconds
36초 -- Observer 2 : 5100 milliseconds
37초 -- Observer 1 : 5400 milliseconds
37초 -- Observer 2 : 5400 milliseconds
37초 -- Observer 1 : 5700 milliseconds
37초 -- Observer 2 : 5700 milliseconds
37초 -- Observer 1 : 6000 milliseconds
37초 -- Observer 2 : 6000 milliseconds
'스터디 > RxJava2' 카테고리의 다른 글
[Rxjava2]Windowing을 알아보자. (0) | 2017.10.22 |
---|---|
[RxJava2]Buffering 에 대해서 알아보자. (0) | 2017.10.22 |
[Rxjava2]AsyncSubject (0) | 2017.10.22 |
[RxJava2]ReplaySubject (0) | 2017.10.22 |
[Rxjava2]BehaviorSubject 에 대해서 알아보자. (0) | 2017.10.22 |