출처 : https://rongi.github.io/kotlin-blog/rxjava/rx/2017/08/01/error-handling-in-rxjava.html?utm_source=Android+Weekly&utm_campaign=51f4814861-android-weekly-269&utm_medium=email&utm_term=0_4eb677ad19-51f4814861-338106413

Once you start writing RxJava code you realize that some things can be done in different ways and sometimes it’s hard to identify best practices right away. Error handling is one of these things.

So, what is the best way to handle errors in RxJava and what are the options?

Handling errors in onError consumer

Let’s say you have an observable that can produce an exception. How to handle that? First instinct is to handle errors directly in onError consumer.

  userProvider.getUsers().subscribe(
    { users -> onGetUsersSuccess(users) },
    { e -> onGetUsersFail(e) } // Stop the progress, show error message, etc.            
  )

It’s similar to what we used to do with AssyncTasks and looks pretty much like a try-catch block.

There is one big problem with this though. Say there is a programming error inside userProvider.getUsers() observable that leads to NullPointerException or something like this. It’ll be super convenient here to crash right away so we can detect and fix the problem on the spot. But we’ll see no crash, the error will be handled as an expected one: an error message will be shown, or in some other graceful way.

Even worse is that there wouldn’t be any crash in the tests. The tests will just fail with mysterious unexpected behavior. You’ll have to spend time on debugging instead of seeing the reason right away in a nice call stack.

Expected and unexpected exceptions

Just to be clear let me explain what do I meant here by expected and unexpected exceptions.

Expected exceptions are those that are expected to happen in a bug-free program. Examples here are various kinds of IO exceptions, like no network exception, etc. Your software is supposed to react on these exceptions gracefully, showing error messages, etc. Expected exceptions are like second valid return value, they are part of method’s signature.

Unexpected exceptions are mostly programming errors. They can and will happen during development, but they should never happen in the finished product. At least it’s a goal. But if they do happen, usually it’s a good idea just to crash the app right away. This helps to raise attention to the problem quickly and fix it as soon as possible.

In Java expected exceptions are mostly implemented using checked exceptions (subclassed directly from Exception class). The majority of unexpected ones are implemented with unchecked exceptions and derived from RuntimeException.

Crashing on RuntimeExceptions

So, if we want to crash why don’t just check if the exception is a RuntimeException and rethrow it inside onError consumer? And if it’s not just handle it like we did it in the previous example?

  userProvider.getUsers().subscribe(
    { users -> onGetUsersSuccess(users) },
    { e ->
      if (e is RuntimeException) {
        throw e
      } else {
        onGetUsersFail(e)
      }
    }
  )

This one may look nice, but it has a couple of flaws:

  1. In RxJava 2 this will crash in the live app but not in the tests. Which can be extremely confusing. In RxJava 1 though it will crash both in the tests and in the application.
  2. There are more unchecked exceptions besides RuntimeException that we want to crash on. This includes Error, etc. It’s hard to track all exceptions of this kind.

But the main flaw is this:

During application development your Rx chains will become more and more complex. Also your observables will be reused in different places, in the contexts you never expected them to be used in.

Imagine you’ve decided to use userProvider.getUsers() observable in this chain:

Observable.concat(userProvider.getUsers(), userProvider.getUsers())
  .onErrorResumeNext(just(emptyList()))
  .subscribe { println(it) }

What will happen if both userProvider.getUsers() observables emit an error?

Now, you may think that both errors will be mapped to an empty list and so two empty lists will be emitted. You may be surprised to see that actually only one list is emitted. This is because error occurred in the first userProvider.getUsers() will terminate the whole chain upstream and second parameter of concat will never be executed.

You see, errors in RxJava are pretty destructive. They are designed as fatal signals that stop the whole chain upstream. They aren’t supposed to be part of interface of your observable. They perform as unexpected errors.

Observables designed to emit errors as a valid output have limited scope of possible use. It’s not obvious how complex chains will work in case of error, so it’s very easy to misuse this kind of observables. And this will result in bugs. Very nasty kind of bugs, those that are reproducible only occasionally (on exceptional conditions, like lack of network) and don’t leave stack traces.

Result class

So, how to design observables that return expected errors? Just make them return some kind of Resultclass, which will contain either result of the operation or an exception. Something like this:

data class Result<out T>(
  val data: T?,
  val error: Throwable?
)

Wrap all expected exceptions into this and let all unexpected ones fall through and crash the app. Avoid using onError consumers, let RxJava do the crashing for you.

Now, while this approach doesn’t looks particularly elegant or intuitive and produces quite a bit of boilerplate, I’ve found that it causes the least amount of problems. Also, it looks like this is an “official” way to do error handling in RxJava. I saw it recommended by RxJava maintainers in multiple discussions across Internet.

Some useful code snippets

To make your Retrofit observables return Result you can use this handy extension function:

fun <T> Observable<T>.retrofitResponseToResult(): Observable<Result<T>> {
  return this.map { it.asResult() }
    .onErrorReturn {
      if (it is HttpException || it is IOException) {
        return@onErrorReturn it.asErrorResult<T>()
      } else {
        throw it
      }
    }
}

fun <T> T.asResult(): Result<T> {
  return Result(data = this, error = null)
}

fun <T> Throwable.asErrorResult(): Result<T> {
  return Result(data = null, error = this)
}

Then your userProvider.getUsers() observable can look like this:

class UserProvider {
  fun getUsers(): Observable<Result<List<String>>> {
    return myRetrofitApi.getUsers()
      .retrofitResponseToResult()
  }
}


'Android > RXAndroid' 카테고리의 다른 글

RxAndroid 예제  (0) 2017.02.24
RxAndroid 버튼 다중터치 방지  (0) 2017.02.24
Rxjava backpresure 설명 링크  (0) 2017.02.24
Easy SQLite on Android with RxJava  (0) 2017.02.21
Rx 예제모음  (0) 2017.02.21
  • 출처 : http://www.cnblogs.com/zhaoyanjun/p/5535651.html

  • Button 防抖处理

     button = (Button) findViewById( R.id.bt ) ;
    
         RxView.clicks( button )
                 .throttleFirst( 2 , TimeUnit.SECONDS )   //两秒钟之内只取一个点击事件,防抖操作
                 .subscribe(new Action1<Void>() {
                     @Override
                     public void call(Void aVoid) {
                         Toast.makeText(MainActivity.this, "点击了", Toast.LENGTH_SHORT).show();
                     }
                 }) ;
    
  • 按钮的长按时间监听

     button = (Button) findViewById( R.id.bt ) ;
    
     //监听长按时间
     RxView.longClicks( button)
          .subscribe(new Action1<Void>() {
              @Override
             public void call(Void aVoid) {
             Toast.makeText(MainActivity.this, "long click  !!", Toast.LENGTH_SHORT).show();
             }
         }) ;
    
  • listView 的点击事件、长按事件处理

    listView = (ListView) findViewById( R.id.listview );
    
     List<String> list = new ArrayList<>() ;
         for ( int i = 0 ; i < 40 ; i++ ){
             list.add( "sss" + i ) ;
         }
    
     final ArrayAdapter<String> adapter = new ArrayAdapter<String>(this, android.R.layout.simple_expandable_list_item_1 );
         adapter.addAll( list );
    
     listView.setAdapter( adapter );
    
     //item click event
     RxAdapterView.itemClicks( listView )
         .subscribe(new Action1<Integer>() {
             @Override
             public void call(Integer integer) {
             Toast.makeText(ListActivity.this, "item click " + integer , Toast.LENGTH_SHORT).show();
                 }
             }) ;
    
     //item long click
     RxAdapterView.itemLongClicks( listView)
         .subscribe(new Action1<Integer>() {
             @Override
             public void call(Integer integer) {
                 Toast.makeText(ListActivity.this, "item long click " + integer , Toast.LENGTH_SHORT).show();
                 }
             }) ;
    
- 用户登录界面,勾选同意隐私协议,登录按钮就变高亮
button = (Button) findViewById( R.id.login_bt );
checkBox = (CheckBox) findViewById( R.id.checkbox );

RxCompoundButton.checkedChanges( checkBox )
    .subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            button.setEnabled( aBoolean );
            button.setBackgroundResource( aBoolean ? R.color.button_yes : R.color.button_no );
            }
        }) ;
