지난 포스트 [Android] RxJava 시작하기에서는 반응형 프로그래밍에 대한 개념을 설명하고 명령형 프로그램과의 차이를 서술했다. 이번 포스트에서는 Observable이 어떻게 동작하는지 알아보자.

그리고 이번 글 쓰면서 느낀건데, Rx 공홈 도큐멘테이션 진짜 갓갓이다👍


Observable

RxJava에서는 Observable을 구독하는 Observer가 존재하고, Observable이 순차적으로 발행하는 데이터에 대해서 반응한다. Observable은 다음의 3가지 이벤트를 사용하여 동작한다.

  • onNext() : 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행한다.
  • onComplete() : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext()를 더 호출하지 않음을 나타낸다.
  • onError() : 오류가 발생했음을 Observer에 전달한다.

위 이벤트들은 Emitter라는 인터페이스에 의해 선언된다.

Emit [imít] 1.방출하다 2.내뿜다 3.발산하다

그리고 데이터나 오류 내용을 발행할 때 null은 발행할 수 없다.


Observable 생성하기

RxJava에서는 연산자(Operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있다. 이 중 자주 쓰이는 연산자를 살펴보자.


create() 연산자

create

Observable.create()를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템 발행의 완료나 오류(Complete/Error)의 알림을 직접 설정할 수 있다.


Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("Yena");
    emitter.onComplete();
});
source.subscribe(System.out::println);
Hello
Yena


emitter를 통해 “Hello”와 “Yena”를 발행했다. 해당 Observable을 구독하기 위해서 subscribe()를 호출해서 Observer나 Consumer를 추가해준다. 그리고 아이템의 발행이 끝났다면 반드시 onComplete()를 호출해야 한다. onComplete 이후에는 아이템이 더 발행되더라도 구독자는 데이터를 받지 못한다. 위에서 본 예제에서 두 번째 onNext와 onComplete의 위치를 바꾸면 두 번째 데이터인 “Yena”는 받을 수 없다.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onComplete();
    emitter.onNext("Yena");
});
source.subscribe(System.out::println);
Hello


만약 오류가 발생했을 때에는 onError()를 호출해서 에러 상황을 처리해야 한다.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onError(new Throwable());
    emitter.onNext("Yena");
});
source.subscribe(System.out::println,
    throwable -> System.out.println("Good bye")
);
Hello
Good bye


Observable 객체를 생성하고 아이템을 발행하고 구독까지 해보았다. 하지만 실제로 create() 연산자는 개발자가 직접 Emitter를 제어하기 때문에 주의해서 사용해야 한다. 예를 들어 Observable을 더 이상 사용하지 않을 때에는 등록된 Callback을 모두 해제하지 않으면 메모리 릭이 발생하고, BackPressure를 직접 처리해야 한다.


just() 연산자

just

just()는 해당 아이템을 그대로 발행하는 Observable을 생성해준다. just() 연산자의 인자로 넣은 아이템을 차례로 발행하며, 한 개의 아이템을 넣을 수도 있고, 타입이 같은 여러 아이템을 넣을 수도 있다.

Observable<String> source = Observable.just("Hello", "Yena");
source.subscribe(System.out::println());
Hello
Yena


위에서 언급했지만, RxJava에서는 기본적으로 null을 허용하지 않아서 just의 인자로 null을 발행하면 오류가 발생한다. 만약 아무런 아이템을 발행하지 않는 빈 Observable을 만들고 싶다면 Observable.empty() 연산자를 사용한다.


간단히 Observable로 변환하기

from

배열, 리스트 등의 자료구조나 Future, Callable, Publisher 등은 from으로 시작하는 연산자를 통해 간단히 Observable로 변환할 수 있다.

fromArray() 연산자

배열의 아이템을 Observable로 바꿀 때에는 fromArray() 연산자를 이용하여 아이템을 순차적으로 발행한다.

String[] itemArray = new String[]{"Morning", "Afternoon", "Evening"};
Observable source = Observable.fromArray(itemArray);
source.subscribe(System.out::println);
Morning
Afternoon
Evening


fromIterable() 연산자

ArrayList, HashSet과 같은 Iterable 자료 구조 클래스는 fromIterable() 연산자를 사용해 변환한다.

ArrayList itemList = new ArrayList<String>();
itemList.add("Morning");
itemList.add("Afternoon");
itemList.add("Evening");
Observable source = Observable.fromIterable(itemList);
source.subscribe(System.out::println);
Morning
Afternoon
Evening


