Collection 함수들

RxJava에서 자주 쓰이는 함수중 하나인 Collection형태를 알아보자.

우선 toList() 이다.

여러개의 배출을 하나의 리스트로 만들어주는 것이다. 주의점은 리턴값이 Single<T> 인것을 유념하자.

아주 간단한 예제를 보자.

ToList

fun testToList(){
    Observable.just("Alpha" , "Beta" , "Gamma" , "Delta" , "Epslion")
            .toList() //Single로 배출
            .subscribeBy(
                    onSuccess = {
                        println(it)
                    }
            )
}
====================================
[Alpha, Beta, Gamma, Delta, Epslion]

정렬을 원할땐 ToSortedList를 사용하면 된다.

ToSortedList

Observable.just(3,1,5,20,2,7)
    .toSortedList()
    .subscribeBy(
            onSuccess = {
                println(it)
            }
    )
====================================
[1, 2, 3, 5, 7, 20]

Key + Value 로 묶을수 있는 Map 도 제공한다.

ToMap

Observable.just("Alpha" , "Beta" , "Gamma" , "Delta" , "Epslion")
    .toMap { it.toCharArray()[0] }
    .subscribeBy(
            onSuccess = {
                println(it)
            }
)
// {A=Alpha, B=Beta, D=Delta, E=Epslion, G=Gamma}

ToMap 인자를 다르게

Observable.just("Alpha" , "Beta" , "Gamma" , "Delta" , "Epslion")
            .toMap({
                it.toCharArray()[0]
            },
            String::length,{
                ConcurrentHashMap()
            })
            .subscribeBy(
                    onSuccess = {
                        println(it)
                    }
            )
//{A=5, B=4, D=5, E=7, G=5}

만약 같은 키에 여러개의 중복된 값이 있는 경우 마지막 아이템이 나온다.

Observable.just("Alpha" , "Beta" , "Gamma" , "Delta" , "Epslion")
            .toMap(String::length)
            .subscribeBy(
                    onSuccess = {
                        println(it)
                    }
            )
// {4=Beta, 5=Delta, 7=Epslion}

그래서 이 부분을 수정하면 toMultiMap 이란것도 제공한다.

ToMultiMap

Observable.just("Alpha" , "Beta" , "Gamma" , "Delta" , "Epslion")
            .toMultimap(String::length)
            .subscribeBy(::println)

// {4=[Beta], 5=[Alpha, Gamma, Delta], 7=[Epslion]}


그룹핑

하나의 Observable을 여러개의 옵저버로 패턴에 맞게 분리하는 방법에 대해서 알아보자.

+

4개의 문자 배열이 있다.

+

"Alpha" , "Beta" , "Delat" , "Epsilon"

이걸 문자 길이에 맞게끔 분리를 해보자.

+

val source = Observable.just("Alpha" , "Beta" , "Delat" , "Epsilon")

val lengthGroupObservable = source.groupBy { it.length } //groupBy 가 중요함

lengthGroupObservable.flatMapSingle { it.toList() }
        .subscribeBy(
                onNext = {
                    println(it)
                }
        )
======================
[Beta]
[Alpha, Delat]
[Epsilon]

보는 것과 같이 문자열 수에 따라 나뉘어 진다.

+

반환형태는 GroupedObservable으로 나온다.

+

그리고 정렬시 키값을 가져올 수 있다.

+

여기서 말하는 키값은 length 가 될것이다.

+

val source = Observable.just("Alpha" , "Beta" , "Delat" , "Epsilon" , "Te")

val lengthGroupObservable = source.groupBy { it.length }

lengthGroupObservable.flatMapSingle {grp ->
    grp.reduce("" , { x,y ->
        if(x.isEmpty()){
            y
        }else{
            "$x,$y"
        }
    }).map {
       "${grp.key} : $it"
    }
}
.subscribeBy(
        onNext = {
            println(it)
        }
)
===================================
2 : Te
4 : Beta
5 : Alpha,Delat
7 : Epsilon

혹시 reduce관련 해서 기억이 안날수 있으니 다시 설명하자면

+

발행한 데이터를 모두 사용하여 어떤 최종적인 결과 데이터를 합성할 때 활용가능하다.

+

즉 보통 Observable에 입력된 데이터를 필요한 map으로 매핑하고 , 원하는 데이터만 추출할 때 는 불필요한 데이터를 걸러내는 filter() 함수를 호출한다. 또는 상황에 따라 발행된 데이터를 취합하여 어떤 결과를 만들어낼 때는 reduce 계열의 함수를 사용한다.

+

val sources = listOf("1","2","3")
Observable.fromIterable(sources).reduce{ value1 , value2 ->
    "$value2  ( $value1 )"
}.subscribe( ::println )

============================
3  ( 2  ( 1 ) )


Combinelatest

두개의 리스트를 비교하는 걸 구현한다고 가정했을때 여러 가지 방법이 있을테지만

그중에 conbinelatest 를 사용해서 리스트와 각각의 값을 비교하는 방법으로 진행해보자.

다이어그램은 다음과 같다.

val observable1 = Observable.fromArray("A" , "B" , "C" , "D")
    val observable2 = Observable.fromArray("E" , "C" , "B" , "G","F")

    Observables.combineLatest( observable1.toList().toObservable() , observable2 ){ list , value ->
                println("$list <> $value")
                if(list.contains(value)) value else ""
            }
            .filter { !it.isEmpty() }
            .subscribeBy(
                    onNext = {
                        println("value -> $it")
                    }
            )

====================
[A, B, C, D] <> E
[A, B, C, D] <> C
value -> C
[A, B, C, D] <> B
value -> B
[A, B, C, D] <> G
[A, B, C, D] <> F

우선 zip 과 combinelatest 성격은 합쳐주는데 있지만 조합하는 게 좀 다르다.

우선 zip을 보자,

순서대로 묶여서 나오는걸 볼 수 있다.

val observable1= Observable.interval(100 , TimeUnit.MILLISECONDS)
val observable2 = Observable.interval( 250 , TimeUnit.MILLISECONDS)

Observable.zip(observable1 , observable2 , BiFunction{ t1 : Long , t2 : Long ->
    "t1 : $t1 , t2 : $t2"
}).subscribe{
    println("값 -> $it")
}

Thread.sleep(1000)
=======================
값 -> t1 : 0 , t2 : 0
값 -> t1 : 1 , t2 : 1
값 -> t1 : 2 , t2 : 2
값 -> t1 : 3 , t2 : 3

그럼 combinelatest를 보자.

val observable1= Observable.interval(100 , TimeUnit.MILLISECONDS)
val observable2 = Observable.interval( 250 , TimeUnit.MILLISECONDS)

Observable.combineLatest(observable1 , observable2 , BiFunction{ t1 : Long , t2 : Long ->
    "t1 : $t1 , t2 : $t2"
}).subscribe{
    println("값 -> $it")
}

Thread.sleep(1000)
======================
값 -> t1 : 1 , t2 : 0  
값 -> t1 : 2 , t2 : 0 <- 먼저 나온것과 이전 나온것을 묶여서 보여주는 걸 볼 수 있다.
값 -> t1 : 3 , t2 : 0
값 -> t1 : 4 , t2 : 0
값 -> t1 : 4 , t2 : 1
값 -> t1 : 5 , t2 : 1
값 -> t1 : 6 , t2 : 1
값 -> t1 : 6 , t2 : 2
값 -> t1 : 7 , t2 : 2
값 -> t1 : 8 , t2 : 2


Lift 연산자

연산자 오버로딩 하는 방법 줌 RxJava에서는 compose 와 lift 가 대표적으로 사용된다.

compose 는 전 내용에서 설명해놓은 내용이 있다.

그럼 lift는 차이점이 무엇인가.

Lift는 연산자를 오버로딩해서 새로 만드는 걸 목적으로 하며

compose는 여러 연산자를 하나의 연산자로 만드는게 주 목적이라고 생각된다.

우선 lift 구현 동작 부터 확인해보자.

interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new
parent Observer.
* @PARAM observer the child Observer instance
* @return the parent Observer instance
* @THROWS Exception on failure
*/
@NonNull
@Throws(Exception::class)
fun apply(@NonNull observer: Observer<in Downstream>):
Observer<in Upstream>;
}

그럼 Lift 실제 사용법은 어떤지 확인해보자.

우선 예제 목적은 각각의 수에 앞에 번호를 붙어는 간단한 예제이다.