```

效果图

  • 搜索的时候,关键词联想功能 。debounce()在一定的时间内没有操作就会发送事件。

     editText = (EditText) findViewById( R.id.editText );
     listView = (ListView) findViewById( R.id.listview );
    
     final ArrayAdapter<String> adapter = new ArrayAdapter<String>(this, android.R.layout.simple_expandable_list_item_1 );
     listView.setAdapter( adapter );
    
     RxTextView.textChanges( editText )
                 .debounce( 600 , TimeUnit.MILLISECONDS )
                 .map(new Func1<CharSequence, String>() {
                     @Override
                     public String call(CharSequence charSequence) {
                         //get the keyword
                         String key = charSequence.toString() ;
                         return key ;
                     }
                 })
                 .observeOn( Schedulers.io() )
                 .map(new Func1<String, List<String>>() {
                     @Override
                     public List<String> call(String keyWord ) {
                         //get list
                         List<String> dataList = new ArrayList<String>() ;
                         if ( ! TextUtils.isEmpty( keyWord )){
                             for ( String s : getData()  ) {
                                 if (s != null) {
                                     if (s.contains(keyWord)) {
                                         dataList.add(s);
                                     }
                                 }
                             }
                         }
                         return dataList ;
                     }
                 })
                 .observeOn( AndroidSchedulers.mainThread() )
                 .subscribe(new Action1<List<String>>() {
                     @Override
                     public void call(List<String> strings) {
                         adapter.clear();
                         adapter.addAll( strings );
                         adapter.notifyDataSetChanged();
                     }
                 }) ;
    

    运行效果

总结

'Android > RXAndroid' 카테고리의 다른 글

[펌] Rx에서 에러 핸들링 방법  (0) 2017.08.08
RxAndroid 버튼 다중터치 방지  (0) 2017.02.24
Rxjava backpresure 설명 링크  (0) 2017.02.24
Easy SQLite on Android with RxJava  (0) 2017.02.21
Rx 예제모음  (0) 2017.02.21

Rx CheetSheet] 다중 터치 방지하기 - throttleFirst()



'Android > RXAndroid' 카테고리의 다른 글

[펌] Rx에서 에러 핸들링 방법  (0) 2017.08.08
RxAndroid 예제  (0) 2017.02.24
Rxjava backpresure 설명 링크  (0) 2017.02.24
Easy SQLite on Android with RxJava  (0) 2017.02.21
Rx 예제모음  (0) 2017.02.21

'Android > RXAndroid' 카테고리의 다른 글

RxAndroid 예제  (0) 2017.02.24
RxAndroid 버튼 다중터치 방지  (0) 2017.02.24
Easy SQLite on Android with RxJava  (0) 2017.02.21
Rx 예제모음  (0) 2017.02.21
Rx에서의 멀티캐스팅 방법  (0) 2017.02.09

출처 : http://beust.com/weblog/2015/06/01/easy-sqlite-on-android-with-rxjava/

Easy SQLite on Android with RxJava

Whenever I consider using an ORM library on my Android projects, I always end up abandoning the idea and rolling my own layer instead for a few reasons:

  • My database models have never reached the level of complexity that ORM’s help with.
  • Every ounce of performance counts on Android and I can’t help but fear that the SQL generated will not be as optimized as it should be.

Recently, I started using a pretty simple design pattern that uses Rx to offer what I think is a fairly simple way of managing your database access with RxJava. I’m calling this the “Async Rx Read” design pattern, which is a horrible name but the best I can think of right now.

Easy reads

One of the important design principles on Android is to never perform I/O on the main thread, and this obviously applies to database access. RxJava turns out to be a great fit for this problem.

I usually create one Java class per table and these tables are then managed by my SQLiteOpenHelper. With this new approach, I decided to extend my use of the helper and make it the only point of access to anything that needs to read or write to my SQL tables.

Let’s consider a simple example: a USERS table managed by the UserTable class:

// UserTable.java
List<User> getUsers(SQLiteDatabase db, String userId) {
  // select * from users where _id = {userId}
}

The problem with this method is that if you’re not careful, you will call it on the main thread, so it’s up to the caller to make sure they are always invoking this method on a background thread (and then to post their UI update back on the main thread, if they are updating the UI). Instead of relying on managing yet another thread pool or, worse, using AsyncTask, we are going to rely on RxJava to take care of the threading model for us.

Let’s rewrite this method to return a callable instead:

// UserTable.java
Callable<List<User>> getUsers(SQLiteDatabase db, String userId) {
  return new Callable<List<User>>() {
    @Override
    public List<User> call() {
      // select * from users where _id is userId
    }
  }
}

In effect, we simply refactored our method to return a lazy result, which makes it possible for the database helper to turn this result into an Observable:

// MySqliteOpenHelper.java
Observable<List<User>> getUsers(String userId) {
  return makeObservable(mUserTable.getUsers(getReadableDatabase(), userId))
    .subscribeOn(Schedulers.computation()) // note: do not use Schedulers.io()
}

Notice that on top of turning the lazy result into an Observable, the helper forces the subscription to happen on a background thread (the computation scheduler here; do not use Schedulers.io() because it’s backed by an unbounded executor). This guarantees that callers don’t have to worry about ever blocking the main thread.

Finally, the makeObservable method is pretty straightforward (and completely generic):

// MySqliteOpenHelper.java
private static <T> Observable<T> makeObservable(final Callable<T> func) {
  return Observable.create(
      new Observable.OnSubscribe<T>() {
          @Override
          public void call(Subscriber<? super T> subscriber) {
            try {
              subscriber.onNext(func.call());
            } catch(Exception ex) {
              Log.e(TAG, "Error reading from the database", ex);
            }
          }
    });
}

At this point, all our database reads have become observables that guarantee that the queries run on a background thread. Accessing the database is now pretty standard Rx code:

// DisplayUsersFragment.java
@Inject
MySqliteOpenHelper mDbHelper;
 
// ...
 
mDbHelper.getUsers(userId)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1<List<User>>()) {
    @Override
    public void onNext(List<User> users) {
      // Update our UI with the users
    }
  }
}

And if you don’t need to update your UI with the results, just observe on a background thread.

Since your database layer is now returning observables, it’s trivial to compose and transform these results as they come in. For example, you might decide that your ContactTable is a low layer class that should not know anything about your model (the User class) and that instead, it should only return low level objects (maybe a Cursor or ContentValues). Then you can use use Rx to map these low level values into your model classes for an even cleaner separation of layers.

Two additional remarks:

  1. Your Table Java classes should contain no public methods: only package protected methods (which are accessed exclusively by your Helper, located in the same package) and private methods. No other classes should ever access these Table classes directly.

  2. This approach is extremely compatible with dependency injection: it’s trivial to have both your database helper and your individual tables injected (additional bonus: with Dagger 2, your tables can have their own component since the database helper is the only refence needed to instantiate them).

This is a very simple design pattern that has scaled remarkably well for our projects while fully enabling the power of RxJava. I also started extending this layer to provide a flexible update notification mechanism for list view adapters (not unlike what SQLBrite offers), but this will be for a future post.

This is still a work in progress, so feedback welcome!


'Android > RXAndroid' 카테고리의 다른 글

RxAndroid 버튼 다중터치 방지  (0) 2017.02.24
Rxjava backpresure 설명 링크  (0) 2017.02.24
Rx 예제모음  (0) 2017.02.21
Rx에서의 멀티캐스팅 방법  (0) 2017.02.09
MissingBackpressureException 설명  (0) 2017.02.03

'Android > RXAndroid' 카테고리의 다른 글

Rxjava backpresure 설명 링크  (0) 2017.02.24
Easy SQLite on Android with RxJava  (0) 2017.02.21
Rx에서의 멀티캐스팅 방법  (0) 2017.02.09
MissingBackpressureException 설명  (0) 2017.02.03
Flowable 를 언제 사용하면 될까?  (0) 2017.02.03

출처 : http://blog.danlew.net/2016/06/13/multicasting-in-rxjava/


멀티 캐스팅은 RxJava에서 중복 된 작업을 줄이기위한 핵심 방법입니다.

당신이 이벤트를 멀티 캐스트하면 보내 같은 이벤트를 모두 다운 스트림 사업자 / 가입자. 이 기능은 네트워크 요청과 같이 값 비싼 작업을 수행 할 때 유용합니다. 각 가입자마다 똑같은 네트워크 요청을 반복적으로 실행하고 싶지는 않습니다. 결과를 멀티 캐스팅하기 만하면됩니다.

멀티 캐스트에는 두 가지 방법이 있습니다.

  1. 를 사용 ConnectableObservable을 통해 ( publish()또는replay()1 )
  2. 사용 Subject

ConnectableObservable또는 전에 수행 된 Subject작업은 한 번만 발생합니다. 그런 다음 해당 작업은 모든 다운 스트림에 멀티 캐스트 Subscribers됩니다.

: 당신이 인식해야 여기에 미묘한 지점이 스트림 만의 시점에서 멀티 캐스트 ConnectableObservable또는Subject . 그 결과, 멀티 캐스트 후에 수행하는 작업에 따라 복제된다 Subscriber.

이것이 어떻게 작동하는지 예를 들어 보겠습니다.

Observable<String> observable = Observable.just("Event")  
    .publish()
    .autoConnect(2)
    .map(s -> {
      System.out.println("Expensive operation for " + s);
      return s;
    });

observable.subscribe(s -> System.out.println("Sub1 got: " + s));  
observable.subscribe(s -> System.out.println("Sub2 got: " + s));

// Output:
// Expensive operation for Event
// Sub1 got: Event
// Expensive operation for Event
// Sub2 got: Event

여기서 우리는이 ConnectableObservable, 고가의 map()운영 및 두 Subscribers. 놀라운 결과는 고가 인 것이 map()작업 실행  우리가 것을 방지하려하더라도 publish()!

이 차트는 상황을 더 명확하게 만듭니다.

게시가 작동하지 않는 이유를 보여주는 차트

당신이 실제로 원하는 경우 map()한 번 일이, 당신은 그것을 넣을 필요가 전에publish()전화 :

Observable<String> observable = Observable.just("Event")  
    .map(s -> {
      System.out.println("Expensive operation for " + s);
      return s;
    })
    .publish()
    .autoConnect(2);

observable.subscribe(s -> System.out.println("Sub1 got: " + s));  
observable.subscribe(s -> System.out.println("Sub2 got: " + s));

// Output:
// Expensive operation for Event
// Sub1 received: Event
// Sub2 received: Event

업데이트 된 차트는 다음과 같습니다.

올바른 게시 사용법이있는 차트

여기에 무슨 교훈이 있니? 작업량을 줄이기 위해 멀티 캐스팅에 의존한다면 올바른 시점에서 멀티 캐스팅해야합니다.

좋든 나쁘 든 들어, 많은 사람들이 사용하고 있습니다 Subjects. 하나의 장점은 멀티 캐스트하지만 당신은 그들이 것을 기억해야한다는 것입니다  그들이 방출 지점에서 멀티 캐스트. 당신이 비싼 사업자의 무리가 하류의 적용이 있다면 Subject당신은 다른 추가하는 것을 고려한다 publish()하류 어딘가에.


1 share() 및cache()도 옵션입니다,하지만 그들은 기본적으로 그냥 주위에 바로 가기ConnectableObservableshare()그냥publish().refCount()cache()사용하여 재 작성 할 수 있습니다replay().autoConnect().

'Android > RXAndroid' 카테고리의 다른 글

Easy SQLite on Android with RxJava  (0) 2017.02.21
Rx 예제모음  (0) 2017.02.21
MissingBackpressureException 설명  (0) 2017.02.03
Flowable 를 언제 사용하면 될까?  (0) 2017.02.03
RxJava 예제  (0) 2017.01.31

출처 : http://kunny.github.io/community/2016/02/08/gdg_korea_android_weekly_02_1/

MissingBackpressureException

MissingBackpressureException은 Observable에서 항목(item)을 보내는(emit) 속도보다 처리하는 속도가 느릴 때 발생합니다.

RxJava 가이드 문서 내 Backpressure 항목에도 이와 관련된 항목이 기술되어 있는데, 문서에서 예로 든 사례(zip 연산자를 사용하는 경우)를 사용하지 않는 경우에도 상당히 높은 확률로 경험할 수 있습니다.

RxAndroid를 사용하는 경우, 수신된 데이터를 UI에 표시하기 위해 observeOn(AndroidSchedulers.mainThread()) 를 많이 사용합니다. 그런데, 이 과정에서 UI 쓰레드 내에서 다른 작업이 수행되고 있어 Observable에서 보낸 항목을 바로 처리하지 못할 경우 MissingBackpressureException이 발생합니다. 따라서, 항상 발생하지 않고 사용자 환경에 따라 발생 여부가 달라질 수 있는 부분이라 개발 도중에는 발견하지 못할 가능성도 매우 높습니다.

위 오류를 방지하려면 Backpressure가 필요할 때 처리할 방법을 다음과 같이 지정해 주면 됩니다.

  • onBackpressureBuffer(): Observable에서 보낸 항목을 큐에 계속 쌓아두어, 항목을 처리하는 쪽에서 해당 항목을 나중에 처리할 수 있도록 합니다. 항목이 무한정 쌓이는 것을 방지하기 위해 onBackpressureBuffer(int)를 사용하여 큐에 쌓아둘 수 있는 항목 수를 제한할 수도 있습니다.
  • onBackpressureDrop(): Observable에서 항목을 보냈을 때 바로 처리되지 못한 데이터는 무시합니다. 서버나 데이터베이서에서 받은 데이터를 처리할 때 이 방법을 사용하면 실제로 받은 데이터와 UI에 표시되는 데이터 간 차이가 발생할 수 있으니 신중하게 사용해야 합니다.

subscribeOn / observeOn

작업이 수행될 쓰레드를 지정하기 위해 subscribeOn()과 observeOn() 메서드를 사용하는데, 역할은 비슷하지만 각각 적용되는 효과가 조금 다릅니다.

  • subscribeOn: 작업을 시작하는 쓰레드를 지정합니다. 체인의 어느 곳에 사용해도 가장 마지막에서 지정해 준 것만 적용됩니다.
  • observeOn: 바로 다음 번에 실행되는 연산이 실행되는 쓰레드를 지정합니다. 이를 사용하여 하나의 체인 내에서 실행되는 여러 연산들이 수행될 쓰레드를 각각 지정할 수 있습니다. 안드로이드에서 사용할 경우, 일반적으로 subscribe() 직전에 observeOn(AndroidSchedulers.mainThread())를 사용하여 onNext() 메서드가 UI 쓰레드에서 실행되도록 구성합니다.

공식 문서 내 Scheduler 항목에 포함되어 있는 아래 그림을 보시면 좀 더 이해가 쉽습니다.

Schedulers

flatMap / concatMap

두 연산자는 Observable 에서 받은 항목을 다른 형태로 변형해 준다는 점은 동일하지만, 세부 동작이 하나 다릅니다. 빠른 이해를 위해 공식 문서 내 포함되어 있는 각 연산자별 설명 이미지를 보겠습니다.

[flatMap]flatMap

flatMap은 아밴트 스트림에서 발생한 항목들을 처리할 때 merge 연산자를 사용합니다. 따라서 이벤트 스트림에서 항목이 인입됨과 동시에 결과를 출력합니다.

이러한 특성 때문에, 인입되는 항목 순서와 출력되는 순서가 달라질 수 있습니다. 위의 예 에서도 초록색과 파란색 항목의 순서가 섞여 출력되는 것을 확인할 수 있습니다.

[concatMap]concatMap

concatMap은 이벤트 스트림에서 발생한 항목들을 처리할 때 concat 연산자를 사용합니다. 때문에 flatMap과는 달리 이벤트 스트림을 통해 전달된 항목 하나에 대한 처리가 완료된 후에 다음 항목을 처리합니다. 초록색 항목과 파란색 항목의 순서도 뒤바뀌지 않고 그대로 출력되는 것을 확인할 수 있습니다.

flatMap은 저도 상당히 많이 쓰는 연산자 중 하나인데, 위와 같은 차이점이 있었는지 이제야 알게 되었습니다. 순서가 섞이면 안 되는 연산에는 필히 concatMap을 사용해야겠습니다.

위와 관련하여 잘 정리된 블로그 포스트가 있으니, 추가 정보가 필요하신 분은 참고하세요.

RxJava Observable transformation: concatMap() vs flatMap() Link

RxMarbles

RxMarbles

RxMarbles 에선 Rx에서 사용하는 연산자가 어떤 방식으로 처리되는지 인터랙티브 형태로 확인할 수 있습니다.

이벤트 스트림에서 발생한 각각의 이벤트들을 직접 조작할 수 있어, 그림만으로는 이해가기 어려웠던 연산자들의 동작 방식을 좀 더 잘 이해할 수 있습니다. 이해가 잘 가지 않는 연산자가 있었다면, 이 기회에 확실하게 이해해 보는 것은 어떨까요?

Frodo

Frodo

Frodo는 RxJava의 디버깅을 도와주는 도구입니다. onSubscribe()onNext()onError()onCompleted() 등 각 절차가 어떤 순서로 발생하는지, onNext()에서 발생한 데이터가 어떻게 되는지 쉽게 확인할 수 있습니다.

RxJava를 처음 배울 때 이 도구를 알았더라면 삽질하는 시간을 좀 더 줄일 수 있었을텐데, 참 아쉽습니다. 처음 접하는 사용자부터 어느 정도 익숙해진 사용자까지, RxJava를 사용하는 개발자에게 강력 추천해 주고 싶은 도구입니다.

Rx와 관련된 더 많은 읽을거리

Going Reactive, And Android Architectural Journey

RxAndroid의 창시자 Matthias Käppler의 발표 영상으로, Rx를 사용하여 안드로이드 애플리케이션을 개발한 경험에 대해 소개하고 있습니다.

Realm Korea 블로그에 발표 내용을 한글로 번역한 포스트가 올라와 있으니 함께 보시면 더 도움이 될 것입니다.

Android Application Architecture (번역)

RxJava를 사용하여 MVP 기반 구조를 설계하는 과정을 소개한 원문 포스트를 번역한 글입니다. RxJava를 실제로 사용하는 방법과, 이를 효율적으로 사용하기 위한 설계 팁을 함께 얻을 수 있겠네요.

기본적인 액티비티와 AsyncTask 에서 시작하여 RxJava 로 만든 MVP 기반 구조까지의 여정. Link

Android Studio

Android Studio 2.0 Beta 2가 공개되었습니다. Beta 1이 공개된 직후 큰 버그가 있어, 얼마 지나지 않아 바로 Beta 2가 나오는 우여곡절을 겪기도 했습니다.

Android Studio 2.0 Beta Available in Canary Channel - Android Tools Project Site Link

주요 변경 사항은 다음과 같습니다.

  • 성능 개선
  • Incremental Java complication과 Annotation processor를 사용하는 라이브러리 간 충돌 방지
  • Instant Run 관련 오류 수정
  • 개선된 에뮬레이터
  • 더 빨라진 ADB: 애플리케이션 설치 및 파일 전송 속도 최대 5배 향상

자세한 내용은 블로그 포스트를 참조하세요.



'Android > RXAndroid' 카테고리의 다른 글

Rx 예제모음  (0) 2017.02.21
Rx에서의 멀티캐스팅 방법  (0) 2017.02.09
Flowable 를 언제 사용하면 될까?  (0) 2017.02.03
RxJava 예제  (0) 2017.01.31
RxView 예제 모음  (0) 2017.01.07

you may use Observable for example:


handling GUI events

working with short sequences (less than 1000 elements total)


=========================================================


You may use Flowable for example:


cold and non-timed sources

generator like sources

network and database accessors

'Android > RXAndroid' 카테고리의 다른 글

Rx에서의 멀티캐스팅 방법  (0) 2017.02.09
MissingBackpressureException 설명  (0) 2017.02.03
RxJava 예제  (0) 2017.01.31
RxView 예제 모음  (0) 2017.01.07
Easy SQLite on Android with RxJava  (0) 2016.12.16

출처 : http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html

Combining sequences

Data sources are everywhere, and sometimes we need to consume data from more than just a single source. Common examples that have many inputs include: multi touch surfaces, news feeds, price feeds, social media aggregators, file watchers, heart-beating/polling servers, etc. The way we deal with these multiple stimuli is varied too. We may want to consume it all as a deluge of integrated data, or one sequence at a time as sequential data. We could also get it in an orderly fashion, pairing data values from two sources to be processed together, or perhaps just consume the data from the first source that responds to the request.

We have uncovered the benefits of operator composition; now we turn our focus to sequence composition. Earlier on, we briefly looked at operators that work with multiple sequences such as SelectManyTakeUntil/SkipUntilCatch and OnErrorResumeNext. These give us a hint at the potential that sequence composition can deliver. By uncovering the features of sequence composition with Rx, we find yet another layer of game changing functionality. Sequence composition enables you to create complex queries across multiple data sources. This unlocks the possibility to write some very powerful and succinct code.

Now we will build upon the concepts covered in the Advanced Error Handling chapter. There we were able to provide continuations for sequences that failed. We will now examine operators aimed at composing sequences that are still operational instead of sequences that have terminated due to an error.

Sequential concatenation

The first methods we will look at are those that concatenate sequences sequentially. They are very similar to the methods we have seen before for dealing with faulted sequences.

Concat

The Concat extension method is probably the most simple composition method. It simply concatenates two sequences. Once the first sequence completes, the second sequence is subscribed to and its values are passed on through to the result sequence. It behaves just like the Catch extension method, but will concatenate operational sequences when they complete, instead of faulted sequences when they OnError. The simple signature for Concatis as follows.

// Concatenates two observable sequences. Returns an observable sequence that contains the
// elements of the first sequence, followed by those of the second the sequence.
public static IObservable<TSource> Concat<TSource>(
this IObservable<TSource> first,
IObservable<TSource> second)
{
...
}

Usage of Concat is familiar. Just like Catch or OnErrorResumeNext, we pass the continuation sequence to the extension method.

//Generate values 0,1,2
var s1 = Observable.Range(0, 3);
//Generate values 5,6,7,8,9
var s2 = Observable.Range(5, 5);
s1.Concat(s2)
.Subscribe(Console.WriteLine);
s1 --0--1--2-|
s2           -5--6--7--8--|
r  --0--1--2--5--6--7--8--|

If either sequence was to fault so too would the result sequence. In particular, if s1 produced an OnError notification, then s2 would never be used. If you wanted s2 to be used regardless of how s1 terminates, then OnErrorResumeNext would be your best option.

Concat also has two useful overloads. These overloads allow you to pass multiple observable sequences as either a params array or an IEnumerable<IObservable<T>>.

public static IObservable<TSource> Concat<TSource>(
params IObservable<TSource>[] sources)
{...}
public static IObservable<TSource> Concat<TSource>(
this IEnumerable<IObservable<TSource>> sources)
{...}

The ability to pass an IEnumerable<IObservable<T>> means that the multiple sequences can be lazily evaluated. The overload that takes a params array is well-suited to times when we know how many sequences we want to merge at compile time, whereas the IEnumerable<IObservable<T>> overload is a better fit when we do not know this ahead of time.

In the case of the lazily evaluated IEnumerable<IObservable<T>>, the Concat method will take one sequence, subscribe until it is completed and then switch to the next sequence. To help illustrate this, we create a method that returns a sequence of sequences and is sprinkled with logging. It returns three observable sequences each with a single value [1], [2] and [3]. Each sequence returns its value on a timer delay.

public IEnumerable<IObservable<long>> GetSequences()
{
Console.WriteLine("GetSequences() called");
Console.WriteLine("Yield 1st sequence");
yield return Observable.Create<long>(o =>
{
Console.WriteLine("1st subscribed to");
return Observable.Timer(TimeSpan.FromMilliseconds(500))
.Select(i=>1L)
.Subscribe(o);
});
Console.WriteLine("Yield 2nd sequence");
yield return Observable.Create<long>(o =>
{
Console.WriteLine("2nd subscribed to");
return Observable.Timer(TimeSpan.FromMilliseconds(300))
.Select(i=>2L)
.Subscribe(o);
});
Thread.Sleep(1000); //Force a delay
Console.WriteLine("Yield 3rd sequence");
yield return Observable.Create<long>(o =>
{
Console.WriteLine("3rd subscribed to");
return Observable.Timer(TimeSpan.FromMilliseconds(100))
.Select(i=>3L)
.Subscribe(o);
});
Console.WriteLine("GetSequences() complete");
}

When we call our GetSequences method and concatenate the results, we see the following output using our Dumpextension method.

GetSequences().Concat().Dump("Concat");

Output:

GetSequences() called
Yield 1st sequence
1st subscribed to
Concat-->1
Yield 2nd sequence
2nd subscribed to
Concat-->2
Yield 3rd sequence
3rd subscribed to
Concat-->3
GetSequences() complete
Concat completed

Below is a marble diagram of the Concat operator applied to the GetSequences method. 's1', 's2' and 's3' represent sequence 1, 2 and 3. Respectively, 'rs' represents the result sequence.

s1-----1|
s2      ---2|
s3          -3|
rs-----1---2-3|

You should note that the second sequence is only yielded once the first sequence has completed. To prove this, we explicitly put in a 500ms delay on producing a value and completing. Once that happens, the second sequence is then subscribed to. When that sequence completes, then the third sequence is processed in the same fashion.

Repeat

Another simple extension method is Repeat. It allows you to simply repeat a sequence, either a specified or an infinite number of times.

// Repeats the observable sequence indefinitely and sequentially.
public static IObservable<TSource> Repeat<TSource>(
this IObservable<TSource> source)
{...}
//Repeats the observable sequence a specified number of times.
public static IObservable<TSource> Repeat<TSource>(
this IObservable<TSource> source,
int repeatCount)
{...}

If you use the overload that loops indefinitely, then the only way the sequence will stop is if there is an error or the subscription is disposed of. The overload that specifies a repeat count will stop on error, un-subscription, or when it reaches that count. This example shows the sequence [0,1,2] being repeated three times.

var source = Observable.Range(0, 3);
var result = source.Repeat(3);
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));

Output:

0
1
2
0
1
2
0
1
2
Completed

StartWith

Another simple concatenation method is the StartWith extension method. It allows you to prefix values to a sequence. The method signature takes a params array of values so it is easy to pass in as many or as few values as you need.

// prefixes a sequence of values to an observable sequence.
public static IObservable<TSource> StartWith<TSource>(
this IObservable<TSource> source,
params TSource[] values)
{
...
}

Using StartWith can give a similar effect to a BehaviorSubject<T> by ensuring a value is provided as soon as a consumer subscribes. It is not the same as a BehaviorSubject however, as it will not cache the last value.

In this example, we prefix the values -3, -2 and -1 to the sequence [0,1,2].

//Generate values 0,1,2
var source = Observable.Range(0, 3);
var result = source.StartWith(-3, -2, -1);
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));

Output:

-3
-2
-1
0
1
2
Completed

Concurrent sequences

The next set of methods aims to combine observable sequences that are producing values concurrently. This is an important step in our journey to understanding Rx. For the sake of simplicity, we have avoided introducing concepts related to concurrency until we had a broad understanding of the simple concepts.

Amb

The Amb method was a new concept to me when I started using Rx. It is a non-deterministic function, first introduced by John McCarthy and is an abbreviation of the word Ambiguous. The Rx implementation will return values from the sequence that is first to produce values, and will completely ignore the other sequences. In the examples below I have three sequences that all produce values. The sequences can be represented as the marble diagram below.

s1 -1--1--|
s2 --2--2--|
s3 ---3--3--|
r  -1--1--|

The code to produce the above is as follows.

var s1 = new Subject<int>();
var s2 = new Subject<int>();
var s3 = new Subject<int>();
var result = Observable.Amb(s1, s2, s3);
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));
s1.OnNext(1);
s2.OnNext(2);
s3.OnNext(3);
s1.OnNext(1);
s2.OnNext(2);
s3.OnNext(3);
s1.OnCompleted();
s2.OnCompleted();
s3.OnCompleted();

Output:

1
1
Completed

If we comment out the first s1.OnNext(1); then s2 would produce values first and the marble diagram would look like this.

s1 ---1--|
s2 -2--2--|
s3 --3--3--|
r  -2--2--|

The Amb feature can be useful if you have multiple cheap resources that can provide values, but latency is widely variable. For an example, you may have servers replicated around the world. Issuing a query is cheap for both the client to send and for the server to respond, however due to network conditions the latency is not predictable and varies considerably. Using the Amb operator, you can send the same request out to many servers and consume the result of the first that responds.

There are other useful variants of the Amb method. We have used the overload that takes a params array of sequences. You could alternatively use it as an extension method and chain calls until you have included all the target sequences (e.g. s1.Amb(s2).Amb(s3)). Finally, you could pass in an IEnumerable<IObservable<T>>.

// Propagates the observable sequence that reacts first.
public static IObservable<TSource> Amb<TSource>(
this IObservable<TSource> first,
IObservable<TSource> second)
{...}
public static IObservable<TSource> Amb<TSource>(
params IObservable<TSource>[] sources)
{...}
public static IObservable<TSource> Amb<TSource>(
this IEnumerable<IObservable<TSource>> sources)
{...}

Reusing the GetSequences method from the Concat section, we see that the evaluation of the outer (IEnumerable) sequence is eager.

GetSequences().Amb().Dump("Amb");

Output:

GetSequences() called
Yield 1st sequence
Yield 2nd sequence
Yield 3rd sequence
GetSequences() complete
1st subscribed to
2nd subscribed to
3rd subscribed to
Amb-->3
Amb completed

Marble:

s1-----1|
s2---2|
s3-3|
rs-3|

Take note that the inner observable sequences are not subscribed to until the outer sequence has yielded them all. This means that the third sequence is able to return values the fastest even though there are two sequences yielded one second before it (due to the Thread.Sleep).

Merge

The Merge extension method does a primitive combination of multiple concurrent sequences. As values from any sequence are produced, those values become part of the result sequence. All sequences need to be of the same type, as per the previous methods. In this diagram, we can see s1 and s2 producing values concurrently and the values falling through to the result sequence as they occur.

s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|

The result of a Merge will complete only once all input sequences complete. By contrast, the Merge operator will error if any of the input sequences terminates erroneously.

//Generate values 0,1,2
var s1 = Observable.Interval(TimeSpan.FromMilliseconds(250))
.Take(3);
//Generate values 100,101,102,103,104
var s2 = Observable.Interval(TimeSpan.FromMilliseconds(150))
.Take(5)
.Select(i => i + 100);
s1.Merge(s2)
.Subscribe(
Console.WriteLine,
()=>Console.WriteLine("Completed"));

The code above could be represented by the marble diagram below. In this case, each unit of time is 50ms. As both sequences produce a value at 750ms, there is a race condition and we cannot be sure which value will be notified first in the result sequence (sR).

s1 ----0----0----0| 
s2 --0--0--0--0--0|
sR --0-00--00-0--00|

Output:

100
0
101
102
1
103
104 //Note this is a race condition. 2 could be
2 // published before 104.

You can chain this overload of the Merge operator to merge multiple sequences. Merge also provides numerous other overloads that allow you to pass more than two source sequences. You can use the static method Observable.Merge which takes a params array of sequences that is known at compile time. You could pass in an IEnumerable of sequences like the Concat method. Merge also has the overload that takes an IObservable<IObservable<T>>, a nested observable. To summarize:

  • Chain Merge operators together e.g. s1.Merge(s2).Merge(s3)
  • Pass a params array of sequences to the Observable.Merge static method. e.g. Observable.Merge(s1,s2,s3)
  • Apply the Merge operator to an IEnumerable<IObservable<T>>.
  • Apply the Merge operator to an IObservable<IObservable<T>>.
/// Merges two observable sequences into a single observable sequence.
/// Returns a sequence that merges the elements of the given sequences.
public static IObservable<TSource> Merge<TSource>(
this IObservable<TSource> first,
IObservable<TSource> second)
{...}
// Merges all the observable sequences into a single observable sequence.
// The observable sequence that merges the elements of the observable sequences.
public static IObservable<TSource> Merge<TSource>(
params IObservable<TSource>[] sources)
{...}
// Merges an enumerable sequence of observable sequences into a single observable sequence.
public static IObservable<TSource> Merge<TSource>(
this IEnumerable<IObservable<TSource>> sources)
{...}
// Merges an observable sequence of observable sequences into an observable sequence.
// Merges all the elements of the inner sequences in to the output sequence.
public static IObservable<TSource> Merge<TSource>(
this IObservable<IObservable<TSource>> sources)
{...}

For merging a known number of sequences, the first two operators are effectively the same thing and which style you use is a matter of taste: either provide them as a params array or chain the operators together. The third and fourth overloads allow to you merge sequences that can be evaluated lazily at run time. The Merge operators that take a sequence of sequences make for an interesting concept. You can either pull or be pushed observable sequences, which will be subscribed to immediately.

If we again reuse the GetSequences method, we can see how the Merge operator works with a sequence of sequences.

GetSequences().Merge().Dump("Merge");

Output:

GetSequences() called
Yield 1st sequence
1st subscribed to
Yield 2nd sequence
2nd subscribed to
Merge-->2
Merge-->1
Yield 3rd sequence
3rd subscribed to
GetSequences() complete
Merge-->3
Merge completed

As we can see from the marble diagram, s1 and s2 are yielded and subscribed to immediately. s3 is not yielded for one second and then is subscribed to. Once all input sequences have completed, the result sequence completes.

s1-----1|
s2---2|
s3          -3|
rs---2-1-----3|

Switch

Receiving all values from a nested observable sequence is not always what you need. In some scenarios, instead of receiving everything, you may only want the values from the most recent inner sequence. A great example of this is live searches. As you type, the text is sent to a search service and the results are returned to you as an observable sequence. Most implementations have a slight delay before sending the request so that unnecessary work does not happen. Imagine I want to search for "Intro to Rx". I quickly type in "Into to" and realize I have missed the letter 'r'. I stop briefly and change the text to "Intro ". By now, two searches have been sent to the server. The first search will return results that I do not want. Furthermore, if I were to receive data for the first search merged together with results for the second search, it would be a very odd experience for the user. This scenario fits perfectly with the Switch method.

In this example, there is a source that represents a sequence of search text. Values the user types are represented as the source sequence. Using Select, we pass the value of the search to a function that takes a string and returns an IObservable<string>. This creates our resulting nested sequence, IObservable<IObservable<string>>.

Search function signature:

private IObservable<string> SearchResults(string query)
{
...
}

Using Merge with overlapping search:

IObservable<string> searchValues = ....;
IObservable<IObservable<string>> search = searchValues
.Select(searchText=>SearchResults(searchText));
var subscription = search
.Merge()
.Subscribe(
Console.WriteLine);

If we were lucky and each search completed before the next element from searchValues was produced, the output would look sensible. It is much more likely, however that multiple searches will result in overlapped search results. This marble diagram shows what the Merge function could do in such a situation.

  • SV is the searchValues sequence
  • S1 is the search result sequence for the first value in searchValues/SV
  • S2 is the search result sequence for the second value in searchValues/SV
  • S3 is the search result sequence for the third value in searchValues/SV
  • RM is the result sequence for the merged (Result Merge) sequences
SV--1---2---3---|
S1  -1--1--1--1|
S2      --2-2--2--2|
S3          -3--3|
RM---1--1-2123123-2|

Note how the values from the search results are all mixed together. This is not what we want. If we use the Switchextension method we will get much better results. Switch will subscribe to the outer sequence and as each inner sequence is yielded it will subscribe to the new inner sequence and dispose of the subscription to the previous inner sequence. This will result in the following marble diagram where RS is the result sequence for the Switch (Result Switch) sequences

SV--1---2---3---|
S1  -1--1--1--1|
S2      --2-2--2--2|
S3          -3--3|
RS --1--1-2-23--3|

Also note that, even though the results from S1 and S2 are still being pushed, they are ignored as their subscription has been disposed of. This eliminates the issue of overlapping values from the nested sequences.

Pairing sequences

The previous methods allowed us to flatten multiple sequences sharing a common type into a result sequence of the same type. These next sets of methods still take multiple sequences as an input, but attempt to pair values from each sequence to produce a single value for the output sequence. In some cases, they also allow you to provide sequences of different types.

CombineLatest

The CombineLatest extension method allows you to take the most recent value from two sequences, and with a given function transform those into a value for the result sequence. Each input sequence has the last value cached like Replay(1). Once both sequences have produced at least one value, the latest output from each sequence is passed to the resultSelector function every time either sequence produces a value. The signature is as follows.

// Composes two observable sequences into one observable sequence by using the selector
// function whenever one of the observable sequences produces an element.
public static IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>(
this IObservable<TFirst> first,
IObservable<TSecond> second,
Func<TFirst, TSecond, TResult> resultSelector)
{...}

The marble diagram below shows off usage of CombineLatest with one sequence that produces numbers (N), and the other letters (L). If the resultSelector function just joins the number and letter together as a pair, this would be the result (R):

N---1---2---3---
L--a------bc----
R---1---2-223---
    a   a bcc   

If we slowly walk through the above marble diagram, we first see that L produces the letter 'a'. N has not produced any value yet so there is nothing to pair, no value is produced for the result (R). Next, N produces the number '1' so we now have a pair '1a' that is yielded in the result sequence. We then receive the number '2' from N. The last letter is still 'a' so the next pair is '2a'. The letter 'b' is then produced creating the pair '2b', followed by 'c' giving '2c'. Finally the number 3 is produced and we get the pair '3c'.

This is great in case you need to evaluate some combination of state which needs to be kept up-to-date when the state changes. A simple example would be a monitoring system. Each service is represented by a sequence that returns a Boolean indicating the availability of said service. The monitoring status is green if all services are available; we can achieve this by having the result selector perform a logical AND. Here is an example.

IObservable<bool> webServerStatus = GetWebStatus();
IObservable<bool> databaseStatus = GetDBStatus();
//Yields true when both systems are up.
var systemStatus = webServerStatus
.CombineLatest(
databaseStatus,
(webStatus, dbStatus) => webStatus && dbStatus);

Some readers may have noticed that this method could produce a lot of duplicate values. For example, if the web server goes down the result sequence will yield 'false'. If the database then goes down, another (unnecessary) 'false' value will be yielded. This would be an appropriate time to use the DistictUntilChanged extension method. The corrected code would look like the example below.

//Yields true when both systems are up, and only on change of status
var systemStatus = webServerStatus
.CombineLatest(
databaseStatus,
(webStatus, dbStatus) => webStatus && dbStatus)
.DistinctUntilChanged();

To provide an even better service, we could provide a default value by prefixing false to the sequence.

//Yields true when both systems are up, and only on change of status
var systemStatus = webServerStatus
.CombineLatest(
databaseStatus,
(webStatus, dbStatus) => webStatus && dbStatus)
.DistinctUntilChanged()
.StartWith(false);

Zip

The Zip extension method is another interesting merge feature. Just like a zipper on clothing or a bag, the Zipmethod brings together two sequences of values as pairs; two by two. Things to note about the Zip function is that the result sequence will complete when the first of the sequences complete, it will error if either of the sequences error and it will only publish once it has a pair of fresh values from each source sequence. So if one of the source sequences publishes values faster than the other sequence, the rate of publishing will be dictated by the slower of the two sequences.

//Generate values 0,1,2
var nums = Observable.Interval(TimeSpan.FromMilliseconds(250))
.Take(3);
//Generate values a,b,c,d,e,f
var chars = Observable.Interval(TimeSpan.FromMilliseconds(150))
.Take(6)
.Select(i => Char.ConvertFromUtf32((int)i + 97));
//Zip values together
nums.Zip(chars, (lhs, rhs) => new { Left = lhs, Right = rhs })
.Dump("Zip");

This can be seen in the marble diagram below. Note that the result uses two lines so that we can represent a complex type, i.e. the anonymous type with the properties Left and Right.

nums  ----0----1----2| 
chars --a--b--c--d--e--f| 
result----0----1----2|
          a    b    c|

The actual output of the code:

{ Left = 0, Right = a }
{ Left = 1, Right = b }
{ Left = 2, Right = c }

Note that the nums sequence only produced three values before completing, while the chars sequence produced six values. The result sequence thus has three values, as this was the most pairs that could be made.

The first use I saw of Zip was to showcase drag and drop. The example tracked mouse movements from a MouseMove event that would produce event arguments with its current X,Y coordinates. First, the example turns the event into an observable sequence. Then they cleverly zipped the sequence with a Skip(1) version of the same sequence. This allows the code to get a delta of the mouse position, i.e. where it is now (sequence.Skip(1)) minus where it was (sequence). It then applied the delta to the control it was dragging.

To visualize the concept, let us look at another marble diagram. Here we have the mouse movement (MM) and the Skip 1 (S1). The numbers represent the index of the mouse movement.

MM --1--2--3--4--5
S1    --2--3--4--5
Zip   --1--2--3--4
        2  3  4  5

Here is a code sample where we fake out some mouse movements with our own subject.

var mm = new Subject<Coord>();
var s1 = mm.Skip(1);
var delta = mm.Zip(s1,
(prev, curr) => new Coord
{
X = curr.X - prev.X,
Y = curr.Y - prev.Y
});
delta.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));
mm.OnNext(new Coord { X = 0, Y = 0 });
mm.OnNext(new Coord { X = 1, Y = 0 }); //Move across 1
mm.OnNext(new Coord { X = 3, Y = 2 }); //Diagonally up 2
mm.OnNext(new Coord { X = 0, Y = 0 }); //Back to 0,0
mm.OnCompleted();

This is the simple Coord(inate) class we use.

public class Coord
{
public int X { get; set; }
public int Y { get; set; }
public override string ToString()
{
return string.Format("{0},{1}", X, Y);
}
}

Output:

0,1
2,2
-3,-2
Completed

It is also worth noting that Zip has a second overload that takes an IEnumerable<T> as the second input sequence.

// Merges an observable sequence and an enumerable sequence into one observable sequence
// containing the result of pair-wise combining the elements by using the selector function.
public static IObservable<TResult> Zip<TFirst, TSecond, TResult>(
this IObservable<TFirst> first,
IEnumerable<TSecond> second,
Func<TFirst, TSecond, TResult> resultSelector)
{...}

This allows us to zip sequences from both IEnumerable<T> and IObservable<T> paradigms!

And-Then-When

If Zip only taking two sequences as an input is a problem, then you can use a combination of the three And/Then/When methods. These methods are used slightly differently from most of the other Rx methods. Out of these three, And is the only extension method to IObservable<T>. Unlike most Rx operators, it does not return a sequence; instead, it returns the mysterious type Pattern<T1, T2>. The Pattern<T1, T2> type is public (obviously), but all of its properties are internal. The only two (useful) things you can do with a Pattern<T1, T2> are invoking its And or Then methods. The And method called on the Pattern<T1, T2> returns a Pattern<T1, T2, T3>. On that type, you will also find the And and Then methods. The generic Pattern types are there to allow you to chain multiple And methods together, each one extending the generic type parameter list by one. You then bring them all together with the Then method overloads. The Then methods return you a Plan type. Finally, you pass this Plan to the Observable.When method in order to create your sequence.

It may sound very complex, but comparing some code samples should make it easier to understand. It will also allow you to see which style you prefer to use.

To Zip three sequences together, you can either use Zip methods chained together like this:

var one = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
var two = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(10);
var three = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(14);
//lhs represents 'Left Hand Side'
//rhs represents 'Right Hand Side'
var zippedSequence = one
.Zip(two, (lhs, rhs) => new {One = lhs, Two = rhs})
.Zip(three, (lhs, rhs) => new {One = lhs.One, Two = lhs.Two, Three = rhs});
zippedSequence.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));

Or perhaps use the nicer syntax of the And/Then/When:

var pattern = one.And(two).And(three);
var plan = pattern.Then((first, second, third)=>new{One=first, Two=second, Three=third});
var zippedSequence = Observable.When(plan);
zippedSequence.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));

This can be further reduced, if you prefer, to:

var zippedSequence = Observable.When(
one.And(two)
.And(three)
.Then((first, second, third) =>
new {
One = first,
Two = second,
Three = third
})
);
zippedSequence.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));

The And/Then/When trio has more overloads that enable you to group an even greater number of sequences. They also allow you to provide more than one 'plan' (the output of the Then method). This gives you the Mergefeature but on the collection of 'plans'. I would suggest playing around with them if this functionality is of interest to you. The verbosity of enumerating all of the combinations of these methods would be of low value. You will get far more value out of using them and discovering for yourself.

As we delve deeper into the depths of what the Rx libraries provide us, we can see more practical usages for it. Composing sequences with Rx allows us to easily make sense of the multiple data sources a problem domain is exposed to. We can concatenate values or sequences together sequentially with StartWithConcat and Repeat. We can process multiple sequences concurrently with Merge, or process a single sequence at a time with Amb and Switch. Pairing values with CombineLatestZip and the And/Then/When operators can simplify otherwise fiddly operations like our drag-and-drop examples and monitoring system status.

'Android > RXAndroid' 카테고리의 다른 글

MissingBackpressureException 설명  (0) 2017.02.03
Flowable 를 언제 사용하면 될까?  (0) 2017.02.03
RxView 예제 모음  (0) 2017.01.07
Easy SQLite on Android with RxJava  (0) 2016.12.16
RXAndroid 기초 설명(영문)  (0) 2016.12.15
  • 출처 : http://www.cnblogs.com/zhaoyanjun/p/5535651.html


  • Button 防抖处理

     button = (Button) findViewById( R.id.bt ) ;
    
         RxView.clicks( button )
                 .throttleFirst( 2 , TimeUnit.SECONDS )   //两秒钟之内只取一个点击事件,防抖操作
                 .subscribe(new Action1<Void>() {
                     @Override
                     public void call(Void aVoid) {
                         Toast.makeText(MainActivity.this, "点击了", Toast.LENGTH_SHORT).show();
                     }
                 }) ;
    
  • 按钮的长按时间监听

     button = (Button) findViewById( R.id.bt ) ;
    
     //监听长按时间
     RxView.longClicks( button)
          .subscribe(new Action1<Void>() {
              @Override
             public void call(Void aVoid) {
             Toast.makeText(MainActivity.this, "long click  !!", Toast.LENGTH_SHORT).show();
             }
         }) ;
    
  • listView 的点击事件、长按事件处理

    listView = (ListView) findViewById( R.id.listview );
    
     List<String> list = new ArrayList<>() ;
         for ( int i = 0 ; i < 40 ; i++ ){
             list.add( "sss" + i ) ;
         }
    
     final ArrayAdapter<String> adapter = new ArrayAdapter<String>(this, android.R.layout.simple_expandable_list_item_1 );
         adapter.addAll( list );
    
     listView.setAdapter( adapter );
    
     //item click event
     RxAdapterView.itemClicks( listView )
         .subscribe(new Action1<Integer>() {
             @Override
             public void call(Integer integer) {
             Toast.makeText(ListActivity.this, "item click " + integer , Toast.LENGTH_SHORT).show();
                 }
             }) ;
    
     //item long click
     RxAdapterView.itemLongClicks( listView)
         .subscribe(new Action1<Integer>() {
             @Override
             public void call(Integer integer) {
                 Toast.makeText(ListActivity.this, "item long click " + integer , Toast.LENGTH_SHORT).show();
                 }
             }) ;
    
- 用户登录界面,勾选同意隐私协议,登录按钮就变高亮
button = (Button) findViewById( R.id.login_bt );
checkBox = (CheckBox) findViewById( R.id.checkbox );

RxCompoundButton.checkedChanges( checkBox )
    .subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            button.setEnabled( aBoolean );
            button.setBackgroundResource( aBoolean ? R.color.button_yes : R.color.button_no );
            }
        }) ;
```

