Skip to content

Commit

Permalink
Merge pull request #15 from ScalaConsultants/akka-streams
Browse files Browse the repository at this point in the history
use akka streams instead of spray ws
  • Loading branch information
marioosh authored Jan 11, 2023
2 parents be50e91 + 65c44b8 commit 01659ad
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 86 deletions.
10 changes: 6 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
name := "slack-scala-bot-core"

version := "0.2.1"
version := "1.0.0"

scalaVersion := "2.11.6"

organization := "io.scalac"

libraryDependencies ++= {
val akkaVersion = "2.3.9"
val akkaVersion = "2.5.23"
val akkaHttpVersion = "10.1.9"
Seq(
"org.mockito" % "mockito-core" % "1.10.19",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"io.spray" %% "spray-json" % "1.3.6",
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"io.spray" %% "spray-json" % "1.3.6",
"io.spray" %% "spray-client" % "1.3.4",
"io.spray" %% "spray-can" % "1.3.2",
"com.wandoulabs.akka" %% "spray-websocket" % "0.1.4",
"joda-time" % "joda-time" % "2.7",
"org.joda" % "joda-convert" % "1.7",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
Expand Down
15 changes: 8 additions & 7 deletions src/main/scala/io/scalac/slack/api/ApiActor.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.scalac.slack.api

import akka.actor.{Actor, ActorLogging}
import akka.actor.{Actor, ActorLogging, PoisonPill}
import io.scalac.slack.api.ResponseObject._
import io.scalac.slack.common.JsonProtocols._
import io.scalac.slack.common.RichOutboundMessage
Expand Down Expand Up @@ -38,7 +38,7 @@ class ApiActor extends Actor with ActorLogging {
log.debug("auth.test requested")
val send = sender()

SlackApiClient.get[AuthTestResponse]("auth.test", Map("token" -> token.key)) onComplete {
SlackApiClient.post[AuthTestResponse]("auth.test", token = Some(token.key)) onComplete {
case Success(res) =>

if (res.ok)
Expand All @@ -49,11 +49,11 @@ class ApiActor extends Actor with ActorLogging {
send ! ex
}

case RtmStart(token) =>
log.debug("rtm.start requested")
case RtmConnect(token) =>
log.debug("rtm.connect requested")
val send = sender()

SlackApiClient.get[RtmStartResponse]("rtm.start", Map("token" -> token.key)) onComplete {
SlackApiClient.get[RtmConnectResponse]("rtm.connect", token = Some(token.key)) onComplete {

case Success(res) =>
if (res.ok) {
Expand All @@ -62,15 +62,16 @@ class ApiActor extends Actor with ActorLogging {
send ! res
}
case Failure(ex) =>
println(ex)
send ! ex
}
case msg: RichOutboundMessage =>
log.debug("chat.postMessage requested")

val attachments = msg.elements.filter(_.isValid).map(_.toJson).mkString("[", ",", "]")
val params = Map("token" -> Config.apiKey.key, "channel" -> msg.channel, "as_user" -> "true", "attachments" -> attachments)
val params = Map("channel" -> msg.channel, "as_user" -> "true", "attachments" -> attachments)

SlackApiClient.post[ChatPostMessageResponse]("chat.postMessage", params) onComplete {
SlackApiClient.post[ChatPostMessageResponse]("chat.postMessage", params, token = Some(Config.apiKey.key)) onComplete {
case Success(res) =>
if (res.ok) {
log.info("[chat.postMessage]: message delivered: " + res.toString)
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/io/scalac/slack/api/ApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import scala.concurrent.Future
*/
trait ApiClient {

def request[T <: ResponseObject](method: HttpMethod, endpoint: String, params: Map[String, String] = Map.empty[String, String])(implicit reader: JsonReader[T]): Future[T]
def request[T <: ResponseObject](method: HttpMethod,
endpoint: String,
queryParams: Map[String, String] = Map.empty,
token: Option[String])(implicit reader: JsonReader[T]): Future[T]

}
3 changes: 3 additions & 0 deletions src/main/scala/io/scalac/slack/api/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ case object RegisterModules extends Message
//API CALLS
case class ApiTest(param: Option[String] = None, error: Option[String] = None) extends Message
case class AuthTest(token: APIKey) extends Message

@deprecated("Use RtmConnect instead")
case class RtmStart(token: APIKey) extends Message

case class RtmConnect(token: APIKey) extends Message
//API RESPONSES
case object Ok extends Message
case object Connected extends Message
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/io/scalac/slack/api/ResponseObject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ case class ApiTestResponse(ok: Boolean, error: Option[String], args: Option[Map[

case class AuthTestResponse(ok: Boolean, error: Option[String], url: Option[String], team: Option[String], user: Option[String], team_id: Option[String], user_id: Option[String]) extends ResponseObject

@deprecated("Please use RtmConnectResponse")
case class RtmStartResponse(ok: Boolean, url: String, users: List[SlackUser], channels: List[Channel], self: BotInfo, ims: List[DirectChannel]) extends ResponseObject

case class RtmConnectResponse(ok: Boolean, url: String, self: BotInfo, team: Team) extends ResponseObject
object ResponseObject {
implicit def authTestResponseToAuthData(atr: AuthTestResponse): AuthData =
AuthData(atr.url.getOrElse("url"), atr.team.getOrElse("team"), atr.user.getOrElse("user"), atr.team_id.getOrElse("teamID"), atr.user_id.getOrElse("userID"))
}

case class ChatPostMessageResponse(ok: Boolean, channel: String, error: Option[String]) extends ResponseObject

case class BotInfo(id: String, name: String)
case class BotInfo(id: String, name: String)

case class Team(domain: String, id: String, name: String)
23 changes: 17 additions & 6 deletions src/main/scala/io/scalac/slack/api/SlackApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import akka.actor.ActorSystem
import akka.event.Logging
import io.scalac.slack.Config
import spray.client.pipelining._
import spray.http.HttpHeaders.Authorization
import spray.http._
import spray.json._

import scala.concurrent.Future

import spray.httpx.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
/**
* Created on 29.01.15 22:43
*/
Expand All @@ -23,14 +25,23 @@ object SlackApiClient extends ApiClient{
//function from HttpRequest to HttpResponse
val pipeline: HttpRequest => Future[HttpResponse] = sendReceive

def get[T <: ResponseObject](endpoint: String, params: Map[String, String] = Map.empty[String, String])(implicit reader: JsonReader[T]): Future[T] = request(HttpMethods.GET, endpoint, params)
def post[T <: ResponseObject](endpoint: String, params: Map[String, String] = Map.empty[String, String])(implicit reader: JsonReader[T]): Future[T] = request(HttpMethods.POST, endpoint, params)
def get[T <: ResponseObject](endpoint: String, params: Map[String, String] = Map.empty[String, String], token: Option[String] = None)(implicit reader: JsonReader[T]): Future[T] = request(HttpMethods.GET, endpoint, params, token)
def post[T <: ResponseObject](endpoint: String, params: Map[String, String] = Map.empty[String, String], token: Option[String] = None)(implicit reader: JsonReader[T]): Future[T] = request(HttpMethods.POST, endpoint, params, token)

def request[T <: ResponseObject](method: HttpMethod,
endpoint: String,
queryParams: Map[String, String] = Map.empty[String,String],
token: Option[String] = None)(implicit reader: JsonReader[T]): Future[T] = {

def request[T <: ResponseObject](method: HttpMethod, endpoint: String, params: Map[String, String] = Map.empty[String,String])(implicit reader: JsonReader[T]): Future[T] = {
val url = Uri(apiUrl(endpoint)).withQuery(queryParams)

val url = Uri(apiUrl(endpoint)).withQuery(params)
var request = HttpRequest(method, url)

if (token.isDefined) {
request = request.withHeaders(Authorization(OAuth2BearerToken(token.get)))
}

val futureResponse = pipeline(HttpRequest(method, url)).map(_.entity.asString)
val futureResponse = pipeline(request).map(_.entity.asString)
(for {
responseJson <- futureResponse
response = JsonParser(responseJson).convertTo[T]
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/io/scalac/slack/api/Unmarshallers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ object Unmarshallers extends DefaultJsonProtocol {
implicit val channelFormat = jsonFormat(Channel, "name", "creator", "is_member", "is_channel", "id", "is_general", "is_archived", "created", "purpose", "topic", "unread_count", "last_read", "members")
implicit val userFormat = jsonFormat(SlackUser, "id", "name", "deleted", "is_admin", "is_owner", "is_primary_owner", "is_restricted", "is_ultra_restricted", "has_files", "is_bot", "presence")
implicit val botInfoFormat = jsonFormat(BotInfo, "id", "name")
implicit val teamFormat = jsonFormat3(Team)
implicit val imFormat = jsonFormat(DirectChannel, "id", "user")
implicit val rtmStartResponseFormat = jsonFormat(RtmStartResponse, "ok", "url", "users", "channels", "self", "ims")
implicit val rtmConnectResponseFormat = jsonFormat(RtmConnectResponse, "ok", "url", "self", "team")
implicit val chatPostMessageResponse = jsonFormat(ChatPostMessageResponse, "ok", "channel", "error")

}
31 changes: 19 additions & 12 deletions src/main/scala/io/scalac/slack/common/actors/SlackBotActor.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.scalac.slack.common.actors

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props}
import akka.util.Timeout
import io.scalac.slack.api.{ApiActor, ApiTest, AuthData, AuthTest, BotInfo, Connected, RegisterModules, RtmData, RtmStart, RtmStartResponse, Start, Stop}
import io.scalac.slack.api.{ApiActor, ApiTest, AuthData, AuthTest, BotInfo, Connected, RegisterModules, RtmConnect, RtmConnectResponse, RtmData, RtmStart, RtmStartResponse, Start, Stop}
import io.scalac.slack.common.{BotInfoKeeper, RegisterDirectChannels, RegisterUsers, Shutdownable}
import io.scalac.slack.websockets.{WSActor, WebSocket}
import io.scalac.slack.{BotModules, Config, MessageEventBus, MigrationInProgress, OutgoingRichMessageProcessor, SlackError}
Expand Down Expand Up @@ -35,7 +34,7 @@ class SlackBotActor(modules: BotModules, eventBus: MessageEventBus, master: Shut
case ad: AuthData =>
log.info("authenticated successfully")
log.info("request for websocket connection...")
api ! RtmStart(Config.apiKey)
api ! RtmConnect(Config.apiKey)
case RtmData(url) =>
log.info("fetched WSS URL")
log.info(url)
Expand All @@ -48,7 +47,7 @@ class SlackBotActor(modules: BotModules, eventBus: MessageEventBus, master: Shut

log.info(s"Connecting to host [$host] and resource [$resource]")

websocketClient ! WebSocket.Connect(host, 443, resource, withSsl = true)
websocketClient ! WebSocket.Connect(url)

context.system.scheduler.scheduleOnce(Duration.create(5, TimeUnit.SECONDS), self, RegisterModules)

Expand All @@ -67,17 +66,25 @@ class SlackBotActor(modules: BotModules, eventBus: MessageEventBus, master: Shut
case se: SlackError =>
log.error(s"SlackError occured [${se.toString}]")
master.shutdown()
case res: RtmStartResponse =>
if(usersStorageOpt.isDefined) {
val userStorage = usersStorageOpt.get

userStorage ! RegisterUsers(res.users: _*)
userStorage ! RegisterDirectChannels(res.ims: _*)
}
case res: RtmConnectResponse =>
// if(usersStorageOpt.isDefined) {
// val userStorage = usersStorageOpt.get
//
// userStorage ! RegisterUsers(res.users: _*)
// userStorage ! RegisterDirectChannels(res.ims: _*)
// }

case WebSocket.Release =>
websocketClient ! WebSocket.Release

case ex: Exception =>
log.error("Received exception", ex)
self ! PoisonPill

case other =>
println("what is this?")
println(other)

}

}
106 changes: 58 additions & 48 deletions src/main/scala/io/scalac/slack/websockets/WSActor.scala
Original file line number Diff line number Diff line change
@@ -1,73 +1,83 @@
package io.scalac.slack.websockets

import akka.actor.{Actor, Props}
import akka.io.IO
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorLogging, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import io.scalac.slack._
import spray.can.Http
import spray.can.server.UHttp
import spray.can.websocket.WebSocketClientWorker
import spray.can.websocket.frame.{CloseFrame, StatusCode, TextFrame}
import spray.http.{HttpHeaders, HttpMethods, HttpRequest}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/**
* Created on 28.01.15 19:45
*/
class WSActor(eventBus: MessageEventBus) extends Actor with WebSocketClientWorker {
class WSActor(eventBus: MessageEventBus) extends Actor with ActorLogging {

override def receive = connect orElse handshaking orElse closeLogic
private implicit val system = context.system
private implicit val mat = ActorMaterializer()
private implicit val ec: ExecutionContext = context.dispatcher
override def receive = connect()

val out = context.actorOf(Props(classOf[OutgoingMessageProcessor], self, eventBus))
val in = context.actorOf(Props(classOf[IncomingMessageProcessor], eventBus))

private val (sourceQueue, source) = Source.queue[Message](100, OverflowStrategy.fail).preMaterialize()
private def messageSink: Sink[Message, Future[Done]] = Sink.foreach({
case message: TextMessage.Strict =>
log.debug(s"Received $message from websocket")
in ! message.text

case message: TextMessage.Streamed =>
log.debug(s"Received stream message from socket")
val futureString = message.textStream.runWith(Sink.fold("")(_ + _))
futureString.onComplete({
case Failure(exception) =>
log.error("Error consuming streamed buffer", exception)
throw exception
case Success(value) =>
in ! value
})

case message: BinaryMessage =>
log.warning("Received binary message, ignoring")
message.dataStream.runWith(Sink.ignore) // prevent memory leak by consuming buffer into ether
() // ignore binary streamed messages, slack will use only json
})

private def messageSource: Source[Message, NotUsed] = source
private def flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(messageSink, messageSource)(Keep.left)
private def connect(): Receive = {
case WebSocket.Connect(host, port, resource, ssl) =>
val headers = List(
HttpHeaders.Host(host, port),
HttpHeaders.Connection("Upgrade"),
HttpHeaders.RawHeader("Upgrade", "websocket"),
HttpHeaders.RawHeader("Sec-WebSocket-Version", "13"),
HttpHeaders.RawHeader("Sec-WebSocket-Key", Config.websocketKey))
request = HttpRequest(HttpMethods.GET, resource, headers)
IO(UHttp)(context.system) ! Http.Connect(host, port, ssl)
}

override def businessLogic = {
case WebSocket.Release => close()
case TextFrame(msg) => //message received

// Each message without parsing is sent to eventprocessor
// Because all messages from websockets should be read fast
// If EventProcessor slow down with parsing
// can be used dispatcher
println(s"RECEIVED MESSAGE: ${msg.utf8String} ")
in ! msg.utf8String

case WebSocket.Send(message) => //message to send
case WebSocket.Connect(url) =>
val (upgradeResponse, closed) = Http()
.singleWebSocketRequest(WebSocketRequest(url), flow)

closed.onComplete(x => log.info(s"websocket closed ${x}"))
upgradeResponse.map { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}(context.dispatcher)

case WebSocket.Send(msg) =>
log.debug(s"send : $msg")
sourceQueue.offer(TextMessage(msg))

println(s"SENT MESSAGE: $message ")
send(message)
case ignoreThis => // ignore
}

def send(message: String) = connection ! TextFrame(message)

def close() = connection ! CloseFrame(StatusCode.NormalClose)

private var request: HttpRequest = null

override def upgradeRequest = request

}

object WebSocket {

sealed trait WebSocketMessage

case class Connect(
host: String,
port: Int,
resource: String,
withSsl: Boolean = false) extends WebSocketMessage
case class Connect(url: String) extends WebSocketMessage

case class Send(msg: String) extends WebSocketMessage

Expand Down
9 changes: 4 additions & 5 deletions src/test/scala/io/scalac/slack/common/AttachmentTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ class AttachmentTest extends FunSuite with Matchers {
test("attachment to JSON"){
val att1 = Attachment(Text("sometext"), Field("title 1", "content 1", short = false), Field("title 2", "content 2", short = true), Color.danger)
//language=JSON
val json = """{"fallback":"wrong formatted message","text":"sometext","fields":[{"title":"title 1","value":"content 1","short":false},{"title":"title 2","value":"content 2","short":true}],"color":"danger"}"""

json should equal (att1.toJson.toString())
val json = """{"color":"danger","fallback":"wrong formatted message","fields":[{"short":false,"title":"title 1","value":"content 1"},{"short":true,"title":"title 2","value":"content 2"}],"text":"sometext"}"""
(att1.toJson.toString()) shouldEqual json

}

Expand All @@ -72,8 +71,8 @@ class AttachmentTest extends FunSuite with Matchers {

val fieldJson = field.toJson.toString()
//language=JSON
val json = """{"title":"field title","value":"field value","short":false}"""
json should equal (fieldJson)
val json = """{"short":false,"title":"field title","value":"field value"}"""
fieldJson shouldEqual json
}

}
Loading

0 comments on commit 01659ad

Please sign in to comment.