목록전체 글 (1489)
오늘도 공부
ObservableTransformer우선 compose를 사용하는 방법에 대해서 알아보자.compose를 사용하는 주된 이유는 어떠한 공통된 일련의 동작을 커스텀해서 변형할수 있다.우선 사용하기 전 형태를 살펴보자.Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon") .collect(ImmutableList::builder , ImmutableList.Builder::add) .map(ImmutableList.Builder::build) .subscribe(System.out::println); Observable.range(1 , 15) .collect(ImmutableList::builder , ImmutableList.Builder::add..
Flowable.generate()배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer 을 구현하면 된다.예를 보자.public static void main(String[] args){ randomGenerator( 1, 10000) .subscribeOn(Schedulers.computation()) .doOnNext(i -> System.out.println("Emitting " + i)) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println("Received item : " + i); }); sleep(5000); } static Flowable randomGenerator(int min ..
만약 Backpressure없는 Flowable을 사용시 어떤 함수를 사용하면 좋은지 알아보자.우선 onBackpressureBuffer() 을 알아보자.복습을 하자면Flowable.interval( 1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println("Received MyItem : " + i); }); 을 사용시 배출이 생성하는 것을 못 따라주기 때문에 오류가 난다.Received MyItem : 21 Received MyItem : 22 Received MyItem : 23 Received MyItem : 24 io.reactivex.exceptions.OnErrorNot..
Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.예제를 한번 보자.Flowable source = Flowable.create(emitter -> { for (int i=0;i Flowable 로 변경시에도 인자로 넣는게 가능하다.Observable source = Observable.range(1,1000); source.toFlowable(BackpressureStrategy.BUFFER) 하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.Flowable -> Observable 변경시에도 toObserable()으로 가능하다.Flowable integers = Flowable.range(1, 1000) .subscribeOn(Sch..
만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.예를 보자.public static void main(String[] args){ Flowable.range(1, 100) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -> intenseCalculation(i)) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { System.out.println( "onSubscribe" ); s.request(10); } @Ove..
Flowable.range(), Flowable.just() , Flowable.fromIterable(), Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.예를 들어보자.public static void main(String[] args){ Flowable.interval(1, TimeUnit.MICROSECONDS) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -..
이제 배압(Back pressure) 을 해결 하기 위해 Flowable(RxJava2 부터 도입) 을 했다.한번 살펴보도록 하자.//Observable -> Flowable 변경 Flowable.range( 1, 999_999_999) .map(MyItem::new) .observeOn(Schedulers.io()) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); sleep(Integer.MAX_VALUE); ''' Constructing MyItem 1 Constructing MyItem 2 Constructing MyItem 3 ... Constructing MyItem 127 Con..
Observable은 주로 푸시를 기반으로 하는 성격을 지닌다.예를 들어 하나의 쓰레드에서 1에서 999,999,999까지 숫자를 내보내는 경우 이상없는 지 체크 해보자.우선 옵저버를 만들고 푸시(배출)을 진행해보자.public static void main(String[] args){ Observable.range( 1, 999_999_999) .map(MyItem::new) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); } static void sleep(long millseconds) { try{ Thread.sleep(millseconds); }catch (InterruptedEx..
다음 순서 예정입니다. 배압이란 무엇인가.Flowable 과 Subscriber에 대한 이해Flowable.create() 을 사용해보는 시간Observables과 Flowables 상호관의 연동배압관련 함수들Flowable.generate() 사용
SwitchMap()flatMap과 비슷하지만 큰 차이점은 제일 마지막 것만 배출 한다는 차이가 있다.예제를 보자.public static void main(String[] args) { Observable items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta", "Eta", "Theta", "Iota"); Observable processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS)); processings.subscribe( System.out::println); sleep( 20000 ); }..
