diff --git a/README.md b/README.md
index 5242df6c..5b51c4dc 100644
--- a/README.md
+++ b/README.md
@@ -329,53 +329,6 @@ all of them.
complete backlog is read at once.
-
`allowDifferentTopicSchemas`
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala
index 27f90f8a..bc301e7d 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala
@@ -225,30 +225,14 @@ private[pulsar] case class PulsarMetadataReader(
}.toMap)
}
-
- def forwardOffset(actualOffset: Map[String, MessageId],
- strategy: String,
- numberOfEntriesToForward: Long,
- ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
+ def fetchNextOffsetWithMaxEntries(actualOffset: Map[String, MessageId],
+ numberOfEntries: Long): SpecificPulsarOffset = {
getTopicPartitions()
// Collect internal stats for all topics
val topicStats = topicPartitions.map( topic => {
- val internalStats = admin.topics().getInternalStats(topic)
- val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
- topic -> TopicState(internalStats,
- PulsarSourceUtils.getLedgerId(topicActualMessageId),
- PulsarSourceUtils.getEntryId(topicActualMessageId))
- } ).toMap
-
- val forwarder = strategy match {
- case PulsarOptions.ProportionalForwardStrategy =>
- new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
- case PulsarOptions.LargeFirstForwardStrategy =>
- new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
- case _ =>
- new LinearForwardStrategy(numberOfEntriesToForward)
- }
+ topic -> admin.topics().getInternalStats(topic)
+ } ).toMap.asJava
SpecificPulsarOffset(topicPartitions.map { topic =>
topic -> PulsarSourceUtils.seekableLatestMid {
@@ -262,39 +246,39 @@ private[pulsar] case class PulsarMetadataReader(
// Get the partition index
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
// Cache topic internal stats
- val internalStats = topicStats.get(topic).get.internalStat
+ val internalStats = topicStats.get(topic)
// Calculate the amount of messages we will pull in
- val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
- // Get a future message ID which corresponds
- // to the maximum number of messages
+ val numberOfEntriesPerTopic = numberOfEntries / topics.size
+ // Get a next message ID which respects
+ // the maximum number of messages
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
internalStats,
actualLedgerId,
actualEntryId,
numberOfEntriesPerTopic)
- // Build a message id
- val forwardedMessageId =
+ // Build the next message ID
+ val nextMessageId =
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
// Log state
- val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
+ val entryCountUntilNextMessageId = TopicInternalStatsUtils.numOfEntriesUntil(
internalStats, nextLedgerId, nextEntryId)
val entryCount = internalStats.numberOfEntries
- val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
- val logMessage = s"Pulsar Connector forward on topic. " +
- s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
+ val progress = f"${entryCountUntilNextMessageId.toFloat / entryCount.toFloat}%1.3f"
+ val logMessage = s"Pulsar Connector offset step forward. " +
+ s"[$numberOfEntriesPerTopic/$numberOfEntries]" +
s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
- s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
+ s"$nextMessageId ($entryCountUntilNextMessageId/$entryCount) [$progress]"
log.debug(logMessage)
// Return the message ID
- forwardedMessageId
+ nextMessageId
} catch {
case e: PulsarAdminException if e.getStatusCode == 404 =>
MessageId.earliest
case e: Throwable =>
throw new RuntimeException(
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
- s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
- s"starting from `$topicActualMessageId` using strategy $strategy)", e)
+ s"(tried to forward ${numberOfEntries} messages " +
+ s"starting from `$topicActualMessageId`)", e)
}
}
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
index 457cceb7..b39a8ada 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
@@ -34,10 +34,6 @@ private[pulsar] object PulsarOptions {
val PartitionSuffix: String = TopicName.PARTITIONED_TOPIC_SUFFIX
val MaxEntriesPerTrigger = "maxentriespertrigger"
- val EnsureEntriesPerTopic = "ensureentriespertopic"
- val ForwardStrategy = "forwardstrategy"
- val ProportionalForwardStrategy = "proportional"
- val LargeFirstForwardStrategy = "large-first"
val TopicOptionKeys = Set(
TopicSingle,
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
index c8a088ae..bdcbb879 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -113,9 +113,7 @@ private[pulsar] class PulsarProvider
failOnDataLoss(caseInsensitiveParams),
subscriptionNamePrefix,
jsonOptions,
- maxEntriesPerTrigger(caseInsensitiveParams),
- minEntriesPerTopic(caseInsensitiveParams),
- forwardStrategy(caseInsensitiveParams)
+ maxEntriesPerTrigger(caseInsensitiveParams)
)
}
@@ -384,12 +382,6 @@ private[pulsar] object PulsarProvider extends Logging {
private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong
- private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
- caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong
-
- private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
- caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")
-
private def validateGeneralOptions(
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
index 98378f69..aa4b2fe5 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -37,9 +37,7 @@ private[pulsar] class PulsarSource(
failOnDataLoss: Boolean,
subscriptionNamePrefix: String,
jsonOptions: JSONOptionsInRead,
- maxEntriesPerTrigger: Long,
- ensureEntriesPerTopic: Long,
- forwardStrategy: String)
+ maxEntriesPerTrigger: Long)
extends Source
with Logging {
@@ -72,11 +70,11 @@ private[pulsar] class PulsarSource(
} else {
currentTopicOffsets match {
case Some(value) =>
- metadataReader.forwardOffset(value,
- forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
+ metadataReader.fetchNextOffsetWithMaxEntries(value,
+ maxEntriesPerTrigger)
case _ =>
- metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
- forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
+ metadataReader.fetchNextOffsetWithMaxEntries(initialTopicOffsets.topicOffsets,
+ maxEntriesPerTrigger)
}
}
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala
deleted file mode 100644
index ed39885b..00000000
--- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pulsar.topicinternalstats.forward
-
-/**
- * Forward strategy which sorts the topics by their backlog size starting
- * with the largest, and forwards topics starting from the beginning of
- * this list as the maximum entries parameter allows (taking into account
- * the number entries that need to be added anyway if
- *
- * @param additionalEntriesPerTopic is set).
- *
- * If the maximum entries to forward is `100`, topics will be forwarded
- * like this (provided there is no minimum entry number specified:
- * | topic name | backlog size | forward amount |
- * |------------|--------------|----------------|
- * | topic-1 | 60 | 60 |
- * | topic-2 | 50 | 40 |
- * | topic-3 | 40 | 0 |
- *
- * If @param ensureEntriesPerTopic is specified, then every topic will be
- * forwarded by that value in addition to this (taking the backlog size of
- * the topic into account so that bandwidth is not wasted). Given maximum
- * entries is `100`, minimum entries is `10`, topics will be forwarded like
- * this:
- *
- * | topic name | backlog size | forward amount |
- * |------------|--------------|----------------|
- * | topic-1 | 60 | 10 + 50 = 60 |
- * | topic-2 | 50 | 10 + 30 = 30 |
- * | topic-3 | 40 | 10 + 0 = 10 |
- * @param maxEntriesAltogetherToForward Maximum entries in all topics to forward.
- * Individual topics forward values will sum
- * up to this value.
- * @param ensureEntriesPerTopic All topics will be forwarded by this value. The goal
- * of this parameter is to ensure that topics with a very
- * small backlog are also forwarded with a given minimal
- * value. Has a higher precedence than
- * @param maxEntriesAltogetherToForward.
- */
-class LargeFirstForwardStrategy(maxEntriesAltogetherToForward: Long,
- ensureEntriesPerTopic: Long) extends ForwardStrategy {
- override def forward(topics: Map[String, TopicState]): Map[String, Long] = {
-
- // calculate all remaining entries per topic, ordering them by remaining entry count
- // in a reverse order
- val topicBacklogs = topics
- .map{
- case(topicName, topicStat) =>
- val internalStat = topicStat.internalStat
- val ledgerId = topicStat.actualLedgerId
- val entryId = topicStat.actualEntryId
- (topicName, TopicInternalStatsUtils.numOfEntriesAfter(internalStat, ledgerId, entryId))
- }
- .toList
- .sortBy{ case(_, numOfEntriesAfterPosition) => numOfEntriesAfterPosition }
- .reverse
-
- // calculate quota based on the ensured entry count
- // this will be distributed between individual topics
- var quota = Math.max(maxEntriesAltogetherToForward - ensureEntriesPerTopic * topics.size, 0)
-
- val result = for ((topic, topicBacklogSize) <- topicBacklogs) yield {
- // try to increase topic by this number
- // - if we have already ran out of quota, do not move topic
- // - if we do not have enough quota, proceed with the quota (exhaust it completely)
- // - if we have enough quota, exhaust all topic content (and decrease it later)
- // - take the number of ensured entries into account when calculating quota
- val forwardTopicBy = if (quota > 0) {
- Math.min(quota, topicBacklogSize - ensureEntriesPerTopic)
- } else {
- 0
- }
- // calculate forward position for a topic, make sure that it is
- // always increased by the configured ensure entry count
- val resultEntry = topic -> (ensureEntriesPerTopic + forwardTopicBy)
- // decrease the overall quota separately
- quota -= (topicBacklogSize - ensureEntriesPerTopic)
- // return already calculated forward position
- resultEntry
- }
-
- result.toMap
- }
-}
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala
deleted file mode 100644
index 5dc994a0..00000000
--- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pulsar.topicinternalstats.forward
-
-/**
- * Simple forward strategy, which forwards every topic evenly, not
- * taking actual backlog sizes into account. Might waste bandwidth
- * when the backlog of the topic is smaller than the calculated value
- * for that topic.
- *
- * If the maximum entries to forward is `150`, topics will be forwarded
- * like this (provided there is no minimum entry number specified:
- * | topic name | backlog size | forward amount |
- * |------------|--------------|----------------|
- * | topic-1 | 60 | 50 |
- * | topic-2 | 50 | 50 |
- * | topic-3 | 40 | 50 |
- *
- * @param maxEntriesAltogetherToForward Maximum entries in all topics to
- * forward. Will forward every topic
- * by dividing this with the number of
- * topics.
- */
-class LinearForwardStrategy(maxEntriesAltogetherToForward: Long) extends ForwardStrategy {
- override def forward(topics: Map[String, TopicState]): Map[String, Long] =
- topics
- .map{ case (topicName, _) =>
- topicName -> (maxEntriesAltogetherToForward / topics.size) }
-}
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategy.scala
deleted file mode 100644
index b52f7049..00000000
--- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategy.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pulsar.topicinternalstats.forward
-
-/**
- * This forward strategy will forward individual topic backlogs based on
- * their size proportional to the size of the overall backlog (considering
- * all topics).
- *
- * If the maximum entries to forward is `100`, topics will be forwarded
- * like this (provided there is no minimum entry number specified:
- * | topic name | backlog size | forward amount |
- * |------------|--------------|--------------------------|
- * |topic-1 | 60 | 100*(60/(60+50+40)) = 40 |
- * |topic-2 | 50 | 100*(50/(60+50+40)) = 33 |
- * |topic-3 | 40 | 100*(40/(60+50+40)) = 27 |
- *
- * If @param ensureEntriesPerTopic is specified, then every topic will be
- * forwarded by that value in addition to this (taking the backlog size of
- * the topic into account so that bandwidth is not wasted).
- * Given maximum entries is `100`, minimum entries is `10`, topics will be
- * forwarded like this:
- *
- * | topic name | backlog size | forward amount |
- * |------------|--------------|----------------------------|
- * |topic-1 | 60 | 10+70*(60/(60+50+40)) = 38 |
- * |topic-2 | 50 | 10+70*(50/(60+50+40)) = 33 |
- * |topic-3 | 40 | 10+70*(40/(60+50+40)) = 29 |
- *
- * @param maxEntriesAltogetherToForward Maximum entries in all topics to forward.
- * Individual topics forward values will sum
- * up to this value.
- * @param ensureEntriesPerTopic All topics will be forwarded by this value. The goal
- * of this parameter is to ensure that topics with a very
- * small backlog are also forwarded with a given minimal
- * value. Has a higher precedence than
- * @param maxEntriesAltogetherToForward.
- */
-class ProportionalForwardStrategy(maxEntriesAltogetherToForward: Long,
- ensureEntriesPerTopic: Long) extends ForwardStrategy {
- override def forward(topics: Map[String, TopicState]): Map[String, Long] = {
- // calculate all remaining entries per topic
- val topicBacklogs = topics
- .map{
- case (topicName, topicStat) =>
- val internalStat = topicStat.internalStat
- val ledgerId = topicStat.actualLedgerId
- val entryId = topicStat.actualEntryId
- (topicName, TopicInternalStatsUtils.numOfEntriesAfter(internalStat, ledgerId, entryId))
- }
- .toList
-
- // this is the size of the complete backlog (the sum of all individual topic
- // backlogs)
- val completeBacklogSize = topicBacklogs
- .map{ case (_, topicBacklogSize) => topicBacklogSize }
- .sum
-
- // calculate quota based on the ensured entry count
- // this will be distributed between individual topics
- val quota = Math.max(maxEntriesAltogetherToForward - ensureEntriesPerTopic * topics.size, 0)
-
- topicBacklogs.map {
- case (topicName: String, backLog: Long) =>
- // when calculating the coefficient, do not take the number of additional entries into
- // account (that we will add anyway)
- val topicBacklogCoefficient = if (completeBacklogSize == 0) {
- 0.0 // do not forward if there is no backlog
- } else {
- // take the ensured entries into account when calculating
- // backlog coefficient
- val backlogWithoutAdditionalEntries =
- Math.max(backLog - ensureEntriesPerTopic, 0).toFloat
- val completeBacklogWithoutAdditionalEntries =
- (completeBacklogSize - ensureEntriesPerTopic * topics.size).toFloat
- backlogWithoutAdditionalEntries / completeBacklogWithoutAdditionalEntries
- }
- topicName -> (ensureEntriesPerTopic + (quota * topicBacklogCoefficient).toLong)
- }.toMap
- }
-}
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala
similarity index 81%
rename from src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala
rename to src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala
index ec75ba7b..c1bc0218 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala
@@ -11,14 +11,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql.pulsar.topicinternalstats.forward
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats
-trait ForwardStrategy {
- def forward(topics: Map[String, TopicState]): Map[String, Long]
-}
-
case class TopicState(internalStat: PersistentTopicInternalStats,
- actualLedgerId: Long,
- actualEntryId: Long)
+ ledgerId: Long,
+ entryId: Long)
diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategySuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategySuite.scala
deleted file mode 100644
index 993b383a..00000000
--- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategySuite.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pulsar.topicinternalstats.forward
-
-import TopicStateFixture.{createLedgerInfo, _}
-import org.apache.spark.SparkFunSuite
-
-class LargeFirstForwardStrategySuite extends SparkFunSuite {
-
- test("forward empty topics") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(),
- 0, 0
- ))
- val testForwarder = new LargeFirstForwardStrategy(10, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 0)
- }
-
- test("forward a single topic with a single ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200)
- ),
- 0, 0
- ))
- val testForwarder = new LargeFirstForwardStrategy(10, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 10)
- }
-
- test("forward a single topic with multiple ledgers") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- createLedgerInfo(2000, 200)
- ),
- 0, 0
- ))
- val testForwarder = new LargeFirstForwardStrategy(350, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 350)
- }
-
- test("forward a single topic with the biggest backlog") {
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 400),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 600),
- ),
- 0, 0
- ))
- val testForwarder = new LargeFirstForwardStrategy(15, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
- assert(result("topic3") == 15)
- assert(result("topic2") == 0)
- assert(result("topic1") == 0)
- }
-
- test("forward multiple topics if the backlog is small enough") {
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 20),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 40),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 60),
- ),
- 0, 0
- ))
- val testForwarder = new LargeFirstForwardStrategy(100, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
- assert(result("topic3") == 60)
- assert(result("topic2") == 40)
- assert(result("topic1") == 0)
- }
-
- test("forward by additional entries regardless of backlog size") {
- val maxEntries = 130
- val additionalEntries = 10
- val topic1Backlog = 80
- val topic2Backlog = 60
- val topic3Backlog = 40
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, topic1Backlog),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, topic2Backlog),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, topic3Backlog),
- ),
- 0, 0
- ))
- val testForwarder = new LargeFirstForwardStrategy(maxEntries, additionalEntries)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
-
- assert(result("topic1") >= additionalEntries)
- assert(result("topic2") >= additionalEntries)
- assert(result("topic3") == additionalEntries)
-
- }
-
- test("additional entries to forward has a higher precedence than max allowed entries") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(),
- 0, 0
- ))
-
- val testForwarder = new LargeFirstForwardStrategy(10, 20)
- val result = testForwarder.forward(fakeState)
-
- assert(result("topic1") == 20)
- }
-
- test("forward from the middle of the first topic ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200)
- ),
- 1000, 20
- ))
- val testForwarder = new LargeFirstForwardStrategy(80, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 80)
- }
-
- test("forward from the middle of the last topic ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- createLedgerInfo(2000, 200),
- createLedgerInfo(3000, 200)
- ),
- 3000, 20
- ))
- val testForwarder = new LargeFirstForwardStrategy(80, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 80)
- }
-
-}
-
diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategySuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategySuite.scala
deleted file mode 100644
index 087356f8..00000000
--- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategySuite.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pulsar.topicinternalstats.forward
-
-import TopicStateFixture._
-
-import org.apache.spark.SparkFunSuite
-
-class LinearForwardStrategySuite extends SparkFunSuite {
-
- test("forward empty topics") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(),
- 0, 0
- ))
- val testForwarder = new LinearForwardStrategy(10)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 10)
- }
-
- test("forward a single topic with a single ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200)
- ),
- 0, 0
- ))
- val testForwarder = new LinearForwardStrategy(10)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 10)
- }
-
- test("forward a single topic with multiple ledgers") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- createLedgerInfo(2000, 200)
- ),
- 0, 0
- ))
- val testForwarder = new LinearForwardStrategy(350)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 350)
- }
-
- test("forward multiple topics with single ledger") {
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- ),
- 0, 0
- ))
- val testForwarder = new LinearForwardStrategy(15)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
- assert(result("topic1") == 5)
- assert(result("topic2") == 5)
- assert(result("topic3") == 5)
- }
-
- test("forward from the middle of the first topic ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200)
- ),
- 1000, 20
- ))
- val testForwarder = new LinearForwardStrategy(80)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 80)
- }
-
- test("forward from the middle of the last topic ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- createLedgerInfo(2000, 200),
- createLedgerInfo(3000, 200)
- ),
- 3000, 20
- ))
- val testForwarder = new LinearForwardStrategy(80)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 80)
- }
-
-}
-
diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategySuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategySuite.scala
deleted file mode 100644
index d811871d..00000000
--- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategySuite.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.pulsar.topicinternalstats.forward
-
-import TopicStateFixture._
-
-import org.apache.spark.SparkFunSuite
-
-class ProportionalForwardStrategySuite extends SparkFunSuite {
-
- test("forward empty topics") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(),
- 0, 0
- ))
- val testForwarder = new ProportionalForwardStrategy(10, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 0)
- }
-
- test("forward a single topic with a single ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200)
- ),
- 0, 0
- ))
- val testForwarder = new ProportionalForwardStrategy(10, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 10)
- }
-
- test("forward a single topic with multiple ledgers") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- createLedgerInfo(2000, 200)
- ),
- 0, 0
- ))
- val testForwarder = new ProportionalForwardStrategy(350, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 350)
- }
-
- test("forward a single topic with the biggest backlog") {
- val maxEntries = 12
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 400),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 600),
- ),
- 0, 0
- ))
- val testForwarder = new ProportionalForwardStrategy(maxEntries, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
- assert(result("topic1") == (maxEntries.toFloat / 6.0).toInt)
- assert(result("topic2") == (maxEntries.toFloat / 3.0).toInt)
- assert(result("topic3") == (maxEntries.toFloat / 2.0).toInt)
- }
-
- test("forward multiple topics at the same time") {
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 20),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 40),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 60),
- ),
- 0, 0
- ))
- val testForwarder = new ProportionalForwardStrategy(100, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
- assert(result("topic3") > 0)
- assert(result("topic2") > 0)
- assert(result("topic1") > 0)
- }
-
- test("forward by additional entries regardless of backlog size") {
- val maxEntries = 50
- val additionalEntries = 10
- val topic1Backlog = 10000
- val topic2Backlog = 20000
- val topic3Backlog = 10
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, topic1Backlog),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, topic2Backlog),
- ),
- 0, 0
- ),
- "topic3" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, topic3Backlog),
- ),
- 0, 0
- ))
- val testForwarder = new ProportionalForwardStrategy(maxEntries, additionalEntries)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 3)
-
- assert(result("topic1") >= additionalEntries)
- assert(result("topic2") >= additionalEntries)
- assert(result("topic3") == additionalEntries)
-
- }
-
- test("additional entries to forward has a higher precedence than topic backlog size") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 10)
- ),
- 0, 0
- ))
-
- val testForwarder = new ProportionalForwardStrategy(10, 20)
- val result = testForwarder.forward(fakeState)
-
- assert(result("topic1") == 20)
- }
-
- test("take the additional entries into account when calculating individual topic forward ratio") {
- val fakeState = Map(
- "topic1" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 1000),
- ),
- 0, 0
- ),
- "topic2" -> createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 2000),
- ),
- 0, 0
- ))
- val numberOfFakeTopics = fakeState.size
- val ensureAdditionalEntriesPerTopic = 500
- val entriesOnTopOfAdditionalEntries = 100
- val maxEntries = entriesOnTopOfAdditionalEntries + ensureAdditionalEntriesPerTopic * numberOfFakeTopics
-
- val testForwarder = new ProportionalForwardStrategy(maxEntries, ensureAdditionalEntriesPerTopic)
- val result = testForwarder.forward(fakeState)
-
- assert(result("topic1") ==
- (entriesOnTopOfAdditionalEntries.toFloat / 4.0).toInt
- + ensureAdditionalEntriesPerTopic)
- assert(result("topic2") ==
- (entriesOnTopOfAdditionalEntries.toFloat * 3.0 / 4.0).toInt
- + ensureAdditionalEntriesPerTopic)
- }
-
- test("forward from the middle of the first topic ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200)
- ),
- 1000, 20
- ))
- val testForwarder = new ProportionalForwardStrategy(80, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 80)
- }
-
- test("forward from the middle of the last topic ledger") {
- val fakeState = Map( "topic1" ->
- createTopicState(
- createPersistentTopicInternalStat(
- createLedgerInfo(1000, 200),
- createLedgerInfo(2000, 200),
- createLedgerInfo(3000, 200)
- ),
- 3000, 20
- ))
- val testForwarder = new ProportionalForwardStrategy(80, 0)
- val result = testForwarder.forward(fakeState)
-
- assert(result.size == 1)
- assert(result("topic1") == 80)
- }
-
-}
|