BlockingFirst() , BlockingSingle()

이름 그대로 처음꺼만 가져오는 함수를 말한다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

String firstItem = source.filter( s -> s.length() == 5).blockingFirst();
String singleItem = source.filter( s -> s.length() == 4).take(1).blockingSingle();

assertTrue( firstItem.equals("Alpha"));
assertTrue( singleItem.equals("Beta"));

결과 : Success

BlocingSingle()은 결과값이 하나로 나와야 정상작동되는 점을 유의해야 한다. (그래서 take(1) 로 지정)

BlockingGet()

Maybe , Single 의 경우 blockingFirst()를 가지고 있지 않다. 하나 또는 없음을 배출할테니..

이럴때 BlockingGet() 을 사용해서 값을 가져오면 된다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

//리스트를 만들어서 하나로 만들어줍니다.
List<String> sourceList = source.filter(s -> s.length() == 4).toList().blockingGet();

assertTrue( sourceList.equals( Arrays.asList( "Beta" , "Zeta")));

BlockingLast()

Observable , Flowable 에서 마지막 값을 리턴시 사용한다.

주의할점은 onComplete() 가 실행전에는 값을 리턴을 하지 않는다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
String lastItemFromSource = source.filter(s -> s.length() == 4).blockingLast();

assertTrue(lastItemFromSource.equals("Zeta"));

주의점

BlockingFirst() , BlockingLast()사용시 아이템이 없으면 no emissions 예외 오류가 나니 기본값 설정을 해주는게 좋다.

만약 리스트 형태로 받고 싶을때 어떻게 할까?

그럴때 BlockingIterable() 사용하면 된다.

BlockingIterable()

값을 리스트형태로 받아준다. onComplete() 된 시점의 값들을 받아온다. 배압(Backpressure)이 없기 때문에 OutOfMemoryException이 발생될 수 있기 때문에 주의 해서 사용해야 한다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
Iterable<String> sourceIterable = source.filter( s -> s.length() == 5).blockingIterable();

for(String s:sourceIterable) {
    assertTrue( s.length() == 5); // 5자리가 맞는지 루프 돌면서 체크
}

위와 같은 형태의 문제점은 아이템을 다 배출하고 테스트하는데 있다.

만약 아이템을 하나씩 받을때 직접 테스트 하고 싶은 경우 어떻게 해야 할까?

그런 경우 BlockingForEach() 을 사용하면 된다.

BlockingForEach()

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

//하나씩 배출하면서 검사를 한다. 
source.filter(s->s.length() == 5).blockingForEach(s -> assertTrue(s.length() == 5));

BlockingNext() <- 좀 더 이해가 필요함

next된 것만 가져와서 테스트를 진행하는 것 같음..공부가 좀 더 필요함..

Observable<Long> source = Observable.interval(1 , TimeUnit.MICROSECONDS).take(1000);

Iterable<Long> iterable = source.blockingNext();

for(Long i : iterable) {
    System.out.println(i);
}

결과값

0
5
11
15
18
20
23
27
31
35
39
44
48
52
58
62
66
69
72
76
80
83
86
90
94
97
100
...

BlockingLastest() <- 좀 더 이해가 필요함

캡쳐되지 않는 값은 잊어버리고 ... 좀더 공부를 해야겠다.

Observable<Long> source = Observable.interval(1 , TimeUnit.MICROSECONDS).take(1000);

Iterable<Long> iterable = source.blockingLatest();

for(Long i : iterable){
    System.out.println(i);
}

출력값은

0
127
135
140
144
147
151
154
157
161
164
168
170
172
175
177
179
181
183
185
187
189
191
193
...

BlockingMostRecent()

TheblockingMostRecent()is similar toblockingLatest(), but it will re-consume the latest value repeatedly for everynext()

call from the iterator even if it was consumed already. It also requires adefaultValueargument so it has something to return if no value is emitted yet. Here, we useblockingMostRecent()against an Observableemitting every 10 milliseconds. The default value is-1, and it consumes each value repeatedly until the next value is provided:

Observable<Long> source = Observable.interval(10 , TimeUnit.MILLISECONDS).take(5);

//기본값이 필요하다.
Iterable<Long> iterable = source.blockingMostRecent( -1L );

System.out.println( "iterable" + iterable );

for(Long i : iterable) {
    System.out.println(i);
}

결과값

-1
-1
-1
...
0
0
0
...
1
1
1
...


일반적으로 Observable 을 테스트시 Blocking을 사용한다.

먼저 일반 코드를 작성하면 다음과 같을겁니다.

AtomicInteger hitcount = new AtomicInteger();
Observable<Long> source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5);
source.subscribe( i -> hitcount.incrementAndGet());

assertTrue( hitcount.get() == 5);

여기에서 5개를 가져오는 일반적인 코드이다. 결과는 실패이다.

이유는 interval 함수를 보면 thread가 computation 이다. 그래서 가져올수 없다.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit) {
    return interval(period, period, unit, Schedulers.computation());
}

그럴때 방법이 2가지가 있지만 우선 Blocking해서 값을 가져와서 테스트 해보는 방법을 해보자.

AtomicInteger hitcount = new AtomicInteger();
Observable<Long> source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5);
source.blockingSubscribe( i -> hitcount.incrementAndGet());

assertTrue( hitcount.get() == 5);

결과는 성공

위 코드에서 subscribe -> blockingSubscribe을 함으로써 값을 제대로 가져올수 있다.

쓰레드 상관없이 값을 가져오고 싶을땐 BlockingSubscribe을 써서 테스트가 가능하다.

주의사항으로

절대 테스트말고 다른 곳에선 blocking을 사용하지 말자. 해보면 이유를 바로 알수있다.

실제 연산자를 사용자가 구현이 가능하다.

관련해서 자세한 내용은

영문 : http://reactivex.io/documentation/implement-operator.html

한글 : http://reactivex.io/documentation/ko/implement-operator.html

일단 구현해보자.

아래 doOnEmpty는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.

public static <T> ObservableOperator<T,T> doOnEmpty(Action action) {
    return observer -> new DisposableObserver<T>() {
        boolean isEmpty = true;
        @Override
        public void onNext(T value) {
            isEmpty = false;
            System.out.println("doOnEmpty onNext");
            observer.onNext(value);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("doOnEmpty onError");
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if(isEmpty){
                try {
                    System.out.println("doOnEmpty run");
                    action.run();
                }catch (Exception ex){
                    ex.printStackTrace();
                    onError(ex);
                    return;
                }
            }

            observer.onComplete();
        }
};

사용방법은

  1. 1~5부터 돌면서 빈값인지 체크 해서 Operation 1 : { 숫자 } 를 찍어주는 형태
  2. 빈값이 들어올때 동작되는 걸 체크 하는 형태
Observable.range( 1, 5)
        .lift(doOnEmpty(() ->  System.out.println("Operation 1 empty!")))
        .doOnNext( v -> System.out.println("Operation 1 : " + v))
        .test();

Observable.<Integer>empty()
        .lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
        .doOnNext( v -> System.out.println("Operation 2 : " + v))
        .test();

==========================================================================
doOnEmpty onNext
Operation 1 : 1
doOnEmpty onNext
Operation 1 : 2
doOnEmpty onNext
Operation 1 : 3
doOnEmpty onNext
Operation 1 : 4
doOnEmpty onNext
Operation 1 : 5
doOnEmpty run
Operation 2 Empty!

Operation 2 Empty! 가 찍혔는지가 중요하다.

마지막으로 나눠서 들어오는 경우 그걸 리스트로 변환해주는 연산자를 만들어보자

연산자 설명 : 들어온 값을 바로 배출(Push) 안하고 마지막에 한번에 배출하는 연산자..(toList() 와 유사하다.)

public static <T> ObservableOperator<List<T> , T> myToList(){
    return observer -> new DisposableObserver<T>() {
        ArrayList<T> list = new ArrayList<>();

        @Override
        public void onNext(T t) {
            //리스트에 추가만 하고 다운스트림(onNext) 내보내지 않는다.
            list.add(t);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            //다운스트림으로 한번에 배출한다.
            observer.onNext(list);
            observer.onComplete();
        }
    };
}

이 연산자를 이제 사용해보자.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

=========================================================================
Operation 1 onNext : [1, 2, 3, 4, 5]
Operation 2 onNext : []

값이 모아서 한번에 배출되고 있다.

만약 Observable이 아니라 Flowable로 변경시 FlowableOperator 로 변경 하면 된다.

public static <T> FlowableOperator<T,T> doOnEmpty(Action action) {
     return subscriber -> new DisposableSubscriber<T>() {
         boolean isEmpty = true;

         @Override
         public void onNext(T value) {
             isEmpty = false;
             subscriber.onNext(value);
         }

         @Override
         public void onError(Throwable t) {
             subscriber.onError(t);
         }

         @Override
         public void onComplete() {
             if (isEmpty) {
                 try {
                     action.run();
                 } catch (Exception e) {
                     onError(e);
                     return;
                 }
             }
             subscriber.onComplete();
         }
     };
 }


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]Blocking Functions  (0) 2017.10.22
[RxJava2]Test시 Blocking Subscriber 방법  (0) 2017.10.22
[RxJava] Transform에서 공유 부분 피하기  (0) 2017.10.22
[RxJava2] Compose with Parameters  (0) 2017.10.22
[RxJava2]Compose 활용  (0) 2017.10.22

