/***
* This is a helper wrapper Subscriber that helps you lazily defer
* continuous paging of a result set from some API.
* Through the use of a {@link Subject}, it helps notify the original {@link Observable}
* when to perform an additional fetch.
* The notification is sent when a certain count of items has been reached.
* Generally this count represents the page.
* @param <T> The event type
*/
@Data
public class PagingSubscriber<T> extends Subscriber<T> {
private final Subject<Void,Void> nextPageTrigger = PublishSubject.create();
private final long pageSize;
private long count = 0;
private final Subscriber<T> delegate;
/***
* Creates the {@link PagingSubscriber}
* @param pageSize
* @param delegate
*/
public PagingSubscriber(long pageSize, Subscriber<T> delegate) {
this.pageSize = pageSize;
this.delegate = delegate;
}
public Observable<Void> getNextPageTrigger() {
return nextPageTrigger;
}
@Override
public void onStart() {
delegate.onStart();
}
@Override
public void onCompleted() {
delegate.onCompleted();
}
@Override
public void onError(Throwable e) {
delegate.onError(e);
}
@Override
public void onNext(T t) {
count+=1;
if (count == pageSize) {
nextPageTrigger.onNext(null);
count= 0;
}
delegate.onNext(t);
}
}
@Data
public class GitHubRepositoryApplication {
private final GitHubService gitHubService;
@Inject
public GitHubRepositoryApplication(GitHubService githubService) {
this.gitHubService = githubService;
}
public Observable<GitHubRepository> printAllRepositories(Observable<Void> nextPageTrigger) {
return printRepositoryPages(GitHubService.FIRST_PAGE, nextPageTrigger)
.flatMapIterable(r -> r.body());
}
public Observable<Response<List<GitHubRepository>>> printRepositoryPages(String startingPage, Observable<Void> nextPageTrigger) {
return gitHubService.listRepos(startingPage)
.concatMap(response -> {
Optional<String> nextPage = Optional.ofNullable(response.headers().get(HttpHeaders.LINK))
.flatMap(header -> GitHubServiceUtils.getNextToken(header));
if (!nextPage.isPresent()) {
return Observable.just(response);
}
return Observable.just(response)
.concatWith(nextPageTrigger.limit(1).ignoreElements().cast(Response.class))
.concatWith(printRepositoryPages(nextPage.get(), nextPageTrigger));
});
}
public static void main(String[] args) {
Injector injector = Guice.createInjector(new GitHubModule());
GitHubRepositoryApplication app = injector.getInstance(GitHubRepositoryApplication.class);
Subscriber<GitHubRepository> subscriber = new Subscriber<GitHubRepository>() {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void onStart() {
log.debug("STARTING");
request(1l);//we need to begin the request
}
@Override
public void onCompleted() {
log.debug("COMPLETED");
}
@Override
public void onError(Throwable e) {
log.error("ERROR",e);
}
@Override
public void onNext(GitHubRepository gitHubRepository) {
log.debug("{}",gitHubRepository);
request(1l);//we need to make sure we have asked for another element
}
};
PagingSubscriber<GitHubRepository> pagingSubscriber = new PagingSubscriber<>(GitHubService.PAGE_SIZE, subscriber);
//In order for the JVM not to quit out, we make sure we turn our Observable to
//a BlockingObservable, so that all of it will finish.
Observable<GitHubRepository> observable =
app.printAllRepositories(pagingSubscriber.getNextPageTrigger());
observable.toBlocking().subscribe(pagingSubscriber);
}
}