Skip to content

Commit

Permalink
fix: reduce memory usage by a ton
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Sep 2, 2024
1 parent fac2066 commit 27c1e92
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ resolver = "2"


[workspace.package]
version = "0.1.71"
version = "0.1.72"
authors = ["louis030195 <[email protected]>"]
description = ""
repository = "https://github.com/mediar-ai/screenpipe"
Expand Down
2 changes: 1 addition & 1 deletion examples/apps/screenpipe-app-tauri/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "screenpipe-app"
version = "0.1.79"
version = "0.1.80"
description = ""
authors = ["you"]
license = ""
Expand Down
58 changes: 41 additions & 17 deletions screenpipe-audio/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::Serialize;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use std::{fmt, thread};
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn run_ffmpeg(
sample_rate: u32,
channels: u16,
output_path: &PathBuf,
is_running: Arc<AtomicBool>,
is_running: Weak<AtomicBool>,
duration: Duration,
) -> Result<()> {
debug!("Starting FFmpeg process");
Expand Down Expand Up @@ -218,7 +218,10 @@ async fn run_ffmpeg(
let mut stdin = ffmpeg.stdin.take().expect("Failed to open stdin");
let start_time = std::time::Instant::now();

while is_running.load(Ordering::Relaxed) {
while is_running
.upgrade()
.map_or(false, |arc| arc.load(Ordering::Relaxed))
{
tokio::select! {
Some(data) = rx.recv() => {
if start_time.elapsed() >= duration {
Expand Down Expand Up @@ -254,6 +257,7 @@ async fn run_ffmpeg(
if !status.success() {
error!("FFmpeg process failed with status: {}", status);
error!("FFmpeg stderr: {}", stderr);
// ? ffmpeg.kill().await?;
return Err(anyhow!("FFmpeg process failed"));
}

Expand All @@ -277,10 +281,10 @@ pub async fn record_and_transcribe(

// TODO: consider a lock-free ring buffer like crossbeam_queue::ArrayQueue (ask AI why)
let (tx, rx) = mpsc::channel(100); // For audio data
let is_running_clone = Arc::clone(&is_running);
let is_running_clone_2 = is_running.clone();
let is_running_clone_3 = is_running.clone();
let is_running_clone_4 = is_running.clone();
let is_running_weak = Arc::downgrade(&is_running);
let is_running_weak_2 = Arc::downgrade(&is_running);
let is_running_weak_3 = Arc::downgrade(&is_running);
let is_running_weak_4 = Arc::downgrade(&is_running);

let output_path_clone = Arc::new(output_path);
let output_path_clone_2 = Arc::clone(&output_path_clone);
Expand All @@ -290,7 +294,9 @@ pub async fn record_and_transcribe(
error!("An error occurred on the audio stream: {}", err);
if err.to_string().contains("device is no longer valid") {
warn!("Audio device disconnected. Stopping recording.");
is_running_clone_2.store(false, Ordering::Relaxed);
if let Some(arc) = is_running_weak_2.upgrade() {
arc.store(false, Ordering::Relaxed);
}
}
};
// Spawn a thread to handle the non-Send stream
Expand All @@ -299,7 +305,10 @@ pub async fn record_and_transcribe(
cpal::SampleFormat::I8 => cpal_audio_device.build_input_stream(
&config.into(),
move |data: &[i8], _: &_| {
if is_running_clone_3.load(Ordering::Relaxed) {
if is_running_weak_3
.upgrade()
.map_or(false, |arc| arc.load(Ordering::Relaxed))
{
let _ = tx.blocking_send(bytemuck::cast_slice(data).to_vec());
}
},
Expand All @@ -309,7 +318,10 @@ pub async fn record_and_transcribe(
cpal::SampleFormat::I16 => cpal_audio_device.build_input_stream(
&config.into(),
move |data: &[i16], _: &_| {
if is_running_clone_3.load(Ordering::Relaxed) {
if is_running_weak_3
.upgrade()
.map_or(false, |arc| arc.load(Ordering::Relaxed))
{
let _ = tx.blocking_send(bytemuck::cast_slice(data).to_vec());
}
},
Expand All @@ -319,7 +331,10 @@ pub async fn record_and_transcribe(
cpal::SampleFormat::I32 => cpal_audio_device.build_input_stream(
&config.into(),
move |data: &[i32], _: &_| {
if is_running_clone_3.load(Ordering::Relaxed) {
if is_running_weak_3
.upgrade()
.map_or(false, |arc| arc.load(Ordering::Relaxed))
{
let _ = tx.blocking_send(bytemuck::cast_slice(data).to_vec());
}
},
Expand All @@ -329,7 +344,10 @@ pub async fn record_and_transcribe(
cpal::SampleFormat::F32 => cpal_audio_device.build_input_stream(
&config.into(),
move |data: &[f32], _: &_| {
if is_running_clone_3.load(Ordering::Relaxed) {
if is_running_weak_3
.upgrade()
.map_or(false, |arc| arc.load(Ordering::Relaxed))
{
let _ = tx.blocking_send(bytemuck::cast_slice(data).to_vec());
}
},
Expand All @@ -342,13 +360,18 @@ pub async fn record_and_transcribe(
}
};

// ? drop(tx);

match stream {
Ok(s) => {
if let Err(e) = s.play() {
error!("Failed to play stream: {}", e);
}
// Keep the stream alive until the recording is done
while is_running_clone.load(Ordering::Relaxed) {
while is_running_weak
.upgrade()
.map_or(false, |arc| arc.load(Ordering::Relaxed))
{
std::thread::sleep(Duration::from_millis(100));
}
s.pause().ok();
Expand All @@ -365,15 +388,16 @@ pub async fn record_and_transcribe(
);

// Run FFmpeg in a separate task
let _ = run_ffmpeg(
let ffmpeg_handle = run_ffmpeg(
rx,
sample_rate,
channels,
&output_path_clone,
is_running_clone_4,
is_running_weak_4,
duration,
)
.await;
);

ffmpeg_handle.await?;

info!(
"Recording stopped, wrote to {}. Now triggering transcription",
Expand Down

0 comments on commit 27c1e92

Please sign in to comment.