Skip to content

Commit

Permalink
W00t! actor system is working with the webservice!!!
Browse files Browse the repository at this point in the history
and is quite fast too :)
  • Loading branch information
nvankaam committed Nov 16, 2012
1 parent 9c27ae9 commit 51bd21f
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 24 deletions.
28 changes: 17 additions & 11 deletions application.conf
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
resources {
location="/Users/nielsvankaam/Documents/Studie/FuncProgramming/repo/github-relations-viz/commits"
}

//#Configuration for the linkcombiner actor
//#LinkCombine
LinkCombine {
include "common"




akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"

actor {
provider = "akka.remote.RemoteActorRefProvider"

}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-received-messages = on
log-sent-messages = on
netty.hostname = "145.94.36.61"
netty.port = 2552
}
remote {



transport = "akka.remote.netty.NettyRemoteTransport"
//#log-received-messages = on
log-sent-messages = on
netty.hostname = "145.94.36.61"
netty.port = 2552
}
loglevel = DEBUG
log-config-on-start = on
}
Expand Down
57 changes: 57 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
resources {
location="/Users/nielsvankaam/Documents/Studie/FuncProgramming/repo/github-relations-viz/commits"
}
//#LinkCombine
LinkCombine {
include "common"




akka {

actor {
provider = "akka.remote.RemoteActorRefProvider"

}
remote {



transport = "akka.remote.netty.NettyRemoteTransport"
//#log-received-messages = on
log-sent-messages = on
netty.hostname = "145.94.188.22"
netty.port = 2552
netty.message-frame-size = 1000 MiB
}
loglevel = DEBUG
log-config-on-start = on
}

}
//#remotelookup




//#LinkCombine
LinkCombineHost_ovh {
include "common"
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"

}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-received-messages = on
log-sent-messages = on
netty.hostname = "37.59.53.125"
netty.port = 2552
}
loglevel = DEBUG
log-config-on-start = on
}
}
//#remotelookup
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@ import akka.util.duration._
import akka.dispatch.{Future,Promise,ExecutionContext}
import java.util.concurrent.Executors
import com.typesafe.config.ConfigFactory
import nl.tudelft.courses.in4355.github_relations_viz.actors.ActorCombinerSet
import nl.tudelft.courses.in4355.github_relations_viz.actors.ActorCombinerConfig
import akka.actor.AddressFromURIString
import nl.tudelft.courses.in4355.github_relations_viz.actors.ActorComputationConfig
import nl.tudelft.courses.in4355.github_relations_viz.actors.LinkComputerConfig
import nl.tudelft.courses.in4355.github_relations_viz.actors.obtainLinksFilterPass
import nl.tudelft.courses.in4355.github_relations_viz.actors.userProjectsPerWeekSkip


