Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump Akka HTTP 10.2.0 #893

Merged
merged 16 commits into from
May 4, 2021
20 changes: 14 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ lazy val `akka-management-root` = project
`akka-discovery-kubernetes-api`,
`akka-discovery-marathon-api`,
`akka-management`,
`akka-management-pki`,
`loglevels-logback`,
`integration-test-aws-api-ec2-tag-based`,
`integration-test-local`,
Expand Down Expand Up @@ -51,6 +52,7 @@ lazy val `akka-discovery-kubernetes-api` = project
libraryDependencies := Dependencies.DiscoveryKubernetesApi,
mimaPreviousArtifactsSet
)
.dependsOn(`akka-management-pki`)

lazy val `akka-discovery-marathon-api` = project
.in(file("discovery-marathon-api"))
Expand Down Expand Up @@ -102,6 +104,16 @@ lazy val `akka-management` = project
mimaPreviousArtifactsSet
)

lazy val `akka-management-pki` = project
.in(file("management-pki"))
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-management-pki",
libraryDependencies := Dependencies.ManagementPki,
// Don't enable mima until 1.1.1
ignasi35 marked this conversation as resolved.
Show resolved Hide resolved
mimaPreviousArtifacts := Set.empty
)

lazy val `loglevels-logback` = project
.in(file("loglevels-logback"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -144,6 +156,7 @@ lazy val `lease-kubernetes` = project
Defaults.itSettings
)
.configs(IntegrationTest)
.dependsOn(`akka-management-pki`)

lazy val `lease-kubernetes-int-test` = project
.in(file("lease-kubernetes-int-test"))
Expand Down Expand Up @@ -181,12 +194,7 @@ lazy val `integration-test-kubernetes-api` = project
whitesourceIgnore := true,
libraryDependencies := Dependencies.BootstrapDemos
)
.dependsOn(
`akka-management`,
`cluster-http`,
`cluster-bootstrap`,
`akka-discovery-kubernetes-api`
)
.dependsOn(`akka-management`, `cluster-http`, `cluster-bootstrap`, `akka-discovery-kubernetes-api`)

