Skip to content

Commit

Permalink
use bytes for protobuf (#438)
Browse files Browse the repository at this point in the history
If `protobuf-codec` is enabled, `bytes` will be used to replace `Vec<u8>` for protobuf `bytes` type.

However if `prost-codec` is enabled,  `Vec<u8>` will be still used.

Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored May 24, 2021
1 parent 64b99c7 commit a891936
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 50 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ members = ["proto", "harness", "datadriven"]
default = ["protobuf-codec", "default-logger"]
# Enable failpoints
failpoints = ["fail/failpoints"]
protobuf-codec = ["raft-proto/protobuf-codec"]
protobuf-codec = ["raft-proto/protobuf-codec", "bytes"]
prost-codec = ["raft-proto/prost-codec"]
default-logger = ["slog-stdlog", "slog-envlogger", "slog-term"]

# Make sure to synchronize updates with Harness.
[dependencies]
bytes = { version = "1", optional = true }
fxhash = "0.2.1"
fail = { version = "0.3", optional = true }
getset = "0.0.9"
Expand Down
6 changes: 3 additions & 3 deletions benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
let mut entries = vec![];
for i in 1..101 {
let mut e = Entry::default();
e.data = vec![0; 32 * 1024];
e.context = vec![];
e.data = vec![0; 32 * 1024].into();
e.context = vec![].into();
e.index = i;
e.term = 1;
entries.push(e);
Expand All @@ -129,7 +129,7 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
node.raft.on_persist_entries(101, 1);

let mut snap = Snapshot::default();
snap.set_data(vec![0; 8 * 1024 * 1024]);
snap.set_data(vec![0; 8 * 1024 * 1024].into());
// We don't care about the contents in snapshot here since it won't be applied.
snap.set_metadata(SnapshotMetadata::default());
for _ in 0..100 {
Expand Down
42 changes: 21 additions & 21 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn test_progress_committed_index() {
// #1 test append entries
// append entries between 1 and 2
let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();
let m = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]);
nt.cut(1, 3);
nt.send(vec![m.clone(), m]);
Expand Down Expand Up @@ -356,7 +356,7 @@ fn test_progress_paused() {
m.to = 1;
m.set_msg_type(MessageType::MsgPropose);
let mut e = Entry::default();
e.data = b"some_data".to_vec();
e.data = (b"some_data" as &'static [u8]).into();
m.entries = vec![e].into();
raft.step(m.clone()).expect("");
raft.step(m.clone()).expect("");
Expand Down Expand Up @@ -1471,7 +1471,7 @@ fn test_raft_frees_read_only_mem() {
// acknowledge the authority of the leader.
// more info: raft dissertation 6.4, step 3.
let mut m = new_message(2, 1, MessageType::MsgHeartbeatResponse, 0);
m.context = vec_ctx.clone();
m.context = vec_ctx.clone().into();
sm.step(m).expect("");
assert_eq!(sm.read_only.read_index_queue.len(), 0);
assert_eq!(sm.read_only.pending_read_index.len(), 0);
Expand Down Expand Up @@ -3304,7 +3304,7 @@ fn test_commit_after_remove_node() -> Result<()> {
cc.set_change_type(ConfChangeType::RemoveNode);
cc.node_id = 2;
let ccdata = cc.write_to_bytes().unwrap();
entry.data = ccdata;
entry.data = ccdata.into();
msg.mut_entries().push(entry);
r.step(msg).expect("");
// Stabilize the log and make sure nothing is committed yet.
Expand Down Expand Up @@ -3334,7 +3334,7 @@ fn test_commit_after_remove_node() -> Result<()> {
let ents = next_ents(&mut r, &s);
assert_eq!(ents.len(), 1);
assert_eq!(ents[0].get_entry_type(), EntryType::EntryNormal);
assert_eq!(ents[0].data, b"hello");
assert_eq!(ents[0].data.as_ref(), b"hello");

Ok(())
}
Expand Down Expand Up @@ -4450,7 +4450,7 @@ fn test_conf_change_check_before_campaign() {
let mut cc = ConfChange::default();
cc.set_change_type(ConfChangeType::RemoveNode);
cc.node_id = 3;
e.data = protobuf::Message::write_to_bytes(&cc).unwrap();
e.data = protobuf::Message::write_to_bytes(&cc).unwrap().into();
m.mut_entries().push(e);
nt.send(vec![m]);

Expand Down Expand Up @@ -4532,10 +4532,10 @@ fn test_advance_commit_index_by_vote_request(use_prevote: bool) {
let mut e = Entry::default();
if let Some(v1) = cc.as_v1() {
e.set_entry_type(EntryType::EntryConfChange);
e.set_data(v1.write_to_bytes().unwrap());
e.set_data(v1.write_to_bytes().unwrap().into());
} else {
e.set_entry_type(EntryType::EntryConfChangeV2);
e.set_data(cc.as_v2().write_to_bytes().unwrap());
e.set_data(cc.as_v2().write_to_bytes().unwrap().into());
}

// propose a confchange entry but don't let it commit
Expand Down Expand Up @@ -4684,10 +4684,10 @@ fn test_advance_commit_index_by_vote_response(use_prevote: bool) {
let mut e = Entry::default();
if let Some(v1) = cc.as_v1() {
e.set_entry_type(EntryType::EntryConfChange);
e.set_data(v1.write_to_bytes().unwrap());
e.set_data(v1.write_to_bytes().unwrap().into());
} else {
e.set_entry_type(EntryType::EntryConfChangeV2);
e.set_data(cc.as_v2().write_to_bytes().unwrap());
e.set_data(cc.as_v2().write_to_bytes().unwrap().into());
}

// propose a confchange entry but don't let it commit
Expand Down Expand Up @@ -4822,7 +4822,7 @@ fn prepare_request_snapshot() -> (Network, Snapshot) {
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]);
nt.send(vec![msg.clone(), msg]);
assert_eq!(nt.peers[&1].raft_log.committed, 14);
Expand All @@ -4841,7 +4841,7 @@ fn prepare_request_snapshot() -> (Network, Snapshot) {

// Commit a new raft log.
let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]);
nt.send(vec![msg]);

Expand Down Expand Up @@ -4877,7 +4877,7 @@ fn test_follower_request_snapshot() {

// New proposes can not be replicated to peer 2.
let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]);
nt.send(vec![msg.clone()]);
assert_eq!(nt.peers[&1].raft_log.committed, 16);
Expand Down Expand Up @@ -5032,7 +5032,7 @@ fn test_request_snapshot_step_down() {
// Commit a new entry and leader steps down while peer 2 is isolated.
nt.isolate(2);
let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]);
nt.send(vec![msg]);
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
Expand Down Expand Up @@ -5393,7 +5393,7 @@ fn test_read_when_quorum_becomes_less() {
m.to = 1;
m.set_msg_type(MessageType::MsgReadIndex);
let mut e = Entry::default();
e.data = b"abcdefg".to_vec();
e.data = (b"abcdefg" as &'static [u8]).into();
m.set_entries(vec![e].into());
network.dispatch(vec![m]).unwrap();

Expand Down Expand Up @@ -5425,7 +5425,7 @@ fn test_uncommitted_entries_size_limit() {
let mut nt = Network::new_with_config(vec![None, None, None], config, &l);
let data = b"hello world!".to_vec();
let mut entry = Entry::default();
entry.data = data.to_vec();
entry.data = data.to_vec().into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
Expand All @@ -5447,7 +5447,7 @@ fn test_uncommitted_entries_size_limit() {

// after reduce, new proposal should be accepted
let mut entry = Entry::default();
entry.data = data;
entry.data = data.into();
entry.index = 3;
nt.peers
.get_mut(&1)
Expand All @@ -5458,14 +5458,14 @@ fn test_uncommitted_entries_size_limit() {
// a huge proposal should be accepted when there is no uncommitted entry,
// even it's bigger than max_uncommitted_size
let mut entry = Entry::default();
entry.data = b"hello world and raft".to_vec();
entry.data = (b"hello world and raft" as &'static [u8]).into();
let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
let result = nt.dispatch(vec![long_msg].to_vec());
assert!(result.is_ok());

// but another huge one will be dropped
let mut entry = Entry::default();
entry.data = b"hello world and raft".to_vec();
entry.data = (b"hello world and raft" as &'static [u8]).into();
let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
let result = nt.dispatch(vec![long_msg].to_vec());
assert!(!result.is_ok());
Expand All @@ -5488,7 +5488,7 @@ fn test_uncommitted_entry_after_leader_election() {
let mut nt = Network::new_with_config(vec![None, None, None, None, None], config, &l);
let data = b"hello world!".to_vec();
let mut entry = Entry::default();
entry.data = data;
entry.data = data.into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
Expand Down Expand Up @@ -5524,7 +5524,7 @@ fn test_uncommitted_state_advance_ready_from_last_term() {

let data = b"hello world!".to_vec();
let mut ent = Entry::default();
ent.data = data.clone();
ent.data = data.clone().into();

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

Expand Down
8 changes: 4 additions & 4 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ fn test_raw_node_read_index_to_old_leader() {
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();

// send readindex request to r2(follower)
let _ = nt.peers.get_mut(&2).unwrap().step(new_message_with_entries(
Expand Down Expand Up @@ -341,7 +341,7 @@ fn test_raw_node_propose_and_conf_change() {
}
context = b"manual".to_vec();
let mut cc = conf_change_v2(vec![]);
cc.set_context(context.clone());
cc.set_context(context.clone().into());
raw_node.propose_conf_change(vec![], cc).unwrap();
rd = raw_node.ready();
}
Expand Down Expand Up @@ -729,7 +729,7 @@ fn test_skip_bcast_commit() {

// Without bcast commit, followers will not update its commit index immediately.
let mut test_entries = Entry::default();
test_entries.data = b"testdata".to_vec();
test_entries.data = (b"testdata" as &'static [u8]).into();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries]);
nt.send(vec![msg.clone()]);
assert_eq!(nt.peers[&1].raft_log.committed, 2);
Expand Down Expand Up @@ -767,7 +767,7 @@ fn test_skip_bcast_commit() {
let data = cc.write_to_bytes().unwrap();
let mut cc_entry = Entry::default();
cc_entry.set_entry_type(EntryType::EntryConfChange);
cc_entry.data = data;
cc_entry.data = data.into();
nt.send(vec![new_message_with_entries(
1,
1,
Expand Down
2 changes: 1 addition & 1 deletion harness/tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub fn new_entry(term: u64, index: u64, data: Option<&str>) -> Entry {
e.index = index;
e.term = term;
if let Some(d) = data {
e.data = d.as_bytes().to_vec();
e.data = d.as_bytes().to_vec().into();
}
e
}
Expand Down
3 changes: 2 additions & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ build = "build.rs"

[features]
default = ["protobuf-codec"]
protobuf-codec = ["protobuf-build/protobuf-codec"]
protobuf-codec = ["protobuf-build/protobuf-codec", "bytes"]
prost-codec = ["protobuf-build/prost-codec", "prost", "lazy_static"]

[build-dependencies]
protobuf-build = { version = "0.12", default-features = false }

[dependencies]
bytes = { version = "1", optional = true }
lazy_static = { version = "1", optional = true }
prost = { version = "0.7", optional = true }
protobuf = "2"
1 change: 1 addition & 0 deletions proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ fn main() {
Builder::new()
.search_dir_for_protos(&format!("{}/proto", base))
.includes(&[format!("{}/include", base), format!("{}/proto", base)])
.include_google_protos()
.generate()
}
3 changes: 3 additions & 0 deletions proto/proto/eraftpb.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
syntax = "proto3";
package eraftpb;

import "rustproto.proto";
option (rustproto.carllerche_bytes_for_bytes_all) = true;

enum EntryType {
EntryNormal = 0;
EntryConfChange = 1;
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,10 @@ pub mod storage;
mod tracker;
pub mod util;

pub use crate::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX};
pub use crate::raft::{
vote_resp_msg_type, Raft, SoftState, StateRole, CAMPAIGN_ELECTION, CAMPAIGN_PRE_ELECTION,
CAMPAIGN_TRANSFER, INVALID_ID, INVALID_INDEX,
};
pub use confchange::{Changer, MapChange};
pub use config::Config;
pub use errors::{Error, Result, StorageError};
Expand Down
28 changes: 17 additions & 11 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ use crate::{confchange, Progress, ProgressState, ProgressTracker};

// CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when
// Config.pre_vote is true.
const CAMPAIGN_PRE_ELECTION: &[u8] = b"CampaignPreElection";
#[doc(hidden)]
pub const CAMPAIGN_PRE_ELECTION: &[u8] = b"CampaignPreElection";
#[doc(hidden)]
// CAMPAIGN_ELECTION represents a normal (time-based) election (the second phase
// of the election when Config.pre_vote is true).
const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection";
#[doc(hidden)]
pub const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection";
#[doc(hidden)]
// CAMPAIGN_TRANSFER represents the type of leader transfer.
const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer";
#[doc(hidden)]
pub const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer";

/// The role of the node.
#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -824,7 +829,7 @@ impl<T: Storage> RaftCore<T> {
let commit = cmp::min(pr.matched, self.raft_log.committed);
m.commit = commit;
if let Some(context) = ctx {
m.context = context;
m.context = context.into();
}
self.send(m, msgs);
}
Expand Down Expand Up @@ -1196,10 +1201,11 @@ impl<T: Storage> Raft<T> {
.count()
}

/// Campaign to attempt to become a leader.
///
/// If prevote is enabled, this is handled as well.
pub fn campaign(&mut self, campaign_type: &[u8]) {
// Campaign to attempt to become a leader.
//
// If prevote is enabled, this is handled as well.
#[doc(hidden)]
pub fn campaign(&mut self, campaign_type: &'static [u8]) {
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
self.become_pre_candidate();
// Pre-vote RPCs are sent for next term before we've incremented self.term.
Expand Down Expand Up @@ -1238,7 +1244,7 @@ impl<T: Storage> Raft<T> {
m.commit = commit;
m.commit_term = commit_term;
if campaign_type == CAMPAIGN_TRANSFER {
m.context = campaign_type.to_vec();
m.context = campaign_type.into();
}
self.r.send(m, &mut self.msgs);
}
Expand Down Expand Up @@ -2325,7 +2331,7 @@ impl<T: Storage> Raft<T> {
}
let rs = ReadState {
index: m.index,
request_ctx: m.take_entries()[0].take_data(),
request_ctx: m.take_entries()[0].take_data().to_vec(),
};
self.read_states.push(rs);
// `index` and `term` in MsgReadIndexResp is the leader's commit index and its current term,
Expand Down Expand Up @@ -2776,7 +2782,7 @@ impl<T: Storage> Raft<T> {
if req.from == INVALID_ID || req.from == self.id {
let rs = ReadState {
index,
request_ctx: req.take_entries()[0].take_data(),
request_ctx: req.take_entries()[0].take_data().to_vec(),
};
self.read_states.push(rs);
return None;
Expand Down
Loading

0 comments on commit a891936

Please sign in to comment.