실제 연산자를 사용자가 구현이 가능하다.

관련해서 자세한 내용은

영문 : http://reactivex.io/documentation/implement-operator.html

한글 : http://reactivex.io/documentation/ko/implement-operator.html

일단 구현해보자.

아래 doOnEmpty는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.

public static <T> ObservableOperator<T,T> doOnEmpty(Action action) {
    return observer -> new DisposableObserver<T>() {
        boolean isEmpty = true;
        @Override
        public void onNext(T value) {
            isEmpty = false;
            System.out.println("doOnEmpty onNext");
            observer.onNext(value);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("doOnEmpty onError");
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if(isEmpty){
                try {
                    System.out.println("doOnEmpty run");
                    action.run();
                }catch (Exception ex){
                    ex.printStackTrace();
                    onError(ex);
                    return;
                }
            }

            observer.onComplete();
        }
};

사용방법은

  1. 1~5부터 돌면서 빈값인지 체크 해서 Operation 1 : { 숫자 } 를 찍어주는 형태
  2. 빈값이 들어올때 동작되는 걸 체크 하는 형태
Observable.range( 1, 5)
        .lift(doOnEmpty(() ->  System.out.println("Operation 1 empty!")))
        .doOnNext( v -> System.out.println("Operation 1 : " + v))
        .test();

Observable.<Integer>empty()
        .lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
        .doOnNext( v -> System.out.println("Operation 2 : " + v))
        .test();

==========================================================================
doOnEmpty onNext
Operation 1 : 1
doOnEmpty onNext
Operation 1 : 2
doOnEmpty onNext
Operation 1 : 3
doOnEmpty onNext
Operation 1 : 4
doOnEmpty onNext
Operation 1 : 5
doOnEmpty run
Operation 2 Empty!

Operation 2 Empty! 가 찍혔는지가 중요하다.

마지막으로 나눠서 들어오는 경우 그걸 리스트로 변환해주는 연산자를 만들어보자

연산자 설명 : 들어온 값을 바로 배출(Push) 안하고 마지막에 한번에 배출하는 연산자..(toList() 와 유사하다.)

public static <T> ObservableOperator<List<T> , T> myToList(){
    return observer -> new DisposableObserver<T>() {
        ArrayList<T> list = new ArrayList<>();

        @Override
        public void onNext(T t) {
            //리스트에 추가만 하고 다운스트림(onNext) 내보내지 않는다.
            list.add(t);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            //다운스트림으로 한번에 배출한다.
            observer.onNext(list);
            observer.onComplete();
        }
    };
}

이 연산자를 이제 사용해보자.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

=========================================================================
Operation 1 onNext : [1, 2, 3, 4, 5]
Operation 2 onNext : []

값이 모아서 한번에 배출되고 있다.

만약 Observable이 아니라 Flowable로 변경시 FlowableOperator 로 변경 하면 된다.

public static <T> FlowableOperator<T,T> doOnEmpty(Action action) {
     return subscriber -> new DisposableSubscriber<T>() {
         boolean isEmpty = true;

         @Override
         public void onNext(T value) {
             isEmpty = false;
             subscriber.onNext(value);
         }

         @Override
         public void onError(Throwable t) {
             subscriber.onError(t);
         }

         @Override
         public void onComplete() {
             if (isEmpty) {
                 try {
                     action.run();
                 } catch (Exception e) {
                     onError(e);
                     return;
                 }
             }
             subscriber.onComplete();
         }
     };
 }


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]Blocking Functions  (0) 2017.10.22
[RxJava2]Test시 Blocking Subscriber 방법  (0) 2017.10.22
[RxJava] Transform에서 공유 부분 피하기  (0) 2017.10.22
[RxJava2] Compose with Parameters  (0) 2017.10.22
[RxJava2]Compose 활용  (0) 2017.10.22

+ Recent posts