RxJava 1 포스트 [Android] RxJava 시작하기에서는 반응형 프로그래밍에 대한 개념을 설명하고 명령형 프로그램과의 차이를 서술했다.

RxJava 2 포스트 [Android] RxJava Observable 옵저버블에서는 옵저버블과 관련된 기본적인 연산자들을 살펴보았다.

이번에 쓸 내용은 Hot Observable과 Cold Observable, 그리고 그외 관련된 연산자에 관한 내용이다.


두 줄 요약

구구절절 설명을 늘어놓으면 어려우니 요약부터 하고 시작하자.



Cold Observable : Youtube 동영상을 재생하는 것처럼, 구독을 요청하면 아이템을 발행하기 시작한다.

Hot Observable : 라이브 방송을 시청하는 것처럼, 아이템 발행이 시작된 이후로 모든 구독자에게 동시 같은 아이템을 발행한다.


Cold Observable

Cold Observable은 지금까지 앞에서 다뤄왔던 예제들처럼 Observable에 구독을 요청하면 아이템을 발행하기 시작한다. 아이템은 처음부터 끝까지 발행되고, 임의로 종료시키지 않는 이상 여러 번 요청에도 처음부터 끝까지 발행하는 것을 보장한다. 생성된 옵저버블은 방출된 아이템에 대해 각각 고유한 인스턴스를 가지게 된다.(different instances of emitted items) 아래 예시는 interval 연산자를 통해 1초 마다 아이템을 발행하는 Cold Observable 의 예시이다.

Observable src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(value -> System.out.println("First: " + value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("Second: " + value));
Thread.sleep(3000);
First: 0
First: 1
First: 2
First: 3
Second: 0
First: 4
Second: 1
First: 5
Second: 2


첫 번째로 Observable을 구독하고, 3초 후에 새로운 구독자로 다시 구독한 모습이다. 옵저버블 인스턴스가 동일한지, 아닌지에 상관 없이 두 옵저버 모두 처음부터 (0부터) 발행하는 것을 볼 수 있다. 이런 옵저버블이 Cold Observable, 콜드 옵저버블이다. 같은 src 인스턴스를 사용했음에도 처음부터 시작하는 이유는 각각의 observer(관찰자)마다 다른 observable 소스가 생성되기 때문이다.

데이터 기반(data-driven)의 옵저버블은 대부분 Cold Observable 로 만들어진다. HTTP의 GET이나 데이터베이스 쿼리처럼, 동일한 옵저버블 객체에서 서로 다른 결과를 생성할 때에도 마찬가지로 Cold 속성을 지닌다. 데이터 혹은 작업이 모든 관찰자에게 재생(Replayed) 된다. 실제 사용 예시로는 Retrofit 통신이나 Room 쿼리 같은 경우가 있겠다.


Hot Observable

반면 Hot Observable아이템 발행이 시작된 이후로 모든 구독자에게 동시 같은 아이템을 발행한다. 첫 번째 구독자가 옵저버블을 구독 한 몇 초 후에 두 번째 구독자가 같은 옵저버블을 구독한다면, 둘은 동시에 같은 아이템을 수신하며 두 번째 구독자는 구독 이전에 발행된 아이템을 놓칠 수도 있다.

가장 일반적인 예시로는 안드로이드에서의 UI 이벤트이다. 안드로이드에서의 클릭 이벤트는 observer 가 구독한 후 이벤트만 수신하는 용도로만 사용되며, 이를 재생(replay)하기 위해 캐싱할 필요는 없다.

Hot Observable을 구현하기 위한 방법에는 여러 가지가 있다. 그 중 하나로 ConnectableObservable 이라는 특수한 유형의 Observable이 있다.


publish 연산자와 connect 연산자

ConnectableObservable은 Hot Observable을 구현할 수 있도록 도와주는 타입이다. 서로 다른 관찰자(Observer)에 대한 단일 옵저버블 소스이다. 이 ConnectableObservable의 주요 특징은 구독을 요청해도 데이터를 곧바로 발행하지 않는다는 점이다.

우선 publish() 연산자를 통해 일반적인 Observable을 Hot Observable로 변환한다. publish만으로는 아이템 replay가 활성화되지 않는다. connect() 연산자를 호출할 때에 비로소 아이템을 발행하기 시작한다.

ConnectableObservable src =
    Observable.interval(1, TimeUnit.SECONDS)
    .publish();
src.connect();
src.subscribe(value -> System.out.println("First: " + value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("Second: " + value));
Thread.sleep(3000);
First: 0
First: 1
First: 2
First: 3
Second: 3
First: 4
Second: 4
First: 5
Second: 5

첫 번째 구독 시에 3초동안 0~2 아이템을 발행하고, 3초 뒤에 추가된 구독자는 이를 수신하지 못하며 3부터 수신하는 것을 볼 수 있다.


autoConnect 연산자

autoConnect 연산자는 connect()를 호출하지 않더라도 구독 즉시 아이템을 발행할 수 있도록 도와주는 연산자이다. autoConnect의 매개변수는 아이템을 발행하는 구독자의 수이다. 만약 autoConnect(2)라고 한다면 구독자가 2개 이상 붙어야 아이템을 발행하기 시작한다.

Observable<Long> src =
    Observable.interval(1, TimeUnit.SECONDS)
    .publish()
    .autoConnect(2);

src.subscribe(value -> System.out.println("First: " + value));
src.subscribe(value -> System.out.println("Second: " + value));
Thread.sleep(3000);
First: 0
Second: 0
First: 1
Second: 1
First: 2
Second: 2


만약 src.subscribe()를 한 번만 호출했다면 아이템을 발행하지 않아 println()이 실행되지 않을 것이다. autoConnect()의 파라미터에 0 이하의 수를 입력하면 구독자 수와 관계 없이 곧바로 아이템 발행을 시작한다.


References

  • 옥수환, 『아키텍처를 알아야 앱 개발이 보인다』, 비제이퍼블릭(2020)
  • http://reactivex.io/documentation/operators
  • https://medium.com/tompee/rxjava-ninja-hot-and-cold-observables-19b30d6cc2fa