자신만의 커스텀 함수를 만들어서 사용시 공유되는 이슈가 발생된다.

예를 들어 보자.

인덱스를 넣고 값을 저장하는 IndexedValue데이터 클래스를 먼저 만든다.

static final class IndexedValue<T> {
    final int index;
    final T value;

    IndexedValue(int index , T value){
        this.index = index;
        this.value = value;
    }

    @Override
    public String toString() {
        return index + " - " + value;
    }
}

그리고 compose에 들어갈 커스텀 함수를 추가한다.

원하는 결과값은 index - value 을 나오는게 목표이다.

static <T>ObservableTransformer<T,IndexedValue<T>> withIndex(){
    final AtomicInteger indexer = new AtomicInteger(-1);
    return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet() , v));
}

아래는 테스트 함수이다.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

============================================================================
Subscriber 1 : 0 - Alpha
Subscriber 1 : 1 - Beta
Subscriber 1 : 2 - Gamma
Subscriber 1 : 3 - Delta
Subscriber 1 : 4 - Epsilon
Subscriber 2 : 5 - Alpha
Subscriber 2 : 6 - Beta
Subscriber 2 : 7 - Gamma
Subscriber 2 : 8 - Delta
Subscriber 2 : 9 - Epsilon

위 결과에서 보듯이 값이 서로 공유되서 나오는 문제를 볼수 있다.

이유는 싱글 인스턴스인 AtomicInteger이기 때문이다.

이러한 문제를 해결하기 위해 defer 또는 fromCallable 을 추천한다.

static <T>ObservableTransformer<T,IndexedValue<T>> withIndex(){
    return upstream -> Observable.defer(() -> {
        final AtomicInteger indexer = new AtomicInteger(-1);
        return upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet() , v));
    });
}

이제 결과들이 공유되지 않는 걸 볼수 있다.

Subscriber 1 : 0 - Alpha
Subscriber 1 : 1 - Beta
Subscriber 1 : 2 - Gamma
Subscriber 1 : 3 - Delta
Subscriber 1 : 4 - Epsilon
Subscriber 2 : 0 - Alpha
Subscriber 2 : 1 - Beta
Subscriber 2 : 2 - Gamma
Subscriber 2 : 3 - Delta
Subscriber 2 : 4 - Epsilon

결론은 싱글 인스턴스 데이터를 사용시 공유되는 문제를 해결하기 위해 defer 또는 fromCallabe을 사용해서 오류를 피하자.

파라미터를 통해서 compose를 활용하는 방법에 대해서 알아보자.

우선 다음과 같은 내용이 있다.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect( StringBuilder::new , (b,s) -> {
            if ( b.length() == 0)
                b.append(s);
            else
                b.append("/").append(s);
        })
        .map(StringBuilder::toString)
        .subscribe(System.out::println);

====================================================================
Alpha/Beta/Gamma/Delta/Epsilon

주어진 문자열에 뒤에 "/" 을 추가를 해주는 아주 간단한 내용이다.

이 내용을 Compose로 변경시..

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(joinToString("/"))
                .subscribe(System.out::println);
public static ObservableTransformer<String , String> joinToString(String separator) {
    return upstream -> upstream
                        .collect( StringBuilder::new , (b,s) -> {
                            if ( b.length() == 0)
                                b.append(s);
                            else
                                b.append(separator).append(s);
                        })
                        .map(StringBuilder::toString)
                        .toObservable();
}

구분자를 넘겨서 공통함수로 뺄수 있다.

+

이런식으로 인자값을 넣어서 자주쓰는 함수를 자기만의 커스텀함수로 빼서 사용하도록 하자. (리팩토링도 할겸..)

하지만 주의 사항이 있다. 이러한 커스텀을 사용시 여러번의 Subscriber에서 자원을 공유하는 버그등이 발생될 수 있다.

이 부분에 대해서 피할 수 있는지 좀 더 알아보는 시간을 다음에 가져보자.

중복되어 있는 걸 커스텀 공통 함수로 빼고 compose로 활용하는 방법에 대해서 알아보자.

public static <T>ObservableTransformer<T , ImmutableList<T>> toImmutableList() {
        return upstream -> upstream.collect(ImmutableList::<T>builder , ImmutableList.Builder::add)
                .map(ImmutableList.Builder::build)
                //반드시 Single 또는 Observable로 리턴해야한다. 
                //Flowable -> toFlowable();
                .toObservable();
}

일단 공통 함수를 작성한다. 내용은 간단하다.

소스가 들어오면 collect로 imutableList로 변형해서 다시 옵저버 ( or Flowable)로 돌려준다.

그리고 해당 공통 함수에 다시 compose 에 이 추가된 내용을 넣어준다.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .compose(toImmutableList())
        .subscribe(System.out::println);

Observable.range(1 , 15)
        .compose(toImmutableList())
        .subscribe(System.out::println);

=================================================================
[Alpha, Beta, Gamma, Delta, Epsilon]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

코드가 줄어든게 보일것이다. 기본적인 활용은 이런식으로 만들 수 있다.

ObservableTransformer

우선 compose를 사용하는 방법에 대해서 알아보자.

compose를 사용하는 주된 이유는 어떠한 공통된 일련의 동작을 커스텀해서 변형할수 있다.

우선 사용하기 전 형태를 살펴보자.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect(ImmutableList::builder , ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)
        .subscribe(System.out::println);

Observable.range(1 , 15)
        .collect(ImmutableList::builder , ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)
        .subscribe(System.out::println);

