요구사항

내 응용 프로그램에서는 최근에 다음과 같은 요구 사항이 있습니다. 항목의 반응적인 스트림에서 특정 항목이 하나 방출 될 때까지 기다린 다음 모든 항목을 관찰자에게 내 보냅니다. 해당 항목을 찾을 수 없으면 아무 것도 출력하지 말고 오류로 종료하십시오.


https://medium.com/@Cypressious/rxjava-kotlin-conditionally-delaying-the-first-item-in-a-stream-9d4e7a8d0071

만약 Backpressure없는 Flowable을 사용시 어떤 함수를 사용하면 좋은지 알아보자.

우선 onBackpressureBuffer() 을 알아보자.

복습을 하자면

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
        });

을 사용시 배출이 생성하는 것을 못 따라주기 때문에 오류가 난다.

Received MyItem : 21
Received MyItem : 22
Received MyItem : 23
Received MyItem : 24
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:76)

그러기 때문에 배압(Backpressure)을 넣어줘야 한다.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
});
...........
....
Received MyItem : 47
Received MyItem : 48
Received MyItem : 49
Received MyItem : 50
Received MyItem : 51
Received MyItem : 52
Received MyItem : 53
Received MyItem : 54
Received MyItem : 55
Received MyItem : 56
Received MyItem : 57
Received MyItem : 58
....

정상적으로 작동이 된다. onBackpressureBuffer 의 경우 배압이 될 만한 부분을 버퍼에 넣어서 한꺼번에 배출시키는 역할을 한다.

인자로 주는 경우 주로 많이 쓰이는 내용(BackpressOverflowStrategy) 를 알아보자. 오버플로가 될 경우 발생된다.

ERROR용량이 가득차면 간단하게 오류를 뱉는다.
DROP_OLDEST최신껄 위해 가장 오랜된 걸 먼저 비운다.
DROP_LATEST최신껄 삭제해서 사용되지 않은 오래된 값을 우선순위를 지정한다.

그럼 DROP_LATEST 를 사용해보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        //long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy
        //Action은 오버플로우 발생시 콜백을 지정할 수 있다.
        .onBackpressureBuffer(10 , ()-> System.out.println("overflow") , BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
});
.................
Received MyItem : 135
overflow
overflow
overflow
overflow
overflow
Received MyItem : 136 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
Received MyItem : 489 <----- 이 부분을 유의해서 보자.
overflow
overflow
overflow
overflow
overflow
Received MyItem : 490

넘칠 경우 overflow 출력이 되고 있다. DROP_LAST 를 사용시 136 에서 489 사이에 숫자들이 생략된게 보인다.

이유는 이미 가득차서 최신껄 삭제를 해서 다시 넣는 과정을 거친다.

onBackpressureLatest()

onBackpressureBuffer() 의 약간 변형이다. 최신 값을 제공하며 그 시간동안 방출된 이전 값은 모두 제거한다.