//T 형태의 내용을 받아서 Pair 로 변경해서 보내는 예제이다.
class AddSerialNumber<T> : ObservableOperator<Pair<Int , T>, T> {
override fun apply(observer: Observer<in Pair<Int, T>>): Observer<in T> {
val counter = AtomicInteger()

return object : Observer<T> {
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
observer.onSubscribe(d)
}

override fun onNext(t: T) {
println("onNext")
observer.onNext( counter.incrementAndGet() to t)
}

override fun onComplete() {
println("onComplete #1")
observer.onComplete()
}

override fun onError(e: Throwable) {
println("onError #1")
observer.onError(e)
}

}
}
}
Observable.range(10,20)
.lift(AddSerialNumber<Int>()) //<-- 리프트 연산자 추가
.subscribeBy(
onNext = {
println("Next $it")
},
onError = {
it.printStackTrace()
},
onComplete = {
println("Completed")
}
)

==============================================
onSubscribe
onNext
Next (1, 10)
onNext
Next (2, 11)
onNext
Next (3, 12)
onNext
Next (4, 13)
onNext
Next (5, 14)
onNext
Next (6, 15)
onNext
Next (7, 16)
onNext
Next (8, 17)
onNext
Next (9, 18)
onNext
Next (10, 19)
onNext
Next (11, 20)
onNext
Next (12, 21)
onNext
Next (13, 22)
onNext
Next (14, 23)
onNext
Next (15, 24)
onNext
Next (16, 25)
onNext
Next (17, 26)
onNext
Next (18, 27)
onNext
Next (19, 28)
onNext
Next (20, 29)
onComplete #1
Completed

이런식으로 앞에 번호를 붙일수가 있다.

Using

코틀린에서 기본적으로 Use을 제공한다.

myStream.use { // 자동으로 열고 닫는다.
val line = readLine()
}

하지만 Rx에도 자원관리를 할 수 있는 방법이 있다. 바로 Using 이다.

공식문서는 

http://reactivex.io/documentation/operators/using.html


그럼 우선 RxKotlin 에서 어떻게 보이는 지 살펴보자.

fun <T, D> using(
resourceSupplier: Callable<out D>, //첫번째 인자 (열고)
sourceSupplier: Function<in D, //두번째 인자 (값 만들고)
out ObservableSource<out T>>, //세번째 인자 (닫기)
disposer: Consumer<in D>): Observable<T> {
return using(resourceSupplier, sourceSupplier, disposer, true)
}

분해해보면 단순하다. 열어서 처리하고 닫으면 된다.

그럼 한번 사용을 해보자.

우선 닫을수 있는 Resource 클래스를 정의하자

class Resource : Closeable {
init {
println("Resource Created")
}

fun open() : Resource{
println("Resource Opened")
return this
}
val data : String = "Hello World"

override fun close() {
println("Resource closed")
}
}

이 걸 테스트 해보자.

@Test
fun testResource() {
Observable.using(
{
Resource().open() //열고
} ,
{ resource : Resource ->
Observable.just(resource) //실행
},
{ resource: Resource ->
resource.close() //닫고
}
).subscribe{
println("Resource Result -> ${it.data}")
}
}
===========
Resource Created
Resource Opened
Resource Result -> Hello World
Resource closed

잘 실행되는 걸 볼 수 있다. 의외로 간단한 사용에 놀랍다.

리소스 관리가 필요한 DB 또는 스트림에서 유용해보인다.

RxKotlin

우선 dependencies을 추가를 하자.

 compile 'io.reactivex.rxjava2:rxjava:2.1.0'
 compile 'io.reactivex.rxjava2:rxkotlin:2.0.2'
io.reactivex.rxjava2:rxkotlin

의 경우 코틀린으로 확장해서 따로 유틸을 만든 것이다.

주소는 여기 클릭 해서 한번 보도록 하자.

그리고 소스셋을 지정해서 폴더를 구분해주자.

sourceSets {
     main.kotlin.srcDirs += 'src/main/kotlin'
}

그럼 코틀린 파일을 새로 만들어서 테스트 해보자.

1~10까지 돌면서 문자값으로 변경해주는 예제이다.

import io.reactivex.Observable

fun main(args : Array<String>) {
    Observable.range(1 , 10)
            .map { "Number : $it" }
            .subscribe(System.out::println)
}
=======================
Number : 1
Number : 2
Number : 3
Number : 4
Number : 5
Number : 6
Number : 7
Number : 8
Number : 9
Number : 10

또 다른 Scan 을 테스트 해보자.

fun testScan(){
    Observable.range(1 , 10)
            .scan(0) { total , next -> total + next} //람다를 이용한 축
            .subscribe(System.out::println)
}
=====================
0
1
3
6
10
15
21
28
36
45
55


Extension Operators

코틀린은 확장시 정말 편하게 가능하다.

스위프트도 더 편한것 같다.

RxJava 에서 확장 하는compose 나 life 을 대체할수 있다.

toSet으로 확장해서 사용하는 예제이다.

fun test_extenstion_toset(){
    val source = Observable.range(1, 10)

    val asSet = source.toSet()
    asSet.subscribe( { s -> System.out.println( s ) } )
}
//toSet 으로 확장하기.
fun <T> Observable<T>.toSet() = collect(
                                            { HashSet<T>() } , // HashSet으로 초기화
                                            { set , next -> set.add( next )} // set에 next를 추가
                                        )
                                .map { it as Set<T> } // 다시 Set으로 변경

==========================================
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Generic Type으로 말고 타겟을 직접 정의도 충분히 가능하다.

아래 예제는 주어진 값을 총합으로 구한다.

fun test_extenstion_sum(){
    val source = Observable.just( 100 , 50 , 250 , 150)

    val total = source.sum();
    total.subscribe( { s -> System.out.println( s ) } )
}

fun Observable<Int>.sum() =
        reduce(0) { total , next -> total + next}

==========================================================
550

RxKotlin

RxJava에서 Kotlin 으로 변경시 조금 다른 형태를 받을수 있습니다.

만약 리스트를 받아서 그 형태 그대로 리스트 형태의 옵저버를 만들고 싶을때 사용하면 편하다.

import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {

     val myList = listOf("Alpha", "Beta", "Gamma", "Delta", 
     "Epsilon")

     myList.toObservable() //일반 리스트를 Observable로 변경할 수 있는 확장 함수 가능
             .map(String::length)
             .subscribe(::println)
 }

그 외에도 여러가지 함수들이 있다. 한번 꼭 들어가서 확인 해보자.

https://github.com/ReactiveX/RxKotlin

Extensions

