Skip to content

Commit

Permalink
Merge pull request #172 from pushkarmoi/pugupta/compact2
Browse files Browse the repository at this point in the history
Copy-to-compact index compaction implementation
  • Loading branch information
pushkarmoi authored Apr 27, 2024
2 parents d0b91cc + 0afa060 commit 264e08d
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 81 deletions.
42 changes: 17 additions & 25 deletions ikv/src/controller/index_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,38 @@ use crate::proto::generated_proto::index::CKVIndexHeader;

use super::index_loader;

pub struct IndexBuilder {
index: CKVIndex,
}
pub struct IndexBuilder {}

impl IndexBuilder {
pub fn new(config: &IKVStoreConfig) -> anyhow::Result<Self> {
// Load index
index_loader::load_index(config)?;
let index = CKVIndex::open_or_create(config)?;

Ok(Self { index })
}

// NOTE: callers must cleanup their working directories
pub fn build_and_export(self, config: &IKVStoreConfig) -> anyhow::Result<()> {
info!("Starting base index build.");

// set index headers
{
let mut header = CKVIndexHeader::new();
header.base_index_epoch_millis =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
self.index.write_index_header(&header)?;
}
pub fn build_and_export(config: &IKVStoreConfig) -> anyhow::Result<()> {
// Download and load previous base index
info!("Loading previous base index");
index_loader::load_index(config)?;
let index = CKVIndex::open_or_create(config)?;

let arc_index = Arc::new(self.index);
let arc_index = Arc::new(index);

// process writes till high watermark
{
info!("Consuming pending write events till high watermark.");
let processor = Arc::new(WritesProcessor::new(arc_index.clone()));
let kafka_consumer = IKVKafkaConsumer::new(config, processor.clone())?;
kafka_consumer.blocking_run_till_completion()?;
kafka_consumer.stop();
}
let index = Arc::try_unwrap(arc_index).expect("there should be no other references");

let mut index = Arc::try_unwrap(arc_index).expect("there should be no other references");
info!("Starting index compaction.");
// set headers - date time of data present in this index.
let mut header = CKVIndexHeader::new();
header.base_index_epoch_millis =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
index.write_index_header(&header)?;

// index compaction
info!("Starting base index compaction.");
index.compact()?;
// in-place compaction
index.compact_and_close()?;

// upload to S3
info!("Uploading base index to S3.");
Expand Down
13 changes: 2 additions & 11 deletions ikv/src/ffi/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,9 @@ pub extern "system" fn Java_io_inlined_clients_IKVClientJNI_buildIndex<'local>(
return;
}

// initialize builder
let maybe_builder = IndexBuilder::new(&ikv_config);
if let Err(e) = maybe_builder {
let exception = format!("Cannot initialize offline index builder: {}", e.to_string());
let _ = env.throw_new("java/lang/RuntimeException", exception);
return;
}

// build and export
let mut index_builder = maybe_builder.unwrap();
if let Err(e) = index_builder.build_and_export(&ikv_config) {
let exception = format!("Cannot build offline index: {}", e.to_string());
if let Err(e) = IndexBuilder::build_and_export(&ikv_config) {
let exception = format!("Cannot build offline index, error: {}", e.to_string());
let _ = env.throw_new("java/lang/RuntimeException", exception);
return;
}
Expand Down
68 changes: 57 additions & 11 deletions ikv/src/index/ckv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use crate::{
schema::field::FieldId,
};
use anyhow::{anyhow, bail};
use log::info;

use super::{
ckv_segment::CKVIndexSegment, header::HeaderStore, offset_store::OffsetStore,
schema_store::CKVIndexSchema,
schema_store::CKVIndexSchema, stats::CompactionStats,
};
use std::{
collections::HashMap,
Expand All @@ -28,6 +29,10 @@ const NUM_SEGMENTS: usize = 16;
/// Memmap based row-oriented key-value index.
#[derive(Debug)]
pub struct CKVIndex {
// Top level usable directory
// Format: tmp/usr-mount-dir/<storename>/<partition>
mount_directory: String,

// hash(key) -> PrimaryKeyIndex
segments: Vec<RwLock<CKVIndexSegment>>,

Expand Down Expand Up @@ -57,6 +62,8 @@ impl CKVIndex {
let mut segments = Vec::with_capacity(NUM_SEGMENTS);
for index_id in 0..NUM_SEGMENTS {
let segment_mount_directory = format!("{}/index/segment_{}", mount_directory, index_id);

// TODO: this needs to be parallelized, can take time for large indexes
let segment = CKVIndexSegment::open_or_create(&segment_mount_directory)?;
segments.push(RwLock::new(segment));
}
Expand All @@ -68,6 +75,7 @@ impl CKVIndex {
let header_store = HeaderStore::open_or_create(&mount_directory)?;

Ok(Self {
mount_directory,
segments,
schema: RwLock::new(schema),
header_store,
Expand Down Expand Up @@ -132,18 +140,56 @@ impl CKVIndex {
Ok(())
}

pub fn compact(&mut self) -> anyhow::Result<()> {
// lock all
let mut segments = Vec::with_capacity(NUM_SEGMENTS);
for i in 0..NUM_SEGMENTS {
segments.push(self.segments[i].write().unwrap());
pub fn compact_and_close(mut self) -> anyhow::Result<(CompactionStats, CompactionStats)> {
// schema compaction, get field id mapping
let new_fid_to_old_fid = self.schema.write().unwrap().compact()?;

// create (empty) compacted-segments
let mut compacted_segments = Vec::with_capacity(NUM_SEGMENTS);
for index_id in 0..NUM_SEGMENTS {
let segment_mount_directory = format!(
"{}/index/compacted_segment_{}",
&self.mount_directory, index_id
);
let segment = CKVIndexSegment::open_or_create(&segment_mount_directory)?;
compacted_segments.push(RwLock::new(segment));
}

// loop over existing segments, copy-to-compact, and close both
let mut pre_compaction_stats: Vec<CompactionStats> = vec![];
let mut post_compaction_stats: Vec<CompactionStats> = vec![];

for (segment_id, segment) in self.segments.drain(..).enumerate() {
info!(
"Starting in-place compaction of index segment: {}",
segment_id
);

let mut segment = segment.write().unwrap();
let mut compacted_segment = compacted_segments[segment_id].write().unwrap();

pre_compaction_stats.push(segment.compaction_stats()?);
segment.copy_to_compact(&mut compacted_segment, &new_fid_to_old_fid)?;
post_compaction_stats.push(compacted_segment.compaction_stats()?);
}

for mut segment in segments {
segment.compact()?;
drop(compacted_segments);

// swap directories
for i in 0..NUM_SEGMENTS {
let segment_mount_directory = format!("{}/index/segment_{}", &self.mount_directory, i);
let compacted_segment_mount_directory =
format!("{}/index/compacted_segment_{}", &self.mount_directory, i);
std::fs::rename(&compacted_segment_mount_directory, &segment_mount_directory)?;
}

Ok(())
// print stats
let pre_stats = CompactionStats::aggregate(&pre_compaction_stats);
let post_stats = CompactionStats::aggregate(&post_compaction_stats);
info!("Pre-compaction stats: {:?}", &pre_stats);
info!("Post-compaction stats: {:?}", &post_stats);

Ok((pre_stats, post_stats))
}

/// Fetch field value for a primary key.
Expand Down Expand Up @@ -280,7 +326,7 @@ impl CKVIndex {

let index_id = fxhash::hash(&primary_key) % NUM_SEGMENTS;
let mut ckv_index_segment = self.segments[index_id].write().unwrap();
ckv_index_segment.upsert_document(&primary_key, field_ids, values)?;
ckv_index_segment.upsert_document(&primary_key, &field_ids, &values)?;
Ok(())
}

Expand Down Expand Up @@ -314,7 +360,7 @@ impl CKVIndex {

let index_id = fxhash::hash(&primary_key) % NUM_SEGMENTS;
let mut ckv_index_segment = self.segments[index_id].write().unwrap();
ckv_index_segment.delete_field_values(&primary_key, field_ids)?;
ckv_index_segment.delete_field_values(&primary_key, &field_ids)?;

Ok(())
}
Expand Down
Loading

0 comments on commit 264e08d

Please sign in to comment.