lazy val `integration-test-kubernetes-api-java` = project
.in(file("integration-test/kubernetes-api-java"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@ package akka.management.cluster.bootstrap.contactpoint

import java.net.InetAddress
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ BiConsumer, BiFunction }
import java.util.function.BiConsumer
import java.util.function.BiFunction

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.discovery.Lookup
import akka.discovery.MockDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalactic.Tolerance
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.Future
import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -154,13 +160,12 @@ class ClusterBootstrapDiscoveryBackoffIntegrationSpec

"start listening with the http contact-points on 2 systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys = system

val bootstrap: ClusterBootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A")).futureValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

package akka.management.cluster.bootstrap.contactpoint

import akka.actor.{ ActorSystem, Address }
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.actor.Address
import akka.cluster.Cluster
import akka.discovery.MockDiscovery
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.remote.RARP
import akka.testkit.{ SocketUtil, TestKit }
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll

import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -107,13 +108,12 @@ class ClusterBootstrapExistingSeedNodesSpec(system: ActorSystem)

"start listening with the http contact-points on all systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys: ActorSystem = system

val bootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ package akka.management.cluster.bootstrap.contactpoint

import java.net.InetAddress

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.discovery.Lookup
import akka.discovery.MockDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -125,13 +131,12 @@ class ClusterBootstrapIntegrationSpec extends AnyWordSpecLike with Matchers {

"start listening with the http contact-points on 3 systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys = system

val bootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ package akka.management.cluster.bootstrap.contactpoint

import java.net.InetAddress

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.discovery.Lookup
import akka.discovery.MockDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -133,13 +139,12 @@ class ClusterBootstrapRetryUnreachableContactPointIntegrationSpec extends AnyWor

"start listening with the http contact-points on 3 systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys = system

val bootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.http.scaladsl.model.sse.ServerSentEvent
import spray.json.{ DefaultJsonProtocol, JsArray, JsNumber, JsObject, JsString, JsValue }

/**
* Encodes a supplied [[ClusterEvent.ClusterDomainEvent]] into a [[ServerSentEvent]].
* Encodes a supplied `ClusterEvent.ClusterDomainEvent` into a `ServerSentEvent`.
*/
object ClusterDomainEventServerSentEventEncoder extends SprayJsonSupport with DefaultJsonProtocol {
def encode(event: ClusterEvent.ClusterDomainEvent): Option[ServerSentEvent] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import akka.util.Timeout

import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.control.NonFatal

object ClusterHttpManagementRoutes extends ClusterHttpManagementJsonProtocol {
import ClusterHttpManagementHelper._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class ClusterHttpManagementRoutesSpec

val clusterHttpManagement = ClusterHttpManagementRouteProvider(system)
val settings = ManagementRouteProviderSettings(selfBaseUri = "http://127.0.0.1:20100", readOnly = false)
val binding = Http().bindAndHandle(clusterHttpManagement.routes(settings), "127.0.0.1", 20100).futureValue
val binding = Http().newServerAt("127.0.0.1", 20100).bind(clusterHttpManagement.routes(settings)).futureValue

val responseGetShardDetails =
Http().singleRequest(HttpRequest(uri = s"http://127.0.0.1:20100/cluster/shards/$name")).futureValue(t)
Expand Down Expand Up @@ -450,7 +450,7 @@ class ClusterHttpManagementRoutesSpec

val clusterHttpManagement = ClusterHttpManagementRouteProvider(system)
val settings = ManagementRouteProviderSettings(selfBaseUri = "http://127.0.0.1:20100", readOnly = false)
val binding = Http().bindAndHandle(clusterHttpManagement.routes(settings), "127.0.0.1", 20100).futureValue
val binding = Http().newServerAt("127.0.0.1", 20100).bind(clusterHttpManagement.routes(settings)).futureValue

val responseGetDomainEvents =
Http()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class MultiDcSpec

try {
Http()
.bindAndHandle(ClusterHttpManagementRouteProvider(dcASystem).routes(routeSettings), "127.0.0.1", httpPortA)
.newServerAt("127.0.0.1", httpPortA)
.bind(ClusterHttpManagementRouteProvider(dcASystem).routes(routeSettings))
.futureValue

eventually {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,36 @@
package akka.discovery.kubernetes

import java.net.InetAddress
import java.nio.file.{ Files, Paths }

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.discovery._
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
import akka.http.scaladsl.unmarshalling.Unmarshal
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import com.typesafe.sslconfig.ssl.TrustStoreConfig
import java.nio.file.Files
import java.nio.file.Paths
import java.security.KeyStore
import java.security.SecureRandom

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import JsonFormat._
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal

import scala.util.control.{ NoStackTrace, NonFatal }
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.discovery._
import akka.discovery.kubernetes.JsonFormat._
import akka.event.Logging
import akka.http.scaladsl.HttpsConnectionContext
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pki.kubernetes.PemManagersProvider
import javax.net.ssl.KeyManager
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManager

object KubernetesApiServiceDiscovery {

Expand Down Expand Up @@ -92,14 +101,23 @@ class KubernetesApiServiceDiscovery(implicit system: ActorSystem) extends Servic

private val log = Logging(system, getClass)

private val httpsTrustStoreConfig =
TrustStoreConfig(data = None, filePath = Some(settings.apiCaPath)).withStoreType("PEM")

private val httpsConfig =
AkkaSSLConfig()(system).mapSettings(s =>
s.withTrustManagerConfig(s.trustManagerConfig.withTrustStoreConfigs(Seq(httpsTrustStoreConfig))))
private val sslContext = {
val certificate = PemManagersProvider.loadCertificate(settings.apiCaPath)

val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(null)
factory.init(keyStore, Array.empty)
val km: Array[KeyManager] = factory.getKeyManagers
val tm: Array[TrustManager] =
PemManagersProvider.buildTrustManagers(certificate)
val random: SecureRandom = new SecureRandom
val sslContext = SSLContext.getInstance("TLSv1.2")
sslContext.init(km, tm, random)
sslContext
}

private val httpsContext = http.createClientHttpsContext(httpsConfig)
private val clientSslContext: HttpsConnectionContext = ConnectionContext.httpsClient(sslContext)

log.debug("Settings {}", settings)

Expand All @@ -118,7 +136,7 @@ class KubernetesApiServiceDiscovery(implicit system: ActorSystem) extends Servic
s"Unable to form request; check Kubernetes environment (expecting env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})"
)

response <- http.singleRequest(request, httpsContext)
response <- http.singleRequest(request, clientSslContext)

entity <- response.entity.toStrict(resolveTimeout)

Expand Down
2 changes: 1 addition & 1 deletion integration-test/kubernetes-api-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<akka.version>2.6.14</akka.version>
<akka.http.version>10.1.11</akka.http.version>
<akka.http.version>10.2.0</akka.http.version>
<akka-management.version>1.0.5</akka-management.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
Expand Down
Loading