«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2] publish <-> Connect 예제로 이해해보자. 본문

스터디/RxJava2

[RxJava2] publish <-> Connect 예제로 이해해보자.

행복한 수지아빠 2017. 10. 22. 01:10

앞에서 본 예제처럼 connect() 를 통해 동시 실행할 수 있다.

하지만 자동으로 connect를 할수도 있다.

주의 할 내용은 자동 연결(Auto connection) 은 사용할 때 상당한 주의를 요한다. 뜻하지 않게 배출시 제대로 되지 않는 경우가 발생될 수 있기 때문이다.

그럼 예제를 살펴보자 .

이전장에서 했던 2가지 서로 다른 배출 방법에 자동 연결만 추가한 부분이다.

Observable<Integer> threeRandoms = Observable.range(1,3)
    .map(i->randomInt())
    .publish()
    .autoConnect(2); //2번에 대해서 자동 연결을 하겠다는 뜻이다.!!


//Observable 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

//Observable 2 배출
threeRandoms.reduce( 0 ,(total , next) -> total + next)
.subscribe( i -> System.out.println("Observable 2: " + i));

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

...........
Observable 1 : 44005
Observable 1 : 633061
Observable 1 : 464828
Observable 2: 1141894

예제에서 보면 2번만 연결하겠다고 선언했다.(기본값은 1이다.).

그래서 3번째 Observer무시되고 있다.

autoConnect() 을 사용시 중간에 들어오는 Observable 도 연결이 가능하다.

위에 말은 무슨 뜻일까?

예를 들어 Observer1개를 자동연결하는 과정에서 시간흐름에 따른(IntervalObserver배출이 달라질 수 있다는 것이다.

말로 설명하니 어려우니 우선 예제를 보자.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
                .publish()
                .autoConnect(); //기본값 1

//Observable 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observable 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

................
Observable 1 : 0
Observable 1 : 1
Observable 1 : 2
Observable 1 : 3
Observable 2: 3 <- 추가적으로 연결된다. 
Observable 1 : 4 <- 다른 Observable 이 연결되더라도 이전 껀 사라지지 않는다. 
Observable 2: 4
Observable 1 : 5
Observable 2: 5
Observable 1 : 6
Observable 2: 6
Observable 3: 6
Observable 1 : 7
Observable 2: 7
Observable 3: 7 <- 다시 새로운 Observable 이 연결된다..
Observable 1 : 8
Observable 2: 8
Observable 3: 8

위 예제를 보면 새로운 Observer 이 연결되더라도 이전 배출도 계속 이루어지는 걸 볼수 있다.

이 기능은 어떠한 Observer 를 기다리지 않고 즉시 배출들을 시작할 수 있다는 장점이 있다.

그럼 refCount() 와 share() 기능에 대해서 다음장에서 살펴보자.