실제 연산자를 사용자가 구현이 가능하다.
관련해서 자세한 내용은
영문 : http://reactivex.io/documentation/implement-operator.html
한글 : http://reactivex.io/documentation/ko/implement-operator.html
일단 구현해보자.
아래 doOnEmpty
는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.
public static <T> ObservableOperator<T,T> doOnEmpty(Action action) {
return observer -> new DisposableObserver<T>() {
boolean isEmpty = true;
@Override
public void onNext(T value) {
isEmpty = false;
System.out.println("doOnEmpty onNext");
observer.onNext(value);
}
@Override
public void onError(Throwable e) {
System.out.println("doOnEmpty onError");
observer.onError(e);
}
@Override
public void onComplete() {
if(isEmpty){
try {
System.out.println("doOnEmpty run");
action.run();
}catch (Exception ex){
ex.printStackTrace();
onError(ex);
return;
}
}
observer.onComplete();
}
};
사용방법은
- 1~5부터 돌면서 빈값인지 체크 해서 Operation 1 : { 숫자 } 를 찍어주는 형태
- 빈값이 들어올때 동작되는 걸 체크 하는 형태
Observable.range( 1, 5)
.lift(doOnEmpty(() -> System.out.println("Operation 1 empty!")))
.doOnNext( v -> System.out.println("Operation 1 : " + v))
.test();
Observable.<Integer>empty()
.lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
.doOnNext( v -> System.out.println("Operation 2 : " + v))
.test();
==========================================================================
doOnEmpty onNext
Operation 1 : 1
doOnEmpty onNext
Operation 1 : 2
doOnEmpty onNext
Operation 1 : 3
doOnEmpty onNext
Operation 1 : 4
doOnEmpty onNext
Operation 1 : 5
doOnEmpty run
Operation 2 Empty!
Operation 2 Empty!
가 찍혔는지가 중요하다.
마지막으로 나눠서 들어오는 경우 그걸 리스트로 변환해주는 연산자를 만들어보자
연산자 설명 : 들어온 값을 바로 배출(Push) 안하고 마지막에 한번에 배출하는 연산자..(toList() 와 유사하다
.)
public static <T> ObservableOperator<List<T> , T> myToList(){
return observer -> new DisposableObserver<T>() {
ArrayList<T> list = new ArrayList<>();
@Override
public void onNext(T t) {
//리스트에 추가만 하고 다운스트림(onNext) 내보내지 않는다.
list.add(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
//다운스트림으로 한번에 배출한다.
observer.onNext(list);
observer.onComplete();
}
};
}
이 연산자를 이제 사용해보자.
Observable<IndexedValue<String>> indexedValueObservable =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.compose(withIndex());
indexedValueObservable
.doOnNext(v -> System.out.println("Subscriber 1 : " + v))
.test();
indexedValueObservable
.doOnNext(v -> System.out.println("Subscriber 2 : " + v))
.test();
=========================================================================
Operation 1 onNext : [1, 2, 3, 4, 5]
Operation 2 onNext : []
값이 모아서 한번에 배출되고 있다.
만약 Observable
이 아니라 Flowable
로 변경시 FlowableOperator 로 변경 하면 된다.
public static <T> FlowableOperator<T,T> doOnEmpty(Action action) {
return subscriber -> new DisposableSubscriber<T>() {
boolean isEmpty = true;
@Override
public void onNext(T value) {
isEmpty = false;
subscriber.onNext(value);
}
@Override
public void onError(Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
if (isEmpty) {
try {
action.run();
} catch (Exception e) {
onError(e);
return;
}
}
subscriber.onComplete();
}
};
}
'스터디 > RxJava2' 카테고리의 다른 글
[Rxjava2]Blocking Functions (0) | 2017.10.22 |
---|---|
[RxJava2]Test시 Blocking Subscriber 방법 (0) | 2017.10.22 |
[RxJava] Transform에서 공유 부분 피하기 (0) | 2017.10.22 |
[RxJava2] Compose with Parameters (0) | 2017.10.22 |
[RxJava2]Compose 활용 (0) | 2017.10.22 |