RxJava 1 포스트 [Android] RxJava 시작하기
RxJava 2 포스트 [Android] RxJava Observable 옵저버블
RxJava 3 포스트 [Android] RxJava Cold Observable, Hot Observable
RxJava 4 포스트 [Android] RxJava Disposable
RxJava 5 포스트 [Android] RxJava Observable 생성, 변형 연산자


이번 포스트에는 Observable을 필터링, 결합하는 연산자에 대해 정리했다.


Observable을 필터링하는 연산자

Observable로부터 발행되는 아이템을 선택적으로 발행하도록 하는 연산자에 대해 알아보자.


debounce 연산자

debounce


debounce 연산자는 특정 시간 동안 다른 아이템이 발행되지 않을 때에만 아이템을 발행하도록 하는 연산자이다. 반복적으로 빠르게 발행된 아이템들을 필터링할 때에 유용하다.

Observable.create(emitter -> {
    emitter.onNext("1");
    Thread.sleep(100);
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onNext("4");
    emitter.onNext("5");
    Thread.sleep(100);
    emitter.onNext("6");
})
        .debounce(10, TimeUnit.MILLISECONDS)
        .subscribe(System.out::println);
1
5
6


distinct 연산자

distinct


distinct는 이미 발행한 아이템을 중복해 발행하지 않도록 필터링하는 연산자이다.


Observable.just("A", "B", "B", "A", "C")
                .distinct()
                .subscribe(System.out::println);
A
B
C


elementAt 연산자

elementAt


elementAt 연산자는 발행되는 아이템 시퀀스에서 특정 인덱스를 필터링한다.


Observable.just("A", "B", "B", "A", "C")
        .elementAt(3)
        .subscribe(System.out::println);
A


filter 연산자

filter


filter 연산자는 조건식이 true일 때에만 아이템을 발행한다.

Observable.just(12, 35, 43, 39, 10, 18, 42) //942회차 로또 당첨번호
        .filter(x -> x > 30)
        .subscribe(System.out::println);
35
43
39
42


sample 연산자

sample


sample 연산자는 일정 시간 간격으로 최근에 Observable이 배출한 아이템들을 방출한다.


Observable.interval(100, TimeUnit.MILLISECONDS)
        .sample(300, TimeUnit.MILLISECONDS)
        .subscribe(System.out::println);
1
4
7
10
13
16
...


interval 연산자를 이용해 100ms마다 정수 아이템을 발행하므로, 1초동안 0~9의 아이템이 발행된다. 그리고 300ms마다 sample 연산자가 실행되면서 가장 최근에 발행됐던 아이템을 샘플링한다.


skip 연산자

skip


skip 연산자는 Observable이 발행하는 n개의 아이템을 무시하고 이후에 나오는 아이템을 발행하는 연산자이다.


Observable.just(1, 2, 3, 4, 5)
        .skip(2)
        .subscribe(System.out::println);
3
4
5

skip(2) 로 인해 앞의 두 개를 제외한 나머지 아이템을 발행하는 것을 볼 수 있다.


take 연산자

take


take는 skip과 반대로 Observable이 처음 발행하는 n개의 아이템만 방출하도록 하는 연산자이다. n번째 이후 방출되는 아이템은 무시된다.


Observable.just(1, 2, 3, 4, 5)
        .take(2)
        .subscribe(System.out::println);
1
2


all 연산자

all


all 연산자는 모든 발행되는 아이템이 특정 조건을 만족해야 true를 반환한다. 하나라도 조건에 맞지 않다면 false를 반환한다.

Observable.just(1, 2, 3, 4, 5)
        .all(i -> i > 0)
        .subscribe(System.out::println);
true

Observable을 결합하는 연산자

여러 개의 Observable 소스를 결합하여 하나의 Observable을 생성하는 연산자이다.

combineLatest 연산자

combineLatest


combineLatest 연산자는 두 개의 Observable 중 하나에서 아이템이 발행될 때, 두 Observable에서 가장 최근에 발행한 아이템을 취합하여 하나로 발행한다.

여러 개의 http 요청에 의한 응답을 하나로 묶어서 처리할 때 사용되곤 한다.


Observable<Integer> intSource = Observable.create((emitter -> {
    new Thread(() -> {
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {
            }
        }
    }).start();
}));

Observable<String> strSource = Observable.create(emitter -> {
    new Thread(() -> {
        try {
            Thread.sleep(500);
            emitter.onNext("A");
            Thread.sleep(700);
            emitter.onNext("B");
            Thread.sleep(100);
            emitter.onNext("C");
            Thread.sleep(700);
            emitter.onNext("D");
        } catch (Exception e) {
        }
    }).start();
});

Observable.combineLatest(intSource, strSource, (num, str) -> num + str)
        .subscribe(System.out::println);

1A
2A
2B
2C
3C
3D
4D
5D


zip 연산자

zip


zip 연산자는 여러 Observable을 하나로 결합하고, 지정된 함수를 통해 하나의 아이템으로 발행한다. combineLatest가 최근에 발행된 아이템끼리 발행하는 역할을 했다면, zip은 1:1 순서를 지켜 아이템을 발행한다.

Observable<Integer> intSource = Observable.create((emitter -> {
    new Thread(() -> {
        for (int i = 1; i <= 5; i++) {
            emitter.onNext(i);
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {
            }
        }
    }).start();
}));


Observable<String> strSource = Observable.create(emitter -> {
    new Thread(() -> {
        try {
            Thread.sleep(500);
            emitter.onNext("A");
            Thread.sleep(700);
            emitter.onNext("B");
            Thread.sleep(100);
            emitter.onNext("C");
            Thread.sleep(700);
            emitter.onNext("D");
        } catch (Exception e) {
        }
    }).start();
});

Observable.zip(intSource, strSource, (num, str) -> num + str)
        .subscribe(System.out::println);
1A
2B
3C
4D


combineLatest와 동일한 src를 사용했지만 strSource의 최대 아이템 개수인 4개까지 결합된 아이템이 발행되는 것을 볼 수 있다.


merge 연산자

merge

merge 연산자는 여러 Observable을 결합해 하나의 Observable에서 발행되는 것처럼 사용할 수 있다.


Observable<Integer> src1 = Observable.create(emitter ->
        new Thread(() -> {
            try {
                Thread.sleep(100);
                emitter.onNext(1);
                Thread.sleep(100);
                emitter.onNext(2);
                Thread.sleep(100);
                emitter.onNext(3);
            } catch (Exception e) {
            }
        }).start()
);

Observable<Integer> src2 = Observable.create(emitter ->
        new Thread(() -> {
            try {
                Thread.sleep(250);
                emitter.onNext(100);
                Thread.sleep(250);
                emitter.onNext(200);
                Thread.sleep(250);
                emitter.onNext(300);
            } catch (Exception e) {
            }
        }).start()
);


Observable.merge(src1, src2)
        .subscribe(System.out::println);
1
2
100
3
200
300


References

  • 옥수환, 『아키텍처를 알아야 앱 개발이 보인다』, 비제이퍼블릭(2020)
  • http://reactivex.io/documentation/operators