/***
 * This is a helper wrapper Subscriber that helps you lazily defer
 * continuous paging of a result set from some API.
 * Through the use of a {@link Subject}, it helps notify the original {@link Observable}
 * when to perform an additional fetch.
 * The notification is sent when a certain count of items has been reached.
 * Generally this count represents the page.
 * @param <T> The event type
 */
@Data
public class PagingSubscriber<T> extends Subscriber<T> {

    private final Subject<Void,Void> nextPageTrigger = PublishSubject.create();
    private final long pageSize;
    private long count = 0;
    private final Subscriber<T> delegate;

    /***
     * Creates the {@link PagingSubscriber}
     * @param pageSize
     * @param delegate
     */
    public PagingSubscriber(long pageSize, Subscriber<T> delegate) {
        this.pageSize = pageSize;
        this.delegate = delegate;
    }

    public Observable<Void> getNextPageTrigger() {
        return nextPageTrigger;
    }

    @Override
    public void onStart() {
        delegate.onStart();
    }

    @Override
    public void onCompleted() {
        delegate.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        delegate.onError(e);
    }

    @Override
    public void onNext(T t) {
        count+=1;
        if (count == pageSize) {
            nextPageTrigger.onNext(null);
            count= 0;
        }
        delegate.onNext(t);
    }

}

@Data
public class GitHubRepositoryApplication {

    private final GitHubService gitHubService;

    @Inject
    public GitHubRepositoryApplication(GitHubService githubService) {
        this.gitHubService = githubService;
    }

    public Observable<GitHubRepository> printAllRepositories(Observable<Void> nextPageTrigger) {
        return printRepositoryPages(GitHubService.FIRST_PAGE, nextPageTrigger)
                .flatMapIterable(r -> r.body());
    }


    public Observable<Response<List<GitHubRepository>>> printRepositoryPages(String startingPage, Observable<Void> nextPageTrigger) {
        return gitHubService.listRepos(startingPage)
                .concatMap(response -> {
                    Optional<String> nextPage = Optional.ofNullable(response.headers().get(HttpHeaders.LINK))
                            .flatMap(header -> GitHubServiceUtils.getNextToken(header));

                    if (!nextPage.isPresent()) {
                        return Observable.just(response);
                    }
                    return Observable.just(response)
                            .concatWith(nextPageTrigger.limit(1).ignoreElements().cast(Response.class))
                            .concatWith(printRepositoryPages(nextPage.get(), nextPageTrigger));
                });
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new GitHubModule());

        GitHubRepositoryApplication app = injector.getInstance(GitHubRepositoryApplication.class);

        Subscriber<GitHubRepository> subscriber = new Subscriber<GitHubRepository>() {

            private final Logger log = LoggerFactory.getLogger(getClass());

            @Override
            public void onStart() {
                log.debug("STARTING");
                request(1l);//we need to begin the request
            }

            @Override
            public void onCompleted() {
                log.debug("COMPLETED");
            }

            @Override
            public void onError(Throwable e) {
                log.error("ERROR",e);
            }

            @Override
            public void onNext(GitHubRepository gitHubRepository) {
                log.debug("{}",gitHubRepository);
                request(1l);//we need to make sure we have asked for another element
            }
        };

        PagingSubscriber<GitHubRepository> pagingSubscriber = new PagingSubscriber<>(GitHubService.PAGE_SIZE, subscriber);

        //In order for the JVM not to quit out, we make sure we turn our Observable to
        //a BlockingObservable, so that all of it will finish.
        Observable<GitHubRepository> observable =
                app.printAllRepositories(pagingSubscriber.getNextPageTrigger());
        observable.toBlocking().subscribe(pagingSubscriber);

    }

}


+ Recent posts