compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxkotlin:2.0.2'
io.reactivex.rxjava2:rxkotlin
sourceSets {
main.kotlin.srcDirs += 'src/main/kotlin'
}
import io.reactivex.Observable
/**
 * Emits the integers 1 through 10, formats each one as "Number : n",
 * and prints every formatted line to standard output.
 */
fun main(args: Array<String>) {
    val labelled = Observable
        .range(1, 10)
        .map { number -> "Number : $number" }
    labelled.subscribe(System.out::println)
}
=======================
Number : 1
Number : 2
Number : 3
Number : 4
Number : 5
Number : 6
Number : 7
Number : 8
Number : 9
Number : 10
/**
 * Prints the running totals produced by `scan` over the range 1..10,
 * starting from the seed value 0.
 */
fun testScan() {
    val runningTotals = Observable
        .range(1, 10)
        .scan(0) { accumulated, value -> accumulated + value } // accumulate with a lambda
    runningTotals.subscribe(System.out::println)
}
=====================
0
1
3
6
10
15
21
28
36
45
55
Extension Operators
/**
 * Collects the range 1..10 through the `toSet()` extension and prints the result.
 */
fun test_extenstion_toset() {
    Observable.range(1, 10)
        .toSet()
        .subscribe { collected -> println(collected) }
}
/**
 * Extension that gathers the emissions of this [Observable] into a [HashSet]
 * via `collect` and then exposes the result through the read-only `Set<T>` type.
 */
fun <T> Observable<T>.toSet() =
    collect(
        { HashSet<T>() }, // start from an empty HashSet
        { accumulator, item -> accumulator.add(item) } // put each emission into the set
    ).map { collected -> collected as Set<T> } // hand it back typed as Set
==========================================
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
/**
 * Adds up four fixed values through the `sum()` extension and prints the total.
 */
fun test_extenstion_sum() {
    Observable.just(100, 50, 250, 150)
        .sum()
        .subscribe { total -> println(total) }
}
/**
 * Extension that reduces every emitted [Int] into a single total, seeded with 0.
 */
fun Observable<Int>.sum() =
    reduce(0) { accumulated, value ->
        accumulated + value
    }
==========================================================
550
RxKotlin
import io.reactivex.rxkotlin.toObservable
/**
 * Turns a list of words into an Observable, maps each word to its length,
 * and prints the lengths.
 */