''''
[Alpha, Beta, Gamma, Delta, Epsilon]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

위 내용은 주어진 문자열을 collect 함수로 이용해서 imutableList( Guava 라이버러리 ) 형태로 변경하고 다시 그걸 Builder -> List로 변경해주는 보통 RxJava 함수이다.

.map(ImmutableList.Builder::build)

을 추가시 SingleObserver<ImmutableList> 형태로 받지만..

하지만 안하는 경우 SingleObserver<ImmutableList.Builder>

으로 받기 때문에 map을 넣어서 변형을 해준다.

그래서 저기 두개의 내용을 보게되면

.collect(ImmutableList::builder , ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)

이 2개의 부분에서 중복적으로 사용되는 걸 볼수 있다.

Flowable.generate()

배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer<Emitter<T>> 을 구현하면 된다.

예를 보자.

public static void main(String[] args){
    randomGenerator( 1, 10000)
            .subscribeOn(Schedulers.computation())
            .doOnNext(i -> System.out.println("Emitting " + i))
            .observeOn(Schedulers.io())
            .subscribe(i -> {
                sleep(5);
                System.out.println("Received item : " + i);
            });

    sleep(5000);
}

static Flowable<Integer> randomGenerator(int min , int max) {
    return Flowable.generate( emitter -> emitter.onNext(ThreadLocalRandom.current().nextInt(min , max)));
}
............
 ...
 Emitting 8014
 Emitting 3112
 Emitting 5958
 Emitting 4834 //128th emission
 Received 9563
 Received 4359
 Received 9362
 ...
 Received 4880
 Received 3192
 Received 979 //96th emission
 Emitting 8268
 Emitting 3889
 Emitting 2595
...

128 -> 96 -> 96 순서로 이루어지고 있다.

주의할 점은 onNext 를 한번만 사용가능하다. 그 이상 사용시 IllgalStateException 이 발생된다.

public static void main(String[] args) {
        rangeReverse(100,-100)
                 .subscribeOn(Schedulers.computation())
                 .doOnNext(i -> System.out.println("Emitting " + i))
                 .observeOn(Schedulers.io())
                 .subscribe(i -> {
                     sleep(50);
                     System.out.println("Received " + i);
                 });
        sleep(50000);
     }
    static Flowable<Integer> rangeReverse(int upperBound, int lowerBound) {
         return Flowable.generate(() -> new AtomicInteger(upperBound + 1),
                 (state, emitter) -> {
                     int current = state.decrementAndGet();
                     emitter.onNext(current);
                     if (current == lowerBound)
                         emitter.onComplete();
                 }
         );

     }
.............
Emitting 100
 Emitting 99
 ...
 Emitting -25
 Emitting -26
 Emitting -27 //128th emission
 Received 100
 Received 99
 Received 98
 ...
 Received 7
 Received 6
 Received 5 // 96th emission
 Emitting -28
 Emitting -29
 Emitting -30

이런식으로 사용이 가능하다. Flowable.create() 사용보다 더 자세한 컨트롤이 가능하다.

Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.

예제를 한번 보자.

Flowable<Integer> source = Flowable.create(emitter -> {
    for (int i=0;i<1000;i++) {
        if(emitter.isCancelled())
            return;

        emitter.onNext(i);
    }

    emitter.onComplete();    
}  , BackpressureStrategy.BUFFER); //BackpressureStrategy.BUFFER 을 인자로 넣어서 생성이 가능하다. 

source.observeOn(Schedulers.io())
        .subscribe(System.out::println);

sleep( 1000 );
..........
0
1
2
3
4
...

인자로 넣을 수 있는 건 5가지가 있다.

BackpressureStrategy설명
Missing배압을 적용안한다. 후에 onBackpressureXXX()등으로 컨트롤이 가능하다.
ErrorMissingBackpressureException 발생시 에러를 발생시킨다.
BufferonBackpressureBuffer() 와 같은 형태
Lastest다운스트림이 받을때까지 마지막꺼만 유지를 한다.

Observable -> Flowable 로 변경시에도 인자로 넣는게 가능하다.

Observable<Integer> source = Observable.range(1,1000);
source.toFlowable(BackpressureStrategy.BUFFER)

하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.

Flowable -> Observable 변경시에도 toObserable()으로 가능하다.

Flowable<Integer> integers =
                 Flowable.range(1, 1000)
                         .subscribeOn(Schedulers.computation());
        Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
                 .flatMap(s -> integers.map(i -> i + "-" + s)
                 //위로는 Backpressure 지원함                 
                 .toObservable())           
                 //아래로는 지원안함..만약 request를 할때에도 적용안되니 주의..(subscriber 참조)      
                 .subscribe(System.out::println);


만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?

이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.

예를 보자.

public static void main(String[] args){
    Flowable.range(1, 100)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println( "onSubscribe" );
                    s.request(10);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                    sleep( 50 );
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println( "onSubscribe" );
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    //최대 0.2초 sleep
    int time = ThreadLocalRandom.current().nextInt(200);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

...................
.....
push -> 124
push -> 125
push -> 126
push -> 127
push -> 128
onNext 1, thread -> RxCachedThreadScheduler-1
sleep time : 180
onNext 2, thread -> RxCachedThreadScheduler-1
sleep time : 165
onNext 3, thread -> RxCachedThreadScheduler-1
sleep time : 52
onNext 4, thread -> RxCachedThreadScheduler-1
sleep time : 63
......
onNext 95, thread -> RxCachedThreadScheduler-1
sleep time : 36
onNext 96, thread -> RxCachedThreadScheduler-1
push -> 129
push -> 130
push -> 131

위와 같이 Flowable (A) , Subscriber (B) 라고 가정하면

A(128개 배출) -> B(96개) -> A(96) -> .... 이런식으로 배출하는 걸 볼수 있다.

즉 subscriber 로 업스트림으로 진행이 가능하다.

자 그러면 이걸 활용을 어떤식으로 할수 있는지 보자.

Flowable.range(1, 1000)
        .doOnNext( v -> System.out.println("push -> " + v))
        .observeOn(Schedulers.io())
        .map( i -> intenseCalculation(i))
        .subscribe(new Subscriber<Integer>() {
            Subscription subscription;
            AtomicInteger count = new AtomicInteger(0);
            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                System.out.println( "onSubscribe" );
                s.request(40);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                sleep( 50 );
                if( count.incrementAndGet() % 20 == 0 && count.get() >= 40) {
                    System.out.println("Requesting 20 more!!");
                    subscription.request(20);
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println( "onSubscribe" );
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
...........
onSubscribe
push -> 1
push -> 2
push -> 3
push -> 4
push -> 5
push -> 6
push -> 7
....
onNext 38, thread -> RxCachedThreadScheduler-1
sleep time : 128
onNext 39, thread -> RxCachedThreadScheduler-1
sleep time : 88
onNext 40, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
.....
sleep time : 74
onNext 59, thread -> RxCachedThreadScheduler-1
sleep time : 83
onNext 60, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
sleep time : 38
onNext 61, thread -> RxCachedThreadScheduler-1
sleep time : 148
onNext 62, thread -> RxCachedThreadScheduler-1
sleep time : 159

위 내용은 배출이 되지만 40번을 끝내고 다시 request를 해서 20번을 더 진행하고 계속 요청하고 있다.

이 부분을 보면 무엇이 떠오를까.. 앱개발시 리스트 조회시 fetch를 좀 더 효율적으로 가능할 수 있는 방법이 떠오른다.

추후에 적용해볼수 있을것 같다.'

그리고 처음 push 아이템은 언제 실행될까?

128(push) -> 96(request) -> 96(push) ->.... 이런식으로 진행된다. 보면 볼수록 매력이 있다...

주의할 점은 Subscriber 을 통해서 업스트림이 되는 게 아니란 점...

단지 그 요청을 상류쪽에서 중계를 방법을 결정하는 것입니다.

Flowable.range()Flowable.just() , Flowable.fromIterable()Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.

하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.

배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.

예를 들어보자.

public static void main(String[] args){
    Flowable.interval(1, TimeUnit.MICROSECONDS)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe( v->System.out.println("emit : " + v) , Throwable::printStackTrace);

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    int time = ThreadLocalRandom.current().nextInt(3000);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

.................
o.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
    at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:96)
    at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
    ..............

위와 같이 푸시는 계속되고 있지만 순간적으로 배출이 되는 순간이 생긴다.

이렬 경우 MissingBackpressureException이 발생된다.

Flowable.interval() 을 제외한 나머지는 배압이 제대로 될 것이지만 시간관련된 이 함수는 오류가 생길수 있다.

이런 경우 onBackpressureDrop() 또는 onBackpressureBuffer() 을 사용해서 처리가 가능하다.

  1. onBackpressureDrop : Flowable(Observable)에서 항목을 보냈을때 바로 처리하지 못하는 데이터는 무시합니다.
  2. onBackpressureBuffer : Flowable(Observable)에서 보낸 항목을 큐에 쌓아 놓고 항목을 처리하는 쪽에서 해당항목을 나중에 처리할 수 있게 해줍니다. 인자값으로 (int) 를 넣을 경우 큐에 쌓아둘수 있는 수를 제한 할 수 있습니다.
.onBackpressureDrop() <- 위치는 여기에 해줘야 정상 동작된다.
.observeOn(Schedulers.io())

그럼 다음장에서는 Subscriber 에 대해서 알아보자.

이제 배압(Back pressure) 을 해결 하기 위해 Flowable(RxJava2 부터 도입) 을 했다.

한번 살펴보도록 하자.

//Observable -> Flowable 변경
Flowable.range( 1, 999_999_999)
        .map(MyItem::new)
        .observeOn(Schedulers.io())
        .subscribe(myItem -> {
            sleep(50);
            System.out.println("Received MyItem : " + myItem.id);
        });

sleep(Integer.MAX_VALUE);

''' 
Constructing MyItem 1
 Constructing MyItem 2
 Constructing MyItem 3
 ...
 Constructing MyItem 127
 Constructing MyItem 128
 Received MyItem 1
 Received MyItem 2
 Received MyItem 3
 ...
 Received MyItem 95
 Received MyItem 96
 Constructing MyItem 129
 Constructing MyItem 130
 Constructing MyItem 131
 ...
 Constructing MyItem 223
 Constructing MyItem 224
 Received MyItem 97

