RxJava Examples: Body Text
Source: http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html
Combining sequences
Data sources are everywhere, and sometimes we need to consume data from more than just a single source. Common examples that have many inputs include: multi touch surfaces, news feeds, price feeds, social media aggregators, file watchers, heart-beating/polling servers, etc. The way we deal with these multiple stimuli is varied too. We may want to consume it all as a deluge of integrated data, or one sequence at a time as sequential data. We could also get it in an orderly fashion, pairing data values from two sources to be processed together, or perhaps just consume the data from the first source that responds to the request.
We have uncovered the benefits of operator composition; now we turn our focus to sequence composition. Earlier on, we briefly looked at operators that work with multiple sequences such as SelectMany, TakeUntil/SkipUntil, Catch and OnErrorResumeNext. These give us a hint at the potential that sequence composition can deliver. By uncovering the features of sequence composition with Rx, we find yet another layer of game-changing functionality. Sequence composition enables you to create complex queries across multiple data sources. This unlocks the possibility to write some very powerful and succinct code.
Now we will build upon the concepts covered in the Advanced Error Handling chapter. There we were able to provide continuations for sequences that failed. We will now examine operators aimed at composing sequences that are still operational instead of sequences that have terminated due to an error.
Sequential concatenation
The first methods we will look at are those that concatenate sequences sequentially. They are very similar to the methods we have seen before for dealing with faulted sequences.
Concat
The Concat extension method is probably the simplest composition method. It simply concatenates two sequences. Once the first sequence completes, the second sequence is subscribed to and its values are passed on through to the result sequence. It behaves just like the Catch extension method, but will concatenate operational sequences when they complete, instead of faulted sequences when they OnError. The simple signature for Concat is as follows.
Usage of Concat is familiar. Just like Catch or OnErrorResumeNext, we pass the continuation sequence to the extension method.
s1 --0--1--2-|
s2           -5--6--7--8--|
r  --0--1--2--5--6--7--8--|
If either sequence were to fault, so too would the result sequence. In particular, if s1 produced an OnError notification, then s2 would never be used. If you wanted s2 to be used regardless of how s1 terminates, then OnErrorResumeNext would be your best option.
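The rule in the marble diagram can be sketched in plain Java. This is only a model of the behaviour, not the Rx API: the `Seq` type (a finished sequence represented as its values plus an optional error) and the `concat` helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: a finished sequence is its values plus an optional error.
final class ConcatSketch {
    static final class Seq {
        final List<Integer> values;
        final String error; // null means the sequence completed normally

        Seq(List<Integer> values, String error) {
            this.values = values;
            this.error = error;
        }
    }

    // Replays 'first', then continues with 'second' only if 'first' completed normally.
    static Seq concat(Seq first, Seq second) {
        List<Integer> out = new ArrayList<>(first.values);
        if (first.error != null) {
            return new Seq(out, first.error); // 'second' is never used
        }
        out.addAll(second.values);
        return new Seq(out, second.error);
    }

    public static void main(String[] args) {
        Seq s1 = new Seq(Arrays.asList(0, 1, 2), null);
        Seq s2 = new Seq(Arrays.asList(5, 6, 7, 8), null);
        System.out.println(concat(s1, s2).values); // [0, 1, 2, 5, 6, 7, 8]
    }
}
```

If `s1` carried an error instead of `null`, the result would stop after the values of `s1` and carry that same error.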
Concat also has two useful overloads. These overloads allow you to pass multiple observable sequences as either a params array or an IEnumerable<IObservable<T>>.
The ability to pass an IEnumerable<IObservable<T>> means that the multiple sequences can be lazily evaluated. The overload that takes a params array is well-suited to times when we know how many sequences we want to merge at compile time, whereas the IEnumerable<IObservable<T>> overload is a better fit when we do not know this ahead of time.
In the case of the lazily evaluated IEnumerable<IObservable<T>>, the Concat method will take one sequence, subscribe until it is completed and then switch to the next sequence. To help illustrate this, we create a method that returns a sequence of sequences and is sprinkled with logging. It returns three observable sequences each with a single value [1], [2] and [3]. Each sequence returns its value on a timer delay.
When we call our GetSequences method and concatenate the results, we see the following output using our Dump extension method.
Below is a marble diagram of the Concat operator applied to the GetSequences method. 's1', 's2' and 's3' represent sequences 1, 2 and 3 respectively; 'rs' represents the result sequence.
s1-----1|
s2      ---2|
s3          -3|
rs-----1---2-3|
You should note that the second sequence is only yielded once the first sequence has completed. To prove this, we explicitly put in a 500ms delay on producing a value and completing. Once that happens, the second sequence is then subscribed to. When that sequence completes, then the third sequence is processed in the same fashion.
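To make the lazy evaluation concrete, here is a small plain-Java model. It is a sketch under assumptions: `getSequences` is a stand-in for the GetSequences method described above, with the timers dropped and each inner sequence reduced to a one-element list, and a Java `Iterator` plays the role of the lazily evaluated outer sequence.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of Concat over a lazily evaluated sequence of sequences.
final class LazyConcatSketch {
    // Yields the single-value sequences [1], [2] and [3] on demand, logging each step.
    static Iterator<List<Integer>> getSequences(final List<String> log) {
        return new Iterator<List<Integer>>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                return next <= 3;
            }

            @Override
            public List<Integer> next() {
                log.add("yield " + next);
                List<Integer> inner = new ArrayList<>();
                inner.add(next++);
                return inner;
            }
        };
    }

    // Drains each inner sequence completely before asking for the next one.
    static List<Integer> concat(Iterator<List<Integer>> sources, List<String> log) {
        List<Integer> out = new ArrayList<>();
        while (sources.hasNext()) {
            for (Integer v : sources.next()) {
                log.add("value " + v);
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(concat(getSequences(log), log)); // [1, 2, 3]
        System.out.println(log);
        // [yield 1, value 1, yield 2, value 2, yield 3, value 3]
    }
}
```

The interleaved log is the point: the second sequence is not even requested until the first has been fully consumed.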
Repeat
Another simple extension method is Repeat. It allows you to simply repeat a sequence, either a specified or an infinite number of times.
If you use the overload that loops indefinitely, then the only way the sequence will stop is if there is an error or the subscription is disposed of. The overload that specifies a repeat count will stop on error, un-subscription, or when it reaches that count. This example shows the sequence [0,1,2] being repeated three times.
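As a rough plain-Java illustration of the counted overload (a model only, with a list standing in for a sequence that completes normally; the `repeat` helper is hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: a list stands in for a completed sequence.
final class RepeatSketch {
    // Replays 'source' from the start, 'count' times in a row.
    static <T> List<T> repeat(List<T> source, int count) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.addAll(source); // start again once the previous pass has completed
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(repeat(Arrays.asList(0, 1, 2), 3));
        // [0, 1, 2, 0, 1, 2, 0, 1, 2]
    }
}
```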
StartWith
Another simple concatenation method is the StartWith extension method. It allows you to prefix values to a sequence. The method signature takes a params array of values so it is easy to pass in as many or as few values as you need.
Using StartWith can give a similar effect to a BehaviorSubject<T> by ensuring a value is provided as soon as a consumer subscribes. It is not the same as a BehaviorSubject, however, as it will not cache the last value.
In this example, we prefix the values -3, -2 and -1 to the sequence [0,1,2].
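The same prefixing can be modelled in plain Java. This is a sketch, not the Rx API: the `startWith` helper is hypothetical, and a Java varargs parameter stands in for the params array.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: a list stands in for the source sequence.
final class StartWithSketch {
    // Emits the prefix values first, then everything from 'source'.
    @SafeVarargs
    static <T> List<T> startWith(List<T> source, T... prefix) {
        List<T> out = new ArrayList<>(Arrays.asList(prefix));
        out.addAll(source);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(startWith(Arrays.asList(0, 1, 2), -3, -2, -1));
        // [-3, -2, -1, 0, 1, 2]
    }
}
```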
Concurrent sequences
The next set of methods aims to combine observable sequences that are producing values concurrently. This is an important step in our journey to understanding Rx. For the sake of simplicity, we have avoided introducing concepts related to concurrency until we had a broad understanding of the simple concepts.
Amb
The Amb method was a new concept to me when I started using Rx. It is a non-deterministic function, first introduced by John McCarthy and is an abbreviation of the word Ambiguous. The Rx implementation will return values from the sequence that is first to produce values, and will completely ignore the other sequences. In the examples below I have three sequences that all produce values. The sequences can be represented as the marble diagram below.
s1 -1--1--|
s2 --2--2--|
s3 ---3--3--|
r  -1--1--|
The code to produce the above is as follows.
If we comment out the first s1.OnNext(1); then s2 would produce values first and the marble diagram would look like this.
s1 ---1--|
s2 -2--2--|
s3 --3--3--|
r  -2--2--|
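Both diagrams can be reproduced with a small plain-Java model. It is a sketch under assumptions rather than the Rx API: a sequence is a list of timed values (the `Timed` type and `seq` helper are hypothetical), one dash in the diagram counts as one tick, and only values are tracked, so a sequence that never produces a value cannot win here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: a sequence is a list of timed values.
final class AmbSketch {
    static final class Timed {
        final int time;
        final int value;

        Timed(int time, int value) {
            this.time = time;
            this.value = value;
        }
    }

    // Builds a sequence from alternating (time, value) arguments.
    static List<Timed> seq(int... timeValuePairs) {
        List<Timed> out = new ArrayList<>();
        for (int i = 0; i + 1 < timeValuePairs.length; i += 2) {
            out.add(new Timed(timeValuePairs[i], timeValuePairs[i + 1]));
        }
        return out;
    }

    // Forwards only the sequence whose first value arrives earliest; the rest are ignored.
    static List<Integer> amb(List<List<Timed>> sources) {
        List<Timed> winner = null;
        for (List<Timed> s : sources) {
            if (s.isEmpty()) {
                continue; // assumption of this sketch: no values, no chance to win
            }
            if (winner == null || s.get(0).time < winner.get(0).time) {
                winner = s;
            }
        }
        List<Integer> out = new ArrayList<>();
        if (winner != null) {
            for (Timed t : winner) {
                out.add(t.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Timings read off the two marble diagrams above.
        System.out.println(amb(Arrays.asList(
                seq(1, 1, 4, 1), seq(2, 2, 5, 2), seq(3, 3, 6, 3)))); // [1, 1]
        System.out.println(amb(Arrays.asList(
                seq(3, 1), seq(1, 2, 4, 2), seq(2, 3, 5, 3))));       // [2, 2]
    }
}
```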
The Amb feature can be useful if you have multiple cheap resources that can provide values, but latency is widely variable. For an example, you may have servers replicated around the world. Issuing a query is cheap for both the client to send and for the server to respond, however due to network conditions the latency is not predictable and varies considerably. Using the Amb operator, you can send the same request out to many servers and consume the result of the first that responds.
There are other useful variants of the Amb method. We have used the overload that takes a params array of sequences. You could alternatively use it as an extension method and chain calls until you have included all the target sequences (e.g. s1.Amb(s2).Amb(s3)). Finally, you could pass in an IEnumerable<IObservable<T>>.
Reusing the GetSequences method from the Concat section, we see that the evaluation of the outer (IEnumerable) sequence is eager.
Marble:
s1-----1|
s2---2|
s3-3|
rs-3|
Take note that the inner observable sequences are not subscribed to until the outer sequence has yielded them all. This means that the third sequence is able to return values the fastest even though there are two sequences yielded one second before it (due to the Thread.Sleep).
Merge
The Merge extension method does a primitive combination of multiple concurrent sequences. As values from any sequence are produced, those values become part of the result sequence. All sequences need to be of the same type, as per the previous methods. In this diagram, we can see s1 and s2 producing values concurrently and the values falling through to the result sequence as they occur.
s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|
The result of a Merge will complete only once all input sequences complete. By contrast, the Merge operator will error if any of the input sequences terminates erroneously.
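The interleaving in the diagram can be modelled in plain Java. This is a sketch, not the Rx API: sequences are lists of timed values (the `Timed` type and `seq` helper are hypothetical), and values that land on the same tick are kept in source order, which is an arbitrary tie-break chosen for this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model, not the Rx API: a sequence is a list of timed values.
final class MergeSketch {
    static final class Timed {
        final int time;
        final String value;

        Timed(int time, String value) {
            this.time = time;
            this.value = value;
        }
    }

    // Builds a sequence that produces 'value' at each of the given ticks.
    static List<Timed> seq(String value, int... times) {
        List<Timed> out = new ArrayList<>();
        for (int t : times) {
            out.add(new Timed(t, value));
        }
        return out;
    }

    // Passes every value through to the result in the order it occurs.
    static List<String> merge(List<List<Timed>> sources) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> s : sources) {
            all.addAll(s);
        }
        // List.sort is stable, so same-tick values keep their source order.
        all.sort(Comparator.comparingInt((Timed t) -> t.time));
        List<String> out = new ArrayList<>();
        for (Timed t : all) {
            out.add(t.value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Timings read off the s1/s2 marble diagram above.
        System.out.println(merge(Arrays.asList(
                seq("1", 2, 5, 8), seq("2", 3, 7, 11)))); // [1, 2, 1, 2, 1, 2]
    }
}
```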
The code above could be represented by the marble diagram below. In this case, each unit of time is 50ms. As both sequences produce a value at 750ms, there is a race condition and we cannot be sure which value will be notified first in the result sequence (sR).
s1 ----0----0----0|
s2 --0--0--0--0--0|
sR --0-00--00-0--00|
You can chain this overload of the Merge operator to merge multiple sequences. Merge also provides numerous other overloads that allow you to pass more than two source sequences. You can use the static method Observable.Merge which takes a params array of sequences that is known at compile time. You could pass in an IEnumerable of sequences like the Concat method. Merge also has the overload that takes an IObservable<IObservable<T>>, a nested observable. To summarize:
- Chain Merge operators together, e.g. s1.Merge(s2).Merge(s3)
- Pass a params array of sequences to the Observable.Merge static method, e.g. Observable.Merge(s1, s2, s3)
- Apply the Merge operator to an IEnumerable<IObservable<T>>.
- Apply the Merge operator to an IObservable<IObservable<T>>.
For merging a known number of sequences, the first two operators are effectively the same thing and which style you use is a matter of taste: either provide them as a params array or chain the operators together. The third and fourth overloads allow you to merge sequences that can be evaluated lazily at run time. The Merge operators that take a sequence of sequences make for an interesting concept. You can either pull or be pushed observable sequences, which will be subscribed to immediately.
If we again reuse the GetSequences method, we can see how the Merge operator works with a sequence of sequences.
As we can see from the marble diagram, s1 and s2 are yielded and subscribed to immediately. s3 is not yielded for one second and then is subscribed to. Once all input sequences have completed, the result sequence completes.
s1-----1|
s2---2|
s3          -3|
rs---2-1-----3|
Switch
Receiving all values from a nested observable sequence is not always what you need. In some scenarios, instead of receiving everything, you may only want the values from the most recent inner sequence. A great example of this is live searches. As you type, the text is sent to a search service and the results are returned to you as an observable sequence. Most implementations have a slight delay before sending the request so that unnecessary work does not happen. Imagine I want to search for "Intro to Rx". I quickly type in "Into to" and realize I have missed the letter 'r'. I stop briefly and change the text to "Intro ". By now, two searches have been sent to the server. The first search will return results that I do not want. Furthermore, if I were to receive data for the first search merged together with results for the second search, it would be a very odd experience for the user. This scenario fits perfectly with the Switch method.
In this example, there is a source that represents a sequence of search text. Values the user types are represented as the source sequence. Using Select, we pass the value of the search to a function that takes a string and returns an IObservable<string>. This creates our resulting nested sequence, IObservable<IObservable<string>>.
Search function signature:
Using Merge with overlapping search:
If we were lucky and each search completed before the next element from searchValues was produced, the output would look sensible. It is much more likely, however, that multiple searches will result in overlapped search results. This marble diagram shows what the Merge function could do in such a situation.
- SV is the searchValues sequence
- S1 is the search result sequence for the first value in searchValues/SV
- S2 is the search result sequence for the second value in searchValues/SV
- S3 is the search result sequence for the third value in searchValues/SV
- RM is the result sequence for the merged (Result Merge) sequences
SV--1---2---3---|
S1  -1--1--1--1|
S2      --2-2--2--2|
S3          -3--3|
RM---1--1-2123123-2|
Note how the values from the search results are all mixed together. This is not what we want. If we use the Switch extension method we will get much better results. Switch will subscribe to the outer sequence and, as each inner sequence is yielded, it will subscribe to the new inner sequence and dispose of the subscription to the previous inner sequence. This will result in the following marble diagram, where RS is the result sequence for the Switch (Result Switch) sequences.
SV--1---2---3---|
S1  -1--1--1--1|
S2      --2-2--2--2|
S3          -3--3|
RS --1--1-2-23--3|
Also note that, even though the results from S1 and S2 are still being pushed, they are ignored as their subscription has been disposed of. This eliminates the issue of overlapping values from the nested sequences.
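The RS line can be reproduced with a plain-Java model. This is a sketch under assumptions, not the Rx API: the `Inner` type is hypothetical, timings are read off the diagram with one character per tick, and a value arriving on the same tick as the next inner sequence is still kept (as the diagram shows), which is a tie-break chosen for this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API.
final class SwitchSketch {
    static final class Inner {
        final int start;     // tick at which the outer sequence yields this inner sequence
        final String value;  // the value this inner sequence keeps producing
        final int[] times;   // ticks at which it produces that value

        Inner(int start, String value, int... times) {
            this.start = start;
            this.value = value;
            this.times = times;
        }
    }

    // 'inners' must be ordered by start tick.
    static List<String> switchLatest(List<Inner> inners) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < inners.size(); i++) {
            Inner current = inners.get(i);
            // The subscription to 'current' is disposed when the next inner sequence arrives.
            int disposedAt = (i + 1 < inners.size())
                    ? inners.get(i + 1).start
                    : Integer.MAX_VALUE;
            for (int t : current.times) {
                if (t <= disposedAt) {
                    out.add(current.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Inner> searches = Arrays.asList(
                new Inner(2, "1", 3, 6, 9, 12),
                new Inner(6, "2", 8, 10, 13, 16),
                new Inner(10, "3", 11, 14));
        System.out.println(switchLatest(searches)); // [1, 1, 2, 2, 3, 3]
    }
}
```

The late values from S1 (ticks 9 and 12) and S2 (ticks 13 and 16) never reach the result, which is exactly the behaviour a live search wants.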
Pairing sequences
The previous methods allowed us to flatten multiple sequences sharing a common type into a result sequence of the same type. These next sets of methods still take multiple sequences as an input, but attempt to pair values from each sequence to produce a single value for the output sequence. In some cases, they also allow you to provide sequences of different types.
CombineLatest
The CombineLatest extension method allows you to take the most recent value from two sequences, and with a given function transform those into a value for the result sequence. Each input sequence has the last value cached like Replay(1). Once both sequences have produced at least one value, the latest output from each sequence is passed to the resultSelector function every time either sequence produces a value. The signature is as follows.
The marble diagram below shows off usage of CombineLatest with one sequence that produces numbers (N), and the other letters (L). If the resultSelector function just joins the number and letter together as a pair, this would be the result (R):
N---1---2---3---
L--a------bc----
R---1---2-223---
    a   a bcc
If we slowly walk through the above marble diagram, we first see that L produces the letter 'a'. N has not produced any value yet so there is nothing to pair, no value is produced for the result (R). Next, N produces the number '1' so we now have a pair '1a' that is yielded in the result sequence. We then receive the number '2' from N. The last letter is still 'a' so the next pair is '2a'. The letter 'b' is then produced creating the pair '2b', followed by 'c' giving '2c'. Finally the number 3 is produced and we get the pair '3c'.
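The walkthrough above translates almost line for line into a plain-Java model. This is a sketch, not the Rx API: the `Event` type and the `n`/`l` helpers are hypothetical, and the events are supplied already ordered by time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: events from both sources arrive in one time-ordered list.
final class CombineLatestSketch {
    static final class Event {
        final boolean fromN; // true: number source (N); false: letter source (L)
        final String value;

        Event(boolean fromN, String value) {
            this.fromN = fromN;
            this.value = value;
        }
    }

    static Event n(String value) { return new Event(true, value); }

    static Event l(String value) { return new Event(false, value); }

    // Caches the latest value of each source and emits a pair whenever either one changes,
    // but only once both sources have produced at least one value.
    static List<String> combineLatest(List<Event> eventsInTimeOrder) {
        String latestN = null;
        String latestL = null;
        List<String> out = new ArrayList<>();
        for (Event e : eventsInTimeOrder) {
            if (e.fromN) {
                latestN = e.value;
            } else {
                latestL = e.value;
            }
            if (latestN != null && latestL != null) {
                out.add(latestN + latestL); // the resultSelector: join number and letter
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same order as the marble diagram: a, 1, 2, b, c, 3.
        System.out.println(combineLatest(Arrays.asList(
                l("a"), n("1"), n("2"), l("b"), l("c"), n("3"))));
        // [1a, 2a, 2b, 2c, 3c]
    }
}
```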
This is great in case you need to evaluate some combination of state which needs to be kept up-to-date when the state changes. A simple example would be a monitoring system. Each service is represented by a sequence that returns a Boolean indicating the availability of said service. The monitoring status is green if all services are available; we can achieve this by having the result selector perform a logical AND. Here is an example.
Some readers may have noticed that this method could produce a lot of duplicate values. For example, if the web server goes down the result sequence will yield 'false'. If the database then goes down, another (unnecessary) 'false' value will be yielded. This would be an appropriate time to use the DistinctUntilChanged extension method. The corrected code would look like the example below.
To provide an even better service, we could provide a default value by prefixing false to the sequence.
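Putting the three steps together (combine with a logical AND, drop consecutive duplicates, prefix false) gives roughly the following plain-Java model. It is a sketch under assumptions, not the Rx API: the `status` helper and the `{serviceIndex, state}` encoding of updates are hypothetical, with index 0 standing for the web server and index 1 for the database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not the Rx API.
final class MonitorSketch {
    // Each update is {serviceIndex, 1 = up / 0 = down}, in arrival order.
    static List<Boolean> status(int[][] updates) {
        Boolean[] latest = new Boolean[2]; // null until that service has reported
        Boolean lastEmitted = null;        // state kept by the duplicate filter
        List<Boolean> out = new ArrayList<>();
        out.add(false); // the prefixed default value
        for (int[] u : updates) {
            latest[u[0]] = (u[1] == 1);
            if (latest[0] == null || latest[1] == null) {
                continue; // nothing to combine until both services have reported
            }
            boolean allUp = latest[0] && latest[1]; // result selector: logical AND
            if (lastEmitted == null || lastEmitted != allUp) {
                out.add(allUp); // only changes get through
                lastEmitted = allUp;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // web up, db up, web down, db down, web up, db up
        int[][] updates = {{0, 1}, {1, 1}, {0, 0}, {1, 0}, {0, 1}, {1, 1}};
        System.out.println(status(updates)); // [false, true, false, true]
    }
}
```

Because the duplicate filter sits before the prefix in this model, a first combined value of false is still emitted after the default false.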
Zip
The Zip extension method is another interesting merge feature. Just like a zipper on clothing or a bag, the Zip method brings together two sequences of values as pairs; two by two. Things to note about the Zip function are that the result sequence will complete when the first of the sequences completes, it will error if either of the sequences errors, and it will only publish once it has a pair of fresh values from each source sequence. So if one of the source sequences publishes values faster than the other sequence, the rate of publishing will be dictated by the slower of the two sequences.
This can be seen in the marble diagram below. Note that the result uses two lines so that we can represent a complex type, i.e. the anonymous type with the properties Left and Right.
nums  ----0----1----2|
chars --a--b--c--d--e--f|
result----0----1----2|
          a    b    c|
The actual output of the code:
Note that the nums sequence only produced three values before completing, while the chars sequence produced six values. The result sequence thus has three values, as this was the most pairs that could be made.
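The pairing rule can be modelled in plain Java. This is a sketch, not the Rx API: lists stand in for completed sequences, timing is ignored, and the `zip` helper with its selector parameter is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model, not the Rx API: lists stand in for completed sequences.
final class ZipSketch {
    // Pairs the i-th value of each input; stops as soon as the shorter input runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> selector) {
        int pairs = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(selector.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(0, 1, 2);
        List<String> chars = Arrays.asList("a", "b", "c", "d", "e", "f");
        System.out.println(zip(nums, chars, (n, c) -> n + c)); // [0a, 1b, 2c]
    }
}
```

The values 'd', 'e' and 'f' are never paired because nums has already completed.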
The first use I saw of Zip was to showcase drag and drop. The example tracked mouse movements from a MouseMove event that would produce event arguments with its current X,Y coordinates. First, the example turns the event into an observable sequence. Then they cleverly zipped the sequence with a Skip(1) version of the same sequence. This allows the code to get a delta of the mouse position, i.e. where it is now (sequence.Skip(1)) minus where it was (sequence). It then applied the delta to the control it was dragging.
To visualize the concept, let us look at another marble diagram. Here we have the mouse movement (MM) and the Skip 1 (S1). The numbers represent the index of the mouse movement.
MM --1--2--3--4--5
S1    --2--3--4--5
Zip   --1--2--3--4
        2  3  4  5
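The same trick can be sketched in plain Java. This is a model only, not the Rx API: the `Coord` class below is a hypothetical stand-in for a coordinate type, and a list of positions stands in for the stream of mouse movements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: a list of positions stands in for the mouse moves.
final class DeltaSketch {
    static final class Coord {
        final int x;
        final int y;

        Coord(int x, int y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public String toString() {
            return "(" + x + "," + y + ")";
        }
    }

    // Pairs each position with its successor (the Skip(1) copy) and subtracts the two.
    static List<Coord> deltas(List<Coord> moves) {
        List<Coord> out = new ArrayList<>();
        for (int i = 0; i + 1 < moves.size(); i++) {
            Coord was = moves.get(i);
            Coord now = moves.get(i + 1);
            out.add(new Coord(now.x - was.x, now.y - was.y));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Coord> moves = Arrays.asList(
                new Coord(0, 0), new Coord(1, 0), new Coord(2, 1), new Coord(2, 5));
        System.out.println(deltas(moves)); // [(1,0), (1,1), (0,4)]
    }
}
```

Four positions yield three deltas, just as five mouse movements yield four pairs in the diagram.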
Here is a code sample where we fake out some mouse movements with our own subject.
This is the simple Coord(inate) class we use.
It is also worth noting that Zip has a second overload that takes an IEnumerable<T> as the second input sequence.
This allows us to zip sequences from both IEnumerable<T> and IObservable<T> paradigms!
And-Then-When
If Zip only taking two sequences as an input is a problem, then you can use a combination of the three And/Then/When methods. These methods are used slightly differently from most of the other Rx methods. Out of these three, And is the only extension method to IObservable<T>. Unlike most Rx operators, it does not return a sequence; instead, it returns the mysterious type Pattern<T1, T2>. The Pattern<T1, T2> type is public (obviously), but all of its properties are internal. The only two (useful) things you can do with a Pattern<T1, T2> are invoking its And or Then methods. The And method called on the Pattern<T1, T2> returns a Pattern<T1, T2, T3>. On that type, you will also find the And and Then methods. The generic Pattern types are there to allow you to chain multiple And methods together, each one extending the generic type parameter list by one. You then bring them all together with the Then method overloads. The Then methods return you a Plan type. Finally, you pass this Plan to the Observable.When method in order to create your sequence.
It may sound very complex, but comparing some code samples should make it easier to understand. It will also allow you to see which style you prefer to use.
To Zip three sequences together, you can either use Zip methods chained together like this:
Or perhaps use the nicer syntax of the And/Then/When:
This can be further reduced, if you prefer, to:
The And/Then/When trio has more overloads that enable you to group an even greater number of sequences. They also allow you to provide more than one 'plan' (the output of the Then method). This gives you the Merge feature but on the collection of 'plans'. I would suggest playing around with them if this functionality is of interest to you. The verbosity of enumerating all of the combinations of these methods would be of low value. You will get far more value out of using them and discovering for yourself.
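Whichever style is used, zipping three sequences comes down to one value from each input per result. A plain-Java model of that outcome might look like the sketch below; it is not the Rx API, and the `Selector3` interface and three-argument `zip` helper are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not the Rx API: lists stand in for completed sequences.
final class Zip3Sketch {
    // A three-argument result selector (java.util.function has no built-in equivalent).
    interface Selector3<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // Takes one value from each input per result; stops when the shortest input runs out.
    static <A, B, C, R> List<R> zip(List<A> one, List<B> two, List<C> three,
                                    Selector3<A, B, C, R> selector) {
        int count = Math.min(one.size(), Math.min(two.size(), three.size()));
        List<R> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(selector.apply(one.get(i), two.get(i), three.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zip(
                Arrays.asList(0, 1, 2),
                Arrays.asList("a", "b", "c", "d"),
                Arrays.asList(true, false),
                (n, c, b) -> n + "-" + c + "-" + b)); // [0-a-true, 1-b-false]
    }
}
```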
As we delve deeper into the depths of what the Rx libraries provide us, we can see more practical usages for it. Composing sequences with Rx allows us to easily make sense of the multiple data sources a problem domain is exposed to. We can concatenate values or sequences together sequentially with StartWith, Concat and Repeat. We can process multiple sequences concurrently with Merge, or process a single sequence at a time with Amb and Switch. Pairing values with CombineLatest, Zip and the And/Then/When operators can simplify otherwise fiddly operations like our drag-and-drop examples and monitoring system status.