diff --git a/build.sbt b/build.sbt index a1a0f07..ed0063b 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := """TesiMRLiteScala""" -val akkaVersion = "2.3.7" +val akkaVersion = "2.3.8" fork in run := true diff --git a/project/build.properties b/project/build.properties index 7875a69..748703f 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,4 +1 @@ -#Activator-generated Properties -#Sun Oct 26 23:34:56 CET 2014 -template.uuid=d6c1c796-490e-42c1-be7b-6d85600695e8 -sbt.version=0.13.6 +sbt.version=0.13.7 diff --git a/src/main/scala/it/unipd/trluca/mrlite/DistArrayNodeActor.scala b/src/main/scala/it/unipd/trluca/mrlite/DistArrayNodeActor.scala index fae77fa..43e0ba8 100644 --- a/src/main/scala/it/unipd/trluca/mrlite/DistArrayNodeActor.scala +++ b/src/main/scala/it/unipd/trluca/mrlite/DistArrayNodeActor.scala @@ -7,7 +7,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random object Messages { - case class CreateBlock(size: Int, valueRange:Int, leaderAddress:Address) + case class CreateBlock(size: Int, valueRange:Int, mainNodeAddress:Address) case object PrintBlock case object MinEMax } @@ -21,15 +21,15 @@ import it.unipd.trluca.mrlite.Messages._ var originalArray: Array[Int] = Array.empty[Int] var sortedArray: Array[(Int,V2Address)] = Array.empty - var leaderAddress:Address = null + var mainNodeAddress:Address = null override def receive = super.receive orElse localReceive def localReceive:Receive = { - case CreateBlock(size, valueRange, lAddress) => - leaderAddress = lAddress + case CreateBlock(size, valueRange, mAddress) => + mainNodeAddress = mAddress originalArray = Array.fill(size)(Random.nextInt(valueRange)) - if (leaderAddress == Cluster(context.system).selfAddress) + if (mainNodeAddress == Cluster(context.system).selfAddress) context.actorOf(Props[SinkReceiver], "sinkreceiver") // log.info("Contains {}", originalArray.mkString(",")) @@ -122,7 +122,7 @@ import it.unipd.trluca.mrlite.Messages._ } override def sink(s: Iterable[(Int, Seq[(Int, V2Address)])]): Unit = { - val destination = context.actorSelection(leaderAddress + Consts.NODE_ACT_NAME + "/sinkreceiver") + val destination = context.actorSelection(mainNodeAddress + Consts.NODE_ACT_NAME + "/sinkreceiver") s foreach { item => val list = item._2.grouped(Consts.CHUNK_SIZE).toList diff --git a/src/main/scala/it/unipd/trluca/mrlite/MemberListener.scala b/src/main/scala/it/unipd/trluca/mrlite/MemberListener.scala index 91aeba5..04c5ab8 100644 --- a/src/main/scala/it/unipd/trluca/mrlite/MemberListener.scala +++ b/src/main/scala/it/unipd/trluca/mrlite/MemberListener.scala @@ -39,14 +39,7 @@ class MemberListener extends Actor with ActorLogging { log.info("Member detected as unreachable: {}", member) case LeaderChanged(address) => - cluster unsubscribe self - if (Cluster(context.system).selfAddress == address.get) { - cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[LeaderChanged]) - log.info(s"leader changed: $address") - } else { - cluster.subscribe(self, classOf[LeaderChanged]) - } + log.info(s"leader changed: $address") //case _: MemberEvent => // ignore case m:Any => log.info("MessageLost:" + m)// ignore diff --git a/src/main/scala/it/unipd/trluca/mrlite/aggregators/InitAggregator.scala b/src/main/scala/it/unipd/trluca/mrlite/aggregators/InitAggregator.scala index d4b6b24..7c9a063 100644 --- a/src/main/scala/it/unipd/trluca/mrlite/aggregators/InitAggregator.scala +++ b/src/main/scala/it/unipd/trluca/mrlite/aggregators/InitAggregator.scala @@ -8,7 +8,7 @@ import it.unipd.trluca.mrlite.Consts import scala.collection.mutable.ArrayBuffer -case class InitArray(arraySize:Int, valueRange:Int, leaderAddress:Address) +case class InitArray(arraySize:Int, valueRange:Int, mainNodeAddress:Address) class InitAggregator extends Actor with Aggregator { val results = ArrayBuffer.empty[Unit] @@ -16,7 +16,7 @@ class InitAggregator extends Actor with Aggregator { var clusterSize:Int = 0 expectOnce { - case InitArray(distArraySize, valueRange, leaderAddress) => + case InitArray(distArraySize, valueRange, mainNodeAddress) => val members = Cluster(context.system).state.members clusterSize = members.size originalSender = sender() @@ -24,7 +24,7 @@ class InitAggregator extends Actor with Aggregator { var rest = distArraySize % clusterSize members foreach { member => context.actorSelection(member.address + Consts.NODE_ACT_NAME) ! - CreateBlock(portion + (if (rest > 0) 1 else 0), valueRange, leaderAddress) + CreateBlock(portion + (if (rest > 0) 1 else 0), valueRange, mainNodeAddress) rest -= 1 } }