Notice
Recent Posts
Recent Comments
올해는 머신러닝이다.
[RxJava2]Subscriber 란 무엇인가? 본문
만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?
이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.
예를 보자.
public static void main(String[] args){
Flowable.range(1, 100)
.doOnNext( v -> System.out.println("push -> " + v))
.observeOn(Schedulers.io())
.map( i -> intenseCalculation(i))
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println( "onSubscribe" );
s.request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
sleep( 50 );
}
@Override
public void onError(Throwable t) {
System.out.println( "onSubscribe" );
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
sleep( Long.MAX_VALUE );
}
public static <T> T intenseCalculation(T value){
//최대 0.2초 sleep
int time = ThreadLocalRandom.current().nextInt(200);
System.out.println("sleep time : " + time);
sleep( time );
return value;
}
public static void sleep(long mills) {
try{
Thread.sleep( mills );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
...................
.....
push -> 124
push -> 125
push -> 126
push -> 127
push -> 128
onNext 1, thread -> RxCachedThreadScheduler-1
sleep time : 180
onNext 2, thread -> RxCachedThreadScheduler-1
sleep time : 165
onNext 3, thread -> RxCachedThreadScheduler-1
sleep time : 52
onNext 4, thread -> RxCachedThreadScheduler-1
sleep time : 63
......
onNext 95, thread -> RxCachedThreadScheduler-1
sleep time : 36
onNext 96, thread -> RxCachedThreadScheduler-1
push -> 129
push -> 130
push -> 131
위와 같이 Flowable (A) , Subscriber (B) 라고 가정하면
A(128개 배출) -> B(96개) -> A(96)
-> .... 이런식으로 배출하는 걸 볼수 있다.
즉 subscriber
로 업스트림으로 진행이 가능하다.
자 그러면 이걸 활용을 어떤식으로 할수 있는지 보자.
Flowable.range(1, 1000)
.doOnNext( v -> System.out.println("push -> " + v))
.observeOn(Schedulers.io())
.map( i -> intenseCalculation(i))
.subscribe(new Subscriber<Integer>() {
Subscription subscription;
AtomicInteger count = new AtomicInteger(0);
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
System.out.println( "onSubscribe" );
s.request(40);
}
@Override
public void onNext(Integer integer) {
System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
sleep( 50 );
if( count.incrementAndGet() % 20 == 0 && count.get() >= 40) {
System.out.println("Requesting 20 more!!");
subscription.request(20);
}
}
@Override
public void onError(Throwable t) {
System.out.println( "onSubscribe" );
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
...........
onSubscribe
push -> 1
push -> 2
push -> 3
push -> 4
push -> 5
push -> 6
push -> 7
....
onNext 38, thread -> RxCachedThreadScheduler-1
sleep time : 128
onNext 39, thread -> RxCachedThreadScheduler-1
sleep time : 88
onNext 40, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
.....
sleep time : 74
onNext 59, thread -> RxCachedThreadScheduler-1
sleep time : 83
onNext 60, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
sleep time : 38
onNext 61, thread -> RxCachedThreadScheduler-1
sleep time : 148
onNext 62, thread -> RxCachedThreadScheduler-1
sleep time : 159
위 내용은 배출이 되지만 40번을 끝내고 다시 request를 해서 20번을 더 진행하고 계속 요청하고 있다.
이 부분을 보면 무엇이 떠오를까.. 앱개발시 리스트 조회시 fetch를 좀 더 효율적으로 가능할 수 있는 방법이 떠오른다.
추후에 적용해볼수 있을것 같다.'
그리고 처음 push 아이템은 언제 실행될까?
128(push) -> 96(request) -> 96(push) ->....
이런식으로 진행된다. 보면 볼수록 매력이 있다...
주의할 점은 Subscriber 을 통해서 업스트림이 되는 게 아니란 점...
단지 그 요청을 상류쪽에서 중계를 방법을 결정하는 것입니다.
'스터디 > RxJava2' 카테고리의 다른 글
[RxJava2]Flowable을 직접 커스텀해보자. (0) | 2017.10.22 |
---|---|
[RxJava2]Backpressure와 함께 Flowable를 만들어 보자 (0) | 2017.10.22 |
[RxJava2]Flowable에서의 Backpressure (0) | 2017.10.22 |
[RxJava2] Flowable에 대해서 알아보자 (0) | 2017.10.22 |
[RxJava2] 배압(Backpressure) 이란 무엇인가. (0) | 2017.10.22 |