diff --git a/README.md b/README.md index fce786f..0ea593b 100644 --- a/README.md +++ b/README.md @@ -119,8 +119,10 @@ The built-in blocks provided by Protoflow are listed below: | [`ReadDir`] | Reads file names from a file system directory. | | [`ReadEnv`] | Reads the value of an environment variable. | | [`ReadFile`] | Reads bytes from the contents of a file. | +| [`ReadSocket`] | Reads proto message from a TCP port. | | [`ReadStdin`] | Reads bytes from standard input (aka stdin). | | [`WriteFile`] | Writes or appends bytes to the contents of a file. | +| [`WriteSocket`] | Writes a proto message to a TCP port | | [`WriteStderr`] | Writes bytes to standard error (aka stderr). | | [`WriteStdout`] | Writes bytes to standard output (aka stdout). | @@ -466,6 +468,25 @@ block-beta ```bash protoflow execute ReadFile path=/tmp/file.txt ``` +#### [`ReadSocket`] + +A block that reads proto messages from a TCP port. + +```mermaid +block-beta + columns 4 + ReadSocket space:2 Sink + ReadSocket-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class ReadSocket block + class Sink hidden +``` + +```bash +protoflow execute ReadSocket host="127.0.0.1" port=7077 buffer_size=1024 +``` #### [`ReadStdin`] @@ -511,7 +532,25 @@ block-beta ```bash protoflow execute WriteFile path=/tmp/file.txt ``` +#### [`WriteSocket`] +A block that writes proto messages to TCP port. + +```mermaid +block-beta + columns 4 + Source space:2 WriteSocket + Source-- "input" -->WriteSocket + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class WriteSocket block + class Source hidden +``` + +```bash +protoflow execute WriteSocket host="127.0.0.1" port=7077 buffer_size=1024 +``` #### [`WriteStderr`] A block that writes bytes to standard error (aka stderr). @@ -589,7 +628,9 @@ git clone https://github.com/AsimovPlatform/protoflow.git [`ReadDir`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadDir.html [`ReadEnv`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadEnv.html [`ReadFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadFile.html +[`ReadFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadSocket.html [`ReadStdin`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadStdin.html +[`WriteSocket`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteSocket.html [`WriteFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteFile.html [`WriteStderr`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteStderr.html [`WriteStdout`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteStdout.html diff --git a/lib/protoflow-blocks/doc/sys/read_socket.mmd b/lib/protoflow-blocks/doc/sys/read_socket.mmd new file mode 100644 index 0000000..b5e8604 --- /dev/null +++ b/lib/protoflow-blocks/doc/sys/read_socket.mmd @@ -0,0 +1,9 @@ +block-beta + columns 4 + ReadSocket space:2 Sink + ReadSocket-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class ReadSocket block + class Sink hidden \ No newline at end of file diff --git a/lib/protoflow-blocks/doc/sys/read_socket.seq.mmd b/lib/protoflow-blocks/doc/sys/read_socket.seq.mmd new file mode 100644 index 0000000..75b948c --- /dev/null +++ b/lib/protoflow-blocks/doc/sys/read_socket.seq.mmd @@ -0,0 +1,15 @@ +sequenceDiagram + autonumber + participant ReadSocket as ReadSocket block + participant ReadSocket.output as ReadSocket.output port + participant BlockA as Another block + + ReadSocket-->>BlockA: Connect + + loop ReadSocket process + ReadSocket->>ReadSocket: Read proto message from TCP port + ReadSocket->>BlockA: Message (proto) + end + + ReadSocket-->>ReadSocket.output: Close + ReadSocket-->>BlockA: Disconnect diff --git a/lib/protoflow-blocks/doc/sys/write_socket.mmd b/lib/protoflow-blocks/doc/sys/write_socket.mmd new file mode 100644 index 0000000..fb2730f --- /dev/null +++ b/lib/protoflow-blocks/doc/sys/write_socket.mmd @@ -0,0 +1,9 @@ +block-beta + columns 4 + Source space:2 WriteSocket + Source-- "input" -->WriteSocket + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class WriteSocket block + class Source hidden \ No newline at end of file diff --git a/lib/protoflow-blocks/doc/sys/write_socket.seq.mmd b/lib/protoflow-blocks/doc/sys/write_socket.seq.mmd new file mode 100644 index 0000000..dfeefe9 --- /dev/null +++ b/lib/protoflow-blocks/doc/sys/write_socket.seq.mmd @@ -0,0 +1,15 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant WriteSocket.input as WriteSocket.input port + participant WriteSocket as WriteSocket block + + BlockA-->>WriteSocket: Connect + + loop WriteSocket process + BlockA->>WriteSocket: Message (proto) + WriteSocket->>WriteSocket: Write proto message to TCP port + end + + BlockA-->>WriteSocket: Disconnect + WriteSocket-->>WriteSocket.input: Close diff --git a/lib/protoflow-blocks/src/block_config.rs b/lib/protoflow-blocks/src/block_config.rs index 11be39c..7484109 100644 --- a/lib/protoflow-blocks/src/block_config.rs +++ b/lib/protoflow-blocks/src/block_config.rs @@ -56,10 +56,12 @@ impl<'de> serde::Deserialize<'de> for BlockConfig { } #[cfg(feature = "std")] - "ReadDir" | "ReadEnv" | "ReadFile" | "ReadStdin" | "WriteFile" | "WriteStderr" - | "WriteStdout" => SysBlockConfig::deserialize(value.clone()) - .map(BlockConfig::Sys) - .unwrap(), + "ReadDir" | "ReadEnv" | "ReadFile" | "ReadSocket" | "ReadStdin" | "WriteFile" + | "WriteSocket" | "WriteStderr" | "WriteStdout" => { + SysBlockConfig::deserialize(value.clone()) + .map(BlockConfig::Sys) + .unwrap() + } _ => return Err(serde::de::Error::custom("unknown Protoflow block type")), }), diff --git a/lib/protoflow-blocks/src/block_tag.rs b/lib/protoflow-blocks/src/block_tag.rs index a9d526e..b06ef63 100644 --- a/lib/protoflow-blocks/src/block_tag.rs +++ b/lib/protoflow-blocks/src/block_tag.rs @@ -35,10 +35,14 @@ pub enum BlockTag { #[cfg(feature = "std")] ReadFile, #[cfg(feature = "std")] + ReadSocket, + #[cfg(feature = "std")] ReadStdin, #[cfg(feature = "std")] WriteFile, #[cfg(feature = "std")] + WriteSocket, + #[cfg(feature = "std")] WriteStderr, #[cfg(feature = "std")] WriteStdout, @@ -76,10 +80,14 @@ impl BlockTag { #[cfg(feature = "std")] ReadFile => "ReadFile", #[cfg(feature = "std")] + ReadSocket => "ReadSocket", + #[cfg(feature = "std")] ReadStdin => "ReadStdin", #[cfg(feature = "std")] WriteFile => "WriteFile", #[cfg(feature = "std")] + WriteSocket => "WriteSocket", + #[cfg(feature = "std")] WriteStderr => "WriteStderr", #[cfg(feature = "std")] WriteStdout => "WriteStdout", @@ -112,10 +120,14 @@ impl FromStr for BlockTag { #[cfg(feature = "std")] "ReadFile" => ReadFile, #[cfg(feature = "std")] + "ReadSocket" => ReadSocket, + #[cfg(feature = "std")] "ReadStdin" => ReadStdin, #[cfg(feature = "std")] "WriteFile" => WriteFile, #[cfg(feature = "std")] + "WriteSocket" => WriteSocket, + #[cfg(feature = "std")] "WriteStderr" => WriteStderr, #[cfg(feature = "std")] "WriteStdout" => WriteStdout, @@ -159,10 +171,16 @@ impl BlockInstantiation for BlockTag { #[cfg(feature = "std")] ReadFile => Box::new(super::ReadFile::with_system(system)), #[cfg(feature = "std")] + //todo: evren add param + ReadSocket => Box::new(super::ReadSocket::::with_system(system, None)), + #[cfg(feature = "std")] ReadStdin => Box::new(super::ReadStdin::with_system(system, None)), #[cfg(feature = "std")] WriteFile => Box::new(super::WriteFile::with_system(system, None)), #[cfg(feature = "std")] + //todo: evren add param + WriteSocket => Box::new(super::WriteSocket::::with_system(system, None)), + #[cfg(feature = "std")] WriteStderr => Box::new(super::WriteStderr::with_system(system)), #[cfg(feature = "std")] WriteStdout => Box::new(super::WriteStdout::with_system(system)), diff --git a/lib/protoflow-blocks/src/blocks/sys.rs b/lib/protoflow-blocks/src/blocks/sys.rs index d38361b..76600b3 100644 --- a/lib/protoflow-blocks/src/blocks/sys.rs +++ b/lib/protoflow-blocks/src/blocks/sys.rs @@ -13,14 +13,16 @@ pub mod sys { types::ByteSize, BlockConnections, BlockInstantiation, InputPortName, OutputPortName, System, }; - use protoflow_core::Block; + use protoflow_core::{Block, Message}; pub trait SysBlocks { fn read_dir(&mut self) -> ReadDir; fn read_env(&mut self) -> ReadEnv; fn read_file(&mut self) -> ReadFile; + fn read_socket(&mut self) -> ReadSocket; fn read_stdin(&mut self) -> ReadStdin; fn write_file(&mut self) -> WriteFile; + fn write_socket(&mut self) -> WriteSocket; fn write_stderr(&mut self) -> WriteStderr; fn write_stdout(&mut self) -> WriteStdout; } @@ -31,8 +33,10 @@ pub mod sys { ReadDir, ReadEnv, ReadFile, + ReadSocket, ReadStdin, WriteFile, + WriteSocket, WriteStderr, WriteStdout, } @@ -55,6 +59,11 @@ pub mod sys { output: OutputPortName, }, + ReadSocket { + output: OutputPortName, + config: ReadSocketConfig, + }, + ReadStdin { output: OutputPortName, buffer_size: Option, @@ -66,6 +75,11 @@ pub mod sys { flags: Option, }, + WriteSocket { + input: InputPortName, + config: WriteSocketConfig, + }, + WriteStderr { input: InputPortName, }, @@ -82,8 +96,10 @@ pub mod sys { ReadDir { .. } => "ReadDir", ReadEnv { .. } => "ReadEnv", ReadFile { .. } => "ReadFile", + ReadSocket { .. } => "ReadSocket", ReadStdin { .. } => "ReadStdin", WriteFile { .. } => "WriteFile", + WriteSocket { .. } => "WriteSocket", WriteStderr { .. } => "WriteStderr", WriteStdout { .. } => "WriteStdout", }) @@ -97,10 +113,13 @@ pub mod sys { ReadDir { output, .. } | ReadEnv { output, .. } | ReadFile { output, .. } + | ReadSocket { output, .. } | ReadStdin { output, .. } => { vec![("output", Some(output.clone()))] } - WriteFile { .. } | WriteStderr { .. } | WriteStdout { .. } => vec![], + WriteFile { .. } | WriteSocket { .. } | WriteStderr { .. } | WriteStdout { .. } => { + vec![] + } } } } @@ -112,10 +131,18 @@ pub mod sys { ReadDir { .. } => Box::new(super::ReadDir::with_system(system)), ReadEnv { .. } => Box::new(super::ReadEnv::::with_system(system)), ReadFile { .. } => Box::new(super::ReadFile::with_system(system)), + ReadSocket { config, .. } => Box::new(super::ReadSocket::::with_system( + system, + Some(config.clone()), + )), ReadStdin { buffer_size, .. } => { Box::new(super::ReadStdin::with_system(system, *buffer_size)) } WriteFile { flags, .. } => Box::new(super::WriteFile::with_system(system, *flags)), + WriteSocket { config, .. } => Box::new(super::WriteSocket::::with_system( + system, + Some(config.clone()), + )), WriteStderr { .. } => Box::new(super::WriteStderr::with_system(system)), WriteStdout { .. } => Box::new(super::WriteStdout::with_system(system)), } @@ -131,12 +158,18 @@ pub mod sys { mod read_file; pub use read_file::*; + mod read_socket; + pub use read_socket::*; + mod read_stdin; pub use read_stdin::*; mod write_file; pub use write_file::*; + mod write_socket; + pub use write_socket::*; + mod write_stderr; pub use write_stderr::*; diff --git a/lib/protoflow-blocks/src/blocks/sys/read_socket.rs b/lib/protoflow-blocks/src/blocks/sys/read_socket.rs new file mode 100644 index 0000000..0d365f3 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/sys/read_socket.rs @@ -0,0 +1,240 @@ +extern crate std; +use crate::prelude::{vec, String}; +use crate::{IoBlocks, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + types::Any, Block, BlockError, BlockResult, BlockRuntime, Message, OutputPort, Port, + PortResult, SystemBuilding, +}; +use protoflow_derive::Block; +use serde::{Deserialize, Serialize}; +use simple_mermaid::mermaid; +use std::{ + format, + io::Read, + net::{TcpListener, TcpStream}, + string::ToString, + sync::{Arc, Mutex, PoisonError}, +}; +use tracing::{error, info}; +/// A block that reads a proto object from a TCP port. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/sys/read_socket.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/sys/read_socket.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// // TODO +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute ReadSocket host="127.0.0.1" port="7077" buffer_size="1024" +/// ``` +/// +#[derive(Block, Clone)] +pub struct ReadSocket { + #[output] + pub output: OutputPort, + #[parameter] + pub config: ReadSocketConfig, + #[cfg(feature = "std")] + #[state] + pub listener: Arc>>, + #[cfg(feature = "std")] + #[state] + pub stream: Arc>>, +} + +impl ReadSocket { + pub fn with_params(output: OutputPort, config: Option) -> Self { + Self { + output, + config: config.unwrap_or(Default::default()), + listener: Arc::new(Mutex::new(None)), + stream: Arc::new(Mutex::new(None)), + } + } +} + +impl ReadSocket { + pub fn with_system(system: &System, config: Option) -> Self { + Self::with_params(system.output(), config) + } +} + +impl StdioSystem for ReadSocket { + fn build_system(config: StdioConfig) -> Result { + config.allow_only(vec!["host", "port", "buffer_size"])?; + + let host_string = config.get_string("host")?; + let port: u16 = config.get("port")?; + let buffer_size: usize = config.get("buffer_size")?; + + Ok(System::build(|s| { + let line_encoder = s.encode_with(config.encoding); + let stdout = config.write_stdout(s); + let read_socket: ReadSocket = s.block(ReadSocket::with_system( + s, + Some(ReadSocketConfig { + host: host_string, + port, + buffer_size, + }), + )); + s.connect(&read_socket.output, &line_encoder.input); + s.connect(&line_encoder.output, &stdout.input); + })) + } +} + +impl Block for ReadSocket { + fn prepare(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + let address = format!("{}:{}", &self.config.host, &self.config.port); + let listener = TcpListener::bind(&address)?; + *self.listener.lock().map_err(lock_error)? = Some(listener); + info!("Server listening on {}", address); + Ok(()) + } + + fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult { + let mut stream_guard = self.stream.lock().map_err(lock_error)?; + + if stream_guard.is_none() { + let listener_lock = self.listener.lock().map_err(lock_error)?; + let listener = listener_lock + .as_ref() + .ok_or(BlockError::Other("Invalid TCP listener".into()))?; + + let (stream, addr) = listener.accept().map_err(|e| { + error!("Failed to accept client connection: {}", e); + BlockError::Other("Failed to accept client connection".into()) + })?; + + info!("Accepted connection from {}", addr); + *stream_guard = Some(stream); + } + + if let Some(stream) = stream_guard.as_mut() { + handle_client::(stream, self.config.buffer_size, |message| { + info!("Processing received message"); + if self.output.is_connected() { + self.output.send(message)?; + } + Ok(()) + }) + .map_err(|e| { + error!("Error handling client: {}", e); + BlockError::Other("Error handling client".into()) + })?; + } + + Ok(()) + } +} + +fn lock_error(err: PoisonError) -> BlockError { + BlockError::Other(format!("Failed to acquire lock: {:?}", err)) +} + +fn handle_client( + stream: &mut TcpStream, + buffer_size: usize, + process_fn: F, +) -> Result<(), BlockError> +where + M: Message + Default, + F: Fn(&M) -> PortResult<()>, +{ + let mut buffer = vec![0; buffer_size]; + + while let Ok(bytes_read) = Read::read(stream, &mut buffer) { + if bytes_read == 0 { + info!("Client disconnected"); + break; + } + + let message = M::decode(&buffer[..bytes_read]) + .map_err(|_| BlockError::Other("Failed to decode message".into()))?; + + info!("Received message: {:?}", message); + + process_fn(&message).map_err(|e| { + error!("Failed to send response: {:?}", e); + BlockError::Other("Failed to send response".into()) + })?; + } + + Ok(()) +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ReadSocketConfig { + pub host: String, + pub port: u16, + pub buffer_size: usize, +} + +impl Default for ReadSocketConfig { + fn default() -> Self { + Self { + host: String::from("127.0.0.1"), + port: 7070, + buffer_size: 512, + } + } +} +#[cfg(test)] +pub mod read_socket_tests { + + use protoflow_core::SystemBuilding; + + use crate::{Encoding, SysBlocks, System}; + + use super::ReadSocket; + use std::string::String; + extern crate std; + + #[test] + #[ignore] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(ReadSocket::::with_system(s, None)); + }); + } + #[test] + #[ignore = "requires port"] + fn run_server() { + use super::*; + use SystemBuilding; + if let Err(e) = System::run(|s| { + let std_out = s.write_stdout(); + let message_encoder = s.encode_with::(Encoding::TextWithNewlineSuffix); + + let read_socket = s.block(ReadSocket::::with_system( + s, + Some(ReadSocketConfig { + host: String::from("127.0.0.1"), + port: 7070, + buffer_size: 512, + }), + )); + s.connect(&read_socket.output, &message_encoder.input); + s.connect(&message_encoder.output, &std_out.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/blocks/sys/write_socket.rs b/lib/protoflow-blocks/src/blocks/sys/write_socket.rs new file mode 100644 index 0000000..e9ed648 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/sys/write_socket.rs @@ -0,0 +1,185 @@ +extern crate std; +use super::SysBlocks; +use crate::prelude::{vec, String}; +use crate::{IoBlocks, StdioConfig, StdioError, StdioSystem, System}; +use core::str::FromStr; +use protoflow_core::{ + Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, SystemBuilding, +}; +use protoflow_derive::Block; +use serde::{Deserialize, Serialize}; +use simple_mermaid::mermaid; +use std::{ + format, + net::TcpStream, + sync::{Arc, Mutex, PoisonError}, +}; +use tracing::error; +/// A block that writes a proto object to a TCP port. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/sys/write_socket.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/sys/write_socket.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// // TODO +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute WriteSocket host="127.0.0.1" port="7077" buffer_size="1024" +/// ``` +/// +#[derive(Block, Clone)] +pub struct WriteSocket { + #[output] + pub input: InputPort, + #[parameter] + pub config: WriteSocketConfig, + #[state] + pub stream: Arc>>, +} + +impl WriteSocket { + pub fn with_params(input: InputPort, config: Option) -> Self { + Self { + input, + config: config.unwrap_or(Default::default()), + stream: Arc::new(Mutex::new(None)), + } + } + pub fn with_config(self, config: WriteSocketConfig) -> Self { + Self { config, ..self } + } +} + +impl WriteSocket { + pub fn with_system(system: &System, config: Option) -> Self { + Self::with_params(system.input(), config) + } +} + +impl StdioSystem for WriteSocket { + fn build_system(config: StdioConfig) -> Result { + config.allow_only(vec!["host", "port", "buffer_size"])?; + + let host = config.get_string("host")?; + let port: u16 = config.get("port")?; + let buffer_size: usize = config.get("buffer_size")?; + + Ok(System::build(|s| { + let stdin = config.read_stdin(s); + let message_decoder = s.decode_with::(config.encoding); + let write_socket: WriteSocket = s.write_socket().with_config(WriteSocketConfig { + host, + port, + buffer_size, + }); + + s.connect(&stdin.output, &message_decoder.input); + s.connect(&message_decoder.output, &write_socket.input); + })) + } +} + +impl Block for WriteSocket { + fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult { + while let Some(input) = self.input.recv()? { + let mut stream_guard = self.stream.lock().map_err(lock_error)?; + + if stream_guard.is_none() { + let address = format!("{}:{}", self.config.host, self.config.port); + *stream_guard = Some(TcpStream::connect(&address).map_err(|e| { + error!("Failed to connect to {}: {}", &address, e); + BlockError::Other(format!("Failed to connect to {}: {}", &address, e)) + })?); + } + + let stream = stream_guard.as_mut().ok_or_else(|| { + error!("Stream is not connected"); + BlockError::Other("Stream is not connected".into()) + })?; + + let mut response = vec::Vec::new(); + input + .encode(&mut response) + .map_err(|e| BlockError::Other(format!("Encoding failed: {}", e)))?; + + std::io::Write::write_all(stream, &response)?; + } + Ok(()) + } +} + +fn lock_error(err: PoisonError) -> BlockError { + BlockError::Other(format!("Failed to acquire lock: {:?}", err)) +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct WriteSocketConfig { + pub host: String, + pub port: u16, + pub buffer_size: usize, +} + +impl<'a> Default for WriteSocketConfig { + fn default() -> Self { + Self { + host: String::from("127.0.0.1"), + port: 7070, + buffer_size: 512, + } + } +} + +#[cfg(test)] +pub mod write_socket_tests { + use protoflow_core::SystemBuilding; + + use crate::{Encoding, System}; + + use super::WriteSocket; + + extern crate std; + use std::string::String; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(WriteSocket::::with_system(s, None)); + }); + } + #[test] + #[ignore = "requires port"] + fn run_client() { + use super::*; + use protoflow_core::SystemBuilding; + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + let message_decoder = s.decode_with::(Encoding::TextWithNewlineSuffix); + + let write_socket = s.write_socket().with_config(WriteSocketConfig { + host: String::from("127.0.0.1"), + port: 7070, + buffer_size: 512, + }); + s.connect(&stdin.output, &message_decoder.input); + s.connect(&message_decoder.output, &write_socket.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 17b35c6..22ec380 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -74,8 +74,10 @@ pub fn build_stdio_system( "ReadDir" => ReadDir::build_system(config)?, "ReadEnv" => ReadEnv::::build_system(config)?, "ReadFile" => ReadFile::build_system(config)?, + "ReadSocket" => ReadSocket::::build_system(config)?, "ReadStdin" => ReadStdin::build_system(config)?, "WriteFile" => WriteFile::build_system(config)?, + "WriteSocket" => WriteSocket::::build_system(config)?, "WriteStderr" => WriteStderr::build_system(config)?, "WriteStdout" => WriteStdout::build_system(config)?, // TextBlocks diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 3b2ddac..a74b07b 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -7,7 +7,8 @@ use crate::{ types::{DelayType, Encoding}, AllBlocks, Buffer, Const, CoreBlocks, Count, Decode, DecodeJson, Delay, Drop, Encode, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, - ReadFile, ReadStdin, SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout, + ReadFile, ReadSocket, ReadStdin, SysBlocks, TextBlocks, WriteFile, WriteSocket, WriteStderr, + WriteStdout, }; use protoflow_core::{ Block, BlockID, BlockResult, InputPort, Message, OutputPort, PortID, PortResult, Process, @@ -192,6 +193,10 @@ impl SysBlocks for System { self.0.block(ReadFile::with_system(self)) } + fn read_socket(&mut self) -> ReadSocket { + self.0.block(ReadSocket::::with_system(self, None)) + } + fn read_stdin(&mut self) -> ReadStdin { self.0.block(ReadStdin::with_system(self, None)) } @@ -200,6 +205,10 @@ impl SysBlocks for System { self.0.block(WriteFile::with_system(self, None)) } + fn write_socket(&mut self) -> WriteSocket { + self.0.block(WriteSocket::::with_system(self, None)) + } + fn write_stderr(&mut self) -> WriteStderr { self.0.block(WriteStderr::with_system(self)) }