RxJava에서 에러가 발생한 상황이나 디버깅이 필요한 때 사용하는 연산자에 대해 알아보자.


오류를 다루는 연산자

RxJava에서는 예외 사항이 일어나면 Emitter.onError()를 호출하여 오류 이벤트를 통제한다.


Observable.just("1", "2", "a", "3")
        .map(i -> Integer.parseInt(i))
        .subscribe(System.out::println);
1
2
java.lang.NumberFormatException: (대충 "a"를 Integer로 못 바꿨다는 내용)

“a”는 정수로 파싱이 불가능하므로 java에서 NumberFormatException이 발생한다. 정확히는, 고맙게도 RxJava에서 친절하게 해결 방법을 포함해서 로그를 출력해준다.

io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling java.lang.NumberFormatException: For input string: “a”

subscribe()에서 에러를 처리하는 방법이 다뤄지지 않았다는 내용이다. 현재는 subscribe에 아이템이 발행되면(onNext()) print를 하도록 지정되어 있다. subscribe의 두 번째 인자로 오류를 처리하는(onError()) Consumer를 추가해 줄 것이다.

subscribe


Observable.just("1", "2", "a", "3")
        .map(i -> Integer.parseInt(i))
        .subscribe(System.out::println,
                throwable -> System.out.println("대충 파싱 실패 에러라는 내용")
        );
1
2
대충 파싱 실패 에러라는 내용


위와 같이 subscribe에서 오류를 처리하는 방법이 있고, 또 RxJava에서 제공하는 오류 처리에 대한 연산자를 사용할 수도 있다.


onErrorReturn 연산자

onErrorReturn

onErrorReturn 연산자는 오류가 발생하면 아이템 발행을 종료하고 onError()를 호출하는 대신 오류 처리를 위한 함수를 실행한다.

Observable.just("1", "2", "a", "3")
        .map(i -> Integer.parseInt(i))
        .onErrorReturn(throwable -> -1)
        .subscribe(System.out::println);
1
2
-1

오류 발생 시 -1을 반환하고 스트림을 종료한다.


onErrorResumeNext 연산자

onErrorResumeNext

RxJava는 함수 이름이 참 직관적이라 좋다. 이 함수는 오류가 발생하면 기존 스트림은 종료하고 다른 Observable 소스로 스트림을 교체한다.

Observable.just("1", "2", "a", "3")
        .map(i -> Integer.parseInt(i))
        .onErrorResumeNext(throwable ->
                Observable.just(100, 200, 300))
        .subscribe(System.out::println);
1
2
100
200
300


retry 연산자

retry

retry는 에러가 발생하면 Observable을 재구독하도록 한다. 하지만 재시도한들 같은 Observable이기 때문에 무한 루프에 빠질 확률이 높다. 그래서 retry(2) 처럼 재시도 횟수를 long 형태의 인자로 줄 수 있게 되어 있다.

Observable.just("1", "2", "a", "3")
        .map(i -> Integer.parseInt(i))
        .retry(2)
        .onErrorReturn(throwable -> -1)
        .subscribe(System.out::println);
1
2
1
2
1
2
-1

최초에 1,2를 한 번 출력하고 그 후에 두 번 재시도, 마지막으로 -1을 반환하는 것을 볼 수 있다.


디버깅을 돕는 doOn- 연산자

Observable의 체인 스트림 내부를 보기 힘든 경우에는 doOn 연산자를 통해 흐름을 파악할 수 있다.

doOnEach 연산자

doOnEach

doOnEach 연산자는 Observable이 아이템을 발행하기 전에 Notification<T> 형태의 콜백으로 확인할 수 있도록 해준다. getValue()를 통해 아이템을 확인할 수도 있고, isOnNext(), isOnComplete, isOnError 등의 함수도 제공된다.

Observable.just(1, 2, 3)
        .doOnEach(notification -> {
            System.out.println("value = " + notification.getValue());
            System.out.println("isOnNext = " + notification.isOnNext());
            System.out.println("isOnComplete = " + notification.isOnComplete());
            System.out.println("isOnError = " + notification.isOnError());
        })
        .subscribe(System.out::println);
value = 1
isOnNext = true
isOnComplete = false
isOnError = false
1
value = 2
isOnNext = true
isOnComplete = false
isOnError = false
2
value = 3
isOnNext = true
isOnComplete = false
isOnError = false
3
value = null
isOnNext = false
isOnComplete = true

발행되는 아이템이 1, 2, 3 이므로 Notification 이 제공된다. 주목해야 할 것은 3을 발행한 이후의 흐름이다. 더 이상 아이템이 없으므로 `value`는 null이고 `isOnComplete` 처리를 하는 것을 볼 수 있다.


doOnNext 연산자

doOnNext

doOnNext도 각 아이템 발행 전에 확인할 수 있다는 점에서 doOnEach와 유사하지만, Notification 형식이 아닌 Consumer를 파라미터로 넘겨주어 발행되는 아이템을 쉽게 확인할 수 있다.

Observable.just(1, 2, 3)
        .doOnNext(it -> System.out.println("아이템 출력 " + it))
        .subscribe(System.out::println);
아이템 출력 1
1
아이템 출력 2
2
아이템 출력 3
3


doOnSubscribe 연산자

doOnSubscribe

doOnSubscribe 연산자는 구독 시에 콜백을 받을 수 있게 한다. Disposable을 넘겨주기 때문에 isDisposed()로 확인을 하거나 직접 dispose() 할 수 있다.

Observable.just(1, 2, 3)
        .doOnSubscribe(disposable -> {
            if (disposable.isDisposed()) {
                System.out.println("disposed 됨");
            } else {
                System.out.println("구독 시작");
            }
        })
        .subscribe(System.out::println);
구독 시작
1
2
3


doOnComplete 연산자

doOnComplete

doOnComplete 는 Emitter의 onComplete() 호출로 Observable이 정상적으로 종료됐을 때에 호출되는 콜백이다.

Observable.just("1", "2", "3")
        .doOnComplete(() -> System.out.println("Complete"))
        .subscribe(System.out::println);
1
2
3
Complete


doOnError 연산자

doOnError


doOnError는 말 그대로, doOnComplete 와는 반대로 Emitter에서 onError() 호출로 Observable이 정상적으로 종료되지 않았을 때 호출된다. 파라미터로 throwable이 주어진다.

Observable.just("1", "2", "a", "3")
        .map(i -> Integer.parseInt(i))
        .doOnError(throwable -> System.out.println(throwable.toString()))
        .subscribe(System.out::println);
1
2
java.lang.NumberFormatException: For input string: "a"

다만 doOnError는 디버깅에 사용되기 때문에 위에서 언급한 onErrorReturn 등을 사용하거나 subscribe에서 에러 처리를 하는 등 별도 처리가 필요하다.


References

  • 옥수환, 『아키텍처를 알아야 앱 개발이 보인다』, 비제이퍼블릭(2020)
  • http://reactivex.io/documentation/operators