diff --git a/src/main/scala/org/playarimaa/server/Chat.scala b/src/main/scala/org/playarimaa/server/Chat.scala index ee31341..e1dc73c 100644 --- a/src/main/scala/org/playarimaa/server/Chat.scala +++ b/src/main/scala/org/playarimaa/server/Chat.scala @@ -10,12 +10,12 @@ import akka.util.Timeout import slick.driver.H2Driver.api._ import org.playarimaa.server.Timestamp.Timestamp -object Chat { - //Leave chat if it's been 120 seconds with no activity +object ChatSystem { + //Leave chat if it's been this many seconds with no activity val INACTIVITY_TIMEOUT: Double = 120 - //Max lines at a time to read and return + //Max lines at a time to return in a single query val READ_MAX_LINES: Int = 5000 - //Timeout for waiting for chat messages + //Timeout for a single get query for chat messages val GET_TIMEOUT: Double = 15 //Timeout for akka ask queries val AKKA_TIMEOUT: Timeout = new Timeout(20 seconds) @@ -23,21 +23,27 @@ object Chat { val NO_LOGIN_MESSAGE = "Not logged in, or timed out due to inactivity" object Import { + //Alias a few types to make method declarations more self-documenting type Channel = String type Username = String type Auth = String } } -import Chat.Import._ +import ChatSystem.Import._ -class Chat(val db: Database, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) { +case class ChatLine(id: Long, channel: Channel, username: Username, text: String, timestamp: Timestamp) + +//---CHAT SYSTEM---------------------------------------------------------------------------------- + +/** Class representing a whole chat system composed of various channels, backed by a database. */ +class ChatSystem(val db: Database, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) { private var channelData: Map[Channel,ActorRef] = Map() private val chatDB = actorSystem.actorOf(Props(new ChatDB(db))) - implicit val timeout = Chat.AKKA_TIMEOUT + implicit val timeout = ChatSystem.AKKA_TIMEOUT - private def openChannel(channel: String): ActorRef = this.synchronized { + private def openChannel(channel: Channel): ActorRef = this.synchronized { channelData.get(channel) match { case Some(cc) => cc case None => @@ -47,64 +53,74 @@ class Chat(val db: Database, val actorSystem: ActorSystem)(implicit ec: Executio } } - def join(channel: Channel, username: Username): Future[Auth] = { - val cc = openChannel(channel) - (cc ? ChatChannel.Join(username)).map(_.asInstanceOf[Auth]) - } - - private def requiringLogin[T](channel: String)(f:ActorRef => Future[T]) : Future[T] = { + private def requiringLogin[T](channel: Channel)(f:ActorRef => Future[T]) : Future[T] = { val cc = this.synchronized { channelData.get(channel) } cc match { - case None => Future.failed(new Exception(Chat.NO_LOGIN_MESSAGE)) + case None => Future.failed(new Exception(ChatSystem.NO_LOGIN_MESSAGE)) case Some(cc) => f(cc) } } - def leave(channel: String, username: Username, auth: Auth): Future[Unit] = { + /** Join the specified chat channel */ + def join(channel: Channel, username: Username): Future[Auth] = { + val cc = openChannel(channel) + (cc ? ChatChannel.Join(username)).map(_.asInstanceOf[Auth]) + } + + /** Leave the specified chat channel. Failed if not logged in. */ + def leave(channel: Channel, username: Username, auth: Auth): Future[Unit] = { requiringLogin(channel) { cc => (cc ? ChatChannel.Leave(username,auth)).map(_.asInstanceOf[Unit]) } } - def post(channel: String, username: Username, auth: String, text:String): Future[Unit] = { + /** Post in the specified chat channel. Failed if not logged in. */ + def post(channel: Channel, username: Username, auth: Auth, text:String): Future[Unit] = { requiringLogin(channel) { cc => (cc ? ChatChannel.Post(username,auth,text)).map(_.asInstanceOf[Unit]) } } - def heartbeat(channel: String, username: Username, auth: String): Future[Unit] = { + /** Heartbeat the specified chat channel to avoid logout from inactivity. Failed if not logged in. */ + def heartbeat(channel: Channel, username: Username, auth: Auth): Future[Unit] = { requiringLogin(channel) { cc => (cc ? ChatChannel.Heartbeat(username,auth)).map(_.asInstanceOf[Unit]) } } - /** [minId] defaults to the current end of chat minus [READ_MAX_LINES] + /** Get the specified range of lines of chat from a channel. + * If [doWait] is true and there are no lines meeting the criteria, wait a short time, + * returning when a new line is posted or upon timeout. + * + * [minId] defaults to the current end of chat minus [READ_MAX_LINES] * [doWait] defaults to false. * All other optional parameters default to having no effect. */ - def get(channel: String, minId: Option[Long], maxId: Option[Long], minTime: Option[Timestamp], maxTime: Option[Timestamp], doWait: Option[Boolean]) + def get(channel: Channel, minId: Option[Long], maxId: Option[Long], minTime: Option[Timestamp], maxTime: Option[Timestamp], doWait: Option[Boolean]) : Future[List[ChatLine]] = { val cc = openChannel(channel) - val maxId_ = maxId.getOrElse(Chat.READ_MAX_LINES + 1000000L) + val maxId_ = maxId.getOrElse(0x3FFFFFFFFFFFFFFFL) val minTime_ = minTime.getOrElse(0.0) - val maxTime_ = maxTime.getOrElse(1e60) + val maxTime_ = maxTime.getOrElse(1e100) val doWait_ = doWait.getOrElse(false) (cc ? ChatChannel.Get(minId,maxId_,minTime_,maxTime_,doWait_)).map(_.asInstanceOf[List[ChatLine]]) } } +//---CHAT CHANNEL---------------------------------------------------------------------------------- + object ChatChannel { //ACTOR MESSAGES--------------------------------------------------------- //Replies with Auth - case class Join(username: String) + case class Join(username: Username) //Replies with Unit - case class Leave(username: String, auth: Auth) + case class Leave(username: Username, auth: Auth) //Replies with Unit - case class Post(username: String, auth: Auth, text:String) + case class Post(username: Username, auth: Auth, text:String) //Replies with Unit - case class Heartbeat(username: String, auth: Auth) + case class Heartbeat(username: Username, auth: Auth) /** [minId] defaults to the current end of chat minus [READ_MAX_LINES] * Replies with List[ChatLine] @@ -112,23 +128,33 @@ object ChatChannel { case class Get(minId: Option[Long], maxId: Long, minTime: Timestamp, maxTime: Timestamp, doWait: Boolean) } +/** An actor that handles an individual channel that people can chat in */ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: ActorSystem) extends Actor with Stash { class LoginData() { + //All auth keys this user has, along with the time they were most recently used var auths: Map[Auth,Timestamp] = Map() + //Most recent time anything happened for this user var lastActive: Timestamp = Timestamp.get } var loginData: Map[Username,LoginData] = Map() - var nextId: Long = 0L + + //Fulfilled and replaced on each message - this is the mechanism by which + //queries can block and wait for chat activity var nextMessage: Promise[ChatLine] = Promise() - var lastActive: Timestamp = Timestamp.get + var nextId: Long = 0L + //Holds messages that we are not confident are in the database yet, avoiding + //races between writes to the chat and queries to read the new lines posted var messagesNotYetInDB: Queue[ChatLine] = Queue() + //Most recent time anything happened in this channel + var lastActive: Timestamp = Timestamp.get + case class Initialized(nextId:Try[Long]) case class DBWritten(upToId:Long) - implicit val timeout = Chat.AKKA_TIMEOUT + implicit val timeout = ChatSystem.AKKA_TIMEOUT import context.dispatcher override def preStart = { @@ -149,10 +175,11 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A def normalReceive: Receive = { case ChatChannel.Join(username: Username) => - updateLastActive + lastActive = Timestamp.get val auth = AuthTokenGen.genToken val ld = findOrAddLogin(username) ld.auths = ld.auths + (auth -> lastActive) + ld.lastActive = lastActive sender ! (auth : Auth) case ChatChannel.Leave(username: Username, auth: Auth) => @@ -162,7 +189,7 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A } replyWith(sender, result) - case ChatChannel.Post(username: Username, auth: String, text:String) => + case ChatChannel.Post(username: Username, auth: Auth, text:String) => val result: Try[Unit] = requiringLogin(username,auth) { () => val line = ChatLine(nextId, channel, username, text, Timestamp.get) nextId = nextId + 1 @@ -187,9 +214,9 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A case ChatChannel.Get(minId: Option[Long], maxId: Long, minTime: Timestamp, maxTime: Timestamp, doWait: Boolean) => - val minId_ = minId.getOrElse(nextId - Chat.READ_MAX_LINES) + lastActive = Timestamp.get + val minId_ = minId.getOrElse(nextId - ChatSystem.READ_MAX_LINES) - updateLastActive def isOk(x:ChatLine): Boolean = { x.id >= minId_ && x.id <= maxId && @@ -213,7 +240,7 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A if(doWait && lines.isEmpty) { val result = nextMessage.future.map { x => List(x).filter(isOk) } - val timeout = akka.pattern.after(Chat.GET_TIMEOUT seconds, actorSystem.scheduler)(Future(List())) + val timeout = akka.pattern.after(ChatSystem.GET_TIMEOUT seconds, actorSystem.scheduler)(Future(List())) Future.firstCompletedOf(List(result,timeout)) } else { @@ -230,8 +257,6 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A } } - def updateLastActive: Unit = - lastActive = Timestamp.get def findOrAddLogin(username: Username): LoginData = { val ld = loginData.getOrElse(username, new LoginData) @@ -241,17 +266,18 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A def requiringLogin[T](username: Username, auth: Auth)(f:() => T) : Try[T] = { loginData.get(username) match { - case None => Failure(new Exception(Chat.NO_LOGIN_MESSAGE)) + case None => Failure(new Exception(ChatSystem.NO_LOGIN_MESSAGE)) case Some(ld) => ld.auths.get(auth) match { - case None => Failure(new Exception(Chat.NO_LOGIN_MESSAGE)) + case None => Failure(new Exception(ChatSystem.NO_LOGIN_MESSAGE)) case Some(time) => val now = Timestamp.get - if(now >= time + Chat.INACTIVITY_TIMEOUT) - Failure(new Exception(Chat.NO_LOGIN_MESSAGE)) + if(now >= time + ChatSystem.INACTIVITY_TIMEOUT) + Failure(new Exception(ChatSystem.NO_LOGIN_MESSAGE)) else { ld.auths = ld.auths + (auth -> now) - updateLastActive + lastActive = now + ld.lastActive = now Success(f()) } } @@ -267,26 +293,24 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A } - - - -case class ChatLine(id: Long, channel: String, username: String, text: String, timestamp: Timestamp) +//CHAT DATABASE--------------------------------------------------------------------- object ChatDB { //ACTOR MESSAGES--------------------------------------------------------- //Replies with Long - case class GetNextId(channel: String) + case class GetNextId(channel: Channel) //Replies with Unit when result committed case class WriteLine(line: ChatLine) //Replies with List[ChatLine] - case class ReadLines(channel: String, minId: Long, maxId: Long, minTime: Timestamp, maxTime: Timestamp) + case class ReadLines(channel: Channel, minId: Long, maxId: Long, minTime: Timestamp, maxTime: Timestamp) val table = TableQuery[ChatTable] } +/** An actor that handles chat-related database queries */ class ChatDB(val db: Database)(implicit ec: ExecutionContext) extends Actor { def receive = { @@ -311,7 +335,7 @@ class ChatDB(val db: Database)(implicit ec: ExecutionContext) extends Actor { filter(_.timestamp >= minTime). filter(_.timestamp <= maxTime). sortBy(_.id). - take(Chat.READ_MAX_LINES) + take(ChatSystem.READ_MAX_LINES) val result: Future[List[ChatLine]] = db.run(query.result).map(_.toList) result pipeTo sender } diff --git a/src/main/scala/org/playarimaa/server/ChatServlet.scala b/src/main/scala/org/playarimaa/server/ChatServlet.scala index 1085c82..66badd3 100644 --- a/src/main/scala/org/playarimaa/server/ChatServlet.scala +++ b/src/main/scala/org/playarimaa/server/ChatServlet.scala @@ -63,7 +63,7 @@ class ChatServlet(system: ActorSystem) protected implicit def executor: ExecutionContext = system.dispatcher val db = Database.forConfig("h2mem1") - val chat = new Chat(db,system) + val chat = new ChatSystem(db,system) //Before every action runs, set the content type to be in JSON format. before() {