From 88fe5e15abb6446fe7718dd1590f02b39540b805 Mon Sep 17 00:00:00 2001 From: Neil Movva Date: Mon, 11 Sep 2023 21:37:34 +0000 Subject: [PATCH] fix workflow tests update standalone Spiral test server to use new JSON interface --- e2e-tests/api.ts | 8 ----- e2e-tests/tests/simple.ts | 7 +++-- js/bucket/bucket.ts | 41 ++++++++------------------ lib/server/Cargo.lock | 15 ++++++++++ lib/server/Cargo.toml | 3 +- lib/server/src/bin/server.rs | 57 ++++++++++++++++++++++++------------ lib/server/src/db/write.rs | 35 ++++++++-------------- python/tests/test_service.py | 2 +- 8 files changed, 86 insertions(+), 82 deletions(-) diff --git a/e2e-tests/api.ts b/e2e-tests/api.ts index 802cb67..10bb6fb 100644 --- a/e2e-tests/api.ts +++ b/e2e-tests/api.ts @@ -30,14 +30,6 @@ function generateKeys(n: number, seed: number = 0): string[] { ); } -// async function generateKVPairs(n: number, seed: number, itemSize: number): Promise<{ [key: string]: Uint8Array }> { -// const keys = generateKeys(n, seed); -// const kvPairs: { [key: string]: Uint8Array } = {}; -// keys.forEach(async key => { -// kvPairs[key] = await keyToValue(key, itemSize); -// }); -// return kvPairs; -// } async function generateKVPairs(n: number, seed: number, itemSize: number): Promise<{ [key: string]: Uint8Array }> { const keys = generateKeys(n, seed); diff --git a/e2e-tests/tests/simple.ts b/e2e-tests/tests/simple.ts index 33ed06e..fe99894 100644 --- a/e2e-tests/tests/simple.ts +++ b/e2e-tests/tests/simple.ts @@ -8,17 +8,20 @@ export default async function main(port: string) { console.log(bucket.metadata); + // buckets are bytes-in/bytes-out. SDK write() will automatically serialize as UTF-8. await bucket.write({ Ohio: 'Columbus', California: 'Sacramento' }); - let capital = await bucket.privateRead('Ohio'); + // but reads are always bytes-out, and must be decoded. + let capital = new TextDecoder().decode(await bucket.privateRead('Ohio')); if (capital !== 'Columbus') { throw 'Incorrect result.'; } - capital = await bucket.privateRead('California'); + // capital = await bucket.privateRead('California'); + capital = new TextDecoder().decode(await bucket.privateRead('California')); if (capital !== 'Sacramento') { throw 'Incorrect result.'; } diff --git a/js/bucket/bucket.ts b/js/bucket/bucket.ts index e10cc91..46ec9e3 100644 --- a/js/bucket/bucket.ts +++ b/js/bucket/bucket.ts @@ -175,33 +175,6 @@ export class Bucket { ); return endResults; - - // const queries: { key: string; queryData: Uint8Array }[] = []; - // for (const key of keys) { - // const rowIdx = this.lib.getRow(key); - // const queryData = this.lib.generateQuery(this.uuid, rowIdx); - // queries.push({ key, queryData }); - // } - - // const endResults = []; - // const batches = Math.ceil(queries.length / this.batchSize); - // for (let i = 0; i < batches; i++) { - // const queriesForBatch = queries.slice( - // i * this.batchSize, - // (i + 1) * this.batchSize - // ); - - // const queryBatch = serializeChunks(queriesForBatch.map(x => x.queryData)); - // const rawResultChunks = await this.getRawResponse(queryBatch); - // const rawResults = deserializeChunks(rawResultChunks); - - // const batchEndResults = await Promise.all( - // rawResults.map((r, i) => this.getEndResult(queriesForBatch[i].key, r)) - // ); - - // endResults.push(...batchEndResults); - // } - } private async performPrivateRead(key: string): Promise { @@ -357,10 +330,20 @@ export class Bucket { * 1024 UTF-8 bytes. */ async write( - keyValuePairs: { [key: string]: Uint8Array | null } + keyValuePairs: { [key: string]: Uint8Array | string | null } ) { this.ensureSpiral(); - await this.api.write(this.name, keyValuePairs); + // convert any string KV pairs to Uint8Array + const kvPairs: { [key: string]: Uint8Array | null } = {}; + for (const key in keyValuePairs) { + const value = keyValuePairs[key]; + if (!(value instanceof Uint8Array)) { + kvPairs[key] = new TextEncoder().encode(value); + } else { + kvPairs[key] = value; + } + } + await this.api.write(this.name, kvPairs); } /** diff --git a/lib/server/Cargo.lock b/lib/server/Cargo.lock index 870e24e..a01b523 100644 --- a/lib/server/Cargo.lock +++ b/lib/server/Cargo.lock @@ -937,6 +937,20 @@ name = "serde" version = "1.0.159" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.159" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.13", +] [[package]] name = "serde_json" @@ -1041,6 +1055,7 @@ dependencies = [ "rand", "rand_chacha", "rayon", + "serde", "serde_json", "sha2", "spiral-rs", diff --git a/lib/server/Cargo.toml b/lib/server/Cargo.toml index 86620c5..602386d 100644 --- a/lib/server/Cargo.toml +++ b/lib/server/Cargo.toml @@ -24,7 +24,8 @@ default = [] [dependencies] spiral-rs = { version = "0.2.1-alpha.2", path = "../spiral-rs" } rand = { version = "0.8.5", features = ["small_rng"] } -serde_json = "1.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0"} rayon = "1.5.2" rand_chacha = "0.3.1" diff --git a/lib/server/src/bin/server.rs b/lib/server/src/bin/server.rs index 16f6141..ef30f55 100644 --- a/lib/server/src/bin/server.rs +++ b/lib/server/src/bin/server.rs @@ -1,4 +1,5 @@ use actix_web::HttpServer; +use serde::Serialize; use spiral_rs::client::*; use spiral_rs::params::*; use spiral_rs::util::*; @@ -17,7 +18,6 @@ use uuid::Uuid; use actix_web::error::PayloadError; use actix_web::{get, post, web, App}; -use base64::{engine::general_purpose, Engine as _}; struct ServerState { params: &'static Params, @@ -49,7 +49,11 @@ async fn write(body: web::Bytes, data: web::Data) -> Result = kv_pairs + .iter() + .map(|(key, value)| (key.as_str(), value.as_slice())) + .collect(); + update_database(data.params, &kv_pairs_slices, &mut rows_mut, &mut db_mut); let mut version_mut = data.version.write().unwrap(); *version_mut += 1; @@ -59,19 +63,34 @@ async fn write(body: web::Bytes, data: web::Data) -> Result, ) -> Result { + // parse body as json str + let body_str = serde_json::from_str::(&body).unwrap(); + // decode body from base64 + let client_pub_params = base64::decode(&body_str).unwrap(); let mut pub_params_map_mut = data.pub_params.write().unwrap(); - assert_eq!(body.len(), data.params.setup_bytes()); - let pub_params = PublicParameters::deserialize(&data.params, &body); + assert_eq!(client_pub_params.len(), data.params.setup_bytes()); + let pub_params = PublicParameters::deserialize(&data.params, &client_pub_params); let uuid = Uuid::new_v4(); pub_params_map_mut.insert(uuid.to_string(), pub_params); - Ok(format!("{{\"uuid\":\"{}\"}}", uuid.to_string())) + // return uuid as JSON string + let uuid_json = serde_json::to_string(&UuidResponse { + uuid: uuid.to_string(), + }) + .unwrap(); + + Ok(uuid_json) } const UUID_V4_STR_BYTES: usize = 36; @@ -126,22 +145,22 @@ async fn private_read( body: web::Bytes, data: web::Data, ) -> Result { + // parse body as list of json strings + let query_strs = serde_json::from_slice::>(&body).unwrap(); + let mut out = Vec::new(); - let mut i = 0; - let num_chunks = u64::from_le_bytes(body[..8].try_into().unwrap()) as usize; - i += 8; - out.extend(u64::to_le_bytes(num_chunks as u64)); - for _ in 0..num_chunks { - let chunk_len = u64::from_le_bytes(body[i..i + 8].try_into().unwrap()) as usize; - i += 8; - let result = private_read_impl(&body[i..i + chunk_len], data.clone()).await?; - i += chunk_len; - - out.extend(u64::to_le_bytes(result.len() as u64)); - out.extend(result); + for query_str in query_strs.iter() { + // decode each query from base64 + let query_bytes = base64::decode(query_str).unwrap(); + let result = private_read_impl(&query_bytes, data.clone()).await?; + // store base64-encoded results in out + let result_str = base64::encode(&result); + out.push(result_str); } - Ok(general_purpose::STANDARD.encode(out)) + let out_json = serde_json::to_string(&out).unwrap(); + + Ok(out_json) } #[get("/meta")] diff --git a/lib/server/src/db/write.rs b/lib/server/src/db/write.rs index 6e6899e..cccf1fb 100644 --- a/lib/server/src/db/write.rs +++ b/lib/server/src/db/write.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, io::Read}; +use base64::{engine::general_purpose, Engine}; use bzip2::{read::BzEncoder, Compression}; use sha2::{Digest, Sha256}; use spiral_rs::params::Params; @@ -125,30 +126,20 @@ pub fn update_row(row: &mut Vec, key: &str, value: &[u8]) { } } -pub fn unwrap_kv_pairs(data: &[u8]) -> Vec<(&str, &[u8])> { +pub fn unwrap_kv_pairs(data: &[u8]) -> Vec<(String, Vec)> { let mut kv_pairs = Vec::new(); - let mut i = 0; - while i < data.len() { - // 1. Read key length. - let (key_len, key_len_bytes) = varint_decode(&data[i..]); - i += key_len_bytes; - - // 2. Read key. - let key_bytes = &data[i..i + key_len]; - i += key_len; - - // 3. Read value length. - let (value_len, value_len_bytes) = varint_decode(&data[i..]); - i += value_len_bytes; - - // 4. Read value. - let value_bytes = &data[i..i + value_len]; - i += value_len; - - // 5. Yield key/value pair. - let pair = (std::str::from_utf8(key_bytes).unwrap(), value_bytes); - kv_pairs.push(pair); + + // Parse the data as a JSON object + if let Ok(json_data) = serde_json::from_slice::>(data) { + for (key, base64_value) in json_data.iter() { + // Decode the Base64-encoded value + if let Ok(decoded_value) = base64::decode(base64_value) { + kv_pairs.push((key.clone(), decoded_value)); + } + } } + // print KV pairs + println!("kv_pairs: {:?}", kv_pairs); kv_pairs } diff --git a/python/tests/test_service.py b/python/tests/test_service.py index 1c160ae..2f4a161 100644 --- a/python/tests/test_service.py +++ b/python/tests/test_service.py @@ -43,7 +43,7 @@ def generateBucketName() -> str: async def test_e2e_async( endpoint: str, api_key: str, N: int = 4000, itemSize: int = 32 ): - client = blyss.AsyncClient({"endpoint": endpoint, "api_key": api_key}) + client = blyss.AsyncClient(api_key, endpoint) # generate random string for bucket name bucket_name = generateBucketName() await client.create(bucket_name, usage_hints={"maxItemSize": 40_000})