Skip to content

Commit

Permalink
Cleanup and polish chat stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
lightvector committed Jun 29, 2015
1 parent ae7e485 commit a2278e8
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 49 deletions.
120 changes: 72 additions & 48 deletions src/main/scala/org/playarimaa/server/Chat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,40 @@ import akka.util.Timeout
import slick.driver.H2Driver.api._
import org.playarimaa.server.Timestamp.Timestamp

object Chat {
//Leave chat if it's been 120 seconds with no activity
object ChatSystem {
//Leave chat if it's been this many seconds with no activity
val INACTIVITY_TIMEOUT: Double = 120
//Max lines at a time to read and return
//Max lines at a time to return in a single query
val READ_MAX_LINES: Int = 5000
//Timeout for waiting for chat messages
//Timeout for a single get query for chat messages
val GET_TIMEOUT: Double = 15
//Timeout for akka ask queries
val AKKA_TIMEOUT: Timeout = new Timeout(20 seconds)

val NO_LOGIN_MESSAGE = "Not logged in, or timed out due to inactivity"

object Import {
//Alias a few types to make method declarations more self-documenting
type Channel = String
type Username = String
type Auth = String
}
}

import Chat.Import._
import ChatSystem.Import._

class Chat(val db: Database, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) {
case class ChatLine(id: Long, channel: Channel, username: Username, text: String, timestamp: Timestamp)

//---CHAT SYSTEM----------------------------------------------------------------------------------

/** Class representing a whole chat system composed of various channels, backed by a database. */
class ChatSystem(val db: Database, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) {

private var channelData: Map[Channel,ActorRef] = Map()
private val chatDB = actorSystem.actorOf(Props(new ChatDB(db)))
implicit val timeout = Chat.AKKA_TIMEOUT
implicit val timeout = ChatSystem.AKKA_TIMEOUT

private def openChannel(channel: String): ActorRef = this.synchronized {
private def openChannel(channel: Channel): ActorRef = this.synchronized {
channelData.get(channel) match {
case Some(cc) => cc
case None =>
Expand All @@ -47,88 +53,108 @@ class Chat(val db: Database, val actorSystem: ActorSystem)(implicit ec: Executio
}
}

def join(channel: Channel, username: Username): Future[Auth] = {
val cc = openChannel(channel)
(cc ? ChatChannel.Join(username)).map(_.asInstanceOf[Auth])
}

private def requiringLogin[T](channel: String)(f:ActorRef => Future[T]) : Future[T] = {
private def requiringLogin[T](channel: Channel)(f:ActorRef => Future[T]) : Future[T] = {
val cc = this.synchronized { channelData.get(channel) }
cc match {
case None => Future.failed(new Exception(Chat.NO_LOGIN_MESSAGE))
case None => Future.failed(new Exception(ChatSystem.NO_LOGIN_MESSAGE))
case Some(cc) => f(cc)
}
}

def leave(channel: String, username: Username, auth: Auth): Future[Unit] = {
/** Join the specified chat channel */
def join(channel: Channel, username: Username): Future[Auth] = {
val cc = openChannel(channel)
(cc ? ChatChannel.Join(username)).map(_.asInstanceOf[Auth])
}

/** Leave the specified chat channel. Failed if not logged in. */
def leave(channel: Channel, username: Username, auth: Auth): Future[Unit] = {
requiringLogin(channel) { cc =>
(cc ? ChatChannel.Leave(username,auth)).map(_.asInstanceOf[Unit])
}
}

def post(channel: String, username: Username, auth: String, text:String): Future[Unit] = {
/** Post in the specified chat channel. Failed if not logged in. */
def post(channel: Channel, username: Username, auth: Auth, text:String): Future[Unit] = {
requiringLogin(channel) { cc =>
(cc ? ChatChannel.Post(username,auth,text)).map(_.asInstanceOf[Unit])
}
}

def heartbeat(channel: String, username: Username, auth: String): Future[Unit] = {
/** Heartbeat the specified chat channel to avoid logout from inactivity. Failed if not logged in. */
def heartbeat(channel: Channel, username: Username, auth: Auth): Future[Unit] = {
requiringLogin(channel) { cc =>
(cc ? ChatChannel.Heartbeat(username,auth)).map(_.asInstanceOf[Unit])
}
}

/** [minId] defaults to the current end of chat minus [READ_MAX_LINES]
/** Get the specified range of lines of chat from a channel.
* If [doWait] is true and there are no lines meeting the criteria, wait a short time,
* returning when a new line is posted or upon timeout.
*
* [minId] defaults to the current end of chat minus [READ_MAX_LINES]
* [doWait] defaults to false.
* All other optional parameters default to having no effect.
*/
def get(channel: String, minId: Option[Long], maxId: Option[Long], minTime: Option[Timestamp], maxTime: Option[Timestamp], doWait: Option[Boolean])
def get(channel: Channel, minId: Option[Long], maxId: Option[Long], minTime: Option[Timestamp], maxTime: Option[Timestamp], doWait: Option[Boolean])
: Future[List[ChatLine]] = {
val cc = openChannel(channel)
val maxId_ = maxId.getOrElse(Chat.READ_MAX_LINES + 1000000L)
val maxId_ = maxId.getOrElse(0x3FFFFFFFFFFFFFFFL)
val minTime_ = minTime.getOrElse(0.0)
val maxTime_ = maxTime.getOrElse(1e60)
val maxTime_ = maxTime.getOrElse(1e100)
val doWait_ = doWait.getOrElse(false)
(cc ? ChatChannel.Get(minId,maxId_,minTime_,maxTime_,doWait_)).map(_.asInstanceOf[List[ChatLine]])
}
}

//---CHAT CHANNEL----------------------------------------------------------------------------------

object ChatChannel {

//ACTOR MESSAGES---------------------------------------------------------

//Replies with Auth
case class Join(username: String)
case class Join(username: Username)
//Replies with Unit
case class Leave(username: String, auth: Auth)
case class Leave(username: Username, auth: Auth)
//Replies with Unit
case class Post(username: String, auth: Auth, text:String)
case class Post(username: Username, auth: Auth, text:String)
//Replies with Unit
case class Heartbeat(username: String, auth: Auth)
case class Heartbeat(username: Username, auth: Auth)

/** [minId] defaults to the current end of chat minus [READ_MAX_LINES]
* Replies with List[ChatLine]
*/
case class Get(minId: Option[Long], maxId: Long, minTime: Timestamp, maxTime: Timestamp, doWait: Boolean)
}

/** An actor that handles an individual channel that people can chat in */
class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: ActorSystem) extends Actor with Stash {

class LoginData() {
//All auth keys this user has, along with the time they were most recently used
var auths: Map[Auth,Timestamp] = Map()
//Most recent time anything happened for this user
var lastActive: Timestamp = Timestamp.get
}
var loginData: Map[Username,LoginData] = Map()
var nextId: Long = 0L

//Fulfilled and replaced on each message - this is the mechanism by which
//queries can block and wait for chat activity
var nextMessage: Promise[ChatLine] = Promise()
var lastActive: Timestamp = Timestamp.get
var nextId: Long = 0L

//Holds messages that we are not confident are in the database yet, avoiding
//races between writes to the chat and queries to read the new lines posted
var messagesNotYetInDB: Queue[ChatLine] = Queue()

//Most recent time anything happened in this channel
var lastActive: Timestamp = Timestamp.get

case class Initialized(nextId:Try[Long])
case class DBWritten(upToId:Long)

implicit val timeout = Chat.AKKA_TIMEOUT
implicit val timeout = ChatSystem.AKKA_TIMEOUT
import context.dispatcher

override def preStart = {
Expand All @@ -149,10 +175,11 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A

def normalReceive: Receive = {
case ChatChannel.Join(username: Username) =>
updateLastActive
lastActive = Timestamp.get
val auth = AuthTokenGen.genToken
val ld = findOrAddLogin(username)
ld.auths = ld.auths + (auth -> lastActive)
ld.lastActive = lastActive
sender ! (auth : Auth)

case ChatChannel.Leave(username: Username, auth: Auth) =>
Expand All @@ -162,7 +189,7 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A
}
replyWith(sender, result)

case ChatChannel.Post(username: Username, auth: String, text:String) =>
case ChatChannel.Post(username: Username, auth: Auth, text:String) =>
val result: Try[Unit] = requiringLogin(username,auth) { () =>
val line = ChatLine(nextId, channel, username, text, Timestamp.get)
nextId = nextId + 1
Expand All @@ -187,9 +214,9 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A


case ChatChannel.Get(minId: Option[Long], maxId: Long, minTime: Timestamp, maxTime: Timestamp, doWait: Boolean) =>
val minId_ = minId.getOrElse(nextId - Chat.READ_MAX_LINES)
lastActive = Timestamp.get
val minId_ = minId.getOrElse(nextId - ChatSystem.READ_MAX_LINES)

updateLastActive
def isOk(x:ChatLine): Boolean = {
x.id >= minId_ &&
x.id <= maxId &&
Expand All @@ -213,7 +240,7 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A

if(doWait && lines.isEmpty) {
val result = nextMessage.future.map { x => List(x).filter(isOk) }
val timeout = akka.pattern.after(Chat.GET_TIMEOUT seconds, actorSystem.scheduler)(Future(List()))
val timeout = akka.pattern.after(ChatSystem.GET_TIMEOUT seconds, actorSystem.scheduler)(Future(List()))
Future.firstCompletedOf(List(result,timeout))
}
else {
Expand All @@ -230,8 +257,6 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A
}
}

def updateLastActive: Unit =
lastActive = Timestamp.get

def findOrAddLogin(username: Username): LoginData = {
val ld = loginData.getOrElse(username, new LoginData)
Expand All @@ -241,17 +266,18 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A

def requiringLogin[T](username: Username, auth: Auth)(f:() => T) : Try[T] = {
loginData.get(username) match {
case None => Failure(new Exception(Chat.NO_LOGIN_MESSAGE))
case None => Failure(new Exception(ChatSystem.NO_LOGIN_MESSAGE))
case Some(ld) =>
ld.auths.get(auth) match {
case None => Failure(new Exception(Chat.NO_LOGIN_MESSAGE))
case None => Failure(new Exception(ChatSystem.NO_LOGIN_MESSAGE))
case Some(time) =>
val now = Timestamp.get
if(now >= time + Chat.INACTIVITY_TIMEOUT)
Failure(new Exception(Chat.NO_LOGIN_MESSAGE))
if(now >= time + ChatSystem.INACTIVITY_TIMEOUT)
Failure(new Exception(ChatSystem.NO_LOGIN_MESSAGE))
else {
ld.auths = ld.auths + (auth -> now)
updateLastActive
lastActive = now
ld.lastActive = now
Success(f())
}
}
Expand All @@ -267,26 +293,24 @@ class ChatChannel(val channel: Channel, val chatDB: ActorRef, val actorSystem: A

}




case class ChatLine(id: Long, channel: String, username: String, text: String, timestamp: Timestamp)
//CHAT DATABASE---------------------------------------------------------------------

object ChatDB {

//ACTOR MESSAGES---------------------------------------------------------

//Replies with Long
case class GetNextId(channel: String)
case class GetNextId(channel: Channel)
//Replies with Unit when result committed
case class WriteLine(line: ChatLine)
//Replies with List[ChatLine]
case class ReadLines(channel: String, minId: Long, maxId: Long, minTime: Timestamp, maxTime: Timestamp)
case class ReadLines(channel: Channel, minId: Long, maxId: Long, minTime: Timestamp, maxTime: Timestamp)


val table = TableQuery[ChatTable]
}

/** An actor that handles chat-related database queries */
class ChatDB(val db: Database)(implicit ec: ExecutionContext) extends Actor {

def receive = {
Expand All @@ -311,7 +335,7 @@ class ChatDB(val db: Database)(implicit ec: ExecutionContext) extends Actor {
filter(_.timestamp >= minTime).
filter(_.timestamp <= maxTime).
sortBy(_.id).
take(Chat.READ_MAX_LINES)
take(ChatSystem.READ_MAX_LINES)
val result: Future[List[ChatLine]] = db.run(query.result).map(_.toList)
result pipeTo sender
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/org/playarimaa/server/ChatServlet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ChatServlet(system: ActorSystem)
protected implicit def executor: ExecutionContext = system.dispatcher

val db = Database.forConfig("h2mem1")
val chat = new Chat(db,system)
val chat = new ChatSystem(db,system)

//Before every action runs, set the content type to be in JSON format.
before() {
Expand Down

0 comments on commit a2278e8

Please sign in to comment.