diff --git a/.gitignore b/.gitignore index 5637bd4..b523b06 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ project/target null .bsp .idea +db +db.wal.0 \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..7ca8db0 --- /dev/null +++ b/README.md @@ -0,0 +1,33 @@ +# GITHUBRANK CHALLENGE BY @lucasrpb + +Akka Streams was used to implement the task because it is a functional and composable library that allows +us to express pretty complex computations using basic abstract components: Source (the input), Flow (the computation) and +Sink (the output). The library comes with very useful default components. Among those there is one called throttle, +which was used in the implementation to stay within the GitHub API request rate limit :) + +The HTTP ETag caching mechanism supported by GitHub was also implemented to help reduce the number of API requests +that count against the limit. For content storage, MapDB, a popular Java collection implementation, was used as persistent storage. An in-memory +cache layer was also implemented to speed things up a little bit! + +**Be aware that for big organizations the time to get a response can be huge! This is due to the throttling!** + +To run the server you must first set an environment variable called GH_TOKEN whose value is your GitHub developer token! (this +is necessary because without it the request limit is only 60 per hour!). On Windows it can be set in the environment variables +section (unfortunately I was not able to make it work on Windows Terminal :/). On Linux and Mac you can run in the terminal: + +**export GH_TOKEN="my_token_goes_here"** + +With the token set, you will be able to run the application with: sbt run + +The server runs at port 8080. 
To check the rank of contributions for a certain organization you can use curl as follows (this is +an example): + +**curl -X GET localhost:8080/org/spray/contributors** + +Finally, for testing the solution: + +**sbt test** + +**One more thing: to stop the server, press Enter :P** + +That's all, folks! \ No newline at end of file diff --git a/build.sbt b/build.sbt index 0029915..c2d68f3 100644 --- a/build.sbt +++ b/build.sbt @@ -1,3 +1,4 @@ + name := "githubrank" version := "0.1" @@ -17,10 +18,20 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test, "com.typesafe.akka" %% "akka-stream-contrib" % "0.11+3-08ccb218", -"com.typesafe.akka" %% "akka-stream-contrib" % "0.11+3-08ccb218" % Test, + "com.typesafe.akka" %% "akka-stream-contrib" % "0.11+3-08ccb218" % Test, + + "com.github.ben-manes.caffeine" % "caffeine" % "3.0.3", + "org.mapdb" % "mapdb" % "3.0.8", -"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion, + "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion, "com.typesafe.akka" %% "akka-stream" % AkkaVersion, "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, - "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion + "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, + + "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion, + "com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion ) + +dependencyOverrides += "com.typesafe.akka" %% "akka-discovery" % "2.6.15" + +enablePlugins(AkkaGrpcPlugin) diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..f80b42c --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.0.0") \ No newline at end of file diff --git a/src/main/protobuf/client.proto b/src/main/protobuf/client.proto new file mode 100644 index 0000000..07461da --- /dev/null +++ b/src/main/protobuf/client.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +import "protocol.proto"; + +package 
io.scalac.githubrank.grpc;
+//#options
+
+//#services
+/*service CommanderClient {
+  rpc sendCommand (CommandRequest) returns (CommandResponse) {}
+  rpc read(ReadRequest) returns(ReadResponse) {}
+}*/
+//#services
\ No newline at end of file
diff --git a/src/main/protobuf/protocol.proto b/src/main/protobuf/protocol.proto
new file mode 100644
index 0000000..072419b
--- /dev/null
+++ b/src/main/protobuf/protocol.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+
+package io.scalac.githubrank.grpc;
+
+message CacheItem {
+  string tag = 1;
+  string lastModified = 2;
+  bytes entity = 3;
+}
\ No newline at end of file
diff --git a/src/main/scala/io/scalac/githubrank/GithubAPIConsumer.scala b/src/main/scala/io/scalac/githubrank/GithubAPIConsumer.scala
new file mode 100644
index 0000000..978b77c
--- /dev/null
+++ b/src/main/scala/io/scalac/githubrank/GithubAPIConsumer.scala
@@ -0,0 +1,12 @@
+package io.scalac.githubrank
+
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import scala.concurrent.Future
+
+/** Transport used to send requests to the GitHub API (replaced by a fake implementation in the tests). */
+trait GithubAPIConsumer {
+  /** Sends the request and returns the eventual response. */
+  def request(req: HttpRequest): Future[HttpResponse]
+  /** Releases the resources held by this consumer. */
+  def close(): Future[Unit]
+}
diff --git a/src/main/scala/io/scalac/githubrank/GithubAPIService.scala b/src/main/scala/io/scalac/githubrank/GithubAPIService.scala
new file mode 100644
index 0000000..ea77d9b
--- /dev/null
+++ b/src/main/scala/io/scalac/githubrank/GithubAPIService.scala
@@ -0,0 +1,12 @@
+package io.scalac.githubrank
+
+import spray.json.JsValue
+import scala.concurrent.Future
+
+/** Computes contributor rankings from the GitHub API. */
+trait GithubAPIService {
+  /** Returns the ranked contributors of the organization `org` as JSON. */
+  def getContributors(org: String): Future[JsValue]
+  /** Releases the resources held by this service. */
+  def close(): Future[Unit]
+}
diff --git a/src/main/scala/io/scalac/githubrank/HttpCache.scala b/src/main/scala/io/scalac/githubrank/HttpCache.scala
index 63c3adc..7e222b7 100644
--- a/src/main/scala/io/scalac/githubrank/HttpCache.scala
+++ b/src/main/scala/io/scalac/githubrank/HttpCache.scala
@@ -1,51 +1,74 @@
 package io.scalac.githubrank
