diff --git a/ikv/src/controller/index_builder.rs b/ikv/src/controller/index_builder.rs index 53d21f0..fe4941b 100644 --- a/ikv/src/controller/index_builder.rs +++ b/ikv/src/controller/index_builder.rs @@ -11,46 +11,38 @@ use crate::proto::generated_proto::index::CKVIndexHeader; use super::index_loader; -pub struct IndexBuilder { - index: CKVIndex, -} +pub struct IndexBuilder {} impl IndexBuilder { - pub fn new(config: &IKVStoreConfig) -> anyhow::Result { - // Load index - index_loader::load_index(config)?; - let index = CKVIndex::open_or_create(config)?; - - Ok(Self { index }) - } - // NOTE: callers must cleanup their working directories - pub fn build_and_export(self, config: &IKVStoreConfig) -> anyhow::Result<()> { - info!("Starting base index build."); - // set index headers - { - let mut header = CKVIndexHeader::new(); - header.base_index_epoch_millis = - SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64; - self.index.write_index_header(&header)?; - } + pub fn build_and_export(config: &IKVStoreConfig) -> anyhow::Result<()> { + // Download and load previous base index + info!("Loading previous base index"); + index_loader::load_index(config)?; + let index = CKVIndex::open_or_create(config)?; - let arc_index = Arc::new(self.index); + let arc_index = Arc::new(index); // process writes till high watermark { + info!("Consuming pending write events till high watermark."); let processor = Arc::new(WritesProcessor::new(arc_index.clone())); let kafka_consumer = IKVKafkaConsumer::new(config, processor.clone())?; kafka_consumer.blocking_run_till_completion()?; kafka_consumer.stop(); } + let index = Arc::try_unwrap(arc_index).expect("there should be no other references"); - let mut index = Arc::try_unwrap(arc_index).expect("there should be no other references"); + info!("Starting index compaction."); + // set headers - date time of data present in this index. 
+ let mut header = CKVIndexHeader::new(); + header.base_index_epoch_millis = + SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64; + index.write_index_header(&header)?; - // index compaction - info!("Starting base index compaction."); - index.compact()?; + // in-place compaction + index.compact_and_close()?; // upload to S3 info!("Uploading base index to S3."); diff --git a/ikv/src/ffi/jni_api.rs b/ikv/src/ffi/jni_api.rs index f041069..9df3e90 100644 --- a/ikv/src/ffi/jni_api.rs +++ b/ikv/src/ffi/jni_api.rs @@ -38,18 +38,9 @@ pub extern "system" fn Java_io_inlined_clients_IKVClientJNI_buildIndex<'local>( return; } - // initialize builder - let maybe_builder = IndexBuilder::new(&ikv_config); - if let Err(e) = maybe_builder { - let exception = format!("Cannot initialize offline index builder: {}", e.to_string()); - let _ = env.throw_new("java/lang/RuntimeException", exception); - return; - } - // build and export - let mut index_builder = maybe_builder.unwrap(); - if let Err(e) = index_builder.build_and_export(&ikv_config) { - let exception = format!("Cannot build offline index: {}", e.to_string()); + if let Err(e) = IndexBuilder::build_and_export(&ikv_config) { + let exception = format!("Cannot build offline index, error: {}", e.to_string()); let _ = env.throw_new("java/lang/RuntimeException", exception); return; } diff --git a/ikv/src/index/ckv.rs b/ikv/src/index/ckv.rs index 866d147..7a38c6c 100644 --- a/ikv/src/index/ckv.rs +++ b/ikv/src/index/ckv.rs @@ -7,10 +7,11 @@ use crate::{ schema::field::FieldId, }; use anyhow::{anyhow, bail}; +use log::info; use super::{ ckv_segment::CKVIndexSegment, header::HeaderStore, offset_store::OffsetStore, - schema_store::CKVIndexSchema, + schema_store::CKVIndexSchema, stats::CompactionStats, }; use std::{ collections::HashMap, @@ -28,6 +29,10 @@ const NUM_SEGMENTS: usize = 16; /// Memmap based row-oriented key-value index. 
#[derive(Debug)] pub struct CKVIndex { + // Top level usable directory + // Format: tmp/usr-mount-dir// + mount_directory: String, + // hash(key) -> PrimaryKeyIndex segments: Vec>, @@ -57,6 +62,8 @@ impl CKVIndex { let mut segments = Vec::with_capacity(NUM_SEGMENTS); for index_id in 0..NUM_SEGMENTS { let segment_mount_directory = format!("{}/index/segment_{}", mount_directory, index_id); + + // TODO: this needs to be parallelized, can take time for large indexes let segment = CKVIndexSegment::open_or_create(&segment_mount_directory)?; segments.push(RwLock::new(segment)); } @@ -68,6 +75,7 @@ impl CKVIndex { let header_store = HeaderStore::open_or_create(&mount_directory)?; Ok(Self { + mount_directory, segments, schema: RwLock::new(schema), header_store, @@ -132,18 +140,56 @@ impl CKVIndex { Ok(()) } - pub fn compact(&mut self) -> anyhow::Result<()> { - // lock all - let mut segments = Vec::with_capacity(NUM_SEGMENTS); - for i in 0..NUM_SEGMENTS { - segments.push(self.segments[i].write().unwrap()); + pub fn compact_and_close(mut self) -> anyhow::Result<(CompactionStats, CompactionStats)> { + // schema compaction, get field id mapping + let new_fid_to_old_fid = self.schema.write().unwrap().compact()?; + + // create (empty) compacted-segments + let mut compacted_segments = Vec::with_capacity(NUM_SEGMENTS); + for index_id in 0..NUM_SEGMENTS { + let segment_mount_directory = format!( + "{}/index/compacted_segment_{}", + &self.mount_directory, index_id + ); + let segment = CKVIndexSegment::open_or_create(&segment_mount_directory)?; + compacted_segments.push(RwLock::new(segment)); + } + + // loop over existing segments, copy-to-compact, and close both + let mut pre_compaction_stats: Vec = vec![]; + let mut post_compaction_stats: Vec = vec![]; + + for (segment_id, segment) in self.segments.drain(..).enumerate() { + info!( + "Starting in-place compaction of index segment: {}", + segment_id + ); + + let mut segment = segment.write().unwrap(); + let mut compacted_segment = 
compacted_segments[segment_id].write().unwrap(); + + pre_compaction_stats.push(segment.compaction_stats()?); + segment.copy_to_compact(&mut compacted_segment, &new_fid_to_old_fid)?; + post_compaction_stats.push(compacted_segment.compaction_stats()?); } - for mut segment in segments { - segment.compact()?; + drop(compacted_segments); + + // swap directories + for i in 0..NUM_SEGMENTS { + let segment_mount_directory = format!("{}/index/segment_{}", &self.mount_directory, i); + let compacted_segment_mount_directory = + format!("{}/index/compacted_segment_{}", &self.mount_directory, i); + std::fs::rename(&compacted_segment_mount_directory, &segment_mount_directory)?; } - Ok(()) + // print stats + let pre_stats = CompactionStats::aggregate(&pre_compaction_stats); + let post_stats = CompactionStats::aggregate(&post_compaction_stats); + info!("Pre-compaction stats: {:?}", &pre_stats); + info!("Post-compaction stats: {:?}", &post_stats); + + Ok((pre_stats, post_stats)) } /// Fetch field value for a primary key. 
@@ -280,7 +326,7 @@ impl CKVIndex { let index_id = fxhash::hash(&primary_key) % NUM_SEGMENTS; let mut ckv_index_segment = self.segments[index_id].write().unwrap(); - ckv_index_segment.upsert_document(&primary_key, field_ids, values)?; + ckv_index_segment.upsert_document(&primary_key, &field_ids, &values)?; Ok(()) } @@ -314,7 +360,7 @@ impl CKVIndex { let index_id = fxhash::hash(&primary_key) % NUM_SEGMENTS; let mut ckv_index_segment = self.segments[index_id].write().unwrap(); - ckv_index_segment.delete_field_values(&primary_key, field_ids)?; + ckv_index_segment.delete_field_values(&primary_key, &field_ids)?; Ok(()) } diff --git a/ikv/src/index/ckv_segment.rs b/ikv/src/index/ckv_segment.rs index 55730ec..5640999 100644 --- a/ikv/src/index/ckv_segment.rs +++ b/ikv/src/index/ckv_segment.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Borrow, collections::HashMap, fs::{File, OpenOptions}, io::{self, BufReader, BufWriter, ErrorKind, Read, Seek, Write}, @@ -27,7 +28,10 @@ use crate::{ schema::field::FieldId, }; +use super::stats::CompactionStats; + const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8M +const EMPTY_BYTE_SLICE: &[u8] = &[]; const NONE_SIZE: [u8; 4] = (-1 as i32).to_le_bytes(); #[derive(Debug)] @@ -212,26 +216,64 @@ impl CKVIndexSegment { Ok(()) } - /// Offline index build hook. - /// Does field colocation and deletes compaction to create a compressed - /// and efficient offline index. - pub fn compact(&mut self) -> anyhow::Result<()> { + pub fn copy_to_compact( + &mut self, + destination: &mut CKVIndexSegment, + new_fid_to_old_fid: &[FieldId], + ) -> anyhow::Result<()> { self.flush_writes()?; - // TODO: add linear pass to compact. 
+ for (primary_key, offsets) in self.offset_table.iter() { + // construct document to copy + let capacity = std::cmp::min(offsets.len(), new_fid_to_old_fid.len()); + let mut field_ids = Vec::with_capacity(capacity); + let mut field_values = Vec::with_capacity(capacity); + + for new_fid in 0..new_fid_to_old_fid.len() { + let old_fid = new_fid_to_old_fid[new_fid]; + if let Some(offset) = offsets.get(old_fid as usize).copied() { + if let Some((field_type, value)) = self.read_from_mmap(offset) { + if field_type == FieldType::UNKNOWN { + // either write event in kafka stream was missing type info, or + // this node is behind on symbol list + bail!( + "Found unknown fieldType for primary-key: {}", + format!("{:?}", primary_key.as_slice()) + ); + } - // The current index files on disk have been mutated - // with ETL'ed kafka events. + let mut field_value = FieldValue::new(); + field_value.fieldType = field_type.into(); + field_value.value = value.to_vec(); + field_values.push(field_value); + + field_ids.push(new_fid as FieldId); + } + } + } + + // write to destination segment + destination.upsert_document(primary_key, &field_ids, &field_values)?; + } Ok(()) } + pub fn compaction_stats(&self) -> anyhow::Result { + return Ok(CompactionStats { + offset_table_size_bytes: self.offset_table_file_writer.get_ref().metadata()?.len(), + mmap_file_size_bytes: self.mmap_file.metadata()?.len(), + }); + } + pub fn read_field(&self, primary_key: &[u8], field_id: FieldId) -> Option> { let offsets = self.offset_table.get(primary_key)?; let maybe_offset = offsets.get(field_id as usize).copied(); if let Some(offset) = maybe_offset { - let result = self.read_from_mmap(offset)?; - return Some(result.to_vec()); + let (field_type, result) = self.read_from_mmap(offset)?; + if field_type != FieldType::UNKNOWN { + return Some(result.to_vec()); + } } None @@ -272,15 +314,19 @@ impl CKVIndexSegment { None => { dest.extend(NONE_SIZE); } - Some(value) => { - dest.extend((value.len() as 
i32).to_le_bytes()); - dest.extend_from_slice(value); + Some((field_type, value)) => { + if field_type == FieldType::UNKNOWN { + dest.extend(NONE_SIZE); + } else { + dest.extend((value.len() as i32).to_le_bytes()); + dest.extend_from_slice(value); + } } }; } } - fn read_from_mmap(&self, mmap_offset: usize) -> Option<&[u8]> { + fn read_from_mmap(&self, mmap_offset: usize) -> Option<(FieldType, &[u8])> { if mmap_offset == usize::MAX { return None; } @@ -299,15 +345,28 @@ impl CKVIndexSegment { let mmap_offset = mmap_offset + 2; match field_type { - FieldType::UNKNOWN => None, - FieldType::INT32 | FieldType::FLOAT32 => Some(&self.mmap[mmap_offset..mmap_offset + 4]), - FieldType::INT64 | FieldType::FLOAT64 => Some(&self.mmap[mmap_offset..mmap_offset + 8]), - FieldType::BOOLEAN => Some(&self.mmap[mmap_offset..mmap_offset + 1]), + FieldType::UNKNOWN => { + // Some unknown field-type was written to the mmap files + // can occur when this reader is behind on the FieldType.proto symbol list. + // Can be okay to ignore this when doing live reads on the index, but this + // should not be ignored during data copy (ex. compaction). 
+ Some((field_type, EMPTY_BYTE_SLICE)) + } + FieldType::INT32 | FieldType::FLOAT32 => { + Some((field_type, &self.mmap[mmap_offset..mmap_offset + 4])) + } + FieldType::INT64 | FieldType::FLOAT64 => { + Some((field_type, &self.mmap[mmap_offset..mmap_offset + 8])) + } + FieldType::BOOLEAN => Some((field_type, &self.mmap[mmap_offset..mmap_offset + 1])), FieldType::STRING | FieldType::BYTES => { // extract size (varint decoding) let (size, bytes_read) = u32::decode_var(&self.mmap[mmap_offset..])?; let mmap_offset = mmap_offset + bytes_read; - Some(&self.mmap[mmap_offset..mmap_offset + size as usize]) + Some(( + field_type, + &self.mmap[mmap_offset..mmap_offset + size as usize], + )) } } } @@ -337,7 +396,10 @@ impl CKVIndexSegment { let mut size: usize = 2; // for storing type match field_value.fieldType.enum_value_or_default() { - FieldType::UNKNOWN => bail!("unknown feld type cannot be indexed in mmap"), + FieldType::UNKNOWN => { + // unknown types are serialized by just saving the type (=0) + size += 0 + } FieldType::INT32 | FieldType::FLOAT32 => size += 4, FieldType::INT64 | FieldType::FLOAT64 => size += 8, FieldType::BOOLEAN => size += 1, @@ -372,16 +434,26 @@ impl CKVIndexSegment { // TODO: consider being more robust by not gettings stuck on incorrect input // events - limit blast radius of bad serialization schemes in producer. 
-        // write field_type
-        let field_type = (field_value.fieldType.value() as u16).to_le_bytes();
-        mmap[write_offset..write_offset + 2].copy_from_slice(&field_type[..]);
+        // serialize field_type
+        if CKVIndexSegment::is_unknown_field_type(field_value) {
+            // we save unknown types as sentinels
+            // they are ignored during normal read ops but caught during compaction
+            let field_type = (FieldType::UNKNOWN.value() as u16).to_le_bytes();
+            mmap[write_offset..write_offset + 2].copy_from_slice(&field_type[..]);
+        } else {
+            let field_type = (field_value.fieldType.value() as u16).to_le_bytes();
+            mmap[write_offset..write_offset + 2].copy_from_slice(&field_type[..]);
+        }
         let mut num_bytes = 2;
         let write_offset = write_offset + 2;

         // write value
         num_bytes += match field_value.fieldType.enum_value_or_default() {
-            FieldType::UNKNOWN => bail!("unknown feld type cannot be indexed in mmap"),
+            FieldType::UNKNOWN => {
+                // no field value data required
+                0
+            }
             FieldType::INT32 | FieldType::FLOAT32 => {
                 mmap[write_offset..write_offset + 4].copy_from_slice(&field_value.value[..]);
                 4
@@ -412,27 +484,39 @@
     }

     /// Upsert field values for a document.
-    pub fn upsert_document(
+    pub fn upsert_document<T>(
         &mut self,
         primary_key: &[u8],
-        field_ids: Vec<FieldId>,
-        field_values: Vec<&FieldValue>,
-    ) -> anyhow::Result<()> {
+        field_ids: &[FieldId],
+        field_values: &[T],
+    ) -> anyhow::Result<()>
+    where
+        T: Borrow<FieldValue>,
+    {
         if primary_key.is_empty() || field_ids.is_empty() {
             return Ok(());
         }

+        // Unknown field types (i.e. FieldType::UNKNOWN) handling.
+        // They can occur when we are behind on symbol list or upstream ingestion path didn't properly construct the write event.
+        // We save a sentinel which is 2 bytes for the field-type (=0), and no additional data.
+        // They get ignored during normal reads, but get caught during compaction.
+        //
+        // TODO: we should have cfg flags to block kafka stream for such events (catch compaction errors earlier).
+
+        // mmap resizing?
+ let mmap; { // mmap instantiation - let mut total_num_bytes = 0; - for field_value in field_values.iter() { - total_num_bytes += Self::size_of_mmap_entry(field_value)?; + let mut mmap_entry_size = 0; + for field_value in field_values.iter().map(|fv| fv.borrow()) { + mmap_entry_size += Self::size_of_mmap_entry(field_value)?; } // mmap instantiation - self.expand_mmap_if_required(self.write_offset, total_num_bytes)?; + self.expand_mmap_if_required(self.write_offset, mmap_entry_size)?; + mmap = self.mmap.deref_mut(); } - let mmap = self.mmap.deref_mut(); // propagate to disk (OffsetTableEntry.proto) let mut update_doc_fields = UpdateDocFields::new(); @@ -444,7 +528,7 @@ impl CKVIndexSegment { for i in 0..field_ids.len() { let field_id = field_ids[i]; - let field_value = &field_values[i]; + let field_value = field_values[i].borrow(); let write_offset = self.write_offset; @@ -477,11 +561,15 @@ impl CKVIndexSegment { Ok(()) } + fn is_unknown_field_type(value: &FieldValue) -> bool { + return value.fieldType.enum_value_or_default() == FieldType::UNKNOWN; + } + /// Delete field values for a document. 
pub fn delete_field_values( &mut self, primary_key: &[u8], - field_ids: Vec, + field_ids: &[FieldId], ) -> io::Result<()> { if primary_key.is_empty() || field_ids.is_empty() { return Ok(()); diff --git a/ikv/src/index/mod.rs b/ikv/src/index/mod.rs index b42ee42..e4d96c8 100644 --- a/ikv/src/index/mod.rs +++ b/ikv/src/index/mod.rs @@ -3,3 +3,4 @@ mod ckv_segment; mod header; pub mod offset_store; mod schema_store; +mod stats; diff --git a/ikv/src/index/schema_store.rs b/ikv/src/index/schema_store.rs index 158cee7..488ae7d 100644 --- a/ikv/src/index/schema_store.rs +++ b/ikv/src/index/schema_store.rs @@ -108,6 +108,33 @@ impl CKVIndexSchema { Ok(()) } + pub fn compact(&mut self) -> anyhow::Result> { + // calculate new-field-id -> old-field-id mapping + let mut new_fid_to_old_fid: Vec = Vec::with_capacity(self.field_name_to_id.len()); + let mut old_fid_to_field_name: HashMap = + HashMap::with_capacity(self.field_name_to_id.len()); + for (field_name, field_id) in self.field_name_to_id.iter() { + new_fid_to_old_fid.push(*field_id as FieldId); + old_fid_to_field_name.insert(*field_id as FieldId, field_name.clone()); + } + new_fid_to_old_fid.sort(); + + // change string->id mappings + let mut field_name_to_id = HashMap::with_capacity(new_fid_to_old_fid.len()); + for new_fid in 0..new_fid_to_old_fid.len() { + let old_fid = new_fid_to_old_fid[new_fid]; + let field_name = old_fid_to_field_name.get(&old_fid).unwrap(); + field_name_to_id.insert(field_name.clone(), new_fid as FieldId); + } + + // reset counter and save to disk + self.field_name_to_id = field_name_to_id; + self.field_id_counter = self.field_name_to_id.len() as u64; + self.save()?; + + Ok(new_fid_to_old_fid) + } + pub fn fetch_id_by_name(&self, field_name: &str) -> Option { self.field_name_to_id.get(field_name).copied() } diff --git a/ikv/src/index/stats.rs b/ikv/src/index/stats.rs new file mode 100644 index 0000000..032404f --- /dev/null +++ b/ikv/src/index/stats.rs @@ -0,0 +1,22 @@ +#[derive(Debug, Default)] 
+pub struct CompactionStats { + pub offset_table_size_bytes: u64, + pub mmap_file_size_bytes: u64, +} + +impl CompactionStats { + pub fn aggregate(stats: &[CompactionStats]) -> CompactionStats { + let mut offset_table_size_bytes = 0; + let mut mmap_file_size_bytes = 0; + + for cs in stats.iter() { + offset_table_size_bytes += cs.offset_table_size_bytes; + mmap_file_size_bytes += cs.mmap_file_size_bytes; + } + + return CompactionStats { + offset_table_size_bytes, + mmap_file_size_bytes, + }; + } +}