Skip to content

Commit

Permalink
add qbg-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
datelier committed Jan 22, 2025
1 parent 8179e7d commit 7256b3e
Show file tree
Hide file tree
Showing 15 changed files with 1,098 additions and 118 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/bin/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ edition = "2021"

[dependencies]
algorithm = { version = "0.1.0", path = "../../libs/algorithm" }
qbg = { version = "0.1.0", path = "../../libs/algorithms/qbg" }
anyhow = "1.0.88"
cargo = "0.81.0"
prost = "0.13.2"
Expand Down
15 changes: 12 additions & 3 deletions rust/bin/agent/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,32 @@
mod common;
pub mod index;
pub mod insert;
pub mod object;
pub mod remove;
pub mod search;
pub mod update;
pub mod upsert;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct Agent {
s: Box<dyn algorithm::ANN>,
s: Arc<RwLock<dyn algorithm::ANN>>,
name: String,
ip: String,
resource_type: String,
api_name: String,
}

impl Agent {
pub fn new(s: impl algorithm::ANN + 'static, name: &str, ip: &str, resource_type: &str, api_name: &str) -> Self {
pub fn new(
s: impl algorithm::ANN + 'static,
name: &str,
ip: &str,
resource_type: &str,
api_name: &str,
) -> Self {
Self {
s: Box::new(s),
s: Arc::new(RwLock::new(s)),
name: name.to_string(),
ip: ip.to_string(),
resource_type: resource_type.to_string(),
Expand Down
104 changes: 98 additions & 6 deletions rust/bin/agent/src/handler/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,109 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use algorithm::Error;
use proto::{
core::v1::agent_server,
payload::v1::{control, info, Empty},
vald::v1::index_server,
};
use std::collections::HashMap;
use tonic::{Code, Status};
use tonic_types::{ErrorDetails, PreconditionViolation, StatusExt};

#[tonic::async_trait]
impl agent_server::Agent for super::Agent {
async fn create_index(
&self,
_request: tonic::Request<control::CreateIndexRequest>,
request: tonic::Request<control::CreateIndexRequest>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
let req = request.get_ref();
let pool_size = req.pool_size;
let hostname = cargo::util::hostname()?;
let domain = hostname.to_str().unwrap();
let res = Empty {};
{
let mut s = self.s.write().await;
let result = s.create_index();
match result {
Err(err) => {
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.CreateIndex";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
let status = match err {
Error::UncommittedIndexNotFound {} => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_precondition_failure(vec![PreconditionViolation::new(
"uncommitted index is empty",
"failed to CreateIndex operation caused by empty uncommitted indices",
err.to_string(),
)]);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::FailedPrecondition,
format!("CreateIndex API failed to create indexes pool_size = {} due to the precondition failure, error: {}", pool_size, err.to_string()),
err_details,
)
}
Error::FlushingIsInProgress {} => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::Aborted,
"CreateIndex API aborted to process create indexes request due to flushing indices is in progress",
err_details,
)
}
_ => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::Internal,
format!("CreateIndex API failed to create indexes pool_size = {}, error: {}", pool_size, err.to_string()),
err_details,
)
}
};
Err(status)
}
Ok(()) => Ok(tonic::Response::new(res)),
}
}
}

async fn save_index(
&self,
_request: tonic::Request<Empty>,
request: tonic::Request<Empty>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
let hostname = cargo::util::hostname()?;
let domain = hostname.to_str().unwrap();
let res = Empty {};
{
let mut s = self.s.write().await;
let result = s.save_index();
match result {
Err(err) => {
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.SaveIndex";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_resource_info(resource_type, resource_name, "", "");
let status = Status::with_error_details(
Code::Internal,
"SaveIndex API failed to save indices",
err_details,
);
Err(status)
}
Ok(()) => Ok(tonic::Response::new(res)),
}
}
}

