'링크모음 > Android' 카테고리의 다른 글

android 매트리얼 ui builder  (0) 2017.11.06
android 저장소 전략  (0) 2017.11.04
Android code style  (0) 2017.10.30
Android RecyclerView 팁 모음  (0) 2017.10.30
정보통신망법-준수를-위한-앱-푸시-광고-가이드라인  (0) 2017.10.25

공식 가이드

https://source.android.com/source/code-style#follow-field-naming-conventions


참고 가이드

http://givenjazz.tistory.com/44


Square code style ( java  ,  Android)

https://github.com/square/java-code-styles

'링크모음 > etc' 카테고리의 다른 글

5일만에 기획부터 프로토타입까지  (0) 2017.11.05
기기별 목업  (0) 2017.11.05
파이썬 기초  (0) 2017.10.28
아마존 직구방법 총정리  (0) 2017.10.21
데뷰 2017 자료 모음  (0) 2017.10.20

https://m.blog.naver.com/PostView.nhn?blogId=edy5016&logNo=221126776190&proxyReferer=http://m.facebook.com

'링크모음 > etc' 카테고리의 다른 글

기기별 목업  (0) 2017.11.05
개발자 치트시트 추천  (0) 2017.10.30
아마존 직구방법 총정리  (0) 2017.10.21
데뷰 2017 자료 모음  (0) 2017.10.20
파일별 시그니처 확인법  (0) 2017.09.05

http://developer88.tistory.com/m/138

출처 : http://yookeun.github.io/mac/2014/11/16/mac-mariadb/

맥에서 homebrew를 이용해서 mariadb를 설치해본다.

환경:OX Yosemite
먼저 <homebrew>를 설치해야 한다. homebrew는 우분투의 apt-get 같은 패키지인스톨 프로그램이라고 생각하면 된다.

위 사이트에서 설치를 진행한다. 설치가 완료된 다음 터미널에서 brew 를 치면 명령어의 설명이 나오면 정상으로 설치된 것이다.

먼저 brew를 업데이트해준다. (최신의 정보로 업데이트된다)

brew update

다음 maridb를 검색해보자 (꼭 할 필요는 없다)

brew search mariadb

검색하면 mariadb가 검색된다. brew에서는 기본적으로 최신버전을 설치하게 된다. 2014.11월 기준으로 mariadb의 최신버전은 10.0.14이다.

brew info mariadb (역시 꼭 할필요는 없다)

mariadb: stable 10.0.14 (bottled)

만약 brew install mariadb로 인스톨하면 위와 같이 stable버전이 설치된다. 자. 이제 설치를 해보자.

brew install mariadb
==> Downloading https://downloads.sf.net/project/machomebrew/Bottles/mariadb-10.0.14_1.yosemite.bottle.tar.gz
... 중략 ...
==> Summary

/usr/local/Cellar/mariadb/10.0.14_1: 524 files, 125M

그러면 위와 같이 설치가 진행이 된다. 이제 세팅을 해준다.

unset TM PDIR

설치된 경로로 이동하자. 10.0.14_1버전이 설치되어 있다. (탭키로 치면 경로나옴)

cd /usr/local/Cellar/mariadb/10.0.14_1/

mariadb의 DB를 설치한다.

mysql_install_db

그러면 아래와 같이 설치가 진행이 된다.

Installing MariaDB/MySQL system tables in '/usr/local/var/mysql' ...
16 18:09:08 [Note] InnoDB: Mutexes and rw_locks use GCC atomic builtins
user created by default.  
You can start the MariaDB daemon with:

..중략...

Support MariaDB development by buying support/new features from
SkySQL Ab. You can contact us about this at sales@skysql.com.
Alternatively consider joining our community based development effort:
http://mariadb.com/kb/en/contributing-to-the-mariadb-project/

자. 이제 mariadb를 가동시켜본다.

mysql.server start
...
Starting MySQL

. SUCCESS!

SUCCESS가 나오면 성공이다.


'링크모음 > Swift' 카테고리의 다른 글

swift codable  (0) 2017.11.06
swift4 tour  (0) 2017.10.22
노티피케이션 사용법  (0) 2017.10.20
swift 에서 self 값 체크  (0) 2017.10.18
pod update/install 시 라이버러리를 전부 강제 3.2로 세팅하기  (0) 2017.10.16

http://wlaxhrl.tistory.com/m/70

'링크모음 > Swift' 카테고리의 다른 글

swift codable  (0) 2017.11.06
swift defer 예제  (0) 2017.10.22
노티피케이션 사용법  (0) 2017.10.20
swift 에서 self 값 체크  (0) 2017.10.18
pod update/install 시 라이버러리를 전부 강제 3.2로 세팅하기  (0) 2017.10.16

 


쓰레드가 이어져서 나오는 걸 직렬화라고 한다.

예를 보자.

public static void main(String[] args) {
    Observable.range(1 , 10)
            .map(i -> intenseCalculation(i))
            .subscribe(i->System.out.println("Received " + i + " "  + LocalTime.now()));
}

public static <T> T intenseCalculation(T value) {
    sleep(ThreadLocalRandom.current().nextInt(3000));
    return value;
}

public static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
..........
Received 1 19:14:42.414
Received 2 19:14:44.454
Received 3 19:14:44.727
Received 4 19:14:47.028
Received 5 19:14:49.174
Received 6 19:14:49.207
Received 7 19:14:51.057
Received 8 19:14:53.875
Received 9 19:14:56.381
Received 10 19:14:57.741

약 15초가 걸렸습니다. 만약 평행하게(병렬화) 처리를 하고 싶은 경우에는 어떻게 처리를 해야 할까?

