Skip to content

Commit

Permalink
Add raw_binary format (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored May 15, 2024
1 parent 67d94bd commit fa91a41
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 260 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@ members = [
"crates/arroyo-udf/arroyo-udf-common",
"crates/arroyo-udf/arroyo-udf-plugin",
"crates/arroyo-udf/arroyo-udf-host",
"crates/arroyo-udf/arroyo-udf-macros",
"crates/arroyo-worker",
"crates/copy-artifacts",
"crates/integ", "crates/arroyo-udf/arroyo-udf-macros",
"crates/integ",
]

resolver = "2"

exclude = [
"build_dir",
]

[workspace.dependencies]
tonic = { version = "0.11" }
tonic-build = { version = "0.11" }
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ pub(crate) async fn expand_schema(
}
Format::Parquet(_) => Ok(schema),
Format::RawString(_) => Ok(schema),
Format::RawBytes(_) => Ok(schema),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ impl IntoResponse for HttpError {
AvroFormat,
ParquetFormat,
RawStringFormat,
RawBytesFormat,
TimestampFormat,
Framing,
FramingMethod,
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/filesystem/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl FileSystemSourceFunc {
.await
}
Format::RawString(_) => todo!(),
Format::RawBytes(_) => todo!(),
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ impl KafkaTester {
String::from_utf8(msg).map_err(|e|
anyhow!("Failed to parse message as UTF-8: {:?}. Ensure that the format and schema type are correct.", e))?;
}
Format::RawBytes(_) => {
// all bytes are valid
}
};

Ok(())
Expand Down
26 changes: 14 additions & 12 deletions crates/arroyo-connectors/src/preview/operator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use arrow::array::{RecordBatch, TimestampNanosecondArray};
#[allow(deprecated)]
use arrow::json::writer::record_batches_to_json_rows;
use arrow::json::writer::record_batch_to_vec;
use std::time::SystemTime;

use arroyo_operator::context::ArrowContext;
Expand Down Expand Up @@ -32,19 +31,22 @@ impl ArrowOperator for PreviewSink {
);
}

