Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ReadSocket and WriteSocket #10

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ The built-in blocks provided by Protoflow are listed below:
| [`ReadDir`] | Reads file names from a file system directory. |
| [`ReadEnv`] | Reads the value of an environment variable. |
| [`ReadFile`] | Reads bytes from the contents of a file. |
| [`ReadSocket`] | Reads proto message from a TCP port. |
| [`ReadStdin`] | Reads bytes from standard input (aka stdin). |
| [`WriteFile`] | Writes or appends bytes to the contents of a file. |
| [`WriteSocket`] | Writes a proto message to a TCP port |
| [`WriteStderr`] | Writes bytes to standard error (aka stderr). |
| [`WriteStdout`] | Writes bytes to standard output (aka stdout). |

Expand Down Expand Up @@ -466,6 +468,25 @@ block-beta
```bash
protoflow execute ReadFile path=/tmp/file.txt
```
#### [`ReadSocket`]

A block that reads proto messages from a TCP port.

```mermaid
block-beta
columns 4
ReadSocket space:2 Sink
ReadSocket-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadSocket block
class Sink hidden
```

```bash
protoflow execute ReadSocket host="127.0.0.1" port=7077 buffer_size=1024
```

#### [`ReadStdin`]

Expand Down Expand Up @@ -511,7 +532,25 @@ block-beta
```bash
protoflow execute WriteFile path=/tmp/file.txt
```
#### [`WriteSocket`]

A block that writes proto messages to TCP port.

```mermaid
block-beta
columns 4
Source space:2 WriteSocket
Source-- "input" -->WriteSocket

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class WriteSocket block
class Source hidden
```

```bash
protoflow execute WriteSocket host="127.0.0.1" port=7077 buffer_size=1024
```
#### [`WriteStderr`]

