From 6e635388b8cdf8471c1934da622eeac12a77021b Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Mon, 21 Oct 2024 13:14:34 -0300 Subject: [PATCH] src: device: Fix: Add task handles to manage and free resource --- src/device.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/device.rs b/src/device.rs index b42ccc55c..119565b43 100644 --- a/src/device.rs +++ b/src/device.rs @@ -4,13 +4,16 @@ use futures::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::{ broadcast::{self, Sender}, mpsc::{self, Receiver}, }; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + task::JoinHandle, +}; use tokio_util::codec::{Decoder, Framed}; -use tracing::error; +use tracing::{error, info}; use crate::{ codec::PingCodec, @@ -28,6 +31,13 @@ pub use crate::ping360::Device as Ping360; pub struct Common { tx: mpsc::Sender, rx: broadcast::Receiver, + task_handles: TaskHandles, +} +#[derive(Debug)] + +struct TaskHandles { + stream_handle: JoinHandle<()>, + sink_handle: JoinHandle<()>, } impl Common { @@ -41,13 +51,17 @@ impl Common { // Prepare Serial receiver broadcast and sender let (broadcast_tx, broadcast_rx) = broadcast::channel::(100); - tokio::spawn(Self::stream(serial_stream, broadcast_tx)); + let stream_handle = tokio::spawn(Self::stream(serial_stream, broadcast_tx)); let (sender, sender_rx) = mpsc::channel::(100); - tokio::spawn(Self::sink(serial_sink, sender_rx)); + let sink_handle = tokio::spawn(Self::sink(serial_sink, sender_rx)); Common { tx: sender, rx: broadcast_rx, + task_handles: TaskHandles { + stream_handle, + sink_handle, + }, } } @@ -92,6 +106,14 @@ impl Common { } } +impl Drop for Common { + fn drop(&mut self) { + self.task_handles.stream_handle.abort(); + self.task_handles.sink_handle.abort(); + info!("TaskHandles sink and stream dropped, tasks aborted"); + } +} + pub trait PingDevice { fn get_common(&self) -> &Common;