«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[Rxjava2]예제로 이해하는 PublishSubject 본문

스터디/RxJava2

[Rxjava2]예제로 이해하는 PublishSubject

행복한 수지아빠 2017. 10. 22. 01:22

PublishSubject

통상 일반적인 subject 형태이다.

onNext() , onComplete() , onError() 를 호출해서 사용이 가능하다.

그럼 예로 보자.

io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.map(String::length) // 문자길이를 리턴하고 있다. 
        .subscribe(System.out::println);

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

...........
5
4
5

간단한 기본 형태라 어렵지는 않다.

onComplete() 를 하는 순간 더이상 Observer 가 없다고 알려줄 수 있다.

Subject를 언제 쓰면 유용할까?

나도 지금까지 많이 쓰면서 늘 이 질문을 했었다. 이럴때 과연 쓰는게 맞는 걸까?

우선 기본적인 사용처는 Multicasting상황에서 아주 유용하다. (예를 들어 이벤트 버스)

그리고 다른 방법으로 여러개의 Observable을 Merge할 때 매우 유용하다. ( Observable.merge() 대신)

그럼 예를 들어 보자.

//1초 간격으로 배출시킨다.
Observable<String> source1 = Observable.interval( 1, TimeUnit.SECONDS).map(v -> (v + 1) + " seconds");
// 0.3 초 간격으로 배출시킨다.
Observable<String> source2 = Observable.interval(300 , TimeUnit.MILLISECONDS).map(v -> (v + 1) + " milliseconds");

//선언
Subject<String> subject = PublishSubject.create();

//값 출력
subject.subscribe(System.out::println);

source1.subscribe( subject );
source2.subscribe( subject );

sleep(3000); //3초동안 유지

........
1 milliseconds <- source 1 실행
2 milliseconds
3 milliseconds
1 seconds  <- source 2 실행
4 milliseconds
5 milliseconds
6 milliseconds
2 seconds
7 milliseconds
8 milliseconds
9 milliseconds
10 milliseconds
3 seconds

보다 시피 2개의 Observer를 등록해서 머지 해서 사용할 수 있다. 이러한 방법이 주는 장점은 모듈화 및 캡슐화 되어 있는 모델방식에서 유용하다고 한다.

하지만 Subject 사용시 조심해야 할 부분이 있다.

onNext() , onComplete() 사용시 Observable + Observer 정의가 되어있다고 생각하고 호출해야 한다.

안그러면 생각했던 결과가 나오지 않는다.

이전 예제를 다시 보면

//선언
io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.map(String::length)
        .subscribe(System.out::println);

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

........................................
5
4
5

이러한 결과가 나오지만..

//선언
io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

// 후에 등록시 문제가 발생된다. 
subject.map(String::length)
        .subscribe(System.out::println);
.........
출력안됨
.........

이런 경우 이전에 배운 publish() 또는 replay() 등으로 Hot Hot 하게 만들어서 Multicasting을 제어할수 있다.

그리고 여러번의 subject.onNext() 남발로 인해 배출들이 꼬일수 있다.

그럴 경우를 대비해서 직렬화라는 기능을 제공한다.

Subject<String> subject = PublishSubject.<String>create().toSerialized();