«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2]Flowable을 직접 커스텀해보자. 본문

스터디/RxJava2

[RxJava2]Flowable을 직접 커스텀해보자.

행복한 수지아빠 2017. 10. 22. 12:37

Flowable.generate()

배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer<Emitter<T>> 을 구현하면 된다.

예를 보자.

public static void main(String[] args){
    randomGenerator( 1, 10000)
            .subscribeOn(Schedulers.computation())
            .doOnNext(i -> System.out.println("Emitting " + i))
            .observeOn(Schedulers.io())
            .subscribe(i -> {
                sleep(5);
                System.out.println("Received item : " + i);
            });

    sleep(5000);
}

static Flowable<Integer> randomGenerator(int min , int max) {
    return Flowable.generate( emitter -> emitter.onNext(ThreadLocalRandom.current().nextInt(min , max)));
}
............
 ...
 Emitting 8014
 Emitting 3112
 Emitting 5958
 Emitting 4834 //128th emission
 Received 9563
 Received 4359
 Received 9362
 ...
 Received 4880
 Received 3192
 Received 979 //96th emission
 Emitting 8268
 Emitting 3889
 Emitting 2595
...

128 -> 96 -> 96 순서로 이루어지고 있다.

주의할 점은 onNext 를 한번만 사용가능하다. 그 이상 사용시 IllgalStateException 이 발생된다.

public static void main(String[] args) {
        rangeReverse(100,-100)
                 .subscribeOn(Schedulers.computation())
                 .doOnNext(i -> System.out.println("Emitting " + i))
                 .observeOn(Schedulers.io())
                 .subscribe(i -> {
                     sleep(50);
                     System.out.println("Received " + i);
                 });
        sleep(50000);
     }
    static Flowable<Integer> rangeReverse(int upperBound, int lowerBound) {
         return Flowable.generate(() -> new AtomicInteger(upperBound + 1),
                 (state, emitter) -> {
                     int current = state.decrementAndGet();
                     emitter.onNext(current);
                     if (current == lowerBound)
                         emitter.onComplete();
                 }
         );

     }
.............
Emitting 100
 Emitting 99
 ...
 Emitting -25
 Emitting -26
 Emitting -27 //128th emission
 Received 100
 Received 99
 Received 98
 ...
 Received 7
 Received 6
 Received 5 // 96th emission
 Emitting -28
 Emitting -29
 Emitting -30

이런식으로 사용이 가능하다. Flowable.create() 사용보다 더 자세한 컨트롤이 가능하다.