만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?

이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.

예를 보자.

public static void main(String[] args){
    Flowable.range(1, 100)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println( "onSubscribe" );
                    s.request(10);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                    sleep( 50 );
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println( "onSubscribe" );
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    //최대 0.2초 sleep
    int time = ThreadLocalRandom.current().nextInt(200);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

...................
.....
push -> 124
push -> 125
push -> 126
push -> 127
push -> 128
onNext 1, thread -> RxCachedThreadScheduler-1
sleep time : 180
onNext 2, thread -> RxCachedThreadScheduler-1
sleep time : 165
onNext 3, thread -> RxCachedThreadScheduler-1
sleep time : 52
onNext 4, thread -> RxCachedThreadScheduler-1
sleep time : 63
......
onNext 95, thread -> RxCachedThreadScheduler-1
sleep time : 36
onNext 96, thread -> RxCachedThreadScheduler-1
push -> 129
push -> 130
push -> 131

위와 같이 Flowable (A) , Subscriber (B) 라고 가정하면

A(128개 배출) -> B(96개) -> A(96) -> .... 이런식으로 배출하는 걸 볼수 있다.

즉 subscriber 로 업스트림으로 진행이 가능하다.

자 그러면 이걸 활용을 어떤식으로 할수 있는지 보자.

Flowable.range(1, 1000)
        .doOnNext( v -> System.out.println("push -> " + v))
        .observeOn(Schedulers.io())
        .map( i -> intenseCalculation(i))
        .subscribe(new Subscriber<Integer>() {
            Subscription subscription;
            AtomicInteger count = new AtomicInteger(0);
            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                System.out.println( "onSubscribe" );
                s.request(40);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                sleep( 50 );
                if( count.incrementAndGet() % 20 == 0 && count.get() >= 40) {
                    System.out.println("Requesting 20 more!!");
                    subscription.request(20);
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println( "onSubscribe" );
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
...........
onSubscribe
push -> 1
push -> 2
push -> 3
push -> 4
push -> 5
push -> 6
push -> 7
....
onNext 38, thread -> RxCachedThreadScheduler-1
sleep time : 128
onNext 39, thread -> RxCachedThreadScheduler-1
sleep time : 88
onNext 40, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
.....
sleep time : 74
onNext 59, thread -> RxCachedThreadScheduler-1
sleep time : 83
onNext 60, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
sleep time : 38
onNext 61, thread -> RxCachedThreadScheduler-1
sleep time : 148
onNext 62, thread -> RxCachedThreadScheduler-1
sleep time : 159

위 내용은 배출이 되지만 40번을 끝내고 다시 request를 해서 20번을 더 진행하고 계속 요청하고 있다.

이 부분을 보면 무엇이 떠오를까.. 앱개발시 리스트 조회시 fetch를 좀 더 효율적으로 가능할 수 있는 방법이 떠오른다.

추후에 적용해볼수 있을것 같다.'

그리고 처음 push 아이템은 언제 실행될까?

128(push) -> 96(request) -> 96(push) ->.... 이런식으로 진행된다. 보면 볼수록 매력이 있다...

주의할 점은 Subscriber 을 통해서 업스트림이 되는 게 아니란 점...

단지 그 요청을 상류쪽에서 중계를 방법을 결정하는 것입니다.

+ Recent posts