[Android] Rx java Observable 기본 함수 (merge, flatMap, create,just...)

2021. 10. 18. 10:51카테고리 없음

Observable.create()

  • Lambda 표현을 쓸 수 있지만 내부 객체의 익명클래스가 어떤건지 명확하게 하기 위해서 람다표현을 사용하지 않았습니다.
  • 사용방법은 간단한데, create함수는 onSubscribe객체를 파라미터로 가지며, 구독이 발생하면 이 객체의 call()함수가 실행됩니다.
  • 리스너에게 콜백함수를 호출시켜주는 것과 같은원리

코드를 보자!

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Exception {
                emitter.onNext(1L);
            }
        });
        observable.subscribe(
                new Observer<Long>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Long aLong) {
                        System.out.println("sout onNext");
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                }
        );

// sout onNext

emiiter에게 onNext를 주면 onNext가 호출되고 onError를 주면 onError가 호출된다.

Ovservable.flatMap()

  • 각 API 호출 간의 의존성이 있을때 사용한다.
  • 첫번째 API 호출을 통해 얻은 결과를 두번째 API 에서 사용할때
        Observable<Long> observable1 = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Exception {
                emitter.onNext(1L);
            }
        });
        Observable<Long> observable2 = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Exception {
                emitter.onNext(2L);
            }
        });
        observable1
                .flatMap(new Function<Long, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Long aLong) throws Exception {
                        System.out.println("observable1 : " + aLong);
                        return observable2;
                    }
                })
                .subscribe(
                        new Consumer<Object>() {
                            @Override
                            public void accept(Object o) throws Exception {
                                System.out.println(o);
                            }
                        }
                );

> Task :test.main()
observable1 : 1
2

Ovservable.merge()

  • 두개의 타임새틈프를 합쳐 하나의 타임스탬프에서 발행하는것처럼 변경해준다.
  • 각각의 API 결과를 한곳에서 처리하고 싶을 때 사용한다.
  • 여러개의 Observable 소스에서 발행한 데이터를 모아서 한곳에서 모두 받을 수 있게 해준다.
        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Exception {
                emitter.onNext(1L);
            }
        });
        Observable<Long> observable2 = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Exception {
                emitter.onNext(2L);
            }
        });
        observable.merge(observable,observable2)
                .subscribe(
                        new Consumer<Object>() {
                            @Override
                            public void accept(Object o) throws Exception {
                                System.out.println("call : " + o);
                            }
                        }
                );

> Task :test.main()
call : 1
call : 2