목록전체 글 (1525)
오늘도 공부
만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.예를 보자.public static void main(String[] args){ Flowable.range(1, 100) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -> intenseCalculation(i)) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { System.out.println( "onSubscribe" ); s.request(10); } @Ove..
Flowable.range(), Flowable.just() , Flowable.fromIterable(), Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.예를 들어보자.public static void main(String[] args){ Flowable.interval(1, TimeUnit.MICROSECONDS) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -..
이제 배압(Back pressure) 을 해결 하기 위해 Flowable(RxJava2 부터 도입) 을 했다.한번 살펴보도록 하자.//Observable -> Flowable 변경 Flowable.range( 1, 999_999_999) .map(MyItem::new) .observeOn(Schedulers.io()) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); sleep(Integer.MAX_VALUE); ''' Constructing MyItem 1 Constructing MyItem 2 Constructing MyItem 3 ... Constructing MyItem 127 Con..
Observable은 주로 푸시를 기반으로 하는 성격을 지닌다.예를 들어 하나의 쓰레드에서 1에서 999,999,999까지 숫자를 내보내는 경우 이상없는 지 체크 해보자.우선 옵저버를 만들고 푸시(배출)을 진행해보자.public static void main(String[] args){ Observable.range( 1, 999_999_999) .map(MyItem::new) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); } static void sleep(long millseconds) { try{ Thread.sleep(millseconds); }catch (InterruptedEx..
다음 순서 예정입니다. 배압이란 무엇인가.Flowable 과 Subscriber에 대한 이해Flowable.create() 을 사용해보는 시간Observables과 Flowables 상호관의 연동배압관련 함수들Flowable.generate() 사용
SwitchMap()flatMap과 비슷하지만 큰 차이점은 제일 마지막 것만 배출 한다는 차이가 있다.예제를 보자.public static void main(String[] args) { Observable items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta", "Eta", "Theta", "Iota"); Observable processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS)); processings.subscribe( System.out::println); sleep( 20000 ); }..
Throttling정해진 시간동안 발생된 걸 무시를 할 수 있는 함수이다.ThrottleLast() : 정해진 시간안에 마지막 내용만 리턴한다.ThrottleFirst() : 정해진 시간안에 처음 내용만 리턴한다.ThrottleWithTimeout() : debuoune 와 유사하며 일정시간동안 모든 아이템은 무시한다.우선 일반적인 형태를 보자.Observable source1 = Observable.interval(100 , TimeUnit.MILLISECONDS) .map(i-> (i+1) * 100) .map(i-> "Source 1 " + i) .take(10); Observable source2 = Observable.interval(300 , TimeUnit.MILLISECONDS) .map(..
Windowing이전 장에서 배운 buffering과 비슷한 형태로 배열을 나누어서 다시 배열로 만들어주는 것이지만 하나의 큰 차이점은 있다.바로 결과가 Observable형태로 나오는 것이다. map과 flatMap의 차이점으로 보면 된다.정해진 사이즈의 WindowingObservable.range(1 , 50) .window(8) .flatMapSingle(obs -> //reduce 더하기를 진행한다. obs.reduce( "" , (total ,next) -> total + (total.equals("") ? "" : "|") + next ) ) .subscribe(System.out::println); ....... 1|2|3|4|5|6|7|8 9|10|11|12|13|14|15|16 17|1..
Buffering 은 주어진 배열을 지정된 갯수로 나누는 역할을 하게 된다.io.reactivex.Observable.range(1,50) .buffer(8) .subscribe(System.out::println); ....... [1, 2, 3, 4, 5, 6, 7, 8] [9, 10, 11, 12, 13, 14, 15, 16] [17, 18, 19, 20, 21, 22, 23, 24] [25, 26, 27, 28, 29, 30, 31, 32] [33, 34, 35, 36, 37, 38, 39, 40] [41, 42, 43, 44, 45, 46, 47, 48] [49, 50] 8개씩 나누는 과정에 깔끔하게 떨어지지 않으면 남은 만큼만 배열로 만들어서 보여주고 있다.그리고 2번째 인자로 bufferSu..
UnicastSubjectPublishSubject 와 굉장한 유사하지만 하나의 큰 차이점이 있다.Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.예를 보자.public static void main(String[] args) { io.reactivex.subjects.Subject subject = UnicastSubject.create(); System.out.println(currentTime() + "옵저버 등록시작"); Observable.interval(300 , TimeUnit.MILLISECONDS) .map( l -> (( l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); subject.s..
