Buffering 은 주어진 배열을 지정된 갯수로 나누는 역할을 하게 된다.

io.reactivex.Observable.range(1,50)
                .buffer(8)
                .subscribe(System.out::println);

.......
[1, 2, 3, 4, 5, 6, 7, 8]
[9, 10, 11, 12, 13, 14, 15, 16]
[17, 18, 19, 20, 21, 22, 23, 24]
[25, 26, 27, 28, 29, 30, 31, 32]
[33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48]
[49, 50]

8개씩 나누는 과정에 깔끔하게 떨어지지 않으면 남은 만큼만 배열로 만들어서 보여주고 있다.

그리고 2번째 인자로 bufferSupplier 형태로 바꿔서 나눌 수 있다.

 .buffer(8 , HashSet::new)

skip으로 해당 번호번재를 뺄수도 있다.

.buffer(2 , 3) //2번째 인자는 skip 할 번호
......
[1, 2]
[4, 5] <- 3이 빠져있다. 
[7, 8]
[10]

여기에서 만약 count 보다 skip 숫자를 적게 넣게 되면 어떻게 될까?

.buffer(3 , 1)
..........
[1, 2, 3]
[2, 3, 4] <-- 이전 방출 이 포함되어 있는 걸 볼 수 있다.
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]
[8, 9, 10]
[9, 10]
[10]

이전방출량이 포함되는 형태가 된다. 그래서 이걸 활용을 하면 [ 이전 방출량 , 다음 방출량 ] 형태로 만들수 있습니다.

io.reactivex.Observable.range(1,10)
                .buffer(2 , 1)
                .filter( item -> item.size() == 2) //사이즈를 2개를 제한
                .subscribe( s-> System.out.println(s));
..........
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
[6, 7]
[7, 8]
[8, 9]
[9, 10]

시간으로 나누는 Buffering

주어진 일정 시간동안 버퍼링을 나눠서 배출 할 수 있다.

io.reactivex.Observable.interval(300 , TimeUnit.MILLISECONDS)
                .map( i -> (i+1) * 300)
                .buffer(1, TimeUnit.SECONDS) // 1. 시간 2. 시간방법
                .subscribe( s-> System.out.println(s));

sleep(4000);
----------------
[300, 600, 900] <--
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]

이 예제는 1초간격으로 배열중 나오는 걸 Buffering해서 보여주고 있다.

io.reactivex.Observable.interval(300 , TimeUnit.MILLISECONDS)
                .map( i -> (i+1) * 300)
                .buffer(1, TimeUnit.SECONDS , 2) //3번째 인자는 Max 갯수이다.
                .subscribe( s-> System.out.println(s));

............
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]

최대 2개까지 1초동안 묶어서 배출하는 예제이다.

Boundary-based Buffering

또 하나 흥미로운 형태가 있다.

Observable 을 buffer 인자에 넣어서 사용이 가능하다.

다음의 예제는 1초 간격의 옵저버를 0.3 초간의 간격을 가진 옵저버에 바운더리해서 결과를 뽑아내는 방법이다.

io.reactivex.Observable<Long> cutOffs = io.reactivex.Observable.interval(1 , TimeUnit.SECONDS);

io.reactivex.Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map( i -> (i+1) * 300)
        .buffer( cutOffs ) // 옵저버를 넣어서 간격 조절함..
        .subscribe( s-> System.out.println(s));

..........
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]
[4200, 4500, 4800]


'스터디 > RxJava2' 카테고리의 다른 글

[RxJava2]Throttling을 알아보자  (0) 2017.10.22
[Rxjava2]Windowing을 알아보자.  (0) 2017.10.22
[Rxjava2] 예제로 배우는 UnicastSubject  (0) 2017.10.22
[Rxjava2]AsyncSubject  (0) 2017.10.22
[RxJava2]ReplaySubject  (0) 2017.10.22

+ Recent posts