목록스터디 (171)
오늘도 공부
Flowable.generate()배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer 을 구현하면 된다.예를 보자.public static void main(String[] args){ randomGenerator( 1, 10000) .subscribeOn(Schedulers.computation()) .doOnNext(i -> System.out.println("Emitting " + i)) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println("Received item : " + i); }); sleep(5000); } static Flowable randomGenerator(int min ..
Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.예제를 한번 보자.Flowable source = Flowable.create(emitter -> { for (int i=0;i Flowable 로 변경시에도 인자로 넣는게 가능하다.Observable source = Observable.range(1,1000); source.toFlowable(BackpressureStrategy.BUFFER) 하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.Flowable -> Observable 변경시에도 toObserable()으로 가능하다.Flowable integers = Flowable.range(1, 1000) .subscribeOn(Sch..
만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.예를 보자.public static void main(String[] args){ Flowable.range(1, 100) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -> intenseCalculation(i)) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { System.out.println( "onSubscribe" ); s.request(10); } @Ove..
Flowable.range(), Flowable.just() , Flowable.fromIterable(), Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.예를 들어보자.public static void main(String[] args){ Flowable.interval(1, TimeUnit.MICROSECONDS) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -..
이제 배압(Back pressure) 을 해결 하기 위해 Flowable(RxJava2 부터 도입) 을 했다.한번 살펴보도록 하자.//Observable -> Flowable 변경 Flowable.range( 1, 999_999_999) .map(MyItem::new) .observeOn(Schedulers.io()) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); sleep(Integer.MAX_VALUE); ''' Constructing MyItem 1 Constructing MyItem 2 Constructing MyItem 3 ... Constructing MyItem 127 Con..
Observable은 주로 푸시를 기반으로 하는 성격을 지닌다.예를 들어 하나의 쓰레드에서 1에서 999,999,999까지 숫자를 내보내는 경우 이상없는 지 체크 해보자.우선 옵저버를 만들고 푸시(배출)을 진행해보자.public static void main(String[] args){ Observable.range( 1, 999_999_999) .map(MyItem::new) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); } static void sleep(long millseconds) { try{ Thread.sleep(millseconds); }catch (InterruptedEx..
다음 순서 예정입니다. 배압이란 무엇인가.Flowable 과 Subscriber에 대한 이해Flowable.create() 을 사용해보는 시간Observables과 Flowables 상호관의 연동배압관련 함수들Flowable.generate() 사용
SwitchMap()flatMap과 비슷하지만 큰 차이점은 제일 마지막 것만 배출 한다는 차이가 있다.예제를 보자.public static void main(String[] args) { Observable items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta", "Eta", "Theta", "Iota"); Observable processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS)); processings.subscribe( System.out::println); sleep( 20000 ); }..
Throttling정해진 시간동안 발생된 걸 무시를 할 수 있는 함수이다.ThrottleLast() : 정해진 시간안에 마지막 내용만 리턴한다.ThrottleFirst() : 정해진 시간안에 처음 내용만 리턴한다.ThrottleWithTimeout() : debuoune 와 유사하며 일정시간동안 모든 아이템은 무시한다.우선 일반적인 형태를 보자.Observable source1 = Observable.interval(100 , TimeUnit.MILLISECONDS) .map(i-> (i+1) * 100) .map(i-> "Source 1 " + i) .take(10); Observable source2 = Observable.interval(300 , TimeUnit.MILLISECONDS) .map(..
Windowing이전 장에서 배운 buffering과 비슷한 형태로 배열을 나누어서 다시 배열로 만들어주는 것이지만 하나의 큰 차이점은 있다.바로 결과가 Observable형태로 나오는 것이다. map과 flatMap의 차이점으로 보면 된다.정해진 사이즈의 WindowingObservable.range(1 , 50) .window(8) .flatMapSingle(obs -> //reduce 더하기를 진행한다. obs.reduce( "" , (total ,next) -> total + (total.equals("") ? "" : "|") + next ) ) .subscribe(System.out::println); ....... 1|2|3|4|5|6|7|8 9|10|11|12|13|14|15|16 17|1..
Buffering 은 주어진 배열을 지정된 갯수로 나누는 역할을 하게 된다.io.reactivex.Observable.range(1,50) .buffer(8) .subscribe(System.out::println); ....... [1, 2, 3, 4, 5, 6, 7, 8] [9, 10, 11, 12, 13, 14, 15, 16] [17, 18, 19, 20, 21, 22, 23, 24] [25, 26, 27, 28, 29, 30, 31, 32] [33, 34, 35, 36, 37, 38, 39, 40] [41, 42, 43, 44, 45, 46, 47, 48] [49, 50] 8개씩 나누는 과정에 깔끔하게 떨어지지 않으면 남은 만큼만 배열로 만들어서 보여주고 있다.그리고 2번째 인자로 bufferSu..
UnicastSubjectPublishSubject 와 굉장한 유사하지만 하나의 큰 차이점이 있다.Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.예를 보자.public static void main(String[] args) { io.reactivex.subjects.Subject subject = UnicastSubject.create(); System.out.println(currentTime() + "옵저버 등록시작"); Observable.interval(300 , TimeUnit.MILLISECONDS) .map( l -> (( l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); subject.s..
AsyncSubjectAsyncSubject는 소스 Observable로부터 배출된 마지막 값(만)을 배출하고 소스 Observable의 동작이 완료된 후에야 동작한다. (만약, 소스 Observable이 아무 값도 배출하지 않으면AsyncSubject역시 아무 값도 배출하지 않는다.)즉 쉽게 말하자면 onCompleted를 하기전 이벤트만 배출이 된다는 것이다. onCompleted가 호출이 되지 않으면 배출이 되지 않는다.io.reactivex.subjects.Subject subject = AsyncSubject.create(); subject.subscribe( v -> System.out.println("Observer 1: " + v) , Throwable::printStackTrace, ()..
ReplaySubject이전 시간에 공부한 replay() , cache() 기능을 기억하는가.이전에 배출한 모든 걸 캐싱해서 새로운 Observer등록시 캐싱된걸 다시 배출시키는 역할이다.ReplaySubject는 그 기능을 가진 Subject이다.즉 다시 말하자면 PublishSubject후에 .cache() 을 붙였다고 보면 된다.//ReplaySubject 선언 io.reactivex.subjects.Subject subject = ReplaySubject.create(); subject.subscribe( v -> System.out.println("Observer 1: " + v)); subject.onNext("Alpha"); subject.onNext("Beta"); subject.onNe..
BehaviorSubject주로 많이 쓰이는 subject중 하나이다.기능은 최근 마지막껄 캐싱해서 보여주는 subject이다.이전 시간에 배운 replay() , autoConnect() 을 기억하는가?이 BehaviorSubject는 이 2가지 기능들을 통합해놓은 거라고 보면된다.즉 PublishSubject후에 replay(1).autoConnect() 를 통합한 Subject 이다.그럼 예제를 한번 보자.//BehaviorSubject 선언 io.reactivex.subjects.Subject subject = BehaviorSubject.create(); subject.subscribe( v -> System.out.println("Observer 1: " + v)); subject.onNext..
PublishSubject통상 일반적인 subject 형태이다.onNext() , onComplete() , onError() 를 호출해서 사용이 가능하다.그럼 예로 보자.io.reactivex.subjects.Subject subject = PublishSubject.create(); subject.map(String::length) // 문자길이를 리턴하고 있다. .subscribe(System.out::println); subject.onNext("Alpha"); subject.onNext("Beta"); subject.onNext("Gamma"); subject.onComplete(); ........... 5 4 5 간단한 기본 형태라 어렵지는 않다.onComplete() 를 하는 순간 더이상 ..
Replay와 Caching멀티 캐스팅을 사용하면 여러 Observer 들 사이에서 공유되는 값을 캐시 할 수 있습니다.우선 Replay를 보자.이미 많은 예제사이트들을 보셨을 수도 있지만 이전에 배출한 값을 캐싱해서 가지고 있다가 새롭게 배출시 캐싱한 걸 같이 먼저 배출해준다.예제를 보자.Observable threeRandoms = Observable.interval(1,TimeUnit.SECONDS) .replay() .autoConnect(); //Observer 1 배출 threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i)); sleep(3000); // 3초동안 잠자기 //Observer 2 배출 threeRandoms.subs..
refCount() 는 autoConnect(1) 와 유사하다.그럼 autoConnect와 refCount와 차이점은 무엇일까?차이점은 더이상 배출할 Observer가 없을시 refCount는 자동으로 자신을 해지를 하고 다시 새로운 Observer 이 오면 처음부터 자동으로 시작을 한다.autoConnect는 동작이 끝나면 dispose 가 되지만 다시 새로운게 오면 시작을 하진 않는다.Observable threeRandoms = Observable.interval(1,TimeUnit.SECONDS) .publish() .refCount(); //Observable 1 배출 threeRandoms.take(5).subscribe( i->System.out.println("Observer 1 : " +..
앞에서 본 예제처럼 connect() 를 통해 동시 실행할 수 있다.하지만 자동으로 connect를 할수도 있다.주의 할 내용은 자동 연결(Auto connection) 은 사용할 때 상당한 주의를 요한다. 뜻하지 않게 배출시 제대로 되지 않는 경우가 발생될 수 있기 때문이다.그럼 예제를 살펴보자 .이전장에서 했던 2가지 서로 다른 배출 방법에 자동 연결만 추가한 부분이다.Observable threeRandoms = Observable.range(1,3) .map(i->randomInt()) .publish() .autoConnect(2); //2번에 대해서 자동 연결을 하겠다는 뜻이다.!! //Observable 1 배출 threeRandoms.subscribe( i->System.out.printl..
기본적인 Observable 생성 후 배출은 다음과 같습니다.Observable threeIntegers = Observable.range(1 , 3); threeIntegers.subscribe( i -> System.out.println("Observer 1 : " + i)); threeIntegers.subscribe( i -> System.out.println("Observer 2 : " + i)); -------------------------- Observer 1 : 1 Observer 1 : 2 Observer 1 : 3 Observer 2 : 1 Observer 2 : 2 Observer 2 : 3 보다시피 Observable 1 이 끝나고Observable 2가 다시 실행되는 구조로 되어..
일반적으로 Observable들은 cold상태인것들이다.RxJava를 하다보면 많이 듣는 게 cold와 hot Observable이다.도대체 이것들이 뭔가..Cold Observable일반적인 옵저버형태를 말한다.누가 구독해주지 않으면 데이터를 배출을 하지 않는다.일반적인 웹 요청 , 데이터베이스 쿼리등이 사용되며 내가 요청하면 결과를 받는 과정을 거친다.처음부터 발행하는 걸 기본으로 한다.Hot Observable구독자의 존재 여부와 상관없이 데이터를 배출하는 Observable 이다.그래서 여러 구독자에 선택적으로 고려가능하다.구독시점으로부터 발행하는 값을 받는 걸 기본으로 한다.마우스 이벤트 , 키보드 이벤트 , 시스템 이벤트등이 주로 사용된다.멀티캐스팅도 포함된다.데이터 발행자와 수신자도 이참에 ..