Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration skeleton of parallel-executor #2617

Merged
merged 8 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2643](https://github.com/FuelLabs/fuel-core/pull/2643): Before this fix when tip is zero, transactions that use 30M have the same priority as transactions with 1M gas. Now they are correctly ordered.

### Added
- [2617](https://github.com/FuelLabs/fuel-core/pull/2617): Add integration skeleton of parallel-executor.
- [2553](https://github.com/FuelLabs/fuel-core/pull/2553): Scaffold global merkle root storage crate.
- [2598](https://github.com/FuelLabs/fuel-core/pull/2598): Add initial test suite for global merkle root storage updates.

Expand Down
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/services/gas_price_service",
"crates/services/importer",
"crates/services/p2p",
"crates/services/parallel-executor",
"crates/services/producer",
"crates/services/relayer",
"crates/services/shared-sequencer",
Expand Down Expand Up @@ -81,6 +82,7 @@ fuel-core-executor = { version = "0.41.4", path = "./crates/services/executor",
fuel-core-importer = { version = "0.41.4", path = "./crates/services/importer" }
fuel-core-gas-price-service = { version = "0.41.4", path = "crates/services/gas_price_service" }
fuel-core-p2p = { version = "0.41.4", path = "./crates/services/p2p" }
fuel-core-parallel-executor = { version = "0.41.4", path = "./crates/services/parallel-executor" }
fuel-core-producer = { version = "0.41.4", path = "./crates/services/producer" }
fuel-core-relayer = { version = "0.41.4", path = "./crates/services/relayer" }
fuel-core-sync = { version = "0.41.4", path = "./crates/services/sync" }
Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ production = [
"aws-kms",
]
fault-proving = ["fuel-core-compression/fault-proving"]
parallel-executor = ["fuel-core/parallel-executor"]
12 changes: 12 additions & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ use url::Url;
#[cfg(feature = "rocksdb")]
use fuel_core::state::historical_rocksdb::StateRewindPolicy;

#[cfg(feature = "parallel-executor")]
use std::num::NonZeroUsize;

#[cfg(feature = "p2p")]
mod p2p;

Expand Down Expand Up @@ -196,6 +199,11 @@ pub struct Command {
#[arg(long = "native-executor-version", env)]
pub native_executor_version: Option<StateTransitionBytecodeVersion>,

/// Number of cores to use for the parallel executor.
#[cfg(feature = "parallel-executor")]
#[arg(long = "executor-number-of-cores", env, default_value = "1")]
rymnc marked this conversation as resolved.
Show resolved Hide resolved
pub executor_number_of_cores: NonZeroUsize,

/// The starting execution gas price for the network
#[cfg_attr(
feature = "production",
Expand Down Expand Up @@ -372,6 +380,8 @@ impl Command {
debug,
utxo_validation,
native_executor_version,
#[cfg(feature = "parallel-executor")]
executor_number_of_cores,
starting_gas_price,
gas_price_change_percent,
min_gas_price,
Expand Down Expand Up @@ -664,6 +674,8 @@ impl Command {
native_executor_version,
continue_on_error,
utxo_validation,
#[cfg(feature = "parallel-executor")]
executor_number_of_cores,
block_production: trigger,
predefined_blocks_path,
vm: VMConfig {
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fuel-core-gas-price-service = { workspace = true }
fuel-core-importer = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-p2p = { workspace = true, optional = true }
fuel-core-parallel-executor = { workspace = true, optional = true }
fuel-core-poa = { workspace = true }
fuel-core-producer = { workspace = true }
fuel-core-relayer = { workspace = true, optional = true }
Expand Down Expand Up @@ -118,3 +119,4 @@ test-helpers = [
rocksdb-production = ["rocksdb", "rocksdb/jemalloc"]
wasm-executor = ["fuel-core-upgradable-executor/wasm-executor"]
fault-proving = ["fuel-core-compression/fault-proving"]
parallel-executor = ["fuel-core-parallel-executor"]
3 changes: 3 additions & 0 deletions crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub use fuel_core_chain_config as chain_config;
#[cfg(feature = "p2p")]
#[doc(no_inline)]
pub use fuel_core_p2p as p2p;
#[cfg(feature = "parallel-executor")]
#[doc(no_inline)]
pub use fuel_core_parallel_executor as parallel_executor;
#[doc(no_inline)]
pub use fuel_core_producer as producer;
#[cfg(feature = "relayer")]
Expand Down
6 changes: 6 additions & 0 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use fuel_core_consensus_module::{
use fuel_core_executor::executor::OnceTransactionsSource;
use fuel_core_gas_price_service::v1::service::LatestGasPrice;
use fuel_core_importer::ImporterResult;
// #[cfg(feature = "parallel-executor")]
// use fuel_core_parallel_executor::executor::Executor;
use fuel_core_poa::ports::BlockSigner;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::transactional::Changes;
Expand Down Expand Up @@ -49,6 +51,7 @@ use fuel_core_types::{
signer::SignMode,
tai64::Tai64,
};
//#[cfg(not(feature = "parallel-executor"))]
use fuel_core_upgradable_executor::executor::Executor;
use std::sync::Arc;

Expand Down Expand Up @@ -346,6 +349,9 @@ impl ExecutorAdapter {
pub fn new(
database: Database,
relayer_database: Database<Relayer>,
// #[cfg(feature = "parallel-executor")]
// config: fuel_core_parallel_executor::config::Config,
// #[cfg(not(feature = "parallel-executor"))]
config: fuel_core_upgradable_executor::config::Config,
) -> Self {
let executor = Executor::new(database, relayer_database, config);
Expand Down
7 changes: 7 additions & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ use crate::{
},
};

#[cfg(feature = "parallel-executor")]
use std::num::NonZeroUsize;

#[derive(Clone, Debug)]
pub struct Config {
pub graphql_config: GraphQLConfig,
Expand All @@ -54,6 +57,8 @@ pub struct Config {
// default to false until downstream consumers stabilize
pub utxo_validation: bool,
pub native_executor_version: Option<StateTransitionBytecodeVersion>,
#[cfg(feature = "parallel-executor")]
pub executor_number_of_cores: NonZeroUsize,
pub block_production: Trigger,
pub predefined_blocks_path: Option<PathBuf>,
pub vm: VMConfig,
Expand Down Expand Up @@ -175,6 +180,8 @@ impl Config {
debug: true,
utxo_validation,
native_executor_version: Some(native_executor_version),
#[cfg(feature = "parallel-executor")]
executor_number_of_cores: NonZeroUsize::new(1).expect("1 is not zero"),
snapshot_reader,
block_production: Trigger::Instant,
predefined_blocks_path: None,
Expand Down
17 changes: 12 additions & 5 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,21 @@ pub fn init_sub_services(

let last_height = *last_block_header.height();

let upgradable_executor_config = fuel_core_upgradable_executor::config::Config {
backtrace: config.vm.backtrace,
utxo_validation_default: config.utxo_validation,
native_executor_version: config.native_executor_version,
};
let executor = ExecutorAdapter::new(
database.on_chain().clone(),
database.relayer().clone(),
fuel_core_upgradable_executor::config::Config {
backtrace: config.vm.backtrace,
utxo_validation_default: config.utxo_validation,
native_executor_version: config.native_executor_version,
},
// #[cfg(not(feature = "parallel-executor"))]
upgradable_executor_config,
// #[cfg(feature = "parallel-executor")]
// fuel_core_parallel_executor::config::Config {
// number_of_cores: config.executor_number_of_cores,
// executor_config: upgradable_executor_config,
// },
);
let import_result_provider =
ImportResultProvider::new(database.on_chain().clone(), executor.clone());
Expand Down
19 changes: 19 additions & 0 deletions crates/services/parallel-executor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "fuel-core-parallel-executor"
version = { workspace = true }
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = ["blockchain", "fuel", "fuel-vm", "parallel"]
license = { workspace = true }
repository = { workspace = true }
description = "Fuel Block Parallel Executor"

[dependencies]
fuel-core-storage = { workspace = true, features = ["std"] }
fuel-core-types = { workspace = true, features = ["std"] }
fuel-core-upgradable-executor = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }

[features]
wasm-executor = ["fuel-core-upgradable-executor/wasm-executor"]
19 changes: 19 additions & 0 deletions crates/services/parallel-executor/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use fuel_core_upgradable_executor::config::Config as ExecutorConfig;
use std::num::NonZeroUsize;

#[derive(Clone, Debug)]
pub struct Config {
/// The number of cores to use for the block execution.
pub number_of_cores: NonZeroUsize,
/// See [`fuel_core_upgradable_executor::config::Config`].
pub executor_config: ExecutorConfig,
}

impl Default for Config {
fn default() -> Self {
Self {
number_of_cores: NonZeroUsize::new(1).expect("The value is not zero; qed"),
executor_config: Default::default(),
}
}
}
114 changes: 114 additions & 0 deletions crates/services/parallel-executor/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::config::Config;
use fuel_core_storage::transactional::Changes;
use fuel_core_types::{
blockchain::block::Block,
fuel_tx::Transaction,
services::{
block_producer::Components,
executor::{
ExecutionResult,
Result as ExecutorResult,
TransactionExecutionStatus,
ValidationResult,
},
Uncommitted,
},
};
use fuel_core_upgradable_executor::{
executor::Executor as UpgradableExecutor,
native_executor::ports::TransactionsSource,
};
use std::{
num::NonZeroUsize,
sync::{
Arc,
RwLock,
},
};
use tokio::runtime::Runtime;

#[cfg(feature = "wasm-executor")]
use fuel_core_upgradable_executor::error::UpgradableError;

#[cfg(feature = "wasm-executor")]
use fuel_core_types::fuel_merkle::common::Bytes32;

pub struct Executor<S, R> {
_executor: Arc<RwLock<UpgradableExecutor<S, R>>>,
runtime: Option<Runtime>,
_number_of_cores: NonZeroUsize,
}

// Shutdown the tokio runtime to avoid panic if executor is already
// used from another tokio runtime
impl<S, R> Drop for Executor<S, R> {
fn drop(&mut self) {
if let Some(runtime) = self.runtime.take() {
runtime.shutdown_background();
}
}
}

impl<S, R> Executor<S, R> {
pub fn new(
storage_view_provider: S,
relayer_view_provider: R,
config: Config,
) -> Self {
let executor = UpgradableExecutor::new(
storage_view_provider,
relayer_view_provider,
config.executor_config,
);
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.number_of_cores.get())
.enable_all()
.build()
.unwrap();
let number_of_cores = config.number_of_cores;

Self {
_executor: Arc::new(RwLock::new(executor)),
runtime: Some(runtime),
_number_of_cores: number_of_cores,
}
}
}

impl<S, R> Executor<S, R> {
/// Produces the block and returns the result of the execution without committing the changes.
pub fn produce_without_commit_with_source<TxSource>(
&self,
_components: Components<TxSource>,
) -> ExecutorResult<Uncommitted<ExecutionResult, Changes>>
where
TxSource: TransactionsSource + Send + Sync + 'static,
{
unimplemented!("Not implemented yet");
}

pub fn validate(
&self,
_block: &Block,
) -> ExecutorResult<Uncommitted<ValidationResult, Changes>> {
unimplemented!("Not implemented yet");
}

#[cfg(feature = "wasm-executor")]
pub fn validate_uploaded_wasm(
&self,
_wasm_root: &Bytes32,
) -> Result<(), UpgradableError> {
unimplemented!("Not implemented yet");
}

/// Executes the block and returns the result of the execution without committing
/// the changes in the dry run mode.
pub fn dry_run(
&self,
_component: Components<Vec<Transaction>>,
_utxo_validation: Option<bool>,
) -> ExecutorResult<Vec<TransactionExecutionStatus>> {
unimplemented!("Not implemented yet");
}
}
2 changes: 2 additions & 0 deletions crates/services/parallel-executor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod config;
pub mod executor;
2 changes: 2 additions & 0 deletions crates/services/upgradable-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub mod config;
pub mod error;
pub mod executor;

pub use fuel_core_executor as native_executor;

#[cfg(feature = "wasm-executor")]
pub mod instance;

Expand Down
Loading