«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2]Multicasting 기본 개념 이해 본문

스터디/RxJava2

[RxJava2]Multicasting 기본 개념 이해

행복한 수지아빠 2017. 10. 22. 01:09

기본적인 Observable 생성 후 배출은 다음과 같습니다.

Observable<Integer> threeIntegers = Observable.range(1 , 3);
threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i));
threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i));
--------------------------
Observer 1 : 1
Observer 1 : 2
Observer 1 : 3
Observer 2 : 1
Observer 2 : 2
Observer 2 : 3

보다시피 Observable 1 이 끝나고Observable 2가 다시 실행되는 구조로 되어있다.

동시에 배출 하고 싶을땐 어떻게 할까.

그럴때 ConnectableObservable 을 사용할 수 있습니다.

사용법은 publish -> connect 로 연결해서 배출을 동시에 시킬수 있다 .

그럼 기본적인 사용예를 보자.

ConnectableObservable<Integer> threeIntegers = Observable.range(1 , 3).publish();
threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i));
threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i));

//여기까지 아무런 배출이 이루어지지 않는다.
threeIntegers.connect(); //이제 동시에 배출을 시킨다.

-----------------
Observer 1 : 1
Observer 2 : 1
Observer 1 : 2
Observer 2 : 2
Observer 1 : 3
Observer 2 : 3

조금 더 깊이 들어가보자.

다시 일반적인cold Observable 을 보자

public static void main(String[] args){

    Observable<Integer> threeRandoms = Observable.range(1 , 3)
                                        .map( i-> randomInt());

    threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
    threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

}

public static int randomInt(){
    return ThreadLocalRandom.current().nextInt(100000);
}
---------------------------------
Observer 1 : 25134
Observer 1 : 66916
Observer 1 : 81
Observer 2 : 54593
Observer 2 : 95417
Observer 2 : 25863

생각했던 기본 형태인 첫번째 Observer 배출이 다 되고 두번째가 실행된다.

그럼 이 구조의 형태는 다음과 같다고 본다.

각각의 나누어진 형태로 결과를 배출 하고 있다. 어려운게 없다.

자 그럼 publish 사용시 어떻게 바뀌는 지 보자.

ConnectableObservable<Integer> threeInts = Observable.range(1, 3).publish(); //publish 붙는 위치가 중요하다. 

Observable<Integer> threeRandoms = threeInts.map( i-> randomInt());

threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

threeInts.connect();
---------------------------------------------------
Observer 1 : 12581
Observer 2 : 65620
Observer 1 : 85896
Observer 2 : 43633
Observer 1 : 77427
Observer 2 : 82409

이런식으로 이루어질텐데.. 문제가 뭘까?

결과가 보면 Observable 1과 Observable 2가 전혀 값이 똑같지 않다.

이유는 map에서 랜덤 숫자를 생성하는데 그 전에 publish을 호출했기 때문이다.

그러면 위치를 바꿔보자.

//publish 붙는 위치가 중요하다.

ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3).map( i-> randomInt()).publish(); 

threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

threeRandoms.connect();
--------------------------
Observer 1 : 74650
Observer 2 : 74650
Observer 1 : 33655
Observer 2 : 33655
Observer 1 : 99136
Observer 2 : 99136

값이 동일하게 이루어진 걸 볼수 있다.

그럼 흐름은 다음과 같다.

그림으로 보니 쉽게 이해가 된다.

그럼 언제 Multicast을 해야 할까?

일반적으로 중복되는 형태 , 재활용되는 Observable 형태를 사용시 자주 쓰인다.

그럼 재활용해서 다시 가공하는 예제를 보자.

ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3).map( i-> randomInt()).publish(); //publish 붙는 위치가 중요하다.

//첫번째 배출 시작
threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));

//두번째는 재활용해서 다른 형태로 가공
//랜덤으로 들어오는 숫자들을 합해서(누진계) 마지막에 한번 배출 한다. 
threeRandoms.reduce(0,(total , next) -> total + next).subscribe( i -> System.out.println("Observer 2 : " + i));

threeRandoms.connect();
--------------------------
Observer 1 : 82613
Observer 1 : 96148
Observer 1 : 27865
Observer 2 : 206626 <- 한번 배출된걸 보면 된다.

그림 형태는 다음과 같을것이다.