From a891936ac95cb2e814bfed3945551489a00bde72 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 24 May 2021 16:59:58 +0800 Subject: [PATCH] use bytes for protobuf (#438) If `protobuf-codec` is enabled, `bytes` will be used to replace `Vec` for protobuf `bytes` type. However if `prost-codec` is enabled, `Vec` will be still used. Signed-off-by: qupeng --- Cargo.toml | 3 +- benches/suites/raw_node.rs | 6 +-- harness/tests/integration_cases/test_raft.rs | 42 +++++++++---------- .../tests/integration_cases/test_raw_node.rs | 8 ++-- harness/tests/test_util/mod.rs | 2 +- proto/Cargo.toml | 3 +- proto/build.rs | 1 + proto/proto/eraftpb.proto | 3 ++ src/lib.rs | 5 ++- src/raft.rs | 28 ++++++++----- src/raw_node.rs | 10 ++--- src/read_only.rs | 2 +- src/util.rs | 2 +- 13 files changed, 65 insertions(+), 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f50a830..bfddae3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,12 +19,13 @@ members = ["proto", "harness", "datadriven"] default = ["protobuf-codec", "default-logger"] # Enable failpoints failpoints = ["fail/failpoints"] -protobuf-codec = ["raft-proto/protobuf-codec"] +protobuf-codec = ["raft-proto/protobuf-codec", "bytes"] prost-codec = ["raft-proto/prost-codec"] default-logger = ["slog-stdlog", "slog-envlogger", "slog-term"] # Make sure to synchronize updates with Harness. [dependencies] +bytes = { version = "1", optional = true } fxhash = "0.2.1" fail = { version = "0.3", optional = true } getset = "0.0.9" diff --git a/benches/suites/raw_node.rs b/benches/suites/raw_node.rs index 259bae8..a9e7c44 100644 --- a/benches/suites/raw_node.rs +++ b/benches/suites/raw_node.rs @@ -115,8 +115,8 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode { let mut entries = vec![]; for i in 1..101 { let mut e = Entry::default(); - e.data = vec![0; 32 * 1024]; - e.context = vec![]; + e.data = vec![0; 32 * 1024].into(); + e.context = vec![].into(); e.index = i; e.term = 1; entries.push(e); @@ -129,7 +129,7 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode { node.raft.on_persist_entries(101, 1); let mut snap = Snapshot::default(); - snap.set_data(vec![0; 8 * 1024 * 1024]); + snap.set_data(vec![0; 8 * 1024 * 1024].into()); // We don't care about the contents in snapshot here since it won't be applied. snap.set_metadata(SnapshotMetadata::default()); for _ in 0..100 { diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 18a9bbc..a7af2f7 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -137,7 +137,7 @@ fn test_progress_committed_index() { // #1 test append entries // append entries between 1 and 2 let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); let m = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.cut(1, 3); nt.send(vec![m.clone(), m]); @@ -356,7 +356,7 @@ fn test_progress_paused() { m.to = 1; m.set_msg_type(MessageType::MsgPropose); let mut e = Entry::default(); - e.data = b"some_data".to_vec(); + e.data = (b"some_data" as &'static [u8]).into(); m.entries = vec![e].into(); raft.step(m.clone()).expect(""); raft.step(m.clone()).expect(""); @@ -1471,7 +1471,7 @@ fn test_raft_frees_read_only_mem() { // acknowledge the authority of the leader. // more info: raft dissertation 6.4, step 3. let mut m = new_message(2, 1, MessageType::MsgHeartbeatResponse, 0); - m.context = vec_ctx.clone(); + m.context = vec_ctx.clone().into(); sm.step(m).expect(""); assert_eq!(sm.read_only.read_index_queue.len(), 0); assert_eq!(sm.read_only.pending_read_index.len(), 0); @@ -3304,7 +3304,7 @@ fn test_commit_after_remove_node() -> Result<()> { cc.set_change_type(ConfChangeType::RemoveNode); cc.node_id = 2; let ccdata = cc.write_to_bytes().unwrap(); - entry.data = ccdata; + entry.data = ccdata.into(); msg.mut_entries().push(entry); r.step(msg).expect(""); // Stabilize the log and make sure nothing is committed yet. @@ -3334,7 +3334,7 @@ fn test_commit_after_remove_node() -> Result<()> { let ents = next_ents(&mut r, &s); assert_eq!(ents.len(), 1); assert_eq!(ents[0].get_entry_type(), EntryType::EntryNormal); - assert_eq!(ents[0].data, b"hello"); + assert_eq!(ents[0].data.as_ref(), b"hello"); Ok(()) } @@ -4450,7 +4450,7 @@ fn test_conf_change_check_before_campaign() { let mut cc = ConfChange::default(); cc.set_change_type(ConfChangeType::RemoveNode); cc.node_id = 3; - e.data = protobuf::Message::write_to_bytes(&cc).unwrap(); + e.data = protobuf::Message::write_to_bytes(&cc).unwrap().into(); m.mut_entries().push(e); nt.send(vec![m]); @@ -4532,10 +4532,10 @@ fn test_advance_commit_index_by_vote_request(use_prevote: bool) { let mut e = Entry::default(); if let Some(v1) = cc.as_v1() { e.set_entry_type(EntryType::EntryConfChange); - e.set_data(v1.write_to_bytes().unwrap()); + e.set_data(v1.write_to_bytes().unwrap().into()); } else { e.set_entry_type(EntryType::EntryConfChangeV2); - e.set_data(cc.as_v2().write_to_bytes().unwrap()); + e.set_data(cc.as_v2().write_to_bytes().unwrap().into()); } // propose a confchange entry but don't let it commit @@ -4684,10 +4684,10 @@ fn test_advance_commit_index_by_vote_response(use_prevote: bool) { let mut e = Entry::default(); if let Some(v1) = cc.as_v1() { e.set_entry_type(EntryType::EntryConfChange); - e.set_data(v1.write_to_bytes().unwrap()); + e.set_data(v1.write_to_bytes().unwrap().into()); } else { e.set_entry_type(EntryType::EntryConfChangeV2); - e.set_data(cc.as_v2().write_to_bytes().unwrap()); + e.set_data(cc.as_v2().write_to_bytes().unwrap().into()); } // propose a confchange entry but don't let it commit @@ -4822,7 +4822,7 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.send(vec![msg.clone(), msg]); assert_eq!(nt.peers[&1].raft_log.committed, 14); @@ -4841,7 +4841,7 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { // Commit a new raft log. let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.send(vec![msg]); @@ -4877,7 +4877,7 @@ fn test_follower_request_snapshot() { // New proposes can not be replicated to peer 2. let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.send(vec![msg.clone()]); assert_eq!(nt.peers[&1].raft_log.committed, 16); @@ -5032,7 +5032,7 @@ fn test_request_snapshot_step_down() { // Commit a new entry and leader steps down while peer 2 is isolated. nt.isolate(2); let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.send(vec![msg]); nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]); @@ -5393,7 +5393,7 @@ fn test_read_when_quorum_becomes_less() { m.to = 1; m.set_msg_type(MessageType::MsgReadIndex); let mut e = Entry::default(); - e.data = b"abcdefg".to_vec(); + e.data = (b"abcdefg" as &'static [u8]).into(); m.set_entries(vec![e].into()); network.dispatch(vec![m]).unwrap(); @@ -5425,7 +5425,7 @@ fn test_uncommitted_entries_size_limit() { let mut nt = Network::new_with_config(vec![None, None, None], config, &l); let data = b"hello world!".to_vec(); let mut entry = Entry::default(); - entry.data = data.to_vec(); + entry.data = data.to_vec().into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); @@ -5447,7 +5447,7 @@ fn test_uncommitted_entries_size_limit() { // after reduce, new proposal should be accepted let mut entry = Entry::default(); - entry.data = data; + entry.data = data.into(); entry.index = 3; nt.peers .get_mut(&1) @@ -5458,14 +5458,14 @@ fn test_uncommitted_entries_size_limit() { // a huge proposal should be accepted when there is no uncommitted entry, // even it's bigger than max_uncommitted_size let mut entry = Entry::default(); - entry.data = b"hello world and raft".to_vec(); + entry.data = (b"hello world and raft" as &'static [u8]).into(); let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); let result = nt.dispatch(vec![long_msg].to_vec()); assert!(result.is_ok()); // but another huge one will be dropped let mut entry = Entry::default(); - entry.data = b"hello world and raft".to_vec(); + entry.data = (b"hello world and raft" as &'static [u8]).into(); let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); let result = nt.dispatch(vec![long_msg].to_vec()); assert!(!result.is_ok()); @@ -5488,7 +5488,7 @@ fn test_uncommitted_entry_after_leader_election() { let mut nt = Network::new_with_config(vec![None, None, None, None, None], config, &l); let data = b"hello world!".to_vec(); let mut entry = Entry::default(); - entry.data = data; + entry.data = data.into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); @@ -5524,7 +5524,7 @@ fn test_uncommitted_state_advance_ready_from_last_term() { let data = b"hello world!".to_vec(); let mut ent = Entry::default(); - ent.data = data.clone(); + ent.data = data.clone().into(); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index 19d878a..8198740 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -123,7 +123,7 @@ fn test_raw_node_read_index_to_old_leader() { nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); // send readindex request to r2(follower) let _ = nt.peers.get_mut(&2).unwrap().step(new_message_with_entries( @@ -341,7 +341,7 @@ fn test_raw_node_propose_and_conf_change() { } context = b"manual".to_vec(); let mut cc = conf_change_v2(vec![]); - cc.set_context(context.clone()); + cc.set_context(context.clone().into()); raw_node.propose_conf_change(vec![], cc).unwrap(); rd = raw_node.ready(); } @@ -729,7 +729,7 @@ fn test_skip_bcast_commit() { // Without bcast commit, followers will not update its commit index immediately. let mut test_entries = Entry::default(); - test_entries.data = b"testdata".to_vec(); + test_entries.data = (b"testdata" as &'static [u8]).into(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]); nt.send(vec![msg.clone()]); assert_eq!(nt.peers[&1].raft_log.committed, 2); @@ -767,7 +767,7 @@ fn test_skip_bcast_commit() { let data = cc.write_to_bytes().unwrap(); let mut cc_entry = Entry::default(); cc_entry.set_entry_type(EntryType::EntryConfChange); - cc_entry.data = data; + cc_entry.data = data.into(); nt.send(vec![new_message_with_entries( 1, 1, diff --git a/harness/tests/test_util/mod.rs b/harness/tests/test_util/mod.rs index 9bdcff7..d7864ad 100644 --- a/harness/tests/test_util/mod.rs +++ b/harness/tests/test_util/mod.rs @@ -155,7 +155,7 @@ pub fn new_entry(term: u64, index: u64, data: Option<&str>) -> Entry { e.index = index; e.term = term; if let Some(d) = data { - e.data = d.as_bytes().to_vec(); + e.data = d.as_bytes().to_vec().into(); } e } diff --git a/proto/Cargo.toml b/proto/Cargo.toml index b7a3e76..8538bf6 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -14,13 +14,14 @@ build = "build.rs" [features] default = ["protobuf-codec"] -protobuf-codec = ["protobuf-build/protobuf-codec"] +protobuf-codec = ["protobuf-build/protobuf-codec", "bytes"] prost-codec = ["protobuf-build/prost-codec", "prost", "lazy_static"] [build-dependencies] protobuf-build = { version = "0.12", default-features = false } [dependencies] +bytes = { version = "1", optional = true } lazy_static = { version = "1", optional = true } prost = { version = "0.7", optional = true } protobuf = "2" diff --git a/proto/build.rs b/proto/build.rs index cdaf494..957137a 100644 --- a/proto/build.rs +++ b/proto/build.rs @@ -7,5 +7,6 @@ fn main() { Builder::new() .search_dir_for_protos(&format!("{}/proto", base)) .includes(&[format!("{}/include", base), format!("{}/proto", base)]) + .include_google_protos() .generate() } diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 413e730..1f7c71b 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -1,6 +1,9 @@ syntax = "proto3"; package eraftpb; +import "rustproto.proto"; +option (rustproto.carllerche_bytes_for_bytes_all) = true; + enum EntryType { EntryNormal = 0; EntryConfChange = 1; diff --git a/src/lib.rs b/src/lib.rs index 9c4d4b8..5d8ed97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -502,7 +502,10 @@ pub mod storage; mod tracker; pub mod util; -pub use crate::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX}; +pub use crate::raft::{ + vote_resp_msg_type, Raft, SoftState, StateRole, CAMPAIGN_ELECTION, CAMPAIGN_PRE_ELECTION, + CAMPAIGN_TRANSFER, INVALID_ID, INVALID_INDEX, +}; pub use confchange::{Changer, MapChange}; pub use config::Config; pub use errors::{Error, Result, StorageError}; diff --git a/src/raft.rs b/src/raft.rs index 78a2ca8..7e1d608 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -44,12 +44,17 @@ use crate::{confchange, Progress, ProgressState, ProgressTracker}; // CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when // Config.pre_vote is true. -const CAMPAIGN_PRE_ELECTION: &[u8] = b"CampaignPreElection"; +#[doc(hidden)] +pub const CAMPAIGN_PRE_ELECTION: &[u8] = b"CampaignPreElection"; +#[doc(hidden)] // CAMPAIGN_ELECTION represents a normal (time-based) election (the second phase // of the election when Config.pre_vote is true). -const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection"; +#[doc(hidden)] +pub const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection"; +#[doc(hidden)] // CAMPAIGN_TRANSFER represents the type of leader transfer. -const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer"; +#[doc(hidden)] +pub const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer"; /// The role of the node. #[derive(Debug, PartialEq, Clone, Copy)] @@ -824,7 +829,7 @@ impl RaftCore { let commit = cmp::min(pr.matched, self.raft_log.committed); m.commit = commit; if let Some(context) = ctx { - m.context = context; + m.context = context.into(); } self.send(m, msgs); } @@ -1196,10 +1201,11 @@ impl Raft { .count() } - /// Campaign to attempt to become a leader. - /// - /// If prevote is enabled, this is handled as well. - pub fn campaign(&mut self, campaign_type: &[u8]) { + // Campaign to attempt to become a leader. + // + // If prevote is enabled, this is handled as well. + #[doc(hidden)] + pub fn campaign(&mut self, campaign_type: &'static [u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { self.become_pre_candidate(); // Pre-vote RPCs are sent for next term before we've incremented self.term. @@ -1238,7 +1244,7 @@ impl Raft { m.commit = commit; m.commit_term = commit_term; if campaign_type == CAMPAIGN_TRANSFER { - m.context = campaign_type.to_vec(); + m.context = campaign_type.into(); } self.r.send(m, &mut self.msgs); } @@ -2325,7 +2331,7 @@ impl Raft { } let rs = ReadState { index: m.index, - request_ctx: m.take_entries()[0].take_data(), + request_ctx: m.take_entries()[0].take_data().to_vec(), }; self.read_states.push(rs); // `index` and `term` in MsgReadIndexResp is the leader's commit index and its current term, @@ -2776,7 +2782,7 @@ impl Raft { if req.from == INVALID_ID || req.from == self.id { let rs = ReadState { index, - request_ctx: req.take_entries()[0].take_data(), + request_ctx: req.take_entries()[0].take_data().to_vec(), }; self.read_states.push(rs); return None; diff --git a/src/raw_node.rs b/src/raw_node.rs index ec84fe1..bc407fb 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -356,8 +356,8 @@ impl RawNode { m.set_msg_type(MessageType::MsgPropose); m.from = self.raft.id; let mut e = Entry::default(); - e.data = data; - e.context = context; + e.data = data.into(); + e.context = context.into(); m.set_entries(vec![e].into()); self.raft.step(m) } @@ -385,8 +385,8 @@ impl RawNode { m.set_msg_type(MessageType::MsgPropose); let mut e = Entry::default(); e.set_entry_type(ty); - e.data = data; - e.context = context; + e.data = data.into(); + e.context = context.into(); m.set_entries(vec![e].into()); self.raft.step(m) } @@ -728,7 +728,7 @@ impl RawNode { let mut m = Message::default(); m.set_msg_type(MessageType::MsgReadIndex); let mut e = Entry::default(); - e.data = rctx; + e.data = rctx.into(); m.set_entries(vec![e].into()); let _ = self.raft.step(m); } diff --git a/src/read_only.rs b/src/read_only.rs index 4c62637..ae238a1 100644 --- a/src/read_only.rs +++ b/src/read_only.rs @@ -85,7 +85,7 @@ impl ReadOnly { /// `m` is the original read only request message from the local or remote node. pub fn add_request(&mut self, index: u64, req: Message, self_id: u64) { let ctx = { - let key = &req.entries[0].data; + let key: &[u8] = req.entries[0].data.as_ref(); if self.pending_read_index.contains_key(key) { return; } diff --git a/src/util.rs b/src/util.rs index 9a14648..28f870d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -28,7 +28,7 @@ pub const NO_LIMIT: u64 = u64::MAX; /// /// let template = { /// let mut entry = Entry::default(); -/// entry.data = "*".repeat(100).into_bytes(); +/// entry.data = "*".repeat(100).into_bytes().into(); /// entry /// }; ///