Skip to content

Commit

Permalink
implement timeout for flux
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacherr committed Jul 3, 2024
1 parent dd05898 commit f349b73
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 44 deletions.
1 change: 1 addition & 0 deletions assyst-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ urlencoding = "2.1.3"
twilight-util = { version = "=0.16.0-rc.1", features = ["builder"] }
dash_vm = { git = "https://github.com/y21/dash" }
dash_rt = { git = "https://github.com/y21/dash" }
libc = "0.2.155"
2 changes: 1 addition & 1 deletion assyst-core/src/command/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<T: ParseArgument> ParseArgument for Option<T> {
fn usage(name: &str) -> String {
let as_required = T::usage(name);
// this is hacky maybe
return format!("[{}]", &as_required[1..as_required.len() - 1]);
format!("[{}]", &as_required[1..as_required.len() - 1])
}
}

Expand Down
5 changes: 4 additions & 1 deletion assyst-core/src/command/image/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ pub async fn ahshit(ctxt: CommandCtxt<'_>, source: Image) -> anyhow::Result<()>
send_processing = true
)]
pub async fn aprilfools(ctxt: CommandCtxt<'_>, source: Image) -> anyhow::Result<()> {
let result = ctxt.wsi_handler().aprilfools(source.0).await?;
let result = ctxt
.wsi_handler()
.aprilfools(source.0, ctxt.data.author.id.get())
.await?;

ctxt.reply(result).await?;

Expand Down
36 changes: 22 additions & 14 deletions assyst-core/src/flux_handler/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@ impl FluxHandler {
pub async fn ahshit(&self, media: Vec<u8>, user_id: u64) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new_with_input_and_limits(media, &LIMITS[tier]);
let limits = &LIMITS[tier];
let mut request = FluxRequest::new_with_input_and_limits(media, limits);
request.operation("ah-shit".to_owned(), HashMap::new());
request.output();

self.run_flux(request).await
self.run_flux(request, limits.time).await
}

pub async fn aprilfools(&self, media: Vec<u8>) -> FluxResult {
pub async fn aprilfools(&self, media: Vec<u8>, user_id: u64) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new();
request.input(media);
request.operation("april-fools".to_owned(), HashMap::new());
request.output();

self.run_flux(request).await
self.run_flux(request, LIMITS[tier].time).await
}

pub async fn bloom(
Expand All @@ -37,7 +40,8 @@ impl FluxHandler {
) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new_with_input_and_limits(media, &LIMITS[tier]);
let limits = &LIMITS[tier];
let mut request = FluxRequest::new_with_input_and_limits(media, limits);

let mut options = HashMap::new();
if let Some(r) = radius {
Expand All @@ -53,13 +57,14 @@ impl FluxHandler {
request.operation("bloom".to_owned(), options);
request.output();

self.run_flux(request).await
self.run_flux(request, limits.time).await
}

pub async fn blur(&self, media: Vec<u8>, power: Option<f32>, user_id: u64) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new_with_input_and_limits(media, &LIMITS[tier]);
let limits = &LIMITS[tier];
let mut request = FluxRequest::new_with_input_and_limits(media, limits);

let mut options = HashMap::new();
if let Some(p) = power {
Expand All @@ -69,27 +74,29 @@ impl FluxHandler {
request.operation("blur".to_owned(), options);
request.output();

self.run_flux(request).await
self.run_flux(request, limits.time).await
}

pub async fn caption(&self, media: Vec<u8>, text: String, user_id: u64) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new_with_input_and_limits(media, &LIMITS[tier]);
let limits = &LIMITS[tier];
let mut request = FluxRequest::new_with_input_and_limits(media, limits);

let mut options = HashMap::new();
options.insert("text".to_owned(), text);

request.operation("caption".to_owned(), options);
request.output();

self.run_flux(request).await
self.run_flux(request, limits.time).await
}

pub async fn resize_absolute(&self, media: Vec<u8>, width: u32, height: u32, user_id: u64) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new_with_input_and_limits(media, &LIMITS[tier]);
let limits = &LIMITS[tier];
let mut request = FluxRequest::new_with_input_and_limits(media, limits);

let mut options = HashMap::new();
options.insert("width".to_owned(), width.to_string());
Expand All @@ -98,20 +105,21 @@ impl FluxHandler {
request.operation("resize".to_owned(), options);
request.output();

self.run_flux(request).await
self.run_flux(request, limits.time).await
}

pub async fn resize_scale(&self, media: Vec<u8>, scale: f32, user_id: u64) -> FluxResult {
let tier = self.get_request_tier(user_id).await?;

let mut request = FluxRequest::new_with_input_and_limits(media, &LIMITS[tier]);
let limits = &LIMITS[tier];
let mut request = FluxRequest::new_with_input_and_limits(media, limits);

let mut options = HashMap::new();
options.insert("scale".to_owned(), scale.to_string());

request.operation("resize".to_owned(), options);
request.output();

self.run_flux(request).await
self.run_flux(request, limits.time).await
}
}
23 changes: 20 additions & 3 deletions assyst-core/src/flux_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use assyst_database::model::free_tier_2_requests::FreeTier2Requests;
use assyst_database::DatabaseHandler;
use flux_request::{FluxRequest, FluxStep};
use jobs::FluxResult;
use libc::pid_t;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::fs;
use tokio::process::Command;
use tokio::signal::unix::Signal;
use tokio::time::timeout;

pub mod flux_request;
pub mod jobs;
Expand Down Expand Up @@ -39,7 +42,7 @@ impl FluxHandler {
}
}

pub async fn run_flux(&self, request: FluxRequest) -> FluxResult {
pub async fn run_flux(&self, request: FluxRequest, time_limit: Duration) -> FluxResult {
let mut input_file_paths: Vec<String> = vec![];
let mut output_file_path: String = String::new();
let mut args: Vec<String> = vec![];
Expand Down Expand Up @@ -113,7 +116,21 @@ impl FluxHandler {
command.args(args);
command.current_dir(flux_workspace_root);
command.env("LD_LIBRARY_PATH", LD_LIBRARY_PATH);
let output = command.output().await.context("Failed to execute flux")?;
let spawn = command.spawn().context("Failed to execute flux")?;
let id = spawn.id();
let output = timeout(time_limit, spawn.wait_with_output()).await;

let output = (match output {
Ok(o) => o,
Err(_) => {
// send SIGTERM to flux to clean up child processes
if let Some(id) = id {
unsafe { libc::kill(id as pid_t, libc::SIGTERM) };
};
bail!("The operation timed out");
},
})
.context("Failed to execute flux")?;

if !output.status.success() {
bail!(
Expand Down
20 changes: 0 additions & 20 deletions assyst-core/src/gateway_handler/message_parser/preprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ use crate::gateway_handler::message_parser::error::PreParseError;
pub struct PreprocessResult {
/// The command prefix used in this message.
pub prefix: String,
/// The owner of the guild, if the command was ran in a guild.
pub guild_owner: Option<u64>,
/// If the command was ran in the bot's DMs.
pub is_in_dm: bool,
/// Time taken to determine the prefix.
pub prefixing_determinism_time: Duration,
}
Expand Down Expand Up @@ -140,24 +136,8 @@ pub async fn preprocess(assyst: ThreadSafeAssyst, message: &Message) -> Result<P
return Err(PreParseError::UserGloballyBlacklisted(message.author.id.get()));
}

// fetch guild command restrictions and check the ones we can (any that have "all" feature
// restriction) - server owner bypasses all restrictions so we check if user owns the server here
let guild_owner = if !is_in_dm {
Some(
assyst
.rest_cache_handler
.get_guild_owner(message.guild_id.unwrap().get())
.await
.map_err(|x| PreParseError::Failure(format!("failed to get guild owner: {x}")))?,
)
} else {
None
};

Ok(PreprocessResult {
prefix: parsed_prefix,
guild_owner,
is_in_dm,
prefixing_determinism_time: prefix_time,
})
}
4 changes: 0 additions & 4 deletions assyst-core/src/replies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ impl Replies {
self.1.insert(id, ());
}

pub fn remove_interaction_command(&self, id: u64) -> Option<()> {
self.1.remove(&id)
}

pub fn get_interaction_command(&self, id: u64) -> Option<()> {
self.1.get(&id)
}
Expand Down
6 changes: 5 additions & 1 deletion assyst-core/src/rest/bad_translation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ pub enum TranslateError {
impl Display for TranslateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TranslateError::Reqwest(_) => write!(f, "A network error occurred"),
TranslateError::Reqwest(e) => write!(
f,
"A network error occurred: {}",
e.to_string().replace(&CONFIG.urls.bad_translation, "[bt]")
),
TranslateError::Raw(s) => write!(f, "{}", s),
}
}
Expand Down

0 comments on commit f349b73

Please sign in to comment.