Target TypeMethodReturn TypeDescription
BooleanArraytoObservable()ObservableTurns a Boolean array into an Observable
ByteArraytoObservable()ObservableTurns a Byte array into an Observable
ShortArraytoObservable()ObservableTurns a Short array into an Observable
IntArraytoObservable()ObservableTurns an Int array into an Observable
LongArraytoObservable()ObservableTurns a Long array into an Observable
FloatArraytoObservable()ObservableTurns an Float array into an Observable
DoubleArraytoObservable()ObservableTurns an Double array into an Observable
ArraytoObservable()ObservableTurns aTarray into an Observable
IntProgressiontoObservable()ObservableTurns anIntProgressioninto an Observable
IterabletoObservable()ObservableTurns anIterable<T>into an Observable
IteratortoObservable()ObservableTurns anIterator<T>into an Observable
ObservableflatMapSequence()ObservableFlat maps eachTemission to aSequenece<R>
Observable<Pair<A,B>>toMap()CollectsPair<A,B>emissions into aMap<A,B>
Observable<Pair<A,B>>toMultimap()CollectsPair<A,B>emissions into aMap<A,List<B>>
Observable<Observable>mergeAll()ObservableMerges all Observables emitted from an Observable
Observable<Observable>concatAll()ObservableCocnatenates all Observables emitted from an Observable
Observable<Observable>switchLatest()ObservableEmits from the last emitted Observable
Observable<*>cast()ObservableCasts all emissions to the reified type
Observable<*>ofType()ObservableFilters all emissions to only the reified type
Iterable<Observable>merge()Merges an Iterable of Observables into a single Observable
Iterable<Observable>mergeDelayError()Merges an Iterable of Observables into a single Observable, but delays any error
BooleanArraytoFlowable()FlowableTurns a Boolean array into an Flowable
ByteArraytoFlowable()FlowableTurns a Byte array into an Flowable
ShortArraytoFlowable()FlowableTurns a Short array into an Flowable
IntArraytoFlowable()FlowableTurns an Int array into an Flowable
LongArraytoFlowable()FlowableTurns a Long array into an Flowable
FloatArraytoFlowable()FlowableTurns an Float array into an Flowable
DoubleArraytoFlowable()FlowableTurns an Double array into an Flowable
ArraytoFlowable()FlowableTurns aTarray into an Flowable
IntProgressiontoFlowable()FlowableTurns anIntProgressioninto an Flowable
IterabletoFlowable()FlowableTurns anIterable<T>into an Flowable
IteratortoFlowable()FlowableTurns anIterator<T>into an Flowable
FlowableflatMapSequence()FlowableFlat maps eachTemission to aSequenece<R>
Flowable<Pair<A,B>>toMap()CollectsPair<A,B>emissions into aMap<A,B>
Flowable<Pair<A,B>>toMultimap()CollectsPair<A,B>emissions into aMap<A,List<B>>
Flowable<Flowable>mergeAll()FlowableMerges all Flowables emitted from an Flowable
Flowable<Flowable>concatAll()FlowableCocnatenates all Flowables emitted from an Flowable
Flowable<Flowable>switchLatest()FlowableEmits from the last emitted Flowable
Flowablecast()FlowableCasts all emissions to the reified type
FlowableofType()FlowableFilters all emissions to only the reified type
Iterable<Flowable>merge()Merges an Iterable of Flowables into a single Flowable
Iterable<Flowable>mergeDelayError()Merges an Iterable of Flowables into a single Flowable, but delays any error
TtoSingle()SingleTurns anyTitem into aSingle<T>
FuturetoSingle()SingleTurns aFuture<T>into aSingle<T>
CallabletoSingle()SingleTurns aCallable<T>into aSingle<T>
() -> TtoSingle()SingleTurns a() -> Tinto aSingle<T>
Singlecast()SingleCasts all emissions to the reified type
Observable<Single>mergeAllSingles()ObservableMergaes all Singles emitted from an Observable
Flowable<Single>mergeAllSingles()FlowableMergaes all Singles emitted from a Flowable
T?.toMaybe()toMaybe()MaybeTurns a nullableTvalue into aMaybe<T>that will only emit if not null
FuturetoMaybe()MaybeTurns aFuture<T>into aMaybe<T>
CallabletoMaybe()MaybeTurns aCallable<T>into aMaybe<T>
() -> TtoMaybe()MaybeTurns a() -> Tinto aMaybe<T>
Maybecast()MaybeCasts any emissions to the reified type
MaybeofType()MaybeFilters any emission that is the reified type
Observable<Maybe>mergeAllMaybes()ObservableMerges all emitted Maybes
Flowable<Maybe>mergeAllMaybes()FlowableMerges all emitted Maybes
ActiontoCompletable()CompletableTurns anActioninto aCompletable
CallabletoCompletable()CompletableTurns aCallableinto aCompletable
FuturetoCompletable()CompletableTurns aFutureinto aCompletable
(() -> Any)toCompletable()CompletableTurns a(() -> Any)into aCompletable
ObservablemergeAllCompletables()Completable>Merges all emitted Completables
FlowablemergeAllCompletables()CompletableMerges all emitted Completables
ObservablesubscribeBy()DisposableAllows named arguments to construct an Observer
FlowablesubscribeBy()DisposableAllows named arguments to construct a Subscriber
SinglesubscribeBy()DisposableAllows named arguments to construct a SingleObserver
MaybesubscribeBy()DisposableAllows named arguments to construct a MaybeObserver
CompletablesubscribeBy()DisposableAllows named arguments to construct a CompletableObserver
ObservableblockingSubscribeBy()UnitAllows named arguments to construct a blocking Observer
FlowableblockingSubscribeBy()UnitAllows named arguments to construct a blocking Subscriber
DisposableaddTo()DisposableAdds aDisposableto the specifiedCompositeDisposable
CompositeDisposableplusAssign()DisposableOperator function to add aDisposableto thisCompositeDisposable


RxKotlin SAM(Single Abstract Methods) Ambiguity

아쉽게도 RxJava 만들시 코틀린을 염두에 두고 만들지는 못 했다. 그래서 RxKotlin 에서도 SAM 이슈는 발생하고 있다.

SAM 이슈 관련 문서 링크 ( 태환님 블로그 )

우선 문제가 생기는 코드를 보자. 자주 쓰는 Zip이므로 꼭 이해를 해야 한다.

fun test_sam1(){
    val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
    val numbers = Observable.range(1,4)

    val result = Observable.zip( source , numbers) { s , n -> "$s $n"} //오류 발생한다. 

    result.subscribe(::println)
}

위와 같이 BiFunction<String,Int,String>을 찾을 수 없다고 나온다.

그러면 수정을 다시 하면 정상적으로 나온다.

val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

val result = Observable.zip( source , numbers , 
                                BiFunction<String,Int,String>{ s , n -> "$s $n"} //이렇게 BiFunction을 따로 지정
                            )
result.subscribe(::println)
===================
Alpha 1
Beta 2
Gamma 3
Delta 4

그런데 소스가 콜백함수로 지저분해진다. 보기가 싫어진다.

그래서 앞서 소개한 RxKotlin 라이버러리를 이용하면 보기 좋게 만들수 있다.

  • io.reactivex.rxkotlin.Observables
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

val result = io.reactivex.rxkotlin.Observables.zip( source , numbers) { s, n -> "$s $n"}

result.subscribe(::println)

그리고 zipWith을 확장해서 제공하고 있다.

val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

//    val result = io.reactivex.rxkotlin.Observables.zip( source , numbers) { s, n -> "$s $n"}
val result2 = source.zipWith( numbers ) { s, n -> "$s $n"} //zipWith는 따로 확장해서 사용중

result2.subscribe(::println)


let() 와 apply() 응용

Kotlin 에서 let 과 apply 는 아주 유용한 함수이다.

자세한 설명은 다음 링크 를 참조해서 보자.

val numbers = Observable.just( 180.0 , 160.0 , 140.0 , 100.0 , 120.0)

val average = numbers
        .map { it * 2 }
        .doOnNext({ println("doOnNExt => $it")} )
        .publish()
        .autoConnect(2)
        .let {
            val sum = it.doOnNext({ println("sum doOnNext => $it")}).reduce(0.0) { total , next -> total + next }
            val count = it.count()

            println("count -> "+ count)
            sum.doOnEvent( {d , e -> println("sum emmit -> $d")}).zipWith( count ) { s,c -> s/c }
        }
average.subscribeBy (::println )

============================

count -> io.reactivex.internal.operators.observable.ObservableCountSingle@7b1d7fff
doOnNExt => 360.0
sum doOnNext => 360.0
doOnNExt => 320.0
sum doOnNext => 320.0
doOnNExt => 280.0
sum doOnNext => 280.0
doOnNExt => 200.0
sum doOnNext => 200.0
doOnNExt => 240.0
sum doOnNext => 240.0
sum emmit -> 1400.0
280.0

Apply()

자체 함수를 실행할 수 있는 장점이 있다.

val statusObserver = PublishSubject.create<Long>()
statusObserver.subscribe{ println("Status Observer : $it")}

Observable.interval(1 , TimeUnit.SECONDS)
        .take(5)
        .publish()
        .autoConnect(2)
        .apply {
            subscribe(statusObserver) //Observable 클래스 내부에 있는 함수를 호출할 수 있다. 
        }
        .map{ it * 1000 }
        .subscribe{
            println("Main Observer : $it")
        }

Thread.sleep(7000);
============================================
Status Observer : 0
Main Observer : 0
Status Observer : 1
Main Observer : 1000
Status Observer : 2
Main Observer : 2000
Status Observer : 3
Main Observer : 3000
Status Observer : 4
Main Observer : 4000


Tuples and data classes

코틀린 함수로 Tuple 함수를 빨리 만드는 방법을 알아보자.

주어진 문자 , 숫자 를 뭉쳐서 Pair 로 만드는 예제이다.

val strings = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

Observables
.zip(strings , numbers) { s, n -> s to n} // Pair로 뭉쳐서 리턴
.subscribe{
println(it)
}
=================
(Alpha, 1)
(Beta, 2)
(Gamma, 3)
(Delta, 4)

위의 내용을 Data 클래스로 변경하면 더 보기가 편해진다.

val strings = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

data class StringAndNumber(val myString : String , val myNumber : Int) //Data Class 생성 후 만들기