Observable.range(1,10)
       .flatMap(i -> Observable.just(i)
               .subscribeOn(Schedulers.computation())
               .map(i2 -> intenseCalculation(i2))
       )
       .subscribe(i -> System.out.println("Received " + i + " "
               + LocalTime.now() + " on thread "
               + Thread.currentThread().getName()));

sleep(20000);
.........
Received 1 19:28:11.163 on thread RxComputationThreadPool-1
Received 7 19:28:11.381 on thread RxComputationThreadPool-7
Received 9 19:28:11.534 on thread RxComputationThreadPool-1
Received 6 19:28:11.603 on thread RxComputationThreadPool-6
Received 8 19:28:11.629 on thread RxComputationThreadPool-8
Received 3 19:28:12.214 on thread RxComputationThreadPool-3
Received 4 19:28:12.961 on thread RxComputationThreadPool-4
Received 5 19:28:13.274 on thread RxComputationThreadPool-5
Received 2 19:28:13.374 on thread RxComputationThreadPool-2
Received 10 19:28:14.335 on thread RxComputationThreadPool-2

약 3초 정도 걸렸습니다. 이렇게 내부적으로 사용해서 병렬형태로 하기를 바란다.

RxJava 2부터 Disposable 형태로 변경됐다.

일반적인 형태로 많이 쓰는건 consumer 형태이다.

예를 보자.

Observable<Integer> testObservable = Observable.range(1 , 10);
Disposable disposable1 = testObservable
    .doOnDispose(() -> System.out.println("onDispse 1"))
    .doOnComplete(() -> System.out.println("onCompleted 1"))
    .subscribe(item -> System.out.println("emitted 1 -> " + item)); //consumer 로 진행

System.out.println( "isDisposed -> " + disposable1.isDisposed());
..........
emitted 1 -> 1
emitted 1 -> 2
emitted 1 -> 3
emitted 1 -> 4
emitted 1 -> 5
emitted 1 -> 6
emitted 1 -> 7
emitted 1 -> 8
emitted 1 -> 9
emitted 1 -> 10
onCompleted 1
isDisposed -> true <- 이미 dispose가 되었다.

consumer 사용시 doOnDispose 이벤트가 안 타지만 이미 dispose가 된걸 볼 수 있다.

하지만 이런 경우를 보자.

public static DisposableObserver<Integer> getDispose(int pos){
        DisposableObserver<Integer> disposableObserver = new DisposableObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("emitted " + pos + " -> " + item);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        return disposableObserver;
    }
testObservable
    .doOnDispose(() -> System.out.println("onDispse 2"))
    .doOnComplete(() -> System.out.println("onCompleted 2"))
    .subscribe(getDispose(2));

disposableObser를 제공시 void 형태라서 메모리릭이 걸릴수 있다.

그럴때

.subscribe(getDispose(3));
  1. CompositeDispoable 을 호출해서 destory 시 clear를 해준다.
  2. .dispose() 를 해서 제거를 해준다.
Disposable disposable3 = testObservable
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onDispse 3");
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onCompleted 3");
            }
        })
        .subscribeWith(getDispose(3));

compositeDisposable.add(disposable3);
compositeDisposable.clear(); // or disposable3.dispose();


추가적으로

Observable 와 Observer를 연결을 하는 경우에 만약 complete가 발생안되는 경우에는 계속 Observer는 배출 되기를 기다릴 수 있습니다. 이럴때 Dispose를 통해서 자원을 해제를 해주셔야 합니다. 다행히 completed가 발생하면 자원이 해제됩니다.

package io.reactivex.disposables;

    public interface Disposable {
      void dispose();
      boolean isDisposed();
    }

		import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import java.util.concurrent.TimeUnit;

    public class Launcher {

      public static void main(String[] args) {

        Observable<Long> seconds =
          Observable.interval(1, TimeUnit.SECONDS);

        Disposable disposable =
          seconds.subscribe(l -> System.out.println("Received: " + l));

        //sleep 5 seconds
        sleep(5000);

        //dispose and stop emissions
        disposable.dispose();

        //sleep 5 seconds to prove
        //there are no more emissions
        sleep(5000);

      }

      public static void sleep(int millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
       }
     }

5초동안 interval로 실행했지만 추후 dispose 된 Observable는 더이상 배출이 안되는 걸 볼 수 있습니다. 기본적으로 disposable은 내부적으로 구독과 동시에 생성이 되서 어디에서든 사용해서 해제를 할 수 있도록 합니다.

Observer<Integer> myObserver = new Observer<Integer>() {
      private Disposable disposable;

      @Override
      public void onSubscribe(Disposable disposable) {
        this.disposable = disposable;
      }

      @Override
      public void onNext(Integer value) {
        //has access to Disposable
      }

      @Override
      public void onError(Throwable e) {
        //has access to Disposable
      }

      @Override
      public void onComplete() {
        //has access to Disposable
       }
     };

만약 이 Observer을 좀 더 확장을 원한다면 ResourceObserver를 만들어서 사용이 가능합니다. 확장된 observer는 subscribeWith로 통해서 구독을 할 수 있습니다.

import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.ResourceObserver;
    import java.util.concurrent.TimeUnit;

    public class Launcher {
      public static void main(String[] args) {

        Observable<Long> source =
        Observable.interval(1, TimeUnit.SECONDS);

         ResourceObserver<Long> myObserver = new  
         ResourceObserver<Long>() {
          @Override
          public void onNext(Long value) {
            System.out.println(value);
          }

          @Override
          public void onError(Throwable e) {
            e.printStackTrace();
          }

          @Override
          public void onComplete() {
            System.out.println("Done!");
          }
        };

        //capture Disposable
        Disposable disposable = source.subscribeWith(myObserver);
       }
      }

