-
Notifications
You must be signed in to change notification settings - Fork 0
04. What is difference between map() and flatMap()
Mono
와 Flux
에는 map()
과 flatMap()
이라는 메소드가 존재합니다.
이 둘의 차이는 무엇일까요?
쉽게 생각하면 map()
은 동기적으로 실행하는 매핑 함수,
flatMap()
은 비동기적으로 실행하는 매핑 함수라고 생각하면 됩니다.
하지만 과연 그럴까요?
Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi")
.map(f -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return f.toUpperCase();
})
.doOnNext(System.out::println)
.subscribe();
위 코드는 실제 웹 서비스에서 IO를 통해 API 요청을 보낸다고 생각하고
한 데이터를 대문자로 변경하는 작업이 API 요청이라고 생각합니다.
그래서 IO에 걸리는 시간을 1초로 설정해두었고,
이에 따라 map()
메소드는 동기적으로 실행하므로 약 9초의 시간이 걸리게 될 것입니다.
실제로 실행하면 예상과 같습니다.
Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi")
.flatMap(f -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just(f.toUpperCase());
})
.doOnNext(System.out::println)
.subscribe();
이렇게 flatMap()
메소드를 사용해보았는데
실제로 실행해보면 약 9초의 실행시간이 걸리는 것으로
동기적으로 실행한다는 것을 알 수 있습니다.
그럼 map()
과 flatMap()
도대체 뭐가 다른 걸까요?
답은 함수의 형식에서 알 수 있습니다.
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
public final <R> Flux<R> flatMap(Function<? super T,? extends Publisher<? extends R>> mapper)
map()
메소드는 반환 타입에 제한이 없는 반면에,
flatMap()
메소드는 반환 타입이 Publisher<R>
타입이어야 한다는 제한이 있습니다.
따라서 flatMap()
메소드는 Mono
또는 Flux
로 반환해야 하는데
이렇게 Publisher
로 반환하게 되면 이후 병렬적으로 실행할 수 있습니다.
하지만 map()
은 Publisher
가 아닌 타입을 반환할 수 있으므로
보통 동기적으로 작동하고, flatMap()
은 보통 비동기적으로 작동합니다.
그럼 flatMap()
메소드를 이용하여 비동기적으로 처리하는 코드를 작성해보겠습니다.
Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi")
.window(1)
.flatMap(f -> f.map(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}).subscribeOn(Schedulers.parallel()))
.doOnNext(System.out::println)
.blockLast();
flatMap()
을 하기 전에 실행한 window()
메소드는
데이터를 매개변수 값만큼 묶어서 Flux
로 처리하는 역할을 합니다.
위 코드에서는 1개씩 묶었으므로, 웹 서비스를 9번 호출하는 것과 같다고 볼 수 있습니다.
flatMap()
은 Flux<Flux<String>>
을 받고 처리하는데
내부적으로 map()
을 이용하여 안에 있는 Flux<String>
의 값에서 대문자로 변경하는 작업을 시도합니다.
flatMap()
은 최종적으로 데이터 스트림을 평평하게 피므로 Flux<Flux<대문자>>
에서 Flux<String>
형태가 됩니다.
그리고 내부적으로 map()
을 실행할 때 subscribeOn(Schedulers.parallel())
메소드를 붙였는데
이는 map()
작업을 비동기적으로 실행하도록 합니다.
그러면 "
flatMap()
메소드 말고map()
으로도 비동기 처리가 가능하지 않냐?"고 생각할 수도 있는데map()
을 이용하여 비동기 처리를 하면 다음과 같이 이루어집니다.Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi") .map(f -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return Mono.just(f.toUpperCase()); }).subscribeOn(Schedulers.parallel()) .doOnNext(System.out::println) .blockLast();
subscribeOn()
메소드는 매개변수로 들어오는Scheduler
객체에 따라 처리하는 방식을 변경합니다.
들어온 스케줄링 방식이 병렬 방식이므로 여러 스레드를 이용하여 비동기처럼 작동하게 됩니다.
하지만 본 메소드는Publisher
의 메소드이므로map()
메소드에서 반환하는 값이Publisher
여야 합니다.
이렇게map()
으로도 비동기 처리를 할 수는 있습니다.
하지만 위 코드는 원하는 바를 이룰 수 없습니다.
위 코드를 실행하면 비동기적으로 처리는 하나, 결과가MonoJust
객체가 됩니다.
즉 내부에 있는 값을 가져올 수 없다는 뜻입니다.
그래서 내부에 있는 값을 가져오기 위해서는 다음과 같이 작성해야 합니다.Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi") .window(1) .map(f -> f.map(s -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return s.toUpperCase(); }).subscribeOn(Schedulers.parallel())) .flatMap(f -> f) .doOnNext(System.out::println) .blockLast();이렇게
flatMap()
을 이용하여 값을 다시 합치는 작업을 진행하게 되는데,
flatMap()
을 사용할 때도 내부적으로map()
을 사용했던 것처럼 둘 다 사용하게 됩니다.
결국엔 무엇이 주냐에 따라 변하게 됩니다.
물론 다른 방법으로 하는 방법도 존재할 수 있습니다.
flatMap()
메소드는 순서를 보장하지 않기 때문에 flatMap()
메소드를 통한 비동기 처리 구문은 한계가 있습니다.
그래서 순서를 보장하도록 변경하려면 flatMap()
대신 concatMap()
메소드를 사용할 수 있습니다.
Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi")
.window(1)
.concatMap(f -> f.map(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}).subscribeOn(Schedulers.parallel()))
.doOnNext(System.out::println)
.blockLast();
실행해보면 'apple'부터 'kiwi'까지 순서를 보장하는 것을 볼 수 있습니다.
하지만 concatMap()
은 순서를 보장하기 위해서 동기적으로 작동합니다.
그래서 이후에 사용한 subscribeOn(Schedulers.parallel())
의 효과를 볼 수 없습니다.
그래서 약 9초의 시간이 걸리게 됩니다.
저희가 원한 것은 이게 아니므로 다른 것을 찾아보아야 합니다.
Flux.just("apple", "banana", "melon", "mango", "grape", "strawberry", "eggplant", "watermelon", "kiwi")
.window(1)
.flatMapSequential(f -> f.map(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}).subscribeOn(Schedulers.parallel()))
.doOnNext(System.out::println)
.blockLast();
이렇게 flatMapSequential()
메소드를 이용하면 순서도 보장하면서 비동기적으로 처리할 수 있습니다.
그러면 flatMapSequential()
메소드는 어떻게 순서도 보장하면서 비동기적으로 처리도 할 수 있을까요?
flatMapSequential()
메소드는 데이터 스트림에 있는 모든 데이터를 트리거하여 실행은 시키나,
결과를 반환하는 시점이 가장 먼저 들어온 데이터가 처리되었을 때입니다.
말이 조금 어려울 수도 있는데 만약 'apple' 작업을 하는 동안
뒤에 있는 'banana', 'melon', 'grape'의 대문자 변환 작업이 완료되었다고 합시다.
그러면 바로 값을 반환하는 것이 아니라 'apple' 데이터가 처리되는 순간 순서대로 반환하게 됩니다.
하지만 'apple', 'banana', 'melon' 순서대로 반환하고
'grape'는 처리가 되었음에도 불구하고 'mango'가 아직 대기중이므로 같이 대기합니다.