diff --git a/Cargo.lock b/Cargo.lock index 8e011fb..4876c26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -298,7 +298,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "bfsp" version = "0.1.0" -source = "git+https://github.com/Billy-s-E2EE-File-Server/bfsp.git#8922db525a1c3fb742a1843dc6e9565457a15caf" +source = "git+https://github.com/Billy-s-E2EE-File-Server/bfsp.git#9a6dde7a3cd95ed047bcfa2a3904b0b99b09705e" dependencies = [ "anyhow", "argon2", diff --git a/src/internal.rs b/src/internal.rs index f7dcd19..f275761 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -1,11 +1,11 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use bfsp::internal::get_action_resp::ActionsPerUser; -use bfsp::internal::internal_file_server_message::{DeleteQueuedAction, Message}; +use bfsp::internal::get_queued_action_resp::{Actions, ActionsPerUser}; +use bfsp::internal::internal_file_server_message::Message; use bfsp::internal::{ - get_action_resp, queue_action_resp, DeleteQueuedActionResp, GetActionResp, GetStorageCapResp, - GetSuspensionsResp, QueueActionResp, SuspendUsersResp, + get_queued_action_resp, queue_action_resp, DeleteQueuedActionResp, GetQueuedActionResp, + GetStorageCapResp, GetSuspensionsResp, QueueActionResp, SuspendUsersResp, }; use bfsp::{ chacha20poly1305::XChaCha20Poly1305, @@ -86,12 +86,18 @@ async fn handle_internal_message( } .encode_to_vec() } - Message::GetAction(args) => { + Message::GetQueuedActions(args) => { let user_ids: HashSet = args.user_ids.into_iter().collect(); - let actions = meta_db.get_actions_for_users(user_ids).await.unwrap(); - - GetActionResp { - response: Some(get_action_resp::Response::Actions(ActionsPerUser { + let actions: HashMap = meta_db + .get_actions_for_users(user_ids) + .await + .unwrap() + .into_iter() + .map(|(user_id, actions)| (user_id, Actions { actions })) + .collect(); + + GetQueuedActionResp { + response: Some(get_queued_action_resp::Response::Actions(ActionsPerUser { action_info: actions, })), } diff --git a/src/main.rs b/src/main.rs index 8732bc5..f4ab07c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -245,7 +245,7 @@ async fn main() -> Result<()> { } } -#[tracing::instrument(err, skip(bytes))] +#[tracing::instrument(err, skip(bytes, meta_db, chunk_db))] async fn handle_http_connection( bytes: Bytes, meta_db: Arc, @@ -267,7 +267,7 @@ async fn handle_http_connection( .unwrap()) } -#[tracing::instrument(skip(incoming_session, public_key))] +#[tracing::instrument(skip(incoming_session, public_key, meta_db, chunk_db))] async fn handle_connection( incoming_session: IncomingSession, public_key: PublicKey, diff --git a/src/meta_db.rs b/src/meta_db.rs index a0ddd76..5b06b56 100644 --- a/src/meta_db.rs +++ b/src/meta_db.rs @@ -136,7 +136,7 @@ pub trait MetaDB: Sized + Send + Sync + std::fmt::Debug { fn get_actions_for_users( &self, user_ids: HashSet, - ) -> impl Future>> + Send; + ) -> impl Future>>> + Send; } #[derive(Debug)] @@ -778,51 +778,53 @@ FROM async fn get_actions_for_users( &self, user_ids: HashSet, - ) -> Result> { - let mut query = - QueryBuilder::new("select id, action, user_id, execute_at, status where user_id in ("); + ) -> Result>> { + let mut query = QueryBuilder::new( + "select id, action, user_id, execute_at, status from queued_actions where user_id in (", + ); let mut separated = query.separated(","); for id in user_ids.iter() { separated.push_bind(id); } - query.push(") group by user_id"); + query.push(");"); + + let mut actions: HashMap> = HashMap::new(); - let actions = query + query .build() .fetch_all(&self.pool) .await? .into_iter() - .map(|row| { + .for_each(|row| { let id: i32 = row.get("id"); let action: String = row.get("action"); let user_id: i64 = row.get("user_id"); let execute_at: OffsetDateTime = row.get("execute_at"); let status: String = row.get("status"); - ( + let action_info = ActionInfo { + id: Some(id), + action, + execute_at: Some( + prost_types::Timestamp::date_time_nanos( + execute_at.year().into(), + execute_at.month().into(), + execute_at.day(), + execute_at.hour(), + execute_at.minute(), + execute_at.second(), + execute_at.nanosecond(), + ) + .unwrap(), + ), + status, user_id, - ActionInfo { - id: Some(id), - action, - execute_at: Some( - prost_types::Timestamp::date_time_nanos( - execute_at.year().into(), - execute_at.month().into(), - execute_at.day(), - execute_at.hour(), - execute_at.minute(), - execute_at.second(), - execute_at.nanosecond(), - ) - .unwrap(), - ), - status, - user_id, - }, - ) - }) - .collect(); + }; + + let actions = actions.entry(user_id).or_insert_with(|| Vec::new()); + actions.push(action_info); + }); Ok(actions) }