-
-import akka.actor.typed.scaladsl.AskPattern.Askable
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
-import akka.http.scaladsl.model.ResponseEntity
-import akka.util.Timeout
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-
-object HttpCache {
-
-  sealed trait Command
-  protected case class GetItem(url: String, sender: ActorRef[Option[(String, String, ResponseEntity)]]) extends Command
-  protected case class PutItem(url: String, data: (String, String, ResponseEntity), sender: ActorRef[Boolean]) extends Command
-
-  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
-    val cache = TrieMap.empty[String, (String, String, ResponseEntity)]
-
-    Behaviors.receiveMessage {
-      case GetItem(url, sender) =>
-        sender ! cache.get(url)
-        Behaviors.same
-
-      case PutItem(url, data, sender) =>
-        cache.put(url, data)
-        sender ! true
-        Behaviors.same
-
-      case _ => Behaviors.same
-    }
-  }
-
-  val system = ActorSystem.create[Command](apply(), "GithubCache")
-  implicit val sc = system.scheduler
-  implicit val ec = system.executionContext
-
-  def get(url: String)(implicit timeout: Timeout): Future[Option[(String, String, ResponseEntity)]] = {
-    system.ask[Option[(String, String, ResponseEntity)]] { sender =>
-      GetItem(url, sender)
-    }
-  }
-
-  def put(url: String, data: (String, String, ResponseEntity))(implicit timeout: Timeout): Future[Boolean] = {
-    system.ask[Boolean] { sender =>
-      PutItem(url, data, sender)
-    }
-  }
-
+
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, RemovalCause}
+import com.google.protobuf.any.Any
+import io.scalac.githubrank.grpc._
+import org.mapdb.{DBMaker, Serializer}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.Executor
+import scala.concurrent.ExecutionContext
+
+/**
+ * In-memory (Caffeine) cache backed by a MapDB file store, used for the HTTP ETag caching supported by the
+ * GitHub API. When the GitHub API answers 304 (Not Modified) the payload is served from this cache, so the
+ * request does not count against the GitHub rate limit.
+ *
+ * @param MAX_ENTRIES maximum number of entries kept in memory
+ * @param ec execution context handed to Caffeine for its asynchronous tasks
+ */
+class HttpCache(val MAX_ENTRIES: Int)(implicit val ec: ExecutionContext) {
+
+  protected val db = DBMaker.fileDB("db")
+    .transactionEnable()
+    .closeOnJvmShutdown()
+    .make()
+
+  val entitiesCollection = db.treeMap("responses_cache")
+    .keySerializer(Serializer.STRING)
+    .valueSerializer(Serializer.BYTE_ARRAY)
+    .createOrOpen()
+
+  val logger = LoggerFactory.getLogger(this.getClass)
+
+  // Adapt the ExecutionContext instead of casting it: an ExecutionContext is not necessarily an Executor
+  private val cacheExecutor: Executor = new Executor {
+    override def execute(command: Runnable): Unit = ec.execute(command)
+  }
+
+  val cache = Caffeine.newBuilder()
+    .maximumSize(MAX_ENTRIES)
+    .executor(cacheExecutor)
+    .removalListener((key: String, value: CacheItem, cause: RemovalCause) => {
+      if(cause.wasEvicted())
+        logger.debug(s"REMOVING ${key} FROM CACHE $cause\n")
+    })
+    .build[String, CacheItem](
+      // Loads the cached response for an URL (if not in memory) from the file database;
+      // a null result tells Caffeine that there is no entry for the key
+      new CacheLoader[String, CacheItem] {
+        override def load(key: String): CacheItem =
+          Option(entitiesCollection.get(key)).map { bytes =>
+            val item = Any.parseFrom(bytes).unpack(CacheItem)
+            logger.info(s"reading from db: ${item}")
+            item
+          }.orNull
+      })
+
+  /** Stores `value` under `key` in memory and in the file database. */
+  def put(key: String, value: CacheItem): Unit = synchronized {
+    cache.put(key, value)
+    entitiesCollection.put(key, Any.pack(value).toByteArray)
+    // The store is opened with transactionEnable(): commit so the write is persisted
+    db.commit()
+  }
+
+  /** Returns the item cached for `key`, loading it from the file database when it is not in memory. */
+  def get(key: String): Option[CacheItem] = Option(cache.get(key))
+
+  /** Closes the file database. */
+  def close(): Unit = {
+    db.close()
+  }
+
 }