效果图

  • 搜索的时候,关键词联想功能 。debounce()在一定的时间内没有操作就会发送事件。

     editText = (EditText) findViewById( R.id.editText );
     listView = (ListView) findViewById( R.id.listview );
    
     final ArrayAdapter<String> adapter = new ArrayAdapter<String>(this, android.R.layout.simple_expandable_list_item_1 );
     listView.setAdapter( adapter );
    
     RxTextView.textChanges( editText )
                 .debounce( 600 , TimeUnit.MILLISECONDS )
                 .map(new Func1<CharSequence, String>() {
                     @Override
                     public String call(CharSequence charSequence) {
                         //get the keyword
                         String key = charSequence.toString() ;
                         return key ;
                     }
                 })
                 .observeOn( Schedulers.io() )
                 .map(new Func1<String, List<String>>() {
                     @Override
                     public List<String> call(String keyWord ) {
                         //get list
                         List<String> dataList = new ArrayList<String>() ;
                         if ( ! TextUtils.isEmpty( keyWord )){
                             for ( String s : getData()  ) {
                                 if (s != null) {
                                     if (s.contains(keyWord)) {
                                         dataList.add(s);
                                     }
                                 }
                             }
                         }
                         return dataList ;
                     }
                 })
                 .observeOn( AndroidSchedulers.mainThread() )
                 .subscribe(new Action1<List<String>>() {
                     @Override
                     public void call(List<String> strings) {
                         adapter.clear();
                         adapter.addAll( strings );
                         adapter.notifyDataSetChanged();
                     }
                 }) ;
    

    运行效果


