Skip to content

Commit

Permalink
Wasm interconnection via yarn workspaces
Browse files Browse the repository at this point in the history
  • Loading branch information
rorymalcolm committed Jan 27, 2024
1 parent 3234b89 commit 18d2e53
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 13 deletions.
3 changes: 2 additions & 1 deletion gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
},
"dependencies": {
"uuid": "^9.0.1",
"parquet-schema-validator": "0.0.0"
"parquet-schema-validator": "0.0.0",
"parquet-generator": "0.1.0"
}
}
1 change: 1 addition & 0 deletions gateway/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { validateJSONAgainstSchema } from 'parquet-schema-validator';
import { ParquetSchema } from 'parquet-types';
import { ValueResult, ErrorsToResponse, SafeJSONParse } from 'rerrors';
import { z } from 'zod';
import { generate_parquet } from 'parquet-generator';

export interface Env {
LAKESIDE_BUCKET: R2Bucket;
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"schema-manager",
"packages/parquet-types",
"packages/parquet-schema-validator",
"packages/rerrors"
"packages/rerrors",
"parquet-generator/pkg"
]
},
"installConfig": {
Expand Down
59 changes: 48 additions & 11 deletions parquet-generator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use parquet::{file::writer::SerializedFileWriter, schema::parser::parse_message_
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use wasm_bindgen::prelude::*;
use wasm_bindgen::Clamped;

#[derive(Debug, Serialize, Deserialize)]
struct ParquetSchema {
Expand Down Expand Up @@ -124,7 +125,7 @@ fn build_schema(schema: String) -> String {
None => Repetition::REQUIRED,
})
.with_length(match field.primitive_type {
ParquetPrimitiveType::FixedLenByteArray => 16,
ParquetPrimitiveType::FixedLenByteArray => 1024,
_ => 0,
})
.with_converted_type(match field.logical_type {
Expand All @@ -146,28 +147,64 @@ fn build_schema(schema: String) -> String {
String::from_utf8(buf).unwrap()
}

/// Generate a parquet file from a schema and a list of files which are JSON strings
/// that match the schema
///
/// # Arguments
///
/// * `schema`: A JSON representation of the schema, which is then parsed into a parquet schema
/// * `files`: A list of strings of JSON objects that match the schema
///
/// returns: Result<Clamped<Vec<u8, Global>>, JsValue>
/// A result that contains a Clamped<Vec<u8>> if successful, or a JsValue if not
/// The Clamped<Vec<u8>> is the parquet file as a byte array, in JavaScript it's a Uint8Array
/// The JsValue is an error message
///
/// # Examples
///
/// ```
///
/// ```
#[wasm_bindgen]
pub fn generate_parquet(schema: String, fields: Vec<String>) -> Result<(), JsValue> {
pub fn generate_parquet(schema: String, files: Vec<String>) -> Result<Clamped<Vec<u8>>, JsValue> {
let message_type = build_schema(schema);
let schema = Arc::new(parse_message_type(message_type.as_str()).unwrap());
let parsed_schema = parse_message_type(message_type.as_str());

let schema = match parsed_schema {
Ok(s) => Arc::new(s),
Err(_) => return Err(JsValue::from_str("Error parsing schema")),
};

let buffer = vec![];

let mut writer = SerializedFileWriter::new(buffer, schema, Default::default()).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
let mut writer = match SerializedFileWriter::new(buffer, schema, Default::default()) {
Ok(w) => w,
Err(_) => return Err(JsValue::from_str("Error creating writer")),
};

let mut row_group_writer = match writer.next_row_group() {
Ok(rgw) => rgw,
Err(_) => return Err(JsValue::from_str("Error creating row group writer")),
};

while let Some(col_writer) = row_group_writer.next_column().unwrap() {
col_writer.close().unwrap()
while let Ok(Some(col_writer)) = row_group_writer.next_column() {
if col_writer.close().is_err() {
return Err(JsValue::from_str("Error closing column writer"));
}
}

row_group_writer.close().unwrap();
writer.close().unwrap();
if row_group_writer.close().is_err() {
return Err(JsValue::from_str("Error closing row group writer"));
}

Ok(())
return match writer.into_inner() {
Ok(bytes_buffer) => Ok(Clamped(bytes_buffer)),
Err(_) => Err(JsValue::from_str("Error closing writer")),
};
}

#[test]
fn test_build_schema() {
fn test_build_schema_basic() {
let schema = r#"
{
"fields": [
Expand Down

0 comments on commit 18d2e53

Please sign in to comment.