async fn process_batch(&mut self, record_batch: RecordBatch, ctx: &mut ArrowContext) {
let timestamp_column = record_batch
.column(ctx.in_schemas[0].timestamp_index)
async fn process_batch(&mut self, mut batch: RecordBatch, ctx: &mut ArrowContext) {
let ts = ctx.in_schemas[0].timestamp_index;
let timestamp_column = batch
.column(ts)
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
.unwrap()
.clone();

#[allow(deprecated)]
let json_rows = record_batches_to_json_rows(&[&record_batch]).unwrap();
for (mut map, timestamp) in json_rows.into_iter().zip(timestamp_column.iter()) {
map.remove("_timestamp");
let value = serde_json::to_string(&map).unwrap();
batch.remove_column(ts);

let rows = record_batch_to_vec(&batch, true, arrow::json::writer::TimestampFormat::RFC3339)
.unwrap();

for (value, timestamp) in rows.into_iter().zip(timestamp_column.iter()) {
let s = String::from_utf8(value).unwrap();
self.client
.as_mut()
.unwrap()
Expand All @@ -57,7 +59,7 @@ impl ArrowOperator for PreviewSink {
.map(|nanos| from_nanos(nanos as u128))
.unwrap_or_else(SystemTime::now),
),
value,
value: s,
done: false,
})
.await
Expand Down
76 changes: 74 additions & 2 deletions crates/arroyo-formats/src/de.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::avro::de;
use arrow::compute::kernels;
use arrow_array::builder::{ArrayBuilder, StringBuilder, TimestampNanosecondBuilder};
use arrow_array::builder::{
ArrayBuilder, GenericByteBuilder, StringBuilder, TimestampNanosecondBuilder,
};
use arrow_array::types::GenericBinaryType;
use arrow_array::RecordBatch;
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{AvroFormat, BadData, Format, Framing, FramingMethod, JsonFormat};
Expand Down Expand Up @@ -209,6 +212,10 @@ impl ArrowDeserializer {
self.deserialize_raw_string(buffer, msg);
add_timestamp(buffer, self.schema.timestamp_index, timestamp);
}
Format::RawBytes(_) => {
self.deserialize_raw_bytes(buffer, msg);
add_timestamp(buffer, self.schema.timestamp_index, timestamp);
}
Format::Json(json) => {
let msg = if json.confluent_schema_registry {
&msg[5..]
Expand Down Expand Up @@ -316,6 +323,19 @@ impl ArrowDeserializer {
.append_value(String::from_utf8_lossy(msg));
}

fn deserialize_raw_bytes(&mut self, buffer: &mut [Box<dyn ArrayBuilder>], msg: &[u8]) {
let (col, _) = self
.schema
.schema
.column_with_name("value")
.expect("no 'value' column for RawBytes format");
buffer[col]
.as_any_mut()
.downcast_mut::<GenericByteBuilder<GenericBinaryType<i32>>>()
.expect("'value' column has incorrect type")
.append_value(msg);
}

pub fn bad_data(&self) -> &BadData {
&self.bad_data
}
Expand All @@ -338,11 +358,13 @@ mod tests {
use crate::de::{ArrowDeserializer, FramingIterator};
use arrow_array::builder::{make_builder, ArrayBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::{Int64Type, TimestampNanosecondType};
use arrow_array::types::{GenericBinaryType, Int64Type, TimestampNanosecondType};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, TimeUnit};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::formats::{
BadData, Format, Framing, FramingMethod, JsonFormat, NewlineDelimitedFraming,
RawBytesFormat,
};
use arroyo_types::{to_nanos, SourceError};
use serde_json::json;
Expand Down Expand Up @@ -521,4 +543,54 @@ mod tests {

assert!(matches!(err, SourceError::BadData { .. }));
}

#[tokio::test]
async fn test_raw_bytes() {
let schema = Arc::new(Schema::new(vec![
arrow_schema::Field::new("value", arrow_schema::DataType::Binary, false),
arrow_schema::Field::new(
"_timestamp",
arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let mut arrays: Vec<_> = schema
.fields
.iter()
.map(|f| make_builder(f.data_type(), 16))
.collect();

let arroyo_schema = ArroyoSchema::from_schema_unkeyed(schema.clone()).unwrap();

let mut deserializer = ArrowDeserializer::new(
Format::RawBytes(RawBytesFormat {}),
arroyo_schema,
None,
BadData::Fail {},
);

let time = SystemTime::now();
let result = deserializer
.deserialize_slice(&mut arrays, &vec![0, 1, 2, 3, 4, 5], time)
.await;
assert!(result.is_empty());

let arrays: Vec<_> = arrays.into_iter().map(|mut a| a.finish()).collect();
let batch = RecordBatch::try_new(schema, arrays).unwrap();

assert_eq!(batch.num_rows(), 1);
assert_eq!(
batch.columns()[0]
.as_bytes::<GenericBinaryType<i32>>()
.value(0),
&[0, 1, 2, 3, 4, 5]
);
assert_eq!(
batch.columns()[1]
.as_primitive::<TimestampNanosecondType>()
.value(0),
to_nanos(time) as i64
);
}
}
69 changes: 67 additions & 2 deletions crates/arroyo-formats/src/ser.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::avro::schema;
use crate::{avro, json};
use arrow_array::cast::AsArray;
use arrow_array::types::GenericBinaryType;
use arrow_array::RecordBatch;
use arrow_json::writer::record_batch_to_vec;
use arrow_schema::{DataType, Field};
use arroyo_rpc::formats::{AvroFormat, Format, JsonFormat, RawStringFormat, TimestampFormat};
use arroyo_rpc::formats::{
AvroFormat, Format, JsonFormat, RawBytesFormat, RawStringFormat, TimestampFormat,
};
use arroyo_rpc::TIMESTAMP_FIELD;
use serde_json::Value;
use std::sync::Arc;
Expand Down Expand Up @@ -78,6 +81,7 @@ impl ArrowSerializer {
Format::Avro(avro) => self.serialize_avro(avro, &batch),
Format::Parquet(_) => todo!("parquet"),
Format::RawString(RawStringFormat {}) => self.serialize_raw_string(&batch),
Format::RawBytes(RawBytesFormat {}) => self.serialize_raw_bytes(&batch),
}
}

Expand Down Expand Up @@ -136,6 +140,7 @@ impl ArrowSerializer {
}
}))
}

fn serialize_raw_string(
&self,
batch: &RecordBatch,
Expand All @@ -161,6 +166,28 @@ impl ArrowSerializer {
Box::new(values.into_iter())
}

fn serialize_raw_bytes(&self, batch: &RecordBatch) -> Box<dyn Iterator<Item = Vec<u8>> + Send> {
let value_idx = batch.schema().index_of("value").unwrap_or_else(|_| {
panic!(
"invalid schema for raw_string serializer: {}; a VALUE column is required",
batch.schema()
)
});

if *batch.schema().field(value_idx).data_type() != DataType::Binary {
panic!("invalid schema for raw_string serializer: {}; a must have a column VALUE of type BYTEA", batch.schema());
}

let values: Vec<Vec<u8>> = batch
.column(value_idx)
.as_bytes::<GenericBinaryType<i32>>()
.iter()
.map(|v| v.map(|v| v.to_vec()).unwrap_or_default())
.collect();

Box::new(values.into_iter())
}

fn serialize_avro(
&self,
format: &AvroFormat,
Expand Down Expand Up @@ -213,7 +240,7 @@ mod tests {
use crate::ser::ArrowSerializer;
use arrow_array::builder::TimestampNanosecondBuilder;
use arrow_schema::{Schema, TimeUnit};
use arroyo_rpc::formats::{Format, RawStringFormat, TimestampFormat};
use arroyo_rpc::formats::{Format, RawBytesFormat, RawStringFormat, TimestampFormat};
use arroyo_types::to_nanos;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -258,6 +285,44 @@ mod tests {
assert_eq!(iter.next(), None);
}

#[test]
fn test_raw_bytes() {
let mut serializer = ArrowSerializer::new(Format::RawBytes(RawBytesFormat {}));

let data = [b"0123".to_vec(), b"hello".to_vec(), vec![0, 1, 2, 4]];
let ts: Vec<_> = data
.iter()
.enumerate()
.map(|(i, _)| to_nanos(SystemTime::now() + Duration::from_secs(i as u64)) as i64)
.collect();

let schema = Arc::new(Schema::new(vec![
arrow_schema::Field::new("value", arrow_schema::DataType::Binary, false),
arrow_schema::Field::new(
"_timestamp",
arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let batch = arrow_array::RecordBatch::try_new(
schema,
vec![
Arc::new(arrow_array::BinaryArray::from(
data.iter().map(|t| t.as_slice()).collect::<Vec<_>>(),
)),
Arc::new(arrow_array::TimestampNanosecondArray::from(ts)),
],
)
.unwrap();

let mut iter = serializer.serialize(&batch);
assert_eq!(iter.next().unwrap(), b"0123");
assert_eq!(iter.next().unwrap(), b"hello");
assert_eq!(iter.next().unwrap(), vec![0, 1, 2, 4]);
assert_eq!(iter.next(), None);
}

#[test]
fn test_json() {
let mut serializer = ArrowSerializer::new(Format::Json(arroyo_rpc::formats::JsonFormat {
Expand Down
7 changes: 7 additions & 0 deletions crates/arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl JsonFormat {
#[serde(rename_all = "camelCase")]
pub struct RawStringFormat {}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RawBytesFormat {}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
pub struct ConfluentSchemaRegistryConfig {
endpoint: String,
Expand Down Expand Up @@ -233,6 +237,7 @@ pub enum Format {
Avro(AvroFormat),
Parquet(ParquetFormat),
RawString(RawStringFormat),
RawBytes(RawBytesFormat),
}

impl Format {
Expand All @@ -247,6 +252,7 @@ impl Format {
"protobuf" => return Err("protobuf is not yet supported".to_string()),
"avro" => Format::Avro(AvroFormat::from_opts(opts)?),
"raw_string" => Format::RawString(RawStringFormat {}),
"raw_bytes" => Format::RawBytes(RawBytesFormat {}),
"parquet" => Format::Parquet(ParquetFormat {}),
f => return Err(format!("Unknown format '{}'", f)),
}))
Expand All @@ -256,6 +262,7 @@ impl Format {
match self {
Format::Json(JsonFormat { debezium: true, .. }) => true,
Format::Json(_) | Format::Avro(_) | Format::Parquet(_) | Format::RawString(_) => false,
Format::RawBytes(_) => false,
}
}
}
Expand Down
Loading

0 comments on commit fa91a41

Please sign in to comment.