Observables
.zip(strings , numbers) { s, n -> StringAndNumber(s , n)} // Pair로 뭉쳐서 리턴
.subscribe{
println(it)
}

=====================
StringAndNumber(myString=Alpha, myNumber=1)
StringAndNumber(myString=Beta, myNumber=2)
StringAndNumber(myString=Gamma, myNumber=3)
StringAndNumber(myString=Delta, myNumber=4)


 


쓰레드가 이어져서 나오는 걸 직렬화라고 한다.

예를 보자.

public static void main(String[] args) {
    Observable.range(1 , 10)
            .map(i -> intenseCalculation(i))
            .subscribe(i->System.out.println("Received " + i + " "  + LocalTime.now()));
}

public static <T> T intenseCalculation(T value) {
    sleep(ThreadLocalRandom.current().nextInt(3000));
    return value;
}

public static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
..........
Received 1 19:14:42.414
Received 2 19:14:44.454
Received 3 19:14:44.727
Received 4 19:14:47.028
Received 5 19:14:49.174
Received 6 19:14:49.207
Received 7 19:14:51.057
Received 8 19:14:53.875
Received 9 19:14:56.381
Received 10 19:14:57.741

약 15초가 걸렸습니다. 만약 평행하게(병렬화) 처리를 하고 싶은 경우에는 어떻게 처리를 해야 할까?

Observable.range(1,10)
       .flatMap(i -> Observable.just(i)
               .subscribeOn(Schedulers.computation())
               .map(i2 -> intenseCalculation(i2))
       )
       .subscribe(i -> System.out.println("Received " + i + " "
               + LocalTime.now() + " on thread "
               + Thread.currentThread().getName()));

sleep(20000);
.........
Received 1 19:28:11.163 on thread RxComputationThreadPool-1
Received 7 19:28:11.381 on thread RxComputationThreadPool-7
Received 9 19:28:11.534 on thread RxComputationThreadPool-1
Received 6 19:28:11.603 on thread RxComputationThreadPool-6
Received 8 19:28:11.629 on thread RxComputationThreadPool-8
Received 3 19:28:12.214 on thread RxComputationThreadPool-3
Received 4 19:28:12.961 on thread RxComputationThreadPool-4
Received 5 19:28:13.274 on thread RxComputationThreadPool-5
Received 2 19:28:13.374 on thread RxComputationThreadPool-2
Received 10 19:28:14.335 on thread RxComputationThreadPool-2

약 3초 정도 걸렸습니다. 이렇게 내부적으로 사용해서 병렬형태로 하기를 바란다.

RxJava 2부터 Disposable 형태로 변경됐다.

일반적인 형태로 많이 쓰는건 consumer 형태이다.

예를 보자.

Observable<Integer> testObservable = Observable.range(1 , 10);
Disposable disposable1 = testObservable
    .doOnDispose(() -> System.out.println("onDispse 1"))
    .doOnComplete(() -> System.out.println("onCompleted 1"))
    .subscribe(item -> System.out.println("emitted 1 -> " + item)); //consumer 로 진행

System.out.println( "isDisposed -> " + disposable1.isDisposed());
..........
emitted 1 -> 1
emitted 1 -> 2
emitted 1 -> 3
emitted 1 -> 4
emitted 1 -> 5
emitted 1 -> 6
emitted 1 -> 7
emitted 1 -> 8
emitted 1 -> 9
emitted 1 -> 10
onCompleted 1
isDisposed -> true <- 이미 dispose가 되었다.

consumer 사용시 doOnDispose 이벤트가 안 타지만 이미 dispose가 된걸 볼 수 있다.

하지만 이런 경우를 보자.

public static DisposableObserver<Integer> getDispose(int pos){
        DisposableObserver<Integer> disposableObserver = new DisposableObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("emitted " + pos + " -> " + item);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        return disposableObserver;
    }
testObservable
    .doOnDispose(() -> System.out.println("onDispse 2"))
    .doOnComplete(() -> System.out.println("onCompleted 2"))
    .subscribe(getDispose(2));

disposableObser를 제공시 void 형태라서 메모리릭이 걸릴수 있다.

그럴때

.subscribe(getDispose(3));
  1. CompositeDispoable 을 호출해서 destory 시 clear를 해준다.
  2. .dispose() 를 해서 제거를 해준다.
Disposable disposable3 = testObservable
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onDispse 3");
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onCompleted 3");
            }
        })
        .subscribeWith(getDispose(3));

compositeDisposable.add(disposable3);
compositeDisposable.clear(); // or disposable3.dispose();


추가적으로

Observable 와 Observer를 연결을 하는 경우에 만약 complete가 발생안되는 경우에는 계속 Observer는 배출 되기를 기다릴 수 있습니다. 이럴때 Dispose를 통해서 자원을 해제를 해주셔야 합니다. 다행히 completed가 발생하면 자원이 해제됩니다.

package io.reactivex.disposables;

    public interface Disposable {
      void dispose();
      boolean isDisposed();
    }

		import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import java.util.concurrent.TimeUnit;

    public class Launcher {

      public static void main(String[] args) {

        Observable<Long> seconds =
          Observable.interval(1, TimeUnit.SECONDS);

        Disposable disposable =
          seconds.subscribe(l -> System.out.println("Received: " + l));

        //sleep 5 seconds
        sleep(5000);

        //dispose and stop emissions
        disposable.dispose();

        //sleep 5 seconds to prove
        //there are no more emissions
        sleep(5000);

      }

      public static void sleep(int millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
       }
     }

5초동안 interval로 실행했지만 추후 dispose 된 Observable는 더이상 배출이 안되는 걸 볼 수 있습니다. 기본적으로 disposable은 내부적으로 구독과 동시에 생성이 되서 어디에서든 사용해서 해제를 할 수 있도록 합니다.

Observer<Integer> myObserver = new Observer<Integer>() {
      private Disposable disposable;

      @Override
      public void onSubscribe(Disposable disposable) {
        this.disposable = disposable;
      }

      @Override
      public void onNext(Integer value) {
        //has access to Disposable
      }

      @Override
      public void onError(Throwable e) {
        //has access to Disposable
      }

      @Override
      public void onComplete() {
        //has access to Disposable
       }
     };

만약 이 Observer을 좀 더 확장을 원한다면 ResourceObserver를 만들어서 사용이 가능합니다. 확장된 observer는 subscribeWith로 통해서 구독을 할 수 있습니다.

import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.ResourceObserver;
    import java.util.concurrent.TimeUnit;

    public class Launcher {
      public static void main(String[] args) {

        Observable<Long> source =
        Observable.interval(1, TimeUnit.SECONDS);

         ResourceObserver<Long> myObserver = new  
         ResourceObserver<Long>() {
          @Override
          public void onNext(Long value) {
            System.out.println(value);
          }

          @Override
          public void onError(Throwable e) {
            e.printStackTrace();
          }

          @Override
          public void onComplete() {
            System.out.println("Done!");
          }
        };

        //capture Disposable
        Disposable disposable = source.subscribeWith(myObserver);
       }
      }

Debugging 하는 방법

RxJava 사용시 디버깅 라이버러리는

frodo 를 사용하면 좋긴 한데 ( RxJava 1지원..ㅠ)

https://github.com/android10/frodo.git

일반적인 테스트 방법을 소개한다.

.doOnNext()를 활용하면 된다.

예를 들어 다음과 같은 내용이 있다고 가정하자.

주어진 3개의 내용에 알파벳을 뽑아내는 경우이다.

TestObserver<String> testObserver = new TestObserver<>();

Observable<String> items =
        Observable.just("521934/2342/Foxtrot",
                "Bravo/12112/78886/Tango",
                "283242/4542/Whiskey/2348562");

items.concatMap( s -> Observable.fromArray( s.split("/")))
     .filter(s -> s.matches("[A-Z]+"))
     .subscribe(testObserver);

System.out.println(testObserver.values());

testObserver.assertValues( "Foxtrot","Bravo","Tango","Whiskey" );

결과는 실패이고 내용은 다음과 같다.

Value count differs; Expected: 4 [Foxtrot, Bravo, Tango, Whiskey], Actual: 0 [] (latch = 0, values = 0, errors = 0, completions = 1)

예상 결과는 4개이지만 실제 0개이다.

그럼 디버깅을 시작해보자

우선 concatMap으로 제대로 보내는 지 확인해보자

