diff --git a/Cargo.toml b/Cargo.toml index 74ef34a..c237313 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,3 +54,5 @@ serde_json = { version = "1.0", optional = true } thiserror = "1.0.48" uuid = "1.6.1" btleplug = "0.11.5" +fern = { version = "0.6.2", features = ["colored"] } +humantime = "2.1.0" diff --git a/examples/basic_serial.rs b/examples/basic_serial.rs index 5a8343e..9df7c56 100644 --- a/examples/basic_serial.rs +++ b/examples/basic_serial.rs @@ -4,12 +4,34 @@ extern crate meshtastic; use std::io::{self, BufRead}; +use std::time::SystemTime; use meshtastic::api::StreamApi; use meshtastic::utils; +/// Set up the logger to output to the console and to a file. +fn setup_logger() -> Result<(), fern::InitError> { + fern::Dispatch::new() + .format(|out, message, record| { + out.finish(format_args!( + "[{} {} {}] {}", + humantime::format_rfc3339_seconds(SystemTime::now()), + record.level(), + record.target(), + message + )) + }) + .level(log::LevelFilter::Trace) + .chain(std::io::stdout()) + .apply()?; + + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), Box> { + setup_logger()?; + let stream_api = StreamApi::new(); let available_ports = utils::stream::available_serial_ports()?; @@ -32,8 +54,8 @@ async fn main() -> Result<(), Box> { // This loop can be broken with ctrl+c, or by disconnecting // the attached serial port. - while let Some(decoded) = decoded_listener.recv().await { - println!("Received: {:?}", decoded); + while let Some(_decoded) = decoded_listener.recv().await { + // println!("Received: {:?}", decoded); } // Note that in this specific example, this will only be called when diff --git a/src/connections/handlers.rs b/src/connections/handlers.rs index b1ea654..f464856 100644 --- a/src/connections/handlers.rs +++ b/src/connections/handlers.rs @@ -48,6 +48,19 @@ where let mut read_stream = read_stream; + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => { + error!("Didn't receive a message on read handler for 60s"); + } + } + } + }); + loop { let mut buffer = [0u8; 1024]; match read_stream.read(&mut buffer).await { @@ -73,9 +86,13 @@ where )); } } + + tx.send("hello there").expect("send failed"); } - // trace!("Read handler finished"); + handle.abort(); + + trace!("Read handler finished"); // Return type should be never (!) } @@ -116,6 +133,19 @@ where { debug!("Started write handler"); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => { + error!("Didn't receive a message on write handler for 60s"); + } + } + } + }); + while let Some(message) = write_input_rx.recv().await { trace!("Writing packet data: {:?}", message); @@ -127,8 +157,12 @@ where }, )); } + + tx.send("hello there").expect("send failed"); } + handle.abort(); + debug!("Write handler finished"); Ok(()) @@ -163,10 +197,26 @@ async fn start_processing_handler( let mut buffer = StreamBuffer::new(decoded_packet_tx); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => { + error!("Didn't receive a message on processing handler for 60s"); + } + } + } + }); + while let Some(message) = read_output_rx.recv().await { trace!("Processing {} bytes from radio", message.data().len()); buffer.process_incoming_bytes(message); + tx.send("hello there").expect("send failed"); } + handle.abort(); + trace!("Processing read_output_rx channel closed"); }