예를 보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .observeOn(Schedulers.io())
        .subscribe(i -> {
            sleep(5);
            System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 125
Received MyItem : 126
Received MyItem : 127 <------
Received MyItem : 485 <------
Received MyItem : 486
Received MyItem : 487

숫자가 갑자기 점프를 하였다. 이유는 이미 안은 차서 최신껄 제공하고 이전껀 제거를 하고 있기 때문이다.

onBackPressureDrop()

오버플로 발생시 단순히 해당 아이템을 드랍함으로써 질서를 유지한다.

예를 보자.

Flowable.interval( 1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop(i -> System.out.println("Dropping " + i))
    .observeOn(Schedulers.io())
    .subscribe(i -> {
        sleep(5);
        System.out.println("Received MyItem : " + i);
});
.........
Received MyItem : 1583
Dropping 2013
Dropping 2014
Dropping 2015
Dropping 2016
Received MyItem : 1584
Dropping 2017
Received MyItem : 1585
Received MyItem : 1586
Received MyItem : 1587
Received MyItem : 1588

오버 플로우시 Drop 을 하고 있다.

다음 장에선 Flowable.generate() 을 살펴볼 예정이다.

만약 우리가 배압을 직접 컨트롤 하고 싶을때 사용하는 함수이다.

출처: http://doohyun.tistory.com/44 [N`s Lab]


List<SubjectRelation> subjectRelationList = Arrays.asList(
                    new SubjectRelation(11001"Doohyun Nam"1)
                    , new SubjectRelation(11002"Dolkin"2)
                    , new SubjectRelation(11003"hshawng"1)
                    , new SubjectRelation(11004"spKwon"1)
                    , new SubjectRelation(21005"Other Person1"3)
                    , new SubjectRelation(21006"Other Person2"4)
            );
 
// create Map
Map<Integer, Collection<String>> mapTest = Observable.fromIterable(subjectRelationList).
           toMultimap(SubjectRelation::getCompanySubjectSn, SubjectRelation::getMemberName).
           blockingGet();
 
 
// only subscribe
Observable.fromIterable(subjectRelationList)
            .groupBy(SubjectRelation::getCompanySubjectSn).subscribe(group -> {
                  System.out.println(group.getKey());
                group.map(SubjectRelation::getMemberName).forEach(System.out::println);
            });
 
// create multi group
 Map<Integer, Map<Integer, Collection<String>>> doubleKeyMap = new HashMap<>();
            Observable.fromIterable(subjectRelationList).
                    groupBy(SubjectRelation::getCompanySubjectSn).
                    blockingSubscribe(group ->
                        doubleKeyMap.put(group.getKey(),
                                group.toMultimap(
                                        SubjectRelation::getOrganizationSubjectSn
                                        , SubjectRelation::getMemberName).blockingGet())
                    );
 
// partitioning
Map<Boolean, Collection<String>> partitioningMap = Observable.fromIterable(subjectRelationList).
             toMultimap(subjectRelation -> subjectRelation.getCompanySubjectSn().intValue() == 1
                 , SubjectRelation::getMemberName).
             blockingGet();



출처 : https://stackoverflow.com/a/43343039
Observable<String> observable1 = Observable.from(new String[]{"A", "B", "C", "D"});
Observable<String> observable2 = Observable.from(new String[]{"E", "C", "B", "G", "J", "O"});

observable1.concatMap(new Func1<String, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(final String string) {
        return observable2.contains(string);
    }
}).zipWith(observable1, new Func2<Boolean, String, String>() {
    @Override
    public String call(final Boolean contains, final String string) {
        return contains ? "" : string;
    }
}).filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(final String string) {
        return !TextUtils.isEmpty(string);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(final String string) {
        Log.d("observable:", string);
    }
});


'링크모음 > rxjava' 카테고리의 다른 글

Rxjava2 Collection 예제  (0) 2017.09.15
RxJava Extection 라이버러리  (0) 2017.09.15
RxAndroid 활용한 라이버러리 모음  (0) 2017.09.14
RxJava를 활용한 페이징(Paging)  (0) 2017.09.13
RxJava1 -< 2로 올릴때 주의할 점  (0) 2017.08.28

출처 : https://github.com/ReactiveX/RxAndroid/wiki


  • RxLifecycle - Lifecycle handling APIs for Android apps using RxJava
  • RxBinding - RxJava binding APIs for Android's UI widgets.
  • SqlBrite - A lightweight wrapper around SQLiteOpenHelper and ContentResolver which introduces reactive stream semantics to queries.
  • Android-ReactiveLocation - Library that wraps location play services API boilerplate with a reactive friendly API. (RxJava 1)
  • RxLocation - Reactive Location APIs Library for Android. (RxJava 2)
  • rx-preferences - Reactive SharedPreferences for Android
  • RxFit - Reactive Fitness API Library for Android
  • RxWear - Reactive Wearable API Library for Android
  • RxPermissions - Android runtime permissions powered by RxJava
  • RxNotification - Easy way to register, remove and manage notifications using RxJava
  • RxClipboard - RxJava binding APIs for Android clipboard.
  • RxBroadcast - RxJava bindings for Broadcast and LocalBroadcast.
  • RxAndroidBle - Reactive library for handling Bluetooth LE devices.
  • RxImagePicker - Reactive library for selecting images from gallery or camera.
  • ReactiveNetwork - Reactive library listening network connection state and Internet connectivity (compatible with RxJava1.x and RxJava2.x)
  • ReactiveBeacons - Reactive library scanning BLE (Bluetooth Low Energy) beacons nearby (compatible with RxJava1.x and RxJava2.x)
  • ReactiveAirplaneMode - Reactive library listening airplane mode (compatible with RxJava2.x)
  • RxDataBinding - RxJava2 binding APIs for Android's Data Binding Library.
  • RxLocationManager - RxJava/RxJava2 wrap around standard Android LocationManager without Google Play Services.


/***
 * This is a helper wrapper Subscriber that helps you lazily defer
 * continuous paging of a result set from some API.
 * Through the use of a {@link Subject}, it helps notify the original {@link Observable}
 * when to perform an additional fetch.
 * The notification is sent when a certain count of items has been reached.
 * Generally this count represents the page.
 * @param <T> The event type
 */
@Data
public class PagingSubscriber<T> extends Subscriber<T> {

    private final Subject<Void,Void> nextPageTrigger = PublishSubject.create();
    private final long pageSize;
    private long count = 0;
    private final Subscriber<T> delegate;

    /***
     * Creates the {@link PagingSubscriber}
     * @param pageSize
     * @param delegate
     */
    public PagingSubscriber(long pageSize, Subscriber<T> delegate) {
        this.pageSize = pageSize;
        this.delegate = delegate;
    }

    public Observable<Void> getNextPageTrigger() {
        return nextPageTrigger;
    }

    @Override
    public void onStart() {
        delegate.onStart();
    }

    @Override
    public void onCompleted() {
        delegate.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        delegate.onError(e);
    }

    @Override
    public void onNext(T t) {
        count+=1;
        if (count == pageSize) {
            nextPageTrigger.onNext(null);
            count= 0;
        }
        delegate.onNext(t);
    }

}

@Data
public class GitHubRepositoryApplication {

    private final GitHubService gitHubService;

    @Inject
    public GitHubRepositoryApplication(GitHubService githubService) {
        this.gitHubService = githubService;
    }

    public Observable<GitHubRepository> printAllRepositories(Observable<Void> nextPageTrigger) {
        return printRepositoryPages(GitHubService.FIRST_PAGE, nextPageTrigger)
                .flatMapIterable(r -> r.body());
    }


    public Observable<Response<List<GitHubRepository>>> printRepositoryPages(String startingPage, Observable<Void> nextPageTrigger) {
        return gitHubService.listRepos(startingPage)
                .concatMap(response -> {
                    Optional<String> nextPage = Optional.ofNullable(response.headers().get(HttpHeaders.LINK))
                            .flatMap(header -> GitHubServiceUtils.getNextToken(header));

                    if (!nextPage.isPresent()) {
                        return Observable.just(response);
                    }
                    return Observable.just(response)
                            .concatWith(nextPageTrigger.limit(1).ignoreElements().cast(Response.class))
                            .concatWith(printRepositoryPages(nextPage.get(), nextPageTrigger));
                });
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new GitHubModule());

        GitHubRepositoryApplication app = injector.getInstance(GitHubRepositoryApplication.class);

        Subscriber<GitHubRepository> subscriber = new Subscriber<GitHubRepository>() {

            private final Logger log = LoggerFactory.getLogger(getClass());

            @Override
            public void onStart() {
                log.debug("STARTING");
                request(1l);//we need to begin the request
            }

            @Override
            public void onCompleted() {
                log.debug("COMPLETED");
            }

            @Override
            public void onError(Throwable e) {
                log.error("ERROR",e);
            }

            @Override
            public void onNext(GitHubRepository gitHubRepository) {
                log.debug("{}",gitHubRepository);
                request(1l);//we need to make sure we have asked for another element
            }
        };

        PagingSubscriber<GitHubRepository> pagingSubscriber = new PagingSubscriber<>(GitHubService.PAGE_SIZE, subscriber);

        //In order for the JVM not to quit out, we make sure we turn our Observable to
        //a BlockingObservable, so that all of it will finish.
        Observable<GitHubRepository> observable =
                app.printAllRepositories(pagingSubscriber.getNextPageTrigger());
        observable.toBlocking().subscribe(pagingSubscriber);

    }

}


rxjava 가 메이저 버전 업(1->2)을 하면서 몇 가지 변경점이 생겼다.

변경점에 대한 자세한 내용은 아래 링크를 참조하기 바란다.

Flowable 이라는 base reactive class 가 추가 되었다. Observable 과의 차이는 backpressure buffer의 기본 탑재 유무이다.

출처 : https://stackoverflow.com/a/42005735

I'd suggest using a Single as it is more accurate representation of the data flow: you make a request to the server and the you get either one emission of data OR an error:

Single:     onSubscribe (onSuccess | onError)?

For an Observable you could theoretically get several emissions of data AND an error

Observable: onSubscribe onNext? (onCompleted | onError)?

However, if you are using , I'd suggest using a Maybe instead of Single. The difference between those two is that Maybe handles also the case when you get the response from server but it contains no body.

Maybe:      onSubscribe (onSuccess | onCompleted | onError)?


+ Recent posts