.doOnNext( s-> System.out.println("Source pushed : " + s) )
.concatMap( s -> Observable.fromArray( s.split("/")))

doOnNext를 입력시

Source pushed : 521934/2342/Foxtrot
Source pushed : Bravo/12112/78886/Tango
Source pushed : 283242/4542/Whiskey/2348562

값이 제대로 전달되는 것으로 보인다..

그럼 다음껄 확인해보자

.concatMap( s -> Observable.fromArray( s.split("/")))
.doOnNext( s-> System.out.println("Source pushed : " + s) )

Result ================================================================

Source pushed : 521934
Source pushed : 2342
Source pushed : Foxtrot
Source pushed : Bravo
Source pushed : 12112
Source pushed : 78886
Source pushed : Tango
Source pushed : 283242
Source pushed : 4542
Source pushed : Whiskey
Source pushed : 2348562

정상적으로 나오는것 같다..그럼 다음껄..

.filter(s -> s.matches("[A-Z]+"))
.doOnNext( s-> System.out.println("Source pushed : " + s) )

Result ================================================================

값이 비어있다...여기가 문제인가보다. 그럼 수정 후 다시 해보자

.filter(s -> s.matches("[A-Za-z]+"))
.doOnNext( s-> System.out.println("Source pushed : " + s) )

Result ================================================================

Source pushed : Foxtrot
Source pushed : Bravo
Source pushed : Tango
Source pushed : Whiskey

이제 작동이 잘 된다..

최종 소스는 다음과 같다.

TestObserver<String> testObserver = new TestObserver<>();

Observable<String> items =
        Observable.just("521934/2342/Foxtrot",
                "Bravo/12112/78886/Tango",
                "283242/4542/Whiskey/2348562");

items
     .concatMap( s -> Observable.fromArray( s.split("/")))
     .filter(s -> s.matches("[A-Za-z]+"))
     .subscribe(testObserver);

testObserver.assertValues( "Foxtrot","Bravo","Tango","Whiskey" );

doOnNext 말고도 doOnError() , doOnComplete() , doOnSubscribe() 등이 있으니 활용하면 빠른 디버깅이 가능하다.

이상으로 디버깅을 마친다.

TestScheduler

TestObserver(or TestSubscriber)을 이용시await를 통해서 기다릴수 있지만 그러기엔 시간이 많이 걸릴수 있다.

그래서 타임머신기능으로 시간을 미리 땡겨서 테스트가 가능하다.

예를 들어 interval(10) 으로 10초뒤에 값을 체크를 하는 경우 10초를 기다리지 않고 10초 뒤로 시간 설정 후 바로 테스트 하게끔 하는 것이다.

이 부분에 대해서 좀 더 공부가 필요해보인다.

TestScheduler testScheduler = new TestScheduler();

TestObserver<Long> testObserver = new TestObserver<>();

Observable<Long> minTicker = Observable.interval(1, TimeUnit.MINUTES , testScheduler);

minTicker.subscribe(testObserver);

//30초 뒤로 이동
testScheduler.advanceTimeBy(30 , TimeUnit.SECONDS);

//앞으로 이동했기 때문에 아직 배출이 안됨
testObserver.assertValueCount(0);

//구독 후 뒤로 70초 이동..
testScheduler.advanceTimeTo(70 , TimeUnit.SECONDS);

//1분 이동했으니 1개가 나온다. 
testObserver.assertValueCount(1);

System.out.println("#1 current time = " + testScheduler.now(TimeUnit.SECONDS));

//구독후 90분 뒤로 감
testScheduler.advanceTimeTo(90, TimeUnit.MINUTES);

System.out.println("#2 current time = " + testScheduler.now(TimeUnit.SECONDS));

//결과값 : 90개가 나온다.
testObserver.assertValueCount(90);

now() 는 얼마나 시간을 앞당겼는지 체크를 할수 있다.

triggerAction() 이 무엇인지도 한번 찾아보자.

TriggerAction()

공식 설명은 이 스케줄러의 현재 시간 또는 그 이전에 아직 트리거되지 않았거나 트리거되도록 예정된 모든 작업을 트리거합니다.

Triggers any actions that have not yet been triggered and that are scheduled to be triggered at or before this Scheduler's present time

TestScheduler s = new TestScheduler();

s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);

s.triggerActions();
System.out.println("Virtual time: " + s.now(TimeUnit.SECONDS));

결과값

Immediate
Virtual time: 0


TestObserver , TestSubscriber

이전 내용까지는 Blocking이 주된 내용이지만 이것만으로는 한계가 있다. 그래서 나온게 TestObserver 이다.

TestObserver = Observable , Single , Maybe , Completable 에서 사용된다.

TestSubscriber =Flowable 에서 사용된다.

//우선 옵저버 생성 (5초동안 아이템을 배출)
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS).take(5);

TestObserver<Long> testObserver = new TestObserver();

//아직 구독이 안된건지 체크
testObserver.assertNotSubscribed();

//이제 구독함
source.subscribe(testObserver);

//구독이 되었는지 확인
testObserver.assertSubscribed();

//종료(Terminate) 될 때까지 기다린다.
testObserver.awaitTerminalEvent();

//onCompleted가 과연 호출되었는지?
testObserver.assertComplete();

//에러가 없나?
testObserver.assertNoErrors();

//5개의 아이템을 배출했나?
testObserver.assertValueCount(5);

//5개의 예상되는 값 확인
testObserver.assertValues( 0L , 1L , 2L , 3L , 4L );

하지만 문제는 awaitTerminalEvent()를 통해 5초동안 기다려야 한다. 만약 시간이 길다면 어떻게 해야할까?

무작정 기다리면서 하는 건 테스트가 아니다..우린 빨리 결과를 받아야 한다. 그래서 나온게 타임머신 기능이 나왔다.

다음장에서 계속한다.

BlockingFirst() , BlockingSingle()

이름 그대로 처음꺼만 가져오는 함수를 말한다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

String firstItem = source.filter( s -> s.length() == 5).blockingFirst();
String singleItem = source.filter( s -> s.length() == 4).take(1).blockingSingle();

assertTrue( firstItem.equals("Alpha"));
assertTrue( singleItem.equals("Beta"));

결과 : Success

BlocingSingle()은 결과값이 하나로 나와야 정상작동되는 점을 유의해야 한다. (그래서 take(1) 로 지정)

BlockingGet()

Maybe , Single 의 경우 blockingFirst()를 가지고 있지 않다. 하나 또는 없음을 배출할테니..

이럴때 BlockingGet() 을 사용해서 값을 가져오면 된다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

//리스트를 만들어서 하나로 만들어줍니다.
List<String> sourceList = source.filter(s -> s.length() == 4).toList().blockingGet();

assertTrue( sourceList.equals( Arrays.asList( "Beta" , "Zeta")));

BlockingLast()

Observable , Flowable 에서 마지막 값을 리턴시 사용한다.

주의할점은 onComplete() 가 실행전에는 값을 리턴을 하지 않는다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
String lastItemFromSource = source.filter(s -> s.length() == 4).blockingLast();

assertTrue(lastItemFromSource.equals("Zeta"));

주의점

BlockingFirst() , BlockingLast()사용시 아이템이 없으면 no emissions 예외 오류가 나니 기본값 설정을 해주는게 좋다.

만약 리스트 형태로 받고 싶을때 어떻게 할까?

그럴때 BlockingIterable() 사용하면 된다.

BlockingIterable()

값을 리스트형태로 받아준다. onComplete() 된 시점의 값들을 받아온다. 배압(Backpressure)이 없기 때문에 OutOfMemoryException이 발생될 수 있기 때문에 주의 해서 사용해야 한다.

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");
Iterable<String> sourceIterable = source.filter( s -> s.length() == 5).blockingIterable();

for(String s:sourceIterable) {
    assertTrue( s.length() == 5); // 5자리가 맞는지 루프 돌면서 체크
}

위와 같은 형태의 문제점은 아이템을 다 배출하고 테스트하는데 있다.

만약 아이템을 하나씩 받을때 직접 테스트 하고 싶은 경우 어떻게 해야 할까?

그런 경우 BlockingForEach() 을 사용하면 된다.

BlockingForEach()

Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta");

//하나씩 배출하면서 검사를 한다. 
source.filter(s->s.length() == 5).blockingForEach(s -> assertTrue(s.length() == 5));

BlockingNext() <- 좀 더 이해가 필요함

