올해는 머신러닝이다.
[RxJava2] Replaying , Caching 을 알아보자. 본문
Replay
와 Caching
멀티 캐스팅을 사용하면 여러 Observer
들 사이에서 공유되는 값을 캐시 할 수 있습니다.
우선 Replay
를 보자.
이미 많은 예제사이트들을 보셨을 수도 있지만 이전에 배출한 값을 캐싱해서 가지고 있다가 새롭게 배출시 캐싱한 걸 같이 먼저 배출해준다.
예제를 보자.
Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
.replay()
.autoConnect();
//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));
sleep(3000); // 3초동안 잠자기
//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));
sleep(3000); // 3초동안 잠자기
//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));
sleep(3000); // 3초동안 잠자기
.............
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 0 <- 2번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 2: 1
Observer 2: 2
Observer 1 : 3
Observer 2: 3
Observer 1 : 4
Observer 2: 4
Observer 3: 0 <- 3번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 3: 1
Observer 3: 2
Observer 3: 3
Observer 3: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7
위 내용을 보면 2,3번째 Observer
가 배출시 1에서 먼저 배출되었던 3번의 값을 캐싱해서 같이 내보내고 있다.
하지만 캐싱작업으로 인해서 메모리 오버플로우 가 발생될 수 있으니 인자값을 통해서 버퍼수를 정해주는 방법이 좋다.
.replay(2)
.......
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 1 <-- 처음 0은 제외된 걸 볼수 있다.
Observer 2: 2
Observer 1 : 3
Observer 2: 3
Observer 1 : 4
Observer 2: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 4 <-- 3부터는 캐싱작업이 되지 않는 다... Observer 2번 에서 캐싱하고 비워버림
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7
Observer 1 : 8
Observer 2: 8
Observer 3: 8
위에서 보다시피 2번째 Observer
에서는 처음 0을 제외하고 1,2 가 배출되고 있는 걸 볼 수 있다.
그런데 3부터는 앞에 2번의 캐싱이 발생되지 않는다.. refCount(2)
라고 정의시 2번째에서 캐싱하고 비워버리고 있다.
그럼 이 걸 방지 하기 위해선 어떻게 해야 될까?
캐싱 시간을 지정해서 유지하도록 할 수 있다.
//0.3 초 간격으로 도는 걸 확인
Observable<Long> threeRandoms = Observable.interval(300,TimeUnit.MILLISECONDS)
//출력값 계산
.map(v -> (v + 1) * 300) // map to elapsed milliseconds
//캐싱 시간을 지정
.replay(2 , TimeUnit.SECONDS)
.autoConnect();
//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));
sleep(3000); // 3초동안 잠자기
//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));
sleep(3000); // 3초동안 잠자기
//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));
sleep(3000); // 3초동안 잠자기
..............
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 2: 1200 <- 1초동안 캐싱된 걸 보여주고 있다.
Observer 2: 1500
Observer 2: 1800
Observer 2: 2100
Observer 2: 2400
Observer 2: 2700
Observer 1 : 3000
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 4200 <- 1초동안 캐싱된 걸 보여주고 있다.
Observer 3: 4500
Observer 3: 4800
Observer 3: 5100
Observer 3: 5400
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000
위에서 보다 시피 시간단위로 캐싱된 걸 배출 시킬 수 있다.
하지만 내가 원한건 앞에 2개만 유지시키는 거였다. 그럼 다시 인자값을 변경해보자.
.replay(2 , 1, TimeUnit.SECONDS) // buffersize , time
....................
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 1 : 3000
Observer 2: 2700 <- 앞에 2개만 캐싱된 걸 볼수 있다.
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 5400 <- 앞에 2개만 캐싱된 걸 볼 수 있다.
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000
Observer 1 : 6300
Observer 2: 6300
Observer 3: 6300
이제 내가 원한 형태로 캐싱되서 볼수 있다.
정리를 하자면 ConnectableObservable
에서 들어오는 Observer
를 모두 배출 시키고 원하면 autoConnect
를 사용하면 된다.
그리고 무한 캐싱작업을 원하면 cache()
또는 publish().refcount()
을 이용하면 된다.
추가적으로 캐싱작업시 예상되는 요소 수를 미리 지정하고 버퍼를 최적화를 할 수 있습니다.
Observable<Integer> cachingTestObservable =
Observable.just(6, 2, 5, 7, 1, 4, 9, 8, 3)
.scan(0, (total,next) -> total + next)
.cacheWithInitialCapacity(9); <- 9개의 요소를 미리 지정해서 버퍼를 최적화 시킴
주의 사항으로 cache를 폐기 할 계획이 없으면 사용하지 말자. 메모리를 계속 잡고 있어서 문제가 생길수 있기 때문이다.
대안으로 replay
() 으로 캐시 크기등을 지정해서 제어하는 방향을 모색하는 게 바람직하다.
'스터디 > RxJava2' 카테고리의 다른 글
[Rxjava2]BehaviorSubject 에 대해서 알아보자. (0) | 2017.10.22 |
---|---|
[Rxjava2]예제로 이해하는 PublishSubject (0) | 2017.10.22 |
[RxJava2]refCount 와 share 기본 개념 (0) | 2017.10.22 |
[RxJava2] publish <-> Connect 예제로 이해해보자. (0) | 2017.10.22 |
[RxJava2]Multicasting 기본 개념 이해 (0) | 2017.10.22 |