Debugging 하는 방법

RxJava 사용시 디버깅 라이버러리는

frodo 를 사용하면 좋긴 한데 ( RxJava 1지원..ㅠ)

https://github.com/android10/frodo.git

일반적인 테스트 방법을 소개한다.

.doOnNext()를 활용하면 된다.

예를 들어 다음과 같은 내용이 있다고 가정하자.

주어진 3개의 내용에 알파벳을 뽑아내는 경우이다.

TestObserver<String> testObserver = new TestObserver<>();

Observable<String> items =
        Observable.just("521934/2342/Foxtrot",
                "Bravo/12112/78886/Tango",
                "283242/4542/Whiskey/2348562");

items.concatMap( s -> Observable.fromArray( s.split("/")))
     .filter(s -> s.matches("[A-Z]+"))
     .subscribe(testObserver);

System.out.println(testObserver.values());

testObserver.assertValues( "Foxtrot","Bravo","Tango","Whiskey" );

결과는 실패이고 내용은 다음과 같다.

Value count differs; Expected: 4 [Foxtrot, Bravo, Tango, Whiskey], Actual: 0 [] (latch = 0, values = 0, errors = 0, completions = 1)

예상 결과는 4개이지만 실제 0개이다.

그럼 디버깅을 시작해보자

우선 concatMap으로 제대로 보내는 지 확인해보자

.doOnNext( s-> System.out.println("Source pushed : " + s) )
.concatMap( s -> Observable.fromArray( s.split("/")))

doOnNext를 입력시

Source pushed : 521934/2342/Foxtrot
Source pushed : Bravo/12112/78886/Tango
Source pushed : 283242/4542/Whiskey/2348562

값이 제대로 전달되는 것으로 보인다..

그럼 다음껄 확인해보자

.concatMap( s -> Observable.fromArray( s.split("/")))
.doOnNext( s-> System.out.println("Source pushed : " + s) )

Result ================================================================

Source pushed : 521934
Source pushed : 2342
Source pushed : Foxtrot
Source pushed : Bravo
Source pushed : 12112
Source pushed : 78886
Source pushed : Tango
Source pushed : 283242
Source pushed : 4542
Source pushed : Whiskey
Source pushed : 2348562

정상적으로 나오는것 같다..그럼 다음껄..

.filter(s -> s.matches("[A-Z]+"))
.doOnNext( s-> System.out.println("Source pushed : " + s) )

Result ================================================================

값이 비어있다...여기가 문제인가보다. 그럼 수정 후 다시 해보자

.filter(s -> s.matches("[A-Za-z]+"))
.doOnNext( s-> System.out.println("Source pushed : " + s) )

Result ================================================================

Source pushed : Foxtrot
Source pushed : Bravo
Source pushed : Tango
Source pushed : Whiskey

이제 작동이 잘 된다..

최종 소스는 다음과 같다.

TestObserver<String> testObserver = new TestObserver<>();

Observable<String> items =
        Observable.just("521934/2342/Foxtrot",
                "Bravo/12112/78886/Tango",
                "283242/4542/Whiskey/2348562");

items
     .concatMap( s -> Observable.fromArray( s.split("/")))
     .filter(s -> s.matches("[A-Za-z]+"))
     .subscribe(testObserver);

testObserver.assertValues( "Foxtrot","Bravo","Tango","Whiskey" );

doOnNext 말고도 doOnError() , doOnComplete() , doOnSubscribe() 등이 있으니 활용하면 빠른 디버깅이 가능하다.

이상으로 디버깅을 마친다.

TestScheduler

TestObserver(or TestSubscriber)을 이용시await를 통해서 기다릴수 있지만 그러기엔 시간이 많이 걸릴수 있다.

그래서 타임머신기능으로 시간을 미리 땡겨서 테스트가 가능하다.

예를 들어 interval(10) 으로 10초뒤에 값을 체크를 하는 경우 10초를 기다리지 않고 10초 뒤로 시간 설정 후 바로 테스트 하게끔 하는 것이다.

이 부분에 대해서 좀 더 공부가 필요해보인다.

TestScheduler testScheduler = new TestScheduler();

TestObserver<Long> testObserver = new TestObserver<>();

Observable<Long> minTicker = Observable.interval(1, TimeUnit.MINUTES , testScheduler);

minTicker.subscribe(testObserver);

//30초 뒤로 이동
testScheduler.advanceTimeBy(30 , TimeUnit.SECONDS);

//앞으로 이동했기 때문에 아직 배출이 안됨
testObserver.assertValueCount(0);

//구독 후 뒤로 70초 이동..
testScheduler.advanceTimeTo(70 , TimeUnit.SECONDS);

//1분 이동했으니 1개가 나온다. 
testObserver.assertValueCount(1);

System.out.println("#1 current time = " + testScheduler.now(TimeUnit.SECONDS));

//구독후 90분 뒤로 감
testScheduler.advanceTimeTo(90, TimeUnit.MINUTES);

System.out.println("#2 current time = " + testScheduler.now(TimeUnit.SECONDS));

//결과값 : 90개가 나온다.
testObserver.assertValueCount(90);

now() 는 얼마나 시간을 앞당겼는지 체크를 할수 있다.

triggerAction() 이 무엇인지도 한번 찾아보자.

TriggerAction()

공식 설명은 이 스케줄러의 현재 시간 또는 그 이전에 아직 트리거되지 않았거나 트리거되도록 예정된 모든 작업을 트리거합니다.

