«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

올해는 머신러닝이다.

RxKotlin 정리 본문

스터디/RxJava2

RxKotlin 정리

행복한 수지아빠 2017. 11. 6. 07:33

RxKotlin

우선 dependencies을 추가를 하자.

 compile 'io.reactivex.rxjava2:rxjava:2.1.0'
 compile 'io.reactivex.rxjava2:rxkotlin:2.0.2'
io.reactivex.rxjava2:rxkotlin

의 경우 코틀린으로 확장해서 따로 유틸을 만든 것이다.

주소는 여기 클릭 해서 한번 보도록 하자.

그리고 소스셋을 지정해서 폴더를 구분해주자.

sourceSets {
     main.kotlin.srcDirs += 'src/main/kotlin'
}

그럼 코틀린 파일을 새로 만들어서 테스트 해보자.

1~10까지 돌면서 문자값으로 변경해주는 예제이다.

import io.reactivex.Observable

fun main(args : Array<String>) {
    Observable.range(1 , 10)
            .map { "Number : $it" }
            .subscribe(System.out::println)
}
=======================
Number : 1
Number : 2
Number : 3
Number : 4
Number : 5
Number : 6
Number : 7
Number : 8
Number : 9
Number : 10

또 다른 Scan 을 테스트 해보자.

fun testScan(){
    Observable.range(1 , 10)
            .scan(0) { total , next -> total + next} //람다를 이용한 축
            .subscribe(System.out::println)
}
=====================
0
1
3
6
10
15
21
28
36
45
55


Extension Operators

코틀린은 확장시 정말 편하게 가능하다.

스위프트도 더 편한것 같다.

RxJava 에서 확장 하는compose 나 life 을 대체할수 있다.

toSet으로 확장해서 사용하는 예제이다.

fun test_extenstion_toset(){
    val source = Observable.range(1, 10)

    val asSet = source.toSet()
    asSet.subscribe( { s -> System.out.println( s ) } )
}
//toSet 으로 확장하기.
fun <T> Observable<T>.toSet() = collect(
                                            { HashSet<T>() } , // HashSet으로 초기화
                                            { set , next -> set.add( next )} // set에 next를 추가
                                        )
                                .map { it as Set<T> } // 다시 Set으로 변경

==========================================
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Generic Type으로 말고 타겟을 직접 정의도 충분히 가능하다.

아래 예제는 주어진 값을 총합으로 구한다.

fun test_extenstion_sum(){
    val source = Observable.just( 100 , 50 , 250 , 150)

    val total = source.sum();
    total.subscribe( { s -> System.out.println( s ) } )
}

fun Observable<Int>.sum() =
        reduce(0) { total , next -> total + next}

==========================================================
550

RxKotlin

RxJava에서 Kotlin 으로 변경시 조금 다른 형태를 받을수 있습니다.

만약 리스트를 받아서 그 형태 그대로 리스트 형태의 옵저버를 만들고 싶을때 사용하면 편하다.

import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {

     val myList = listOf("Alpha", "Beta", "Gamma", "Delta", 
     "Epsilon")

     myList.toObservable() //일반 리스트를 Observable로 변경할 수 있는 확장 함수 가능
             .map(String::length)
             .subscribe(::println)
 }

그 외에도 여러가지 함수들이 있다. 한번 꼭 들어가서 확인 해보자.

https://github.com/ReactiveX/RxKotlin

Extensions

