목록전체 글 (1495)
오늘도 공부
BlockingFirst() , BlockingSingle()이름 그대로 처음꺼만 가져오는 함수를 말한다.Observable source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta"); String firstItem = source.filter( s -> s.length() == 5).blockingFirst(); String singleItem = source.filter( s -> s.length() == 4).take(1).blockingSingle(); assertTrue( firstItem.equals("Alpha")); assertTrue( singleItem.equals("Beta")); 결과 : SuccessBlocingSing..
일반적으로 Observable 을 테스트시 Blocking을 사용한다.먼저 일반 코드를 작성하면 다음과 같을겁니다.AtomicInteger hitcount = new AtomicInteger(); Observable source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5); source.subscribe( i -> hitcount.incrementAndGet()); assertTrue( hitcount.get() == 5); 여기에서 5개를 가져오는 일반적인 코드이다. 결과는 실패이다.이유는 interval 함수를 보면 thread가 computation 이다. 그래서 가져올수 없다.@CheckReturnValue @Schedule..
실제 연산자를 사용자가 구현이 가능하다.관련해서 자세한 내용은영문 : http://reactivex.io/documentation/implement-operator.html한글 : http://reactivex.io/documentation/ko/implement-operator.html일단 구현해보자.아래 doOnEmpty는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.public static ObservableOperator doOnEmpty(Action action) { return observer -> new DisposableObserver() { boolean isEmpty = true; @Override public void onNext(T value) { isEmpty = false;..
자신만의 커스텀 함수를 만들어서 사용시 공유되는 이슈가 발생된다.예를 들어 보자.인덱스를 넣고 값을 저장하는 IndexedValue데이터 클래스를 먼저 만든다.static final class IndexedValue { final int index; final T value; IndexedValue(int index , T value){ this.index = index; this.value = value; } @Override public String toString() { return index + " - " + value; } } 그리고 compose에 들어갈 커스텀 함수를 추가한다.원하는 결과값은 index - value 을 나오는게 목표이다.static ObservableTransformer w..
파라미터를 통해서 compose를 활용하는 방법에 대해서 알아보자.우선 다음과 같은 내용이 있다.Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon") .collect( StringBuilder::new , (b,s) -> { if ( b.length() == 0) b.append(s); else b.append("/").append(s); }) .map(StringBuilder::toString) .subscribe(System.out::println); ==================================================================== Alpha/Beta/Gamma/Delta/Epsilon 주어진 문자열에 ..
중복되어 있는 걸 커스텀 공통 함수로 빼고 compose로 활용하는 방법에 대해서 알아보자.public static ObservableTransformer toImmutableList() { return upstream -> upstream.collect(ImmutableList::builder , ImmutableList.Builder::add) .map(ImmutableList.Builder::build) //반드시 Single 또는 Observable로 리턴해야한다. //Flowable -> toFlowable(); .toObservable(); } 일단 공통 함수를 작성한다. 내용은 간단하다.소스가 들어오면 collect로 imutableList로 변형해서 다시 옵저버 ( or Flowable)로..
ObservableTransformer우선 compose를 사용하는 방법에 대해서 알아보자.compose를 사용하는 주된 이유는 어떠한 공통된 일련의 동작을 커스텀해서 변형할수 있다.우선 사용하기 전 형태를 살펴보자.Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon") .collect(ImmutableList::builder , ImmutableList.Builder::add) .map(ImmutableList.Builder::build) .subscribe(System.out::println); Observable.range(1 , 15) .collect(ImmutableList::builder , ImmutableList.Builder::add..
Flowable.generate()배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer 을 구현하면 된다.예를 보자.public static void main(String[] args){ randomGenerator( 1, 10000) .subscribeOn(Schedulers.computation()) .doOnNext(i -> System.out.println("Emitting " + i)) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println("Received item : " + i); }); sleep(5000); } static Flowable randomGenerator(int min ..
만약 Backpressure없는 Flowable을 사용시 어떤 함수를 사용하면 좋은지 알아보자.우선 onBackpressureBuffer() 을 알아보자.복습을 하자면Flowable.interval( 1, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println("Received MyItem : " + i); }); 을 사용시 배출이 생성하는 것을 못 따라주기 때문에 오류가 난다.Received MyItem : 21 Received MyItem : 22 Received MyItem : 23 Received MyItem : 24 io.reactivex.exceptions.OnErrorNot..
Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.예제를 한번 보자.Flowable source = Flowable.create(emitter -> { for (int i=0;i Flowable 로 변경시에도 인자로 넣는게 가능하다.Observable source = Observable.range(1,1000); source.toFlowable(BackpressureStrategy.BUFFER) 하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.Flowable -> Observable 변경시에도 toObserable()으로 가능하다.Flowable integers = Flowable.range(1, 1000) .subscribeOn(Sch..
