Skip to content

Commit

Permalink
Box FollowChange
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 13, 2024
1 parent 5495ae4 commit 221a8ea
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 110 deletions.
4 changes: 4 additions & 0 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl FollowChange {
}
}

pub fn is_notifiable(&self) -> bool {
matches!(self.change_type, ChangeType::Followed)
}

#[cfg(test)]
pub fn with_friendly_follower(mut self, name: FriendlyId) -> Self {
self.friendly_follower = name;
Expand Down
19 changes: 8 additions & 11 deletions src/domain/followee_notification_factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ChangeType, FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
use super::{FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
use nostr_sdk::PublicKey;
use ordermap::OrderMap;
use std::fmt::Debug;
Expand All @@ -9,7 +9,7 @@ type Follower = PublicKey;
type Followee = PublicKey;

pub struct FolloweeNotificationFactory {
pub follow_changes: OrderMap<Follower, FollowChange>,
pub follow_changes: OrderMap<Follower, Box<FollowChange>>,
pub followee: Option<Followee>,
min_time_between_messages: Duration,
emptied_at: Option<Instant>,
Expand All @@ -25,7 +25,7 @@ impl FolloweeNotificationFactory {
}
}

pub fn insert(&mut self, follow_change: FollowChange) {
pub fn insert(&mut self, follow_change: Box<FollowChange>) {
match &self.followee {
Some(followee) => {
assert_eq!(
Expand Down Expand Up @@ -71,17 +71,14 @@ impl FolloweeNotificationFactory {
self.follow_changes.is_empty() && self.should_flush()
}

pub fn no_followers(&self) -> bool {
!self
.follow_changes
.iter()
.any(|(_, v)| matches!(v.change_type, ChangeType::Followed))
pub fn no_notifiables(&self) -> bool {
!self.follow_changes.iter().any(|(_, v)| v.is_notifiable())
}

// Only followers are accumulated into messages, unfollowers are not, but
// all of them are drained
pub fn flush(&mut self) -> Vec<NotificationMessage> {
if self.no_followers() {
if self.no_notifiables() {
return vec![];
}

Expand All @@ -92,8 +89,8 @@ impl FolloweeNotificationFactory {
.follow_changes
.drain(..)
.map(|(_, v)| v)
.filter(|v| matches!(v.change_type, ChangeType::Followed))
.collect::<Vec<FollowChange>>()
.filter(|v| v.is_notifiable())
.collect::<Vec<Box<FollowChange>>>()
.chunks(MAX_FOLLOWERS_PER_BATCH)
.map(|batch| batch.to_vec().into())
.collect();
Expand Down
21 changes: 12 additions & 9 deletions src/domain/follows_differ.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where
{
repo: Arc<T>,
nostr_client: Arc<U>,
follow_change_sender: Sender<FollowChange>,
follow_change_sender: Sender<Box<FollowChange>>,
}

#[async_trait]
Expand Down Expand Up @@ -115,7 +115,7 @@ where
pub fn new(
repo: Arc<T>,
nostr_client: Arc<U>,
follow_change_sender: Sender<FollowChange>,
follow_change_sender: Sender<Box<FollowChange>>,
) -> Self {
Self {
repo,
Expand Down Expand Up @@ -225,7 +225,7 @@ where
}

fn send_follow_change(&self, follow_change: FollowChange) -> Result<()> {
self.follow_change_sender.send(follow_change)?;
self.follow_change_sender.send(Box::new(follow_change))?;
Ok(())
}
}
Expand Down Expand Up @@ -395,6 +395,7 @@ mod tests {
use super::*;
use crate::domain::contact_list_follow::ContactListFollow;
use crate::repo::RepoError;
use assertables::*;
use chrono::{Duration, Utc};
use nostr_sdk::PublicKey;
use std::borrow::Cow;
Expand Down Expand Up @@ -955,13 +956,15 @@ mod tests {
assert!(!has_nos_agent(&event_with_no_tag));
}

async fn assert_follow_changes(contact_events: Vec<Event>, mut expected: Vec<FollowChange>) {
async fn assert_follow_changes(contact_events: Vec<Event>, expected: Vec<FollowChange>) {
let follow_changes = get_follow_changes_from_contact_events(contact_events)
.await
.unwrap();
.unwrap()
.into_iter()
.map(|fc| *fc)
.collect::<Vec<FollowChange>>();

expected.sort(); // Sort the expected follow changes
assert_eq!(follow_changes, expected);
assert_bag_eq!(follow_changes, expected);
}

fn create_contact_event(
Expand Down Expand Up @@ -1000,7 +1003,7 @@ mod tests {

async fn get_follow_changes_from_contact_events(
contact_events: Vec<Event>,
) -> Result<Vec<FollowChange>> {
) -> Result<Vec<Box<FollowChange>>> {
let (follow_change_sender, _) = channel(100);
let repo = Arc::new(MockRepo::default());
let follows_differ = FollowsDiffer::new(
Expand All @@ -1010,7 +1013,7 @@ mod tests {
);

let mut follow_change_receiver = follow_change_sender.subscribe();
let follow_changes: Arc<Mutex<Vec<FollowChange>>> = Arc::new(Mutex::new(Vec::new()));
let follow_changes: Arc<Mutex<Vec<Box<FollowChange>>>> = Arc::new(Mutex::new(Vec::new()));
let shared_follow_changes = follow_changes.clone();
let follow_change_task = tokio::spawn(async move {
loop {
Expand Down
43 changes: 17 additions & 26 deletions src/domain/notification_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl NotificationFactory {
}
}

pub fn insert(&mut self, follow_change: FollowChange) {
pub fn insert(&mut self, follow_change: Box<FollowChange>) {
let followee_info = self
.followee_maps
.entry(follow_change.followee)
Expand Down Expand Up @@ -159,10 +159,10 @@ mod tests {
let followee = Keys::generate().public_key();

let change1 = create_follow_change(follower, followee, seconds_to_datetime(1));
notification_factory.insert(change1);
notification_factory.insert(change1.into());

let change2 = create_follow_change(follower, followee, seconds_to_datetime(1));
notification_factory.insert(change2.clone());
notification_factory.insert(change2.into());

// When they share the same time, the last change added should be kept
let messages = notification_factory.flush();
Expand All @@ -179,10 +179,10 @@ mod tests {
let followee = Keys::generate().public_key();

let newer_change = create_follow_change(follower, followee, seconds_to_datetime(2));
notification_factory.insert(newer_change.clone());
notification_factory.insert(newer_change.into());

let older_change = create_unfollow_change(follower, followee, seconds_to_datetime(1));
notification_factory.insert(older_change);
notification_factory.insert(older_change.into());

let messages = notification_factory.flush();
assert_eq!(messages.len(), 1);
Expand All @@ -200,16 +200,16 @@ mod tests {
let change1 = create_follow_change(follower, followee1, seconds_to_datetime(2));
let change2 = create_follow_change(follower, followee2, seconds_to_datetime(1));

notification_factory.insert(change1.clone());
notification_factory.insert(change2.clone());
notification_factory.insert(change1.clone().into());
notification_factory.insert(change2.clone().into());

let mut messages = notification_factory.flush();
// Both changes should be kept since they have different followees
assert_eq!(
messages.sort(),
[
NotificationMessage::from(change1.clone()),
NotificationMessage::from(change2.clone())
NotificationMessage::from(Box::new(change1)),
NotificationMessage::from(Box::new(change2))
]
.sort()
);
Expand All @@ -225,8 +225,8 @@ mod tests {
let follow_change = create_follow_change(follower, followee, seconds_to_datetime(1));
let unfollow_change = create_unfollow_change(follower, followee, seconds_to_datetime(2));

notification_factory.insert(follow_change.clone());
notification_factory.insert(unfollow_change.clone());
notification_factory.insert(follow_change.into());
notification_factory.insert(unfollow_change.into());

// The unfollow should cancel the follow
assert_eq!(notification_factory.flush(), []);
Expand All @@ -241,8 +241,8 @@ mod tests {
let unfollow_change = create_unfollow_change(follower, followee, seconds_to_datetime(1));
let follow_change = create_follow_change(follower, followee, seconds_to_datetime(2));

notification_factory.insert(unfollow_change.clone());
notification_factory.insert(follow_change.clone());
notification_factory.insert(unfollow_change.into());
notification_factory.insert(follow_change.into());

// The follow should cancel the unfollow
assert_eq!(notification_factory.flush(), []);
Expand Down Expand Up @@ -458,7 +458,7 @@ mod tests {
follower: PublicKey,
) -> FollowChange {
let change = create_follow_change(follower, followee, seconds_to_datetime(1));
notification_factory.insert(change.clone());
notification_factory.insert(change.clone().into());
change
}

Expand All @@ -470,23 +470,13 @@ mod tests {
insert_follower(notification_factory, followee, follower)
}

fn insert_unfollower(
notification_factory: &mut NotificationFactory,
followee: PublicKey,
follower: PublicKey,
) -> FollowChange {
let change = create_unfollow_change(follower, followee, seconds_to_datetime(1));
notification_factory.insert(change.clone());
change
}

fn insert_new_unfollower<'a>(
notification_factory: &mut NotificationFactory,
followee: PublicKey,
) -> FollowChange {
let follower = Keys::generate().public_key();
let change = create_unfollow_change(follower, followee, seconds_to_datetime(1));
notification_factory.insert(change.clone());
notification_factory.insert(change.clone().into());
change
}

Expand All @@ -513,7 +503,8 @@ mod tests {
let mut expected_batches = Vec::new();

for (_, changes) in expected {
let batch: NotificationMessage = (*changes).to_vec().into();
let batch: NotificationMessage =
(*changes).to_vec().into_iter().map(|fc| fc.into()).into();
expected_batches.push(batch);
}

Expand Down
36 changes: 19 additions & 17 deletions src/domain/notification_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,22 @@ impl NotificationMessage {
&self.followee
}

pub fn add(&mut self, follow_change: FollowChange) {
pub fn add(&mut self, follow_change: Box<FollowChange>) {
assert!(self.followee == follow_change.followee, "Followee mismatch");

assert!(
self.len() < MAX_FOLLOWERS_PER_BATCH,
"Too many followers in a single message, can't exceed {}",
MAX_FOLLOWERS_PER_BATCH
);

if follow_change.change_type == ChangeType::Followed {
self.follows.insert(follow_change.follower);
} else {
//TODO typed instead of conditional
panic!("Only followed changes are allowed");
}
assert_eq!(
follow_change.change_type,
ChangeType::Followed,
"Only followed changes can be messaged"
);

self.follows.insert(follow_change.follower);

if self.len() == 1 {
self.friendly_follower = Some(follow_change.friendly_follower);
Expand All @@ -61,7 +63,7 @@ impl NotificationMessage {
}
}

pub fn add_all(&mut self, follow_changes: impl IntoIterator<Item = FollowChange>) {
pub fn add_all(&mut self, follow_changes: impl IntoIterator<Item = Box<FollowChange>>) {
for follow_change in follow_changes {
self.add(follow_change);
}
Expand Down Expand Up @@ -131,8 +133,8 @@ impl Debug for NotificationMessage {
}
}

impl From<FollowChange> for NotificationMessage {
fn from(change: FollowChange) -> Self {
impl From<Box<FollowChange>> for NotificationMessage {
fn from(change: Box<FollowChange>) -> Self {
let mut message = NotificationMessage::new(change.followee);
message.add(change);
message
Expand All @@ -141,7 +143,7 @@ impl From<FollowChange> for NotificationMessage {

impl<T> From<T> for NotificationMessage
where
T: IntoIterator<Item = FollowChange>,
T: IntoIterator<Item = Box<FollowChange>>,
{
fn from(changes: T) -> Self {
let mut changes = changes.into_iter();
Expand Down Expand Up @@ -171,7 +173,7 @@ mod tests {

let mut message = NotificationMessage::new(followee1);

message.add(follower1_follow);
message.add(Box::new(follower1_follow));

assert_eq!(
serde_json::to_string(&message).unwrap(),
Expand Down Expand Up @@ -201,16 +203,16 @@ mod tests {

let mut message = NotificationMessage::new(followee1);

message.add(follower1_follow);
message.add(follower2_follow);
message.add(follower2_follow2);
message.add(follower3_follow);
message.add(follower1_follow.into());
message.add(follower2_follow.into());
message.add(follower2_follow2.into());
message.add(follower3_follow.into());

// TODO: This panics on github CI, but not locally. Investigate.
#[cfg(not(feature = "ci"))]
{
let result = std::panic::catch_unwind(|| {
NotificationMessage::new(followee1).add(wrong_followee_change)
NotificationMessage::new(followee1).add(wrong_followee_change.into())
});
assert!(result.is_err());
}
Expand Down
4 changes: 2 additions & 2 deletions src/follow_change_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ where
}

#[async_trait]
impl<T: GetEventsOf> WorkerTask<FollowChange> for FollowChangeHandler<T> {
async fn call(&self, mut follow_change: FollowChange) -> Result<(), Box<dyn Error>> {
impl<T: GetEventsOf> WorkerTask<Box<FollowChange>> for FollowChangeHandler<T> {
async fn call(&self, mut follow_change: Box<FollowChange>) -> Result<(), Box<dyn Error>> {
// Fetch friendly IDs for the pubkeys or fallback to the DB if it takes
// more than timeout_secs. Whatever is found through the network is
// cached.
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn start(settings: Settings) -> Result<()> {
info!("Initializing workers for follower list diff calculation");
let shared_nostr_client = Arc::new(create_client());
let (follow_change_sender, _) =
broadcast::channel::<FollowChange>(settings.follow_change_channel_size.get());
broadcast::channel::<Box<FollowChange>>(settings.follow_change_channel_size.get());
let follows_differ_worker = FollowsDiffer::new(
repo.clone(),
shared_nostr_client.clone(),
Expand Down
Loading

0 comments on commit 221a8ea

Please sign in to comment.