'Android > RXAndroid' 카테고리의 다른 글

Flowable 를 언제 사용하면 될까?  (0) 2017.02.03
RxJava 예제  (0) 2017.01.31
Easy SQLite on Android with RxJava  (0) 2016.12.16
RXAndroid 기초 설명(영문)  (0) 2016.12.15
RXJava 내용 정리  (0) 2016.12.15

출처 : http://beust.com/weblog/2015/06/01/easy-sqlite-on-android-with-rxjava/

Easy SQLite on Android with RxJava

Whenever I consider using an ORM library on my Android projects, I always end up abandoning the idea and rolling my own layer instead for a few reasons:

  • My database models have never reached the level of complexity that ORM’s help with.
  • Every ounce of performance counts on Android and I can’t help but fear that the SQL generated will not be as optimized as it should be.

Recently, I started using a pretty simple design pattern that uses Rx to offer what I think is a fairly simple way of managing your database access with RxJava. I’m calling this the “Async Rx Read” design pattern, which is a horrible name but the best I can think of right now.

Easy reads

One of the important design principles on Android is to never perform I/O on the main thread, and this obviously applies to database access. RxJava turns out to be a great fit for this problem.

I usually create one Java class per table and these tables are then managed by my SQLiteOpenHelper. With this new approach, I decided to extend my use of the helper and make it the only point of access to anything that needs to read or write to my SQL tables.