fun main(args: Array<String>) {
    val greekLetters = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
    greekLetters
        .toObservable() // extension function that turns a plain list into an Observable
        .map { word -> word.length }
        .subscribe { length -> println(length) }
}
Extensions
Target Type | Method | Return Type | Description |
---|---|---|---|
BooleanArray | toObservable() | Observable | Turns a Boolean array into an Observable |
ByteArray | toObservable() | Observable | Turns a Byte array into an Observable |
ShortArray | toObservable() | Observable | Turns a Short array into an Observable |
IntArray | toObservable() | Observable | Turns an Int array into an Observable |
LongArray | toObservable() | Observable | Turns a Long array into an Observable |
FloatArray | toObservable() | Observable | Turns a Float array into an Observable |
DoubleArray | toObservable() | Observable | Turns a Double array into an Observable |
Array | toObservable() | Observable | Turns a T array into an Observable |
IntProgression | toObservable() | Observable | Turns an IntProgression into an Observable |
Iterable | toObservable() | Observable | Turns an Iterable<T> into an Observable |
Iterator | toObservable() | Observable | Turns an Iterator<T> into an Observable |
Observable | flatMapSequence() | Observable | Flat maps each T emission to a Sequence<R> |
Observable<Pair<A,B>> | toMap() | Single | Collects Pair<A,B> emissions into a Map<A,B> |
Observable<Pair<A,B>> | toMultimap() | Single | Collects Pair<A,B> emissions into a Map<A,List<B>> |
Observable<Observable> | mergeAll() | Observable | Merges all Observables emitted from an Observable |
Observable<Observable> | concatAll() | Observable | Concatenates all Observables emitted from an Observable |
Observable<Observable> | switchLatest() | Observable | Emits from the last emitted Observable |
Observable<*> | cast() | Observable | Casts all emissions to the reified type |
Observable<*> | ofType() | Observable | Filters all emissions to only the reified type |
Iterable<Observable> | merge() | Observable | Merges an Iterable of Observables into a single Observable |
Iterable<Observable> | mergeDelayError() | Observable | Merges an Iterable of Observables into a single Observable, but delays any error |
BooleanArray | toFlowable() | Flowable | Turns a Boolean array into a Flowable |
ByteArray | toFlowable() | Flowable | Turns a Byte array into a Flowable |
ShortArray | toFlowable() | Flowable | Turns a Short array into a Flowable |
IntArray | toFlowable() | Flowable | Turns an Int array into a Flowable |
LongArray | toFlowable() | Flowable | Turns a Long array into a Flowable |
FloatArray | toFlowable() | Flowable | Turns a Float array into a Flowable |
DoubleArray | toFlowable() | Flowable | Turns a Double array into a Flowable |
Array | toFlowable() | Flowable | Turns a T array into a Flowable |
IntProgression | toFlowable() | Flowable | Turns an IntProgression into a Flowable |
Iterable | toFlowable() | Flowable | Turns an Iterable<T> into a Flowable |
Iterator | toFlowable() | Flowable | Turns an Iterator<T> into a Flowable |
Flowable | flatMapSequence() | Flowable | Flat maps each T emission to a Sequence<R> |
Flowable<Pair<A,B>> | toMap() | Single | Collects Pair<A,B> emissions into a Map<A,B> |
Flowable<Pair<A,B>> | toMultimap() | Single | Collects Pair<A,B> emissions into a Map<A,List<B>> |
Flowable<Flowable> | mergeAll() | Flowable | Merges all Flowables emitted from a Flowable |
Flowable<Flowable> | concatAll() | Flowable | Concatenates all Flowables emitted from a Flowable |
Flowable<Flowable> | switchLatest() | Flowable | Emits from the last emitted Flowable |
Flowable | cast() | Flowable | Casts all emissions to the reified type |
Flowable | ofType() | Flowable | Filters all emissions to only the reified type |
Iterable<Flowable> | merge() | Flowable | Merges an Iterable of Flowables into a single Flowable |
Iterable<Flowable> | mergeDelayError() | Flowable | Merges an Iterable of Flowables into a single Flowable, but delays any error |
T | toSingle() | Single | Turns any T item into a Single<T> |
Future | toSingle() | Single | Turns a Future<T> into a Single<T> |
Callable | toSingle() | Single | Turns a Callable<T> into a Single<T> |
() -> T | toSingle() | Single | Turns a () -> T into a Single<T> |
Single | cast() | Single | Casts all emissions to the reified type |
Observable<Single> | mergeAllSingles() | Observable | Merges all Singles emitted from an Observable |
Flowable<Single> | mergeAllSingles() | Flowable | Merges all Singles emitted from a Flowable |
T? | toMaybe() | Maybe | Turns a nullable T value into a Maybe<T> that will only emit if not null |
Future | toMaybe() | Maybe | Turns a Future<T> into a Maybe<T> |
Callable | toMaybe() | Maybe | Turns a Callable<T> into a Maybe<T> |
() -> T | toMaybe() | Maybe | Turns a () -> T into a Maybe<T> |
Maybe | cast() | Maybe | Casts any emissions to the reified type |
Maybe | ofType() | Maybe | Filters any emission that is the reified type |
Observable<Maybe> | mergeAllMaybes() | Observable | Merges all emitted Maybes |
Flowable<Maybe> | mergeAllMaybes() | Flowable | Merges all emitted Maybes |
Action | toCompletable() | Completable | Turns an Action into a Completable |
Callable | toCompletable() | Completable | Turns a Callable into a Completable |
Future | toCompletable() | Completable | Turns a Future into a Completable |
(() -> Any) | toCompletable() | Completable | Turns a (() -> Any) into a Completable |
Observable | mergeAllCompletables() | Completable | Merges all emitted Completables |
Flowable | mergeAllCompletables() | Completable | Merges all emitted Completables |
Observable | subscribeBy() | Disposable | Allows named arguments to construct an Observer |
Flowable | subscribeBy() | Disposable | Allows named arguments to construct a Subscriber |
Single | subscribeBy() | Disposable | Allows named arguments to construct a SingleObserver |
Maybe | subscribeBy() | Disposable | Allows named arguments to construct a MaybeObserver |
Completable | subscribeBy() | Disposable | Allows named arguments to construct a CompletableObserver |
Observable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Observer |
Flowable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Subscriber |
Disposable | addTo() | Disposable | Adds a Disposable to the specified CompositeDisposable |
CompositeDisposable | plusAssign() | Disposable | Operator function to add a Disposable to this CompositeDisposable |
RxKotlin SAM(Single Abstract Methods) Ambiguity
/**
 * Zips an Observable of four words with the range 1..4, passing the combiner to
 * `Observable.zip` as a plain trailing lambda, and subscribes with `println`.
 *
 * NOTE(review): the original author's inline remark says this `zip` call produces an
 * error, and the surrounding section attributes it to SAM ambiguity. The snippet is
 * kept as-is because it illustrates the problem; confirm against the RxJava version in use.
 */
