SwitchMap()

flatMap과 비슷하지만 큰 차이점은 제일 마지막 것만 배출 한다는 차이가 있다.

예제를 보자.

public static void main(String[] args) {
    Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon",
            "Zeta", "Eta", "Theta", "Iota");

    Observable<String> processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS));

    processings.subscribe( System.out::println);
    sleep( 20000 );
}

public static int randomSleepTime() {
    //returns random sleep time between 0 to 2000 milliseconds
    return ThreadLocalRandom.current().nextInt(2000);
}
public static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

...........
//0~2초 딜레이 랜덤으로 된다. 
Alpha
Beta
Gamma
Delta
Epsilon
Zeta
Eta
Theta
Iota

만약 매 5초간격으로 실행되는 프로그램이 있으며 내부적으로 동작되는 옵저버를 cancel 하고 새로운 아이템을 등록해서 배출 하고 싶다면 어떻게 해야 할까? 그럴 경우 switchMap() 을 사용해서 최신껄 바꿀수 있게 해준다.

Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon",
                "Zeta", "Eta", "Theta", "Iota");

Observable<String> processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS));

Observable.interval(5 , TimeUnit.SECONDS)
        //5초 간격으로 dispose 되고 새로운 processing 이 만들어져서 리턴된다.
        .switchMap( i -> processings.doOnDispose( ()-> System.out.println("Disposing! Starting next")))
        .subscribe( System.out::println);

sleep( 20000 );
........
Alpha
Beta
Gamma
Delta
Epsilon
Disposing! Starting next
Alpha
Beta
Gamma
Delta
Disposing! Starting next
Alpha
Beta


+ Recent posts