Let’s consider a simple example: a USERS table managed by the UserTable class:

// UserTable.java
List<User> getUsers(SQLiteDatabase db, String userId) {
  // select * from users where _id = {userId}
}

The problem with this method is that if you’re not careful, you will call it on the main thread, so it’s up to the caller to make sure they are always invoking this method on a background thread (and then to post their UI update back on the main thread, if they are updating the UI). Instead of relying on managing yet another thread pool or, worse, using AsyncTask, we are going to rely on RxJava to take care of the threading model for us.

Let’s rewrite this method to return a callable instead:

// UserTable.java
Callable<List<User>> getUsers(SQLiteDatabase db, String userId) {
  return new Callable<List<User>>() {
    @Override
    public List<User> call() {
      // select * from users where _id is userId
    }
  }
}

In effect, we simply refactored our method to return a lazy result, which makes it possible for the database helper to turn this result into an Observable:

// MySqliteOpenHelper.java
Observable<List<User>> getUsers(String userId) {
  return makeObservable(mUserTable.getUsers(getReadableDatabase(), userId))
    .subscribeOn(Schedulers.computation()) // note: do not use Schedulers.io()
}

Notice that on top of turning the lazy result into an Observable, the helper forces the subscription to happen on a background thread (the computation scheduler here; do not use Schedulers.io() because it’s backed by an unbounded executor). This guarantees that callers don’t have to worry about ever blocking the main thread.