Triggers any actions that have not yet been triggered and that are scheduled to be triggered at or before this Scheduler's present time

TestScheduler s = new TestScheduler();

s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);

s.triggerActions();
System.out.println("Virtual time: " + s.now(TimeUnit.SECONDS));

결과값

Immediate
Virtual time: 0


TestObserver , TestSubscriber

이전 내용까지는 Blocking이 주된 내용이지만 이것만으로는 한계가 있다. 그래서 나온게 TestObserver 이다.

TestObserver = Observable , Single , Maybe , Completable 에서 사용된다.

TestSubscriber =Flowable 에서 사용된다.

//우선 옵저버 생성 (5초동안 아이템을 배출)
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS).take(5);

TestObserver<Long> testObserver = new TestObserver();

//아직 구독이 안된건지 체크
testObserver.assertNotSubscribed();

//이제 구독함
source.subscribe(testObserver);

//구독이 되었는지 확인
testObserver.assertSubscribed();

//종료(Terminate) 될 때까지 기다린다.
testObserver.awaitTerminalEvent();

//onCompleted가 과연 호출되었는지?
testObserver.assertComplete();

//에러가 없나?
testObserver.assertNoErrors();

//5개의 아이템을 배출했나?
testObserver.assertValueCount(5);

//5개의 예상되는 값 확인
testObserver.assertValues( 0L , 1L , 2L , 3L , 4L );

하지만 문제는 awaitTerminalEvent()를 통해 5초동안 기다려야 한다. 만약 시간이 길다면 어떻게 해야할까?

무작정 기다리면서 하는 건 테스트가 아니다..우린 빨리 결과를 받아야 한다. 그래서 나온게 타임머신 기능이 나왔다.

다음장에서 계속한다.

BlockingFirst() , BlockingSingle()

이름 그대로 처음꺼만 가져오는 함수를 말한다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

String firstItem = source.filter( s -> s.length() == 5).blockingFirst();
String singleItem = source.filter( s -> s.length() == 4).take(1).blockingSingle();

assertTrue( firstItem.equals("Alpha"));
assertTrue( singleItem.equals("Beta"));

결과 : Success

BlocingSingle()은 결과값이 하나로 나와야 정상작동되는 점을 유의해야 한다. (그래서 take(1) 로 지정)

BlockingGet()

Maybe , Single 의 경우 blockingFirst()를 가지고 있지 않다. 하나 또는 없음을 배출할테니..

이럴때 BlockingGet() 을 사용해서 값을 가져오면 된다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

//리스트를 만들어서 하나로 만들어줍니다.
List<String> sourceList = source.filter(s -> s.length() == 4).toList().blockingGet();

assertTrue( sourceList.equals( Arrays.asList( "Beta" , "Zeta")));

BlockingLast()

Observable , Flowable 에서 마지막 값을 리턴시 사용한다.

주의할점은 onComplete() 가 실행전에는 값을 리턴을 하지 않는다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
String lastItemFromSource = source.filter(s -> s.length() == 4).blockingLast();

assertTrue(lastItemFromSource.equals("Zeta"));

주의점

BlockingFirst() , BlockingLast()사용시 아이템이 없으면 no emissions 예외 오류가 나니 기본값 설정을 해주는게 좋다.

만약 리스트 형태로 받고 싶을때 어떻게 할까?

그럴때 BlockingIterable() 사용하면 된다.

BlockingIterable()

값을 리스트형태로 받아준다. onComplete() 된 시점의 값들을 받아온다. 배압(Backpressure)이 없기 때문에 OutOfMemoryException이 발생될 수 있기 때문에 주의 해서 사용해야 한다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
Iterable<String> sourceIterable = source.filter( s -> s.length() == 5).blockingIterable();

for(String s:sourceIterable) {
    assertTrue( s.length() == 5); // 5자리가 맞는지 루프 돌면서 체크
}

위와 같은 형태의 문제점은 아이템을 다 배출하고 테스트하는데 있다.

만약 아이템을 하나씩 받을때 직접 테스트 하고 싶은 경우 어떻게 해야 할까?

그런 경우 BlockingForEach() 을 사용하면 된다.

BlockingForEach()

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

//하나씩 배출하면서 검사를 한다. 
source.filter(s->s.length() == 5).blockingForEach(s -> assertTrue(s.length() == 5));

BlockingNext() <- 좀 더 이해가 필요함

next된 것만 가져와서 테스트를 진행하는 것 같음..공부가 좀 더 필요함..

Observable<Long> source = Observable.interval(1 , TimeUnit.MICROSECONDS).take(1000);

Iterable<Long> iterable = source.blockingNext();

for(Long i : iterable) {
    System.out.println(i);
}

결과값

0
5
11
15
18
20
23
27
31
35
39
44
48
52
58
62
66
69
72
76
80
83
86
90
94
97
100
...

BlockingLastest() <- 좀 더 이해가 필요함

캡쳐되지 않는 값은 잊어버리고 ... 좀더 공부를 해야겠다.

Observable<Long> source = Observable.interval(1 , TimeUnit.MICROSECONDS).take(1000);

Iterable<Long> iterable = source.blockingLatest();

for(Long i : iterable){
    System.out.println(i);
}

출력값은

0
127
135
140
144
147
151
154
157
161
164
168
170
172
175
177
179
181
183
185
187
189
191
193
...

BlockingMostRecent()

TheblockingMostRecent()is similar toblockingLatest(), but it will re-consume the latest value repeatedly for everynext()