fun test_sam1(){
val source = Observable.just("Alpha" , "Beta" , "Gamma" , "Delta")
val numbers = Observable.range(1,4)
val result = Observable.zip( source , numbers) { s , n -> "$s $n"} // An error occurs on this line.
result.subscribe(::println)
}
// Zips the words with the numbers by handing Observable.zip an explicitly typed BiFunction.
val source = Observable.just("Alpha", "Beta", "Gamma", "Delta")
val numbers = Observable.range(1, 4)
// Declare the BiFunction separately, with its type arguments spelled out.
val combiner = BiFunction<String, Int, String> { word, index -> "$word $index" }
val result = Observable.zip(source, numbers, combiner)
result.subscribe { line -> println(line) }
===================
Alpha 1
Beta 2
Gamma 3
Delta 4
io.reactivex.rxkotlin.Observables
// Same zip, but through RxKotlin's Observables helper, which takes the combiner as a Kotlin lambda.
val source = Observable.just("Alpha", "Beta", "Gamma", "Delta")
val numbers = Observable.range(1, 4)
val result = io.reactivex.rxkotlin.Observables.zip(source, numbers) { word, index ->
    "$word $index"
}
result.subscribe { line -> println(line) }
// Pairs each word with a number using zipWith called directly on the source Observable.
val source = Observable.just("Alpha", "Beta", "Gamma", "Delta")
val numbers = Observable.range(1, 4)
// Alternative form: io.reactivex.rxkotlin.Observables.zip(source, numbers) { s, n -> "$s $n" }
val result2 = source.zipWith(numbers) { word, index ->
    "$word $index" // zipWith is provided separately as an extension
}
result2.subscribe { line -> println(line) }
let() 와 apply() 응용
// Computes the average of the doubled inputs by sharing one upstream between a
// "sum" chain and a "count" chain, then dividing the two results.
val numbers = Observable.just(180.0, 160.0, 140.0, 100.0, 120.0)
val average = numbers
    .map { it * 2 }
    .doOnNext { println("doOnNExt => $it") }
    .publish()
    .autoConnect(2) // two subscribers are expected: the sum chain and the count chain below
    .let { shared ->
        // The receiver is named so the inner lambdas' `it` does not shadow it.
        val sum = shared
            .doOnNext { println("sum doOnNext => $it") }
            .reduce(0.0) { total, next -> total + next }
        val count = shared.count()
        // Note: this prints the `count` object itself (its toString), not the counted value.
        println("count -> $count")
        sum
            .doOnEvent { d, _ -> println("sum emmit -> $d") }
            .zipWith(count) { s, c -> s / c }
    }
// Named argument: states explicitly that the callback handles the success value
// instead of relying on the position of subscribeBy's parameters.
average.subscribeBy(onSuccess = { mean -> println(mean) })
============================
count -> io.reactivex.internal.operators.observable.ObservableCountSingle@7b1d7fff
doOnNExt => 360.0
sum doOnNext => 360.0
doOnNExt => 320.0
sum doOnNext => 320.0
doOnNExt => 280.0
sum doOnNext => 280.0
doOnNExt => 200.0
sum doOnNext => 200.0
doOnNExt => 240.0
sum doOnNext => 240.0
sum emmit -> 1400.0
280.0
Apply()
// A subject that mirrors every tick of the interval and prints it as a status line.
val statusObserver = PublishSubject.create<Long>()
statusObserver.subscribe { status -> println("Status Observer : $status") }

// Five one-second ticks shared between the status subject and the main observer.
val ticks = Observable.interval(1, TimeUnit.SECONDS)
    .take(5)
    .publish()
    .autoConnect(2)
    .apply {
        // Inside apply, members of the Observable can be called directly on the receiver.
        subscribe(statusObserver)
    }
ticks
    .map { tick -> tick * 1000 }
    .subscribe { scaled -> println("Main Observer : $scaled") }

// Keep the calling thread alive while the ticks are emitted.
Thread.sleep(7000)
============================================
Status Observer : 0
Main Observer : 0
Status Observer : 1
Main Observer : 1000
Status Observer : 2
Main Observer : 2000
Status Observer : 3
Main Observer : 3000
Status Observer : 4
Main Observer : 4000
Tuples and data classes
코틀린 함수로 Tuple을 빠르게 만드는 방법을 알아보자.
주어진 문자열과 숫자를 묶어서 Pair로 만드는 예제이다.
// Zips each word with its number and prints the resulting Pair.
val strings = Observable.just("Alpha", "Beta", "Gamma", "Delta")
val numbers = Observable.range(1, 4)
Observables
    .zip(strings, numbers) { text, index -> Pair(text, index) } // bundle both values into a Pair
    .subscribe { pair -> println(pair) }
=================
(Alpha, 1)
(Beta, 2)
(Gamma, 3)
(Delta, 4)
위의 내용을 Data 클래스로 변경하면 더 보기가 편해진다.
// Zips each word with its number into a dedicated data class and prints it.
val strings = Observable.just("Alpha", "Beta", "Gamma", "Delta")
val numbers = Observable.range(1, 4)

/** Holds one zipped word/number combination. */
data class StringAndNumber(val myString: String, val myNumber: Int)

Observables
    .zip(strings, numbers) { text, index ->
        StringAndNumber(myString = text, myNumber = index) // wrap both values in the data class
    }
    .subscribe { item -> println(item) }
=====================
StringAndNumber(myString=Alpha, myNumber=1)
StringAndNumber(myString=Beta, myNumber=2)
StringAndNumber(myString=Gamma, myNumber=3)
StringAndNumber(myString=Delta, myNumber=4)