Finally, the makeObservable method is pretty straightforward (and completely generic):

// MySqliteOpenHelper.java
private static <T> Observable<T> makeObservable(final Callable<T> func) {
  return Observable.create(
      new Observable.OnSubscribe<T>() {
          @Override
          public void call(Subscriber<? super T> subscriber) {
            try {
              subscriber.onNext(func.call());
            } catch(Exception ex) {
              Log.e(TAG, "Error reading from the database", ex);
            }
          }
    });
}

At this point, all our database reads have become observables that guarantee that the queries run on a background thread. Accessing the database is now pretty standard Rx code:

// DisplayUsersFragment.java
@Inject
MySqliteOpenHelper mDbHelper;
 
// ...
 
mDbHelper.getUsers(userId)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1<List<User>>()) {
    @Override
    public void onNext(List<User> users) {
      // Update our UI with the users
    }
  }
}

And if you don’t need to update your UI with the results, just observe on a background thread.

Since your database layer is now returning observables, it’s trivial to compose and transform these results as they come in. For example, you might decide that your ContactTable is a low layer class that should not know anything about your model (the User class) and that instead, it should only return low level objects (maybe a Cursor or ContentValues). Then you can use use Rx to map these low level values into your model classes for an even cleaner separation of layers.

Two additional remarks:

  1. Your Table Java classes should contain no public methods: only package protected methods (which are accessed exclusively by your Helper, located in the same package) and private methods. No other classes should ever access these Table classes directly.

  2. This approach is extremely compatible with dependency injection: it’s trivial to have both your database helper and your individual tables injected (additional bonus: with Dagger 2, your tables can have their own component since the database helper is the only refence needed to instantiate them).