Target TypeMethodReturn TypeDescription
BooleanArraytoObservable()ObservableTurns a Boolean array into an Observable
ByteArraytoObservable()ObservableTurns a Byte array into an Observable
ShortArraytoObservable()ObservableTurns a Short array into an Observable
IntArraytoObservable()ObservableTurns an Int array into an Observable
LongArraytoObservable()ObservableTurns a Long array into an Observable
FloatArraytoObservable()ObservableTurns an Float array into an Observable
DoubleArraytoObservable()ObservableTurns an Double array into an Observable
ArraytoObservable()ObservableTurns aTarray into an Observable
IntProgressiontoObservable()ObservableTurns anIntProgressioninto an Observable
IterabletoObservable()ObservableTurns anIterable<T>into an Observable
IteratortoObservable()ObservableTurns anIterator<T>into an Observable
ObservableflatMapSequence()ObservableFlat maps eachTemission to aSequenece<R>
Observable<Pair<A,B>>toMap()CollectsPair<A,B>emissions into aMap<A,B>
Observable<Pair<A,B>>toMultimap()CollectsPair<A,B>emissions into aMap<A,List<B>>
Observable<Observable>mergeAll()ObservableMerges all Observables emitted from an Observable
Observable<Observable>concatAll()ObservableCocnatenates all Observables emitted from an Observable
Observable<Observable>switchLatest()ObservableEmits from the last emitted Observable
Observable<*>cast()ObservableCasts all emissions to the reified type
Observable<*>ofType()ObservableFilters all emissions to only the reified type
Iterable<Observable>merge()Merges an Iterable of Observables into a single Observable
Iterable<Observable>mergeDelayError()Merges an Iterable of Observables into a single Observable, but delays any error
BooleanArraytoFlowable()FlowableTurns a Boolean array into an Flowable
ByteArraytoFlowable()FlowableTurns a Byte array into an Flowable
ShortArraytoFlowable()FlowableTurns a Short array into an Flowable
IntArraytoFlowable()FlowableTurns an Int array into an Flowable
LongArraytoFlowable()FlowableTurns a Long array into an Flowable
FloatArraytoFlowable()FlowableTurns an Float array into an Flowable
DoubleArraytoFlowable()FlowableTurns an Double array into an Flowable
ArraytoFlowable()FlowableTurns aTarray into an Flowable
IntProgressiontoFlowable()FlowableTurns anIntProgressioninto an Flowable
IterabletoFlowable()FlowableTurns anIterable<T>into an Flowable
IteratortoFlowable()FlowableTurns anIterator<T>into an Flowable
FlowableflatMapSequence()FlowableFlat maps eachTemission to aSequenece<R>
Flowable<Pair<A,B>>toMap()CollectsPair<A,B>emissions into aMap<A,B>
Flowable<Pair<A,B>>toMultimap()CollectsPair<A,B>emissions into aMap<A,List<B>>
Flowable<Flowable>mergeAll()FlowableMerges all Flowables emitted from an Flowable
Flowable<Flowable>concatAll()FlowableCocnatenates all Flowables emitted from an Flowable
Flowable<Flowable>switchLatest()FlowableEmits from the last emitted Flowable
Flowablecast()FlowableCasts all emissions to the reified type
FlowableofType()FlowableFilters all emissions to only the reified type
Iterable<Flowable>merge()Merges an Iterable of Flowables into a single Flowable
Iterable<Flowable>mergeDelayError()Merges an Iterable of Flowables into a single Flowable, but delays any error
TtoSingle()SingleTurns anyTitem into aSingle<T>
FuturetoSingle()SingleTurns aFuture<T>into aSingle<T>
CallabletoSingle()SingleTurns aCallable<T>into aSingle<T>
() -> TtoSingle()SingleTurns a() -> Tinto aSingle<T>
Singlecast()SingleCasts all emissions to the reified type
Observable<Single>mergeAllSingles()ObservableMergaes all Singles emitted from an Observable
Flowable<Single>mergeAllSingles()FlowableMergaes all Singles emitted from a Flowable
T?.toMaybe()toMaybe()MaybeTurns a nullableTvalue into aMaybe<T>that will only emit if not null
FuturetoMaybe()MaybeTurns aFuture<T>into aMaybe<T>
CallabletoMaybe()MaybeTurns aCallable<T>into aMaybe<T>
() -> TtoMaybe()MaybeTurns a() -> Tinto aMaybe<T>
Maybecast()MaybeCasts any emissions to the reified type
MaybeofType()MaybeFilters any emission that is the reified type
Observable<Maybe>mergeAllMaybes()ObservableMerges all emitted Maybes
Flowable<Maybe>mergeAllMaybes()FlowableMerges all emitted Maybes
ActiontoCompletable()CompletableTurns anActioninto aCompletable
CallabletoCompletable()CompletableTurns aCallableinto aCompletable
FuturetoCompletable()CompletableTurns aFutureinto aCompletable
(() -> Any)toCompletable()CompletableTurns a(() -> Any)into aCompletable
ObservablemergeAllCompletables()Completable>Merges all emitted Completables
FlowablemergeAllCompletables()CompletableMerges all emitted Completables
ObservablesubscribeBy()DisposableAllows named arguments to construct an Observer
FlowablesubscribeBy()DisposableAllows named arguments to construct a Subscriber
SinglesubscribeBy()DisposableAllows named arguments to construct a SingleObserver
MaybesubscribeBy()DisposableAllows named arguments to construct a MaybeObserver
CompletablesubscribeBy()DisposableAllows named arguments to construct a CompletableObserver
ObservableblockingSubscribeBy()UnitAllows named arguments to construct a blocking Observer
FlowableblockingSubscribeBy()UnitAllows named arguments to construct a blocking Subscriber
DisposableaddTo()DisposableAdds aDisposableto the specifiedCompositeDisposable
CompositeDisposableplusAssign()DisposableOperator function to add aDisposableto thisCompositeDisposable


RxKotlin SAM(Single Abstract Methods) Ambiguity

아쉽게도 RxJava 만들시 코틀린을 염두에 두고 만들지는 못 했다. 그래서 RxKotlin 에서도 SAM 이슈는 발생하고 있다.

