StateStore is the contract of a versioned and fault-tolerant key-value store for persisting state of running aggregates across streaming batches (for aggregate operations on streaming Datasets).
Tip: Read the motivation and design in State Store for Streaming Aggregations.
StateStore describes a key-value store that lives on every executor (across the nodes in a Spark cluster) for persistent keyed aggregates.
StateStore is identified with the aggregating operator id and the partition id.
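To make that identification concrete, the following sketch builds the identifiers a store is registered and looked up by (the values and the exact field names are assumptions for illustration, based on the Spark sources rather than on this page):

import java.util.UUID
import org.apache.spark.sql.execution.streaming.state.{StateStoreId, StateStoreProviderId}

// Hypothetical values for illustration only
val storeId = StateStoreId(
  checkpointRootLocation = "/tmp/checkpoint/state",  // assumed checkpoint location
  operatorId = 0,                                    // the aggregating operator id
  partitionId = 3)                                   // the partition id
val providerId = StateStoreProviderId(storeId, queryRunId = UUID.randomUUID())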
package org.apache.spark.sql.execution.streaming.state
trait StateStore {
def abort(): Unit
def commit(): Long
def get(key: UnsafeRow): UnsafeRow
def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowPair]
def hasCommitted: Boolean
def id: StateStoreId
def iterator(): Iterator[UnsafeRowPair]
def metrics: StateStoreMetrics
def put(key: UnsafeRow, value: UnsafeRow): Unit
def remove(key: UnsafeRow): Unit
def version: Long
}
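As a quick illustration of the contract, here is a minimal sketch (not Spark's own code) of how an aggregating operator could drive a StateStore for a single streaming batch, assuming the store has already been obtained for the right version and the keys and values are UnsafeRow-encoded:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.StateStore

def updateAndCommit(store: StateStore, updates: Iterator[(UnsafeRow, UnsafeRow)]): Long = {
  updates.foreach { case (key, value) =>
    store.put(key, value)   // upsert the keyed aggregate
  }
  try {
    store.commit()          // finalize the updates and return the new version
  } catch {
    case e: Throwable =>
      store.abort()         // roll back any uncommitted changes
      throw e
  }
}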
Method | Description
---|---
put | Stores a value for a non-null key (both the key and the value are of UnsafeRow type). Used when…FIXME
Name | Description
---|---
loadedProviders | Registry of StateStoreProviders per StateStoreProviderId. Used in…FIXME
_coordRef | StateStoreCoordinatorRef (a reference to the StateStoreCoordinator RPC endpoint). Used in…FIXME
Note: StateStore was introduced in [SPARK-13809][SQL] State store for streaming aggregations.
reportActiveStoreInstance(storeProviderId: StateStoreProviderId): Unit
reportActiveStoreInstance takes the current host and executorId (from the BlockManager) and requests the StateStoreCoordinatorRef to reportActiveInstance.
Note: reportActiveStoreInstance uses SparkEnv to access the current BlockManager.
You should see the following INFO message in the logs:
Reported that the loaded instance [storeProviderId] is active
Note: reportActiveStoreInstance is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
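Put together, the flow boils down to something like the sketch below (a paraphrase rather than the actual Spark source; the exact signature of the coordinator call is an assumption):

// Host and executor id come from the BlockManager (accessed through SparkEnv)
val blockManagerId = SparkEnv.get.blockManager.blockManagerId
val host = blockManagerId.host
val executorId = blockManagerId.executorId

// Ask the StateStoreCoordinatorRef (if available) to record this instance as active
coordinatorRef.foreach(_.reportActiveInstance(storeProviderId, host, executorId))
logInfo(s"Reported that the loaded instance $storeProviderId is active")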
get(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
valueSchema: StructType,
indexOrdinal: Option[Int],
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStore
get finds the StateStore for the given StateStoreProviderId.

Internally, get looks up the StateStoreProvider (for storeProviderId) in the loadedProviders registry. If unavailable, get creates and initializes one.

get will also start the periodic maintenance task (unless already started) and announce the new StateStoreProvider.

In the end, get gets the StateStore (for the version).
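For illustration, a caller-side use of get could look like the sketch below (all values, such as the schemas, the version and the configurations, are assumed to be in scope for the operator and batch at hand):

// Hypothetical caller: look up (or create and initialize) the StateStore
// for the given provider id and version
val store: StateStore = StateStore.get(
  storeProviderId,
  keySchema,
  valueSchema,
  None,          // indexOrdinal: no secondary index
  version,
  storeConf,     // a StateStoreConf built from the session's SQLConf
  hadoopConf)    // the Hadoop Configuration of the state checkpoint location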
Note: get is used exclusively when StateStoreRDD is computed.
Starting Periodic Maintenance Task (Unless Already Started) — startMaintenanceIfNeeded Internal Method
startMaintenanceIfNeeded(): Unit
startMaintenanceIfNeeded schedules the MaintenanceTask to start after, and then run every, spark.sql.streaming.stateStore.maintenanceInterval (defaults to 60s).
Note: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.
Note: startMaintenanceIfNeeded is used exclusively when StateStore is requested to find the StateStore by StateStoreProviderId.
MaintenanceTask is a daemon thread that triggers maintenance work of every registered StateStoreProvider.

When an error occurs, MaintenanceTask clears the loadedProviders registry.

MaintenanceTask is scheduled on the state-store-maintenance-task thread pool.
Note: Use the spark.sql.streaming.stateStore.maintenanceInterval Spark property (default: 60s) to control the initial delay and how often the thread should be executed.
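The scheduling itself amounts to a periodic task on a single-threaded scheduler, roughly as in the sketch below (illustrative only; the actual MaintenanceTask additionally clears loadedProviders when the work fails, as noted above):

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative scheduling; the interval corresponds to
// spark.sql.streaming.stateStore.maintenanceInterval (default: 60s)
val intervalSeconds = 60L
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(
  new Runnable { override def run(): Unit = doMaintenance() },
  intervalSeconds,   // initial delay
  intervalSeconds,   // period
  TimeUnit.SECONDS)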
doMaintenance(): Unit
Internally, doMaintenance prints the following DEBUG message to the logs:
DEBUG Doing maintenance
doMaintenance then requests every StateStoreProvider (registered in loadedProviders) to do its own internal maintenance (only when a StateStoreProvider is still active).
When a StateStoreProvider is inactive, doMaintenance removes it from the provider registry and prints the following INFO message to the logs:
INFO Unloaded [provider]
Note: doMaintenance is used exclusively in the MaintenanceTask daemon thread.
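A compact sketch of that loop follows (a paraphrase of the behaviour described above, not the actual Spark code; the check whether an instance is still active is shown as an assumed helper):

// For every registered provider: run its maintenance if it is still active,
// otherwise unload it from the registry
loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
  if (isActive(id)) {            // assumed helper: asks the coordinator if this instance is active
    provider.doMaintenance()     // provider-specific maintenance (e.g. snapshots, cleanup)
  } else {
    loadedProviders.synchronized { loadedProviders.remove(id) }
    provider.close()
    logInfo(s"Unloaded $provider")
  }
}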