This is a very simple design pattern that has scaled remarkably well for our projects while fully enabling the power of RxJava. I also started extending this layer to provide a flexible update notification mechanism for list view adapters (not unlike what SQLBrite offers), but this will be for a future post.

This is still a work in progress, so feedback welcome!


'Android > RXAndroid' 카테고리의 다른 글

RxJava 예제  (0) 2017.01.31
RxView 예제 모음  (0) 2017.01.07
RXAndroid 기초 설명(영문)  (0) 2016.12.15
RXJava 내용 정리  (0) 2016.12.15
RxJava – PublishSubject, BehaviorSubject, ReplaySubject  (0) 2016.12.15

 

링크 모음


Grokking RxJava, Part 1: The Basics




Grokking RxJava, Part 2: Operator, Operator











'Android > RXAndroid' 카테고리의 다른 글

RxJava 예제  (0) 2017.01.31
RxView 예제 모음  (0) 2017.01.07
Easy SQLite on Android with RxJava  (0) 2016.12.16
RXJava 내용 정리  (0) 2016.12.15
RxJava – PublishSubject, BehaviorSubject, ReplaySubject  (0) 2016.12.15

출처 : http://chuumong.tistory.com/entry/RxJava-%EC%A0%95%EB%A6%AC


RxJava Class

  • Observable : 이벤트를 발생시키는 주체, onNext / onCompleted / onError를 이용하여 이벤트를 발생 시킴
  • Subscriber : 이벤트를 전달받는 객체
  • PublishSubject : 구독한 시점으로 부터(subscribe 호출) 발생되는 이벤트(onNext, onError, onCompleted)를 전달 받음
  • BehaviorSubject : 구독 전 (subscribe 호출 전) 발생된 이벤트가 한 건이라도 있으면 구독 시점에 해당 이벤트(한 건만)를 전달 받음
  • CompositeSubscription : Subscriber를 그룹화 함, add로 Subscriber를 추가하며 clear로 unsubscribe 처리

