Skip to content

Commit

Permalink
server code rewritten to akka-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Karasiq committed May 31, 2016
1 parent 9d6987a commit c9437d2
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 439 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name := "proxychain"

organization := "com.github.karasiq"

version := "2.0.1"
version := "2.0.2"

isSnapshot := version.value.endsWith("SNAPSHOT")

Expand All @@ -22,7 +22,7 @@ libraryDependencies ++= {
"com.typesafe.akka" %% "akka-http-experimental" % akkaV,
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"com.github.karasiq" %% "cryptoutils" % "1.4.0",
"com.github.karasiq" %% "proxyutils" % "2.0.3",
"com.github.karasiq" %% "proxyutils" % "2.0.4",
"com.github.karasiq" %% "coffeescript" % "1.0"
)
}
Expand Down
52 changes: 43 additions & 9 deletions src/main/scala/com/karasiq/proxychain/app/Boot.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package com.karasiq.proxychain.app

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.event.Logging
import akka.io.Tcp.Bind
import akka.io.{IO, Tcp}
import akka.stream.scaladsl.Tcp.IncomingConnection
import akka.stream.scaladsl.{Concat, Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, ClosedShape, scaladsl}
import akka.util.ByteString
import com.karasiq.fileutils.PathUtils._
import com.karasiq.proxy.server.{ProxyConnectionRequest, ProxyServerStage}
import com.karasiq.proxychain.AppConfig
import com.karasiq.proxychain.script.ScriptEngine
import com.typesafe.config.Config

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success}

object Boot extends App {
val configFile: Config = AppConfig.externalConfig()
val actorSystem: ActorSystem = ActorSystem("ProxyChain", configFile.resolve())
implicit val actorSystem: ActorSystem = ActorSystem("ProxyChain", configFile.resolve())
implicit val actorMaterializer = ActorMaterializer()
import actorSystem.dispatcher

val cfg = configFile.getConfig("proxyChain")
val host = cfg.getString("host")
Expand All @@ -33,17 +37,47 @@ object Boot extends App {
}

// Start server
val connector = Connector(config)
def proxyChainConnect(tcpConn: IncomingConnection, request: ProxyConnectionRequest, connection: Flow[ByteString, ByteString, _]): Unit = {
connector.connect(request, tcpConn.remoteAddress)
.onComplete {
case Success((outConn, proxy))
val graph = RunnableGraph.fromGraph(GraphDSL.create(connection, proxy)(Keep.none) { implicit builder (connection, proxy)
import GraphDSL.Implicits._
val success = builder.add(Source.single(ProxyServerStage.successResponse(request)))
val toConnection = builder.add(Concat[ByteString]())
connection ~> proxy
success ~> toConnection
proxy ~> toConnection
toConnection ~> connection
ClosedShape
})
graph.run()

case Failure(exc)
val source = Source
.single(ProxyServerStage.failureResponse(request))
.concat(Source.failed(exc))
source.via(connection).runWith(Sink.cancelled)
}
}

val port = cfg.getInt("port")
if (port != 0) {
val server = actorSystem.actorOf(Server.props(config, tls = false), "proxychain-server")
IO(Tcp)(actorSystem).tell(Bind(server, new InetSocketAddress(host, port)), server)
scaladsl.Tcp().bind(host, port)
.runForeach(tcpConn tcpConn.handleWith(Flow.fromGraph(new ProxyServerStage)).foreach {
case (request, connection)
proxyChainConnect(tcpConn, request, connection)
})
}

val tlsPort = cfg.getInt("tls.port")
if (tlsPort != 0) {
val server = actorSystem.actorOf(Server.props(config, tls = true), "proxychain-tls-server")
IO(Tcp)(actorSystem).tell(Bind(server, new InetSocketAddress(host, tlsPort)), server)
scaladsl.Tcp().bind(host, tlsPort)
.runForeach(tcpConn tcpConn.handleWith(Flow.fromGraph(ProxyServerStage.withTls(AppConfig.tlsContext(server = true)))).foreach {
case (request, connection)
proxyChainConnect(tcpConn, request, connection)
})
}

Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
Expand Down
55 changes: 55 additions & 0 deletions src/main/scala/com/karasiq/proxychain/app/Connector.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.karasiq.proxychain.app

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import akka.{Done, NotUsed}
import com.karasiq.proxy.server.ProxyConnectionRequest
import com.karasiq.proxy.{ProxyChain, ProxyException}
import com.karasiq.proxychain.AppConfig

import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.language.postfixOps

private [app] object Connector {
def apply(cfg: AppConfig)(implicit as: ActorSystem, am: ActorMaterializer) = new Connector(cfg)
}

private[app] class Connector(cfg: AppConfig)(implicit actorSystem: ActorSystem, actorMaterializer: ActorMaterializer) {
import actorSystem.dispatcher
private val log = Logging(actorSystem, "Proxy")

def connect(request: ProxyConnectionRequest, clientAddress: InetSocketAddress): Future[(OutgoingConnection, Flow[ByteString, ByteString, NotUsed])] = {
val chains = cfg.proxyChainsFor(request.address)
log.debug("Trying connect to {} through chains: {}", request.address, chains)

val promise = Promise[(OutgoingConnection, Flow[ByteString, ByteString, NotUsed])]
val futures = chains.map { chain
val ((proxyInput, (connFuture, proxyFuture)), proxyOutput) = Source.asSubscriber[ByteString]
.initialTimeout(10 seconds)
.idleTimeout(5 minutes)
.viaMat(ProxyChain.connect(request.address, chain, Some(AppConfig.tlsContext())))(Keep.both)
.toMat(Sink.asPublisher[ByteString](fanout = false))(Keep.both)
.run()

(for (_ proxyFuture; proxyConnection connFuture) yield {
if (promise.trySuccess((proxyConnection, Flow.fromSinkAndSource(Sink.fromSubscriber(proxyInput), Source.fromPublisher(proxyOutput))))) {
if (chain.isEmpty) log.warning("Proxy chain not defined, direct connection to {} opened", request.address)
else log.info("Opened connection through proxy chain {} to {}", chain.mkString("[", " -> ", "]"), request.address)
}
Done
}).recover { case _ Done }
}

Future.sequence(futures).onComplete { completed
promise.tryFailure(new ProxyException(s"Connection to ${request.address} through chains $chains failed"))
}
promise.future
}
}
240 changes: 0 additions & 240 deletions src/main/scala/com/karasiq/proxychain/app/Handler.scala

This file was deleted.

Loading

0 comments on commit c9437d2

Please sign in to comment.