From 7bdba3531f82492b8d2d992296547cacd642e730 Mon Sep 17 00:00:00 2001 From: Rory Malcolm Date: Fri, 26 Jan 2024 23:47:25 +0000 Subject: [PATCH] reorg --- gateway/src/worker.ts | 2 +- parquet-generator/src/lib.rs | 12 +++++++++--- schema-manager/src/worker.ts | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/gateway/src/worker.ts b/gateway/src/worker.ts index b02c84f..00bceff 100644 --- a/gateway/src/worker.ts +++ b/gateway/src/worker.ts @@ -29,8 +29,8 @@ const getSchema = async (env: Env): Promise> => { errors: ['Schema is not valid JSON'], }; } - const parseResult = ParquetSchema.safeParse(schemaJSON.value); + const parseResult = ParquetSchema.safeParse(schemaJSON.value); if (parseResult.success) { return { success: true, diff --git a/parquet-generator/src/lib.rs b/parquet-generator/src/lib.rs index 36457e2..93ab92e 100644 --- a/parquet-generator/src/lib.rs +++ b/parquet-generator/src/lib.rs @@ -111,6 +111,7 @@ fn physical_type_matcher(parquet_primitive_type: ParquetPrimitiveType) -> Physic fn build_schema(schema: String) -> String { let schema = serde_json::from_str::(schema.as_str()).unwrap(); let mut type_vec: Vec> = vec![]; + for field in schema.fields { let type_builder = Type::primitive_type_builder( field.name.as_str(), @@ -133,28 +134,33 @@ fn build_schema(schema: String) -> String { let converted_type = type_builder.build().unwrap(); type_vec.push(Arc::new(converted_type)); } + let mut buf = Vec::new(); + let schema = Type::group_type_builder("schema") .with_fields(type_vec) .build() .unwrap(); printer::print_schema(&mut buf, &schema); - let string_schema = String::from_utf8(buf).unwrap(); - string_schema + + String::from_utf8(buf).unwrap() } #[wasm_bindgen] pub fn generate_parquet(schema: String, fields: Vec) -> Result<(), JsValue> { let message_type = build_schema(schema); + let schema = Arc::new(parse_message_type(message_type.as_str()).unwrap()); let buffer = vec![]; - let schema = Arc::new(parse_message_type(message_type.as_str()).unwrap()); + let mut writer = SerializedFileWriter::new(buffer, schema, Default::default()).unwrap(); let mut row_group_writer = writer.next_row_group().unwrap(); + while let Some(col_writer) = row_group_writer.next_column().unwrap() { // ... write values to a column writer col_writer.close().unwrap() } + row_group_writer.close().unwrap(); writer.close().unwrap(); diff --git a/schema-manager/src/worker.ts b/schema-manager/src/worker.ts index e2b0169..77ee076 100644 --- a/schema-manager/src/worker.ts +++ b/schema-manager/src/worker.ts @@ -16,8 +16,8 @@ async function processGETSchema(env: Env) { if (!schemaJSON.success) { return ErrorsToResponse(schemaJSON.errors); } - const parseResult = ParquetSchema.safeParse(schemaJSON.value); + const parseResult = ParquetSchema.safeParse(schemaJSON.value); if (parseResult.success) { return new Response(JSON.stringify({ schema: parseResult.data })); }