RxKotlin
우선 dependencies
을 추가를 하자.
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxkotlin:2.0.2'
io.reactivex.rxjava2:rxkotlin
의 경우 코틀린으로 확장해서 따로 유틸을 만든 것이다.
주소는 여기 클릭 해서 한번 보도록 하자.
그리고 소스셋을 지정해서 폴더를 구분해주자.
sourceSets {
main.kotlin.srcDirs += 'src/main/kotlin'
}
그럼 코틀린 파일을 새로 만들어서 테스트 해보자.
1~10까지 돌면서 문자값으로 변경해주는 예제이다.
import io.reactivex.Observable
fun main(args : Array<String>) {
Observable.range(1 , 10)
.map { "Number : $it" }
.subscribe(System.out::println)
}
=======================
Number : 1
Number : 2
Number : 3
Number : 4
Number : 5
Number : 6
Number : 7
Number : 8
Number : 9
Number : 10
또 다른 Scan 을 테스트 해보자.
fun testScan(){
Observable.range(1 , 10)
.scan(0) { total , next -> total + next} //람다를 이용한 축
.subscribe(System.out::println)
}
=====================
0
1
3
6
10
15
21
28
36
45
55
Extension Operators
코틀린은 확장시 정말 편하게 가능하다.
스위프트도 더 편한것 같다.
RxJava 에서 확장 하는compose
나 life
을 대체할수 있다.
toSet
으로 확장해서 사용하는 예제이다.
fun test_extenstion_toset(){
val source = Observable.range(1, 10)
val asSet = source.toSet()
asSet.subscribe( { s -> System.out.println( s ) } )
}
//toSet 으로 확장하기.
fun <T> Observable<T>.toSet() = collect(
{ HashSet<T>() } , // HashSet으로 초기화
{ set , next -> set.add( next )} // set에 next를 추가
)
.map { it as Set<T> } // 다시 Set으로 변경
==========================================
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Generic Type으로 말고 타겟을 직접 정의도 충분히 가능하다.
아래 예제는 주어진 값을 총합으로 구한다.
fun test_extenstion_sum(){
val source = Observable.just( 100 , 50 , 250 , 150)
val total = source.sum();
total.subscribe( { s -> System.out.println( s ) } )
}
fun Observable<Int>.sum() =
reduce(0) { total , next -> total + next}
==========================================================
550
RxKotlin
RxJava에서 Kotlin 으로 변경시 조금 다른 형태를 받을수 있습니다.
만약 리스트를 받아서 그 형태 그대로 리스트 형태의 옵저버를 만들고 싶을때 사용하면 편하다.
import io.reactivex.rxkotlin.toObservable
fun main(args: Array<String>) {
val myList = listOf("Alpha", "Beta", "Gamma", "Delta",
"Epsilon")
myList.toObservable() //일반 리스트를 Observable로 변경할 수 있는 확장 함수 가능
.map(String::length)
.subscribe(::println)
}
그 외에도 여러가지 함수들이 있다. 한번 꼭 들어가서 확인 해보자.
https://github.com/ReactiveX/RxKotlin
Extensions
Target Type | Method | Return Type | Description |
---|---|---|---|
BooleanArray | toObservable() | Observable | Turns a Boolean array into an Observable |
ByteArray | toObservable() | Observable | Turns a Byte array into an Observable |
ShortArray | toObservable() | Observable | Turns a Short array into an Observable |
IntArray | toObservable() | Observable | Turns an Int array into an Observable |
LongArray | toObservable() | Observable | Turns a Long array into an Observable |
FloatArray | toObservable() | Observable | Turns an Float array into an Observable |
DoubleArray | toObservable() | Observable | Turns an Double array into an Observable |
Array | toObservable() | Observable | Turns aT array into an Observable |
IntProgression | toObservable() | Observable | Turns anIntProgression into an Observable |
Iterable | toObservable() | Observable | Turns anIterable<T> into an Observable |
Iterator | toObservable() | Observable | Turns anIterator<T> into an Observable |
Observable | flatMapSequence() | Observable | Flat maps eachT emission to aSequenece<R> |
Observable<Pair<A,B>> | toMap() | CollectsPair<A,B> emissions into aMap<A,B> | |
Observable<Pair<A,B>> | toMultimap() | CollectsPair<A,B> emissions into aMap<A,List<B>> | |
Observable<Observable> | mergeAll() | Observable | Merges all Observables emitted from an Observable |
Observable<Observable> | concatAll() | Observable | Cocnatenates all Observables emitted from an Observable |
Observable<Observable> | switchLatest() | Observable | Emits from the last emitted Observable |
Observable<*> | cast() | Observable | Casts all emissions to the reified type |
Observable<*> | ofType() | Observable | Filters all emissions to only the reified type |
Iterable<Observable> | merge() | Merges an Iterable of Observables into a single Observable | |
Iterable<Observable> | mergeDelayError() | Merges an Iterable of Observables into a single Observable, but delays any error | |
BooleanArray | toFlowable() | Flowable | Turns a Boolean array into an Flowable |
ByteArray | toFlowable() | Flowable | Turns a Byte array into an Flowable |
ShortArray | toFlowable() | Flowable | Turns a Short array into an Flowable |
IntArray | toFlowable() | Flowable | Turns an Int array into an Flowable |
LongArray | toFlowable() | Flowable | Turns a Long array into an Flowable |
FloatArray | toFlowable() | Flowable | Turns an Float array into an Flowable |
DoubleArray | toFlowable() | Flowable | Turns an Double array into an Flowable |
Array | toFlowable() | Flowable | Turns aT array into an Flowable |
IntProgression | toFlowable() | Flowable | Turns anIntProgression into an Flowable |
Iterable | toFlowable() | Flowable | Turns anIterable<T> into an Flowable |
Iterator | toFlowable() | Flowable | Turns anIterator<T> into an Flowable |
Flowable | flatMapSequence() | Flowable | Flat maps eachT emission to aSequenece<R> |
Flowable<Pair<A,B>> | toMap() | CollectsPair<A,B> emissions into aMap<A,B> | |
Flowable<Pair<A,B>> | toMultimap() | CollectsPair<A,B> emissions into aMap<A,List<B>> | |
Flowable<Flowable> | mergeAll() | Flowable | Merges all Flowables emitted from an Flowable |
Flowable<Flowable> | concatAll() | Flowable | Cocnatenates all Flowables emitted from an Flowable |
Flowable<Flowable> | switchLatest() | Flowable | Emits from the last emitted Flowable |
Flowable | cast() | Flowable | Casts all emissions to the reified type |
Flowable | ofType() | Flowable | Filters all emissions to only the reified type |
Iterable<Flowable> | merge() | Merges an Iterable of Flowables into a single Flowable | |
Iterable<Flowable> | mergeDelayError() | Merges an Iterable of Flowables into a single Flowable, but delays any error | |
T | toSingle() | Single | Turns anyT item into aSingle<T> |
Future | toSingle() | Single | Turns aFuture<T> into aSingle<T> |
Callable | toSingle() | Single | Turns aCallable<T> into aSingle<T> |
() -> T | toSingle() | Single | Turns a() -> T into aSingle<T> |
Single | cast() | Single | Casts all emissions to the reified type |
Observable<Single> | mergeAllSingles() | Observable | Mergaes all Singles emitted from an Observable |
Flowable<Single> | mergeAllSingles() | Flowable | Mergaes all Singles emitted from a Flowable |
T?.toMaybe() | toMaybe() | Maybe | Turns a nullableT value into aMaybe<T> that will only emit if not null |
Future | toMaybe() | Maybe | Turns aFuture<T> into aMaybe<T> |
Callable | toMaybe() | Maybe | Turns aCallable<T> into aMaybe<T> |
() -> T | toMaybe() | Maybe | Turns a() -> T into aMaybe<T> |
Maybe | cast() | Maybe | Casts any emissions to the reified type |
Maybe | ofType() | Maybe | Filters any emission that is the reified type |
Observable<Maybe> | mergeAllMaybes() | Observable | Merges all emitted Maybes |
Flowable<Maybe> | mergeAllMaybes() | Flowable | Merges all emitted Maybes |
Action | toCompletable() | Completable | Turns anAction into aCompletable |
Callable | toCompletable() | Completable | Turns aCallable into aCompletable |
Future | toCompletable() | Completable | Turns aFuture into aCompletable |
(() -> Any) | toCompletable() | Completable | Turns a(() -> Any) into aCompletable |
Observable | mergeAllCompletables() | Completable> | Merges all emitted Completables |
Flowable | mergeAllCompletables() | Completable | Merges all emitted Completables |
Observable | subscribeBy() | Disposable | Allows named arguments to construct an Observer |
Flowable | subscribeBy() | Disposable | Allows named arguments to construct a Subscriber |
Single | subscribeBy() | Disposable | Allows named arguments to construct a SingleObserver |
Maybe | subscribeBy() | Disposable | Allows named arguments to construct a MaybeObserver |
Completable | subscribeBy() | Disposable | Allows named arguments to construct a CompletableObserver |
Observable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Observer |
Flowable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Subscriber |
Disposable | addTo() | Disposable | Adds aDisposable to the specifiedCompositeDisposable |
CompositeDisposable | plusAssign() | Disposable | Operator function to add aDisposable to thisCompositeDisposable |
RxKotlin SAM(Single Abstract Methods) Ambiguity
아쉽게도 RxJava 만들시 코틀린을 염두에 두고 만들지는 못 했다. 그래서 RxKotlin 에서도 SAM 이슈는 발생하고 있다.
SAM 이슈 관련 문서 링크 ( 태환님 블로그 )
우선 문제가 생기는 코드를 보자. 자주 쓰는 Zip
이므로 꼭 이해를 해야 한다.
fun test_sam1(){
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)
val result = Observable.zip( source , numbers) { s , n -> "$s $n"} //오류 발생한다.
result.subscribe(::println)
}
위와 같이 BiFunction<String,Int,String>을 찾을 수 없다고 나온다.
그러면 수정을 다시 하면 정상적으로 나온다.
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)
val result = Observable.zip( source , numbers ,
BiFunction<String,Int,String>{ s , n -> "$s $n"} //이렇게 BiFunction을 따로 지정
)
result.subscribe(::println)
===================
Alpha 1
Beta 2
Gamma 3
Delta 4
그런데 소스가 콜백함수로 지저분해진다. 보기가 싫어진다.
그래서 앞서 소개한 RxKotlin 라이버러리를 이용하면 보기 좋게 만들수 있다.
io.reactivex.rxkotlin.Observables
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)
val result = io.reactivex.rxkotlin.Observables.zip( source , numbers) { s, n -> "$s $n"}
result.subscribe(::println)
그리고 zipWith
을 확장해서 제공하고 있다.
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)
// val result = io.reactivex.rxkotlin.Observables.zip( source , numbers) { s, n -> "$s $n"}
val result2 = source.zipWith( numbers ) { s, n -> "$s $n"} //zipWith는 따로 확장해서 사용중
result2.subscribe(::println)
let() 와 apply() 응용
Kotlin 에서 let 과 apply 는 아주 유용한 함수이다.
자세한 설명은 다음 링크 를 참조해서 보자.
val numbers = Observable.just( 180.0 , 160.0 , 140.0 , 100.0 , 120.0)
val average = numbers
.map { it * 2 }
.doOnNext({ println("doOnNExt => $it")} )
.publish()
.autoConnect(2)
.let {
val sum = it.doOnNext({ println("sum doOnNext => $it")}).reduce(0.0) { total , next -> total + next }
val count = it.count()
println("count -> "+ count)
sum.doOnEvent( {d , e -> println("sum emmit -> $d")}).zipWith( count ) { s,c -> s/c }
}
average.subscribeBy (::println )
============================
count -> io.reactivex.internal.operators.observable.ObservableCountSingle@7b1d7fff
doOnNExt => 360.0
sum doOnNext => 360.0
doOnNExt => 320.0
sum doOnNext => 320.0
doOnNExt => 280.0
sum doOnNext => 280.0
doOnNExt => 200.0
sum doOnNext => 200.0
doOnNExt => 240.0
sum doOnNext => 240.0
sum emmit -> 1400.0
280.0
Apply()
자체 함수를 실행할 수 있는 장점이 있다.
val statusObserver = PublishSubject.create<Long>()
statusObserver.subscribe{ println("Status Observer : $it")}
Observable.interval(1 , TimeUnit.SECONDS)
.take(5)
.publish()
.autoConnect(2)
.apply {
subscribe(statusObserver) //Observable 클래스 내부에 있는 함수를 호출할 수 있다.
}
.map{ it * 1000 }
.subscribe{
println("Main Observer : $it")
}
Thread.sleep(7000);
============================================
Status Observer : 0
Main Observer : 0
Status Observer : 1
Main Observer : 1000
Status Observer : 2
Main Observer : 2000
Status Observer : 3
Main Observer : 3000
Status Observer : 4
Main Observer : 4000
Tuples and data classes
코틀린 함수로 Tuple 함수를 빨리 만드는 방법을 알아보자.
주어진 문자 , 숫자 를 뭉쳐서 Pair 로 만드는 예제이다.
val strings = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")val numbers = Observable.range(1,4)
Observables .zip(strings , numbers) { s, n -> s to n} // Pair로 뭉쳐서 리턴 .subscribe{ println(it) }=================(Alpha, 1)(Beta, 2)(Gamma, 3)(Delta, 4)
위의 내용을 Data 클래스로 변경하면 더 보기가 편해진다.
val strings = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")val numbers = Observable.range(1,4)
data class StringAndNumber(val myString : String , val myNumber : Int) //Data Class 생성 후 만들기
Observables .zip(strings , numbers) { s, n -> StringAndNumber(s , n)} // Pair로 뭉쳐서 리턴 .subscribe{ println(it) }
=====================StringAndNumber(myString=Alpha, myNumber=1)StringAndNumber(myString=Beta, myNumber=2)StringAndNumber(myString=Gamma, myNumber=3)StringAndNumber(myString=Delta, myNumber=4)
'스터디 > RxJava2' 카테고리의 다른 글
[RxKotlin] Lift 연산자에 대해서 알아보자 (0) | 2018.01.22 |
---|---|
RxKotlin Using 사용해보자 (0) | 2018.01.19 |
[RxJava2]동시성과 병렬화 테스트 (0) | 2017.10.22 |
[RxJava2]SubscriberWith는 무엇일까? (2) | 2017.10.22 |
[RxJava2] 디버깅 하는 기본적인 방법 (1) | 2017.10.22 |