Notice
Recent Posts
Recent Comments
올해는 머신러닝이다.
[RxJava2]Flowable을 직접 커스텀해보자. 본문
Flowable.generate()
배압을 직접 관리가 가능하다. Flowable.generate
는 Consumer<Emitter<T>>
을 구현하면 된다.
예를 보자.
public static void main(String[] args){
randomGenerator( 1, 10000)
.subscribeOn(Schedulers.computation())
.doOnNext(i -> System.out.println("Emitting " + i))
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.println("Received item : " + i);
});
sleep(5000);
}
static Flowable<Integer> randomGenerator(int min , int max) {
return Flowable.generate( emitter -> emitter.onNext(ThreadLocalRandom.current().nextInt(min , max)));
}
............
...
Emitting 8014
Emitting 3112
Emitting 5958
Emitting 4834 //128th emission
Received 9563
Received 4359
Received 9362
...
Received 4880
Received 3192
Received 979 //96th emission
Emitting 8268
Emitting 3889
Emitting 2595
...
128 -> 96 -> 96
순서로 이루어지고 있다.
주의할 점은 onNext 를 한번만 사용가능하다. 그 이상 사용시 IllgalStateException
이 발생된다.
public static void main(String[] args) {
rangeReverse(100,-100)
.subscribeOn(Schedulers.computation())
.doOnNext(i -> System.out.println("Emitting " + i))
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(50);
System.out.println("Received " + i);
});
sleep(50000);
}
static Flowable<Integer> rangeReverse(int upperBound, int lowerBound) {
return Flowable.generate(() -> new AtomicInteger(upperBound + 1),
(state, emitter) -> {
int current = state.decrementAndGet();
emitter.onNext(current);
if (current == lowerBound)
emitter.onComplete();
}
);
}
.............
Emitting 100
Emitting 99
...
Emitting -25
Emitting -26
Emitting -27 //128th emission
Received 100
Received 99
Received 98
...
Received 7
Received 6
Received 5 // 96th emission
Emitting -28
Emitting -29
Emitting -30
이런식으로 사용이 가능하다. Flowable.create()
사용보다 더 자세한 컨트롤이 가능하다.
'스터디 > RxJava2' 카테고리의 다른 글
[RxJava2]Compose 활용 (0) | 2017.10.22 |
---|---|
[RxJava2]Transformer 활용 (0) | 2017.10.22 |
[RxJava2]Backpressure와 함께 Flowable를 만들어 보자 (0) | 2017.10.22 |
[RxJava2]Subscriber 란 무엇인가? (0) | 2017.10.22 |
[RxJava2]Flowable에서의 Backpressure (0) | 2017.10.22 |