Skip to content

Commit

Permalink
feat: raft log store (part 2) (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored Apr 7, 2022
1 parent be06036 commit 21db7ba
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 47 deletions.
25 changes: 17 additions & 8 deletions storage/src/raft_log_store/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use runkv_common::coding::CompressionAlgorithm;

use super::DEFAULT_LOG_BATCH_SIZE;
use crate::raft_log_store::error::RaftLogStoreError;
use crate::utils::{
crc32sum, get_length_prefixed_slice, put_length_prefixed_slice, BufExt, BufMutExt,
};
use crate::utils::{crc32sum, get_length_prefixed_slice, put_length_prefixed_slice};

#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Entry {
Expand Down Expand Up @@ -70,6 +68,7 @@ pub struct RaftLogBatch {
term: u64,
first_index: u64,
offsets: Vec<usize>,
/// Note: Only used for encoding.
data: Vec<u8>,
}

Expand Down Expand Up @@ -101,6 +100,10 @@ impl RaftLogBatch {
self.group
}

pub fn first_index(&self) -> u64 {
self.first_index
}

pub fn term(&self) -> u64 {
self.term
}
Expand All @@ -110,6 +113,12 @@ impl RaftLogBatch {
self.offsets.len() - 1
}

pub fn data_segment_location(&self) -> (usize, usize) {
let offset = 8 + 8 + 8 + 8 + self.offsets.len() * 4 + 8;
let len = self.offsets[self.offsets.len() - 1];
(offset, len)
}

pub fn location(&self, index: usize) -> (usize, usize) {
debug_assert!(index < self.len() - 1);
let offset = self.offsets[index];
Expand All @@ -120,11 +129,11 @@ impl RaftLogBatch {
/// Format:
///
/// ```plain
/// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 | ... | offset (N-1) | offset N (phantom) |
/// | group (8B) | term (8B) | first index (8B) | N+1 (8B) | offset 0 (4B) | ... | offset (N-1) | offset N (phantom) |
/// | data segment len (8B) | data block (compressed) | compression algorithm (1B) | crc32sum (4B) |
/// | <---------- data segment ------------------------------------------->|
/// ```
pub fn encode(&self, mut buf_meta: &mut Vec<u8>, buf_data: &mut Vec<u8>) {
pub fn encode(&self, buf_meta: &mut Vec<u8>, buf_data: &mut Vec<u8>) {
debug_assert!(!self.offsets.is_empty());

// Encode meta.
Expand All @@ -133,7 +142,7 @@ impl RaftLogBatch {
buf_meta.put_u64_le(self.first_index);
buf_meta.put_u64_le(self.offsets.len() as u64);
for offset in self.offsets.iter() {
buf_meta.put_var_u32(*offset as u32);
buf_meta.put_u32_le(*offset as u32);
}

// Encode data.
Expand All @@ -160,14 +169,14 @@ impl RaftLogBatch {
}

/// Decode meta only. [`RaftLogBatch.data`] will be left empty.
pub fn decode(mut buf: &mut &[u8]) -> Self {
pub fn decode(buf: &mut &[u8]) -> Self {
let group = buf.get_u64_le();
let term = buf.get_u64_le();
let first_index = buf.get_u64_le();
let offsets_len = buf.get_u64_le() as usize;
let mut offsets = Vec::with_capacity(offsets_len);
for _ in 0..offsets_len {
let offset = buf.get_var_u32() as usize;
let offset = buf.get_u32_le() as usize;
offsets.push(offset);
}
let data_segment_len = buf.get_u64_le() as usize;
Expand Down
4 changes: 4 additions & 0 deletions storage/src/raft_log_store/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
pub enum RaftLogStoreError {
#[error("group {0} not exists")]
GroupNotExists(u64),
#[error("group {0} already exists")]
GroupAlreadyExists(u64),
#[error("encode error: {0}")]
EncodeError(String),
#[error("raft log gap exists: [{start}, {end})")]
RaftLogGap { start: u64, end: u64 },
#[error("other: {0}")]
Other(String),
}
Expand Down
36 changes: 18 additions & 18 deletions storage/src/raft_log_store/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ use crate::entry::Entry;
use crate::error::{Error, Result};

#[derive(Clone, Debug)]
pub struct PipeLogOptions {
pub struct LogOptions {
path: String,
log_file_capacity: usize,
}

struct PipeLogCore {
struct LogCore {
active_file: File,
frozen_files: Vec<File>,
first_log_file_id: u64,
}

pub struct PipeLog {
pub struct Log {
path: String,
log_file_capacity: usize,
core: Mutex<PipeLogCore>,
core: Mutex<LogCore>,
}

impl PipeLog {
pub async fn open(options: PipeLogOptions) -> Result<Self> {
impl Log {
pub async fn open(options: LogOptions) -> Result<Self> {
create_dir_all(&options.path).await?;
let (frozen_files, first_log_file_id) = {
let mut frozen_files = vec![];
Expand Down Expand Up @@ -68,7 +68,7 @@ impl PipeLog {
let active_file_id = first_log_file_id + frozen_files.len() as u64 + 1;
let active_file = Self::new_active_file(&options.path, active_file_id).await?;

let core = PipeLogCore {
let core = LogCore {
active_file,
frozen_files,
first_log_file_id,
Expand All @@ -89,19 +89,19 @@ impl PipeLog {
Ok(())
}

pub async fn append(&self, entries: Vec<Entry>) -> Result<()> {
pub async fn push(&self, entry: Entry) -> Result<(usize, usize)> {
let mut guard = self.core.lock().await;
let start = guard.active_file.metadata().await?.len() as usize;
let mut buf = Vec::with_capacity(DEFAULT_LOG_BATCH_SIZE);
for entry in entries {
entry.encode(&mut buf);
}
entry.encode(&mut buf);
guard.active_file.write_all(&buf).await?;
guard.active_file.sync_data().await?;
if guard.active_file.metadata().await?.len() as usize >= self.log_file_capacity {
let end = guard.active_file.metadata().await?.len() as usize;
if end >= self.log_file_capacity {
drop(guard);
self.rotate().await?;
}
Ok(())
Ok((start, end - start))
}

pub async fn read(&self, log_file_id: u64, offset: u64, len: usize) -> Result<Vec<u8>> {
Expand All @@ -119,7 +119,7 @@ impl PipeLog {
}
}

impl PipeLog {
impl Log {
async fn rotate(&self) -> Result<()> {
let mut guard = self.core.lock().await;
// Sync old active file.
Expand Down Expand Up @@ -172,17 +172,17 @@ mod tests {
#[test(tokio::test)]
async fn test_pipe_log_recovery() {
let tempdir = tempfile::tempdir().unwrap();
let options = PipeLogOptions {
let options = LogOptions {
path: tempdir.path().to_str().unwrap().to_string(),
// Estimated size of each compressed entry is 111.
log_file_capacity: 100,
};
let log = PipeLog::open(options.clone()).await.unwrap();
let log = Log::open(options.clone()).await.unwrap();
let entries = generate_entries(4, 16, vec![b'x'; 64]);
assert_eq!(entries.len(), 4);

for entry in entries.iter().cloned() {
log.append(vec![entry]).await.unwrap();
log.push(entry).await.unwrap();
}
assert_eq!(log.core.lock().await.frozen_files.len(), 4);
let mut buf = vec![];
Expand All @@ -202,7 +202,7 @@ mod tests {

// Recover pipe log.
drop(log);
let log = PipeLog::open(options).await.unwrap();
let log = Log::open(options).await.unwrap();
assert_eq!(log.core.lock().await.frozen_files.len(), 5);
let mut buf = vec![];
for i in 0..4 {
Expand Down
Loading

0 comments on commit 21db7ba

Please sign in to comment.