«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2] Replaying , Caching 을 알아보자. 본문

스터디/RxJava2

[RxJava2] Replaying , Caching 을 알아보자.

행복한 수지아빠 2017. 10. 22. 01:13

Replay와 Caching

멀티 캐스팅을 사용하면 여러 Observer 들 사이에서 공유되는 값을 캐시 할 수 있습니다.

우선 Replay를 보자.

이미 많은 예제사이트들을 보셨을 수도 있지만 이전에 배출한 값을 캐싱해서 가지고 있다가 새롭게 배출시 캐싱한 걸 같이 먼저 배출해준다.

예제를 보자.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
    .replay()
    .autoConnect();

//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

.............
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 0 <- 2번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 2: 1
Observer 2: 2 
Observer 1 : 3
Observer 2: 3
Observer 1 : 4 
Observer 2: 4 
Observer 3: 0 <- 3번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 3: 1
Observer 3: 2
Observer 3: 3
Observer 3: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7

위 내용을 보면 2,3번째 Observer가 배출시 1에서 먼저 배출되었던 3번의 값을 캐싱해서 같이 내보내고 있다.

하지만 캐싱작업으로 인해서 메모리 오버플로우 가 발생될 수 있으니 인자값을 통해서 버퍼수를 정해주는 방법이 좋다.

.replay(2)

.......
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 1 <-- 처음 0은 제외된 걸 볼수 있다. 
Observer 2: 2
Observer 1 : 3
Observer 2: 3
Observer 1 : 4
Observer 2: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 4 <-- 3부터는 캐싱작업이 되지 않는 다... Observer 2번 에서 캐싱하고 비워버림
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7
Observer 1 : 8
Observer 2: 8
Observer 3: 8

위에서 보다시피 2번째 Observer에서는 처음 0을 제외하고 1,2 가 배출되고 있는 걸 볼 수 있다.

그런데 3부터는 앞에 2번의 캐싱이 발생되지 않는다.. refCount(2) 라고 정의시 2번째에서 캐싱하고 비워버리고 있다.

그럼 이 걸 방지 하기 위해선 어떻게 해야 될까?

캐싱 시간을 지정해서 유지하도록 할 수 있다.

//0.3 초 간격으로 도는 걸 확인
Observable<Long> threeRandoms = Observable.interval(300,TimeUnit.MILLISECONDS)
        //출력값 계산
        .map(v -> (v + 1) * 300) // map to elapsed milliseconds
        //캐싱 시간을 지정
        .replay(2 , TimeUnit.SECONDS)
        .autoConnect();

//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

..............
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 2: 1200 <- 1초동안 캐싱된 걸 보여주고 있다. 
Observer 2: 1500
Observer 2: 1800
Observer 2: 2100
Observer 2: 2400
Observer 2: 2700
Observer 1 : 3000
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 4200 <- 1초동안 캐싱된 걸 보여주고 있다. 
Observer 3: 4500
Observer 3: 4800
Observer 3: 5100
Observer 3: 5400
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000

위에서 보다 시피 시간단위로 캐싱된 걸 배출 시킬 수 있다.

하지만 내가 원한건 앞에 2개만 유지시키는 거였다. 그럼 다시 인자값을 변경해보자.

.replay(2 , 1, TimeUnit.SECONDS) // buffersize , time 

....................
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 1 : 3000
Observer 2: 2700 <- 앞에 2개만 캐싱된 걸 볼수 있다.
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 5400 <- 앞에 2개만 캐싱된 걸 볼 수 있다. 
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000
Observer 1 : 6300
Observer 2: 6300
Observer 3: 6300

이제 내가 원한 형태로 캐싱되서 볼수 있다.

정리를 하자면 ConnectableObservable에서 들어오는 Observer를 모두 배출 시키고 원하면 autoConnect를 사용하면 된다.

그리고 무한 캐싱작업을 원하면 cache()또는 publish().refcount() 을 이용하면 된다.

추가적으로 캐싱작업시 예상되는 요소 수를 미리 지정하고 버퍼를 최적화를 할 수 있습니다.

Observable<Integer> cachingTestObservable =
        Observable.just(6, 2, 5, 7, 1, 4, 9, 8, 3) 
            .scan(0, (total,next) -> total + next)
            .cacheWithInitialCapacity(9); <- 9개의 요소를 미리 지정해서 버퍼를 최적화 시킴

주의 사항으로 cache를 폐기 할 계획이 없으면 사용하지 말자. 메모리를 계속 잡고 있어서 문제가 생길수 있기 때문이다.

대안으로 replay() 으로 캐시 크기등을 지정해서 제어하는 방향을 모색하는 게 바람직하다.