Skip to content

05. How to merge Publishers

이진혁 edited this page May 25, 2021 · 1 revision

오늘은 데이터 스트림을 합병하는 방법에 대해 알아보도록 하겠습니다.
다음은 간단하게 여러 개의 데이터를 가지고 있는 Flux 간의 합병입니다.

Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .map(String::valueOf);
Flux<String> flux2 = Flux.just("가", "나", "다");

flux1.mergeWith(flux2)
    .doOnNext(System.out::println)
    .blockLast();

간단하게 합병하는 방법은 FluxmergeWith() 메소드를 사용하는 것입니다.
flux1interval()을 통해 1초의 딜레이 후 데이터를 생성하는 Flux이고,
flux2는 데이터를 바로 출력할 수 있는 Flux이므로
둘을 합병하고 출력하면 flux2의 데이터들이 먼저 출력됩니다.

public final Flux mergeWith(Publisher<? extends T> other)

Descriptions:
Merge data from this Flux and a Publisher into an interleaved merged sequence.
Unlike concat, inner sources are subscribed to eagerly.

Note that merge is tailored to work with asynchronous sources or finite sources.
When dealing with an infinite source that doesn't already publish on a dedicated Scheduler,
you must isolate that source in its own Scheduler,
as merge would otherwise attempt to drain it before subscribing to another source.

Parameters:
other - the Publisher to merge with

Returns:
a new Flux

Example

가장 간단한 Flux 합병 - mergeWith()

만약 flux1의 데이터가 먼저 출력되도록 보장하고 싶다면 concatWith() 메소드를 사용할 수 있습니다.

Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .map(String::valueOf);
Flux<String> flux2 = Flux.just("가", "나", "다");

flux1.concatWith(flux2)
    .doOnNext(System.out::println)
    .blockLast();

mergeOrderedWith()라는 메소드도 제공하는데 이 메소드는 두 Publisher의 순서가 아니라,
데이터를 기준으로 순서를 보장합니다.
따라서 어떤 Flux가 먼저 실행되도록 순서를 보장한다기 보다는
모든 Flux의 데이터 중 우선순위에 맞도록 반환하는 것을 보장한다고 할 수 있습니다.

Flux<Double> flux1 = Flux.interval(Duration.ofSeconds(1))
    .take(3)
    .map(Double::valueOf);
Flux<Double> flux2 = Flux.just(0.5, 1.5, 2.5, 3.5);

Comparator<Double> comparator = (x, y) -> x.compareTo(y);
flux1.mergeOrderedWith(flux2, comparator)
    .doOnNext(System.out::println)
    .blockLast();

여기서 flux1의 데이터는 1.0, 2.0, 3.0 이고 flux2의 데이터는 0.5, 1.5, 2.5, 3.5 입니다.
만약 concatWith()였다면 [1.0, 2.0, 3.0] 먼저 출력 후 [0.5, 1.5, 2.5, 3.5] 출력하도록 하거나 그 반대였을 것입니다.
하지만 mergeOrderedWith()는 [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]를 출력하도록 하거나 리버스된 형태로 출력합니다.

public final Flux mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)

Descriptions:
Merge data from this Flux and a Publisher into a reordered merge sequence,
by picking the smallest value from each sequence as defined by a provided Comparator.
Note that subsequent calls are combined,
and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator).
The combination step is avoided if the two Comparators are equal
(which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()).

Note that merge is tailored to work with asynchronous sources or finite sources.
When dealing with an infinite source that doesn't already publish on a dedicated Scheduler,
you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

Parameters:
other - the Publisher to merge with
otherComparator - the Comparator to use for merging
Returns:
a new Flux

Example

Flux간 순서를 보장하는 합병 - concatWith()
데이터간 순서를 보장하는 합병 - mergeOrderedWith()

하나의 Publisher가 아니라 여러 Publisher를 합치고 싶다면 Flux.concat() 메소드를 사용할 수 있습니다.
FLux.concat() 메소드는 매개변수로 받은 가변인자 Publisher를 받은 순서대로 합병합니다.

Mono<String> mono1 = Mono.just("가");
Mono<String> mono2 = Mono.just("나");
Mono<String> mono3 = Mono.just("다");

Flux.concat(mono1, mono2, mono3)
    .doOnNext(System.out::println)
    .blockLast();

Example

여러 Publisher 합병 - Flux.concat()


Reference

Flux Docs
백기선님의 Merge 강의