From f51a89f3ef5e50b2d0aa0f1beaa3f0adb6bc63f6 Mon Sep 17 00:00:00 2001 From: Dmitry Erastov Date: Sun, 4 Feb 2018 22:14:08 -0500 Subject: [PATCH 1/3] Remove PubSubServer and dependency on Akka Code moved to https://github.com/debasishg/akka-redis-pubsub --- project/ScalaRedisProject.scala | 10 +- src/main/scala/com/redis/PubSubServer.scala | 44 ----- src/test/resources/application.conf | 45 ------ src/test/scala/com/redis/Patterns.scala | 20 ++- .../scala/com/redis/PubSubServerDemo.scala | 152 ------------------ 5 files changed, 11 insertions(+), 260 deletions(-) delete mode 100644 src/main/scala/com/redis/PubSubServer.scala delete mode 100644 src/test/resources/application.conf delete mode 100644 src/test/scala/com/redis/PubSubServerDemo.scala diff --git a/project/ScalaRedisProject.scala b/project/ScalaRedisProject.scala index 7f84087f..25ed9453 100644 --- a/project/ScalaRedisProject.scala +++ b/project/ScalaRedisProject.scala @@ -19,7 +19,7 @@ object ScalaRedisProject extends Build lazy val coreSettings = commonSettings ++ Seq( name := "RedisClient", - libraryDependencies := Seq( + libraryDependencies ++= Seq( "commons-pool" % "commons-pool" % "1.6", "org.slf4j" % "slf4j-api" % "1.7.25", "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "provided", @@ -27,14 +27,8 @@ object ScalaRedisProject extends Build "junit" % "junit" % "4.12" % "test", "org.scalatest" %% "scalatest" % "3.0.4" % "test"), - libraryDependencies += { - if(scalaVersion.value.startsWith("2.12")) - "com.typesafe.akka" %% "akka-actor" % "2.4.12" - else - "com.typesafe.akka" %% "akka-actor" % "2.3.6" - }, parallelExecution in Test := false, - publishTo <<= version { (v: String) => + publishTo <<= version { (v: String) => val nexus = "https://oss.sonatype.org/" if (v.trim.endsWith("SNAPSHOT")) Some("snapshots" at nexus + "content/repositories/snapshots") else Some("releases" at nexus + "service/local/staging/deploy/maven2") diff --git a/src/main/scala/com/redis/PubSubServer.scala b/src/main/scala/com/redis/PubSubServer.scala deleted file mode 100644 index 153ba2b2..00000000 --- a/src/main/scala/com/redis/PubSubServer.scala +++ /dev/null @@ -1,44 +0,0 @@ -package com.redis - -import akka.actor.Actor - -sealed trait Msg -case class Subscribe(channels: Array[String]) extends Msg -case class Register(callback: PubSubMessage => Any) extends Msg -case class Unsubscribe(channels: Array[String]) extends Msg -case object UnsubscribeAll extends Msg -case class Publish(channel: String, msg: String) extends Msg - -@deprecated( - "Will be removed in the next version; please use https://github.com/debasishg/akka-redis-pubsub", "3.5") -class Subscriber(client: RedisClient) extends Actor { - var callback: PubSubMessage => Any = { m => } - - def receive = { - case Subscribe(channels) => - client.subscribe(channels.head, channels.tail: _*)(callback) - sender ! true - - case Register(cb) => - callback = cb - sender ! true - - case Unsubscribe(channels) => - client.unsubscribe(channels.head, channels.tail: _*) - sender ! true - - case UnsubscribeAll => - client.unsubscribe - sender ! true - } -} - -@deprecated( - "Will be removed in the next version; please use https://github.com/debasishg/akka-redis-pubsub", "3.5") -class Publisher(client: RedisClient) extends Actor { - def receive = { - case Publish(channel, message) => - client.publish(channel, message) - sender ! true - } -} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf deleted file mode 100644 index 08643aae..00000000 --- a/src/test/resources/application.conf +++ /dev/null @@ -1,45 +0,0 @@ -akka { - actor { - default-dispatcher { - type = "Dispatcher" - - executor = "thread-pool-executor" - - thread-pool-executor { - # Keep alive time for threads - keep-alive-time = 60s - - # Min number of threads to cap factor-based core number to - core-pool-size-min = 8 - - # The core pool size factor is used to determine thread pool core size - # using the following formula: ceil(available processors * factor). - # Resulting size is then bounded by the core-pool-size-min and - # core-pool-size-max values. - core-pool-size-factor = 3.0 - - # Max number of threads to cap factor-based number to - core-pool-size-max = 64 - - # Minimum number of threads to cap factor-based max number to - # (if using a bounded task queue) - max-pool-size-min = 8 - - # Max no of threads (if using a bounded task queue) is determined by - # calculating: ceil(available processors * factor) - max-pool-size-factor = 3.0 - - # Max number of threads to cap factor-based max number to - # (if using a bounded task queue) - max-pool-size-max = 64 - - # Allow core threads to time out - allow-core-timeout = on - } - - # Throughput defines the number of messages that are processed in a batch - # before the thread is returned to the pool. Set to 1 for as fair as possible. - throughput = 5 - } - } -} diff --git a/src/test/scala/com/redis/Patterns.scala b/src/test/scala/com/redis/Patterns.scala index 041b954d..c7a3adf2 100644 --- a/src/test/scala/com/redis/Patterns.scala +++ b/src/test/scala/com/redis/Patterns.scala @@ -1,9 +1,8 @@ package com.redis import serialization._ -import scala.concurrent.{ ExecutionContext, Await, Future } +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import akka.actor.ActorSystem /** * Implementing some of the common patterns like scatter/gather, which can benefit from @@ -25,25 +24,24 @@ object Patterns { def listPush(count: Int, key: String)(implicit clients: RedisClientPool) = { clients.withClient { client => (1 to count) foreach {i => client.rpush(key, i)} - assert(client.llen(key) == Some(count)) + assert(client.llen(key).contains(count)) } key } def listPop(count: Int, key: String)(implicit clients: RedisClientPool) = { - implicit val parseInt = Parse[Long](new String(_).toLong) + implicit val parseInt: Parse[Long] = Parse[Long](new String(_).toLong) clients.withClient { client => - val list = (1 to count) map {i => client.lpop[Long](key).get} - assert(client.llen(key) == Some(0)) + val list = (1 to count) map {_ => client.lpop[Long](key).get} + assert(client.llen(key).contains(0)) list.sum } } // set up Executors - val system = ActorSystem("ScatterGatherSystem") - import system.dispatcher + import scala.concurrent.ExecutionContext.Implicits.global - val timeout = 5 minutes + val timeout: Duration = 5 minutes private[this] def flow[A](noOfRecipients: Int, opsPerClient: Int, keyPrefix: String, fn: (Int, String) => A) = { @@ -69,7 +67,7 @@ object Patterns { val allPops = Future.sequence(futurePops) allPops map {members => members.sum} } - Await.result(allSum, timeout).asInstanceOf[Long] + Await.result(allSum, timeout) } // scatter across clietns and gather the first future to complete @@ -84,6 +82,6 @@ object Patterns { val firstSum = firstPush map {key => listPop(opsPerClient, key) } - Await.result(firstSum, timeout).asInstanceOf[Int] + Await.result(firstSum, timeout) } } diff --git a/src/test/scala/com/redis/PubSubServerDemo.scala b/src/test/scala/com/redis/PubSubServerDemo.scala deleted file mode 100644 index 30a94aa1..00000000 --- a/src/test/scala/com/redis/PubSubServerDemo.scala +++ /dev/null @@ -1,152 +0,0 @@ -package com.redis - -import akka.actor.{ Actor, ActorSystem, Props } - -case class PublishMessage(channel: String, message: String) -case class SubscribeMessage(channels: List[String]) -case class UnsubscribeMessage(channels: List[String]) -case object GoDown - -class Pub extends Actor { - println("starting publishing service ..") - val system = ActorSystem("pub") - val r = new RedisClient("localhost", 6379) - val p = system.actorOf(Props(new Publisher(r))) - - def receive = { - case PublishMessage(ch, msg) => publish(ch, msg) - case GoDown => - r.quit - system.shutdown() - system.awaitTermination() - - case x => println("Got in Pub " + x) - } - - def publish(channel: String, message: String) = { - p ! Publish(channel, message) - } -} - - -class Sub extends Actor { - println("starting subscription service ..") - val system = ActorSystem("sub") - val r = new RedisClient("localhost", 6379) - val s = system.actorOf(Props(new Subscriber(r))) - s ! Register(callback) - - def receive = { - case SubscribeMessage(chs) => sub(chs) - case UnsubscribeMessage(chs) => unsub(chs) - case GoDown => - r.quit - system.shutdown() - system.awaitTermination() - - case x => println("Got in Sub " + x) - } - - def sub(channels: List[String]) = { - s ! Subscribe(channels.toArray) - } - - def unsub(channels: List[String]) = { - s ! Unsubscribe(channels.toArray) - } - - def callback(pubsub: PubSubMessage) = pubsub match { - case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup") - case S(channel, no) => println("subscribed to " + channel + " and count = " + no) - case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) - case M(channel, msg) => - msg match { - // exit will unsubscribe from all channels and stop subscription service - case "exit" => - println("unsubscribe all ..") - r.unsubscribe - - // message "+x" will subscribe to channel x - case x if x startsWith "+" => - val s: Seq[Char] = x - s match { - case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } - } - - // message "-x" will unsubscribe from channel x - case x if x startsWith "-" => - val s: Seq[Char] = x - s match { - case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) - } - - // other message receive - case x => - println("received message on channel " + channel + " as : " + x) - } - } -} - -/** -Welcome to Scala version 2.10.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_51). -Type in expressions to have them evaluated. -Type :help for more information. - -scala> import com.redis._ -import com.redis._ - -scala> import akka.actor.{ Actor, ActorSystem, Props } -import akka.actor.{Actor, ActorSystem, Props} - -scala> val ps = ActorSystem("pub") -ps: akka.actor.ActorSystem = akka://pub - -scala> val ss = ActorSystem("sub") -ss: akka.actor.ActorSystem = akka://sub - -scala> val p = ps.actorOf(Props(new Pub)) -p: akka.actor.ActorRef = Actor[akka://pub/user/$a#2075877062] - -scala> starting publishing service .. - - -scala> val s = ss.actorOf(Props(new Sub)) -s: akka.actor.ActorRef = Actor[akka://sub/user/$a#724245975] - -scala> starting subscription service .. -Got in Sub true - - -scala> p ! PublishMessage("a", "hello world") - -scala> Got in Pub true - - -scala> s ! SubscribeMessage(List("a")) - -scala> Got in Sub true -subscribed to a and count = 1 - -$ ./redis-cli -redis 127.0.0.1:6379> publish a "hi there" -(integer) 1 - -scala> p ! PublishMessage("b", "+c") - -scala> Got in Pub true - - -scala> p ! PublishMessage("b", "+d") - -scala> Got in Pub true - - -scala> p ! PublishMessage("b", "-c") - -scala> Got in Pub true - - -scala> p ! PublishMessage("b", "exit") - -scala> Got in Pub true -**/ From e72d017d803437d9cee0ddf70030edd530349aca Mon Sep 17 00:00:00 2001 From: Dmitry Erastov Date: Sun, 11 Feb 2018 12:39:10 -0500 Subject: [PATCH 2/3] Migrate to sbt 1.x --- build.sbt | 59 +++++++++++++++++++++++++++++ project/ScalaRedisProject.scala | 66 --------------------------------- project/build.properties | 2 +- 3 files changed, 60 insertions(+), 67 deletions(-) create mode 100644 build.sbt delete mode 100644 project/ScalaRedisProject.scala diff --git a/build.sbt b/build.sbt new file mode 100644 index 00000000..e32a86f9 --- /dev/null +++ b/build.sbt @@ -0,0 +1,59 @@ +name := "RedisClient" + +lazy val redisClient = (project in file(".")).settings(coreSettings : _*) + +lazy val commonSettings: Seq[Setting[_]] = Seq( + organization := "net.debasishg", + version := "3.5", + scalaVersion := "2.11.12", + crossScalaVersions := Seq("2.12.4", "2.11.12", "2.10.7"), + + scalacOptions in Compile ++= Seq( "-unchecked", "-feature", "-language:postfixOps", "-deprecation" ), + + resolvers ++= Seq( + "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/" + ) +) + +lazy val coreSettings = commonSettings ++ Seq( + name := "RedisClient", + libraryDependencies ++= Seq( + "commons-pool" % "commons-pool" % "1.6", + "org.slf4j" % "slf4j-api" % "1.7.25", + "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "provided", + "log4j" % "log4j" % "1.2.17" % "provided", + "junit" % "junit" % "4.12" % "test", + "org.scalatest" %% "scalatest" % "3.0.4" % "test"), + + parallelExecution in Test := false, + publishTo := version { (v: String) => + val nexus = "https://oss.sonatype.org/" + if (v.trim.endsWith("SNAPSHOT")) Some("snapshots" at nexus + "content/repositories/snapshots") + else Some("releases" at nexus + "service/local/staging/deploy/maven2") + }.value, + credentials += Credentials(Path.userHome / ".sbt" / "sonatype.credentials"), + publishMavenStyle := true, + publishArtifact in Test := false, + pomIncludeRepository := { repo => false }, + pomExtra := ( + https://github.com/debasishg/scala-redis + + + Apache 2.0 License + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + git@github.com:debasishg/scala-redis.git + scm:git:git@github.com:debasishg/scala-redis.git + + + + debasishg + Debasish Ghosh + http://debasishg.blogspot.com + + ), + unmanagedResources in Compile += baseDirectory.map( _ / "LICENSE" ).value +) diff --git a/project/ScalaRedisProject.scala b/project/ScalaRedisProject.scala deleted file mode 100644 index 25ed9453..00000000 --- a/project/ScalaRedisProject.scala +++ /dev/null @@ -1,66 +0,0 @@ -import sbt._ -import Keys._ - -object ScalaRedisProject extends Build -{ - import Resolvers._ - lazy val root = Project("RedisClient", file(".")) settings(coreSettings : _*) - - lazy val commonSettings: Seq[Setting[_]] = Seq( - organization := "net.debasishg", - version := "3.5", - scalaVersion := "2.11.12", - crossScalaVersions := Seq("2.12.4", "2.11.12", "2.10.7"), - - scalacOptions in Compile ++= Seq( "-unchecked", "-feature", "-language:postfixOps", "-deprecation" ), - - resolvers ++= Seq(akkaRepo) - ) - - lazy val coreSettings = commonSettings ++ Seq( - name := "RedisClient", - libraryDependencies ++= Seq( - "commons-pool" % "commons-pool" % "1.6", - "org.slf4j" % "slf4j-api" % "1.7.25", - "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "provided", - "log4j" % "log4j" % "1.2.17" % "provided", - "junit" % "junit" % "4.12" % "test", - "org.scalatest" %% "scalatest" % "3.0.4" % "test"), - - parallelExecution in Test := false, - publishTo <<= version { (v: String) => - val nexus = "https://oss.sonatype.org/" - if (v.trim.endsWith("SNAPSHOT")) Some("snapshots" at nexus + "content/repositories/snapshots") - else Some("releases" at nexus + "service/local/staging/deploy/maven2") - }, - credentials += Credentials(Path.userHome / ".sbt" / "sonatype.credentials"), - publishMavenStyle := true, - publishArtifact in Test := false, - pomIncludeRepository := { repo => false }, - pomExtra := ( - https://github.com/debasishg/scala-redis - - - Apache 2.0 License - http://www.apache.org/licenses/LICENSE-2.0.html - repo - - - - git@github.com:debasishg/scala-redis.git - scm:git:git@github.com:debasishg/scala-redis.git - - - - debasishg - Debasish Ghosh - http://debasishg.blogspot.com - - ), - unmanagedResources in Compile <+= baseDirectory map { _ / "LICENSE" } - ) -} - -object Resolvers { - val akkaRepo = "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/" -} diff --git a/project/build.properties b/project/build.properties index 133a8f19..8b697bbb 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.17 +sbt.version=1.1.0 From 083a4df227fdd715841166bf60188fb94ac73dec Mon Sep 17 00:00:00 2001 From: Dmitry Erastov Date: Sun, 11 Feb 2018 12:39:29 -0500 Subject: [PATCH 3/3] Bump version to 3.6 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index e32a86f9..2b3522d9 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ lazy val redisClient = (project in file(".")).settings(coreSettings : _*) lazy val commonSettings: Seq[Setting[_]] = Seq( organization := "net.debasishg", - version := "3.5", + version := "3.6", scalaVersion := "2.11.12", crossScalaVersions := Seq("2.12.4", "2.11.12", "2.10.7"),