[Android] RxJava Observable 생성, 변형 연산자
by Yena Choi
Study Note
RxJava 1 포스트 [Android] RxJava 시작하기
RxJava 2 포스트 [Android] RxJava Observable 옵저버블
RxJava 3 포스트 [Android] RxJava Cold Observable, Hot Observable
RxJava 4 포스트 [Android] RxJava Disposable
이번 RxJava 5 포스트에서는 여러 연산자 중에서도 Observable을 생성, 변형하는 연산자에 대해 알아보려고 한다. 영어 독해나 작문을 하기 위해서는 먼저 단어를 알아야 하는 것처럼, 연산자를 알아야 각 상황에 맞는 비동기 처리를 할 수 있을 것이다. 그래서 오늘 포스트는 각 연산자가 의미하는 것을 나열하는 포스트가 되겠다.
Observable을 생성하는 연산자
지난 포스트에서 살펴본 create()
, just()
이외에도 Observable을 생성하는 다른 연산자들이 있다.
defer 연산자
defer 연산자는 Observer가 구독할 때까지 Observable의 생성을 지연시키는 역할을 한다. subscribe()를 호출할 때 Observable 아이템을 생성한다.
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.KOREA);
Observable<String> justSrc = Observable.just(
sdf.format(System.currentTimeMillis())
);
Observable<String> deferSrc = Observable.defer(() ->
Observable.just(sdf.format(System.currentTimeMillis()))
);
System.out.println("현재시각1 : " + sdf.format(System.currentTimeMillis()));
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("현재시각2 : " + sdf.format(System.currentTimeMillis()));
justSrc.subscribe(time ->
System.out.println("justSrc : " + time)
);
deferSrc.subscribe(time ->
System.out.println("deferSrc : " + time)
);
현재시각1 : 2020-12-05 20:48:46.507
현재시각2 : 2020-12-05 20:48:55.112
justSrc : 2020-12-05 20:48:46.430
deferSrc : 2020-12-05 20:48:55.138
just()와 defer()를 통해 각각의 Observable 객체를 만든다. 5초 후에 두 객체를 subscribe 한다.
- just로 만든 Observable은 생성 시점의 시간을 발행한다. (현재시각1과 거의 동일)
- defer로 만든 Observable은 subscribe할 때 새로운 Observable 아이템을 발행한다. (현재시각2와 거의 동일)
구독하기 직전에 Observable을 생성해서 가장 최신 상태의 아이템을 발행하려고 한다면 defer를 이용하는 것이 좋아 보인다.
empty, never 연산자
empty, never 둘 다 아이템을 발행하지 않는 Observable을 생성한다. empty는 onComplete()
를 호출하고, never는 호출하지 않는다.
empty는 아이템을 발행하지 않고 정상적으로 스트림을 종료시킨다.
never는 아이템도 발행하지 않고, 스트림을 종료하지도 않는다.
Observable.empty()
.doOnTerminate(() -> System.out.println("empty 종료"))
.subscribe();
Observable.never()
.doOnTerminate(() -> System.out.println("never 종료"))
.subscribe();
empty 종료
doOnTerminate()의 콜백은 Observable이 종료된 후에 호출된다. empty 스트림의 경우에는 정상 종료 후 onComplete -> doOnTerminate 가 호출됐지만, never 스트림은 onComplete가 호출되지 않아 콜백을 받지 못한다.
interval 연산자
interval 연산자는 주어진 시간 간격으로 순서대로 정수를 발행하는 Observable을 생성한다. 구독을 중지하기 전까지 아이템을 무한히 발행하기 때문에 블필요해졌을 때 dispose 해주어야 한다.
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
try {
Thread.sleep(3500);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
0
1
2
range 연산자
range는 특정 범위의 정수를 순서대로 발행하는 Observable을 생성한다. interval과 비슷하지만, 특정 범위의 아이템을 발행한다는 점과 발행히 끝나면 스트림을 종료시킨다는 점에서 다르다.
Observable.range(3, 5) //range(start, count)
.subscribe(System.out::println);
3
4
5
6
7
timer 연산자
timer 연산자는 특정 시간 동안 지연시킨 뒤 0L
을 발행한 후 종료시킨다.
Observable src = Observable.timer(3, TimeUnit.SECONDS);
System.out.println("구독");
src.subscribe(item -> //구독 후 3초 후에 아이템이 발행된다.
System.out.println("발행 --- item : " + item + ", Class: " + item.getClass().getSimpleName())
);
구독
발행 --- item : 0, Class: Long
Observable을 변형하는 연산자
RxJava에서는 아이템을 그대로 발행하는 것 뿐 아니라, 발행되는 아이템을 변환하여 다른 아이템으로 변경할 수도 있다.
map 연산자
map 은 발행되는 아이템을 변환하는 가장 기본적인 방법이다. 발행된 아이템에 원하는 수식을 적용하거나 다른 타입으로 변환시킬 수 있다.
Observable<Integer> intSrc = Observable.just(1, 2, 3);
Observable<String> stringSrc = intSrc.map(i -> String.valueOf(i * 10));
stringSrc.subscribe(item ->
System.out.println("item : " + item + ", Class: " +
item.getClass().getSimpleName())
);
item : 10, Class: String
item : 20, Class: String
item : 30, Class: String
flatMap 연산자
flatMap은 Observable을 다른 Observable로 변환시킨 후, 발행되는 아이템을 병합하여 다시 아이템을 방출시킨다. 한 개의 아이템을 n번씩 방출하게 된다.
Observable<String> src = Observable.just("a", "b", "c");
src.flatMap(str -> Observable.just(str + "1", str + "2"))
.subscribe(System.out::println);
a1
a2
b1
b2
c1
c2
buffer 연산자
buffer는 Observable이 발행하는 아이템을 묶어서 List로 변환한다. 에러가 발생한 경우, 이미 발행된 아이템들이 버퍼에 포함되더라도 버퍼를 발행하지 않고 즉시 에러를 전달한다. buffer의 파라미터로 count 값을 받아 아이템을 몇 번씩 저장할지 정한다.
Observable.range(0, 7)
.buffer(3) //count
.subscribe(integers -> {
System.out.println("buffer 데이터 발행");
for (Integer i : integers) {
System.out.println("#" + i);
}
});
buffer 데이터 발행
#0
#1
#2
buffer 데이터 발행
#3
#4
#5
buffer 데이터 발행
$6
scan 연산자
scan은 발행된 아이템을 다음 번 발행되는 아이템의 첫 번째 인자로 전달하는 연산자이다.
Observable.just("a", "b", "c", "d", "e")
.scan((x, y) -> x + y)
.subscribe(System.out::println);
a
ab
abc
abcd
abcde
한 개의 값만 있는 첫 번째 값으로는 x가 그대로 발행되고, 두 번째부터 계산이 처리된다.
References
- 옥수환, 『아키텍처를 알아야 앱 개발이 보인다』, 비제이퍼블릭(2020)
- http://reactivex.io/documentation/operators