next된 것만 가져와서 테스트를 진행하는 것 같음..공부가 좀 더 필요함..

Observable<Long> source = Observable.interval(1 , TimeUnit.MICROSECONDS).take(1000);

Iterable<Long> iterable = source.blockingNext();

for(Long i : iterable) {
    System.out.println(i);
}

결과값

0
5
11
15
18
20
23
27
31
35
39
44
48
52
58
62
66
69
72
76
80
83
86
90
94
97
100
...

BlockingLastest() <- 좀 더 이해가 필요함

캡쳐되지 않는 값은 잊어버리고 ... 좀더 공부를 해야겠다.

Observable<Long> source = Observable.interval(1 , TimeUnit.MICROSECONDS).take(1000);

Iterable<Long> iterable = source.blockingLatest();

for(Long i : iterable){
    System.out.println(i);
}

출력값은

0
127
135
140
144
147
151
154
157
161
164
168
170
172
175
177
179
181
183
185
187
189
191
193
...

BlockingMostRecent()

TheblockingMostRecent()is similar toblockingLatest(), but it will re-consume the latest value repeatedly for everynext()

call from the iterator even if it was consumed already. It also requires adefaultValueargument so it has something to return if no value is emitted yet. Here, we useblockingMostRecent()against an Observableemitting every 10 milliseconds. The default value is-1, and it consumes each value repeatedly until the next value is provided:

Observable<Long> source = Observable.interval(10 , TimeUnit.MILLISECONDS).take(5);

//기본값이 필요하다.
Iterable<Long> iterable = source.blockingMostRecent( -1L );

System.out.println( "iterable" + iterable );

for(Long i : iterable) {
    System.out.println(i);
}

결과값

-1
-1
-1
...
0
0
0
...
1
1
1
...


일반적으로 Observable 을 테스트시 Blocking을 사용한다.

먼저 일반 코드를 작성하면 다음과 같을겁니다.

AtomicInteger hitcount = new AtomicInteger();
Observable<Long> source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5);
source.subscribe( i -> hitcount.incrementAndGet());

assertTrue( hitcount.get() == 5);

여기에서 5개를 가져오는 일반적인 코드이다. 결과는 실패이다.

이유는 interval 함수를 보면 thread가 computation 이다. 그래서 가져올수 없다.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit) {
    return interval(period, period, unit, Schedulers.computation());
}

그럴때 방법이 2가지가 있지만 우선 Blocking해서 값을 가져와서 테스트 해보는 방법을 해보자.

AtomicInteger hitcount = new AtomicInteger();
Observable<Long> source = io.reactivex.Observable.interval( 1 , TimeUnit.SECONDS).take(5);
source.blockingSubscribe( i -> hitcount.incrementAndGet());

assertTrue( hitcount.get() == 5);

결과는 성공

위 코드에서 subscribe -> blockingSubscribe을 함으로써 값을 제대로 가져올수 있다.

쓰레드 상관없이 값을 가져오고 싶을땐 BlockingSubscribe을 써서 테스트가 가능하다.

주의사항으로

절대 테스트말고 다른 곳에선 blocking을 사용하지 말자. 해보면 이유를 바로 알수있다.

실제 연산자를 사용자가 구현이 가능하다.

관련해서 자세한 내용은

영문 : http://reactivex.io/documentation/implement-operator.html

한글 : http://reactivex.io/documentation/ko/implement-operator.html

일단 구현해보자.

아래 doOnEmpty는 빈값이 들어올때 함수가 실행되는 단순한 연산자이다.

public static <T> ObservableOperator<T,T> doOnEmpty(Action action) {
    return observer -> new DisposableObserver<T>() {
        boolean isEmpty = true;
        @Override
        public void onNext(T value) {
            isEmpty = false;
            System.out.println("doOnEmpty onNext");
            observer.onNext(value);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("doOnEmpty onError");
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if(isEmpty){
                try {
                    System.out.println("doOnEmpty run");
                    action.run();
                }catch (Exception ex){
                    ex.printStackTrace();
                    onError(ex);
                    return;
                }
            }

            observer.onComplete();
        }
};

사용방법은

  1. 1~5부터 돌면서 빈값인지 체크 해서 Operation 1 : { 숫자 } 를 찍어주는 형태
  2. 빈값이 들어올때 동작되는 걸 체크 하는 형태
Observable.range( 1, 5)
        .lift(doOnEmpty(() ->  System.out.println("Operation 1 empty!")))
        .doOnNext( v -> System.out.println("Operation 1 : " + v))
        .test();

Observable.<Integer>empty()
        .lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
        .doOnNext( v -> System.out.println("Operation 2 : " + v))
        .test();

==========================================================================
doOnEmpty onNext
Operation 1 : 1
doOnEmpty onNext
Operation 1 : 2
doOnEmpty onNext
Operation 1 : 3
doOnEmpty onNext
Operation 1 : 4
doOnEmpty onNext
Operation 1 : 5
doOnEmpty run
Operation 2 Empty!

Operation 2 Empty! 가 찍혔는지가 중요하다.

마지막으로 나눠서 들어오는 경우 그걸 리스트로 변환해주는 연산자를 만들어보자

연산자 설명 : 들어온 값을 바로 배출(Push) 안하고 마지막에 한번에 배출하는 연산자..(toList() 와 유사하다.)

public static <T> ObservableOperator<List<T> , T> myToList(){
    return observer -> new DisposableObserver<T>() {
        ArrayList<T> list = new ArrayList<>();

        @Override
        public void onNext(T t) {
            //리스트에 추가만 하고 다운스트림(onNext) 내보내지 않는다.
            list.add(t);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            //다운스트림으로 한번에 배출한다.
            observer.onNext(list);
            observer.onComplete();
        }
    };
}

이 연산자를 이제 사용해보자.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

=========================================================================
Operation 1 onNext : [1, 2, 3, 4, 5]
Operation 2 onNext : []

값이 모아서 한번에 배출되고 있다.

만약 Observable이 아니라 Flowable로 변경시 FlowableOperator 로 변경 하면 된다.

public static <T> FlowableOperator<T,T> doOnEmpty(Action action) {
     return subscriber -> new DisposableSubscriber<T>() {
         boolean isEmpty = true;

         @Override
         public void onNext(T value) {
             isEmpty = false;
             subscriber.onNext(value);
         }

         @Override
         public void onError(Throwable t) {
             subscriber.onError(t);
         }

         @Override
         public void onComplete() {
             if (isEmpty) {
                 try {
                     action.run();
                 } catch (Exception e) {
                     onError(e);
                     return;
                 }
             }
             subscriber.onComplete();
         }
     };
 }


'스터디 > RxJava2' 카테고리의 다른 글

[Rxjava2]Blocking Functions  (0) 2017.10.22
[RxJava2]Test시 Blocking Subscriber 방법  (0) 2017.10.22
[RxJava] Transform에서 공유 부분 피하기  (0) 2017.10.22
[RxJava2] Compose with Parameters  (0) 2017.10.22
[RxJava2]Compose 활용  (0) 2017.10.22

자신만의 커스텀 함수를 만들어서 사용시 공유되는 이슈가 발생된다.

예를 들어 보자.

인덱스를 넣고 값을 저장하는 IndexedValue데이터 클래스를 먼저 만든다.

static final class IndexedValue<T> {
    final int index;
    final T value;

    IndexedValue(int index , T value){
        this.index = index;
        this.value = value;
    }

    @Override
    public String toString() {
        return index + " - " + value;
    }
}

그리고 compose에 들어갈 커스텀 함수를 추가한다.

원하는 결과값은 index - value 을 나오는게 목표이다.

static <T>ObservableTransformer<T,IndexedValue<T>> withIndex(){
    final AtomicInteger indexer = new AtomicInteger(-1);
    return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet() , v));
}

아래는 테스트 함수이다.

Observable<IndexedValue<String>> indexedValueObservable =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());

indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 1 : " + v))
        .test();
indexedValueObservable
        .doOnNext(v -> System.out.println("Subscriber 2 : " + v))
        .test();

============================================================================
Subscriber 1 : 0 - Alpha
Subscriber 1 : 1 - Beta
Subscriber 1 : 2 - Gamma
Subscriber 1 : 3 - Delta
Subscriber 1 : 4 - Epsilon
Subscriber 2 : 5 - Alpha
Subscriber 2 : 6 - Beta
Subscriber 2 : 7 - Gamma
Subscriber 2 : 8 - Delta
Subscriber 2 : 9 - Epsilon

