Skip to content

08. How to do Error Handling

이진혁 edited this page May 25, 2021 · 1 revision

이번 글에서는 데이터를 처리하다가 발생하는 에러를 어떻게 핸들링할 것인지에 대해서 다룰 것입니다.
저번 글에서 Flux.error()Mono.error()를 통해 데이터 스트림에 에러 발생 데이터를 생성하는 법에 대해 알아보았습니다.

이 방법 이외에도 map() 메소드를 통해 데이터 매핑을 시도하다가 ClassCastException이 발생할 수도 있고,
데이터베이스에 쿼리문을 보낼때 SQLException이 발생할 수도 있으며,
API 요청을 보내다가 IOException이 발생할 수도 있습니다.
이런 에러 상황이 발생했을 때 데이터 스트림을 처리하지 않고 프로그램을 종료하게 되면
애플리케이션에 치명적인 피해가 있을 것입니다.
이런 피해를 사전에 방지하고자 에러 발생시 실행되는 onError...() 메소드가 존재합니다.

onErrorReturn()

onErrorReturn()은 에러가 발생했을 때 어떠한 값을 리턴하는 메소드입니다.
매개변수 타입이 기존 데이터 스트림의 데이터 타입 이므로
기존 데이터 스트림이 Publisher<String>이었다면 String 값을 매개변수로 넘겨주어야 합니다.

Flux.just("before data")
    .mergeWith(Flux.error(new RuntimeException()))
    .mergeWith(Flux.just("after data"))
    .log()
    .onErrorReturn("flux error")
    .mergeWith(Flux.just("after after data"))
    .doOnNext(System.out::println)
    .subscribe();

위 코드에서 찍힌 로그를 살펴보면 아래와 같습니다.

14:02:29.429 [Test worker] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
14:02:29.434 [Test worker] INFO reactor.Flux.Merge.1 - request(32)
14:02:29.434 [Test worker] INFO reactor.Flux.Merge.1 - onNext(before data)
before data
14:02:29.448 [Test worker] ERROR reactor.Flux.Merge.1 - onError(java.lang.RuntimeException)
14:02:29.453 [Test worker] ERROR reactor.Flux.Merge.1 - 
java.lang.RuntimeException: null
flux error
after after data

위 코드는 'before data', RuntimeException 에러 발생 데이터, 'after data'를 데이터로 가진 Flux가 있습니다.
그리고 onErrorReturn() 메소드를 통해 에러 발생시 'flux error' 데이터를 가진 Flux를 리턴하도록 하였습니다.
그리고 doOnNext() 메소드를 통해 출력하는데,
'before data'가 출력되고, 에러 발생 후, 'flux error'가 출력되고 이후 merge된 'after after data'가 출력됩니다.
'after data'가 출력되지 않은 것으로 봤을 때
onErrorReturn() 메소드는 기존 데이터 스트림을 제거하고
매개변수로 들어온 리턴할 데이터만을 가진 새로운 스트림을 생성하는 것 같습니다.

Example

onErrorReturn() 메소드 테스트하기

onErrorResume()

onErrorResume()onErrorReturn() 메소드와 아주 유사합니다.
다른 점은 매개변수 타입이 Function<? super Throwable,? extends Mono<? extends T>>이라는 것입니다.
따라서 FLuxMono를 만들 때 발생한 에러 객체를 사용할 수 있다는 특징이 있습니다.
이외에는 onErrorReturn()과 동일합니다.

Example

onErrorResume() 메소드 테스트하기

onErrorContinue()

onErrorContinue()onErrorReturn()이나 onErrorResume()과는 다르게 에러를 데이터로 치환하지 않고
에러를 제거한 후 어떠한 행동을 실시하고 기존의 데이터 스트림을 그대로 진행합니다.

Flux.just("before data")
    .mergeWith(Flux.error(RuntimeException::new))
    .mergeWith(Flux.just("after data"))
    .log()
    .onErrorContinue((e, o) -> {
        System.out.println(String.format("error: %s", e));
        System.out.println(String.format("object: %s", o));
    })
    .mergeWith(Flux.just("after after data"))
    .doOnNext(System.out::println)
    .subscribe();

onErrorContinue() 메소드가 받는 매개변수는 BiConsumer<Throwable, Object> 타입입니다.
Throwable은 발생한 에러 객체이고 ObjectFluxErrorSupplied 객체입니다.
여기서 '어떠한 행동'을 할 수 있습니다.
매개변수가 Consumer인만큼 로그를 찍든, 데이터베이스에 저장하든 무슨 짓이든 할 수 있습니다.
이후 기존의 스트림을 유지한채 'after after data' 데이터를 추가합니다.
따라서 로그는 다음과 같이 출력됩니다.

14:34:53.343 [Test worker] INFO reactor.Flux.Merge.1 - onSubscribe(FluxFlatMap.FlatMapMain)
14:34:53.351 [Test worker] INFO reactor.Flux.Merge.1 - request(32)
14:34:53.352 [Test worker] INFO reactor.Flux.Merge.1 - onNext(before data)
before data
error: java.lang.RuntimeException
object: FluxErrorSupplied
14:34:53.364 [Test worker] INFO reactor.Flux.Merge.1 - onNext(after data)
after data
14:34:53.365 [Test worker] INFO reactor.Flux.Merge.1 - onComplete()
after after data

Example

onErrorContinue() 메소드 테스트하기

Exceptions.propagate()를 통해 Unchecked Exception으로 추상화하기

데이터 스트림에서 데이터가 흐를 때 Checked Exception이 발생하는 메소드를 실행한다면,
try-catch를 이용해서 새로운 에러를 발생시키거나 무심하게 throws해야합니다.
하지만 try-catch를 이용하면 Checked Exception이 아닌 다른 RuntimeException을 리턴해야 하므로,
기존 Checked Exception의 의미를 잃어버리게 되고 새로운 에러 클래스를 생성해야 하는 불편함도 생기게 됩니다.

그래서 Project Reactor에는 Exceptions 클래스의 static propagate() 메소드를 지원합니다.
propagate() 메소드는 Checked ExceptionRuntimeException으로 추상화해주지만,
데이터 스트림에서는 Checked Exception으로 작동하도록 합니다.
따라서 Checked Exception의 본질을 잃지 않을 수 있습니다.

@Test
public void checkedExceptionAbstractionTest() {
    Flux.just("apple", "apple", "apple", "banana", "apple")
        .log()
        .map(fruit -> {
            try {
                return toPineapple(fruit);
            } catch (NonAppleException e) {
                throw Exceptions.propagate(e);
            }
        })
        .doOnNext(System.out::println)
        .subscribe();
}

private String toPineapple(String fruit) throws NonAppleException {
    if (!fruit.equals("apple")) {
        throw new NonAppleException();
    }
    return "pineapple";
}

private static class NonAppleException extends Exception {}

Example

Exceptions.propagate() 메소드로 Checked Exception 추상화하기


Reference

백기선님의 에러 처리 강의
Exceptions Docs
Flux Docs