여기 결과에서 보면 아주 흥미롭다.

128개의 생성이 이루어지고 96개의 배출이 이루어지고 있다. 그리고 다시 96개의 생성이 이루어지고. 있다.

여기에서 재밌는 점은 왜 128개의 배출이 처음에 이루어지지 않았는가 이다.

만약 96개의 생성이 이루어지고 96개의 배출이 이루어진다고 가정하면 다음 96개의 작업에 대해서 준비를 하는 과정에서 시간이 딜레이가 될수 있다는 점이다.

이걸 유휴시간이라고 말하는데 그 시간동안 준비된 작업을 제공하여 처리량을 높일수 있다고 한다.

즉 공장에서 더 많은 것을 기다리는 동안 주문을 공급하기 위해 약간의 재고를 보유한 창고와 비슷하다고 보면 된다.

그럼 언제 Observable 을 써야 하며 Flowable은 또 언제 사용하는 것일까?

참고자료 : http://realignist.me/code/2017/01/25/rxjava2-changelog.html

Observable은 주로 푸시를 기반으로 하는 성격을 지닌다.

예를 들어 하나의 쓰레드에서 1에서 999,999,999까지 숫자를 내보내는 경우 이상없는 지 체크 해보자.

우선 옵저버를 만들고 푸시(배출)을 진행해보자.

public static void main(String[] args){
    Observable.range( 1, 999_999_999)
            .map(MyItem::new)
            .subscribe(myItem -> {
                sleep(50);
                System.out.println("Received MyItem : " + myItem.id);
            });
}

static void sleep(long millseconds) {
    try{
        Thread.sleep(millseconds);
    }catch (InterruptedException ex){
        ex.printStackTrace();
    }
}

static final class MyItem {
    final int id;

    MyItem(int id){
        this.id = id;
        System.out.println("Constructing MyItem " + id);
    }
}
=============================================================================
Constructing MyItem 1
Received MyItem : 1
Constructing MyItem 2
Received MyItem : 2
Constructing MyItem 3
Received MyItem : 3
Constructing MyItem 4
Received MyItem : 4
Constructing MyItem 5
Received MyItem : 5
Constructing MyItem 6
Received MyItem : 6
.....

결과를 보다시피 일관된 흐름으로 배출되는 걸 볼수 있다. .

하지만 만약 비동기적으로 호출시 어떻게 될까?

아래 예제는observeOn (다운 스트림 쓰레드) 를 적용했다.

public static void main(String[] args){
        Observable.range( 1, 999_999_999)
                .map(MyItem::new)
                .observeOn(Schedulers.io())
                .subscribe(myItem -> {
                    sleep(50);
                    System.out.println("Received MyItem : " + myItem.id);
                });

        sleep(Integer.MAX_VALUE);
}

그럼 결과가 간혹 이런문제가 생긴다.

...
 Constructing MyItem 1001899
 Constructing MyItem 1001900
 Constructing MyItem 1001901
 Constructing MyItem 1001902
 Received MyItem 38
 Constructing MyItem 1001903
 Constructing MyItem 1001904
 Constructing MyItem 1001905
 Constructing MyItem 1001906
 Constructing MyItem 1001907
 ..

결과를 보면 생성이 배출보다 많은 걸 볼수 있다.

이런 경우 배압(Backpressure)이 필요하다. 이 부분을 무시하면 OutOfMemoryError가 발생될수 있다.

다음 순서 예정입니다. 

  1. 배압이란 무엇인가.
  2. Flowable 과 Subscriber에 대한 이해
  3. Flowable.create() 을 사용해보는 시간
  4. Observables과 Flowables 상호관의 연동
  5. 배압관련 함수들
  6. Flowable.generate() 사용



SwitchMap()

flatMap과 비슷하지만 큰 차이점은 제일 마지막 것만 배출 한다는 차이가 있다.

예제를 보자.

public static void main(String[] args) {
    Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon",
            "Zeta", "Eta", "Theta", "Iota");

    Observable<String> processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS));

    processings.subscribe( System.out::println);
    sleep( 20000 );
}

public static int randomSleepTime() {
    //returns random sleep time between 0 to 2000 milliseconds
    return ThreadLocalRandom.current().nextInt(2000);
}
public static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

...........
//0~2초 딜레이 랜덤으로 된다. 
Alpha
Beta
Gamma
Delta
Epsilon
Zeta
Eta
Theta
Iota

만약 매 5초간격으로 실행되는 프로그램이 있으며 내부적으로 동작되는 옵저버를 cancel 하고 새로운 아이템을 등록해서 배출 하고 싶다면 어떻게 해야 할까? 그럴 경우 switchMap() 을 사용해서 최신껄 바꿀수 있게 해준다.

Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon",
                "Zeta", "Eta", "Theta", "Iota");

Observable<String> processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS));

Observable.interval(5 , TimeUnit.SECONDS)
        //5초 간격으로 dispose 되고 새로운 processing 이 만들어져서 리턴된다.
        .switchMap( i -> processings.doOnDispose( ()-> System.out.println("Disposing! Starting next")))
        .subscribe( System.out::println);

sleep( 20000 );
........
Alpha
Beta
Gamma
Delta
Epsilon
Disposing! Starting next
Alpha
Beta
Gamma
Delta
Disposing! Starting next
Alpha
Beta


Throttling

정해진 시간동안 발생된 걸 무시를 할 수 있는 함수이다.

  • ThrottleLast() : 정해진 시간안에 마지막 내용만 리턴한다.
  • ThrottleFirst() : 정해진 시간안에 처음 내용만 리턴한다.
  • ThrottleWithTimeout() : debuoune 와 유사하며 일정시간동안 모든 아이템은 무시한다.

우선 일반적인 형태를 보자.

Observable<String> source1 = Observable.interval(100 , TimeUnit.MILLISECONDS)
        .map(i-> (i+1) * 100)
        .map(i-> "Source 1 " + i)
        .take(10);

Observable<String> source2 = Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map(i-> (i+1) * 300)
        .map(i-> "Source 2 " + i)
        .take(3);

Observable<String> source3 = Observable.interval(2000 , TimeUnit.MILLISECONDS)
        .map(i-> (i+1) * 2000)
        .map(i-> "Source 3 " + i)
        .take(2);

//concat 으로 순서대로 진행시킨다. 
Observable.concat(Arrays.asList( source1 , source2 , source3))
        .subscribe( System.out::println);

sleep
(6000);
............
Source 1 100
Source 1 200
Source 1 300
Source 1 400
Source 1 500
Source 1 600
Source 1 700
Source 1 800
Source 1 900
Source 1 1000
Source 2 300
Source 2 600
Source 2 900
Source 3 2000
Source 3 4000

throttleLast()

마지막 아이템만 리턴한다. ( sample() 이라고 불린다.)

Observable.concat(Arrays.asList( source1 , source2 , source3))
    .throttleLast( 1, TimeUnit.SECONDS)
    .subscribe( System.out::println);

........
Source 1 900 <- 1초안에서 제일 마지막 아이템만 리턴합니다. 
Source 2 900 <- 상동
Source 3 2000 <- 1초 안에 없다면 효과적으로 최신껄 끌어내는 걸 볼수 있다.

그럼 시간을 좀 더 늘릴 경우는 방출되는 아이템이 더 작아지는 걸 볼수 있다.

.throttleLast( 2, TimeUnit.SECONDS)

.........
Source 2 900
Source 3 2000

throttleFirst()

일정시간안에 최초 방출 아이템만 리턴한다.

Observable.concat(source1, source2, source3)
         .throttleFirst(1, TimeUnit.SECONDS)
         .subscribe(System.out::println);
........
SOURCE 1: 100
SOURCE 2: 300
SOURCE 3: 2000 <-- 1초안에 최초 아이템 리턴
SOURCE 3: 4000 <-- 다시 1초안에 최초 아이템 리턴

ThrottleWithTimeout() < debound() >

일정 시간에 있는 모든 아이템을 무시한다.

.throttleWithTimeout(1, TimeUnit.SECONDS)
..........
SOURCE 2: 900
SOURCE 3: 2000
SOURCE 3: 4000