위 결과에서 보듯이 값이 서로 공유되서 나오는 문제를 볼수 있다.

이유는 싱글 인스턴스인 AtomicInteger이기 때문이다.

이러한 문제를 해결하기 위해 defer 또는 fromCallable 을 추천한다.

static <T>ObservableTransformer<T,IndexedValue<T>> withIndex(){
    return upstream -> Observable.defer(() -> {
        final AtomicInteger indexer = new AtomicInteger(-1);
        return upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet() , v));
    });
}

이제 결과들이 공유되지 않는 걸 볼수 있다.

Subscriber 1 : 0 - Alpha
Subscriber 1 : 1 - Beta
Subscriber 1 : 2 - Gamma
Subscriber 1 : 3 - Delta
Subscriber 1 : 4 - Epsilon
Subscriber 2 : 0 - Alpha
Subscriber 2 : 1 - Beta
Subscriber 2 : 2 - Gamma
Subscriber 2 : 3 - Delta
Subscriber 2 : 4 - Epsilon

결론은 싱글 인스턴스 데이터를 사용시 공유되는 문제를 해결하기 위해 defer 또는 fromCallabe을 사용해서 오류를 피하자.

파라미터를 통해서 compose를 활용하는 방법에 대해서 알아보자.

우선 다음과 같은 내용이 있다.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect( StringBuilder::new , (b,s) -> {
            if ( b.length() == 0)
                b.append(s);
            else
                b.append("/").append(s);
        })
        .map(StringBuilder::toString)
        .subscribe(System.out::println);

====================================================================
Alpha/Beta/Gamma/Delta/Epsilon

주어진 문자열에 뒤에 "/" 을 추가를 해주는 아주 간단한 내용이다.

이 내용을 Compose로 변경시..

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(joinToString("/"))
                .subscribe(System.out::println);
public static ObservableTransformer<String , String> joinToString(String separator) {
    return upstream -> upstream
                        .collect( StringBuilder::new , (b,s) -> {
                            if ( b.length() == 0)
                                b.append(s);
                            else
                                b.append(separator).append(s);
                        })
                        .map(StringBuilder::toString)
                        .toObservable();
}

구분자를 넘겨서 공통함수로 뺄수 있다.

+

이런식으로 인자값을 넣어서 자주쓰는 함수를 자기만의 커스텀함수로 빼서 사용하도록 하자. (리팩토링도 할겸..)

하지만 주의 사항이 있다. 이러한 커스텀을 사용시 여러번의 Subscriber에서 자원을 공유하는 버그등이 발생될 수 있다.

이 부분에 대해서 피할 수 있는지 좀 더 알아보는 시간을 다음에 가져보자.

중복되어 있는 걸 커스텀 공통 함수로 빼고 compose로 활용하는 방법에 대해서 알아보자.

public static <T>ObservableTransformer<T , ImmutableList<T>> toImmutableList() {
        return upstream -> upstream.collect(ImmutableList::<T>builder , ImmutableList.Builder::add)
                .map(ImmutableList.Builder::build)
                //반드시 Single 또는 Observable로 리턴해야한다. 
                //Flowable -> toFlowable();
                .toObservable();
}

일단 공통 함수를 작성한다. 내용은 간단하다.

소스가 들어오면 collect로 imutableList로 변형해서 다시 옵저버 ( or Flowable)로 돌려준다.

그리고 해당 공통 함수에 다시 compose 에 이 추가된 내용을 넣어준다.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .compose(toImmutableList())
        .subscribe(System.out::println);

Observable.range(1 , 15)
        .compose(toImmutableList())
        .subscribe(System.out::println);

=================================================================
[Alpha, Beta, Gamma, Delta, Epsilon]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

코드가 줄어든게 보일것이다. 기본적인 활용은 이런식으로 만들 수 있다.

ObservableTransformer

우선 compose를 사용하는 방법에 대해서 알아보자.

compose를 사용하는 주된 이유는 어떠한 공통된 일련의 동작을 커스텀해서 변형할수 있다.

우선 사용하기 전 형태를 살펴보자.

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect(ImmutableList::builder , ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)
        .subscribe(System.out::println);

Observable.range(1 , 15)
        .collect(ImmutableList::builder , ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)
        .subscribe(System.out::println);