#[doc = " Represent the creating and saving index RPC.\n"]
Expand All @@ -49,9 +132,18 @@ impl index_server::Index for super::Agent {
#[doc = " Represent the RPC to get the agent index information.\n"]
async fn index_info(
&self,
_request: tonic::Request<Empty>,
request: tonic::Request<Empty>,
) -> std::result::Result<tonic::Response<info::index::Count>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
{
let s = self.s.read().await;
Ok(tonic::Response::new(info::index::Count {
stored: s.len(),
uncommitted: s.insert_vqueue_buffer_len() + s.delete_vqueue_buffer_len(),
indexing: s.is_indexing(),
saving: s.is_saving(),
}))
}
}

#[doc = " Represent the RPC to get the agent index detailed information.\n"]
Expand Down
117 changes: 115 additions & 2 deletions rust/bin/agent/src/handler/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,130 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use algorithm::Error;
use prost::Message;
use proto::{
payload::v1::{insert, object},
vald::v1::insert_server,
};
use std::{collections::HashMap, string::String};
use tonic::{Code, Status};
use tonic_types::{ErrorDetails, FieldViolation, StatusExt};

#[tonic::async_trait]
impl insert_server::Insert for super::Agent {
async fn insert(
&self,
_request: tonic::Request<insert::Request>,
request: tonic::Request<insert::Request>,
) -> std::result::Result<tonic::Response<object::Location>, tonic::Status> {
todo!()
println!("Recieved a request from {:?}", request.remote_addr());
let req = request.get_ref();
let config = req.config.clone().unwrap();
let hostname = cargo::util::hostname()?;
let domain = hostname.to_str().unwrap();
{
let mut s = self.s.write().await;
let vec = req.vector.clone().unwrap();
if vec.vector.len() != s.get_dimension_size() {
let err = Error::IncompatibleDimensionSize {
got: vec.vector.len(),
want: s.get_dimension_size(),
};
let mut err_details = ErrorDetails::new();
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.Insert";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
err_details.set_error_info(err.to_string(), domain, metadata);
err_details
.set_request_info(vec.id, String::from_utf8(req.encode_to_vec()).unwrap());
err_details.set_bad_request(vec![FieldViolation::new(
"vector dimension size",
err.to_string(),
)]);
err_details.set_resource_info(resource_type, resource_name, "", "");
let status = Status::with_error_details(
Code::InvalidArgument,
"Insert API Incombatible Dimension Size detedted",
err_details,
);
return Err(status);
}
let result = s.insert(vec.id.clone(), vec.vector.clone(), config.timestamp);
match result {
Err(err) => {
let metadata = HashMap::new();
let resource_type = self.resource_type.clone() + "/qbg.Insert";
let resource_name = format!("{}: {}({})", self.api_name, self.name, self.ip);
let status = match err {
Error::FlushingIsInProgress {} => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id,
String::from_utf8(req.encode_to_vec()).unwrap(),
);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(Code::Aborted, "Insert API aborted to process insert request due to flushing indices is in progress", err_details)
}
Error::UUIDAlreadyExists { id: _ } => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id.clone(),
String::from_utf8(req.encode_to_vec()).unwrap(),
);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::AlreadyExists,
format!("Insert API uuid {} already exists", vec.id),
err_details,
)
}
Error::UUIDNotFound { id: _ } => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id.clone(),
String::from_utf8(req.encode_to_vec()).unwrap(),
);
err_details.set_bad_request(vec![FieldViolation::new(
"uuid",
err.to_string(),
)]);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::InvalidArgument,
format!(
"Insert API invalid id: \"{}\" or vector: {:?} was given",
vec.id, vec.vector
),
err_details,
)
}
_ => {
let mut err_details = ErrorDetails::new();
err_details.set_error_info(err.to_string(), domain, metadata);
err_details.set_request_info(
vec.id,
String::from_utf8(req.encode_to_vec()).unwrap(),
);
err_details.set_resource_info(resource_type, resource_name, "", "");
Status::with_error_details(
Code::Unknown,
"failed to parse Insert gRPC error response",
err_details,
)
}
};
Err(status)
}
Ok(()) => Ok(tonic::Response::new(object::Location {
name: self.name.clone(),
uuid: vec.id,
ips: vec![self.ip.clone()],
})),
}
}
}

#[doc = " Server streaming response type for the StreamInsert method."]
Expand Down
Loading

0 comments on commit 7256b3e

Please sign in to comment.