Windowing

이전 장에서 배운 buffering과 비슷한 형태로 배열을 나누어서 다시 배열로 만들어주는 것이지만 하나의 큰 차이점은 있다.

바로 결과가 Observable형태로 나오는 것이다. map과 flatMap의 차이점으로 보면 된다.

정해진 사이즈의 Windowing

Observable.range(1 , 50)
        .window(8)
        .flatMapSingle(obs ->
            //reduce 더하기를 진행한다.         
                obs.reduce( "" , (total ,next) -> total + (total.equals("") ? "" : "|") + next ) )
        .subscribe(System.out::println);

.......
1|2|3|4|5|6|7|8
9|10|11|12|13|14|15|16
17|18|19|20|21|22|23|24
25|26|27|28|29|30|31|32
33|34|35|36|37|38|39|40
41|42|43|44|45|46|47|48
49|50

window사용시 리턴값이 Observable형태이기 때문에 다시 reduce 를 사용해서 리턴하고 있다.

skip 인자값

 .window(2, 3) //3번째를 무시하라..
 ........
 1|2
4|5
7|8
10|11
13|14
16|17
19|20
22|23
25|26

Time base Windowing

.window(1, TimeUnit.SECONDS)  
.........
300|600|900
1200|1500|1800
2100|2400|2700
3000|3300|3600|3900
4200|4500|4800

Boundary based windowing

Observable<Long> cutOffs =
                Observable.interval(1, TimeUnit.SECONDS);
        Observable.interval(300, TimeUnit.MILLISECONDS)
            .map(i -> (i + 1) * 300) // map to elapsed time
            .window(cutOffs)
            .flatMapSingle(obs -> obs.reduce("", (total, next) -> total
                + (total.equals("") ? "" : "|") + next))
            .subscribe(System.out::println);

.........
300|600|900
1200|1500|1800
2100|2400|2700
3000|3300|3600|3900
4200|4500|4800

Buffering과 매우 유사하며 차이점은 리턴값이 Observable형태인것만 기억하면 된다.

Buffering 은 주어진 배열을 지정된 갯수로 나누는 역할을 하게 된다.

io.reactivex.Observable.range(1,50)
                .buffer(8)
                .subscribe(System.out::println);

.......
[1, 2, 3, 4, 5, 6, 7, 8]
[9, 10, 11, 12, 13, 14, 15, 16]
[17, 18, 19, 20, 21, 22, 23, 24]
[25, 26, 27, 28, 29, 30, 31, 32]
[33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48]
[49, 50]

8개씩 나누는 과정에 깔끔하게 떨어지지 않으면 남은 만큼만 배열로 만들어서 보여주고 있다.

그리고 2번째 인자로 bufferSupplier 형태로 바꿔서 나눌 수 있다.

 .buffer(8 , HashSet::new)

skip으로 해당 번호번재를 뺄수도 있다.

.buffer(2 , 3) //2번째 인자는 skip 할 번호
......
[1, 2]
[4, 5] <- 3이 빠져있다. 
[7, 8]
[10]

여기에서 만약 count 보다 skip 숫자를 적게 넣게 되면 어떻게 될까?

.buffer(3 , 1)
..........
[1, 2, 3]
[2, 3, 4] <-- 이전 방출 이 포함되어 있는 걸 볼 수 있다.
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]
[8, 9, 10]
[9, 10]
[10]

이전방출량이 포함되는 형태가 된다. 그래서 이걸 활용을 하면 [ 이전 방출량 , 다음 방출량 ] 형태로 만들수 있습니다.

io.reactivex.Observable.range(1,10)
                .buffer(2 , 1)
                .filter( item -> item.size() == 2) //사이즈를 2개를 제한
                .subscribe( s-> System.out.println(s));
..........
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
[6, 7]
[7, 8]
[8, 9]
[9, 10]

시간으로 나누는 Buffering

주어진 일정 시간동안 버퍼링을 나눠서 배출 할 수 있다.

io.reactivex.Observable.interval(300 , TimeUnit.MILLISECONDS)
                .map( i -> (i+1) * 300)
                .buffer(1, TimeUnit.SECONDS) // 1. 시간 2. 시간방법
                .subscribe( s-> System.out.println(s));

sleep(4000);
----------------
[300, 600, 900] <--
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]

이 예제는 1초간격으로 배열중 나오는 걸 Buffering해서 보여주고 있다.

io.reactivex.Observable.interval(300 , TimeUnit.MILLISECONDS)
                .map( i -> (i+1) * 300)
                .buffer(1, TimeUnit.SECONDS , 2) //3번째 인자는 Max 갯수이다.
                .subscribe( s-> System.out.println(s));

............
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]

최대 2개까지 1초동안 묶어서 배출하는 예제이다.

Boundary-based Buffering

또 하나 흥미로운 형태가 있다.

Observable 을 buffer 인자에 넣어서 사용이 가능하다.

다음의 예제는 1초 간격의 옵저버를 0.3 초간의 간격을 가진 옵저버에 바운더리해서 결과를 뽑아내는 방법이다.

io.reactivex.Observable<Long> cutOffs = io.reactivex.Observable.interval(1 , TimeUnit.SECONDS);

io.reactivex.Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map( i -> (i+1) * 300)
        .buffer( cutOffs ) // 옵저버를 넣어서 간격 조절함..
        .subscribe( s-> System.out.println(s));

..........
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]
[4200, 4500, 4800]


'스터디 > RxJava2' 카테고리의 다른 글

[RxJava2]Throttling을 알아보자  (0) 2017.10.22
[Rxjava2]Windowing을 알아보자.  (0) 2017.10.22
[Rxjava2] 예제로 배우는 UnicastSubject  (0) 2017.10.22
[Rxjava2]AsyncSubject  (0) 2017.10.22
[RxJava2]ReplaySubject  (0) 2017.10.22

UnicastSubject

PublishSubject 와 굉장한 유사하지만 하나의 큰 차이점이 있다.

Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.

예를 보자.

public static void main(String[] args) {

io.reactivex.subjects.Subject<String> subject = UnicastSubject.create();

System.out.println(currentTime() + "옵저버 등록시작");
Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map( l -> (( l + 1) * 300) + " milliseconds")
        .subscribe(subject);

sleep(2000);

subject.subscribe( s -> System.out.println( currentTime() + " -- Observer 1 : " + s));

sleep(2000);

subject.subscribe( s -> System.out.println( currentTime() + " -- Observer 2 : " + s));

sleep(2000);

}

public static String currentTime(){
        Date date = new Date(System.currentTimeMillis());
        DateFormat formatter = new SimpleDateFormat("HH시 mm분 ss초");
        return formatter.format(date);
}

.............
58초옵저버 등록시작
00초 -- Observer 1 : 300 milliseconds <-- 2초 sleep 후 실행 (subscribe 후 시작됨) , 그리고 한꺼번에 캐쉬내용을 출력함
00초 -- Observer 1 : 600 milliseconds
00초 -- Observer 1 : 900 milliseconds
00초 -- Observer 1 : 1200 milliseconds
00초 -- Observer 1 : 1500 milliseconds
00초 -- Observer 1 : 1800 milliseconds
00초 -- Observer 1 : 2100 milliseconds
01초 -- Observer 1 : 2400 milliseconds
01초 -- Observer 1 : 2700 milliseconds
01초 -- Observer 1 : 3000 milliseconds
01초 -- Observer 1 : 3300 milliseconds
02초 -- Observer 1 : 3600 milliseconds
02초 -- Observer 1 : 3900 milliseconds

.......
io.reactivex.exceptions.OnErrorNotImplementedException: Only a single observer allowed.
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)

위에 보면 58초에 시작해서

기다리고 있다가 subscribe되는 순간 7개를 한꺼번에 배출 시킨다. 내부적으로 버퍼에 넣어두고 캐싱으로 사용한다.

배출함으로써 캐싱은 지워지게 된다.

그리고 하나의 subscribe 만 사용가능하며 두개이상일 경우 위와 같이 오류가 발생한다.

만약 두개이상 사용하기를 원하면 ReplaySubject를 고려하길 바란다.

그래도 굳이 해야하는 경우라면 다음과 같이 proxy 로 속여서 만들수 있습니다.

io.reactivex.subjects.Subject<String> subject = UnicastSubject.create();