SAM 이슈 관련 문서 링크 ( 태환님 블로그 )

우선 문제가 생기는 코드를 보자. 자주 쓰는 Zip이므로 꼭 이해를 해야 한다.

fun test_sam1(){
    val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
    val numbers = Observable.range(1,4)

    val result = Observable.zip( source , numbers) { s , n -> "$s $n"} //오류 발생한다. 

    result.subscribe(::println)
}

위와 같이 BiFunction<String,Int,String>을 찾을 수 없다고 나온다.

그러면 수정을 다시 하면 정상적으로 나온다.

val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

val result = Observable.zip( source , numbers , 
                                BiFunction<String,Int,String>{ s , n -> "$s $n"} //이렇게 BiFunction을 따로 지정
                            )
result.subscribe(::println)
===================
Alpha 1
Beta 2
Gamma 3
Delta 4

그런데 소스가 콜백함수로 지저분해진다. 보기가 싫어진다.

그래서 앞서 소개한 RxKotlin 라이버러리를 이용하면 보기 좋게 만들수 있다.

  • io.reactivex.rxkotlin.Observables
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

val result = io.reactivex.rxkotlin.Observables.zip( source , numbers) { s, n -> "$s $n"}

result.subscribe(::println)

그리고 zipWith을 확장해서 제공하고 있다.

val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

//    val result = io.reactivex.rxkotlin.Observables.zip( source , numbers) { s, n -> "$s $n"}
val result2 = source.zipWith( numbers ) { s, n -> "$s $n"} //zipWith는 따로 확장해서 사용중

result2.subscribe(::println)


let() 와 apply() 응용

Kotlin 에서 let 과 apply 는 아주 유용한 함수이다.

자세한 설명은 다음 링크 를 참조해서 보자.

val numbers = Observable.just( 180.0 , 160.0 , 140.0 , 100.0 , 120.0)

val average = numbers
        .map { it * 2 }
        .doOnNext({ println("doOnNExt => $it")} )
        .publish()
        .autoConnect(2)
        .let {
            val sum = it.doOnNext({ println("sum doOnNext => $it")}).reduce(0.0) { total , next -> total + next }
            val count = it.count()

            println("count -> "+ count)
            sum.doOnEvent( {d , e -> println("sum emmit -> $d")}).zipWith( count ) { s,c -> s/c }
        }
average.subscribeBy (::println )

============================

count -> io.reactivex.internal.operators.observable.ObservableCountSingle@7b1d7fff
doOnNExt => 360.0
sum doOnNext => 360.0
doOnNExt => 320.0
sum doOnNext => 320.0
doOnNExt => 280.0
sum doOnNext => 280.0
doOnNExt => 200.0
sum doOnNext => 200.0
doOnNExt => 240.0
sum doOnNext => 240.0
sum emmit -> 1400.0
280.0

Apply()

자체 함수를 실행할 수 있는 장점이 있다.

val statusObserver = PublishSubject.create<Long>()
statusObserver.subscribe{ println("Status Observer : $it")}

Observable.interval(1 , TimeUnit.SECONDS)
        .take(5)
        .publish()
        .autoConnect(2)
        .apply {
            subscribe(statusObserver) //Observable 클래스 내부에 있는 함수를 호출할 수 있다. 
        }
        .map{ it * 1000 }
        .subscribe{
            println("Main Observer : $it")
        }

Thread.sleep(7000);
============================================
Status Observer : 0
Main Observer : 0
Status Observer : 1
Main Observer : 1000
Status Observer : 2
Main Observer : 2000
Status Observer : 3
Main Observer : 3000
Status Observer : 4
Main Observer : 4000


Tuples and data classes

코틀린 함수로 Tuple 함수를 빨리 만드는 방법을 알아보자.

주어진 문자 , 숫자 를 뭉쳐서 Pair 로 만드는 예제이다.

val strings = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

Observables
.zip(strings , numbers) { s, n -> s to n} // Pair로 뭉쳐서 리턴
.subscribe{
println(it)
}
=================
(Alpha, 1)
(Beta, 2)
(Gamma, 3)
(Delta, 4)

위의 내용을 Data 클래스로 변경하면 더 보기가 편해진다.

val strings = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)

data class StringAndNumber(val myString : String , val myNumber : Int) //Data Class 생성 후 만들기

Observables
.zip(strings , numbers) { s, n -> StringAndNumber(s , n)} // Pair로 뭉쳐서 리턴
.subscribe{
println(it)
}

=====================
StringAndNumber(myString=Alpha, myNumber=1)
StringAndNumber(myString=Beta, myNumber=2)
StringAndNumber(myString=Gamma, myNumber=3)
StringAndNumber(myString=Delta, myNumber=4)