목록스터디/RxJava2 (39)
올해는 머신러닝이다.
Collection 함수들RxJava에서 자주 쓰이는 함수중 하나인 Collection형태를 알아보자.우선 toList() 이다.여러개의 배출을 하나의 리스트로 만들어주는 것이다. 주의점은 리턴값이 Single 인것을 유념하자.아주 간단한 예제를 보자.ToListfun testToList(){ Observable.just("Alpha" , "Beta" , "Gamma" , "Delta" , "Epslion") .toList() //Single로 배출 .subscribeBy( onSuccess = { println(it) } ) } ==================================== [Alpha, Beta, Gamma, Delta, Epslion] 정렬을 원할땐 ToSortedList를 사용..
그룹핑하나의 Observable을 여러개의 옵저버로 패턴에 맞게 분리하는 방법에 대해서 알아보자.+4개의 문자 배열이 있다.+"Alpha" , "Beta" , "Delat" , "Epsilon" 이걸 문자 길이에 맞게끔 분리를 해보자.+val source = Observable.just("Alpha" , "Beta" , "Delat" , "Epsilon") val lengthGroupObservable = source.groupBy { it.length } //groupBy 가 중요함 lengthGroupObservable.flatMapSingle { it.toList() } .subscribeBy( onNext = { println(it) } ) ====================== [Beta] [..
Combinelatest두개의 리스트를 비교하는 걸 구현한다고 가정했을때 여러 가지 방법이 있을테지만그중에 conbinelatest 를 사용해서 리스트와 각각의 값을 비교하는 방법으로 진행해보자.다이어그램은 다음과 같다.val observable1 = Observable.fromArray("A" , "B" , "C" , "D") val observable2 = Observable.fromArray("E" , "C" , "B" , "G","F") Observables.combineLatest( observable1.toList().toObservable() , observable2 ){ list , value -> println("$list $value") if(list.contains(value)) va..
Lift 연산자연산자 오버로딩 하는 방법 줌 RxJava에서는 compose 와 lift 가 대표적으로 사용된다.compose 는 전 내용에서 설명해놓은 내용이 있다.Compose 활용Compose with parameter그럼 lift는 차이점이 무엇인가.Lift는 연산자를 오버로딩해서 새로 만드는 걸 목적으로 하며compose는 여러 연산자를 하나의 연산자로 만드는게 주 목적이라고 생각된다.우선 lift 구현 동작 부터 확인해보자. interface ObservableOperator { /** * Applies a function to the child Observer and returns a new parent Observer. * @PARAM observer the child Observer in..
Using코틀린에서 기본적으로 Use을 제공한다.myStream.use { // 자동으로 열고 닫는다. val line = readLine()}하지만 Rx에도 자원관리를 할 수 있는 방법이 있다. 바로 Using 이다.공식문서는 http://reactivex.io/documentation/operators/using.html 그럼 우선 RxKotlin 에서 어떻게 보이는 지 살펴보자.fun using(resourceSupplier: Callable, //첫번째 인자 (열고)sourceSupplier: Function, //세번째 인자 (닫기) disposer: Consumer): Observable { return using(resourceSupplier, sourceSupplier, disposer, ..
RxKotlin우선 dependencies을 추가를 하자. compile 'io.reactivex.rxjava2:rxjava:2.1.0' compile 'io.reactivex.rxjava2:rxkotlin:2.0.2' io.reactivex.rxjava2:rxkotlin 의 경우 코틀린으로 확장해서 따로 유틸을 만든 것이다.주소는 여기 클릭 해서 한번 보도록 하자.그리고 소스셋을 지정해서 폴더를 구분해주자.sourceSets { main.kotlin.srcDirs += 'src/main/kotlin' } 그럼 코틀린 파일을 새로 만들어서 테스트 해보자.1~10까지 돌면서 문자값으로 변경해주는 예제이다.import io.reactivex.Observable fun main(args : Array) { O..
쓰레드가 이어져서 나오는 걸 직렬화라고 한다.예를 보자.public static void main(String[] args) { Observable.range(1 , 10) .map(i -> intenseCalculation(i)) .subscribe(i->System.out.println("Received " + i + " " + LocalTime.now())); } public static T intenseCalculation(T value) { sleep(ThreadLocalRandom.current().nextInt(3000)); return value; } public static void sleep(int millis) { try { Thread.sleep(millis); } catch (In..
RxJava 2부터 Disposable 형태로 변경됐다.일반적인 형태로 많이 쓰는건 consumer 형태이다.예를 보자.Observable testObservable = Observable.range(1 , 10); Disposable disposable1 = testObservable .doOnDispose(() -> System.out.println("onDispse 1")) .doOnComplete(() -> System.out.println("onCompleted 1")) .subscribe(item -> System.out.println("emitted 1 -> " + item)); //consumer 로 진행 System.out.println( "isDisposed -> " + disposab..
Debugging 하는 방법RxJava 사용시 디버깅 라이버러리는frodo 를 사용하면 좋긴 한데 ( RxJava 1지원..ㅠ)https://github.com/android10/frodo.git일반적인 테스트 방법을 소개한다..doOnNext()를 활용하면 된다.예를 들어 다음과 같은 내용이 있다고 가정하자.주어진 3개의 내용에 알파벳을 뽑아내는 경우이다.TestObserver testObserver = new TestObserver(); Observable items = Observable.just("521934/2342/Foxtrot", "Bravo/12112/78886/Tango", "283242/4542/Whiskey/2348562"); items.concatMap( s -> Observable..
TestSchedulerTestObserver(or TestSubscriber)을 이용시await를 통해서 기다릴수 있지만 그러기엔 시간이 많이 걸릴수 있다.그래서 타임머신기능으로 시간을 미리 땡겨서 테스트가 가능하다.예를 들어 interval(10) 으로 10초뒤에 값을 체크를 하는 경우 10초를 기다리지 않고 10초 뒤로 시간 설정 후 바로 테스트 하게끔 하는 것이다.이 부분에 대해서 좀 더 공부가 필요해보인다.TestScheduler testScheduler = new TestScheduler(); TestObserver testObserver = new TestObserver(); Observable minTicker = Observable.interval(1, TimeUnit.MINUTES , ..
TestObserver , TestSubscriber이전 내용까지는 Blocking이 주된 내용이지만 이것만으로는 한계가 있다. 그래서 나온게 TestObserver 이다.TestObserver = Observable , Single , Maybe , Completable 에서 사용된다.TestSubscriber =Flowable 에서 사용된다.//우선 옵저버 생성 (5초동안 아이템을 배출) Observable source = Observable.interval(1, TimeUnit.SECONDS).take(5); TestObserver testObserver = new TestObserver(); //아직 구독이 안된건지 체크 testObserver.assertNotSubscribed(); //이제 구..
BlockingFirst() , BlockingSingle()이름 그대로 처음꺼만 가져오는 함수를 말한다.Observable source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta"); String firstItem = source.filter( s -> s.length() == 5).blockingFirst(); String singleItem = source.filter( s -> s.length() == 4).take(1).blockingSingle(); assertTrue( firstItem.equals("Alpha")); assertTrue( singleItem.equals("Beta")); 결과 : SuccessBlocingSing..
일반적으로 Observable 을 테스트시 Blocking을 사용한다.먼저 일반 코드를 작성하면 다음과 같을겁니다.AtomicInteger hitcount = new AtomicInteger(); Observable source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5); source.subscribe( i -> hitcount.incrementAndGet()); assertTrue( hitcount.get() == 5); 여기에서 5개를 가져오는 일반적인 코드이다. 결과는 실패이다.이유는 interval 함수를 보면 thread가 computation 이다. 그래서 가져올수 없다.@CheckReturnValue @Schedule..
실제 연산자를 사용자가 구현이 가능하다.관련해서 자세한 내용은영문 : http://reactivex.io/documentation/implement-operator.html한글 : http://reactivex.io/documentation/ko/implement-operator.html일단 구현해보자.아래 doOnEmpty는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.public static ObservableOperator doOnEmpty(Action action) { return observer -> new DisposableObserver() { boolean isEmpty = true; @Override public void onNext(T value) { isEmpty = false;..
자신만의 커스텀 함수를 만들어서 사용시 공유되는 이슈가 발생된다.예를 들어 보자.인덱스를 넣고 값을 저장하는 IndexedValue데이터 클래스를 먼저 만든다.static final class IndexedValue { final int index; final T value; IndexedValue(int index , T value){ this.index = index; this.value = value; } @Override public String toString() { return index + " - " + value; } } 그리고 compose에 들어갈 커스텀 함수를 추가한다.원하는 결과값은 index - value 을 나오는게 목표이다.static ObservableTransformer w..
파라미터를 통해서 compose를 활용하는 방법에 대해서 알아보자.우선 다음과 같은 내용이 있다.Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon") .collect( StringBuilder::new , (b,s) -> { if ( b.length() == 0) b.append(s); else b.append("/").append(s); }) .map(StringBuilder::toString) .subscribe(System.out::println); ==================================================================== Alpha/Beta/Gamma/Delta/Epsilon 주어진 문자열에 ..
중복되어 있는 걸 커스텀 공통 함수로 빼고 compose로 활용하는 방법에 대해서 알아보자.public static ObservableTransformer toImmutableList() { return upstream -> upstream.collect(ImmutableList::builder , ImmutableList.Builder::add) .map(ImmutableList.Builder::build) //반드시 Single 또는 Observable로 리턴해야한다. //Flowable -> toFlowable(); .toObservable(); } 일단 공통 함수를 작성한다. 내용은 간단하다.소스가 들어오면 collect로 imutableList로 변형해서 다시 옵저버 ( or Flowable)로..
ObservableTransformer우선 compose를 사용하는 방법에 대해서 알아보자.compose를 사용하는 주된 이유는 어떠한 공통된 일련의 동작을 커스텀해서 변형할수 있다.우선 사용하기 전 형태를 살펴보자.Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon") .collect(ImmutableList::builder , ImmutableList.Builder::add) .map(ImmutableList.Builder::build) .subscribe(System.out::println); Observable.range(1 , 15) .collect(ImmutableList::builder , ImmutableList.Builder::add..
Flowable.generate()배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer 을 구현하면 된다.예를 보자.public static void main(String[] args){ randomGenerator( 1, 10000) .subscribeOn(Schedulers.computation()) .doOnNext(i -> System.out.println("Emitting " + i)) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println("Received item : " + i); }); sleep(5000); } static Flowable randomGenerator(int min ..
Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.예제를 한번 보자.Flowable source = Flowable.create(emitter -> { for (int i=0;i Flowable 로 변경시에도 인자로 넣는게 가능하다.Observable source = Observable.range(1,1000); source.toFlowable(BackpressureStrategy.BUFFER) 하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.Flowable -> Observable 변경시에도 toObserable()으로 가능하다.Flowable integers = Flowable.range(1, 1000) .subscribeOn(Sch..
만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.예를 보자.public static void main(String[] args){ Flowable.range(1, 100) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -> intenseCalculation(i)) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { System.out.println( "onSubscribe" ); s.request(10); } @Ove..
Flowable.range(), Flowable.just() , Flowable.fromIterable(), Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.예를 들어보자.public static void main(String[] args){ Flowable.interval(1, TimeUnit.MICROSECONDS) .doOnNext( v -> System.out.println("push -> " + v)) .observeOn(Schedulers.io()) .map( i -..
이제 배압(Back pressure) 을 해결 하기 위해 Flowable(RxJava2 부터 도입) 을 했다.한번 살펴보도록 하자.//Observable -> Flowable 변경 Flowable.range( 1, 999_999_999) .map(MyItem::new) .observeOn(Schedulers.io()) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); sleep(Integer.MAX_VALUE); ''' Constructing MyItem 1 Constructing MyItem 2 Constructing MyItem 3 ... Constructing MyItem 127 Con..
Observable은 주로 푸시를 기반으로 하는 성격을 지닌다.예를 들어 하나의 쓰레드에서 1에서 999,999,999까지 숫자를 내보내는 경우 이상없는 지 체크 해보자.우선 옵저버를 만들고 푸시(배출)을 진행해보자.public static void main(String[] args){ Observable.range( 1, 999_999_999) .map(MyItem::new) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem : " + myItem.id); }); } static void sleep(long millseconds) { try{ Thread.sleep(millseconds); }catch (InterruptedEx..
다음 순서 예정입니다. 배압이란 무엇인가.Flowable 과 Subscriber에 대한 이해Flowable.create() 을 사용해보는 시간Observables과 Flowables 상호관의 연동배압관련 함수들Flowable.generate() 사용
SwitchMap()flatMap과 비슷하지만 큰 차이점은 제일 마지막 것만 배출 한다는 차이가 있다.예제를 보자.public static void main(String[] args) { Observable items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta", "Eta", "Theta", "Iota"); Observable processings = items.concatMap( s -> Observable.just(s).delay(randomSleepTime() , TimeUnit.MILLISECONDS)); processings.subscribe( System.out::println); sleep( 20000 ); }..
Throttling정해진 시간동안 발생된 걸 무시를 할 수 있는 함수이다.ThrottleLast() : 정해진 시간안에 마지막 내용만 리턴한다.ThrottleFirst() : 정해진 시간안에 처음 내용만 리턴한다.ThrottleWithTimeout() : debuoune 와 유사하며 일정시간동안 모든 아이템은 무시한다.우선 일반적인 형태를 보자.Observable source1 = Observable.interval(100 , TimeUnit.MILLISECONDS) .map(i-> (i+1) * 100) .map(i-> "Source 1 " + i) .take(10); Observable source2 = Observable.interval(300 , TimeUnit.MILLISECONDS) .map(..
Windowing이전 장에서 배운 buffering과 비슷한 형태로 배열을 나누어서 다시 배열로 만들어주는 것이지만 하나의 큰 차이점은 있다.바로 결과가 Observable형태로 나오는 것이다. map과 flatMap의 차이점으로 보면 된다.정해진 사이즈의 WindowingObservable.range(1 , 50) .window(8) .flatMapSingle(obs -> //reduce 더하기를 진행한다. obs.reduce( "" , (total ,next) -> total + (total.equals("") ? "" : "|") + next ) ) .subscribe(System.out::println); ....... 1|2|3|4|5|6|7|8 9|10|11|12|13|14|15|16 17|1..
Buffering 은 주어진 배열을 지정된 갯수로 나누는 역할을 하게 된다.io.reactivex.Observable.range(1,50) .buffer(8) .subscribe(System.out::println); ....... [1, 2, 3, 4, 5, 6, 7, 8] [9, 10, 11, 12, 13, 14, 15, 16] [17, 18, 19, 20, 21, 22, 23, 24] [25, 26, 27, 28, 29, 30, 31, 32] [33, 34, 35, 36, 37, 38, 39, 40] [41, 42, 43, 44, 45, 46, 47, 48] [49, 50] 8개씩 나누는 과정에 깔끔하게 떨어지지 않으면 남은 만큼만 배열로 만들어서 보여주고 있다.그리고 2번째 인자로 bufferSu..
UnicastSubjectPublishSubject 와 굉장한 유사하지만 하나의 큰 차이점이 있다.Observer 가 subscribe 를 한 후부터 값이 배출이 되기 시작한다.예를 보자.public static void main(String[] args) { io.reactivex.subjects.Subject subject = UnicastSubject.create(); System.out.println(currentTime() + "옵저버 등록시작"); Observable.interval(300 , TimeUnit.MILLISECONDS) .map( l -> (( l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); subject.s..