System.out.println(currentTime() + " 옵저버 등록시작");
Observable.interval(300 , TimeUnit.MILLISECONDS)
        .map( l -> (( l + 1) * 300) + " milliseconds")
        .subscribe(subject);

sleep(2000);

//multicasting
Observable<String> multicast = subject.publish().autoConnect(); //자동연결로 만들기

multicast.subscribe( s -> System.out.println( currentTime() + " -- Observer 1 : " + s));
sleep(2000);

multicast.subscribe( s -> System.out.println( currentTime() + " -- Observer 2 : " + s));
sleep(2000);
.......
31초 옵저버 등록시작
33초 -- Observer 1 : 300 milliseconds <-- subscribe되는 동시에 한꺼번에 배출이 시작된다..
33초 -- Observer 1 : 600 milliseconds
33초 -- Observer 1 : 900 milliseconds
33초 -- Observer 1 : 1200 milliseconds
33초 -- Observer 1 : 1500 milliseconds
33초 -- Observer 1 : 1800 milliseconds
33초 -- Observer 1 : 2100 milliseconds
34초 -- Observer 1 : 2400 milliseconds
34초 -- Observer 1 : 2700 milliseconds
34초 -- Observer 1 : 3000 milliseconds
35초 -- Observer 1 : 3300 milliseconds
35초 -- Observer 1 : 3600 milliseconds
35초 -- Observer 1 : 3900 milliseconds
36초 -- Observer 1 : 4200 milliseconds
36초 -- Observer 2 : 4200 milliseconds
36초 -- Observer 1 : 4500 milliseconds
36초 -- Observer 2 : 4500 milliseconds
36초 -- Observer 1 : 4800 milliseconds
36초 -- Observer 2 : 4800 milliseconds
36초 -- Observer 1 : 5100 milliseconds
36초 -- Observer 2 : 5100 milliseconds
37초 -- Observer 1 : 5400 milliseconds
37초 -- Observer 2 : 5400 milliseconds
37초 -- Observer 1 : 5700 milliseconds
37초 -- Observer 2 : 5700 milliseconds
37초 -- Observer 1 : 6000 milliseconds
37초 -- Observer 2 : 6000 milliseconds


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]Windowing을 알아보자.  (0) 2017.10.22
[RxJava2]Buffering 에 대해서 알아보자.  (0) 2017.10.22
[Rxjava2]AsyncSubject  (0) 2017.10.22
[RxJava2]ReplaySubject  (0) 2017.10.22
[Rxjava2]BehaviorSubject 에 대해서 알아보자.  (0) 2017.10.22

AsyncSubject

AsyncSubject는 소스 Observable로부터 배출된 마지막 값(만)을 배출하고 소스 Observable의 동작이 완료된 후에야 동작한다. (만약, 소스 Observable이 아무 값도 배출하지 않으면AsyncSubject역시 아무 값도 배출하지 않는다.)

즉 쉽게 말하자면 onCompleted를 하기전 이벤트만 배출이 된다는 것이다. onCompleted가 호출이 되지 않으면 배출이 되지 않는다.

io.reactivex.subjects.Subject<String> subject = AsyncSubject.create();

subject.subscribe( v -> System.out.println("Observer 1: " + v) ,
        Throwable::printStackTrace,
        () -> System.out.println("Observer 1 done!!")
        );

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma"); <-- 이 값만 배출이 될것이다. 
//subject.onComplete(); <-- onCompleted 호출 

subject.subscribe( v -> System.out.println("Observer 2: " + v) ,
        Throwable::printStackTrace,
        () -> System.out.println("Observer 2 done!!")
);

그리고 내부적으로 Observable에 대해서 takeLast(1).replay(1) 을 사용하고 있다는 점을 유의해야 한다.

이 Subject는 java 8 에서 CompletableFuture과 유사합니다. 계산을을 수행해서 완료를 관찰하면서 값을 얻는 형식입니다.

ReplaySubject

이전 시간에 공부한 replay() , cache() 기능을 기억하는가.

이전에 배출한 모든 걸 캐싱해서 새로운 Observer등록시 캐싱된걸 다시 배출시키는 역할이다.

ReplaySubject는 그 기능을 가진 Subject이다.

즉 다시 말하자면 PublishSubject후에 .cache() 을 붙였다고 보면 된다.

//ReplaySubject 선언
io.reactivex.subjects.Subject<String> subject = ReplaySubject.create();

subject.subscribe( v -> System.out.println("Observer 1: " + v));

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

subject.subscribe( v -> System.out.println("Observer 2: " + v));
subject.subscribe( v -> System.out.println("Observer 3: " + v));
...........
Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 2: Alpha <-- 처음부터 캐싱된 걸 보여준다.
Observer 2: Beta
Observer 2: Gamma
Observer 3: Alpha <-- 3번째도 캐싱된걸 보여주고 있다. 
Observer 3: Beta
Observer 3: Gamma


BehaviorSubject

주로 많이 쓰이는 subject중 하나이다.

기능은 최근 마지막껄 캐싱해서 보여주는 subject이다.

이전 시간에 배운 replay() , autoConnect() 을 기억하는가?

이 BehaviorSubject는 이 2가지 기능들을 통합해놓은 거라고 보면된다.

즉 PublishSubject후에 replay(1).autoConnect() 를 통합한 Subject 이다.

그럼 예제를 한번 보자.

//BehaviorSubject 선언
io.reactivex.subjects.Subject<String> subject = BehaviorSubject.create();

subject.subscribe( v -> System.out.println("Observer 1: " + v));

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");

subject.subscribe( v -> System.out.println("Observer 2: " + v));
subject.subscribe( v -> System.out.println("Observer 3: " + v));
...........
Observer 1: Alpha
Observer 1: Beta
Observer 1: Gamma
Observer 2: Gamma <-- 캐싱되서 1개만 보여주고 있다. 
Observer 3: Gamma <-- 캐싱되서 1개만 보여주고 있다.

만약 마지막 1개의 이벤트를 배출하고 싶다면 BehaviorSubject 을 사용하자.


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]AsyncSubject  (0) 2017.10.22
[RxJava2]ReplaySubject  (0) 2017.10.22
[Rxjava2]예제로 이해하는 PublishSubject  (0) 2017.10.22
[RxJava2] Replaying , Caching 을 알아보자.  (0) 2017.10.22
[RxJava2]refCount 와 share 기본 개념  (0) 2017.10.22

PublishSubject

통상 일반적인 subject 형태이다.

onNext() , onComplete() , onError() 를 호출해서 사용이 가능하다.

그럼 예로 보자.

io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.map(String::length) // 문자길이를 리턴하고 있다. 
        .subscribe(System.out::println);

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

...........
5
4
5

간단한 기본 형태라 어렵지는 않다.

onComplete() 를 하는 순간 더이상 Observer 가 없다고 알려줄 수 있다.

Subject를 언제 쓰면 유용할까?

나도 지금까지 많이 쓰면서 늘 이 질문을 했었다. 이럴때 과연 쓰는게 맞는 걸까?

우선 기본적인 사용처는 Multicasting상황에서 아주 유용하다. (예를 들어 이벤트 버스)

그리고 다른 방법으로 여러개의 Observable을 Merge할 때 매우 유용하다. ( Observable.merge() 대신)

그럼 예를 들어 보자.

//1초 간격으로 배출시킨다.
Observable<String> source1 = Observable.interval( 1, TimeUnit.SECONDS).map(v -> (v + 1) + " seconds");
// 0.3 초 간격으로 배출시킨다.
Observable<String> source2 = Observable.interval(300 , TimeUnit.MILLISECONDS).map(v -> (v + 1) + " milliseconds");

//선언
Subject<String> subject = PublishSubject.create();

//값 출력
subject.subscribe(System.out::println);

source1.subscribe( subject );
source2.subscribe( subject );

sleep(3000); //3초동안 유지

........
1 milliseconds <- source 1 실행
2 milliseconds
3 milliseconds
1 seconds  <- source 2 실행
4 milliseconds
5 milliseconds
6 milliseconds
2 seconds
7 milliseconds
8 milliseconds
9 milliseconds
10 milliseconds
3 seconds

보다 시피 2개의 Observer를 등록해서 머지 해서 사용할 수 있다. 이러한 방법이 주는 장점은 모듈화 및 캡슐화 되어 있는 모델방식에서 유용하다고 한다.