''''
[Alpha, Beta, Gamma, Delta, Epsilon]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

위 내용은 주어진 문자열을 collect 함수로 이용해서 imutableList( Guava 라이버러리 ) 형태로 변경하고 다시 그걸 Builder -> List로 변경해주는 보통 RxJava 함수이다.

.map(ImmutableList.Builder::build)

을 추가시 SingleObserver<ImmutableList> 형태로 받지만..

하지만 안하는 경우 SingleObserver<ImmutableList.Builder>

으로 받기 때문에 map을 넣어서 변형을 해준다.

그래서 저기 두개의 내용을 보게되면

.collect(ImmutableList::builder , ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)

이 2개의 부분에서 중복적으로 사용되는 걸 볼수 있다.

Flowable.generate()

배압을 직접 관리가 가능하다. Flowable.generate 는 Consumer<Emitter<T>> 을 구현하면 된다.

예를 보자.

public static void main(String[] args){
    randomGenerator( 1, 10000)
            .subscribeOn(Schedulers.computation())
            .doOnNext(i -> System.out.println("Emitting " + i))
            .observeOn(Schedulers.io())
            .subscribe(i -> {
                sleep(5);
                System.out.println("Received item : " + i);
            });

    sleep(5000);
}

static Flowable<Integer> randomGenerator(int min , int max) {
    return Flowable.generate( emitter -> emitter.onNext(ThreadLocalRandom.current().nextInt(min , max)));
}
............
 ...
 Emitting 8014
 Emitting 3112
 Emitting 5958
 Emitting 4834 //128th emission
 Received 9563
 Received 4359
 Received 9362
 ...
 Received 4880
 Received 3192
 Received 979 //96th emission
 Emitting 8268
 Emitting 3889
 Emitting 2595
...

128 -> 96 -> 96 순서로 이루어지고 있다.

주의할 점은 onNext 를 한번만 사용가능하다. 그 이상 사용시 IllgalStateException 이 발생된다.

public static void main(String[] args) {
        rangeReverse(100,-100)
                 .subscribeOn(Schedulers.computation())
                 .doOnNext(i -> System.out.println("Emitting " + i))
                 .observeOn(Schedulers.io())
                 .subscribe(i -> {
                     sleep(50);
                     System.out.println("Received " + i);
                 });
        sleep(50000);
     }
    static Flowable<Integer> rangeReverse(int upperBound, int lowerBound) {
         return Flowable.generate(() -> new AtomicInteger(upperBound + 1),
                 (state, emitter) -> {
                     int current = state.decrementAndGet();
                     emitter.onNext(current);
                     if (current == lowerBound)
                         emitter.onComplete();
                 }
         );

     }
.............
Emitting 100
 Emitting 99
 ...
 Emitting -25
 Emitting -26
 Emitting -27 //128th emission
 Received 100
 Received 99
 Received 98
 ...
 Received 7
 Received 6
 Received 5 // 96th emission
 Emitting -28
 Emitting -29
 Emitting -30

이런식으로 사용이 가능하다. Flowable.create() 사용보다 더 자세한 컨트롤이 가능하다.

Flowable 을 생성시 BackpressureStrategy 을 사용이 가능하다.

예제를 한번 보자.

Flowable<Integer> source = Flowable.create(emitter -> {
    for (int i=0;i<1000;i++) {
        if(emitter.isCancelled())
            return;

        emitter.onNext(i);
    }

    emitter.onComplete();    
}  , BackpressureStrategy.BUFFER); //BackpressureStrategy.BUFFER 을 인자로 넣어서 생성이 가능하다. 

source.observeOn(Schedulers.io())
        .subscribe(System.out::println);

sleep( 1000 );
..........
0
1
2
3
4
...

인자로 넣을 수 있는 건 5가지가 있다.

BackpressureStrategy설명
Missing배압을 적용안한다. 후에 onBackpressureXXX()등으로 컨트롤이 가능하다.
ErrorMissingBackpressureException 발생시 에러를 발생시킨다.
BufferonBackpressureBuffer() 와 같은 형태
Lastest다운스트림이 받을때까지 마지막꺼만 유지를 한다.

Observable -> Flowable 로 변경시에도 인자로 넣는게 가능하다.

Observable<Integer> source = Observable.range(1,1000);
source.toFlowable(BackpressureStrategy.BUFFER)

하지만 조심해야 한다. Buffer는 초과시 OutOfMemory 오류가 발생한다.

Flowable -> Observable 변경시에도 toObserable()으로 가능하다.

Flowable<Integer> integers =
                 Flowable.range(1, 1000)
                         .subscribeOn(Schedulers.computation());
        Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
                 .flatMap(s -> integers.map(i -> i + "-" + s)
                 //위로는 Backpressure 지원함                 
                 .toObservable())           
                 //아래로는 지원안함..만약 request를 할때에도 적용안되니 주의..(subscriber 참조)      
                 .subscribe(System.out::println);


만약 배출된 항목에 대해서 다시 조작하고 싶을 때 어떻게 하면 될까?

이 방법에 대해서 subscriber 을 적용해서 진행할 수 있다.

예를 보자.

public static void main(String[] args){
    Flowable.range(1, 100)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println( "onSubscribe" );
                    s.request(10);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                    sleep( 50 );
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println( "onSubscribe" );
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    //최대 0.2초 sleep
    int time = ThreadLocalRandom.current().nextInt(200);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

...................
.....
push -> 124
push -> 125
push -> 126
push -> 127
push -> 128
onNext 1, thread -> RxCachedThreadScheduler-1
sleep time : 180
onNext 2, thread -> RxCachedThreadScheduler-1
sleep time : 165
onNext 3, thread -> RxCachedThreadScheduler-1
sleep time : 52
onNext 4, thread -> RxCachedThreadScheduler-1
sleep time : 63
......
onNext 95, thread -> RxCachedThreadScheduler-1
sleep time : 36
onNext 96, thread -> RxCachedThreadScheduler-1
push -> 129
push -> 130
push -> 131

위와 같이 Flowable (A) , Subscriber (B) 라고 가정하면

A(128개 배출) -> B(96개) -> A(96) -> .... 이런식으로 배출하는 걸 볼수 있다.

즉 subscriber 로 업스트림으로 진행이 가능하다.

자 그러면 이걸 활용을 어떤식으로 할수 있는지 보자.

Flowable.range(1, 1000)
        .doOnNext( v -> System.out.println("push -> " + v))
        .observeOn(Schedulers.io())
        .map( i -> intenseCalculation(i))
        .subscribe(new Subscriber<Integer>() {
            Subscription subscription;
            AtomicInteger count = new AtomicInteger(0);
            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                System.out.println( "onSubscribe" );
                s.request(40);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println( "onNext " + integer +", thread -> "+ Thread.currentThread().getName());
                sleep( 50 );
                if( count.incrementAndGet() % 20 == 0 && count.get() >= 40) {
                    System.out.println("Requesting 20 more!!");
                    subscription.request(20);
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println( "onSubscribe" );
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
...........
onSubscribe
push -> 1
push -> 2
push -> 3
push -> 4
push -> 5
push -> 6
push -> 7
....
onNext 38, thread -> RxCachedThreadScheduler-1
sleep time : 128
onNext 39, thread -> RxCachedThreadScheduler-1
sleep time : 88
onNext 40, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
.....
sleep time : 74
onNext 59, thread -> RxCachedThreadScheduler-1
sleep time : 83
onNext 60, thread -> RxCachedThreadScheduler-1
Requesting 20 more!! <-------------------------------- 이 부분
sleep time : 38
onNext 61, thread -> RxCachedThreadScheduler-1
sleep time : 148
onNext 62, thread -> RxCachedThreadScheduler-1
sleep time : 159

위 내용은 배출이 되지만 40번을 끝내고 다시 request를 해서 20번을 더 진행하고 계속 요청하고 있다.

이 부분을 보면 무엇이 떠오를까.. 앱개발시 리스트 조회시 fetch를 좀 더 효율적으로 가능할 수 있는 방법이 떠오른다.

추후에 적용해볼수 있을것 같다.'

그리고 처음 push 아이템은 언제 실행될까?

128(push) -> 96(request) -> 96(push) ->.... 이런식으로 진행된다. 보면 볼수록 매력이 있다...

주의할 점은 Subscriber 을 통해서 업스트림이 되는 게 아니란 점...

단지 그 요청을 상류쪽에서 중계를 방법을 결정하는 것입니다.

Flowable.range()Flowable.just() , Flowable.fromIterable()Flowable.interval() 의 경우 배압(Backpressure) 이 설계가 되어있는 편이다.

하지만 Interval() 의 경우 시간에 따른 배출이 되는 데 이 부분에서 논리적으로 배압이 되긴 힘들다.

배출은 시간이 거리지만 이미 다운 스트림은 실행이 될수 있기 때문이다.

예를 들어보자.

public static void main(String[] args){
    Flowable.interval(1, TimeUnit.MICROSECONDS)
            .doOnNext( v -> System.out.println("push -> " + v))
            .observeOn(Schedulers.io())
            .map( i -> intenseCalculation(i))
            .subscribe( v->System.out.println("emit : " + v) , Throwable::printStackTrace);

    sleep( Long.MAX_VALUE );
}

public static <T> T intenseCalculation(T value){
    int time = ThreadLocalRandom.current().nextInt(3000);
    System.out.println("sleep time : " + time);
    sleep( time );
    return value;
}

public static void sleep(long mills) {
    try{
        Thread.sleep( mills );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

.................
o.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
    at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:96)
    at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
    ..............

위와 같이 푸시는 계속되고 있지만 순간적으로 배출이 되는 순간이 생긴다.

이렬 경우 MissingBackpressureException이 발생된다.

Flowable.interval() 을 제외한 나머지는 배압이 제대로 될 것이지만 시간관련된 이 함수는 오류가 생길수 있다.

이런 경우 onBackpressureDrop() 또는 onBackpressureBuffer() 을 사용해서 처리가 가능하다.

  1. onBackpressureDrop : Flowable(Observable)에서 항목을 보냈을때 바로 처리하지 못하는 데이터는 무시합니다.
  2. onBackpressureBuffer : Flowable(Observable)에서 보낸 항목을 큐에 쌓아 놓고 항목을 처리하는 쪽에서 해당항목을 나중에 처리할 수 있게 해줍니다. 인자값으로 (int) 를 넣을 경우 큐에 쌓아둘수 있는 수를 제한 할 수 있습니다.
.onBackpressureDrop() <- 위치는 여기에 해줘야 정상 동작된다.
.observeOn(Schedulers.io())

그럼 다음장에서는 Subscriber 에 대해서 알아보자.

이제 배압(Back pressure) 을 해결 하기 위해 Flowable(RxJava2 부터 도입) 을 했다.

한번 살펴보도록 하자.

//Observable -> Flowable 변경
Flowable.range( 1, 999_999_999)
        .map(MyItem::new)
        .observeOn(Schedulers.io())
        .subscribe(myItem -> {
            sleep(50);
            System.out.println("Received MyItem : " + myItem.id);
        });

sleep(Integer.MAX_VALUE);

''' 
Constructing MyItem 1
 Constructing MyItem 2
 Constructing MyItem 3
 ...
 Constructing MyItem 127
 Constructing MyItem 128
 Received MyItem 1
 Received MyItem 2
 Received MyItem 3
 ...
 Received MyItem 95
 Received MyItem 96
 Constructing MyItem 129
 Constructing MyItem 130
 Constructing MyItem 131
 ...
 Constructing MyItem 223
 Constructing MyItem 224
 Received MyItem 97

여기 결과에서 보면 아주 흥미롭다.

128개의 생성이 이루어지고 96개의 배출이 이루어지고 있다. 그리고 다시 96개의 생성이 이루어지고. 있다.

여기에서 재밌는 점은 왜 128개의 배출이 처음에 이루어지지 않았는가 이다.

만약 96개의 생성이 이루어지고 96개의 배출이 이루어진다고 가정하면 다음 96개의 작업에 대해서 준비를 하는 과정에서 시간이 딜레이가 될수 있다는 점이다.

이걸 유휴시간이라고 말하는데 그 시간동안 준비된 작업을 제공하여 처리량을 높일수 있다고 한다.

즉 공장에서 더 많은 것을 기다리는 동안 주문을 공급하기 위해 약간의 재고를 보유한 창고와 비슷하다고 보면 된다.

그럼 언제 Observable 을 써야 하며 Flowable은 또 언제 사용하는 것일까?

참고자료 : http://realignist.me/code/2017/01/25/rxjava2-changelog.html

<