Observable Method

  • just : 인자로 전달 받은 객체를 그냥 단순하게 subscribe onNext에 전달
  • from : List에 저장된 Item을 하나씩 전달
  • flatMap(Func1) : T를 매개변수로 전달받아 R로 리턴
    T는 ? super T
    R은 ? extends Observable<? extends R>
    인자로 전달 받은 T를 다시 Observable로 포장하여 리턴 처리
    관찰하고 있는 이벤트를 변형
  • concanMap : flatMap과 유사, flatMap은 결과 처리의 순서를 신경쓰지 않는 반면에 concanMap는 이벤트의 처리 결과가 순서대로 출력 (참조)
  • filter : return이 true 일 때 이벤트 발생
  • zip(Observable, Observable, Func2) : Observable 마지막 인자인 Func2로 값을 조합하여 넘김
    각 인자는 Observable<? extends T1>, Observable<? extends T2>, Func2<? super T1,? super T2,? extends R>로 구성이 되어지며 T1과 T2를 조합하여 새로운 R을 생성
  • buffer : 지정된 시간동안 발생한 이벤트를 List로 저장하고 있다가 이벤트를 발생 시킴
  • buffer(Observable boundary) : 발생한 데이터를 저장하고 있다가 boundary에 지정된 시간이 지나면 이벤트를 발생 시킴
  • debounce : 이벤트 발생 종료 후 지정된 시간동안 이벤트가 발생하지 않으면 이벤트를 발생 시킴
  • asObservable : Observable 객체를 전달 받음
  • skip(int) : int로 지정된 개수 만큼의 이벤트를 무시하고 그 이후의 이벤트만 발생시킴
  • combineLatest(Observable<? extends T1>, Observable<? extends T2>, Func2<? super T1, ? super T2, ? extends R>) : 모든 Observable에서 이벤트가 발생되면 Func2로 전달하여 이벤트를 처리하고 새로운 이벤트를 발생 시킴
  • merge : 다수의 Observable를 병합하여 이벤트가 발생 된 순서대로 이벤트를 전달, 한 Observable에서 error가 발생하면 다른 Observable에서 발생된 이벤트는 전달하지 않음
  • timer : 지정된 시간만큼 이벤트 전달을 딜레이 시킴
  • interval : 이벤트를 반복하여 발생 시키지만 지정된 시간동안 딜레이 시킴
  • interval(long, long, TimeUnit) : 최초 첫번째 인자 값 만큼 시간을 딜레이시키고 그 이후에는 2번째 인자에 지정된 시간 만큼 딜레이 시킴
  • take : 인자로 지정된 개수 만큼 이벤트를 발생 시키고 그 이후 이벤트는 발생시키지 않음(onCompleted 발생)
    interval과 같이 사용하면 지정된 시간만큼 해당 이벤트를 반복하여 발생시키지만 take로 지정된 개수만 발생
  • retryWhen : onError 이벤트가 발생하게되면 지정된 시간만큼 딜레이 시킨 뒤 재시도
  • defer : subscribe가 호출 되는 순간 인자로 지정된 Func이 실행되고 리턴 받은 Observable의 이벤트를 전달


'Android > RXAndroid' 카테고리의 다른 글

RxJava 예제  (0) 2017.01.31
RxView 예제 모음  (0) 2017.01.07
Easy SQLite on Android with RxJava  (0) 2016.12.16
RXAndroid 기초 설명(영문)  (0) 2016.12.15
RxJava – PublishSubject, BehaviorSubject, ReplaySubject  (0) 2016.12.15

RxJava – PublishSubject, BehaviorSubject, ReplaySubject


링크 : http://ohlab.kr/w/archives/436

'Android > RXAndroid' 카테고리의 다른 글

RxJava 예제  (0) 2017.01.31
RxView 예제 모음  (0) 2017.01.07
Easy SQLite on Android with RxJava  (0) 2016.12.16
RXAndroid 기초 설명(영문)  (0) 2016.12.15
RXJava 내용 정리  (0) 2016.12.15

+ Recent posts