add: parameters to control number and the distribution of messages in a micro-batch
Attila Tóth committed Nov 5, 2021
1 parent 29d01c4 commit b9da6dc
Showing 16 changed files with 1,589 additions and 13 deletions.
68 changes: 68 additions & 0 deletions README.md
@@ -325,9 +325,77 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, MessageId])`
This may cause a false alarm. You can set it to `false` when it doesn't work as you expected. <br>

A batch query always fails if it fails to read any data from the provided offsets due to data loss.</td>
</tr>
<tr>
<td>
`maxEntriesPerTrigger`
</td>
<td>
Number of entries to include in a single micro-batch during
streaming.
</td>
<td>-1</td>
<td>Streaming query</td>
<td>This parameter controls how many Pulsar entries the connector
reads from the topic backlog at once. If the topic backlog is
considerably large, users can use this parameter to limit the
size of the micro-batch. If multiple topics are read, this
parameter caps the total number of entries fetched across all
of them (see the usage example after this table).

*Note:* Entries might contain multiple messages. The default value of `-1` means that the
complete backlog is read at once.</td>
</tr>

<tr>
<td>
`forwardStrategy`
</td>
<td>
`simple`, `large-first` or `proportional`
</td>
<td>`simple`</td>
<td>Streaming query</td>
<td>If `maxEntriesPerTrigger` is set, this parameter controls
which forwarding strategy is used when reading multiple
topics.
<li>
`simple` divides the allowed number of entries equally
among all topics, regardless of their backlog size
</li>
<li>
`large-first` forwards the topics with the largest backlogs
first, as far as the maximum number of allowed entries allows
</li>
<li>
`proportional` forwards each topic in proportion to its share
of the overall backlog (topic backlog / overall backlog)
</li>
</td>
</tr>

<tr>
<td>
`ensureEntriesPerTopic`
</td>
<td>Number of entries to forward each topic by within a micro-batch.</td>
<td>0</td>
<td>Streaming query</td>
<td>If multiple topics are read and the maximum number of
entries is also specified, always forward every topic by at least
the number of entries given here. This way users can ensure that topics
with considerably smaller backlogs than others are also forwarded
and read. Note that:
<li>If this number is higher than the maximum allowed entries divided
by the number of topics, then this value takes precedence, overriding
the maximum number of entries per micro-batch.
</li>
<li>This parameter has an effect only with the forwarding strategies
`large-first` and `proportional`.</li>
</td>
</tr>


</table>
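
For example, the following sketch caps each micro-batch at 1,000 entries in total, distributes that budget proportionally to the topic backlogs, and still advances every topic by at least 50 entries (the service URL, admin URL, and topic names are placeholders):

```scala
val stream = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650") // placeholder
  .option("admin.url", "http://localhost:8080")     // placeholder
  .option("topics", "topic-a,topic-b")
  .option("maxEntriesPerTrigger", "1000")
  .option("forwardStrategy", "proportional")
  .option("ensureEntriesPerTopic", "50")
  .load()
```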

#### Authentication
src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala
@@ -15,18 +15,20 @@ package org.apache.spark.sql.pulsar

import java.{util => ju}
import java.io.Closeable
import java.util.{Optional, UUID}
import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient, SubscriptionInitialPosition, SubscriptionType}
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.client.internal.DefaultImplementation
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo

import org.apache.spark.internal.Logging
import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH, TOPIC_OPTION_KEYS}
import org.apache.spark.sql.pulsar.PulsarOptions._
import org.apache.spark.sql.pulsar.topicinternalstats.forward._
import org.apache.spark.sql.types.StructType

/**
@@ -205,6 +207,82 @@ private[pulsar] case class PulsarMetadataReader(
}.toMap)
}


def forwardOffset(actualOffset: Map[String, MessageId],
strategy: String,
numberOfEntriesToForward: Long,
ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
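// getTopicPartitions() refreshes the topicPartitions list used below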
getTopicPartitions()

// Collect internal stats for all topics
val topicStats = topicPartitions.map { topic =>
val internalStats = admin.topics().getInternalStats(topic)
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
topic -> TopicState(internalStats,
PulsarSourceUtils.getLedgerId(topicActualMessageId),
PulsarSourceUtils.getEntryId(topicActualMessageId))
}.toMap

val forwarder = strategy match {
case PulsarOptions.ProportionalForwardStrategy =>
new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
case PulsarOptions.LargeFirstForwardStrategy =>
new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
case _ =>
new LinearForwardStrategy(numberOfEntriesToForward)
}

SpecificPulsarOffset(topicPartitions.map { topic =>
topic -> PulsarSourceUtils.seekableLatestMid {
// Fetch actual offset for topic
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
try {
// Get the actual ledger
val actualLedgerId = PulsarSourceUtils.getLedgerId(topicActualMessageId)
// Get the actual entry ID
val actualEntryId = PulsarSourceUtils.getEntryId(topicActualMessageId)
// Get the partition index
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
// Look up the cached internal stats for this topic
val internalStats = topicStats(topic).internalStat
// Calculate the number of entries to forward for this topic
val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
// Get a future message ID which corresponds
// to the maximum number of messages
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
internalStats,
actualLedgerId,
actualEntryId,
numberOfEntriesPerTopic)
// Build a message id
val forwardedMessageId =
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
// Log state
val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
internalStats, nextLedgerId, nextEntryId)
val entryCount = internalStats.numberOfEntries
val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
val logMessage = s"Pulsar Connector forward on topic " +
s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward] " +
s"${topic.reverse.take(30).reverse}: $topicActualMessageId -> " +
s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
log.debug(logMessage)
// Return the message ID
forwardedMessageId
} catch {
case e: PulsarAdminException if e.getStatusCode == 404 =>
MessageId.earliest
case e: Throwable =>
throw new RuntimeException(
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
s"starting from `$topicActualMessageId` using strategy $strategy)", e)
}

}
}.toMap)
}

def fetchLatestOffsetForTopic(topic: String): MessageId = {
PulsarSourceUtils.seekableLatestMid( try {
admin.topics().getLastMessageId(topic)
src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
@@ -31,6 +31,12 @@ private[pulsar] object PulsarOptions {
val TOPIC_MULTI = "topics"
val TOPIC_PATTERN = "topicspattern"

val MaxEntriesPerTrigger = "maxentriespertrigger"
val EnsureEntriesPerTopic = "ensureentriespertopic"
val ForwardStrategy = "forwardstrategy"
val ProportionalForwardStrategy = "proportional"
val LargeFirstForwardStrategy = "large-first"

val PARTITION_SUFFIX = TopicName.PARTITIONED_TOPIC_SUFFIX

val TOPIC_OPTION_KEYS = Set(
14 changes: 13 additions & 1 deletion src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -110,7 +110,10 @@ private[pulsar] class PulsarProvider
pollTimeoutMs(caseInsensitiveParams),
failOnDataLoss(caseInsensitiveParams),
subscriptionNamePrefix,
jsonOptions
jsonOptions,
maxEntriesPerTrigger(caseInsensitiveParams),
minEntriesPerTopic(caseInsensitiveParams),
forwardStrategy(caseInsensitiveParams)
)
}

@@ -365,6 +368,15 @@ private[pulsar] object PulsarProvider extends Logging {
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
.toInt

private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong

private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong

private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")

private def validateGeneralOptions(
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
if (!caseInsensitiveParams.contains(SERVICE_URL_OPTION_KEY)) {
28 changes: 19 additions & 9 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -36,7 +36,10 @@ private[pulsar] class PulsarSource(
pollTimeoutMs: Int,
failOnDataLoss: Boolean,
subscriptionNamePrefix: String,
jsonOptions: JSONOptionsInRead)
jsonOptions: JSONOptionsInRead,
maxEntriesPerTrigger: Long,
ensureEntriesPerTopic: Long,
forwardStrategy: String)
extends Source
with Logging {

@@ -63,12 +66,21 @@
override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)

override def getOffset: Option[Offset] = {
// Make sure initialTopicOffsets is initialized
initialTopicOffsets
val latest = metadataReader.fetchLatestOffsets()
currentTopicOffsets = Some(latest.topicOffsets)
logDebug(s"GetOffset: ${latest.topicOffsets.toSeq.map(_.toString).sorted}")
Some(latest.asInstanceOf[Offset])
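// If no micro-batch cap is configured, read up to the latest available
// offsets; otherwise forward the last known (or initial) offsets by at
// most maxEntriesPerTrigger entries, distributed by forwardStrategy.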
val nextOffsets = if (maxEntriesPerTrigger == -1) {
metadataReader.fetchLatestOffsets()
} else {
currentTopicOffsets match {
case Some(value) =>
metadataReader.forwardOffset(value,
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
case _ =>
metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
}
}
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
Some(nextOffsets.asInstanceOf[Offset])
}

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -78,9 +90,7 @@
logInfo(s"getBatch called with start = $start, end = $end")
val endTopicOffsets = SpecificPulsarOffset.getTopicOffsets(end)

if (currentTopicOffsets.isEmpty) {
currentTopicOffsets = Some(endTopicOffsets)
}
currentTopicOffsets = Some(endTopicOffsets)

if (start.isDefined && start.get == end) {
return sqlContext.internalCreateDataFrame(
30 changes: 30 additions & 0 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala
@@ -120,6 +120,36 @@ private[pulsar] object PulsarSourceUtils extends Logging {
}
}

def getLedgerId(mid: MessageId): Long = {
mid match {
case bmid: BatchMessageIdImpl =>
bmid.getLedgerId
case midi: MessageIdImpl => midi.getLedgerId
case t: TopicMessageIdImpl => getLedgerId(t.getInnerMessageId)
case up: UserProvidedMessageId => up.getLedgerId
}
}

def getEntryId(mid: MessageId): Long = {
mid match {
case bmid: BatchMessageIdImpl =>
bmid.getEntryId
case midi: MessageIdImpl => midi.getEntryId
case t: TopicMessageIdImpl => getEntryId(t.getInnerMessageId)
case up: UserProvidedMessageId => up.getEntryId
}
}

def getPartitionIndex(mid: MessageId): Int = {
mid match {
case bmid: BatchMessageIdImpl =>
bmid.getPartitionIndex
case midi: MessageIdImpl => midi.getPartitionIndex
case t: TopicMessageIdImpl => getPartitionIndex(t.getInnerMessageId)
case up: UserProvidedMessageId => up.getPartitionIndex
}
}

def seekableLatestMid(mid: MessageId): MessageId = {
if (messageExists(mid)) mid else MessageId.earliest
}
src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala
@@ -0,0 +1,24 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.pulsar.topicinternalstats.forward

import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats

trait ForwardStrategy {
def forward(topics: Map[String, TopicState]): Map[String, Long]
}

case class TopicState(internalStat: PersistentTopicInternalStats,
actualLedgerId: Long,
actualEntryId: Long)
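
As an illustration of this contract (a sketch only; the concrete `LinearForwardStrategy`, `LargeFirstForwardStrategy`, and `ProportionalForwardStrategy` classes are added in sibling files of this commit, and the class name below is hypothetical), an equal-split strategy in the spirit of `simple` could look like:

```scala
// Hypothetical equal-split strategy: every topic receives the same share
// of the entry budget, regardless of its backlog size.
class EqualSplitForwardStrategy(maxEntries: Long) extends ForwardStrategy {
  override def forward(topics: Map[String, TopicState]): Map[String, Long] =
    if (topics.isEmpty) Map.empty
    // Integer division: up to topics.size - 1 entries of the budget stay unused.
    else topics.map { case (topic, _) => topic -> (maxEntries / topics.size) }
}
```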