Skip to content

Commit

Permalink
Better handling of concurrent UID map sync requests (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jul 24, 2023
1 parent 9725aa7 commit a372865
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 13 deletions.
3 changes: 2 additions & 1 deletion crates/imap/src/core/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use jmap_proto::{
use parking_lot::Mutex;
use store::query::log::{Change, Query};
use tokio::io::AsyncRead;
use utils::listener::limiter::InFlight;
use utils::{listener::limiter::InFlight, map::mutex_map::MutexMap};

use super::{Account, Mailbox, MailboxId, MailboxSync, Session, SessionData};

Expand All @@ -31,6 +31,7 @@ impl SessionData {
span: session.span.clone(),
mailboxes: Mutex::new(vec![]),
state: access_token.state().into(),
mailbox_locks: MutexMap::with_capacity(5),
in_flight,
};

Expand Down
5 changes: 4 additions & 1 deletion crates/imap/src/core/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::core::ImapId;

use super::{MailboxId, MailboxState, NextMailboxState, SelectedMailbox, SessionData};

const MAX_RETRIES: usize = 50;
const MAX_RETRIES: usize = 10;

#[derive(Debug)]
struct UidMap {
Expand Down Expand Up @@ -68,6 +68,9 @@ impl SessionData {
let mut try_count = 0;

loop {
// Acquire lock on the mailbox
let _guard = self.mailbox_locks.lock_hash(mailbox).await;

// Deserialize mailbox data
let uid_map = self
.jmap
Expand Down
5 changes: 4 additions & 1 deletion crates/imap/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tokio::{
use utils::{
config::Rate,
listener::{limiter::InFlight, ServerInstance},
map::mutex_map::MutexMap,
};

pub mod client;
Expand All @@ -75,6 +76,7 @@ pub struct IMAP {
pub name_shared: String,
pub name_all: String,
pub allow_plain_auth: bool,
pub enable_uidplus: bool,

pub timeout_auth: Duration,
pub timeout_unauth: Duration,
Expand Down Expand Up @@ -111,6 +113,7 @@ pub struct SessionData {
pub imap: Arc<IMAP>,
pub span: tracing::Span,
pub mailboxes: parking_lot::Mutex<Vec<Account>>,
pub mailbox_locks: MutexMap<()>,
pub writer: mpsc::Sender<writer::Event>,
pub state: AtomicU32,
pub in_flight: InFlight,
Expand Down Expand Up @@ -147,7 +150,7 @@ pub struct SelectedMailbox {
pub is_condstore: bool,
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MailboxId {
pub account_id: u32,
pub mailbox_id: Option<u32>,
Expand Down
1 change: 1 addition & 0 deletions crates/imap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl IMAP {
rate_requests: config.property_or_static("imap.rate-limit.requests", "2000/1m")?,
rate_concurrent: config.property("imap.rate-limit.concurrent")?.unwrap_or(4),
allow_plain_auth: config.property_or_static("imap.auth.allow-plain-text", "false")?,
enable_uidplus: config.property_or_static("imap.auth.protocol.uidplus", "true")?,
}))
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/imap/src/op/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl SessionData {
.await;
}

if !created_ids.is_empty() {
if !created_ids.is_empty() && self.imap.enable_uidplus {
let (uids, uid_validity) = match selected_mailbox {
Some(selected_mailbox) if selected_mailbox.id == mailbox => {
self.write_mailbox_changes(&selected_mailbox, is_qresync)
Expand Down
6 changes: 6 additions & 0 deletions crates/utils/src/listener/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,9 @@ impl ConcurrencyLimiter {
self.concurrent.load(Ordering::Relaxed) > 0
}
}

impl InFlight {
pub fn num_concurrent(&self) -> u64 {
self.concurrent.load(Ordering::Relaxed)
}
}
1 change: 1 addition & 0 deletions crates/utils/src/map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
*/

pub mod bitmap;
pub mod mutex_map;
pub mod ttl_dashmap;
pub mod vec_map;
88 changes: 88 additions & 0 deletions crates/utils/src/map/mutex_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact [email protected]
* for more details.
*/

use core::hash::Hash;
use std::hash::Hasher;

use ahash::AHasher;
use tokio::sync::{Mutex, MutexGuard};

pub struct MutexMap<T: Default> {
map: Box<[Mutex<T>]>,
mask: u64,
hasher: AHasher,
}

pub struct MutexMapLockError;
pub type Result<T> = std::result::Result<T, MutexMapLockError>;

#[allow(clippy::mutex_atomic)]
impl<T: Default> MutexMap<T> {
pub fn with_capacity(size: usize) -> MutexMap<T> {
let size = size.next_power_of_two();
MutexMap {
map: (0..size)
.map(|_| T::default().into())
.collect::<Vec<Mutex<T>>>()
.into_boxed_slice(),
mask: (size - 1) as u64,
hasher: AHasher::default(),
}
}

pub async fn lock<U>(&self, key: U) -> MutexGuard<'_, T>
where
U: Into<u64> + Copy,
{
let hash = key.into() & self.mask;
self.map[hash as usize].lock().await
}

/*pub async fn try_lock<U>(&self, key: U, timeout: Duration) -> Option<MutexGuard<'_, T>>
where
U: Into<u64> + Copy,
{
let hash = key.into() & self.mask;
self.map[hash as usize].try_lock(timeout).await
}*/

pub async fn lock_hash<U>(&self, key: U) -> MutexGuard<'_, T>
where
U: Hash,
{
let mut hasher = self.hasher.clone();
key.hash(&mut hasher);
let hash = hasher.finish() & self.mask;
self.map[hash as usize].lock().await
}

/*pub async fn try_lock_hash<U>(&self, key: U, timeout: Duration) -> Option<MutexGuard<'_, T>>
where
U: Hash,
{
let mut hasher = self.hasher.clone();
key.hash(&mut hasher);
let hash = hasher.finish() & self.mask;
self.map[hash as usize].try_lock_for(timeout).await
}*/
}
6 changes: 3 additions & 3 deletions resources/config/imap.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
#############################################

[server.listener."imap"]
bind = ["0.0.0.0:143"]
bind = ["[::]:143"]
protocol = "imap"

[server.listener."imaptls"]
bind = ["0.0.0.0:993"]
bind = ["[::]:993"]
protocol = "imap"
tls.implicit = true

[server.listener."sieve"]
bind = ["0.0.0.0:4190"]
bind = ["[::]:4190"]
protocol = "managesieve"
tls.implicit = true

Expand Down
2 changes: 1 addition & 1 deletion resources/config/jmap.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#############################################

[server.listener."jmap"]
bind = ["0.0.0.0:8080"]
bind = ["[::]:8080"]
url = "https://__HOST__:8080"
protocol = "jmap"

Expand Down
6 changes: 3 additions & 3 deletions resources/config/smtp.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
#############################################

[server.listener."smtp"]
bind = ["0.0.0.0:25"]
bind = ["[::]:25"]
greeting = "Stalwart SMTP at your service"
protocol = "smtp"

[server.listener."submission"]
bind = ["0.0.0.0:587"]
bind = ["[::]:587"]
protocol = "smtp"

[server.listener."submissions"]
bind = ["0.0.0.0:465"]
bind = ["[::]:465"]
protocol = "smtp"
tls.implicit = true

Expand Down
4 changes: 2 additions & 2 deletions tests/resources/test_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ certificate = "default"

[global.tracing]
method = "stdout"
level = "trace"
level = "info"

[session.ehlo]
reject-non-fqdn = false
Expand Down Expand Up @@ -196,5 +196,5 @@ refresh-token-renew = "2s"
allow-plain-text = true

[imap.rate-limit]
rate = "10000/1s"
requests = "90000/1s"
concurrent = 9000

0 comments on commit a372865

Please sign in to comment.