RxJava 2부터 Disposable 형태로 변경됐다.
일반적인 형태로 많이 쓰는건 consumer 형태이다.
예를 보자.
Observable<Integer> testObservable = Observable.range(1 , 10);
Disposable disposable1 = testObservable
.doOnDispose(() -> System.out.println("onDispse 1"))
.doOnComplete(() -> System.out.println("onCompleted 1"))
.subscribe(item -> System.out.println("emitted 1 -> " + item)); //consumer 로 진행
System.out.println( "isDisposed -> " + disposable1.isDisposed());
..........
emitted 1 -> 1
emitted 1 -> 2
emitted 1 -> 3
emitted 1 -> 4
emitted 1 -> 5
emitted 1 -> 6
emitted 1 -> 7
emitted 1 -> 8
emitted 1 -> 9
emitted 1 -> 10
onCompleted 1
isDisposed -> true <- 이미 dispose가 되었다.
consumer 사용시 doOnDispose 이벤트가 안 타지만 이미 dispose가 된걸 볼 수 있다.
하지만 이런 경우를 보자.
public static DisposableObserver<Integer> getDispose(int pos){
DisposableObserver<Integer> disposableObserver = new DisposableObserver<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("emitted " + pos + " -> " + item);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
return disposableObserver;
}
testObservable
.doOnDispose(() -> System.out.println("onDispse 2"))
.doOnComplete(() -> System.out.println("onCompleted 2"))
.subscribe(getDispose(2));
disposableObser를 제공시 void 형태라서 메모리릭이 걸릴수 있다.
그럴때
.subscribe(getDispose(3));
- CompositeDispoable 을 호출해서 destory 시 clear를 해준다.
- .dispose() 를 해서 제거를 해준다.
Disposable disposable3 = testObservable
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
System.out.println("onDispse 3");
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("onCompleted 3");
}
})
.subscribeWith(getDispose(3));
compositeDisposable.add(disposable3);
compositeDisposable.clear(); // or disposable3.dispose();
추가적으로
Observable 와 Observer를 연결을 하는 경우에 만약 complete가 발생안되는 경우에는 계속 Observer는 배출 되기를 기다릴 수 있습니다. 이럴때 Dispose를 통해서 자원을 해제를 해주셔야 합니다. 다행히 completed가 발생하면 자원이 해제됩니다.
package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);
Disposable disposable =
seconds.subscribe(l -> System.out.println("Received: " + l));
//sleep 5 seconds
sleep(5000);
//dispose and stop emissions
disposable.dispose();
//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5초동안 interval로 실행했지만 추후 dispose 된 Observable는 더이상 배출이 안되는 걸 볼 수 있습니다. 기본적으로 disposable은 내부적으로 구독과 동시에 생성이 되서 어디에서든 사용해서 해제를 할 수 있도록 합니다.
Observer<Integer> myObserver = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
}
@Override
public void onNext(Integer value) {
//has access to Disposable
}
@Override
public void onError(Throwable e) {
//has access to Disposable
}
@Override
public void onComplete() {
//has access to Disposable
}
};
만약 이 Observer을 좀 더 확장을 원한다면 ResourceObserver를 만들어서 사용이 가능합니다. 확장된 observer는 subscribeWith로 통해서 구독을 할 수 있습니다.
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> source =
Observable.interval(1, TimeUnit.SECONDS);
ResourceObserver<Long> myObserver = new
ResourceObserver<Long>() {
@Override
public void onNext(Long value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
//capture Disposable
Disposable disposable = source.subscribeWith(myObserver);
}
}
'스터디 > RxJava2' 카테고리의 다른 글
RxKotlin 정리 (0) | 2017.11.06 |
---|---|
[RxJava2]동시성과 병렬화 테스트 (0) | 2017.10.22 |
[RxJava2] 디버깅 하는 기본적인 방법 (1) | 2017.10.22 |
[RxJava2]TestSubscriber 을 이용해서 시간을 조절 해보자 (0) | 2017.10.22 |
[RxJava2]TestObserver , TestSubscriber 에 대해서 알아보자 (0) | 2017.10.22 |