V0.3 #3

Open: wants to merge 1 commit into base: v0.2
2 changes: 2 additions & 0 deletions .gitignore
@@ -4,3 +4,5 @@ project/target
null
.bsp
.idea
db
db.wal.0
33 changes: 33 additions & 0 deletions README.md
@@ -0,0 +1,33 @@
# GITHUBRANK CHALLENGE BY @lucasrpb

The task was implemented with Akka Streams because it is a functional and composable library that lets us express
fairly complex computations with basic abstract components: Source (the input), Flow (the computation) and
Sink (the output). The library ships with very useful built-in components. Among them is one called throttle,
which the implementation uses to stay within the GitHub API request rate limit :)
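The idea behind throttle can be illustrated with a minimal token-bucket sketch. This is a hypothetical, self-contained illustration of rate limiting in plain Scala, not Akka's actual implementation (which suspends the stream instead of rejecting elements); the class and parameter names are made up:

```scala
// Token-bucket rate limiter sketch: up to `capacity` immediate requests,
// then refilled at `refillPerSecond` tokens per second.
final class TokenBucket(capacity: Int, refillPerSecond: Int) {
  private var tokens: Double = capacity
  private var last: Long = System.nanoTime()

  // Returns true if a request may proceed now, false if it must wait.
  def tryAcquire(): Boolean = synchronized {
    val now = System.nanoTime()
    tokens = math.min(capacity.toDouble, tokens + (now - last) * refillPerSecond / 1e9)
    last = now
    if (tokens >= 1) { tokens -= 1; true } else false
  }
}

object ThrottleDemo extends App {
  val bucket = new TokenBucket(capacity = 2, refillPerSecond = 1)
  // The first two calls pass immediately; the third must wait for a refill.
  println(Seq.fill(3)(bucket.tryAcquire())) // List(true, true, false)
}
```

Akka Streams' throttle applies the same principle to stream elements, which is how the implementation keeps API calls under the hourly quota.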

The HTTP ETag caching mechanism supported by GitHub was also implemented to help reduce the number of API requests
counted against the limit. For content storage, MapDB, a popular Java collections library, was used as the persistent
store. An in-memory cache layer was also implemented to speed things up a little bit!
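The ETag flow can be sketched as follows. This is a simplified, hypothetical model of the conditional-request cycle (the `fetch` function stands in for the real HTTP call, and all names are illustrative): the client sends its stored ETag; on 304 Not Modified the cached body is reused and the request does not count against the rate limit.

```scala
final case class CachedResponse(etag: String, body: String)

object EtagCacheDemo {
  private val cache = scala.collection.mutable.Map.empty[String, CachedResponse]

  // `fetch` simulates the server: given our stored ETag (if any), it returns
  // either (304, None) when the ETag still matches, or (200, Some(fresh)).
  def get(url: String, fetch: Option[String] => (Int, Option[CachedResponse])): String = {
    val knownEtag = cache.get(url).map(_.etag)
    fetch(knownEtag) match {
      case (304, _)           => cache(url).body                  // served from cache
      case (200, Some(fresh)) => cache(url) = fresh; fresh.body   // store and serve
      case (code, _)          => sys.error(s"unexpected status $code")
    }
  }
}
```

In the real implementation the stored ETag is sent in an If-None-Match header, and the cached body is persisted via MapDB rather than held only in a map.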

**Be aware that for big organizations the response time can be huge! This is due to the throttling!**

To run the server you must first set an environment variable called GH_TOKEN with your GitHub developer token as its
value! (This is necessary because without it the request limit is only 60 per hour!) On Windows it can be set in the
Environment Variables section (unfortunately I was not able to make it work in Windows Terminal :/). On Linux and macOS
you can do it in the terminal:

**export GH_TOKEN="my_token_goes_here"**

With the token set, you can run the application with: sbt run

The server runs on port 8080. To check the contribution ranking for a given organization you can use curl as follows
(this is an example):

**curl -X GET localhost:8080/org/spray/contributors**
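The ranking this endpoint returns boils down to flattening the per-repository contribution lists, summing contributions per user, and sorting in descending order, mirroring the groupBy/sortBy aggregation in Main.scala:

```scala
final case class Contribution(login: String, contributions: Int)

object Ranking {
  // Aggregate per-repo contribution lists into a descending user ranking.
  def rank(perRepo: Seq[Seq[Contribution]]): Seq[(String, Int)] =
    perRepo.flatten
      .groupBy(_.login)
      .map { case (user, cs) => user -> cs.map(_.contributions).sum }
      .toSeq
      .sortBy(-_._2)
}
```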

Finally, for testing the solution:

**sbt test**

**One more thing: to stop the server, press any key :P**

That's all, folks!
17 changes: 14 additions & 3 deletions build.sbt
@@ -1,3 +1,4 @@

name := "githubrank"

version := "0.1"
@@ -17,10 +18,20 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,

"com.typesafe.akka" %% "akka-stream-contrib" % "0.11+3-08ccb218",
"com.typesafe.akka" %% "akka-stream-contrib" % "0.11+3-08ccb218" % Test,
"com.typesafe.akka" %% "akka-stream-contrib" % "0.11+3-08ccb218" % Test,

"com.github.ben-manes.caffeine" % "caffeine" % "3.0.3",
"org.mapdb" % "mapdb" % "3.0.8",

"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,

"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion,
"com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion
)

dependencyOverrides += "com.typesafe.akka" %% "akka-discovery" % "2.6.15"

enablePlugins(AkkaGrpcPlugin)
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.0.0")
13 changes: 13 additions & 0 deletions src/main/protobuf/client.proto
@@ -0,0 +1,13 @@
syntax = "proto3";

import "protocol.proto";

package io.scalac.githubrank.grpc;
//#options

//#services
/*service CommanderClient {
rpc sendCommand (CommandRequest) returns (CommandResponse) {}
rpc read(ReadRequest) returns(ReadResponse) {}
}*/
//#services
9 changes: 9 additions & 0 deletions src/main/protobuf/protocol.proto
@@ -0,0 +1,9 @@
syntax = "proto3";

package io.scalac.githubrank.grpc;

message CacheItem {
string tag = 1;
string lastModified = 2;
bytes entity = 3;
}
9 changes: 9 additions & 0 deletions src/main/scala/io/scalac/githubrank/GithubAPIConsumer.scala
@@ -0,0 +1,9 @@
package io.scalac.githubrank

import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import scala.concurrent.Future

trait GithubAPIConsumer {
def request(req: HttpRequest): Future[HttpResponse]
def close(): Future[Unit]
}
9 changes: 9 additions & 0 deletions src/main/scala/io/scalac/githubrank/GithubAPIService.scala
@@ -0,0 +1,9 @@
package io.scalac.githubrank

import spray.json.JsValue
import scala.concurrent.Future

trait GithubAPIService {
def getContributors(org: String): Future[JsValue]
def close(): Future[Unit]
}
88 changes: 54 additions & 34 deletions src/main/scala/io/scalac/githubrank/HttpCache.scala
@@ -1,51 +1,71 @@
package io.scalac.githubrank

import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.http.scaladsl.model.ResponseEntity
import akka.util.Timeout
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, RemovalCause}
import com.google.protobuf.any.Any
import io.scalac.githubrank.grpc._
import org.mapdb.{DBMaker, Serializer}
import org.slf4j.LoggerFactory

import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import java.util.concurrent.Executor
import scala.concurrent.ExecutionContext

object HttpCache {
/**
 * An in-memory and persistent cache implementing the HTTP ETag caching supported by the GitHub API.
 * When the GitHub API returns status 304 (Not Modified), the data is served from the local cache,
 * so the request does not count against the GitHub API rate limit! :)
 * @param MAX_ENTRIES the maximum number of entries in the cache
 * @param ec the execution context backing the cache's executor
 */
class HttpCache(val MAX_ENTRIES: Int)(implicit val ec: ExecutionContext) {

sealed trait Command
protected case class GetItem(url: String, sender: ActorRef[Option[(String, String, ResponseEntity)]]) extends Command
protected case class PutItem(url: String, data: (String, String, ResponseEntity), sender: ActorRef[Boolean]) extends Command
protected val db = DBMaker.fileDB("db")
.transactionEnable()
.closeOnJvmShutdown()
.make()

def apply(): Behavior[Command] = Behaviors.setup { ctx =>
val cache = TrieMap.empty[String, (String, String, ResponseEntity)]
val entitiesCollection = db.treeMap("responses_cache")
.keySerializer(Serializer.STRING)
.valueSerializer(Serializer.BYTE_ARRAY)
.createOrOpen()

Behaviors.receiveMessage {
case GetItem(url, sender) =>
sender ! cache.get(url)
Behaviors.same
val logger = LoggerFactory.getLogger(this.getClass)

case PutItem(url, data, sender) =>
cache.put(url, data)
sender ! true
Behaviors.same
val cache = Caffeine.newBuilder()
.maximumSize(MAX_ENTRIES)
.executor(ec.asInstanceOf[Executor])
.removalListener((key: String, value: CacheItem, cause: RemovalCause) => {
if(cause.wasEvicted())
logger.debug(s"REMOVING ${key} FROM CACHE $cause\n")
})
.build[String, CacheItem](
// Loads the cached response for a URL (if not in memory) from the file database
new CacheLoader[String, CacheItem] {
override def load(key: String): CacheItem = entitiesCollection.get(key) match {
case res if res == null => null
case res =>

case _ => Behaviors.same
}
}
val item = Any.parseFrom(res).unpack(CacheItem)

val system = ActorSystem.create[Command](apply(), "GithubCache")
implicit val sc = system.scheduler
implicit val ec = system.executionContext
logger.info(s"reading from db: ${item}")

def get(url: String)(implicit timeout: Timeout): Future[Option[(String, String, ResponseEntity)]] = {
system.ask[Option[(String, String, ResponseEntity)]] { sender =>
GetItem(url, sender)
}
item
}
})

def put(key: String, value: CacheItem): Unit = synchronized {
cache.put(key, value)
entitiesCollection.put(key, Any.pack(value).toByteArray)
}

def put(url: String, data: (String, String, ResponseEntity))(implicit timeout: Timeout): Future[Boolean] = {
system.ask[Boolean] { sender =>
PutItem(url, data, sender)
def get(key: String): Option[CacheItem] = {
cache.get(key) match {
case res if res == null => None
case res => Some(res)
}
}

def close(): Unit = {
db.close()
}

}
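The bounded in-memory layer that Caffeine provides above can be sketched, under simplifying assumptions, as an access-ordered LRU map (Caffeine additionally offers async loading, removal listeners and statistics; this illustrative `LruCache` is not part of the codebase):

```scala
import java.util.{LinkedHashMap, Map => JMap}

// Access-ordered LinkedHashMap that evicts the least recently used
// entry once `maxEntries` is exceeded.
final class LruCache[K, V](maxEntries: Int) {
  private val underlying =
    new LinkedHashMap[K, V](16, 0.75f, /*accessOrder=*/ true) {
      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
        size() > maxEntries
    }
  def put(k: K, v: V): Unit = synchronized { underlying.put(k, v) }
  def get(k: K): Option[V] = synchronized { Option(underlying.get(k)) }
}
```

On a cache miss, the real HttpCache falls back to the MapDB tree map on disk, which is what the CacheLoader in the diff above implements.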
133 changes: 5 additions & 128 deletions src/main/scala/io/scalac/githubrank/Main.scala
@@ -1,140 +1,17 @@
package io.scalac.githubrank

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import org.slf4j.LoggerFactory
import akka.http.scaladsl.client.RequestBuilding._
import akka.http.scaladsl.model.{HttpHeader, HttpRequest, ResponseEntity, StatusCode, StatusCodes}
import akka.http.scaladsl.model.HttpProtocols._
import akka.http.scaladsl.model.MediaTypes._
import akka.http.scaladsl.model.HttpCharsets._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.headers.{Authorization, ETag, EntityTag, GenericHttpCredentials, RawHeader}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.{ByteString, Timeout}
import spray.json._
import DefaultJsonProtocol._
import akka.{Done, NotUsed}
import akka.stream.contrib.PagedSource

import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{Duration, DurationInt}
import io.scalac.githubrank.impl.{DefaultGithubAPIConsumer, DefaultGithubAPIService}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

object Main {

/*sealed trait Command

case class Get(url: String, sender: ActorRef[ResponseEntity]) extends Command

def rateLimitedAPIConsumer(): Behavior[Command] = Behaviors.setup { ctx =>

Behaviors.receive {
case _ => Behaviors.same
}
}*/

def main(args: Array[String]): Unit = {

val logger = LoggerFactory.getLogger(this.getClass)

implicit val actorSystem = ActorSystem.create[Nothing](Behaviors.empty[Nothing], "alpakka-samples")
import actorSystem.executionContext

val pageSize = 100

/*val httpRequest = HttpRequest(uri = "https://api.github.com/orgs/ScalaConsultants/repos")
.withHeaders(RawHeader("Accept", "application/vnd.github.v3+json"),
Authorization(GenericHttpCredentials("token", "")))*/

val GITHUB_HEADER = RawHeader("Accept", "application/vnd.github.v3+json")
val AUTH_HEADER = Authorization(GenericHttpCredentials("token", scala.sys.env("GH_TOKEN")))

case class HttpResponseException(code: StatusCode) extends Throwable
case class UnmarshalResponseException(msg: String) extends Throwable
case class HttpConnectionException(msg: String) extends Throwable

case class MyURI(url: String, page: Int)

case class Repository(name: String, full_name: String)
case class Contribution(login: String, contributions: Int)

implicit val repositoryFormat = jsonFormat2(Repository)
implicit val contributorFormat = jsonFormat2(Contribution)
implicit val timeout = Timeout(5 seconds)

def getPagedSource(url: MyURI) = {
PagedSource[JsArray, MyURI](url){ case nextPageUri =>

println(s"${Console.GREEN_B}PROCESSING NEXT PAGE ${nextPageUri}${Console.RESET}")

Http()
.singleRequest(HttpRequest(uri = s"${nextPageUri.url}?page=${nextPageUri.page}&per_page=$pageSize")
.withHeaders(GITHUB_HEADER, AUTH_HEADER))
.flatMap {
case httpResponse if httpResponse.status != StatusCodes.OK =>
//throw HttpResponseException(httpResponse.status)
Future.successful(PagedSource.Page(
Seq.empty[JsArray],
Some(MyURI(url.url, nextPageUri.page + 1)
)))

case httpResponse =>
Unmarshal(httpResponse)
.to[ByteString].map(_.utf8String.parseJson.convertTo[JsArray])
.map { response =>
PagedSource.Page(
Seq(response),
if (response.elements.isEmpty) None
else Some(MyURI(url.url, nextPageUri.page + 1))
)
}
.recover {
case ex =>
throw UnmarshalResponseException(ex.getMessage)
}
}
.recover {
case ex: HttpResponseException => throw ex
case ex: UnmarshalResponseException => throw ex
case ex => throw HttpConnectionException(ex.getMessage)
}
}
}

val org = "spray"
val repos = getPagedSource(MyURI(s"https://api.github.com/orgs/${org}/repos", page = 1))

def lookupContributors(repos: Seq[Repository]) = {
Source(repos)
.flatMapConcat(repo => getPagedSource(MyURI(s"https://api.github.com/repos/${repo.full_name}/contributors", page = 1))
.map(_.convertTo[Seq[Contribution]]))
}

val future =
repos.map(_.convertTo[Seq[Repository]])
.flatMapMerge(10, x => lookupContributors(x))
.throttle(5000, 1 hour)
.runWith(Sink.seq[Seq[Contribution]])

future.onComplete {
case Success(value) =>

// Flatten all fetched contributions, group by user and sort in descending order
val contributions = value.flatten.groupBy(_.login).map{case (user, list) => user -> list.map(_.contributions).sum}.toSeq.sortBy(_._2).reverse

println(contributions)

actorSystem.terminate()
implicit val consumer = new DefaultGithubAPIConsumer()
implicit val service = new DefaultGithubAPIService(pageSize = 100)
val server = new Server(port = 8080)

case Failure(ex) =>
//ex.printStackTrace()
actorSystem.terminate()
}
server.run()

}