하지만 Subject 사용시 조심해야 할 부분이 있다.

onNext() , onComplete() 사용시 Observable + Observer 정의가 되어있다고 생각하고 호출해야 한다.

안그러면 생각했던 결과가 나오지 않는다.

이전 예제를 다시 보면

//선언
io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.map(String::length)
        .subscribe(System.out::println);

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

........................................
5
4
5

이러한 결과가 나오지만..

//선언
io.reactivex.subjects.Subject<String> subject = PublishSubject.create();

subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();

// 후에 등록시 문제가 발생된다. 
subject.map(String::length)
        .subscribe(System.out::println);
.........
출력안됨
.........

이런 경우 이전에 배운 publish() 또는 replay() 등으로 Hot Hot 하게 만들어서 Multicasting을 제어할수 있다.

그리고 여러번의 subject.onNext() 남발로 인해 배출들이 꼬일수 있다.

그럴 경우를 대비해서 직렬화라는 기능을 제공한다.

Subject<String> subject = PublishSubject.<String>create().toSerialized();


Replay와 Caching

멀티 캐스팅을 사용하면 여러 Observer 들 사이에서 공유되는 값을 캐시 할 수 있습니다.

우선 Replay를 보자.

이미 많은 예제사이트들을 보셨을 수도 있지만 이전에 배출한 값을 캐싱해서 가지고 있다가 새롭게 배출시 캐싱한 걸 같이 먼저 배출해준다.

예제를 보자.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
    .replay()
    .autoConnect();

//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

.............
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 0 <- 2번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 2: 1
Observer 2: 2 
Observer 1 : 3
Observer 2: 3
Observer 1 : 4 
Observer 2: 4 
Observer 3: 0 <- 3번째 Observer 시작시 캐싱된 게 먼저 배출을 시작하고 있다.
Observer 3: 1
Observer 3: 2
Observer 3: 3
Observer 3: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7

위 내용을 보면 2,3번째 Observer가 배출시 1에서 먼저 배출되었던 3번의 값을 캐싱해서 같이 내보내고 있다.

하지만 캐싱작업으로 인해서 메모리 오버플로우 가 발생될 수 있으니 인자값을 통해서 버퍼수를 정해주는 방법이 좋다.

.replay(2)

.......
Observer 1 : 0
Observer 1 : 1
Observer 1 : 2
Observer 2: 1 <-- 처음 0은 제외된 걸 볼수 있다. 
Observer 2: 2
Observer 1 : 3
Observer 2: 3
Observer 1 : 4
Observer 2: 4
Observer 1 : 5
Observer 2: 5
Observer 3: 4 <-- 3부터는 캐싱작업이 되지 않는 다... Observer 2번 에서 캐싱하고 비워버림
Observer 3: 5
Observer 1 : 6
Observer 2: 6
Observer 3: 6
Observer 1 : 7
Observer 2: 7
Observer 3: 7
Observer 1 : 8
Observer 2: 8
Observer 3: 8

위에서 보다시피 2번째 Observer에서는 처음 0을 제외하고 1,2 가 배출되고 있는 걸 볼 수 있다.

그런데 3부터는 앞에 2번의 캐싱이 발생되지 않는다.. refCount(2) 라고 정의시 2번째에서 캐싱하고 비워버리고 있다.

그럼 이 걸 방지 하기 위해선 어떻게 해야 될까?

캐싱 시간을 지정해서 유지하도록 할 수 있다.

//0.3 초 간격으로 도는 걸 확인
Observable<Long> threeRandoms = Observable.interval(300,TimeUnit.MILLISECONDS)
        //출력값 계산
        .map(v -> (v + 1) * 300) // map to elapsed milliseconds
        //캐싱 시간을 지정
        .replay(2 , TimeUnit.SECONDS)
        .autoConnect();

//Observer 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observer 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observer 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

..............
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 2: 1200 <- 1초동안 캐싱된 걸 보여주고 있다. 
Observer 2: 1500
Observer 2: 1800
Observer 2: 2100
Observer 2: 2400
Observer 2: 2700
Observer 1 : 3000
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 4200 <- 1초동안 캐싱된 걸 보여주고 있다. 
Observer 3: 4500
Observer 3: 4800
Observer 3: 5100
Observer 3: 5400
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000

위에서 보다 시피 시간단위로 캐싱된 걸 배출 시킬 수 있다.

하지만 내가 원한건 앞에 2개만 유지시키는 거였다. 그럼 다시 인자값을 변경해보자.

.replay(2 , 1, TimeUnit.SECONDS) // buffersize , time 

....................
Observer 1 : 300
Observer 1 : 600
Observer 1 : 900
Observer 1 : 1200
Observer 1 : 1500
Observer 1 : 1800
Observer 1 : 2100
Observer 1 : 2400
Observer 1 : 2700
Observer 1 : 3000
Observer 2: 2700 <- 앞에 2개만 캐싱된 걸 볼수 있다.
Observer 2: 3000
Observer 1 : 3300
Observer 2: 3300
Observer 1 : 3600
Observer 2: 3600
Observer 1 : 3900
Observer 2: 3900
Observer 1 : 4200
Observer 2: 4200
Observer 1 : 4500
Observer 2: 4500
Observer 1 : 4800
Observer 2: 4800
Observer 1 : 5100
Observer 2: 5100
Observer 1 : 5400
Observer 2: 5400
Observer 1 : 5700
Observer 2: 5700
Observer 3: 5400 <- 앞에 2개만 캐싱된 걸 볼 수 있다. 
Observer 3: 5700
Observer 1 : 6000
Observer 2: 6000
Observer 3: 6000
Observer 1 : 6300
Observer 2: 6300
Observer 3: 6300

이제 내가 원한 형태로 캐싱되서 볼수 있다.

정리를 하자면 ConnectableObservable에서 들어오는 Observer를 모두 배출 시키고 원하면 autoConnect를 사용하면 된다.

그리고 무한 캐싱작업을 원하면 cache()또는 publish().refcount() 을 이용하면 된다.

추가적으로 캐싱작업시 예상되는 요소 수를 미리 지정하고 버퍼를 최적화를 할 수 있습니다.

Observable<Integer> cachingTestObservable =
        Observable.just(6, 2, 5, 7, 1, 4, 9, 8, 3) 
            .scan(0, (total,next) -> total + next)
            .cacheWithInitialCapacity(9); <- 9개의 요소를 미리 지정해서 버퍼를 최적화 시킴

주의 사항으로 cache를 폐기 할 계획이 없으면 사용하지 말자. 메모리를 계속 잡고 있어서 문제가 생길수 있기 때문이다.

대안으로 replay() 으로 캐시 크기등을 지정해서 제어하는 방향을 모색하는 게 바람직하다.

refCount() 는 autoConnect(1) 와 유사하다.

그럼 autoConnect와 refCount와 차이점은 무엇일까?

차이점은 더이상 배출할 Observer가 없을시 refCount는 자동으로 자신을 해지를 하고 다시 새로운 Observer 이 오면 처음부터 자동으로 시작을 한다.

autoConnect는 동작이 끝나면 dispose 가 되지만 다시 새로운게 오면 시작을 하진 않는다.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
    .publish()
    .refCount();

//Observable 1 배출
threeRandoms.take(5).subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observable 2 배출
threeRandoms.take(2).subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기
....................
Observable 1 : 0
Observable 1 : 1
Observable 1 : 2
Observable 1 : 3
Observable 2: 3
Observable 1 : 4
Observable 2: 4 <-- 배출이 완료가 되고 dispose가 됐다. 
Observable 3: 0 <-- 새로운 Observer 왔기 때문에 시작된다. 
Observable 3: 1

그리고 publish().refCount() 을 줄여서 share() 라고 사용가능하다.

Observable.interval(1,TimeUnit.SECONDS).share(); // publish().refCount() 와 같은 의미


앞에서 본 예제처럼 connect() 를 통해 동시 실행할 수 있다.

하지만 자동으로 connect를 할수도 있다.

주의 할 내용은 자동 연결(Auto connection) 은 사용할 때 상당한 주의를 요한다. 뜻하지 않게 배출시 제대로 되지 않는 경우가 발생될 수 있기 때문이다.

그럼 예제를 살펴보자 .

