Skip to content

Commit

Permalink
update libraries versions && fix start-up bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ltronky committed Dec 21, 2014
1 parent a620dd8 commit 2842f8e
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 30 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := """TesiBSPScala"""

val akkaVersion = "2.3.7"
val akkaVersion = "2.3.8"

fork in run := true

Expand Down Expand Up @@ -37,7 +37,7 @@ javaOptions in run ++= Seq(
"-XX:+UnlockCommercialFeatures",
"-XX:+FlightRecorder")

assemblyJarName in assembly := "BSPClusterNode.jar"
assemblyJarName in assembly := "BSPScalaNode.jar"

test in assembly := {}

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.6
sbt.version=0.13.7
3 changes: 3 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ akka {

remote {
log-remote-lifecycle-events = off

enabled-transports = ["akka.remote.netty.tcp"]
transport = "akka.remote.netty.NettyRemoteTransport"

log-frame-size-exceeding=5000b
netty.tcp {
hostname = "192.168.56.11"
port = 2551
Expand Down
20 changes: 9 additions & 11 deletions src/main/scala/it/unipd/trluca/bsp/EntryPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import akka.cluster.{Member, Cluster}
import akka.pattern.ask
import akka.util.Timeout
import it.unipd.trluca.bsp.aggregators.WorldClock
import it.unipd.trluca.bsp.engine.{InitStartAgents, JobTerminated, Message, Job}
import it.unipd.trluca.bsp.engine.aggregators.PhaseClock
import it.unipd.trluca.bsp.engine.{InitStartAgents, JobTerminated, Job}

import scala.collection.SortedSet
import scala.concurrent.duration.DurationInt
Expand All @@ -24,12 +23,11 @@ case class CreateConnections(clusterSize:SortedSet[Member], nr:Int) extends Engi
case object PrintResult extends EngineStep

case object TakeDownCluster

case object Done

case class SetInitialSize(c:Config)
case object StartExecution
case object InitAgentsConnections
case class SetInitialSize(c:Config)


class EntryPoint extends Job[Any, V] with Actor with ActorLogging {
Expand Down Expand Up @@ -58,23 +56,23 @@ class EntryPoint extends Job[Any, V] with Actor with ActorLogging {
val wc = context.actorOf(Props[WorldClock])
val response = wc ? CreateConnections(Cluster(context.system).state.members, 3) //TODO numero massimo di connesioni DA un nodo
response map { Done =>
log.info("StartEngine t=" + System.nanoTime())
self ! InitStartAgents
}

case JobTerminated =>
log.info("JobTerminated")
log.info("JobTerminated t=" + System.nanoTime())
val wc = context.actorOf(Props[WorldClock])
val response = wc ? PrintResult
response map { Done =>
log.info("PrintDone")
takeDown()
self ! TakeDownCluster
}
}

def takeDown() = {
Cluster(context.system).state.members foreach { m=>
context.actorSelection(m.address + ConstStr.NODE_ACT_NAME) ! TakeDownCluster
}
case TakeDownCluster =>
Cluster(context.system).state.members foreach { m=>
context.actorSelection(m.address + ConstStr.NODE_ACT_NAME) ! TakeDownCluster
}
}

override def shouldRunAgain(phase:Int) = true
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/it/unipd/trluca/bsp/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ object Main {
def main(args: Array[String]): Unit = {

val parser = new OptionParser[Config]("scopt") {
head("Distributed-Array Sorting", "1.0")
opt[Unit]('d', "debug") optional() action { (x, c) =>
head("BSP Example", "1.0")
opt[Unit]('d', "debug") action { (x, c) =>
c.copy(debug = true)
} text "Debug"
opt[Int]('c', "cSize") action { (x, c) =>
Expand Down
17 changes: 5 additions & 12 deletions src/main/scala/it/unipd/trluca/bsp/MemberListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MemberListener extends Actor with ActorLogging {
nodes += member.address
log.info("Member is Up: {}. {} nodes in cluster", member.address, nodes.size)
if (nodes.size == initCSize)
context.actorSelection(Cluster(context.system).state.members.head.address + "/user/ep") ! StartExecution
context.actorSelection("/user/ep") ! StartExecution

case MemberRemoved(member, _) =>
nodes -= member.address
Expand All @@ -39,17 +39,10 @@ class MemberListener extends Actor with ActorLogging {
log.info("Member detected as unreachable: {}", member)

case LeaderChanged(address) =>
cluster unsubscribe self
if (Cluster(context.system).selfAddress == address.get) {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[LeaderChanged])
log.info(s"leader changed: $address")
} else {
cluster.subscribe(self, classOf[LeaderChanged])
}

case _: MemberEvent => // ignore
case _ => // ignore
log.info(s"leader changed: $address")

//case _: MemberEvent => // ignore
case m:Any => log.info("MessageLost:" + m)// ignore
}

}
4 changes: 2 additions & 2 deletions src/main/scala/it/unipd/trluca/bsp/V.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ class V(id:Int) extends Agent[Any, V] with ActorLogging {
edges += as
}
}
log.info("CE " + toString)
//log.info("CE " + toString)
sender() ! Done

case PrintResult =>
log.info("CE " + toString)
//log.info("CE " + toString)
sender() ! Done

case _=>
Expand Down

0 comments on commit 2842f8e

Please sign in to comment.