«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

[RxJava2]SubscriberWith는 무엇일까? 본문

스터디/RxJava2

[RxJava2]SubscriberWith는 무엇일까?

행복한 수지아빠 2017. 10. 22. 13:05

RxJava 2부터 Disposable 형태로 변경됐다.

일반적인 형태로 많이 쓰는건 consumer 형태이다.

예를 보자.

Observable<Integer> testObservable = Observable.range(1 , 10);
Disposable disposable1 = testObservable
    .doOnDispose(() -> System.out.println("onDispse 1"))
    .doOnComplete(() -> System.out.println("onCompleted 1"))
    .subscribe(item -> System.out.println("emitted 1 -> " + item)); //consumer 로 진행

System.out.println( "isDisposed -> " + disposable1.isDisposed());
..........
emitted 1 -> 1
emitted 1 -> 2
emitted 1 -> 3
emitted 1 -> 4
emitted 1 -> 5
emitted 1 -> 6
emitted 1 -> 7
emitted 1 -> 8
emitted 1 -> 9
emitted 1 -> 10
onCompleted 1
isDisposed -> true <- 이미 dispose가 되었다.

consumer 사용시 doOnDispose 이벤트가 안 타지만 이미 dispose가 된걸 볼 수 있다.

하지만 이런 경우를 보자.

public static DisposableObserver<Integer> getDispose(int pos){
        DisposableObserver<Integer> disposableObserver = new DisposableObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("emitted " + pos + " -> " + item);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        return disposableObserver;
    }
testObservable
    .doOnDispose(() -> System.out.println("onDispse 2"))
    .doOnComplete(() -> System.out.println("onCompleted 2"))
    .subscribe(getDispose(2));

disposableObser를 제공시 void 형태라서 메모리릭이 걸릴수 있다.

그럴때

.subscribe(getDispose(3));
  1. CompositeDispoable 을 호출해서 destory 시 clear를 해준다.
  2. .dispose() 를 해서 제거를 해준다.
Disposable disposable3 = testObservable
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onDispse 3");
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onCompleted 3");
            }
        })
        .subscribeWith(getDispose(3));

compositeDisposable.add(disposable3);
compositeDisposable.clear(); // or disposable3.dispose();


추가적으로

Observable 와 Observer를 연결을 하는 경우에 만약 complete가 발생안되는 경우에는 계속 Observer는 배출 되기를 기다릴 수 있습니다. 이럴때 Dispose를 통해서 자원을 해제를 해주셔야 합니다. 다행히 completed가 발생하면 자원이 해제됩니다.

package io.reactivex.disposables;

    public interface Disposable {
      void dispose();
      boolean isDisposed();
    }

		import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import java.util.concurrent.TimeUnit;

    public class Launcher {

      public static void main(String[] args) {

        Observable<Long> seconds =
          Observable.interval(1, TimeUnit.SECONDS);

        Disposable disposable =
          seconds.subscribe(l -> System.out.println("Received: " + l));

        //sleep 5 seconds
        sleep(5000);

        //dispose and stop emissions
        disposable.dispose();

        //sleep 5 seconds to prove
        //there are no more emissions
        sleep(5000);

      }

      public static void sleep(int millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
       }
     }

5초동안 interval로 실행했지만 추후 dispose 된 Observable는 더이상 배출이 안되는 걸 볼 수 있습니다. 기본적으로 disposable은 내부적으로 구독과 동시에 생성이 되서 어디에서든 사용해서 해제를 할 수 있도록 합니다.

Observer<Integer> myObserver = new Observer<Integer>() {
      private Disposable disposable;

      @Override
      public void onSubscribe(Disposable disposable) {
        this.disposable = disposable;
      }

      @Override
      public void onNext(Integer value) {
        //has access to Disposable
      }

      @Override
      public void onError(Throwable e) {
        //has access to Disposable
      }

      @Override
      public void onComplete() {
        //has access to Disposable
       }
     };

만약 이 Observer을 좀 더 확장을 원한다면 ResourceObserver를 만들어서 사용이 가능합니다. 확장된 observer는 subscribeWith로 통해서 구독을 할 수 있습니다.

import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.ResourceObserver;
    import java.util.concurrent.TimeUnit;

    public class Launcher {
      public static void main(String[] args) {

        Observable<Long> source =
        Observable.interval(1, TimeUnit.SECONDS);

         ResourceObserver<Long> myObserver = new  
         ResourceObserver<Long>() {
          @Override
          public void onNext(Long value) {
            System.out.println(value);
          }

          @Override
          public void onError(Throwable e) {
            e.printStackTrace();
          }

          @Override
          public void onComplete() {
            System.out.println("Done!");
          }
        };

        //capture Disposable
        Disposable disposable = source.subscribeWith(myObserver);
       }
      }