call from the iterator even if it was consumed already. It also requires adefaultValueargument so it has something to return if no value is emitted yet. Here, we useblockingMostRecent()against an Observableemitting every 10 milliseconds. The default value is-1, and it consumes each value repeatedly until the next value is provided:

Observable<Long> source = Observable.interval(10 , TimeUnit.MILLISECONDS).take(5);

//기본값이 필요하다.
Iterable<Long> iterable = source.blockingMostRecent( -1L );

System.out.println( "iterable" + iterable );

for(Long i : iterable) {
    System.out.println(i);
}

결과값

-1
-1
-1
...
0
0
0
...
1
1
1
...


일반적으로 Observable 을 테스트시 Blocking을 사용한다.

먼저 일반 코드를 작성하면 다음과 같을겁니다.

AtomicInteger hitcount = new AtomicInteger();
Observable<Long> source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5);
source.subscribe( i -> hitcount.incrementAndGet());

assertTrue( hitcount.get() == 5);

여기에서 5개를 가져오는 일반적인 코드이다. 결과는 실패이다.

이유는 interval 함수를 보면 thread가 computation 이다. 그래서 가져올수 없다.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit) {
    return interval(period, period, unit, Schedulers.computation());
}

그럴때 방법이 2가지가 있지만 우선 Blocking해서 값을 가져와서 테스트 해보는 방법을 해보자.

AtomicInteger hitcount = new AtomicInteger();
Observable<Long> source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5);
source.blockingSubscribe( i -> hitcount.incrementAndGet());

assertTrue( hitcount.get() == 5);

결과는 성공

위 코드에서 subscribe -> blockingSubscribe을 함으로써 값을 제대로 가져올수 있다.

쓰레드 상관없이 값을 가져오고 싶을땐 BlockingSubscribe을 써서 테스트가 가능하다.

주의사항으로

절대 테스트말고 다른 곳에선 blocking을 사용하지 말자. 해보면 이유를 바로 알수있다.

실제 연산자를 사용자가 구현이 가능하다.

관련해서 자세한 내용은

영문 : http://reactivex.io/documentation/implement-operator.html

한글 : http://reactivex.io/documentation/ko/implement-operator.html

일단 구현해보자.

아래 doOnEmpty는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.

public static <T> ObservableOperator<T,T> doOnEmpty(Action action) {
    return observer -> new DisposableObserver<T>() {
        boolean isEmpty = true;
        @Override
        public void onNext(T value) {
            isEmpty = false;
            System.out.println("doOnEmpty onNext");
            observer.onNext(value);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("doOnEmpty onError");
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if(isEmpty){
                try {
                    System.out.println("doOnEmpty run");
                    action.run();
                }catch (Exception ex){
                    ex.printStackTrace();
                    onError(ex);
                    return;
                }
            }

            observer.onComplete();
        }
};

사용방법은

  1. 1~5부터 돌면서 빈값인지 체크 해서 Operation 1 : { 숫자 } 를 찍어주는 형태
  2. 빈값이 들어올때 동작되는 걸 체크 하는 형태
Observable.range( 1, 5)
        .lift(doOnEmpty(() ->  System.out.println("Operation 1 empty!")))
        .doOnNext( v -> System.out.println("Operation 1 : " + v))
        .test();

Observable.<Integer>empty()
        .lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
        .doOnNext( v -> System.out.println("Operation 2 : " + v))
        .test();

==========================================================================
doOnEmpty onNext
Operation 1 : 1
doOnEmpty onNext
Operation 1 : 2
doOnEmpty onNext
Operation 1 : 3
doOnEmpty onNext
Operation 1 : 4
doOnEmpty onNext
Operation 1 : 5
doOnEmpty run
Operation 2 Empty!

Operation 2 Empty! 가 찍혔는지가 중요하다.

마지막으로 나눠서 들어오는 경우 그걸 리스트로 변환해주는 연산자를 만들어보자

연산자 설명 : 들어온 값을 바로 배출(Push) 안하고 마지막에 한번에 배출하는 연산자..(toList() 와 유사하다.)

public static <T> ObservableOperator<List<T> , T> myToList(){
    return observer -> new DisposableObserver<T>() {
        ArrayList<T> list = new ArrayList<>();

        @Override
        public void onNext(T t) {
            //리스트에 추가만 하고 다운스트림(onNext) 내보내지 않는다.
            list.add(t);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            //다운스트림으로 한번에 배출한다.
            observer.onNext(list);
            observer.onComplete();
        }
    };
}

이 연산자를 이제 사용해보자.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

=========================================================================
Operation 1 onNext : [1, 2, 3, 4, 5]
Operation 2 onNext : []

값이 모아서 한번에 배출되고 있다.

만약 Observable이 아니라 Flowable로 변경시 FlowableOperator 로 변경 하면 된다.

public static <T> FlowableOperator<T,T> doOnEmpty(Action action) {
     return subscriber -> new DisposableSubscriber<T>() {
         boolean isEmpty = true;

         @Override
         public void onNext(T value) {
             isEmpty = false;
             subscriber.onNext(value);
         }

         @Override
         public void onError(Throwable t) {
             subscriber.onError(t);
         }

         @Override
         public void onComplete() {
             if (isEmpty) {
                 try {
                     action.run();
                 } catch (Exception e) {
                     onError(e);
                     return;
                 }
             }
             subscriber.onComplete();
         }
     };
 }


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]Blocking Functions  (0) 2017.10.22
[RxJava2]Test시 Blocking Subscriber 방법  (0) 2017.10.22
[RxJava] Transform에서 공유 부분 피하기  (0) 2017.10.22
[RxJava2] Compose with Parameters  (0) 2017.10.22
[RxJava2]Compose 활용  (0) 2017.10.22

