
StateStore — Streaming Aggregation State Management

StateStore is the contract of a versioned, fault-tolerant key-value store that persists the state of running aggregates across streaming batches (for aggregate operations on streaming Datasets).

Tip
Read the motivation and design in State Store for Streaming Aggregations.

StateStore describes a key-value store that lives on every executor (across the nodes in a Spark cluster) and persists keyed aggregates.

A StateStore is identified by the ID of the aggregating operator and the partition ID.

package org.apache.spark.sql.execution.streaming.state

trait StateStore {
  def abort(): Unit
  def commit(): Long
  def get(key: UnsafeRow): UnsafeRow
  def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowPair]
  def hasCommitted: Boolean
  def id: StateStoreId
  def iterator(): Iterator[UnsafeRowPair]
  def metrics: StateStoreMetrics
  def put(key: UnsafeRow, value: UnsafeRow): Unit
  def remove(key: UnsafeRow): Unit
  def version: Long
}

Table 1. StateStore Contract
Method Description

abort

commit

get

Used exclusively when StateStoreRDD is executed.

getRange

hasCommitted

id

iterator

metrics

put

Stores a value for a non-null key (both of UnsafeRow type)

Used when:

  • StateStoreSaveExec is executed (and…​FIXME)

  • StreamingDeduplicateExec is executed (and…​FIXME)

  • StateStoreUpdater attempts to write the current state when rows are processed (which is when their iterator is fully consumed).

Caution
FIXME Review StateStoreUpdater.callFunctionAndUpdateState

remove

version
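
The life cycle implied by the contract (load a version, apply updates, then either commit or abort) can be illustrated with a minimal in-memory sketch. It is not Spark's implementation: `InMemoryVersionedStore` is a hypothetical name, String keys and Long values stand in for UnsafeRow, and the sketch assumes that commit returns the loaded version plus one.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the StateStore contract.
// String keys and Long values replace UnsafeRow; nothing here is Spark API.
final class InMemoryVersionedStore(loadedVersion: Long, committed: Map[String, Long]) {
  private val working = mutable.HashMap.empty[String, Long]
  working ++= committed

  private var committedFlag = false
  private var aborted = false

  // Version of the data this store was loaded with.
  def version: Long = loadedVersion

  def hasCommitted: Boolean = committedFlag

  def get(key: String): Option[Long] = working.get(key)

  def put(key: String, value: Long): Unit = {
    require(key != null, "key must be non-null")
    requireUpdatable()
    working(key) = value
  }

  def remove(key: String): Unit = {
    requireUpdatable()
    working -= key
  }

  def iterator(): Iterator[(String, Long)] = working.iterator

  // Assumption: committing publishes the pending updates as version + 1.
  def commit(): Long = {
    requireUpdatable()
    committedFlag = true
    loadedVersion + 1
  }

  // Discards pending updates by restoring the state that was loaded.
  def abort(): Unit = {
    if (!committedFlag) {
      working.clear()
      working ++= committed
      aborted = true
    }
  }

  // Immutable copy of the current contents, handy for inspection.
  def snapshot: Map[String, Long] = working.toMap

  private def requireUpdatable(): Unit =
    require(!committedFlag && !aborted, "store is no longer updatable")
}
```

In this sketch a store instance is good for a single round of updates: once commit or abort has been called, further put and remove calls are rejected, and the next batch would load a fresh instance for the new version.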

Table 2. StateStore’s Internal Registries and Counters
Name Description

loadedProviders

Registry of StateStoreProviders per StateStoreProviderId

Used in…​FIXME

_coordRef

StateStoreCoordinatorRef (an RpcEndpointRef to StateStoreCoordinator).

Used in…​FIXME

Creating StateStoreCoordinatorRef (for Executors) — coordinatorRef Internal Method

Caution
FIXME

Removing StateStoreProvider From Provider Registry — unload Internal Method

Caution
FIXME

verifyIfStoreInstanceActive Internal Method

Caution
FIXME

Announcing New StateStoreProvider — reportActiveStoreInstance Internal Method

reportActiveStoreInstance(storeProviderId: StateStoreProviderId): Unit

reportActiveStoreInstance takes the current host and executorId (from BlockManager) and requests StateStoreCoordinatorRef to reportActiveInstance.

Note
reportActiveStoreInstance uses SparkEnv to access the current BlockManager.

You should see the following INFO message in the logs:

Reported that the loaded instance [storeProviderId] is active

Note
reportActiveStoreInstance is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
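
The reporting step can be modelled without Spark as follows. This is only a sketch: `ProviderId`, `ExecutorLocation`, `RecordingCoordinator` and `Reporting` are hypothetical stand-ins for StateStoreProviderId, the host and executorId taken from BlockManager, and StateStoreCoordinatorRef.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; the real Spark types carry more detail.
final case class ProviderId(operatorId: Long, partitionId: Int)
final case class ExecutorLocation(host: String, executorId: String)

// Plays the role of the coordinator reference: it only records who reported what.
final class RecordingCoordinator {
  private val active = mutable.HashMap.empty[ProviderId, ExecutorLocation]

  def reportActiveInstance(id: ProviderId, host: String, executorId: String): Unit =
    active(id) = ExecutorLocation(host, executorId)

  def locationOf(id: ProviderId): Option[ExecutorLocation] = active.get(id)
}

object Reporting {
  // Reports the provider as active on the current executor and returns
  // the log line so that a caller can print or inspect it.
  def reportActiveStoreInstance(
      id: ProviderId,
      current: ExecutorLocation,
      coordinator: RecordingCoordinator): String = {
    coordinator.reportActiveInstance(id, current.host, current.executorId)
    s"Reported that the loaded instance $id is active"
  }
}
```

After the call, the coordinator knows which executor holds the loaded provider; the sketch makes no claim about what the real coordinator does with that information.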

numKeys Method

Caution
FIXME

Finding StateStore by StateStoreProviderId — get Method

get(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  version: Long,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStore

get finds the StateStore for a given StateStoreProviderId.

Internally, get looks up the StateStoreProvider (for storeProviderId) in the loadedProviders registry. If none is registered, get creates and initializes one.

get also starts the periodic maintenance task (unless already started) and announces the new StateStoreProvider.

In the end, get requests the StateStoreProvider for the StateStore for the given version.

Note
get is used exclusively when StateStoreRDD is computed.
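
The lookup-or-create flow of get can be sketched as below. All names (`ProviderKey`, `SketchProvider`, `ProviderRegistry`) are hypothetical, a String label stands in for the returned StateStore, and the exact order of the steps and the locking are assumptions rather than a copy of Spark's code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StateStoreProviderId.
final case class ProviderKey(operatorId: Long, partitionId: Int)

// Hypothetical provider that hands out a label instead of a real StateStore.
final class SketchProvider(val key: ProviderKey) {
  def getStore(version: Long): String =
    s"store(${key.operatorId}, ${key.partitionId}) @ v$version"
}

final class ProviderRegistry {
  private val loadedProviders = mutable.HashMap.empty[ProviderKey, SketchProvider]

  // Exposed only so that the effects of get can be observed.
  var providersCreated = 0
  var maintenanceStarted = false
  val reported = mutable.ArrayBuffer.empty[ProviderKey]

  def get(key: ProviderKey, version: Long): String = {
    require(version >= 0, "version must be non-negative")
    val provider = loadedProviders.synchronized {
      startMaintenanceIfNeeded()
      // Look the provider up, creating and registering it on first use.
      loadedProviders.getOrElseUpdate(key, {
        providersCreated += 1
        new SketchProvider(key)
      })
    }
    reportActiveStoreInstance(key)
    // Finally, ask the provider for the store at the requested version.
    provider.getStore(version)
  }

  private def startMaintenanceIfNeeded(): Unit = {
    if (!maintenanceStarted) maintenanceStarted = true
  }

  private def reportActiveStoreInstance(key: ProviderKey): Unit = {
    reported += key
  }
}
```

Calling get twice with the same key reuses the registered provider, so only the first call pays the cost of creating and initializing it.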

Starting Periodic Maintenance Task (Unless Already Started) — startMaintenanceIfNeeded Internal Method

startMaintenanceIfNeeded(): Unit

startMaintenanceIfNeeded schedules MaintenanceTask to start after an initial delay of spark.sql.streaming.stateStore.maintenanceInterval (default: 60s) and then to run repeatedly at that same interval.

Note
startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.

Note
startMaintenanceIfNeeded is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
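
A start-once periodic task of this kind can be sketched with the JDK scheduler alone. `MaintenanceScheduler` is a hypothetical class; the sketch assumes a fixed-rate schedule whose initial delay equals the period, and it reuses the thread name mentioned in this document for readability only.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, ThreadFactory, TimeUnit}

// Hypothetical helper: schedules `work` once, with initial delay == period.
final class MaintenanceScheduler(intervalMs: Long, work: () => Unit) {

  // Daemon thread so that the scheduler never keeps the JVM alive.
  private val threadFactory: ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "state-store-maintenance-task")
      t.setDaemon(true)
      t
    }
  }

  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(threadFactory)

  private var task: Option[ScheduledFuture[_]] = None

  def isRunning: Boolean = synchronized { task.exists(t => !t.isDone) }

  // Returns true only if this call scheduled the task.
  def startIfNeeded(): Boolean = synchronized {
    if (isRunning) false
    else {
      val runnable = new Runnable { override def run(): Unit = work() }
      task = Some(
        executor.scheduleAtFixedRate(runnable, intervalMs, intervalMs, TimeUnit.MILLISECONDS))
      true
    }
  }

  // Cancels the task and shuts the executor down; the instance is not reusable afterwards.
  def stop(): Unit = synchronized {
    task.foreach(_.cancel(false))
    task = None
    executor.shutdown()
  }
}
```

A second call to startIfNeeded while the task is still scheduled is a no-op, which mirrors the "unless already started" behaviour described above.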

MaintenanceTask Daemon Thread

MaintenanceTask is a daemon thread that triggers maintenance work of every registered StateStoreProvider.

When an error occurs, MaintenanceTask clears the loadedProviders registry.

MaintenanceTask is scheduled on the state-store-maintenance-task thread pool.

Note
Use the spark.sql.streaming.stateStore.maintenanceInterval Spark property (default: 60s) to control both the initial delay and how often the maintenance task runs.
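
As a configuration sketch, the property could be supplied when the SparkSession is built. The application name, the local master and the 30s value are arbitrary choices for illustration, and the sketch assumes the property has to be in place before the first state store is loaded.

```scala
import org.apache.spark.sql.SparkSession

object MaintenanceIntervalDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                         // illustrative: run locally
      .appName("state-store-maintenance-interval") // illustrative name
      // Run state store maintenance every 30 seconds instead of the default 60s.
      .config("spark.sql.streaming.stateStore.maintenanceInterval", "30s")
      .getOrCreate()

    // Read the value back to confirm what the session was configured with.
    println(spark.conf.get("spark.sql.streaming.stateStore.maintenanceInterval"))

    spark.stop()
  }
}
```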

Triggering Maintenance of Registered StateStoreProviders — doMaintenance Internal Method

doMaintenance(): Unit

Internally, doMaintenance prints the following DEBUG message to the logs:

DEBUG Doing maintenance

doMaintenance then requests every StateStoreProvider (registered in loadedProviders) to do its own internal maintenance (only when a StateStoreProvider is still active).

When a StateStoreProvider is inactive, doMaintenance removes it from the provider registry and prints the following INFO message to the logs:

INFO Unloaded [provider]

Note
doMaintenance is used exclusively in the MaintenanceTask daemon thread.
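
The maintenance pass, together with the clear-on-error behaviour of MaintenanceTask, can be sketched as follows. `MaintainableProvider` and `MaintenanceRunner` are hypothetical names, providers are keyed by a plain String, the activity check is passed in as a function, and log lines are collected in a buffer instead of going through a logger.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for a StateStoreProvider.
trait MaintainableProvider {
  def name: String
  def doMaintenance(): Unit
}

// `isActive` stands in for the check that a provider is still active.
final class MaintenanceRunner(isActive: String => Boolean) {
  private val loadedProviders = mutable.LinkedHashMap.empty[String, MaintainableProvider]

  // Collected log lines, so that the behaviour can be observed.
  val log = mutable.ArrayBuffer.empty[String]

  def register(p: MaintainableProvider): Unit =
    loadedProviders.synchronized { loadedProviders(p.name) = p }

  def loaded: Set[String] =
    loadedProviders.synchronized { loadedProviders.keySet.toSet }

  def doMaintenance(): Unit = {
    log += "DEBUG Doing maintenance"
    // Work on a copy so that unloading does not mutate the map being iterated.
    val snapshot = loadedProviders.synchronized { loadedProviders.toSeq }
    snapshot.foreach { case (id, provider) =>
      if (isActive(id)) {
        provider.doMaintenance()
      } else {
        loadedProviders.synchronized { loadedProviders.remove(id) }
        log += s"INFO Unloaded $id"
      }
    }
  }

  // What the periodic task would run: any non-fatal error clears the registry.
  def runSafely(): Unit =
    try doMaintenance()
    catch {
      case NonFatal(_) => loadedProviders.synchronized { loadedProviders.clear() }
    }
}
```

Clearing the whole registry on failure is a blunt recovery strategy, but it is cheap in this model: a provider that is still needed is simply created again the next time it is looked up.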