-
Notifications
You must be signed in to change notification settings - Fork 0
05. How to merge Publishers
오늘은 데이터 스트림을 합병하는 방법에 대해 알아보도록 하겠습니다.
다음은 간단하게 여러 개의 데이터를 가지고 있는 Flux
간의 합병입니다.
Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
.take(3)
.map(String::valueOf);
Flux<String> flux2 = Flux.just("가", "나", "다");
flux1.mergeWith(flux2)
.doOnNext(System.out::println)
.blockLast();
간단하게 합병하는 방법은 Flux
의 mergeWith()
메소드를 사용하는 것입니다.
flux1
은 interval()
을 통해 1초의 딜레이 후 데이터를 생성하는 Flux
이고,
flux2
는 데이터를 바로 출력할 수 있는 Flux
이므로
둘을 합병하고 출력하면 flux2
의 데이터들이 먼저 출력됩니다.
public final Flux mergeWith(Publisher<? extends T> other)
Descriptions:
Merge data from this Flux and a Publisher into an interleaved merged sequence.
Unlike concat, inner sources are subscribed to eagerly.Note that merge is tailored to work with asynchronous sources or finite sources.
When dealing with an infinite source that doesn't already publish on a dedicated Scheduler,
you must isolate that source in its own Scheduler,
as merge would otherwise attempt to drain it before subscribing to another source.Parameters:
other - the Publisher to merge withReturns:
a new Flux
만약 flux1
의 데이터가 먼저 출력되도록 보장하고 싶다면 concatWith()
메소드를 사용할 수 있습니다.
Flux<String> flux1 = Flux.interval(Duration.ofSeconds(1))
.take(3)
.map(String::valueOf);
Flux<String> flux2 = Flux.just("가", "나", "다");
flux1.concatWith(flux2)
.doOnNext(System.out::println)
.blockLast();
mergeOrderedWith()
라는 메소드도 제공하는데 이 메소드는 두 Publisher
의 순서가 아니라,
데이터를 기준으로 순서를 보장합니다.
따라서 어떤 Flux
가 먼저 실행되도록 순서를 보장한다기 보다는
모든 Flux
의 데이터 중 우선순위에 맞도록 반환하는 것을 보장한다고 할 수 있습니다.
Flux<Double> flux1 = Flux.interval(Duration.ofSeconds(1))
.take(3)
.map(Double::valueOf);
Flux<Double> flux2 = Flux.just(0.5, 1.5, 2.5, 3.5);
Comparator<Double> comparator = (x, y) -> x.compareTo(y);
flux1.mergeOrderedWith(flux2, comparator)
.doOnNext(System.out::println)
.blockLast();
여기서 flux1
의 데이터는 1.0, 2.0, 3.0 이고 flux2
의 데이터는 0.5, 1.5, 2.5, 3.5 입니다.
만약 concatWith()
였다면 [1.0, 2.0, 3.0] 먼저 출력 후 [0.5, 1.5, 2.5, 3.5] 출력하도록 하거나 그 반대였을 것입니다.
하지만 mergeOrderedWith()
는 [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]를 출력하도록 하거나 리버스된 형태로 출력합니다.
public final Flux mergeOrderedWith(Publisher<? extends T> other,
Comparator<? super T> otherComparator)Descriptions:
Merge data from this Flux and a Publisher into a reordered merge sequence,
by picking the smallest value from each sequence as defined by a provided Comparator.
Note that subsequent calls are combined,
and their comparators are in lexicographic order as defined by Comparator.thenComparing(Comparator).
The combination step is avoided if the two Comparators are equal
(which can easily be achieved by using the same reference, and is also always true of Comparator.naturalOrder()).Note that merge is tailored to work with asynchronous sources or finite sources.
When dealing with an infinite source that doesn't already publish on a dedicated Scheduler,
you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.Parameters:
other - the Publisher to merge with
otherComparator - the Comparator to use for merging
Returns:
a new Flux
Flux간 순서를 보장하는 합병 - concatWith()
데이터간 순서를 보장하는 합병 - mergeOrderedWith()
하나의 Publisher
가 아니라 여러 Publisher
를 합치고 싶다면 Flux.concat()
메소드를 사용할 수 있습니다.
FLux.concat()
메소드는 매개변수로 받은 가변인자 Publisher
를 받은 순서대로 합병합니다.
Mono<String> mono1 = Mono.just("가");
Mono<String> mono2 = Mono.just("나");
Mono<String> mono3 = Mono.just("다");
Flux.concat(mono1, mono2, mono3)
.doOnNext(System.out::println)
.blockLast();