fromFuture() 연산자

메소드 이름이 멋지기도 하고 아련하기도 하고…

fromFuture() 연산자는 Future 인터페이스를 지원하는 모든 객체를 ObservableSource로 변환하고 Future.get() 메소드를 호출한 값을 반환한다. 그럼 이 Future 인터페이스는 어디에 쓰냐하면, 바로 비동기적인 작업의 결과를 구할 때 사용한다. 보통 Executor Service를 통해 비동기작업을 할 때 사용된다. Emitter는 Observable 내부에서 Future.get() 메소드를 호출하고, Future의 작업이 끝나기 전까지 스레드는 블로킹된다. RxJava에서는 Executor를 직접 다루기보다는 스케줄러를 사용하는 것을 권장한다.

Future<String> future = Executors.newSingleThreadExecutor()
    .submit(() -> {
        Thread.sleep(5000);
        return "This is the future";
    });
Observable source = Observable.fromFuture(future);
source.subscribe(System.out::println); //블로킹되어 기다림
This is the future


fromPublisher() 연산자

Publisher는 잠재적인 아이템 발행을 제공하는 생산자로 Subscriber로부터 요청을 받아 아이템을 발행한다. fromPublisher() 연산자는 Publisher를 Observable로 변환해준다.

Publisher<String> publisher = Subscriber -> {
    subscriber.onNext("Morning");
    subscriber.onNext("Afternoon");
    subscriber.onNext("Evening");
    subscriber.onComplete();
};
Observable<String> source = Observabler.fromPublisher(publisher);
source.subscribe(System.out::println);
Morning
Afternoon
Evening


fromCallable() 연산자

Callable 인터페이스는 비동기적인 실행 결과를 반환한다는 점이 Runnable과 다르다. fromCallable() 연산자를 통해 Callable을 Observable로 변환하고 비동기적으로 아이템을 발행할 수 있다.

Callable<String> callable = () -> "RxJava is cool";
Observable source = Observable.fromCallable(callable);
source.subscribe(System.out::println);
RxJava is cool


다양한 Observable의 형태

Observable 스트림 외에도 Single, Maybe, Completable 처럼 특별한 스트림이 있다.

Single

Single(싱글)은 단 하나의 아이템만을 발행할 수 있다. create()를 사용하는 경우 Emitter를 사용해 데이터를 발행한다. 데이터를 한 번만 발행하기 때문에 onNext(), onComplete() 대신 onSuccess()를 사용해 데이터 발행이 완료됨을 알려준다. 오류 처리는 Observable의 Emitter와 동일하게 onError()를 이용해 구독자에게 알려준다.

단일 아이템만을 발행한다는 특징 때문에 http 요청/응답 같은 이벤트 처리에 자주 쓰인다.

Single.create(emitter -> emitter.onSuccess("Hello"))
    .subscribe(System.out::println);
Hello


RxJava에는 Observable을 Single로 변환시킬 수 있는 여러 연산자를 가지고 있다. 반대로 Single에서 Observable로도 변환 시킬 수 있다.

  • observable.all()
  • observable.first()
  • observable.toList()
  • single.toObservable()


Maybe

Maybe는 Single과 비슷하지만, 아이템을 발행하거나 발행하지 않을 수도 있다는 점에서 차이가 있다. 그래서 아이템을 발행했을 때에는 onSuccess()를 호출하고, 발행하지 않을 때에는 onComplete()를 호출한다. onSuccess() 이후에 다시 onComplete()를 호출할 필요는 없다.

Maybe 역시 연산자를 통해서 Observable과 서로 변환될 수 있다.

  • observable.firstElement()
  • maybe.toObservable()


Completable

Completable은 아이템을 발행하지 않고, 정상적으로 실행이 종료되었는지에 대해 확인할 때 사용한다. 아이템 발행을 하지 않기 때문에 onNext(), onSuccess()는 쓰지 않고 onComplete()onError()만을 사용한다.

Completable.create(emitter -> {
    System.out.println("OK")
    emitter.onComplete();
}).subscribe(() -> System.out.println("Completed"));
OK
Completed


References

  • 옥수환, 『아키텍처를 알아야 앱 개발이 보인다』, 비제이퍼블릭(2020)
  • http://reactivex.io/documentation/operators