목록전체 글 (1489)
오늘도 공부
Throttling정해진 시간동안 발생된 걸 무시를 할 수 있는 함수이다.ThrottleLast() : 정해진 시간안에 마지막 내용만 리턴한다.ThrottleFirst() : 정해진 시간안에 처음 내용만 리턴한다.ThrottleWithTimeout() : debuoune 와 유사하며 일정시간동안 모든 아이템은 무시한다.우선 일반적인 형태를 보자.Observable source1 = Observable.interval(100 , TimeUnit.MILLISECONDS) .map(i-> (i+1) * 100) .map(i-> "Source 1 " + i) .take(10); Observable source2 = Observable.interval(300 , TimeUnit.MILLISECONDS) .map(..
Windowing이전 장에서 배운 buffering과 비슷한 형태로 배열을 나누어서 다시 배열로 만들어주는 것이지만 하나의 큰 차이점은 있다.바로 결과가 Observable형태로 나오는 것이다. map과 flatMap의 차이점으로 보면 된다.정해진 사이즈의 WindowingObservable.range(1 , 50) .window(8) .flatMapSingle(obs -> //reduce 더하기를 진행한다. obs.reduce( "" , (total ,next) -> total + (total.equals("") ? "" : "|") + next ) ) .subscribe(System.out::println); ....... 1|2|3|4|5|6|7|8 9|10|11|12|13|14|15|16 17|1..
Buffering 은 주어진 배열을 지정된 갯수로 나누는 역할을 하게 된다.io.reactivex.Observable.range(1,50) .buffer(8) .subscribe(System.out::println); ....... [1, 2, 3, 4, 5, 6, 7, 8] [9, 10, 11, 12, 13, 14, 15, 16] [17, 18, 19, 20, 21, 22, 23, 24] [25, 26, 27, 28, 29, 30, 31, 32] [33, 34, 35, 36, 37, 38, 39, 40] [41, 42, 43, 44, 45, 46, 47, 48] [49, 50] 8개씩 나누는 과정에 깔끔하게 떨어지지 않으면 남은 만큼만 배열로 만들어서 보여주고 있다.그리고 2번째 인자로 bufferSu..
UnicastSubjectPublishSubject 와 굉장한 유사하지만 하나의 큰 차이점이 있다.Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.예를 보자.public static void main(String[] args) { io.reactivex.subjects.Subject subject = UnicastSubject.create(); System.out.println(currentTime() + "옵저버 등록시작"); Observable.interval(300 , TimeUnit.MILLISECONDS) .map( l -> (( l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); subject.s..
AsyncSubjectAsyncSubject는 소스 Observable로부터 배출된 마지막 값(만)을 배출하고 소스 Observable의 동작이 완료된 후에야 동작한다. (만약, 소스 Observable이 아무 값도 배출하지 않으면AsyncSubject역시 아무 값도 배출하지 않는다.)즉 쉽게 말하자면 onCompleted를 하기전 이벤트만 배출이 된다는 것이다. onCompleted가 호출이 되지 않으면 배출이 되지 않는다.io.reactivex.subjects.Subject subject = AsyncSubject.create(); subject.subscribe( v -> System.out.println("Observer 1: " + v) , Throwable::printStackTrace, ()..
ReplaySubject이전 시간에 공부한 replay() , cache() 기능을 기억하는가.이전에 배출한 모든 걸 캐싱해서 새로운 Observer등록시 캐싱된걸 다시 배출시키는 역할이다.ReplaySubject는 그 기능을 가진 Subject이다.즉 다시 말하자면 PublishSubject후에 .cache() 을 붙였다고 보면 된다.//ReplaySubject 선언 io.reactivex.subjects.Subject subject = ReplaySubject.create(); subject.subscribe( v -> System.out.println("Observer 1: " + v)); subject.onNext("Alpha"); subject.onNext("Beta"); subject.onNe..
BehaviorSubject주로 많이 쓰이는 subject중 하나이다.기능은 최근 마지막껄 캐싱해서 보여주는 subject이다.이전 시간에 배운 replay() , autoConnect() 을 기억하는가?이 BehaviorSubject는 이 2가지 기능들을 통합해놓은 거라고 보면된다.즉 PublishSubject후에 replay(1).autoConnect() 를 통합한 Subject 이다.그럼 예제를 한번 보자.//BehaviorSubject 선언 io.reactivex.subjects.Subject subject = BehaviorSubject.create(); subject.subscribe( v -> System.out.println("Observer 1: " + v)); subject.onNext..
PublishSubject통상 일반적인 subject 형태이다.onNext() , onComplete() , onError() 를 호출해서 사용이 가능하다.그럼 예로 보자.io.reactivex.subjects.Subject subject = PublishSubject.create(); subject.map(String::length) // 문자길이를 리턴하고 있다. .subscribe(System.out::println); subject.onNext("Alpha"); subject.onNext("Beta"); subject.onNext("Gamma"); subject.onComplete(); ........... 5 4 5 간단한 기본 형태라 어렵지는 않다.onComplete() 를 하는 순간 더이상 ..
Replay와 Caching멀티 캐스팅을 사용하면 여러 Observer 들 사이에서 공유되는 값을 캐시 할 수 있습니다.우선 Replay를 보자.이미 많은 예제사이트들을 보셨을 수도 있지만 이전에 배출한 값을 캐싱해서 가지고 있다가 새롭게 배출시 캐싱한 걸 같이 먼저 배출해준다.예제를 보자.Observable threeRandoms = Observable.interval(1,TimeUnit.SECONDS) .replay() .autoConnect(); //Observer 1 배출 threeRandoms.subscribe( i->System.out.println("Observer 1 : " + i)); sleep(3000); // 3초동안 잠자기 //Observer 2 배출 threeRandoms.subs..
refCount() 는 autoConnect(1) 와 유사하다.그럼 autoConnect와 refCount와 차이점은 무엇일까?차이점은 더이상 배출할 Observer가 없을시 refCount는 자동으로 자신을 해지를 하고 다시 새로운 Observer 이 오면 처음부터 자동으로 시작을 한다.autoConnect는 동작이 끝나면 dispose 가 되지만 다시 새로운게 오면 시작을 하진 않는다.Observable threeRandoms = Observable.interval(1,TimeUnit.SECONDS) .publish() .refCount(); //Observable 1 배출 threeRandoms.take(5).subscribe( i->System.out.println("Observer 1 : " +..
