diff --git a/docs/openapi.yaml b/docs/openapi.yaml index 9773264..22333ce 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -27,10 +27,14 @@ components: type: object required: - error + - value properties: error: type: 'null' - example: {error: null} + example: null + value: + type: 'null' + example: null username: type: string minLength: 1 @@ -369,6 +373,7 @@ paths: - text - created_at - votes + - replies properties: id: $ref: '#/components/schemas/masked-id' @@ -387,6 +392,8 @@ paths: type: string created_at: type: string + replies: + type: integer votes: $ref: '#/components/schemas/votes' @@ -431,6 +438,7 @@ paths: - reply_context - text - created_at + - replies - votes properties: id: @@ -450,6 +458,8 @@ paths: type: string created_at: type: string + replies: + type: integer votes: $ref: '#/components/schemas/votes' @@ -541,3 +551,236 @@ paths: - type: 'null' - type: number minimum: 0 + /comments/{comment_id}/: + delete: + summary: Delete a comment + parameters: + - name: comment_id + in: path + required: true + schema: + id: + type: string + responses: + '200': + content: + application/json: + schema: + type: object + required: + - error + - value + properties: + error: + type: string + example: null + value: + type: string + example: null + '400': + content: + application/json: + schema: + type: object + required: + - BadRequest + properties: + BadRequest: + type: string + '401': + $ref: '#/components/responses/unauthenticated' + '500': + $ref: '#/components/responses/unexpected' + + /comments/{comment_id}/vote: + put: + summary: Vote on comment + parameters: + - name: post_id + in: path + required: true + schema: + $ref: '#/components/schemas/masked-id' + requestBody: + required: true + content: + application/json: + schema: + type: integer + enum: + - -1 + - 0 + - 1 + responses: + '200': + content: + application/json: + schema: + type: 'object' + required: + - value + properties: + value: + $ref: '#/components/schemas/votes' + '400': + content: + application/json: + schema: + type: 'object' + properties: + BadRequest: + type: string + '401': + $ref: '#/components/responses/unauthenticated' + '500': + $ref: '#/components/responses/unexpected' + /comments/: + post: + summary: Create comment + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - text + - parent_post + - parent_comments + properties: + text: + type: string + parent_post: + $ref: '#/components/schemas/masked-id' + parent_comments: + type: array + items: + $ref: '#/components/schemas/masked-id' + responses: + '200': + content: + application/json: + schema: + type: object + required: + - error + - value + properties: + error: + type: string + example: null + value: + type: object + required: + - id + properties: + id: + $ref: '#/components/schemas/masked-id' + '400': + content: + application/json: + schema: + type: 'object' + properties: + BadRequest: + type: string + '401': + $ref: '#/components/responses/unauthenticated' + '500': + $ref: '#/components/responses/unexpected' + get: + summary: Fetch comments from a post, or from a parent comment + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - kind + - sort + - parent + - seen + properties: + kind: + type: string + enum: + - thread + - root + sort: + type: string + enum: + - new + - top + - worst + - controversial + - best + - replies + parent: + $ref: '#/components/schemas/masked-id' + seen: + type: array + items: + $ref: '#/components/schemas/masked-id' + responses: + '200': + content: + application/json: + schema: + type: object + required: + - error + - value + properties: + error: + type: string + example: null + value: + type: array + items: + type: object + required: + - id + - parent_comments + - parent_post + - text + - replies + - children + - votes + properties: + id: + $ref: '#/components/schemas/masked-id' + parent_comments: + type: array + items: + $ref: '#/components/schemas/masked-id' + parent_post: + $ref: '#/components/schemas/masked-id' + text: + type: string + replies: + type: integer + children: + type: array + items: + $ref: '#/components/schemas/masked-id' + votes: + type: object + required: + - up + - down + properties: + up: + type: integer + down: + type: integer + '400': + content: + application/json: + schema: + type: 'object' + properties: + BadRequest: + type: string + '500': + $ref: '#/components/responses/unexpected' diff --git a/src/auth.rs b/src/auth.rs index 91212b6..93eae28 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -16,7 +16,6 @@ use actix_web::{ ResponseError, }; use log::{ - debug, error, warn, }; diff --git a/src/conf.rs b/src/conf.rs index 592787c..d8b4399 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -15,6 +15,18 @@ pub const POST_MAX_SIZE: usize = 1000; /// The maximum length of a comment in UTF-8 bytes. pub const COMMENT_MAX_SIZE: usize = 500; +/// The max depth of a comment thread. +pub const COMMENT_MAX_DEPTH: usize = 5; + +/// The number of comments to return for each request to a comments list. +pub const COMMENTS_PAGE_SIZE: u16 = 5; + +/// The max number of comments to return for each request to build a comment thread +pub const MAX_REPLYING_COMMENTS_PER_LOAD: u16 = 3; + +/// The minimum number of comments to return for each request to build a comment thread (if available). +pub const MIN_REPLYING_COMMENTS_PER_LOAD_IF_AVAILABLE: u16 = 2; + /// The maximum length of a username. pub const USERNAME_MAX_LENGTH: usize = 32; diff --git a/src/main.rs b/src/main.rs index 4b5267e..47d3640 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ mod masked_oid; mod middleware; mod services; mod types; +mod utils; use std::env; use std::error::Error; @@ -40,6 +41,7 @@ use mongodb::{ Database, IndexModel, }; +use types::Comment; use crate::masked_oid::MaskingKey; use crate::middleware::HostCheckWrap; @@ -58,7 +60,9 @@ async fn initialize_database(db: &Database) -> mongodb::error::Result<()> { let users = db.collection::("users"); let sessions = db.collection::("sessions"); let posts = db.collection::("posts"); - let votes = db.collection::("votes"); + let post_votes = db.collection::("post_votes"); + let comment_votes = db.collection::("comment_votes"); + let comments = db.collection::("comments"); try_join!( users.create_index( @@ -90,6 +94,51 @@ async fn initialize_database(db: &Database) -> mongodb::error::Result<()> { .build(), None, ), + // Is this too many indices? Probably... + comments.create_index( + IndexModel::builder().keys(doc! {"parent_post": -1}).build(), + None, + ), + comments.create_index( + IndexModel::builder() + .keys(doc! {"parent_comments": -1}) + .build(), + None, + ), + comments.create_index( + IndexModel::builder().keys(doc! {"replies": -1}).build(), + None, + ), + comments.create_index( + IndexModel::builder() + .keys(doc! {"sequential_id": -1}) + .build(), + None, + ), + comments.create_index( + IndexModel::builder() + .keys(doc! {"votes_up": -1, "votes_down": -1}) + .build(), + None, + ), + comments.create_index( + IndexModel::builder() + .keys(doc! {"absolute_score": -1}) + .build(), + None, + ), + comments.create_index( + IndexModel::builder() + .keys(doc! {"absolute_score": 1}) + .build(), + None, + ), + comments.create_index( + IndexModel::builder() + .keys(doc! {"trending_score": -1}) + .build(), + None, + ), posts.create_index( IndexModel::builder() .keys(doc! {"sequential_id": -1}) @@ -156,9 +205,16 @@ async fn initialize_database(db: &Database) -> mongodb::error::Result<()> { Ok(()) }, - votes.create_index( + post_votes.create_index( + IndexModel::builder() + .keys(doc! {"content": 1, "user": 1}) + .options(IndexOptions::builder().unique(true).build()) + .build(), + None, + ), + comment_votes.create_index( IndexModel::builder() - .keys(doc! {"post": 1, "user": 1}) + .keys(doc! {"content": 1, "user": 1}) .options(IndexOptions::builder().unique(true).build()) .build(), None, @@ -244,6 +300,10 @@ async fn main() -> Result<(), Box> { .service(services::profile::get_watched) .service(services::profile::add_watched) .service(services::profile::delete_watched) + .service(services::comments::get_comment) + .service(services::comments::create_comment) + .service(services::comments::delete_comment) + .service(services::comments::vote_on_comment) }) .bind(("0.0.0.0", 3000))? .run() diff --git a/src/masked_oid.rs b/src/masked_oid.rs index 51930a7..7420aa7 100644 --- a/src/masked_oid.rs +++ b/src/masked_oid.rs @@ -18,7 +18,7 @@ use serde::{ const TYPE_OBJECT_ID: u8 = 0; const TYPE_SEQUENTIAL_ID: u8 = 1; -#[derive(Clone, Deserialize, Serialize)] +#[derive(Clone, Deserialize, Serialize, PartialEq)] pub struct MaskedObjectId(#[serde(with = "crate::base64_serde")] [u8; 16]); #[derive(Clone, Deserialize, Serialize)] @@ -35,6 +35,12 @@ impl fmt::Display for PaddingError { } } +impl fmt::Display for MaskedObjectId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", base64::encode(&self.0)) + } +} + impl Error for PaddingError {} impl MaskingKey { diff --git a/src/services/comments.rs b/src/services/comments.rs new file mode 100644 index 0000000..1bf0b16 --- /dev/null +++ b/src/services/comments.rs @@ -0,0 +1,823 @@ +use std::collections::{ + HashMap, + VecDeque, +}; + +use actix_web::{ + delete, + get, + post, + put, + web, +}; +use futures::TryStreamExt; +use log::{ + debug, + error, +}; +use mongodb::{ + bson::{ + self, + doc, + Bson, + DateTime, + Document, + }, + options::{ + FindOneOptions, + FindOptions, + TransactionOptions, + }, + Client as MongoClient, + Database, +}; +use rand::Rng; +use serde::{ + Deserialize, + Serialize, +}; + +use crate::{ + api_types::{ + success, + ApiResult, + Failure, + }, + auth::AuthenticatedUser, + conf, + masked_oid::{ + self, + MaskedObjectId, + MaskingKey, + }, + services::posts::{ + Created, + Votes, + }, + to_unexpected, + types::{ + Comment, + Post, + Vote, + }, + utils::content_scoring::get_trending_score_time, +}; + +/// Detailed comment that is returned to the frontend with just the needed details. +#[derive(Serialize, Clone)] +pub struct CommentDetail { + pub id: MaskedObjectId, + pub parent_comments: Vec, + pub parent_post: MaskedObjectId, + pub text: String, + pub replies: i32, + pub children: Vec, + pub votes: Votes, +} + +/// The request body for creating a comment. +#[derive(Deserialize)] +pub struct CreateRequest { + pub text: String, + pub parent_post: MaskedObjectId, + pub parent_comments: Vec, +} + +/// Vote on a comment. +#[put("/comments/{comment_id}/vote")] +pub async fn vote_on_comment( + mongo_client: web::Data, + db: web::Data, + masking_key: web::Data<&'static MaskingKey>, + user: AuthenticatedUser, + comment_id: web::Path, + request: web::Json, // TODO: enum; see https://github.com/serde-rs/serde/issues/745 +) -> ApiResult { + if !(-1..=1).contains(&*request) { + return Err(Failure::BadRequest("invalid vote")); + } + + let comment_id = masking_key + .unmask(&comment_id) + .map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))?; + + let mut session = mongo_client + .start_session(None) + .await + .map_err(to_unexpected!("Starting session failed"))?; + + let mut attempt = 0; + 'atomic_vote: loop { + attempt += 1; + if attempt > 8 { + error!("Too many voting attempts"); + return Err(Failure::Unexpected); + } + + if attempt > 1 { + session + .abort_transaction() + .await + .map_err(to_unexpected!("Aborting vote transaction failed"))?; + } + + let existing_vote = db + .collection::("comment_votes") + .find_one_with_session( + doc! { + "content": {"$eq": comment_id}, + "user": {"$eq": user.id}, + }, + None, + &mut session, + ) + .await + .map_err(to_unexpected!("Finding existing vote failed"))? + .map(|v| v.value); + + session + .start_transaction(None) + .await + .map_err(to_unexpected!("Starting transaction failed"))?; + + match existing_vote { + None => { + match db + .collection::("comment_votes") + .insert_one_with_session( + Vote { + content: comment_id, + user: user.id, + value: *request, + }, + None, + &mut session, + ) + .await + { + Ok(_) => {} + Err(err) => { + debug!("Inserting vote failed: {}", err); + continue 'atomic_vote; + } + } + } + Some(existing_vote) => { + match db + .collection::("comment_votes") + .update_one_with_session( + doc! { + "content": {"$eq": comment_id}, + "user": {"$eq": user.id}, + "value": {"$eq": existing_vote}, + }, + doc! { + "$set": { + "value": *request, + }, + }, + None, + &mut session, + ) + .await + { + Ok(update_result) if update_result.matched_count == 1 => {} + Ok(_) => { + debug!("Updating vote failed: no match"); + continue 'atomic_vote; + } + Err(err) => { + debug!("Updating vote failed: {}", err); + continue 'atomic_vote; + } + } + } + } + + let votes_up_difference = -i32::from(existing_vote == Some(1)) + i32::from(*request == 1); + let votes_down_difference = + -i32::from(existing_vote == Some(-1)) + i32::from(*request == -1); + let difference = -existing_vote.unwrap_or(0) + *request; + + let trending_score_time = get_trending_score_time(&comment_id.timestamp()); + + // TODO: Are update pipelines atomic? I haven’t found a straight answer yet. + let comment_update = db + .collection::("comments") + .update_one_with_session( + doc! { + "_id": {"$eq": comment_id}, + }, + vec![ + doc! { + "$addFields": { + "votes_up": { + "$add": ["$votes_up", {"$literal": votes_up_difference}], + }, + "votes_down": { + "$add": ["$votes_down", {"$literal": votes_down_difference}], + }, + "absolute_score": { + "$add": ["$absolute_score", {"$literal": difference}], + }, + }, + }, + doc! { + "$addFields": { + "trending_score": {"$add": [ + {"$multiply": [ + {"$cond": [ + {"$lt": ["$absolute_score", 0]}, + -1, + 1, + ]}, + {"$ln": + {"$add": [1, {"$abs": "$absolute_score"}]}}, + ]}, + {"$literal": trending_score_time}, + ]}, + }, + }, + ], + None, + &mut session, + ) + .await + .map_err(to_unexpected!("Updating comment score failed"))?; + + if comment_update.matched_count != 1 { + error!("Updating comment score failed: no such comment"); + return Err(Failure::Unexpected); + } + + if let Err(err) = session.commit_transaction().await { + debug!("Committing voting transaction failed: {}", err); + continue 'atomic_vote; + } + + break; + } + + let votes = db + .collection::("comments") + .find_one( + doc! { + "_id": {"$eq": comment_id}, + }, + FindOneOptions::builder() + .projection(doc! { + "_id": false, + "up": "$votes_up", + "down": "$votes_down", + }) + .build(), + ) + .await + .map_err(to_unexpected!("Retrieving updated comment votes failed"))? + .ok_or_else(|| { + error!("Retrieving updated comment votes failed: no more comments"); + Failure::Unexpected + })?; + + success(votes) +} + +/// Create a comment that's a reply to a post, and optionally to another threaded comments. +#[post("/comments/")] +pub async fn create_comment( + mongo_client: web::Data, + db: web::Data, + masking_key: web::Data<&'static MaskingKey>, + user: AuthenticatedUser, + request: web::Json, +) -> ApiResult { + // If the text is too long, send a 400 Bad Request. + if request.text.len() > conf::COMMENT_MAX_SIZE { + return Err(Failure::BadRequest("oversized comment text")); + } + + // If the comment is trying to be nested too deep, send a 400 Bad Request. + if request.parent_comments.len() > conf::COMMENT_MAX_DEPTH { + return Err(Failure::BadRequest("too many parent comments")); + } + + let mut insert_doc = doc! { + "owner": &user.id, + "text": &request.text, + "parent_post": masking_key.unmask(&request.parent_post).map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))?, + "parent_comments": request.parent_comments.iter().map(|masked_oid| masking_key.unmask(masked_oid).map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))).collect::, _>>()?, + "replies": 0, + "deleted": false, + "votes_up": 0, + "votes_down": 0, + "absolute_score": 0, + "trending_score": get_trending_score_time(&DateTime::now()), // approximate, but will match `_id` exactly with the next vote + }; + + let mut session = mongo_client + .start_session(None) + .await + .map_err(to_unexpected!("Starting session failed"))?; + + let transaction_options = TransactionOptions::builder() + .write_concern( + mongodb::options::WriteConcern::builder() + .w(Some(mongodb::options::Acknowledgment::Majority)) + .build(), + ) + .build(); + + let mut attempt = 0; + 'lp: loop { + attempt += 1; + if attempt > 8 { + error!("Too many comment creation attempts"); + return Err(Failure::Unexpected); + } + + if attempt > 1 { + session + .abort_transaction() + .await + .map_err(to_unexpected!("Aborting vote transaction failed"))?; + } + + let last_sequential_id = db + .collection::("comments") + .aggregate( + [ + doc! {"$sort": {"sequential_id": -1}}, + doc! {"$limit": 1}, + doc! {"$project": {"_id": false, "sequential_id": true}}, + ], + None, + ) + .await + .map_err(to_unexpected!( + "Getting next comment sequential id cursor failed" + ))? + .try_next() + .await + .map_err(to_unexpected!("Getting next comment sequential id failed"))? + .map(|doc| doc.get_i32("sequential_id").unwrap()); + + let new_sequential_id = last_sequential_id.unwrap_or(0) + 1; + insert_doc.insert("sequential_id", new_sequential_id); + + session + .start_transaction(transaction_options.clone()) + .await + .map_err(to_unexpected!("Starting transaction failed"))?; + + // Execute increment of parent comment reply counter (if array of parent comments is not empty). + if request.parent_comments.len() > 0 { + let direct_parent_id = masking_key + .unmask(&request.parent_comments.last().unwrap()) + .map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))?; + match db + .collection::("comments") + .update_one_with_session( + doc! {"_id": direct_parent_id}, + doc! {"$inc": {"replies": 1}}, + None, + &mut session, + ) + .await + { + Ok(update_result) => { + if update_result.modified_count != 1 { + error!("updating parent comment failed"); + return Err(Failure::BadRequest("parent comment doesn't exist")); + } + } + Err(_) => { + error!("updating parent comment failed"); + continue 'lp; + } + } + } + + // Execute increment of parent post reply counter. + let parent_post_id = masking_key + .unmask(&request.parent_post) + .map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))?; + match db + .collection::("posts") + .update_one_with_session( + doc! {"_id": parent_post_id}, + doc! {"$inc": {"replies": 1}}, + None, + &mut session, + ) + .await + { + Ok(update_result) => { + if update_result.modified_count != 1 { + error!("updating comment parent post failed"); + return Err(Failure::BadRequest("comment parent post doesn't exist")); + } + } + Err(_) => { + error!("updating comment parent post failed"); + continue 'lp; + } + } + + // Inserting the comment. + match db + .collection::("comments") + .insert_one_with_session(&insert_doc, None, &mut session) + .await + { + Ok(insertion) => { + if let Err(err) = session.commit_transaction().await { + debug!("Committing comment transaction failed: {}", err); + continue 'lp; + } else { + return success(Created { + id: masking_key.mask(&insertion.inserted_id.as_object_id().unwrap()), + }); + } + } + Err(err) => { + error!("Creating comment failed: {}", err); + continue 'lp; + } + }; + } +} + +/// Delete a specified comment. +#[delete("/comments/{comment_id}/")] +pub async fn delete_comment( + db: web::Data, + masking_key: web::Data<&'static MaskingKey>, + user: AuthenticatedUser, + comment_id: web::Path, +) -> ApiResult<(), ()> { + match db.collection::("comments").update_one( + doc! {"_id": masking_key.unmask(&comment_id).map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))?, "owner": &user.id}, + doc! {"$set": {"deleted": true}}, + None, + ).await { + Ok(_) => success(()), // idempotent deletion + Err(_) => Err(Failure::Unexpected), + } +} + +/// The types of comment fetching options. +/// +/// - `Root': Fetches the root comments of a post. +/// - `Thread': Fetches comments based off a specified comment. +#[derive(Deserialize)] +#[serde(rename_all = "snake_case")] +enum CommentFetchType { + Root, + Thread, +} + +/// The types of comment sorting options. +/// +/// - `New': Sorts by the time the comment was created (using `sequential_id`). +/// - `Top': Sorts by the highest absolute score of the comment. +/// - `Worst': Sorts by the lowest absolute score of the comment. +/// - `Controversial': Sorts by the closest absolute score to 0, if there's a tie, rank the one with most total votes highest. +/// - `Best': Sorts by the highest trending score of the comment. +/// - `Replies': Sorts by the highest number of replies the comment has. +#[derive(Deserialize)] +#[serde(rename_all = "snake_case")] +enum CommentSort { + New, + Top, + Worst, + Controversial, + Best, + Replies, +} + +impl CommentSort { + /// Converts the type of sort to the sorting `doc`s. + fn sort(&self) -> Vec { + match self { + CommentSort::New => vec![doc! {"sequential_id": -1}], + CommentSort::Top => vec![doc! {"absolute_score": -1}], + CommentSort::Worst => vec![doc! {"absolute_score": 1}], + CommentSort::Controversial => vec![ + doc! { + "$addFields": { + "score_diff": { + "$abs": { + "$subtract": ["$absolute_score", 0] + } + }, + "total_votes": { + "$add": ["$votes_up", "$votes_down"] + } + } + }, + doc! { + "$sort": { "score_diff": 1, "total_votes": -1 } + }, + ], + CommentSort::Best => vec![doc! {"trending_score": -1}], + CommentSort::Replies => vec![doc! {"replies": -1}], + } + } +} + +/// The request body for getting comments. +#[derive(Deserialize)] +pub struct ListQuery { + kind: CommentFetchType, // are you fetching starting at a root or another comment? + parent: MaskedObjectId, // what is the id of the parent comment/post? + seen: Vec, // what comments have you already seen? + sort: CommentSort, // what sort are you using? +} + +/// The endpoint for fetching commments. +#[get("/comments/")] +pub async fn get_comment( + db: web::Data, + masking_key: web::Data<&'static MaskingKey>, + query: web::Json, +) -> ApiResult>, ()> { + let id = masking_key + .unmask(&query.parent) + .map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))?; + + let unmasked_seen: Result, _> = query + .seen + .iter() + .map(|masked_oid| { + masking_key + .unmask(masked_oid) + .map_err(|_| Failure::BadRequest("bad masked id")) + .map(|oid| Bson::ObjectId(oid)) + }) + .collect(); + let mut excluded_ids = unmasked_seen?; + + let mut find_filter = match &query.kind { + CommentFetchType::Root { .. } => { + vec![ + doc! { + "parent_post": id, + }, + doc! { + "parent_comments": { + "$size": 0 + } + }, + ] + } + CommentFetchType::Thread { .. } => { + vec![doc! { + "$expr": { + "$eq": [ + { "$arrayElemAt": [ "$parent_comments", -1 ] }, + id + ] + } + }] + } + }; + + find_filter.push(doc! { + "_id": { + "$not": { + "$in": &excluded_ids + } + } + }); + + let find_filter = doc! { + "$and": find_filter + }; + + let mut found_comments; + if let CommentSort::Controversial = query.sort { + let mut pipeline = vec![ + doc! { + "$match": find_filter + }, + doc! { + "$limit": i64::from(conf::COMMENTS_PAGE_SIZE) + }, + ]; + pipeline.splice(1..1, query.sort.sort()); + found_comments = db + .collection::("comments") + .aggregate(pipeline, None) + .await + .map_err(to_unexpected!("Getting comments failed"))? + .map_ok(|doc| { + bson::from_bson(bson::Bson::Document(doc)) + .map_err(to_unexpected!("Deserializing comment failed")) + }) + .try_collect::>>>() + .await + .map_err(to_unexpected!("Getting comments failed"))? + .into_iter() + .map(|result| { + result.and_then(|comment| { + Ok(CommentDetail { + id: masking_key.mask(&comment.id), + parent_comments: comment + .parent_comments + .iter() + .map(|id| masking_key.mask(id)) + .collect(), + parent_post: masking_key.mask(&comment.parent_post), + text: if comment.deleted { + "[deleted]".to_string() + } else { + comment.text + }, + replies: comment.replies, + children: vec![], + votes: Votes { + up: u32::try_from(comment.votes_up).unwrap(), + down: u32::try_from(comment.votes_down).unwrap(), + }, + }) + }) + }) + .collect::, Failure<()>>>()?; + } else { + found_comments = db + .collection::("comments") + .find( + find_filter, + FindOptions::builder() + .sort(query.sort.sort()[0].clone()) + .limit(i64::from(conf::COMMENTS_PAGE_SIZE)) + .build(), + ) + .await + .map_err(to_unexpected!("Getting comments cursor failed"))? + .map_ok(|comment| { + Ok(CommentDetail { + id: masking_key.mask(&comment.id), + parent_comments: comment + .parent_comments + .iter() + .map(|id| masking_key.mask(id)) + .collect(), + parent_post: masking_key.mask(&comment.parent_post), + text: if comment.deleted { + "[deleted]".to_string() + } else { + comment.text + }, + replies: comment.replies, + children: vec![], + votes: Votes { + up: u32::try_from(comment.votes_up).unwrap(), + down: u32::try_from(comment.votes_down).unwrap(), + }, + }) + }) + .try_collect::>>>() + .await + .map_err(to_unexpected!("Getting comments failed"))? + .into_iter() + .collect::, Failure<()>>>()?; + } + + let init_depth: i32; + if found_comments.len() == 0 { + return success(Box::new(vec![])); + } else { + init_depth = found_comments[0].parent_comments.len() as i32; + } + + let mut deque: VecDeque = VecDeque::from(found_comments.clone()); + let mut count = 0; + while let Some(parent_comment) = deque.pop_front() { + if count > conf::MAX_REPLYING_COMMENTS_PER_LOAD { + break; + }; + if parent_comment.replies == 0 { + continue; + }; + let replies = db.collection::("comments") + .find( + doc! { + "$and": vec![ + doc! { + "_id": { + "$not": { + "$in": &excluded_ids + } + } + }, + doc! { + "$expr": { + "$eq": [ + { "$arrayElemAt": [ "$parent_comments", -1 ] }, + masking_key.unmask(&parent_comment.id).map_err(|masked_oid::PaddingError| Failure::BadRequest("bad masked id"))? + ] + } + }, + ] + }, + FindOptions::builder() + .sort(doc! {"replies": -1}) // sort threaded comments by replies to help build the best tree structures + .limit(i64::from(conf::MAX_REPLYING_COMMENTS_PER_LOAD)) + .build() + ) + .await + .map_err(to_unexpected!("Getting comments cursor failed"))? + .map_ok(|comment| Ok(CommentDetail { + id: masking_key.mask(&comment.id), + parent_comments: comment.parent_comments.iter().map(|id| masking_key.mask(id)).collect(), + parent_post: masking_key.mask(&comment.parent_post), + text: if comment.deleted {"[deleted]".to_string()} else {comment.text}, + replies: comment.replies, + children: vec![], + votes: Votes { + up: u32::try_from(comment.votes_up).unwrap(), + down: u32::try_from(comment.votes_down).unwrap(), + }, + })) + .try_collect::>>>() + .await + .map_err(to_unexpected!("Getting comments failed"))? + .into_iter() + .collect::, Failure<()>>>()?; + for comment in replies { + if count < conf::MIN_REPLYING_COMMENTS_PER_LOAD_IF_AVAILABLE + || rand::thread_rng().gen_bool(p(1.0, parent_comment.replies as f64)) + { + count += 1; + excluded_ids.push(Bson::ObjectId(masking_key.unmask(&comment.id).map_err( + |masked_oid::PaddingError| Failure::BadRequest("bad masked id"), + )?)); + deque.push_back(comment.clone()); + found_comments.push(comment.clone()); + } + } + } + + let threaded_comments = thread_comments(found_comments.clone(), init_depth); + + let results: Vec = found_comments + .iter() + .filter(|c| { + c.parent_comments.len() == init_depth as usize + && threaded_comments.iter().any(|tc| tc.id == c.id) + }) + .filter_map(|c| threaded_comments.iter().find(|tc| tc.id == c.id).cloned()) + .collect(); + + success(Box::new(results)) +} + +fn thread_comments(comments: Vec, init_depth: i32) -> Vec { + let mut comment_map: HashMap = HashMap::new(); + + // first pass: create comment map and add each comment to the map + for comment in comments { + comment_map.insert(comment.id.to_string(), comment); + } + + // second pass: thread each top-level comment and its children recursively + let mut threaded_comments: Vec = vec![]; + for comment in comment_map.clone().values() { + if comment.parent_comments.len() == (init_depth) as usize { + let threaded_comment = thread_comment((init_depth as u32), comment, &mut comment_map); + threaded_comments.push(threaded_comment); + } + } + threaded_comments +} + +fn thread_comment( + depth: u32, + comment: &CommentDetail, + comment_map: &mut HashMap, +) -> CommentDetail { + let mut threaded_comment = comment.clone(); + for c in comment_map.clone().values() { + if c.parent_comments.contains(&comment.id) + && c.parent_comments.len() <= (depth + 1).try_into().unwrap() + { + let threaded_child = thread_comment(depth + 1, c, comment_map); + threaded_comment.children.push(threaded_child); + } + } + comment_map.remove(&threaded_comment.id.to_string()); + threaded_comment +} + +/// The probability of a comment being included in the results. +fn p(numerator: f64, denominator: f64) -> f64 { + if denominator == 0.0 { + 0.0 + } else { + if denominator > conf::MAX_REPLYING_COMMENTS_PER_LOAD as f64 { + return numerator / conf::MAX_REPLYING_COMMENTS_PER_LOAD as f64; + } + numerator / denominator + } +} diff --git a/src/services/mod.rs b/src/services/mod.rs index b10a526..12bfbcf 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,4 +1,5 @@ pub mod auth; +pub mod comments; pub mod posts; pub mod profile; diff --git a/src/services/posts.rs b/src/services/posts.rs index e1d6a28..68f0f34 100644 --- a/src/services/posts.rs +++ b/src/services/posts.rs @@ -52,13 +52,14 @@ use crate::types::{ Post, Vote, }; +use crate::utils::content_scoring::get_trending_score_time; #[derive(Serialize)] pub struct ReplyContext { pub id: MaskedObjectId, } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct Votes { pub up: u32, pub down: u32, @@ -72,6 +73,7 @@ pub struct Detail { pub text: String, pub created_at: String, pub votes: Votes, + pub replies: i32, } #[derive(Deserialize)] @@ -131,6 +133,7 @@ pub async fn get_single_post( up: u32::try_from(post.votes_up).unwrap(), down: u32::try_from(post.votes_down).unwrap(), }, + replies: post.replies, })) } @@ -192,6 +195,7 @@ pub async fn list( up: u32::try_from(post.votes_up).unwrap(), down: u32::try_from(post.votes_down).unwrap(), }, + replies: post.replies, }) }) .try_collect::>>>() @@ -203,12 +207,6 @@ pub async fn list( success(posts.into()) } -/// Gets the time-based offset of the trending score for the given timestamp. -fn get_trending_score_time(date_time: &DateTime) -> f64 { - f64::from(u32::try_from(date_time.timestamp_millis() / 1000 - conf::TRENDING_EPOCH).unwrap()) - / conf::TRENDING_DECAY -} - #[post("/posts/")] pub async fn create( db: web::Data, @@ -227,6 +225,7 @@ pub async fn create( "votes_down": 0, "absolute_score": 0, "trending_score": get_trending_score_time(&DateTime::now()), // approximate, but will match `_id` exactly with the next vote + "replies": 0, }; let mut attempt = 0; let insertion = loop { @@ -321,10 +320,10 @@ pub async fn vote( } let existing_vote = db - .collection::("votes") + .collection::("post_votes") .find_one_with_session( doc! { - "post": {"$eq": post_id}, + "content": {"$eq": post_id}, "user": {"$eq": user.id}, }, None, @@ -342,10 +341,10 @@ pub async fn vote( match existing_vote { None => { match db - .collection::("votes") + .collection::("post_votes") .insert_one_with_session( Vote { - post: post_id, + content: post_id, user: user.id, value: *request, }, @@ -363,10 +362,10 @@ pub async fn vote( } Some(existing_vote) => { match db - .collection::("votes") + .collection::("post_votes") .update_one_with_session( doc! { - "post": {"$eq": post_id}, + "content": {"$eq": post_id}, "user": {"$eq": user.id}, "value": {"$eq": existing_vote}, }, diff --git a/src/types.rs b/src/types.rs index 44c8fd1..e0196a1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -91,6 +91,22 @@ pub struct Session { pub last_used: DateTime, } +#[derive(Deserialize)] +pub struct Comment { + #[serde(rename = "_id")] + pub id: ObjectId, + pub owner: ObjectId, + pub parent_post: ObjectId, + pub parent_comments: Vec, + pub text: String, + pub replies: i32, + pub deleted: bool, // specifies whether or not a comment has been deleted, without actually deleting it to prevent messing up the threaded structure + pub votes_up: i32, + pub votes_down: i32, + pub absolute_score: i32, + pub trending_score: f64, +} + #[derive(Deserialize)] pub struct Post { #[serde(rename = "_id")] @@ -102,6 +118,7 @@ pub struct Post { pub votes_down: i32, pub absolute_score: i32, pub trending_score: f64, + pub replies: i32, } /// The various years of study the creator of a post can be. @@ -143,7 +160,7 @@ pub struct School { #[derive(Deserialize, Serialize)] pub struct Vote { - pub post: ObjectId, + pub content: ObjectId, pub user: ObjectId, pub value: i32, } diff --git a/src/utils/content_scoring.rs b/src/utils/content_scoring.rs new file mode 100644 index 0000000..483d11d --- /dev/null +++ b/src/utils/content_scoring.rs @@ -0,0 +1,9 @@ +use mongodb::bson::DateTime; + +use crate::conf; + +/// Gets the time-based offset of the trending score for the given timestamp. +pub fn get_trending_score_time(date_time: &DateTime) -> f64 { + f64::from(u32::try_from(date_time.timestamp_millis() / 1000 - conf::TRENDING_EPOCH).unwrap()) + / conf::TRENDING_DECAY +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..34221a7 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod content_scoring;