자신만의 커스텀 함수를 만들어서 사용시 공유되는 이슈가 발생된다.

예를 들어 보자.

인덱스를 넣고 값을 저장하는 IndexedValue데이터 클래스를 먼저 만든다.

static final class IndexedValue<T> {
    final int index;
    final T value;

    IndexedValue(int index , T value){
        this.index = index;
        this.value = value;
    }

    @Override
    public String toString() {
        return index + " - " + value;
    }
}

그리고 compose에 들어갈 커스텀 함수를 추가한다.

원하는 결과값은 index - value 을 나오는게 목표이다.

static <T>ObservableTransformer<T,IndexedValue<T>> withIndex(){
    final AtomicInteger indexer = new AtomicInteger(-1);
    return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet() , v));
}

아래는 테스트 함수이다.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

============================================================================
Subscriber 1 : 0 - Alpha
Subscriber 1 : 1 - Beta
Subscriber 1 : 2 - Gamma
Subscriber 1 : 3 - Delta
Subscriber 1 : 4 - Epsilon
Subscriber 2 : 5 - Alpha
Subscriber 2 : 6 - Beta
Subscriber 2 : 7 - Gamma
Subscriber 2 : 8 - Delta
Subscriber 2 : 9 - Epsilon

위 결과에서 보듯이 값이 서로 공유되서 나오는 문제를 볼수 있다.

이유는 싱글 인스턴스인 AtomicInteger이기 때문이다.

이러한 문제를 해결하기 위해 defer 또는 fromCallable 을 추천한다.

static <T>ObservableTransformer<T,IndexedValue<T>> withIndex(){
    return upstream -> Observable.defer(() -> {
        final AtomicInteger indexer = new AtomicInteger(-1);
        return upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet() , v));
    });
}

이제 결과들이 공유되지 않는 걸 볼수 있다.

Subscriber 1 : 0 - Alpha
Subscriber 1 : 1 - Beta
Subscriber 1 : 2 - Gamma
Subscriber 1 : 3 - Delta
Subscriber 1 : 4 - Epsilon
Subscriber 2 : 0 - Alpha
Subscriber 2 : 1 - Beta
Subscriber 2 : 2 - Gamma
Subscriber 2 : 3 - Delta
Subscriber 2 : 4 - Epsilon

결론은 싱글 인스턴스 데이터를 사용시 공유되는 문제를 해결하기 위해 defer 또는 fromCallabe을 사용해서 오류를 피하자.

파라미터를 통해서 compose를 활용하는 방법에 대해서 알아보자.

우선 다음과 같은 내용이 있다.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect( StringBuilder::new , (b,s) -> {
            if ( b.length() == 0)
                b.append(s);
            else
                b.append("/").append(s);
        })
        .map(StringBuilder::toString)
        .subscribe(System.out::println);

====================================================================
Alpha/Beta/Gamma/Delta/Epsilon

주어진 문자열에 뒤에 "/" 을 추가를 해주는 아주 간단한 내용이다.

이 내용을 Compose로 변경시..

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(joinToString("/"))
                .subscribe(System.out::println);
public static ObservableTransformer<String , String> joinToString(String separator) {
    return upstream -> upstream
                        .collect( StringBuilder::new , (b,s) -> {
                            if ( b.length() == 0)
                                b.append(s);
                            else
                                b.append(separator).append(s);
                        })
                        .map(StringBuilder::toString)
                        .toObservable();
}

구분자를 넘겨서 공통함수로 뺄수 있다.

+

이런식으로 인자값을 넣어서 자주쓰는 함수를 자기만의 커스텀함수로 빼서 사용하도록 하자. (리팩토링도 할겸..)

하지만 주의 사항이 있다. 이러한 커스텀을 사용시 여러번의 Subscriber에서 자원을 공유하는 버그등이 발생될 수 있다.

이 부분에 대해서 피할 수 있는지 좀 더 알아보는 시간을 다음에 가져보자.

중복되어 있는 걸 커스텀 공통 함수로 빼고 compose로 활용하는 방법에 대해서 알아보자.

public static <T>ObservableTransformer<T , ImmutableList<T>> toImmutableList() {
        return upstream -> upstream.collect(ImmutableList::<T>builder , ImmutableList.Builder::add)
                .map(ImmutableList.Builder::build)
                //반드시 Single 또는 Observable로 리턴해야한다. 
                //Flowable -> toFlowable();
                .toObservable();
}

일단 공통 함수를 작성한다. 내용은 간단하다.

소스가 들어오면 collect로 imutableList로 변형해서 다시 옵저버 ( or Flowable)로 돌려준다.

그리고 해당 공통 함수에 다시 compose 에 이 추가된 내용을 넣어준다.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .compose(toImmutableList())
        .subscribe(System.out::println);

Observable.range(1 , 15)
        .compose(toImmutableList())
        .subscribe(System.out::println);

=================================================================
[Alpha, Beta, Gamma, Delta, Epsilon]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

코드가 줄어든게 보일것이다. 기본적인 활용은 이런식으로 만들 수 있다.

ObservableTransformer

우선 compose를 사용하는 방법에 대해서 알아보자.

compose를 사용하는 주된 이유는 어떠한 공통된 일련의 동작을 커스텀해서 변형할수 있다.

우선 사용하기 전 형태를 살펴보자.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect(ImmutableList::builder , ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)
        .subscribe(System.out::println);

Observable.range(1 , 15)
        .collect(ImmutableList::builder , ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)
        .subscribe(System.out::println);