diff --git a/src/main/scala/io/scalac/githubrank/Main.scala b/src/main/scala/io/scalac/githubrank/Main.scala
index bded89d..edb4def 100644
--- a/src/main/scala/io/scalac/githubrank/Main.scala
+++ b/src/main/scala/io/scalac/githubrank/Main.scala
@@ -1,140 +1,17 @@
 package io.scalac.githubrank
 
-import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
-import akka.actor.typed.scaladsl.Behaviors
-import akka.http.scaladsl.Http
-import org.slf4j.LoggerFactory -import akka.http.scaladsl.client.RequestBuilding._ -import akka.http.scaladsl.model.{HttpHeader, HttpRequest, ResponseEntity, StatusCode, StatusCodes} -import akka.http.scaladsl.model.HttpProtocols._ -import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.HttpCharsets._ -import akka.http.scaladsl.model.HttpMethods._ -import akka.http.scaladsl.model.headers.{Authorization, ETag, EntityTag, GenericHttpCredentials, RawHeader} -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.{ByteString, Timeout} -import spray.json._ -import DefaultJsonProtocol._ -import akka.{Done, NotUsed} -import akka.stream.contrib.PagedSource - -import scala.collection.concurrent.TrieMap -import scala.concurrent.{Await, Future} -import scala.concurrent.duration.{Duration, DurationInt} +import io.scalac.githubrank.impl.{DefaultGithubAPIConsumer, DefaultGithubAPIService} import scala.language.postfixOps -import scala.util.{Failure, Success, Try} object Main { - /*sealed trait Command - - case class Get(url: String, sender: ActorRef[ResponseEntity]) extends Command - - def rateLimitedAPIConsumer(): Behavior[Command] = Behaviors.setup { ctx => - - Behaviors.receive { - case _ => Behaviors.same - } - }*/ - def main(args: Array[String]): Unit = { - val logger = LoggerFactory.getLogger(this.getClass) - - implicit val actorSystem = ActorSystem.create[Nothing](Behaviors.empty[Nothing], "alpakka-samples") - import actorSystem.executionContext - - val pageSize = 100 - - /*val httpRequest = HttpRequest(uri = "https://api.github.com/orgs/ScalaConsultants/repos") - .withHeaders(RawHeader("Accept", "application/vnd.github.v3+json"), - Authorization(GenericHttpCredentials("token", "")))*/ - - val GITHUB_HEADER = RawHeader("Accept", "application/vnd.github.v3+json") - val AUTH_HEADER = Authorization(GenericHttpCredentials("token", scala.sys.env("GH_TOKEN"))) - - case class 
HttpResponseException(code: StatusCode) extends Throwable - case class UnmarshalResponseException(msg: String) extends Throwable - case class HttpConnectionException(msg: String) extends Throwable - - case class MyURI(url: String, page: Int) - - case class Repository(name: String, full_name: String) - case class Contribution(login: String, contributions: Int) - - implicit val repositoryFormat = jsonFormat2(Repository) - implicit val contributorFormat = jsonFormat2(Contribution) - implicit val timeout = Timeout(5 seconds) - - def getPagedSource(url: MyURI) = { - PagedSource[JsArray, MyURI](url){ case nextPageUri => - - println(s"${Console.GREEN_B}PROCESSING NEXT PAGE ${nextPageUri}${Console.RESET}") - - Http() - .singleRequest(HttpRequest(uri = s"${nextPageUri.url}?page=${nextPageUri.page}&per_page=$pageSize") - .withHeaders(GITHUB_HEADER, AUTH_HEADER)) - .flatMap { - case httpResponse if httpResponse.status != StatusCodes.OK => - //throw HttpResponseException(httpResponse.status) - Future.successful(PagedSource.Page( - Seq.empty[JsArray], - Some(MyURI(url.url, nextPageUri.page + 1) - ))) - - case httpResponse => - Unmarshal(httpResponse) - .to[ByteString].map(_.utf8String.parseJson.convertTo[JsArray]) - .map { response => - PagedSource.Page( - Seq(response), - if (response.elements.isEmpty) None - else Some(MyURI(url.url, nextPageUri.page + 1)) - ) - } - .recover { - case ex => - throw UnmarshalResponseException(ex.getMessage) - } - } - .recover { - case ex: HttpResponseException => throw ex - case ex: UnmarshalResponseException => throw ex - case ex => throw HttpConnectionException(ex.getMessage) - } - } - } - - val org = "spray" - val repos = getPagedSource(MyURI(s"https://api.github.com/orgs/${org}/repos", page = 1)) - - def lookupContributors(repos: Seq[Repository]) = { - Source(repos) - .flatMapConcat(repo => getPagedSource(MyURI(s"https://api.github.com/repos/${repo.full_name}/contributors", page = 1)) - .map(_.convertTo[Seq[Contribution]])) - } - - val 
future = - repos.map(_.convertTo[Seq[Repository]]) - .flatMapMerge(10, x => lookupContributors(x)) - .throttle(5000, 1 hour) - .runWith(Sink.seq[Seq[Contribution]]) - - future.onComplete { - case Success(value) => - - // Flatten all contributions grabbed, group by user and then sort in reversing order - val contributions = value.flatten.groupBy(_.login).map{case (user, list) => user -> list.map(_.contributions).sum}.toSeq.sortBy(_._2).reverse - - println(contributions) - - actorSystem.terminate() + implicit val consumer = new DefaultGithubAPIConsumer() + implicit val service = new DefaultGithubAPIService(pageSize = 100) + val server = new Server(port = 8080) - case Failure(ex) => - //ex.printStackTrace() - actorSystem.terminate() - } + server.run() } diff --git a/src/main/scala/io/scalac/githubrank/Server.scala b/src/main/scala/io/scalac/githubrank/Server.scala new file mode 100644 index 0000000..53a5f70 --- /dev/null +++ b/src/main/scala/io/scalac/githubrank/Server.scala @@ -0,0 +1,51 @@ +package io.scalac.githubrank + +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.Behaviors +import akka.http.scaladsl.Http +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ + +import scala.io.StdIn +import scala.language.postfixOps + +class Server(val port: Int = 8080)(implicit val service: GithubAPIService){ + + implicit val system = ActorSystem.create[Nothing](Behaviors.empty[Nothing], "GithubRank") + import system.executionContext + + val routes = get { + path("org" / Segment / "contributors") { org: String => + complete { + service.getContributors(org).map { json => + HttpResponse(entity = HttpEntity(ContentType(MediaTypes.`application/json`), json.compactPrint)) + } + } + } + } + + val bindingFuture = Http().newServerAt("localhost", port).bind(routes) + + system.log.info(s"${Console.GREEN_B}Server online at http://localhost:$port${Console.RESET}\n") + + def run(): Unit = { + + println(s"\n${Console.MAGENTA_B}PRESS ANY KEY 
TO STOP THE SERVER!${Console.RESET}\n")
+
+    StdIn.readLine()
+    bindingFuture
+      .flatMap(_.unbind()) // trigger unbinding from the port
+      .onComplete { _ =>
+        service.close().onComplete {
+          case _ => system.terminate()
+        }
+      }
+  }
+
+  def close(): Unit = {
+    bindingFuture.onComplete {
+      case _ => system.terminate()
+    }
+  }
+
+}
diff --git a/src/main/scala/io/scalac/githubrank/impl/DefaultGithubAPIConsumer.scala b/src/main/scala/io/scalac/githubrank/impl/DefaultGithubAPIConsumer.scala
new file mode 100644
index 0000000..70c1c70
--- /dev/null
+++ b/src/main/scala/io/scalac/githubrank/impl/DefaultGithubAPIConsumer.scala
@@ -0,0 +1,24 @@
+package io.scalac.githubrank.impl
+
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.scaladsl.Behaviors
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import io.scalac.githubrank.GithubAPIConsumer
+
+import scala.concurrent.Future
+
+class DefaultGithubAPIConsumer extends GithubAPIConsumer {
+
+  implicit val system = ActorSystem.create[Nothing](Behaviors.empty[Nothing], "DefaultGithubAPIConsumer")
+  import system.executionContext
+
+  override def request(req: HttpRequest): Future[HttpResponse] = {
+    Http().singleRequest(req)
+  }
+
+  override def close(): Future[Unit] = {
+    system.terminate()
+    system.whenTerminated.map(_ => {})
+  }
+}
diff --git a/src/main/scala/io/scalac/githubrank/impl/DefaultGithubAPIService.scala b/src/main/scala/io/scalac/githubrank/impl/DefaultGithubAPIService.scala
new file mode 100644
index 0000000..0e7fa11
--- /dev/null
+++ b/src/main/scala/io/scalac/githubrank/impl/DefaultGithubAPIService.scala
@@ -0,0 +1,217 @@
+package io.scalac.githubrank.impl
+
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.scaladsl.Behaviors
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.headers.{Authorization, GenericHttpCredentials, RawHeader}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.contrib.PagedSource
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.{ByteString, Timeout}
+import com.google.common.base.Charsets
+import io.scalac.githubrank.{GithubAPIService, GithubAPIConsumer, HttpCache}
+import io.scalac.githubrank.grpc.CacheItem
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+import scala.io.StdIn
+import scala.language.postfixOps
+import scala.concurrent.Future
+
+class DefaultGithubAPIService(val pageSize: Int)(implicit val apiConsumer: GithubAPIConsumer) extends GithubAPIService {
+
+  // Read the token before the actor system is started, so a missing variable does not leave a running system
+  // behind. sys.env is a Map, hence sys.env("GH_TOKEN") throws NoSuchElementException instead of returning null.
+  val GH_TOKEN = scala.sys.env.get("GH_TOKEN").filter(_.trim.nonEmpty).getOrElse {
+    throw new RuntimeException(s"GH_TOKEN environment variable must be set!")
+  }
+
+  implicit val system = ActorSystem.create[Nothing](Behaviors.empty[Nothing], "GithubRankService")
+  import system.executionContext
+
+  // Http headers sent with every request to the GitHub API
+  val GITHUB_HEADER = RawHeader("Accept", "application/vnd.github.v3+json")
+  val AUTH_HEADER = Authorization(GenericHttpCredentials("token", GH_TOKEN))
+
+  case class HttpResponseException(code: StatusCode) extends Throwable
+  case class UnmarshalResponseException(msg: String) extends Throwable
+  case class HttpConnectionException(msg: String) extends Throwable
+
+  case class MyURI(url: String, page: Int)
+
+  case class Repository(name: String, full_name: String)
+  case class Contribution(login: String, contributions: Int)
+
+  implicit val repositoryFormat = jsonFormat2(Repository)
+  implicit val contributorFormat = jsonFormat2(Contribution)
+  implicit val timeout = Timeout(5 seconds)
+
+  val cache = new HttpCache(1000)
+
+  val logger = system.log
+
+  // Runs a callback on the thread that completes the future; close() uses it because the dispatcher of
+  // `system` may already be shut down when the termination future completes.
+  private val directExecutor: scala.concurrent.ExecutionContext = new scala.concurrent.ExecutionContext {
+    override def execute(runnable: Runnable): Unit = runnable.run()
+    override def reportFailure(cause: Throwable): Unit = logger.error("close() callback failed", cause)
+  }
+
+  /** Request URI of a page; also the key under which its response is cached. */
+  private def pageUri(url: MyURI): String = s"${url.url}?page=${url.page}&per_page=${pageSize}"
+
+  /** Parses a response body into a JSON array. */
+  private def parsePage(body: ByteString): JsArray = body.utf8String.parseJson.convertTo[JsArray]
+
+  /** Wraps a page of results; an empty array means there is nothing left, so no next page is requested. */
+  private def toPage(entity: JsArray, url: MyURI): PagedSource.Page[JsArray, MyURI] =
+    PagedSource.Page(
+      Seq(entity),
+      if (entity.elements.isEmpty) None
+      else Some(MyURI(url.url, url.page + 1))
+    )
+
+  /**
+   * Turns a response of the GitHub API into a page of the paged source, using the ETag cache mechanism to reduce
+   * the cost of calls to that API:
+   *  - 200 (OK): the body is parsed and, when the response carries an ETag, stored in the local cache;
+   *  - 304 (Not Modified): the content is fetched from the local cache;
+   *  - any other status: the response is dropped and no further page is requested for this URL.
+   *
+   * @param response The http response from the request
+   * @param url The page being processed
+   * @param cacheItem The cached item if any
+   * @return the page to emit together with the key of the next page, if any
+   */
+  def checkETag(response: HttpResponse, url: MyURI, cacheItem: Option[CacheItem]): Future[PagedSource.Page[JsArray, MyURI]] = {
+    response.status match {
+      case StatusCodes.NotModified =>
+        val optEtag = response.getHeader("ETag")
+        logger.info(s"${Console.MAGENTA_B} etag for ${url}: $optEtag with status ${response.status}${Console.RESET}")
+
+        // A 304 carries nothing to read; drain the entity so the connection is released
+        response.discardEntityBytes()
+
+        cacheItem match {
+          case Some(item) =>
+            Future(toPage(parsePage(ByteString(item.entity.toByteArray)), url))
+              .recover { case ex => throw UnmarshalResponseException(ex.getMessage) }
+
+          // Nothing cached to answer from (this used to be an unguarded cacheItem.get)
+          case None => Future.failed(HttpResponseException(response.status))
+        }
+
+      case StatusCodes.OK =>
+        val optEtag = response.getHeader("ETag")
+        val optLastModified = response.getHeader("Last-Modified")
+
+        Unmarshal(response.entity).to[ByteString]
+          .map(body => body -> parsePage(body))
+          .recover { case ex => throw UnmarshalResponseException(ex.getMessage) }
+          .map { case (body, entity) =>
+            // Only a response carrying an ETag can be revalidated later, so only those are cached
+            if (optEtag.isPresent) {
+              cache.put(pageUri(url), CacheItem(optEtag.get().value(), if(optLastModified.isPresent) optLastModified.get().value() else "",
+                com.google.protobuf.ByteString.copyFrom(body.toArray)))
+            }
+
+            toPage(entity, url)
+          }
+
+      case other =>
+        // Any other status (for instance 403 or 404). Asking for page + 1, as was done before, never ends when the
+        // following pages fail the same way, so the response is drained and this source is completed instead.
+        response.discardEntityBytes()
+        logger.warn(s"Unexpected status ${other} for ${pageUri(url)}, no further pages are requested")
+
+        Future.successful(PagedSource.Page[JsArray, MyURI](Seq.empty[JsArray], None))
+    }
+  }
+
+  /**
+   * Generates a paged source that fetches pages from the GitHub API supporting backpressure to not consume all the request
+   * quota of 5000 requests per hour
+   * @param url first page to fetch
+   * @return a source emitting one JSON array per fetched page
+   */
+  def getPagedSource(url: MyURI) = {
+    PagedSource[JsArray, MyURI](url){ case nextPageUri =>
+
+      logger.info(s"${Console.GREEN_B}PROCESSING NEXT PAGE ${nextPageUri}${Console.RESET}")
+
+      val uri = pageUri(nextPageUri)
+      val cacheItem = cache.get(uri)
+
+      // Revalidation headers, sent only for values that were actually stored with the cached response
+      val conditionalHeaders = cacheItem.toSeq.flatMap { item =>
+        RawHeader("If-None-Match", item.tag) +:
+          Some(item.lastModified).filter(_.nonEmpty).map(RawHeader("If-Modified-Since", _)).toSeq
+      }
+      cacheItem.foreach(item => logger.info(s"${RawHeader("If-None-Match", item.tag)}\n"))
+
+      val headers: Seq[HttpHeader] = Seq(GITHUB_HEADER, AUTH_HEADER) ++ conditionalHeaders
+      val request = HttpRequest(uri = uri).withHeaders(headers)
+
+      apiConsumer.request(request)
+        .flatMap { response =>
+          checkETag(response, nextPageUri, cacheItem)
+        }
+        .recover {
+          case ex: HttpResponseException => throw ex
+          case ex: UnmarshalResponseException => throw ex
+          case ex => throw HttpConnectionException(ex.getMessage)
+        }
+    }
+  }
+
+  def lookupContributors(repos: Seq[Repository]) = {
+    Source(repos)
+      .flatMapConcat(repo => getPagedSource(MyURI(s"https://api.github.com/repos/${repo.full_name}/contributors", page = 1))
+        .map(_.convertTo[Seq[Contribution]]))
+  }
+
+  def getContributors(org: String): Future[JsValue] = {
+    val repos = getPagedSource(MyURI(s"https://api.github.com/orgs/${org}/repos", page = 1))
+
+    val graph =
+      repos.map(_.convertTo[Seq[Repository]])
+        .flatMapMerge(10, x => lookupContributors(x))
+        // Backpressure...
+        .throttle(5000, 1 hour)
+
+    graph.runWith(Sink.seq[Seq[Contribution]]).map { value =>
+      // Flatten all contributions grabbed, group by user and then sort in reversing order
+      val contributions = value.flatten.groupBy(_.login).map{case (user, list) => user -> list.map(_.contributions).sum}.toSeq.sortBy(_._2).reverse
+
+      JsArray.apply(
+        contributions.map { case (user, n) =>
+          JsObject(
+            "name" -> JsString(user),
+            "contributions" -> JsNumber(n)
+          )
+        }: _*
+      )
+    }
+  }
+
+  /**
+   * Closes the API consumer and the cache, then terminates the actor system of this service.
+   * @return a future completed once the actor system has terminated; it fails when releasing a dependency failed
+   */
+  override def close(): Future[Unit] = {
+    val released = for {
+      _ <- apiConsumer.close()
+      _ <- Future {cache.close()}
+    } yield ()
+
+    // The system is terminated last (and in any case): the steps above still run on its dispatcher
+    released.transformWith { outcome =>
+      system.terminate()
+      system.whenTerminated.flatMap(_ => Future.fromTry(outcome))(directExecutor)
+    }
+  }
+}
diff --git a/src/test/scala/io/scalac/githubrank/CorrectnessSpec.scala b/src/test/scala/io/scalac/githubrank/CorrectnessSpec.scala
new file mode 100644
index 0000000..1aebdee
--- /dev/null
+++ b/src/test/scala/io/scalac/githubrank/CorrectnessSpec.scala
@@ -0,0 +1,71 @@
+package io.scalac.githubrank
+
+import akka.http.scaladsl.model.{ContentTypes, StatusCodes}
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.http.scaladsl.server._
+import Directives._
+import akka.http.scaladsl.client.RequestBuilding._
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.testkit.TestDuration
+import akka.util.ByteString
+import io.scalac.githubrank.impl.DefaultGithubAPIService
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import org.slf4j.LoggerFactory
+import spray.json.JsArray
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.duration.DurationInt
+
+/**
+ * This test mocks GitHub api to asses the correctness of the solution implemented
+ */
+class CorrectnessSpec extends AnyFlatSpec with Matchers with ScalatestRouteTest {
+
+  val logger = LoggerFactory.getLogger(this.getClass)
+
+  implicit val timeout = RouteTestTimeout(5.minutes.dilated)
+
+  "it "
should "match response from mocked API" in {
+
+    implicit val consumer = FakeGitgubAPIConsumer
+    implicit val service = new DefaultGithubAPIService(FakeGitgubAPIConsumer.pageSize)
+    val server = new Server(port = 8080)
+
+    //server.run()
+
+    val routes = server.routes
+
+    Get("/org/fakeorg/contributors") ~> Route.seal(routes) ~> check {
+
+      status shouldEqual StatusCodes.OK
+      contentType shouldEqual ContentTypes.`application/json`
+
+      // Read the body synchronously: assertions placed inside a Future that nobody awaits
+      // can never fail the test
+      val result = responseAs[String].parseJson.convertTo[JsArray]
+      val expected = FakeGitgubAPIConsumer.contribsByUser
+
+      logger.info(s"${Console.BLUE_B}result from server: ${result.toJson.prettyPrint}${Console.RESET}")
+      logger.info(s"${Console.RED_B}result must be: ${expected}${Console.RESET}")
+
+      // The service publishes every entry as {"name": ..., "contributions": ...}
+      val actual = result.elements.map { element =>
+        val fields = element.asJsObject.fields
+        fields("name").convertTo[String] -> fields("contributions").convertTo[Int]
+      }
+
+      // Users with equal totals may come back in either order, so the totals are compared per user
+      // and the ranking (highest total first) is checked on its own
+      actual.size shouldEqual expected.size
+      actual.toMap shouldEqual expected.toMap
+      actual.map(_._2) shouldEqual actual.map(_._2).sorted.reverse
+    }
+
+    server.close()
+
+  }
+
+}
diff --git a/src/test/scala/io/scalac/githubrank/FakeGitgubAPIConsumer.scala b/src/test/scala/io/scalac/githubrank/FakeGitgubAPIConsumer.scala
new file mode 100644
index 0000000..96976b8
--- /dev/null
+++ b/src/test/scala/io/scalac/githubrank/FakeGitgubAPIConsumer.scala
@@ -0,0 +1,95 @@
+package io.scalac.githubrank
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.scaladsl.Behaviors
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.Path.Segment
+import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpRequest, HttpResponse, StatusCodes}
+import akka.http.scaladsl.server.PathMatcher
+import io.grpc.netty.shaded.io.netty.util.internal.ThreadLocalRandom
+import spray.json.{JsArray, JsNumber, JsObject, JsString}
+
+import scala.concurrent.Future
+
+/**
+ * In-memory stand-in for the GitHub API used by the tests: it serves a fixed set of repositories and randomly
+ * generated contributors, both split into pages of `pageSize` items.
+ */
+object FakeGitgubAPIConsumer extends GithubAPIConsumer {
+
+  val rand = ThreadLocalRandom.current()
+
+  val pageSize = 2
+  val numOfRepos = 2//rand.nextInt(1, 6)
+
+  val maxNumRepoPages = if(numOfRepos % pageSize > 0) numOfRepos/pageSize + 1 else numOfRepos/pageSize
+
+  val repos = (0 until numOfRepos).map { i =>
+    JsObject("name" -> JsString("fakeorg"), "full_name" -> JsString(s"repo-$i"))
+  }
+
+  val contributers = repos.map { fullName =>
+    fullName.fields("full_name").asInstanceOf[JsString].value -> (0 until rand.nextInt(1, 10)).map { j =>
+      JsObject("login" -> JsString(s"user-$j"), "contributions" -> JsNumber(rand.nextInt(1, 1000)))
+    }
+  }.toMap
+
+  // Expected ranking: the contributions of a user summed over all repositories, highest total first
+  val contribsByUser = contributers.values.flatten
+    .map { obj =>
+      obj.fields("login").asInstanceOf[JsString].value -> obj.fields("contributions").asInstanceOf[JsNumber].value.toInt
+    }
+    .groupBy(_._1)
+    .map { case (user, entries) => user -> entries.map(_._2).sum }
+    .toSeq.sortBy(_._2).reverse
+
+  implicit val system = ActorSystem.create[Nothing](Behaviors.empty[Nothing], "DefaultGithubAPIConsumer")
+  import system.executionContext
+
+  val logger = system.log
+
+  val EMPTY_RESPONSE = Future.successful {
+    HttpResponse(
+      StatusCodes.OK,
+      Seq.empty[HttpHeader],
+      HttpEntity(ContentTypes.`application/json`, JsArray.empty.toString())
+    )
+  }
+
+  // Items of the requested 1-based page; empty once the page lies past the last item, which ends the paging
+  private def pageOf(items: Seq[JsObject], page: Int): Seq[JsObject] =
+    items.slice((page - 1) * pageSize, page * pageSize)
+
+  private def jsonResponse(items: Seq[JsObject]): Future[HttpResponse] =
+    if (items.isEmpty) EMPTY_RESPONSE
+    else Future.successful {
+      HttpResponse(
+        StatusCodes.OK,
+        Seq.empty[HttpHeader],
+        HttpEntity(ContentTypes.`application/json`, JsArray(items: _*).toString())
+      )
+    }
+
+  /**
+   * Serves `/orgs/<org>/repos` with the fake repositories and `/repos/<full_name>/contributors` with the
+   * contributors of that repository, honouring the `page` query parameter.
+   */
+  override def request(req: HttpRequest): Future[HttpResponse] = {
+    val page = req.uri.query().get("page").map(_.toInt).getOrElse(1)
+
+    if (req.uri.path.endsWith("repos")) {
+      jsonResponse(pageOf(repos, page))
+    } else {
+      // Path shape: /repos/<full_name>/contributors
+      val repo = req.uri.path.toString().split("/")(2).trim
+      jsonResponse(pageOf(contributers.getOrElse(repo, Seq.empty[JsObject]), page))
+    }
+  }
+
+  override def close(): Future[Unit] = {
+    /*system.terminate()
+    system.whenTerminated.map(_ => {})*/
+
+    Future.successful {}
+  }
+}
diff --git a/src/test/scala/io/scalac/githubrank/MainSpec.scala b/src/test/scala/io/scalac/githubrank/MainSpec.scala
deleted file mode 100644
index 95001de..0000000
--- a/src/test/scala/io/scalac/githubrank/MainSpec.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package io.scalac.githubrank
-
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.model.headers._
-import akka.http.scaladsl.model.{HttpRequest, MediaRanges}
-import akka.stream.scaladsl._
-import spray.json._
-import DefaultJsonProtocol._
-import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.util.ByteString
-import org.scalatest.flatspec.AnyFlatSpec
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class MainSpec extends AnyFlatSpec {
-
-  val logger = LoggerFactory.getLogger(this.getClass)
-
-  "it " should "" in {
-
-
-
-  }
-
-}