쓰레드가 이어져서 나오는 걸 직렬화라고 한다.

예를 보자.

public static void main(String[] args) {
    Observable.range(1 , 10)
            .map(i -> intenseCalculation(i))
            .subscribe(i->System.out.println("Received " + i + " "  + LocalTime.now()));
}

public static <T> T intenseCalculation(T value) {
    sleep(ThreadLocalRandom.current().nextInt(3000));
    return value;
}

public static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
..........
Received 1 19:14:42.414
Received 2 19:14:44.454
Received 3 19:14:44.727
Received 4 19:14:47.028
Received 5 19:14:49.174
Received 6 19:14:49.207
Received 7 19:14:51.057
Received 8 19:14:53.875
Received 9 19:14:56.381
Received 10 19:14:57.741

약 15초가 걸렸습니다. 만약 평행하게(병렬화) 처리를 하고 싶은 경우에는 어떻게 처리를 해야 할까?

Observable.range(1,10)
       .flatMap(i -> Observable.just(i)
               .subscribeOn(Schedulers.computation())
               .map(i2 -> intenseCalculation(i2))
       )
       .subscribe(i -> System.out.println("Received " + i + " "
               + LocalTime.now() + " on thread "
               + Thread.currentThread().getName()));

sleep(20000);
.........
Received 1 19:28:11.163 on thread RxComputationThreadPool-1
Received 7 19:28:11.381 on thread RxComputationThreadPool-7
Received 9 19:28:11.534 on thread RxComputationThreadPool-1
Received 6 19:28:11.603 on thread RxComputationThreadPool-6
Received 8 19:28:11.629 on thread RxComputationThreadPool-8
Received 3 19:28:12.214 on thread RxComputationThreadPool-3
Received 4 19:28:12.961 on thread RxComputationThreadPool-4
Received 5 19:28:13.274 on thread RxComputationThreadPool-5
Received 2 19:28:13.374 on thread RxComputationThreadPool-2
Received 10 19:28:14.335 on thread RxComputationThreadPool-2

약 3초 정도 걸렸습니다. 이렇게 내부적으로 사용해서 병렬형태로 하기를 바란다.

+ Recent posts