RxJava 1 포스트 [Android] RxJava 시작하기
RxJava 2 포스트 [Android] RxJava Observable 옵저버블
RxJava 3 포스트 [Android] RxJava Cold Observable, Hot Observable
RxJava 4 포스트 [Android] RxJava Disposable


이번 RxJava 5 포스트에서는 여러 연산자 중에서도 Observable을 생성, 변형하는 연산자에 대해 알아보려고 한다. 영어 독해나 작문을 하기 위해서는 먼저 단어를 알아야 하는 것처럼, 연산자를 알아야 각 상황에 맞는 비동기 처리를 할 수 있을 것이다. 그래서 오늘 포스트는 각 연산자가 의미하는 것을 나열하는 포스트가 되겠다.



Observable을 생성하는 연산자

지난 포스트에서 살펴본 create(), just() 이외에도 Observable을 생성하는 다른 연산자들이 있다.


defer 연산자

defer


defer 연산자는 Observer가 구독할 때까지 Observable의 생성을 지연시키는 역할을 한다. subscribe()를 호출할 때 Observable 아이템을 생성한다.


SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.KOREA);
Observable<String> justSrc = Observable.just(
        sdf.format(System.currentTimeMillis())
);

Observable<String> deferSrc = Observable.defer(() ->
        Observable.just(sdf.format(System.currentTimeMillis()))
);

System.out.println("현재시각1 : " + sdf.format(System.currentTimeMillis()));

try {
    Thread.sleep(5000);
} catch (Exception e) {
    e.printStackTrace();
}

System.out.println("현재시각2 : " + sdf.format(System.currentTimeMillis()));
justSrc.subscribe(time ->
        System.out.println("justSrc : " + time)
);

deferSrc.subscribe(time ->
        System.out.println("deferSrc : " + time)
);
현재시각1 : 2020-12-05 20:48:46.507
현재시각2 : 2020-12-05 20:48:55.112
justSrc : 2020-12-05 20:48:46.430
deferSrc : 2020-12-05 20:48:55.138


just()와 defer()를 통해 각각의 Observable 객체를 만든다. 5초 후에 두 객체를 subscribe 한다.

  • just로 만든 Observable은 생성 시점의 시간을 발행한다. (현재시각1과 거의 동일)
  • defer로 만든 Observable은 subscribe할 때 새로운 Observable 아이템을 발행한다. (현재시각2와 거의 동일)

구독하기 직전에 Observable을 생성해서 가장 최신 상태의 아이템을 발행하려고 한다면 defer를 이용하는 것이 좋아 보인다.


empty, never 연산자

empty, never 둘 다 아이템을 발행하지 않는 Observable을 생성한다. empty는 onComplete()를 호출하고, never는 호출하지 않는다.


empty


empty는 아이템을 발행하지 않고 정상적으로 스트림을 종료시킨다.


never


never는 아이템도 발행하지 않고, 스트림을 종료하지도 않는다.


Observable.empty()
        .doOnTerminate(() -> System.out.println("empty 종료"))
        .subscribe();

Observable.never()
        .doOnTerminate(() -> System.out.println("never 종료"))
        .subscribe();
empty 종료


doOnTerminate()의 콜백은 Observable이 종료된 후에 호출된다. empty 스트림의 경우에는 정상 종료 후 onComplete -> doOnTerminate 가 호출됐지만, never 스트림은 onComplete가 호출되지 않아 콜백을 받지 못한다.


interval 연산자

interval


interval 연산자는 주어진 시간 간격으로 순서대로 정수를 발행하는 Observable을 생성한다. 구독을 중지하기 전까지 아이템을 무한히 발행하기 때문에 블필요해졌을 때 dispose 해주어야 한다.

Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(System.out::println);
try {
    Thread.sleep(3500);
} catch (InterruptedException e) {
    e.printStackTrace();
}

disposable.dispose();
0
1
2


range 연산자

range


range는 특정 범위의 정수를 순서대로 발행하는 Observable을 생성한다. interval과 비슷하지만, 특정 범위의 아이템을 발행한다는 점과 발행히 끝나면 스트림을 종료시킨다는 점에서 다르다.

Observable.range(3, 5) //range(start, count)
        .subscribe(System.out::println);
3
4
5
6
7


timer 연산자

timer


timer 연산자는 특정 시간 동안 지연시킨 뒤 0L을 발행한 후 종료시킨다.

Observable src = Observable.timer(3, TimeUnit.SECONDS);
System.out.println("구독");
src.subscribe(item -> //구독 후 3초 후에 아이템이 발행된다.
        System.out.println("발행 --- item : " + item + ", Class: " + item.getClass().getSimpleName())
);
구독
발행 --- item : 0, Class: Long



Observable을 변형하는 연산자

RxJava에서는 아이템을 그대로 발행하는 것 뿐 아니라, 발행되는 아이템을 변환하여 다른 아이템으로 변경할 수도 있다.


map 연산자

map


map 은 발행되는 아이템을 변환하는 가장 기본적인 방법이다. 발행된 아이템에 원하는 수식을 적용하거나 다른 타입으로 변환시킬 수 있다.


Observable<Integer> intSrc = Observable.just(1, 2, 3);
Observable<String> stringSrc = intSrc.map(i -> String.valueOf(i * 10));
stringSrc.subscribe(item ->
        System.out.println("item : " + item + ", Class: " +
        item.getClass().getSimpleName())
);
item : 10, Class: String
item : 20, Class: String
item : 30, Class: String


flatMap 연산자

flatMap


flatMap은 Observable을 다른 Observable로 변환시킨 후, 발행되는 아이템을 병합하여 다시 아이템을 방출시킨다. 한 개의 아이템을 n번씩 방출하게 된다.

Observable<String> src = Observable.just("a", "b", "c");
src.flatMap(str -> Observable.just(str + "1", str + "2"))
        .subscribe(System.out::println);
a1
a2
b1
b2
c1
c2


buffer 연산자

buffer


buffer는 Observable이 발행하는 아이템을 묶어서 List로 변환한다. 에러가 발생한 경우, 이미 발행된 아이템들이 버퍼에 포함되더라도 버퍼를 발행하지 않고 즉시 에러를 전달한다. buffer의 파라미터로 count 값을 받아 아이템을 몇 번씩 저장할지 정한다.

Observable.range(0, 7)
        .buffer(3) //count
        .subscribe(integers -> {
            System.out.println("buffer 데이터 발행");
            for (Integer i : integers) {
                System.out.println("#" + i);
            }
        });
buffer 데이터 발행
#0
#1
#2
buffer 데이터 발행
#3
#4
#5
buffer 데이터 발행
$6


scan 연산자

scan

scan은 발행된 아이템을 다음 번 발행되는 아이템의 첫 번째 인자로 전달하는 연산자이다.

Observable.just("a", "b", "c", "d", "e")
        .scan((x, y) -> x + y)
        .subscribe(System.out::println);
a
ab
abc
abcd
abcde

한 개의 값만 있는 첫 번째 값으로는 x가 그대로 발행되고, 두 번째부터 계산이 처리된다.




References

  • 옥수환, 『아키텍처를 알아야 앱 개발이 보인다』, 비제이퍼블릭(2020)
  • http://reactivex.io/documentation/operators