Windowing

이전 장에서 배운 buffering과 비슷한 형태로 배열을 나누어서 다시 배열로 만들어주는 것이지만 하나의 큰 차이점은 있다.

바로 결과가 Observable형태로 나오는 것이다. map과 flatMap의 차이점으로 보면 된다.

정해진 사이즈의 Windowing

Observable.range(1 , 50)
        .window(8)
        .flatMapSingle(obs ->
            //reduce 더하기를 진행한다.         
                obs.reduce( "" , (total ,next) -> total + (total.equals("") ? "" : "|") + next ) )
        .subscribe(System.out::println);

.......
1|2|3|4|5|6|7|8
9|10|11|12|13|14|15|16
17|18|19|20|21|22|23|24
25|26|27|28|29|30|31|32
33|34|35|36|37|38|39|40
41|42|43|44|45|46|47|48
49|50

window사용시 리턴값이 Observable형태이기 때문에 다시 reduce 를 사용해서 리턴하고 있다.

skip 인자값

 .window(2, 3) //3번째를 무시하라..
 ........
 1|2
4|5
7|8
10|11
13|14
16|17
19|20
22|23
25|26

Time base Windowing

.window(1, TimeUnit.SECONDS)  
.........
300|600|900
1200|1500|1800
2100|2400|2700
3000|3300|3600|3900
4200|4500|4800

Boundary based windowing

Observable<Long> cutOffs =
                Observable.interval(1, TimeUnit.SECONDS);
        Observable.interval(300, TimeUnit.MILLISECONDS)
            .map(i -> (i + 1) * 300) // map to elapsed time
            .window(cutOffs)
            .flatMapSingle(obs -> obs.reduce("", (total, next) -> total
                + (total.equals("") ? "" : "|") + next))
            .subscribe(System.out::println);

.........
300|600|900
1200|1500|1800
2100|2400|2700
3000|3300|3600|3900
4200|4500|4800

Buffering과 매우 유사하며 차이점은 리턴값이 Observable형태인것만 기억하면 된다.

+ Recent posts