Skip to content

Commit

Permalink
allow queueing specific actions
Browse files Browse the repository at this point in the history
we can now say "hey, run this action after $time". This will mostly be
used for suspending users after their free trial period is up in
big_money
  • Loading branch information
billyb2 committed Aug 30, 2024
1 parent c204d38 commit a96bbf8
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 39 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tokio = { version = "1", features = [
"rt-multi-thread",
] }
humantime = "2"
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "tls-rustls"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "tls-rustls", "time"] }
rand = "0.8"
wtransport = { version = "0.1" }
#bfsp = { path = "../bfsp" }
Expand Down
9 changes: 9 additions & 0 deletions migrations/20240824233210_queued_actions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
create table queued_actions (
id serial primary key not null,
action text not null,
user_id bigint not null,
execute_at timestamptz not null,
status text not null
);

CREATE INDEX id_user_id_status ON queued_actions (id, user_id, status);
130 changes: 130 additions & 0 deletions src/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use anyhow::anyhow;
use bfsp::internal::ActionInfo;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tracing::error_span;
use tracing::{error, Level};

use rand::Rng;

use crate::{chunk_db::ChunkDB, meta_db::MetaDB};

#[derive(Debug)]
enum Action {
DeleteFiles,
SuspendRead,
SuspendWrite,
SuspendDelete,
SuspendQuery,
}

impl TryFrom<String> for Action {
type Error = anyhow::Error;

#[tracing::instrument(err)]
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"delete_files" => Ok(Self::DeleteFiles),
"suspend_read" => Ok(Self::SuspendRead),
"suspend_write" => Ok(Self::SuspendWrite),
"suspend_delete" => Ok(Self::SuspendDelete),
"suspend_query" => Ok(Self::SuspendQuery),
_ => Err(anyhow!("invalid action")),
}
}
}

pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
meta_db: Arc<M>,
chunk_db: Arc<C>,
) {
loop {
tracing::span!(Level::INFO, "run_current_actions");

match meta_db
.list_actions(Some("pending".to_string()), true)
.await
{
Ok(actions) => {
for action_info in actions.into_iter() {
let meta_db = Arc::clone(&meta_db);
let chunk_db = Arc::clone(&chunk_db);

tokio::task::spawn(async move {
match run_action(Arc::clone(&meta_db), chunk_db, &action_info).await {
Ok(_) => {
let _ = meta_db.executed_action(action_info.id.unwrap()).await;
}
Err(err) => {
error!("Error running action: {err}");
}
}
});
}
}
Err(err) => {
error!("Error listing actions: {err}");
}
}

// random jitter to make servers less likely to run multiple actions at once
let jitter = rand::thread_rng().gen_range(-1.0..=1.0);
tokio::time::sleep(Duration::from_secs_f32(10.0 + jitter)).await;
}
}

#[tracing::instrument(err, skip(meta_db, chunk_db))]
async fn run_action<M: MetaDB, C: ChunkDB>(
meta_db: Arc<M>,
chunk_db: Arc<C>,
action_info: &ActionInfo,
) -> anyhow::Result<()> {
let action: Action = action_info.action.clone().try_into()?;
let user_id = action_info.user_id;

match action {
Action::DeleteFiles => {
meta_db.delete_all_meta(user_id).await?;
}
//FIXME: REPLACE ALL WITH JSON_INSERT FUNCTION!!!!!!
Action::SuspendRead => {
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
let suspensions = meta_db.suspensions(&[user_id]).await?;
let mut suspension = suspensions[&user_id];
suspension.read_suspended = true;
meta_db
.set_suspensions([(user_id, suspension)].into())
.await?;
}
Action::SuspendWrite => {
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
let suspensions = meta_db.suspensions(&[user_id]).await?;
let mut suspension = suspensions[&user_id];
suspension.write_suspended = true;
meta_db
.set_suspensions([(user_id, suspension)].into())
.await?;
}
Action::SuspendDelete => {
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
let suspensions = meta_db.suspensions(&[user_id]).await?;
let mut suspension = suspensions[&user_id];
suspension.delete_suspended = true;
meta_db
.set_suspensions([(user_id, suspension)].into())
.await?;
}
Action::SuspendQuery => {
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
let suspensions = meta_db.suspensions(&[user_id]).await?;
let mut suspension = suspensions[&user_id];
suspension.query_suspended = true;
meta_db
.set_suspensions([(user_id, suspension)].into())
.await?;
}
};

Ok(())
}
Loading

0 comments on commit a96bbf8

Please sign in to comment.