A block that writes bytes to standard error (aka stderr).
Expand Down Expand Up @@ -589,7 +628,9 @@ git clone https://github.com/AsimovPlatform/protoflow.git
[`ReadDir`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadDir.html
[`ReadEnv`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadEnv.html
[`ReadFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadFile.html
[`ReadFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadSocket.html
[`ReadStdin`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadStdin.html
[`WriteSocket`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteSocket.html
[`WriteFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteFile.html
[`WriteStderr`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteStderr.html
[`WriteStdout`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteStdout.html
9 changes: 9 additions & 0 deletions lib/protoflow-blocks/doc/sys/read_socket.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
block-beta
columns 4
ReadSocket space:2 Sink
ReadSocket-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadSocket block
class Sink hidden
15 changes: 15 additions & 0 deletions lib/protoflow-blocks/doc/sys/read_socket.seq.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
sequenceDiagram
autonumber
participant ReadSocket as ReadSocket block
participant ReadSocket.output as ReadSocket.output port
participant BlockA as Another block

ReadSocket-->>BlockA: Connect

loop ReadSocket process
ReadSocket->>ReadSocket: Read proto message from TCP port
ReadSocket->>BlockA: Message (proto)
end

ReadSocket-->>ReadSocket.output: Close
ReadSocket-->>BlockA: Disconnect
9 changes: 9 additions & 0 deletions lib/protoflow-blocks/doc/sys/write_socket.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
block-beta
columns 4
Source space:2 WriteSocket
Source-- "input" -->WriteSocket

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class WriteSocket block
class Source hidden
15 changes: 15 additions & 0 deletions lib/protoflow-blocks/doc/sys/write_socket.seq.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
sequenceDiagram
autonumber
participant BlockA as Another block
participant WriteSocket.input as WriteSocket.input port
participant WriteSocket as WriteSocket block

BlockA-->>WriteSocket: Connect

loop WriteSocket process
BlockA->>WriteSocket: Message (proto)
WriteSocket->>WriteSocket: Write proto message to TCP port
end

BlockA-->>WriteSocket: Disconnect
WriteSocket-->>WriteSocket.input: Close
10 changes: 6 additions & 4 deletions lib/protoflow-blocks/src/block_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ impl<'de> serde::Deserialize<'de> for BlockConfig {
}

#[cfg(feature = "std")]
"ReadDir" | "ReadEnv" | "ReadFile" | "ReadStdin" | "WriteFile" | "WriteStderr"
| "WriteStdout" => SysBlockConfig::deserialize(value.clone())
.map(BlockConfig::Sys)
.unwrap(),
"ReadDir" | "ReadEnv" | "ReadFile" | "ReadSocket" | "ReadStdin" | "WriteFile"
| "WriteSocket" | "WriteStderr" | "WriteStdout" => {
SysBlockConfig::deserialize(value.clone())
.map(BlockConfig::Sys)
.unwrap()
}

_ => return Err(serde::de::Error::custom("unknown Protoflow block type")),
}),
Expand Down
18 changes: 18 additions & 0 deletions lib/protoflow-blocks/src/block_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ pub enum BlockTag {
#[cfg(feature = "std")]
ReadFile,
#[cfg(feature = "std")]
ReadSocket,
#[cfg(feature = "std")]
ReadStdin,
#[cfg(feature = "std")]
WriteFile,
#[cfg(feature = "std")]
WriteSocket,
#[cfg(feature = "std")]
WriteStderr,
#[cfg(feature = "std")]
WriteStdout,
Expand Down Expand Up @@ -76,10 +80,14 @@ impl BlockTag {
#[cfg(feature = "std")]
ReadFile => "ReadFile",
#[cfg(feature = "std")]
ReadSocket => "ReadSocket",
#[cfg(feature = "std")]
ReadStdin => "ReadStdin",
#[cfg(feature = "std")]
WriteFile => "WriteFile",
#[cfg(feature = "std")]
WriteSocket => "WriteSocket",
#[cfg(feature = "std")]
WriteStderr => "WriteStderr",
#[cfg(feature = "std")]
WriteStdout => "WriteStdout",
Expand Down Expand Up @@ -112,10 +120,14 @@ impl FromStr for BlockTag {
#[cfg(feature = "std")]
"ReadFile" => ReadFile,
#[cfg(feature = "std")]
"ReadSocket" => ReadSocket,
#[cfg(feature = "std")]
"ReadStdin" => ReadStdin,
#[cfg(feature = "std")]
"WriteFile" => WriteFile,
#[cfg(feature = "std")]
"WriteSocket" => WriteSocket,
#[cfg(feature = "std")]
"WriteStderr" => WriteStderr,
#[cfg(feature = "std")]
"WriteStdout" => WriteStdout,
Expand Down Expand Up @@ -159,10 +171,16 @@ impl BlockInstantiation for BlockTag {
#[cfg(feature = "std")]
ReadFile => Box::new(super::ReadFile::with_system(system)),
#[cfg(feature = "std")]
//todo: evren add param
ReadSocket => Box::new(super::ReadSocket::<Any>::with_system(system, None)),
#[cfg(feature = "std")]
ReadStdin => Box::new(super::ReadStdin::with_system(system, None)),
#[cfg(feature = "std")]
WriteFile => Box::new(super::WriteFile::with_system(system, None)),
#[cfg(feature = "std")]
//todo: evren add param
WriteSocket => Box::new(super::WriteSocket::<Any>::with_system(system, None)),
#[cfg(feature = "std")]
WriteStderr => Box::new(super::WriteStderr::with_system(system)),
#[cfg(feature = "std")]
WriteStdout => Box::new(super::WriteStdout::with_system(system)),
Expand Down
37 changes: 35 additions & 2 deletions lib/protoflow-blocks/src/blocks/sys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ pub mod sys {
types::ByteSize,
BlockConnections, BlockInstantiation, InputPortName, OutputPortName, System,
};
use protoflow_core::Block;
use protoflow_core::{Block, Message};

pub trait SysBlocks {
fn read_dir(&mut self) -> ReadDir;
fn read_env(&mut self) -> ReadEnv;
fn read_file(&mut self) -> ReadFile;
fn read_socket<T: Message + 'static>(&mut self) -> ReadSocket<T>;
fn read_stdin(&mut self) -> ReadStdin;
fn write_file(&mut self) -> WriteFile;
fn write_socket<T: Message + 'static>(&mut self) -> WriteSocket<T>;
fn write_stderr(&mut self) -> WriteStderr;
fn write_stdout(&mut self) -> WriteStdout;
}
Expand All @@ -31,8 +33,10 @@ pub mod sys {
ReadDir,
ReadEnv,
ReadFile,
ReadSocket,
ReadStdin,
WriteFile,
WriteSocket,
WriteStderr,
WriteStdout,
}
Expand All @@ -55,6 +59,11 @@ pub mod sys {
output: OutputPortName,
},

ReadSocket {
output: OutputPortName,
config: ReadSocketConfig,
},

ReadStdin {
output: OutputPortName,
buffer_size: Option<ByteSize>,
Expand All @@ -66,6 +75,11 @@ pub mod sys {
flags: Option<WriteFlags>,
},

WriteSocket {
input: InputPortName,
config: WriteSocketConfig,
},

WriteStderr {
input: InputPortName,
},
Expand All @@ -82,8 +96,10 @@ pub mod sys {
ReadDir { .. } => "ReadDir",
ReadEnv { .. } => "ReadEnv",
ReadFile { .. } => "ReadFile",
ReadSocket { .. } => "ReadSocket",
ReadStdin { .. } => "ReadStdin",
WriteFile { .. } => "WriteFile",
WriteSocket { .. } => "WriteSocket",
WriteStderr { .. } => "WriteStderr",
WriteStdout { .. } => "WriteStdout",
})
Expand All @@ -97,10 +113,13 @@ pub mod sys {
ReadDir { output, .. }
| ReadEnv { output, .. }
| ReadFile { output, .. }
| ReadSocket { output, .. }
| ReadStdin { output, .. } => {
vec![("output", Some(output.clone()))]
}
WriteFile { .. } | WriteStderr { .. } | WriteStdout { .. } => vec![],
WriteFile { .. } | WriteSocket { .. } | WriteStderr { .. } | WriteStdout { .. } => {
vec![]
}
}
}
}
Expand All @@ -112,10 +131,18 @@ pub mod sys {
ReadDir { .. } => Box::new(super::ReadDir::with_system(system)),
ReadEnv { .. } => Box::new(super::ReadEnv::<String>::with_system(system)),
ReadFile { .. } => Box::new(super::ReadFile::with_system(system)),
ReadSocket { config, .. } => Box::new(super::ReadSocket::<String>::with_system(
system,
Some(config.clone()),
)),
ReadStdin { buffer_size, .. } => {
Box::new(super::ReadStdin::with_system(system, *buffer_size))
}
WriteFile { flags, .. } => Box::new(super::WriteFile::with_system(system, *flags)),
WriteSocket { config, .. } => Box::new(super::WriteSocket::<String>::with_system(
system,
Some(config.clone()),
)),
WriteStderr { .. } => Box::new(super::WriteStderr::with_system(system)),
WriteStdout { .. } => Box::new(super::WriteStdout::with_system(system)),
}
Expand All @@ -131,12 +158,18 @@ pub mod sys {
mod read_file;
pub use read_file::*;

mod read_socket;
pub use read_socket::*;

mod read_stdin;
pub use read_stdin::*;

mod write_file;
pub use write_file::*;

mod write_socket;
pub use write_socket::*;

mod write_stderr;
pub use write_stderr::*;

Expand Down
Loading