diff --git a/Cargo.lock b/Cargo.lock index 2f939044f..33bc57c74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -333,13 +333,14 @@ dependencies = [ [[package]] name = "arrow-json" version = "51.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=51.0.0/json#f0d3bd0f5ac37066ad9617fdae1316bf661c20d4" +source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=51.0.0/json#2c95ca0a0f5d5498fd2b958b846ebd2ed0550b31" dependencies = [ "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", + "base64 0.22.1", "chrono", "half", "indexmap 2.2.6", diff --git a/Cargo.toml b/Cargo.toml index 5933c7867..c7cac10b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,17 +22,14 @@ members = [ "crates/arroyo-udf/arroyo-udf-common", "crates/arroyo-udf/arroyo-udf-plugin", "crates/arroyo-udf/arroyo-udf-host", + "crates/arroyo-udf/arroyo-udf-macros", "crates/arroyo-worker", "crates/copy-artifacts", - "crates/integ", "crates/arroyo-udf/arroyo-udf-macros", + "crates/integ", ] resolver = "2" -exclude = [ - "build_dir", -] - [workspace.dependencies] tonic = { version = "0.11" } tonic-build = { version = "0.11" } diff --git a/crates/arroyo-api/src/connection_tables.rs b/crates/arroyo-api/src/connection_tables.rs index 727774105..25d9b172b 100644 --- a/crates/arroyo-api/src/connection_tables.rs +++ b/crates/arroyo-api/src/connection_tables.rs @@ -468,6 +468,7 @@ pub(crate) async fn expand_schema( } Format::Parquet(_) => Ok(schema), Format::RawString(_) => Ok(schema), + Format::RawBytes(_) => Ok(schema), } } diff --git a/crates/arroyo-api/src/lib.rs b/crates/arroyo-api/src/lib.rs index 633eb99d5..7620e34ac 100644 --- a/crates/arroyo-api/src/lib.rs +++ b/crates/arroyo-api/src/lib.rs @@ -321,6 +321,7 @@ impl IntoResponse for HttpError { AvroFormat, ParquetFormat, RawStringFormat, + RawBytesFormat, TimestampFormat, Framing, FramingMethod, diff --git a/crates/arroyo-connectors/src/filesystem/source.rs b/crates/arroyo-connectors/src/filesystem/source.rs index 505d4bb34..39551282f 100644 --- a/crates/arroyo-connectors/src/filesystem/source.rs +++ b/crates/arroyo-connectors/src/filesystem/source.rs @@ -316,6 +316,7 @@ impl FileSystemSourceFunc { .await } Format::RawString(_) => todo!(), + Format::RawBytes(_) => todo!(), } } diff --git a/crates/arroyo-connectors/src/kafka/mod.rs b/crates/arroyo-connectors/src/kafka/mod.rs index 7d29f5702..720e9441b 100644 --- a/crates/arroyo-connectors/src/kafka/mod.rs +++ b/crates/arroyo-connectors/src/kafka/mod.rs @@ -659,6 +659,9 @@ impl KafkaTester { String::from_utf8(msg).map_err(|e| anyhow!("Failed to parse message as UTF-8: {:?}. 
Ensure that the format and schema type are correct.", e))?; } + Format::RawBytes(_) => { + // all bytes are valid + } }; Ok(()) diff --git a/crates/arroyo-connectors/src/preview/operator.rs b/crates/arroyo-connectors/src/preview/operator.rs index 766d74250..ffe6dce19 100644 --- a/crates/arroyo-connectors/src/preview/operator.rs +++ b/crates/arroyo-connectors/src/preview/operator.rs @@ -1,6 +1,5 @@ use arrow::array::{RecordBatch, TimestampNanosecondArray}; -#[allow(deprecated)] -use arrow::json::writer::record_batches_to_json_rows; +use arrow::json::writer::record_batch_to_vec; use std::time::SystemTime; use arroyo_operator::context::ArrowContext; @@ -32,19 +31,22 @@ impl ArrowOperator for PreviewSink { ); } - async fn process_batch(&mut self, record_batch: RecordBatch, ctx: &mut ArrowContext) { - let timestamp_column = record_batch - .column(ctx.in_schemas[0].timestamp_index) + async fn process_batch(&mut self, mut batch: RecordBatch, ctx: &mut ArrowContext) { + let ts = ctx.in_schemas[0].timestamp_index; + let timestamp_column = batch + .column(ts) .as_any() .downcast_ref::<TimestampNanosecondArray>() - .unwrap(); + .unwrap() + .clone(); - #[allow(deprecated)] - let json_rows = record_batches_to_json_rows(&[&record_batch]).unwrap(); - for (mut map, timestamp) in json_rows.into_iter().zip(timestamp_column.iter()) { - map.remove("_timestamp"); - let value = serde_json::to_string(&map).unwrap(); + batch.remove_column(ts); + + let rows = record_batch_to_vec(&batch, true, arrow::json::writer::TimestampFormat::RFC3339) + .unwrap(); + for (value, timestamp) in rows.into_iter().zip(timestamp_column.iter()) { + let s = String::from_utf8(value).unwrap(); self.client .as_mut() .unwrap() @@ -57,7 +59,7 @@ impl ArrowOperator for PreviewSink { .map(|nanos| from_nanos(nanos as u128)) .unwrap_or_else(SystemTime::now), ), - value, + value: s, done: false, }) .await diff --git a/crates/arroyo-formats/src/de.rs b/crates/arroyo-formats/src/de.rs index f6d826d00..3fd1339cc 100644 --- a/crates/arroyo-formats/src/de.rs +++ b/crates/arroyo-formats/src/de.rs @@ -1,6 +1,9 @@ use crate::avro::de; use arrow::compute::kernels; -use arrow_array::builder::{ArrayBuilder, StringBuilder, TimestampNanosecondBuilder}; +use arrow_array::builder::{ + ArrayBuilder, GenericByteBuilder, StringBuilder, TimestampNanosecondBuilder, +}; +use arrow_array::types::GenericBinaryType; use arrow_array::RecordBatch; use arroyo_rpc::df::ArroyoSchema; use arroyo_rpc::formats::{AvroFormat, BadData, Format, Framing, FramingMethod, JsonFormat}; @@ -209,6 +212,10 @@ impl ArrowDeserializer { self.deserialize_raw_string(buffer, msg); add_timestamp(buffer, self.schema.timestamp_index, timestamp); } + Format::RawBytes(_) => { + self.deserialize_raw_bytes(buffer, msg); + add_timestamp(buffer, self.schema.timestamp_index, timestamp); + } Format::Json(json) => { let msg = if json.confluent_schema_registry { &msg[5..] 
@@ -316,6 +323,19 @@ impl ArrowDeserializer { .append_value(String::from_utf8_lossy(msg)); } + fn deserialize_raw_bytes(&mut self, buffer: &mut [Box<dyn ArrayBuilder>], msg: &[u8]) { + let (col, _) = self + .schema + .schema + .column_with_name("value") + .expect("no 'value' column for RawBytes format"); + buffer[col] + .as_any_mut() + .downcast_mut::<GenericByteBuilder<GenericBinaryType<i32>>>() + .expect("'value' column has incorrect type") + .append_value(msg); + } + pub fn bad_data(&self) -> &BadData { &self.bad_data } @@ -338,11 +358,13 @@ mod tests { use crate::de::{ArrowDeserializer, FramingIterator}; use arrow_array::builder::{make_builder, ArrayBuilder}; use arrow_array::cast::AsArray; - use arrow_array::types::{Int64Type, TimestampNanosecondType}; + use arrow_array::types::{GenericBinaryType, Int64Type, TimestampNanosecondType}; + use arrow_array::RecordBatch; use arrow_schema::{Schema, TimeUnit}; use arroyo_rpc::df::ArroyoSchema; use arroyo_rpc::formats::{ BadData, Format, Framing, FramingMethod, JsonFormat, NewlineDelimitedFraming, + RawBytesFormat, }; use arroyo_types::{to_nanos, SourceError}; use serde_json::json; @@ -521,4 +543,54 @@ mod tests { assert!(matches!(err, SourceError::BadData { .. })); } + + #[tokio::test] + async fn test_raw_bytes() { + let schema = Arc::new(Schema::new(vec![ + arrow_schema::Field::new("value", arrow_schema::DataType::Binary, false), + arrow_schema::Field::new( + "_timestamp", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + ])); + + let mut arrays: Vec<_> = schema + .fields + .iter() + .map(|f| make_builder(f.data_type(), 16)) + .collect(); + + let arroyo_schema = ArroyoSchema::from_schema_unkeyed(schema.clone()).unwrap(); + + let mut deserializer = ArrowDeserializer::new( + Format::RawBytes(RawBytesFormat {}), + arroyo_schema, + None, + BadData::Fail {}, + ); + + let time = SystemTime::now(); + let result = deserializer + .deserialize_slice(&mut arrays, &vec![0, 1, 2, 3, 4, 5], time) + .await; + assert!(result.is_empty()); + + let arrays: Vec<_> = arrays.into_iter().map(|mut a| a.finish()).collect(); + let batch = RecordBatch::try_new(schema, arrays).unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!( + batch.columns()[0] + .as_bytes::<GenericBinaryType<i32>>() + .value(0), + &[0, 1, 2, 3, 4, 5] + ); + assert_eq!( + batch.columns()[1] + .as_primitive::<TimestampNanosecondType>() + .value(0), + to_nanos(time) as i64 + ); + } } diff --git a/crates/arroyo-formats/src/ser.rs b/crates/arroyo-formats/src/ser.rs index 3cff774f6..212b7f3d7 100644 --- a/crates/arroyo-formats/src/ser.rs +++ b/crates/arroyo-formats/src/ser.rs @@ -1,10 +1,13 @@ use crate::avro::schema; use crate::{avro, json}; use arrow_array::cast::AsArray; +use arrow_array::types::GenericBinaryType; use arrow_array::RecordBatch; use arrow_json::writer::record_batch_to_vec; use arrow_schema::{DataType, Field}; -use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat, RawStringFormat, TimestampFormat}; +use arroyo_rpc::formats::{ + AvroFormat, Format, JsonFormat, RawBytesFormat, RawStringFormat, TimestampFormat, +}; use arroyo_rpc::TIMESTAMP_FIELD; use serde_json::Value; use std::sync::Arc; @@ -78,6 +81,7 @@ impl ArrowSerializer { Format::Avro(avro) => self.serialize_avro(avro, &batch), Format::Parquet(_) => todo!("parquet"), Format::RawString(RawStringFormat {}) => self.serialize_raw_string(&batch), + Format::RawBytes(RawBytesFormat {}) => self.serialize_raw_bytes(&batch), } } @@ -136,6 +140,7 @@ impl ArrowSerializer { } })) } + fn serialize_raw_string( &self, batch: &RecordBatch, @@ -161,6 +166,28 @@ impl ArrowSerializer { Box::new(values.into_iter()) 
} + fn serialize_raw_bytes(&self, batch: &RecordBatch) -> Box<dyn Iterator<Item = Vec<u8>> + Send> { + let value_idx = batch.schema().index_of("value").unwrap_or_else(|_| { + panic!( + "invalid schema for raw_bytes serializer: {}; a VALUE column is required", + batch.schema() + ) + }); + + if *batch.schema().field(value_idx).data_type() != DataType::Binary { + panic!("invalid schema for raw_bytes serializer: {}; must have a column VALUE of type BYTEA", batch.schema()); + } + + let values: Vec<Vec<u8>> = batch + .column(value_idx) + .as_bytes::<GenericBinaryType<i32>>() + .iter() + .map(|v| v.map(|v| v.to_vec()).unwrap_or_default()) + .collect(); + + Box::new(values.into_iter()) + } + fn serialize_avro( &self, format: &AvroFormat, @@ -213,7 +240,7 @@ mod tests { use crate::ser::ArrowSerializer; use arrow_array::builder::TimestampNanosecondBuilder; use arrow_schema::{Schema, TimeUnit}; - use arroyo_rpc::formats::{Format, RawStringFormat, TimestampFormat}; + use arroyo_rpc::formats::{Format, RawBytesFormat, RawStringFormat, TimestampFormat}; use arroyo_types::to_nanos; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -258,6 +285,44 @@ mod tests { assert_eq!(iter.next(), None); } + #[test] + fn test_raw_bytes() { + let mut serializer = ArrowSerializer::new(Format::RawBytes(RawBytesFormat {})); + + let data = [b"0123".to_vec(), b"hello".to_vec(), vec![0, 1, 2, 4]]; + let ts: Vec<_> = data + .iter() + .enumerate() + .map(|(i, _)| to_nanos(SystemTime::now() + Duration::from_secs(i as u64)) as i64) + .collect(); + + let schema = Arc::new(Schema::new(vec![ + arrow_schema::Field::new("value", arrow_schema::DataType::Binary, false), + arrow_schema::Field::new( + "_timestamp", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + ])); + + let batch = arrow_array::RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::BinaryArray::from( + data.iter().map(|t| t.as_slice()).collect::<Vec<_>>(), + )), + Arc::new(arrow_array::TimestampNanosecondArray::from(ts)), + ], + ) + .unwrap(); + + let mut iter = serializer.serialize(&batch); + assert_eq!(iter.next().unwrap(), b"0123"); + assert_eq!(iter.next().unwrap(), b"hello"); + assert_eq!(iter.next().unwrap(), vec![0, 1, 2, 4]); + assert_eq!(iter.next(), None); + } + #[test] fn test_json() { let mut serializer = ArrowSerializer::new(Format::Json(arroyo_rpc::formats::JsonFormat { diff --git a/crates/arroyo-rpc/src/formats.rs b/crates/arroyo-rpc/src/formats.rs index e7efb4d6f..ceae9c176 100644 --- a/crates/arroyo-rpc/src/formats.rs +++ b/crates/arroyo-rpc/src/formats.rs @@ -103,6 +103,10 @@ impl JsonFormat { #[serde(rename_all = "camelCase")] pub struct RawStringFormat {} +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct RawBytesFormat {} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, ToSchema)] pub struct ConfluentSchemaRegistryConfig { endpoint: String, @@ -233,6 +237,7 @@ pub enum Format { Avro(AvroFormat), Parquet(ParquetFormat), RawString(RawStringFormat), + RawBytes(RawBytesFormat), } impl Format { @@ -247,6 +252,7 @@ impl Format { "protobuf" => return Err("protobuf is not yet supported".to_string()), "avro" => Format::Avro(AvroFormat::from_opts(opts)?), "raw_string" => Format::RawString(RawStringFormat {}), + "raw_bytes" => Format::RawBytes(RawBytesFormat {}), "parquet" => Format::Parquet(ParquetFormat {}), f => return Err(format!("Unknown format '{}'", f)), })) @@ -256,6 +262,7 @@ impl Format { match self { Format::Json(JsonFormat { debezium: 
true, .. }) => true, Format::Json(_) | Format::Avro(_) | Format::Parquet(_) | Format::RawString(_) => false, + Format::RawBytes(_) => false, } } } diff --git a/webui/src/gen/api-types.ts b/webui/src/gen/api-types.ts index 313658150..69bce35fa 100644 --- a/webui/src/gen/api-types.ts +++ b/webui/src/gen/api-types.ts @@ -11,203 +11,114 @@ type OneOf = T extends [infer Only] ? Only : T extends [infer A export interface paths { "/v1/connection_profiles": { - /** - * List all connection profiles - * @description List all connection profiles - */ + /** List all connection profiles */ get: operations["get_connection_profiles"]; - /** - * Create connection profile - * @description Create connection profile - */ + /** Create connection profile */ post: operations["create_connection_profile"]; }; "/v1/connection_profiles/test": { - /** - * Test connection profile - * @description Test connection profile - */ + /** Test connection profile */ post: operations["test_connection_profile"]; }; "/v1/connection_profiles/{id}": { - /** - * Delete a Connection Profile - * @description Delete a Connection Profile - */ + /** Delete a Connection Profile */ delete: operations["delete_connection_profile"]; }; "/v1/connection_profiles/{id}/autocomplete": { - /** - * Get autocomplete suggestions for a connection profile - * @description Get autocomplete suggestions for a connection profile - */ + /** Get autocomplete suggestions for a connection profile */ get: operations["get_connection_profile_autocomplete"]; }; "/v1/connection_tables": { - /** - * List all connection tables - * @description List all connection tables - */ + /** List all connection tables */ get: operations["get_connection_tables"]; - /** - * Create a new connection table - * @description Create a new connection table - */ + /** Create a new connection table */ post: operations["create_connection_table"]; }; "/v1/connection_tables/schemas/test": { - /** - * Test a Connection Schema - * @description Test a Connection Schema - */ + /** Test a Connection Schema */ post: operations["test_schema"]; }; "/v1/connection_tables/test": { - /** - * Test a Connection Table - * @description Test a Connection Table - */ + /** Test a Connection Table */ post: operations["test_connection_table"]; }; "/v1/connection_tables/{id}": { - /** - * Delete a Connection Table - * @description Delete a Connection Table - */ + /** Delete a Connection Table */ delete: operations["delete_connection_table"]; }; "/v1/connectors": { - /** - * List all connectors - * @description List all connectors - */ + /** List all connectors */ get: operations["get_connectors"]; }; "/v1/jobs": { - /** - * Get all jobs - * @description Get all jobs - */ + /** Get all jobs */ get: operations["get_jobs"]; }; "/v1/ping": { - /** - * Ping endpoint - * @description Ping endpoint - */ + /** Ping endpoint */ get: operations["ping"]; }; "/v1/pipelines": { - /** - * List all pipelines - * @description List all pipelines - */ + /** List all pipelines */ get: operations["get_pipelines"]; /** * Create a new pipeline - * @description Create a new pipeline - * - * The API will create a single job for the pipeline. + * @description The API will create a single job for the pipeline. 
*/ post: operations["create_pipeline"]; }; "/v1/pipelines/validate_query": { - /** - * Validate a query and return pipeline graph - * @description Validate a query and return pipeline graph - */ + /** Validate a query and return pipeline graph */ post: operations["validate_query"]; }; "/v1/pipelines/{id}": { - /** - * Get a single pipeline - * @description Get a single pipeline - */ + /** Get a single pipeline */ get: operations["get_pipeline"]; - /** - * Delete a pipeline - * @description Delete a pipeline - */ + /** Delete a pipeline */ delete: operations["delete_pipeline"]; - /** - * Update a pipeline - * @description Update a pipeline - */ + /** Update a pipeline */ patch: operations["patch_pipeline"]; }; "/v1/pipelines/{id}/jobs": { - /** - * List a pipeline's jobs - * @description List a pipeline's jobs - */ + /** List a pipeline's jobs */ get: operations["get_pipeline_jobs"]; }; "/v1/pipelines/{id}/restart": { - /** - * Restart a pipeline - * @description Restart a pipeline - */ + /** Restart a pipeline */ post: operations["restart_pipeline"]; }; "/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints": { - /** - * List a job's checkpoints - * @description List a job's checkpoints - */ + /** List a job's checkpoints */ get: operations["get_job_checkpoints"]; }; "/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups": { - /** - * Get a checkpoint's details - * @description Get a checkpoint's details - */ + /** Get a checkpoint's details */ get: operations["get_checkpoint_details"]; }; "/v1/pipelines/{pipeline_id}/jobs/{job_id}/errors": { - /** - * List a job's error messages - * @description List a job's error messages - */ + /** List a job's error messages */ get: operations["get_job_errors"]; }; "/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups": { - /** - * Get a job's metrics - * @description Get a job's metrics - */ + /** Get a job's metrics */ get: operations["get_operator_metric_groups"]; }; "/v1/pipelines/{pipeline_id}/jobs/{job_id}/output": { - /** - * Subscribe to a job's output - * @description Subscribe to a job's output - */ + /** Subscribe to a job's output */ get: operations["get_job_output"]; }; "/v1/udfs": { - /** - * Get Global UDFs - * @description Get Global UDFs - */ + /** Get Global UDFs */ get: operations["get_udfs"]; - /** - * Create a global UDF - * @description Create a global UDF - */ + /** Create a global UDF */ post: operations["create_udf"]; }; "/v1/udfs/validate": { - /** - * Validate UDFs - * @description Validate UDFs - */ + /** Validate UDFs */ post: operations["validate_udf"]; }; "/v1/udfs/{id}": { - /** - * Delete UDF - * @description Delete UDF - */ + /** Delete UDF */ delete: operations["delete_udf"]; }; } @@ -341,6 +252,8 @@ export interface components { parquet: components["schemas"]["ParquetFormat"]; }, { raw_string: components["schemas"]["RawStringFormat"]; + }, { + raw_bytes: components["schemas"]["RawBytesFormat"]; }]>; Framing: { method: components["schemas"]["FramingMethod"]; @@ -516,6 +429,7 @@ export interface components { errors: (string)[]; graph?: components["schemas"]["PipelineGraph"] | null; }; + RawBytesFormat: Record; RawStringFormat: Record; SchemaDefinition: OneOf<[{ json_schema: string; @@ -591,10 +505,7 @@ export type external = Record; export interface operations { - /** - * List all connection profiles - * @description List all connection profiles - */ + /** List all connection profiles */ get_connection_profiles: { responses: { /** @description Got connections collection */ 
@@ -605,10 +516,7 @@ export interface operations { }; }; }; - /** - * Create connection profile - * @description Create connection profile - */ + /** Create connection profile */ create_connection_profile: { requestBody: { content: { @@ -624,10 +532,7 @@ export interface operations { }; }; }; - /** - * Test connection profile - * @description Test connection profile - */ + /** Test connection profile */ test_connection_profile: { requestBody: { content: { @@ -643,10 +548,7 @@ export interface operations { }; }; }; - /** - * Delete a Connection Profile - * @description Delete a Connection Profile - */ + /** Delete a Connection Profile */ delete_connection_profile: { parameters: { path: { @@ -659,10 +561,7 @@ export interface operations { 200: never; }; }; - /** - * Get autocomplete suggestions for a connection profile - * @description Get autocomplete suggestions for a connection profile - */ + /** Get autocomplete suggestions for a connection profile */ get_connection_profile_autocomplete: { parameters: { path: { @@ -679,10 +578,7 @@ export interface operations { }; }; }; - /** - * List all connection tables - * @description List all connection tables - */ + /** List all connection tables */ get_connection_tables: { parameters: { query?: { @@ -699,10 +595,7 @@ export interface operations { }; }; }; - /** - * Create a new connection table - * @description Create a new connection table - */ + /** Create a new connection table */ create_connection_table: { requestBody: { content: { @@ -718,10 +611,7 @@ export interface operations { }; }; }; - /** - * Test a Connection Schema - * @description Test a Connection Schema - */ + /** Test a Connection Schema */ test_schema: { requestBody: { content: { @@ -733,10 +623,7 @@ export interface operations { 200: never; }; }; - /** - * Test a Connection Table - * @description Test a Connection Table - */ + /** Test a Connection Table */ test_connection_table: { requestBody: { content: { @@ -748,10 +635,7 @@ export interface operations { 200: never; }; }; - /** - * Delete a Connection Table - * @description Delete a Connection Table - */ + /** Delete a Connection Table */ delete_connection_table: { parameters: { path: { @@ -764,10 +648,7 @@ export interface operations { 200: never; }; }; - /** - * List all connectors - * @description List all connectors - */ + /** List all connectors */ get_connectors: { responses: { /** @description Got connectors collection */ @@ -778,10 +659,7 @@ export interface operations { }; }; }; - /** - * Get all jobs - * @description Get all jobs - */ + /** Get all jobs */ get_jobs: { responses: { /** @description Get all jobs */ @@ -792,20 +670,14 @@ export interface operations { }; }; }; - /** - * Ping endpoint - * @description Ping endpoint - */ + /** Ping endpoint */ ping: { responses: { /** @description Ping endpoint */ 200: never; }; }; - /** - * List all pipelines - * @description List all pipelines - */ + /** List all pipelines */ get_pipelines: { parameters: { query?: { @@ -824,9 +696,7 @@ export interface operations { }; /** * Create a new pipeline - * @description Create a new pipeline - * - * The API will create a single job for the pipeline. + * @description The API will create a single job for the pipeline. 
*/ create_pipeline: { requestBody: { @@ -849,10 +719,7 @@ export interface operations { }; }; }; - /** - * Validate a query and return pipeline graph - * @description Validate a query and return pipeline graph - */ + /** Validate a query and return pipeline graph */ validate_query: { requestBody: { content: { @@ -868,10 +735,7 @@ export interface operations { }; }; }; - /** - * Get a single pipeline - * @description Get a single pipeline - */ + /** Get a single pipeline */ get_pipeline: { parameters: { path: { @@ -888,10 +752,7 @@ export interface operations { }; }; }; - /** - * Delete a pipeline - * @description Delete a pipeline - */ + /** Delete a pipeline */ delete_pipeline: { parameters: { path: { @@ -904,10 +765,7 @@ export interface operations { 200: never; }; }; - /** - * Update a pipeline - * @description Update a pipeline - */ + /** Update a pipeline */ patch_pipeline: { parameters: { path: { @@ -929,10 +787,7 @@ export interface operations { }; }; }; - /** - * List a pipeline's jobs - * @description List a pipeline's jobs - */ + /** List a pipeline's jobs */ get_pipeline_jobs: { parameters: { path: { @@ -949,10 +804,7 @@ export interface operations { }; }; }; - /** - * Restart a pipeline - * @description Restart a pipeline - */ + /** Restart a pipeline */ restart_pipeline: { parameters: { path: { @@ -974,10 +826,7 @@ export interface operations { }; }; }; - /** - * List a job's checkpoints - * @description List a job's checkpoints - */ + /** List a job's checkpoints */ get_job_checkpoints: { parameters: { path: { @@ -996,10 +845,7 @@ export interface operations { }; }; }; - /** - * Get a checkpoint's details - * @description Get a checkpoint's details - */ + /** Get a checkpoint's details */ get_checkpoint_details: { parameters: { path: { @@ -1020,10 +866,7 @@ export interface operations { }; }; }; - /** - * List a job's error messages - * @description List a job's error messages - */ + /** List a job's error messages */ get_job_errors: { parameters: { query?: { @@ -1048,10 +891,7 @@ export interface operations { }; }; }; - /** - * Get a job's metrics - * @description Get a job's metrics - */ + /** Get a job's metrics */ get_operator_metric_groups: { parameters: { path: { @@ -1070,10 +910,7 @@ export interface operations { }; }; }; - /** - * Subscribe to a job's output - * @description Subscribe to a job's output - */ + /** Subscribe to a job's output */ get_job_output: { parameters: { path: { @@ -1088,10 +925,7 @@ export interface operations { 200: never; }; }; - /** - * Get Global UDFs - * @description Get Global UDFs - */ + /** Get Global UDFs */ get_udfs: { responses: { /** @description List of UDFs */ @@ -1102,10 +936,7 @@ export interface operations { }; }; }; - /** - * Create a global UDF - * @description Create a global UDF - */ + /** Create a global UDF */ create_udf: { requestBody: { content: { @@ -1121,10 +952,7 @@ export interface operations { }; }; }; - /** - * Validate UDFs - * @description Validate UDFs - */ + /** Validate UDFs */ validate_udf: { requestBody: { content: { @@ -1140,10 +968,7 @@ export interface operations { }; }; }; - /** - * Delete UDF - * @description Delete UDF - */ + /** Delete UDF */ delete_udf: { parameters: { path: { diff --git a/webui/src/routes/connections/DefineSchema.tsx b/webui/src/routes/connections/DefineSchema.tsx index 76e11e530..0db24eab7 100644 --- a/webui/src/routes/connections/DefineSchema.tsx +++ b/webui/src/routes/connections/DefineSchema.tsx @@ -248,6 +248,53 @@ const RawStringEditor = ({ ); }; +const RawBytesEditor = ({ + 
state, + setState, + next, +}: { + state: CreateConnectionState; + setState: Dispatch<CreateConnectionState>; + next: () => void; +}) => { + const submit = () => { + setState({ + ...state, + schema: { + ...state.schema, + definition: { raw_schema: 'value' }, + fields: [ + { + fieldName: 'value', + fieldType: { + type: { + primitive: 'Bytes', + }, + }, + nullable: false, + }, + ], + format: { raw_bytes: {} }, + }, + }); + next(); + }; + + return ( + <Stack spacing={4} maxW={'lg'}> + <Text> + When using the raw bytes format, values are read from the source as raw byte strings. + </Text> + + <Text> + Raw bytes connection tables have a single value column with the value. + </Text> + + <Button onClick={submit}>Continue</Button> + </Stack> + ); +}; + export const DefineSchema = ({ connector, state, @@ -306,6 +353,11 @@ export const DefineSchema = ({ value: 'raw_string', el: <RawStringEditor state={state} setState={setState} next={next} />, }, + { + name: 'Raw Bytes', + value: 'raw_bytes', + el: <RawBytesEditor state={state} setState={setState} next={next} />, + }, { name: 'Protobuf (coming soon)', value: 'protobuf',
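
For anyone who wants to exercise the new format outside the unit tests, below is a minimal round-trip sketch (not part of this diff) that mirrors the raw-bytes tests above: a message goes in through `ArrowDeserializer` and comes back out byte-for-byte from `ArrowSerializer`. Everything it calls appears in the hunks above; the only assumptions beyond the diff are the dependency wiring and a tokio runtime for the async deserialize call, and local names like `msg` are illustrative.

```rust
use std::sync::Arc;
use std::time::SystemTime;

use arrow_array::builder::make_builder;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{BadData, Format, RawBytesFormat};

#[tokio::main]
async fn main() {
    // RawBytes tables carry a single Binary `value` column plus `_timestamp`.
    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Binary, false),
        Field::new(
            "_timestamp",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
    ]));

    // One builder per column, as in the deserializer test above.
    let mut builders: Vec<_> = schema
        .fields
        .iter()
        .map(|f| make_builder(f.data_type(), 8))
        .collect();

    let mut de = ArrowDeserializer::new(
        Format::RawBytes(RawBytesFormat {}),
        ArroyoSchema::from_schema_unkeyed(schema.clone()).unwrap(),
        None,
        BadData::Fail {},
    );

    // Every source message is copied verbatim into the `value` column.
    let msg = b"hello, arroyo".to_vec();
    let errors = de
        .deserialize_slice(&mut builders, &msg, SystemTime::now())
        .await;
    assert!(errors.is_empty());

    let arrays: Vec<_> = builders.into_iter().map(|mut b| b.finish()).collect();
    let batch = RecordBatch::try_new(schema, arrays).unwrap();

    // Serializing the same batch yields the original bytes, one message per row.
    let mut ser = ArrowSerializer::new(Format::RawBytes(RawBytesFormat {}));
    let mut out = ser.serialize(&batch);
    assert_eq!(out.next().unwrap(), msg);
    assert_eq!(out.next(), None);
}
```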