이전장에서 했던 2가지 서로 다른 배출 방법에 자동 연결만 추가한 부분이다.

Observable<Integer> threeRandoms = Observable.range(1,3)
    .map(i->randomInt())
    .publish()
    .autoConnect(2); //2번에 대해서 자동 연결을 하겠다는 뜻이다.!!


//Observable 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

//Observable 2 배출
threeRandoms.reduce( 0 ,(total , next) -> total + next)
.subscribe( i -> System.out.println("Observable 2: " + i));

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

...........
Observable 1 : 44005
Observable 1 : 633061
Observable 1 : 464828
Observable 2: 1141894

예제에서 보면 2번만 연결하겠다고 선언했다.(기본값은 1이다.).

그래서 3번째 Observer무시되고 있다.

autoConnect() 을 사용시 중간에 들어오는 Observable 도 연결이 가능하다.

위에 말은 무슨 뜻일까?

예를 들어 Observer1개를 자동연결하는 과정에서 시간흐름에 따른(IntervalObserver배출이 달라질 수 있다는 것이다.

말로 설명하니 어려우니 우선 예제를 보자.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
                .publish()
                .autoConnect(); //기본값 1

//Observable 1 배출
threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observable 2 배출
threeRandoms.subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기

................
Observable 1 : 0
Observable 1 : 1
Observable 1 : 2
Observable 1 : 3
Observable 2: 3 <- 추가적으로 연결된다. 
Observable 1 : 4 <- 다른 Observable 이 연결되더라도 이전 껀 사라지지 않는다. 
Observable 2: 4
Observable 1 : 5
Observable 2: 5
Observable 1 : 6
Observable 2: 6
Observable 3: 6
Observable 1 : 7
Observable 2: 7
Observable 3: 7 <- 다시 새로운 Observable 이 연결된다..
Observable 1 : 8
Observable 2: 8
Observable 3: 8

위 예제를 보면 새로운 Observer 이 연결되더라도 이전 배출도 계속 이루어지는 걸 볼수 있다.

이 기능은 어떠한 Observer 를 기다리지 않고 즉시 배출들을 시작할 수 있다는 장점이 있다.

그럼 refCount() 와 share() 기능에 대해서 다음장에서 살펴보자.

기본적인 Observable 생성 후 배출은 다음과 같습니다.

Observable<Integer> threeIntegers = Observable.range(1 , 3);
threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i));
threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i));
--------------------------
Observer 1 : 1
Observer 1 : 2
Observer 1 : 3
Observer 2 : 1
Observer 2 : 2
Observer 2 : 3

보다시피 Observable 1 이 끝나고Observable 2가 다시 실행되는 구조로 되어있다.

동시에 배출 하고 싶을땐 어떻게 할까.

그럴때 ConnectableObservable 을 사용할 수 있습니다.

사용법은 publish -> connect 로 연결해서 배출을 동시에 시킬수 있다 .

그럼 기본적인 사용예를 보자.

ConnectableObservable<Integer> threeIntegers = Observable.range(1 , 3).publish();
threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i));
threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i));

//여기까지 아무런 배출이 이루어지지 않는다.
threeIntegers.connect(); //이제 동시에 배출을 시킨다.

-----------------
Observer 1 : 1
Observer 2 : 1
Observer 1 : 2
Observer 2 : 2
Observer 1 : 3
Observer 2 : 3

조금 더 깊이 들어가보자.

다시 일반적인cold Observable 을 보자

public static void main(String[] args){

    Observable<Integer> threeRandoms = Observable.range(1 , 3)
                                        .map( i-> randomInt());

    threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
    threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

}

public static int randomInt(){
    return ThreadLocalRandom.current().nextInt(100000);
}
---------------------------------
Observer 1 : 25134
Observer 1 : 66916
Observer 1 : 81
Observer 2 : 54593
Observer 2 : 95417
Observer 2 : 25863

생각했던 기본 형태인 첫번째 Observer 배출이 다 되고 두번째가 실행된다.

그럼 이 구조의 형태는 다음과 같다고 본다.

각각의 나누어진 형태로 결과를 배출 하고 있다. 어려운게 없다.

자 그럼 publish 사용시 어떻게 바뀌는 지 보자.

ConnectableObservable<Integer> threeInts = Observable.range(1, 3).publish(); //publish 붙는 위치가 중요하다. 

Observable<Integer> threeRandoms = threeInts.map( i-> randomInt());

threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

threeInts.connect();
---------------------------------------------------
Observer 1 : 12581
Observer 2 : 65620
Observer 1 : 85896
Observer 2 : 43633
Observer 1 : 77427
Observer 2 : 82409

이런식으로 이루어질텐데.. 문제가 뭘까?

결과가 보면 Observable 1과 Observable 2가 전혀 값이 똑같지 않다.

이유는 map에서 랜덤 숫자를 생성하는데 그 전에 publish을 호출했기 때문이다.

그러면 위치를 바꿔보자.

//publish 붙는 위치가 중요하다.

ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3).map( i-> randomInt()).publish(); 

threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));
threeRandoms.subscribe( i -> System.out.println("Observer 2 : " + i));

threeRandoms.connect();
--------------------------
Observer 1 : 74650
Observer 2 : 74650
Observer 1 : 33655
Observer 2 : 33655
Observer 1 : 99136
Observer 2 : 99136

값이 동일하게 이루어진 걸 볼수 있다.

그럼 흐름은 다음과 같다.

그림으로 보니 쉽게 이해가 된다.

그럼 언제 Multicast을 해야 할까?

일반적으로 중복되는 형태 , 재활용되는 Observable 형태를 사용시 자주 쓰인다.

그럼 재활용해서 다시 가공하는 예제를 보자.

ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3).map( i-> randomInt()).publish(); //publish 붙는 위치가 중요하다.

//첫번째 배출 시작
threeRandoms.subscribe( i -> System.out.println("Observer 1 : " + i));

//두번째는 재활용해서 다른 형태로 가공
//랜덤으로 들어오는 숫자들을 합해서(누진계) 마지막에 한번 배출 한다. 
threeRandoms.reduce(0,(total , next) -> total + next).subscribe( i -> System.out.println("Observer 2 : " + i));

threeRandoms.connect();
--------------------------
Observer 1 : 82613
Observer 1 : 96148
Observer 1 : 27865
Observer 2 : 206626 <- 한번 배출된걸 보면 된다.

그림 형태는 다음과 같을것이다.

일반적으로 Observable들은 cold상태인것들이다.

RxJava를 하다보면 많이 듣는 게 cold와 hot Observable이다.

도대체 이것들이 뭔가..

Cold Observable

  • 일반적인 옵저버형태를 말한다.
  • 누가 구독해주지 않으면 데이터를 배출을 하지 않는다.
  • 일반적인 웹 요청 , 데이터베이스 쿼리등이 사용되며 내가 요청하면 결과를 받는 과정을 거친다.
  • 처음부터 발행하는 걸 기본으로 한다.

Hot Observable

  • 구독자의 존재 여부와 상관없이 데이터를 배출하는 Observable 이다.
  • 그래서 여러 구독자에 선택적으로 고려가능하다.
  • 구독시점으로부터 발행하는 값을 받는 걸 기본으로 한다.
  • 마우스 이벤트 , 키보드 이벤트 , 시스템 이벤트등이 주로 사용된다.
  • 멀티캐스팅도 포함된다.

데이터 발행자와 수신자도 이참에 정리해보자.

데이터 발행자데이터 수신자
ObservableSubscriber
SingleObserver
MaybeConsumer
Subject
Completable
  • Subscriber (구독자) : Observable 을 연결할 시 subscribe 함수를 사용하는데 이 과정에서 구독이 된다고 하여 구독자라고 부른다.
  • Observer (옵저버) : 스타의 옵저버가 아니다. RxJava는 옵저버 패턴을 사용해서 늘 관찰하고 작업을 수행합니다. 발신자를 Observable 이 되고 데이터 수신자를 옵저버라고 한다.
  • Consumer : Rxjava2 부터 소비자 패턴을 이용해서 소비를 시킨다. 이 패턴에 대해서 더 자세히 알아볼려면 클릭

일반덕으로 Observable 은 Code 성격이며 특별히 스트림을 Hot으로 변환하지 않으면 Code 상태이다

+ Recent posts