''''
[Alpha, Beta, Gamma, Delta, Epsilon]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

위 내용은 주어진 문자열을 collect 함수로 이용해서 imutableList( Guava 라이버러리 ) 형태로 변경하고 다시 그걸 Builder -> List로 변경해주는 보통 RxJava 함수이다.

.map(ImmutableList.Builder::build)

을 추가시 SingleObserver<ImmutableList> 형태로 받지만..

하지만 안하는 경우 SingleObserver<ImmutableList.Builder>

으로 받기 때문에 map을 넣어서 변형을 해준다.

그래서 저기 두개의 내용을 보게되면

.collect(ImmutableList::builder , ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)

이 2개의 부분에서 중복적으로 사용되는 걸 볼수 있다.

Flowable.generate()

배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer<Emitter<T>> 을 구현하면 된다.

예를 보자.

public static void main(String[] args){
    randomGenerator( 1, 10000)
            .subscribeOn(Schedulers.computation())
            .doOnNext(i -> System.out.println("Emitting " + i))
            .observeOn(Schedulers.io())
            .subscribe(i -> {
                sleep(5);
                System.out.println("Received item : " + i);
            });

    sleep(5000);
}

static Flowable<Integer> randomGenerator(int min , int max) {
    return Flowable.generate( emitter -> emitter.onNext(ThreadLocalRandom.current().nextInt(min , max)));
}
............
 ...
 Emitting 8014
 Emitting 3112
 Emitting 5958
 Emitting 4834 //128th emission
 Received 9563
 Received 4359
 Received 9362
 ...
 Received 4880
 Received 3192
 Received 979 //96th emission
 Emitting 8268
 Emitting 3889
 Emitting 2595
...

128 -> 96 -> 96 순서로 이루어지고 있다.

주의할 점은 onNext 를 한번만 사용가능하다. 그 이상 사용시 IllgalStateException 이 발생된다.

public static void main(String[] args) {
        rangeReverse(100,-100)
                 .subscribeOn(Schedulers.computation())
                 .doOnNext(i -> System.out.println("Emitting " + i))
                 .observeOn(Schedulers.io())
                 .subscribe(i -> {
                     sleep(50);
                     System.out.println("Received " + i);
                 });
        sleep(50000);
     }
    static Flowable<Integer> rangeReverse(int upperBound, int lowerBound) {
         return Flowable.generate(() -> new AtomicInteger(upperBound + 1),
                 (state, emitter) -> {
                     int current = state.decrementAndGet();
                     emitter.onNext(current);
                     if (current == lowerBound)
                         emitter.onComplete();
                 }
         );

     }
.............
Emitting 100
 Emitting 99
 ...
 Emitting -25
 Emitting -26
 Emitting -27 //128th emission
 Received 100
 Received 99
 Received 98
 ...
 Received 7
 Received 6
 Received 5 // 96th emission
 Emitting -28
 Emitting -29
 Emitting -30

이런식으로 사용이 가능하다. Flowable.create() 사용보다 더 자세한 컨트롤이 가능하다.

만약 Backpressure없는 Flowable을 사용시 어떤 함수를 사용하면 좋은지 알아보자.

우선 onBackpressureBuffer() 을 알아보자.

복습을 하자면

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
        });

을 사용시 배출이 생성하는 것을 못 따라주기 때문에 오류가 난다.

Received MyItem : 21
Received MyItem : 22
Received MyItem : 23
Received MyItem : 24
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:76)

그러기 때문에 배압(Backpressure)을 넣어줘야 한다.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
});
...........
....
Received MyItem : 47
Received MyItem : 48
Received MyItem : 49
Received MyItem : 50
Received MyItem : 51
Received MyItem : 52
Received MyItem : 53
Received MyItem : 54
Received MyItem : 55
Received MyItem : 56
Received MyItem : 57
Received MyItem : 58
....

정상적으로 작동이 된다. onBackpressureBuffer 의 경우 배압이 될 만한 부분을 버퍼에 넣어서 한꺼번에 배출시키는 역할을 한다.

인자로 주는 경우 주로 많이 쓰이는 내용(BackpressOverflowStrategy) 를 알아보자. 오버플로가 될 경우 발생된다.

ERROR용량이 가득차면 간단하게 오류를 뱉는다.
DROP_OLDEST최신껄 위해 가장 오랜된 걸 먼저 비운다.
DROP_LATEST최신껄 삭제해서 사용되지 않은 오래된 값을 우선순위를 지정한다.

그럼 DROP_LATEST 를 사용해보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        //long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy
        //Action은 오버플로우 발생시 콜백을 지정할 수 있다.
        .onBackpressureBuffer(10 , ()-> System.out.println("overflow") , BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
});
.................
Received MyItem : 135
overflow
overflow
overflow
overflow
overflow
Received MyItem : 136 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
Received MyItem : 489 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
overflow
Received MyItem : 490

넘칠 경우 overflow 출력이 되고 있다. DROP_LAST 를 사용시 136 에서 489 사이에 숫자들이 생략된게 보인다.

이유는 이미 가득차서 최신껄 삭제를 해서 다시 넣는 과정을 거친다.

onBackpressureLatest()

onBackpressureBuffer() 의 약간 변형이다. 최신 값을 제공하며 그 시간동안 방출된 이전 값은 모두 제거한다.

예를 보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 125
Received MyItem : 126
Received MyItem : 127 <------
Received MyItem : 485 <------
Received MyItem : 486
Received MyItem : 487

숫자가 갑자기 점프를 하였다. 이유는 이미 안은 차서 최신껄 제공하고 이전껀 제거를 하고 있기 때문이다.

onBackPressureDrop()

오버플로 발생시 단순히 해당 아이템을 드랍함으로써 질서를 유지한다.

예를 보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop(i -> System.out.println("Dropping " + i))
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 1583
Dropping 2013
Dropping 2014
Dropping 2015
Dropping 2016
Received MyItem : 1584
Dropping 2017
Received MyItem : 1585
Received MyItem : 1586
Received MyItem : 1587
Received MyItem : 1588

오버 플로우시 Drop 을 하고 있다.

다음 장에선 Flowable.generate() 을 살펴볼 예정이다.

만약 우리가 배압을 직접 컨트롤 하고 싶을때 사용하는 함수이다.

Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.

예제를 한번 보자.

Flowable<Integer> source = Flowable.create(emitter -> {
    for (int i=0;i<1000;i++) {
        if(emitter.isCancelled())
            return;

        emitter.onNext(i);
    }

    emitter.onComplete();    
}  , BackpressureStrategy.BUFFER); //BackpressureStrategy.BUFFER 을 인자로 넣어서 생성이 가능하다. 

source.observeOn(Schedulers.io())
        .subscribe(System.out::println);

sleep( 1000 );
..........
0
1
2
3
4
...

인자로 넣을 수 있는 건 5가지가 있다.

BackpressureStrategy설명
Missing배압을 적용안한다. 후에 onBackpressureXXX()등으로 컨트롤이 가능하다.
ErrorMissingBackpressureException 발생시 에러를 발생시킨다.
BufferonBackpressureBuffer() 와 같은 형태
Lastest다운스트림이 받을때까지 마지막꺼만 유지를 한다.

Observable -> Flowable 로 변경시에도 인자로 넣는게 가능하다.

Observable<Integer> source = Observable.range(1,1000);
source.toFlowable(BackpressureStrategy.BUFFER)

하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.

Flowable -> Observable 변경시에도 toObserable()으로 가능하다.

Flowable<Integer> integers =
                 Flowable.range(1, 1000)
                         .subscribeOn(Schedulers.computation());
        Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
                 .flatMap(s -> integers.map(i -> i + "-" + s)
                 //위로는 Backpressure 지원함                 
                 .toObservable())           
                 //아래로는 지원안함..만약 request를 할때에도 적용안되니 주의..(subscriber 참조)      
                 .subscribe(System.out::println);


만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?

이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.

예를 보자.

public static void main(String[] args){
    Flowable.range(1, 100)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println( "onSubscribe" );
                    s.request(10);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                    sleep( 50 );
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println( "onSubscribe" );
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    //최대 0.2초 sleep
    int time = ThreadLocalRandom.current().nextInt(200);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

...................
.....
push -> 124
push -> 125
push -> 126
push -> 127
push -> 128
onNext 1, thread -> RxCachedThreadScheduler-1
sleep time : 180
onNext 2, thread -> RxCachedThreadScheduler-1
sleep time : 165
onNext 3, thread -> RxCachedThreadScheduler-1
sleep time : 52
onNext 4, thread -> RxCachedThreadScheduler-1
sleep time : 63
......
onNext 95, thread -> RxCachedThreadScheduler-1
sleep time : 36
onNext 96, thread -> RxCachedThreadScheduler-1
push -> 129
push -> 130
push -> 131

위와 같이 Flowable (A) , Subscriber (B) 라고 가정하면

A(128개 배출) -> B(96개) -> A(96) -> .... 이런식으로 배출하는 걸 볼수 있다.

즉 subscriber 로 업스트림으로 진행이 가능하다.

자 그러면 이걸 활용을 어떤식으로 할수 있는지 보자.

Flowable.range(1, 1000)
        .doOnNext( v -> System.out.println("push -> " + v))
        .observeOn(Schedulers.io())
        .map( i -> intenseCalculation(i))
        .subscribe(new Subscriber<Integer>() {
            Subscription subscription;
            AtomicInteger count = new AtomicInteger(0);
            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                System.out.println( "onSubscribe" );
                s.request(40);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                sleep( 50 );
                if( count.incrementAndGet() % 20 == 0 && count.get() >= 40) {
                    System.out.println("Requesting 20 more!!");
                    subscription.request(20);
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println( "onSubscribe" );
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
...........
onSubscribe
push -> 1
push -> 2
push -> 3
push -> 4
push -> 5
push -> 6
push -> 7
....
onNext 38, thread -> RxCachedThreadScheduler-1
sleep time : 128
onNext 39, thread -> RxCachedThreadScheduler-1
sleep time : 88
onNext 40, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
.....
sleep time : 74
onNext 59, thread -> RxCachedThreadScheduler-1
sleep time : 83
onNext 60, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
sleep time : 38
onNext 61, thread -> RxCachedThreadScheduler-1
sleep time : 148
onNext 62, thread -> RxCachedThreadScheduler-1
sleep time : 159

위 내용은 배출이 되지만 40번을 끝내고 다시 request를 해서 20번을 더 진행하고 계속 요청하고 있다.

이 부분을 보면 무엇이 떠오를까.. 앱개발시 리스트 조회시 fetch를 좀 더 효율적으로 가능할 수 있는 방법이 떠오른다.

추후에 적용해볼수 있을것 같다.'

그리고 처음 push 아이템은 언제 실행될까?

128(push) -> 96(request) -> 96(push) ->.... 이런식으로 진행된다. 보면 볼수록 매력이 있다...

주의할 점은 Subscriber 을 통해서 업스트림이 되는 게 아니란 점...

단지 그 요청을 상류쪽에서 중계를 방법을 결정하는 것입니다.

+ Recent posts