class GHRelationsVizDist(projectsurl: URL,
usersurl: URL,
system: ActorSystem) extends GHRelationsViz {
commitsurl: URL,
forksurl: URL,
system: ActorSystem,
minFrom: Int,
maxUntil: Int,
period: Int) extends GHRelationsViz {
import GHRelationsViz._

println( "Reading users" )
Expand All @@ -36,14 +49,44 @@ class GHRelationsVizDist(projectsurl: URL,
val projects = readProjects(projectsurl)
def getProject(id: ProjectRef) = projects.get(id).getOrElse(Project.unknown(id))

implicit val timeout: Timeout = 2400 seconds
implicit val timeout: Timeout = 59 seconds
implicit val ec = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

val linkCombineActor = system.actorOf(Props[LinkCombineActor], "LinkCombineActor")

val config = ActorCombinerSet(List(
ActorCombinerConfig(
AddressFromURIString("akka://[email protected]:2552"),
ActorCombinerSet(List(
ActorCombinerConfig(
AddressFromURIString("akka://[email protected]:2552"),
ActorComputationConfig(List(
LinkComputerConfig(2, 0),
LinkComputerConfig(2, 1)
))
)
/**
,
ActorCombinerConfig(
AddressFromURIString("akka://[email protected]:2552"),
ActorComputationConfig(List(
LinkComputerConfig(3, 2)
//LinkComputerConfig(10, 9)
//LinkComputerConfig(12, 10),
//LinkComputerConfig(12, 11)
))
)
**/

))
)
))

linkCombineActor ! config

def getProjectLinks(from: Int, until: Int, minWeight: Int) =
(linkCombineActor ? obtainLinks(from,until)).map( _.asInstanceOf[linkResult].map )

def getUserProjectsLinksPerWeek = Promise.successful ( Nil )
(linkCombineActor.ask(obtainLinksFilterPass(from,until,minWeight))).map( _.asInstanceOf[linkResult].map )
def getUserProjectsLinksPerWeek = (linkCombineActor.ask(userProjectsPerWeekSkip())).map(_.asInstanceOf[Seq[(Int, Int)]])

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ class GHRelationsVizServlet extends ScalatraServlet with AkkaSupport {
val projectsurl = new URL(datadir+"projects.txt")
val usersurl = new URL(datadir+"users.txt")
val forksurl = new URL(datadir+"forks.txt")
val commitsurl = new URL(datadir+"smallcommits.txt")
val commitsurl = new URL(datadir+"commits.txt")
val (system: ActorSystem, processor:GHRelationsViz) =
if ( false ) {
if ( true ) {
val s = ActorSystem("ghlink", ConfigFactory.load.getConfig("LinkCombine"))
(s,new GHRelationsVizDist(projectsurl,usersurl,s))
(s,new GHRelationsVizDist(projectsurl,usersurl, commitsurl, forksurl,s,epoch1990,epoch2015,PERIOD))
} else {
(ActorSystem(),
new GHRelationsVizLocal(projectsurl,usersurl,forksurl,commitsurl,epoch1990,epoch2015,PERIOD))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ trait ActorCommand

case class obtainLinks(from : Int, until : Int) extends ActorCommand

case class obtainLinksFilter(From: Int, until : Int, degree : Int) extends ActorCommand

case class obtainLinksFilterPass(From: Int, until : Int, degree : Int) extends ActorCommand

//The config for a child computer to intialise on
case class LinkComputerConfig(modulo: Int, remainder: Int)
Expand All @@ -25,6 +28,11 @@ case class ActorCombinerConfig(system: Address, initCommand: ActorInitCommand)
//Case class which tells a linkCombiner to intialize a set of linkComputers
case class ActorComputationConfig(computers: List[LinkComputerConfig]) extends ActorInitCommand

//Ask for the amount of links per week, needed for histogram
case class userProjectsPerWeek()
//same as above, but skip the first actor. This is a bit of an ugly solution, but a quick fix for the presentation on friday
case class userProjectsPerWeekSkip()

trait linkResults

case class linkResult(map: GenMap[Link, Int])
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import nl.tudelft.courses.in4355.github_relations_viz.Timer
import com.typesafe.config.ConfigFactory
import akka.actor.{ ActorRef, Props, Actor, ActorSystem }
import nl.tudelft.courses.in4355.github_relations_viz.GHEntities.Link
import net.van_antwerpen.scala.collection.mapreduce.Aggregator._
import akka.dispatch.Future
import akka.pattern.{ ask, pipe }
import akka.util.Timeout
Expand All @@ -20,6 +19,10 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Deploy
import akka.remote.RemoteScope
import akka.actor.AddressFromURIString
import net.van_antwerpen.scala.collection.mapreduce.Aggregator._
import net.van_antwerpen.scala.collection.mapreduce.MapReduce._
import scala.collection.immutable.SortedMap
import scala.collection.parallel.ParMap


//Link combiner actor. Able to initialize computer actors, and requesting them to obtain project links
Expand All @@ -35,6 +38,15 @@ class LinkCombineActor extends Actor {
println("Received link obtain command. Sending to %d children.".format(context.children.size))
Future.sequence(for (child <- context.children) yield child.ask(o).mapTo[linkResult].map(_.map))
.map(_.foldLeft(Map[Link,Int]()) ((i,s) => i |<| s)).map(linkResult(_)).pipeTo(sender)
//Another dirty fix for presewntation
case o:obtainLinksFilterPass =>
println("Received link obtain command. Sending to %d children.".format(context.children.size))
Future.sequence(for (child <- context.children) yield child.ask(obtainLinksFilter(o.From, o.until, o.degree)).mapTo[linkResult].map(_.map))
.map(_.foldLeft(Map[Link,Int]()) ((i,s) => i |<| s)).map(linkResult(_)).pipeTo(sender)
case o: obtainLinksFilter =>
println("Received link obtain command with filter. Sending to %d children.".format(context.children.size))
Future.sequence(for (child <- context.children) yield child.ask(obtainLinks(o.From, o.until)).mapTo[linkResult].map(_.map))
.map(_.foldLeft(Map[Link,Int]()) ((i,s) => i |<| s)).map(_.filter(_._2 >= o.degree)).map(linkResult(_)).pipeTo(sender)
//Initializing a series of computers
case ActorComputationConfig(computers) =>
println("Initializing computation actor")
Expand All @@ -50,6 +62,14 @@ class LinkCombineActor extends Actor {
ref ! (configVars.initCommand)
}
}
//Ask a child for the projects per week. Quick dirty fix for the presentation on friday
case userProjectsPerWeekSkip() => {
context.children.head.ask(userProjectsPerWeek()).pipeTo(sender)
}
//Compute the histogram of projects per user per week
case userProjectsPerWeek() => {
sender ! GHResources.commits.mapReduce[SortedMap[Int,Int]]( e => e._1 -> e._2.size ).toList
}
}
}
//#actor
Expand Down Expand Up @@ -161,11 +181,11 @@ object combineLinks {
ActorCombinerConfig(
AddressFromURIString("akka://[email protected]:2552"),
ActorComputationConfig(List(
LinkComputerConfig(3, 0),
LinkComputerConfig(3, 1)
LinkComputerConfig(2, 0),
LinkComputerConfig(2, 1)
))
)

/**
,
ActorCombinerConfig(
AddressFromURIString("akka://[email protected]:2552"),
Expand All @@ -176,7 +196,7 @@ object combineLinks {
//LinkComputerConfig(12, 11)
))
)

**/
))
)
))
Expand Down

0 comments on commit 51bd21f

Please sign in to comment.