Skip to content

Commit

Permalink
update libraries && fix start-up bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ltronky committed Dec 21, 2014
1 parent e1219c7 commit e75aa77
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := """TesiMRLiteScala"""

val akkaVersion = "2.3.7"
val akkaVersion = "2.3.8"

fork in run := true

Expand Down
5 changes: 1 addition & 4 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
#Activator-generated Properties
#Sun Oct 26 23:34:56 CET 2014
template.uuid=d6c1c796-490e-42c1-be7b-6d85600695e8
sbt.version=0.13.6
sbt.version=0.13.7
12 changes: 6 additions & 6 deletions src/main/scala/it/unipd/trluca/mrlite/DistArrayNodeActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object Messages {
case class CreateBlock(size: Int, valueRange:Int, leaderAddress:Address)
case class CreateBlock(size: Int, valueRange:Int, mainNodeAddress:Address)
case object PrintBlock
case object MinEMax
}
Expand All @@ -21,15 +21,15 @@ import it.unipd.trluca.mrlite.Messages._
var originalArray: Array[Int] = Array.empty[Int]
var sortedArray: Array[(Int,V2Address)] = Array.empty

var leaderAddress:Address = null
var mainNodeAddress:Address = null

override def receive = super.receive orElse localReceive
def localReceive:Receive = {
case CreateBlock(size, valueRange, lAddress) =>
leaderAddress = lAddress
case CreateBlock(size, valueRange, mAddress) =>
mainNodeAddress = mAddress
originalArray = Array.fill(size)(Random.nextInt(valueRange))

if (leaderAddress == Cluster(context.system).selfAddress)
if (mainNodeAddress == Cluster(context.system).selfAddress)
context.actorOf(Props[SinkReceiver], "sinkreceiver")

// log.info("Contains {}", originalArray.mkString(","))
Expand Down Expand Up @@ -122,7 +122,7 @@ import it.unipd.trluca.mrlite.Messages._
}

override def sink(s: Iterable[(Int, Seq[(Int, V2Address)])]): Unit = {
val destination = context.actorSelection(leaderAddress + Consts.NODE_ACT_NAME + "/sinkreceiver")
val destination = context.actorSelection(mainNodeAddress + Consts.NODE_ACT_NAME + "/sinkreceiver")

s foreach { item =>
val list = item._2.grouped(Consts.CHUNK_SIZE).toList
Expand Down
9 changes: 1 addition & 8 deletions src/main/scala/it/unipd/trluca/mrlite/MemberListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class MemberListener extends Actor with ActorLogging {
log.info("Member detected as unreachable: {}", member)

case LeaderChanged(address) =>
cluster unsubscribe self
if (Cluster(context.system).selfAddress == address.get) {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[LeaderChanged])
log.info(s"leader changed: $address")
} else {
cluster.subscribe(self, classOf[LeaderChanged])
}
log.info(s"leader changed: $address")

//case _: MemberEvent => // ignore
case m:Any => log.info("MessageLost:" + m)// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ import it.unipd.trluca.mrlite.Consts

import scala.collection.mutable.ArrayBuffer

case class InitArray(arraySize:Int, valueRange:Int, leaderAddress:Address)
case class InitArray(arraySize:Int, valueRange:Int, mainNodeAddress:Address)

class InitAggregator extends Actor with Aggregator {
val results = ArrayBuffer.empty[Unit]
var originalSender:ActorRef = null
var clusterSize:Int = 0

expectOnce {
case InitArray(distArraySize, valueRange, leaderAddress) =>
case InitArray(distArraySize, valueRange, mainNodeAddress) =>
val members = Cluster(context.system).state.members
clusterSize = members.size
originalSender = sender()
val portion = distArraySize / clusterSize
var rest = distArraySize % clusterSize
members foreach { member =>
context.actorSelection(member.address + Consts.NODE_ACT_NAME) !
CreateBlock(portion + (if (rest > 0) 1 else 0), valueRange, leaderAddress)
CreateBlock(portion + (if (rest > 0) 1 else 0), valueRange, mainNodeAddress)
rest -= 1
}
}
Expand Down

0 comments on commit e75aa77

Please sign in to comment.