Skip to content

Commit

Permalink
fix workflow tests
Browse files Browse the repository at this point in the history
update standalone Spiral test server to use new JSON interface
  • Loading branch information
neilmovva committed Sep 11, 2023
1 parent 975c5d3 commit 88fe5e1
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 82 deletions.
8 changes: 0 additions & 8 deletions e2e-tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ function generateKeys(n: number, seed: number = 0): string[] {
);
}

// async function generateKVPairs(n: number, seed: number, itemSize: number): Promise<{ [key: string]: Uint8Array }> {
// const keys = generateKeys(n, seed);
// const kvPairs: { [key: string]: Uint8Array } = {};
// keys.forEach(async key => {
// kvPairs[key] = await keyToValue(key, itemSize);
// });
// return kvPairs;
// }

async function generateKVPairs(n: number, seed: number, itemSize: number): Promise<{ [key: string]: Uint8Array }> {
const keys = generateKeys(n, seed);
Expand Down
7 changes: 5 additions & 2 deletions e2e-tests/tests/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ export default async function main(port: string) {

console.log(bucket.metadata);

// buckets are bytes-in/bytes-out. SDK write() will automatically serialize as UTF-8.
await bucket.write({
Ohio: 'Columbus',
California: 'Sacramento'
});

let capital = await bucket.privateRead('Ohio');
// but reads are always bytes-out, and must be decoded.
let capital = new TextDecoder().decode(await bucket.privateRead('Ohio'));
if (capital !== 'Columbus') {
throw 'Incorrect result.';
}

capital = await bucket.privateRead('California');
// capital = await bucket.privateRead('California');
capital = new TextDecoder().decode(await bucket.privateRead('California'));
if (capital !== 'Sacramento') {
throw 'Incorrect result.';
}
Expand Down
41 changes: 12 additions & 29 deletions js/bucket/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,33 +175,6 @@ export class Bucket {
);

return endResults;

// const queries: { key: string; queryData: Uint8Array }[] = [];
// for (const key of keys) {
// const rowIdx = this.lib.getRow(key);
// const queryData = this.lib.generateQuery(this.uuid, rowIdx);
// queries.push({ key, queryData });
// }

// const endResults = [];
// const batches = Math.ceil(queries.length / this.batchSize);
// for (let i = 0; i < batches; i++) {
// const queriesForBatch = queries.slice(
// i * this.batchSize,
// (i + 1) * this.batchSize
// );

// const queryBatch = serializeChunks(queriesForBatch.map(x => x.queryData));
// const rawResultChunks = await this.getRawResponse(queryBatch);
// const rawResults = deserializeChunks(rawResultChunks);

// const batchEndResults = await Promise.all(
// rawResults.map((r, i) => this.getEndResult(queriesForBatch[i].key, r))
// );

// endResults.push(...batchEndResults);
// }

}

private async performPrivateRead(key: string): Promise<any> {
Expand Down Expand Up @@ -357,10 +330,20 @@ export class Bucket {
* 1024 UTF-8 bytes.
*/
async write(
keyValuePairs: { [key: string]: Uint8Array | null }
keyValuePairs: { [key: string]: Uint8Array | string | null }
) {
this.ensureSpiral();
await this.api.write(this.name, keyValuePairs);
// convert any string KV pairs to Uint8Array
const kvPairs: { [key: string]: Uint8Array | null } = {};
for (const key in keyValuePairs) {
const value = keyValuePairs[key];
if (!(value instanceof Uint8Array)) {
kvPairs[key] = new TextEncoder().encode(value);
} else {
kvPairs[key] = value;
}
}
await this.api.write(this.name, kvPairs);
}

/**
Expand Down
15 changes: 15 additions & 0 deletions lib/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion lib/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ default = []
[dependencies]
spiral-rs = { version = "0.2.1-alpha.2", path = "../spiral-rs" }
rand = { version = "0.8.5", features = ["small_rng"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0"}
rayon = "1.5.2"
rand_chacha = "0.3.1"

Expand Down
57 changes: 38 additions & 19 deletions lib/server/src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use actix_web::HttpServer;
use serde::Serialize;
use spiral_rs::client::*;
use spiral_rs::params::*;
use spiral_rs::util::*;
Expand All @@ -17,7 +18,6 @@ use uuid::Uuid;

use actix_web::error::PayloadError;
use actix_web::{get, post, web, App};
use base64::{engine::general_purpose, Engine as _};

struct ServerState {
params: &'static Params,
Expand Down Expand Up @@ -49,7 +49,11 @@ async fn write(body: web::Bytes, data: web::Data<ServerState>) -> Result<String,
let mut db_mut = data.db.write().unwrap();

let kv_pairs = unwrap_kv_pairs(&body);
update_database(data.params, &kv_pairs, &mut rows_mut, &mut db_mut);
let kv_pairs_slices: Vec<(&str, &[u8])> = kv_pairs
.iter()
.map(|(key, value)| (key.as_str(), value.as_slice()))
.collect();
update_database(data.params, &kv_pairs_slices, &mut rows_mut, &mut db_mut);
let mut version_mut = data.version.write().unwrap();
*version_mut += 1;

Expand All @@ -59,19 +63,34 @@ async fn write(body: web::Bytes, data: web::Data<ServerState>) -> Result<String,
))
}

#[derive(Serialize)]
pub struct UuidResponse {
pub uuid: String,
}

#[post("/setup")]
async fn setup(
body: web::Bytes,
body: String,
data: web::Data<ServerState>,
) -> Result<String, actix_web::error::Error> {
// parse body as json str
let body_str = serde_json::from_str::<String>(&body).unwrap();
// decode body from base64
let client_pub_params = base64::decode(&body_str).unwrap();
let mut pub_params_map_mut = data.pub_params.write().unwrap();
assert_eq!(body.len(), data.params.setup_bytes());
let pub_params = PublicParameters::deserialize(&data.params, &body);
assert_eq!(client_pub_params.len(), data.params.setup_bytes());
let pub_params = PublicParameters::deserialize(&data.params, &client_pub_params);

let uuid = Uuid::new_v4();
pub_params_map_mut.insert(uuid.to_string(), pub_params);

Ok(format!("{{\"uuid\":\"{}\"}}", uuid.to_string()))
// return uuid as JSON string
let uuid_json = serde_json::to_string(&UuidResponse {
uuid: uuid.to_string(),
})
.unwrap();

Ok(uuid_json)
}

const UUID_V4_STR_BYTES: usize = 36;
Expand Down Expand Up @@ -126,22 +145,22 @@ async fn private_read(
body: web::Bytes,
data: web::Data<ServerState>,
) -> Result<String, actix_web::error::Error> {
// parse body as list of json strings
let query_strs = serde_json::from_slice::<Vec<String>>(&body).unwrap();

let mut out = Vec::new();
let mut i = 0;
let num_chunks = u64::from_le_bytes(body[..8].try_into().unwrap()) as usize;
i += 8;
out.extend(u64::to_le_bytes(num_chunks as u64));
for _ in 0..num_chunks {
let chunk_len = u64::from_le_bytes(body[i..i + 8].try_into().unwrap()) as usize;
i += 8;
let result = private_read_impl(&body[i..i + chunk_len], data.clone()).await?;
i += chunk_len;

out.extend(u64::to_le_bytes(result.len() as u64));
out.extend(result);
for query_str in query_strs.iter() {
// decode each query from base64
let query_bytes = base64::decode(query_str).unwrap();
let result = private_read_impl(&query_bytes, data.clone()).await?;
// store base64-encoded results in out
let result_str = base64::encode(&result);
out.push(result_str);
}

Ok(general_purpose::STANDARD.encode(out))
let out_json = serde_json::to_string(&out).unwrap();

Ok(out_json)
}

#[get("/meta")]
Expand Down
35 changes: 13 additions & 22 deletions lib/server/src/db/write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, io::Read};

use base64::{engine::general_purpose, Engine};
use bzip2::{read::BzEncoder, Compression};
use sha2::{Digest, Sha256};
use spiral_rs::params::Params;
Expand Down Expand Up @@ -125,30 +126,20 @@ pub fn update_row(row: &mut Vec<u8>, key: &str, value: &[u8]) {
}
}

pub fn unwrap_kv_pairs(data: &[u8]) -> Vec<(&str, &[u8])> {
pub fn unwrap_kv_pairs(data: &[u8]) -> Vec<(String, Vec<u8>)> {
let mut kv_pairs = Vec::new();
let mut i = 0;
while i < data.len() {
// 1. Read key length.
let (key_len, key_len_bytes) = varint_decode(&data[i..]);
i += key_len_bytes;

// 2. Read key.
let key_bytes = &data[i..i + key_len];
i += key_len;

// 3. Read value length.
let (value_len, value_len_bytes) = varint_decode(&data[i..]);
i += value_len_bytes;

// 4. Read value.
let value_bytes = &data[i..i + value_len];
i += value_len;

// 5. Yield key/value pair.
let pair = (std::str::from_utf8(key_bytes).unwrap(), value_bytes);
kv_pairs.push(pair);

// Parse the data as a JSON object
if let Ok(json_data) = serde_json::from_slice::<HashMap<String, String>>(data) {
for (key, base64_value) in json_data.iter() {
// Decode the Base64-encoded value
if let Ok(decoded_value) = base64::decode(base64_value) {
kv_pairs.push((key.clone(), decoded_value));
}
}
}
// print KV pairs
println!("kv_pairs: {:?}", kv_pairs);

kv_pairs
}
Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def generateBucketName() -> str:
async def test_e2e_async(
endpoint: str, api_key: str, N: int = 4000, itemSize: int = 32
):
client = blyss.AsyncClient({"endpoint": endpoint, "api_key": api_key})
client = blyss.AsyncClient(api_key, endpoint)
# generate random string for bucket name
bucket_name = generateBucketName()
await client.create(bucket_name, usage_hints={"maxItemSize": 40_000})
Expand Down

0 comments on commit 88fe5e1

Please sign in to comment.