Skip to content

Commit

Permalink
Bump Akka HTTP 10.2.0 (#893)
Browse files Browse the repository at this point in the history
* Bump Akka HTTP 10.2.0. Introduce PKI convenience class for k8s PEMs

* Apply suggestions from code review

Co-authored-by: Johannes Rudolph <[email protected]>

* Inline variable

* Makes PemManagersProvider private

* scalafmt

* scalafmt

* Remove unused imports

* Fix matcher exhaustivty and deprecetaions on akka-management

* Only fail-on-warnings wof rscala 2.12

* Fix unidoc issues

* Fix tests

* scalafmt

* Disable MiMa using the correct syntax

* Align maven version

* Remove unused import

* Fix: Lease k8s APIImpl supports non-TLS

Co-authored-by: Johannes Rudolph <[email protected]>
Co-authored-by: Arnout Engelen <[email protected]>
  • Loading branch information
3 people authored May 4, 2021
1 parent 07d8486 commit 1e83485
Show file tree
Hide file tree
Showing 27 changed files with 309 additions and 148 deletions.
20 changes: 14 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ lazy val `akka-management-root` = project
`akka-discovery-kubernetes-api`,
`akka-discovery-marathon-api`,
`akka-management`,
`akka-management-pki`,
`loglevels-logback`,
`integration-test-aws-api-ec2-tag-based`,
`integration-test-local`,
Expand Down Expand Up @@ -51,6 +52,7 @@ lazy val `akka-discovery-kubernetes-api` = project
libraryDependencies := Dependencies.DiscoveryKubernetesApi,
mimaPreviousArtifactsSet
)
.dependsOn(`akka-management-pki`)

lazy val `akka-discovery-marathon-api` = project
.in(file("discovery-marathon-api"))
Expand Down Expand Up @@ -102,6 +104,16 @@ lazy val `akka-management` = project
mimaPreviousArtifactsSet
)

lazy val `akka-management-pki` = project
.in(file("management-pki"))
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-management-pki",
libraryDependencies := Dependencies.ManagementPki,
// Don't enable mima until 1.1.1
mimaPreviousArtifacts := Set.empty
)

lazy val `loglevels-logback` = project
.in(file("loglevels-logback"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -144,6 +156,7 @@ lazy val `lease-kubernetes` = project
Defaults.itSettings
)
.configs(IntegrationTest)
.dependsOn(`akka-management-pki`)

lazy val `lease-kubernetes-int-test` = project
.in(file("lease-kubernetes-int-test"))
Expand Down Expand Up @@ -181,12 +194,7 @@ lazy val `integration-test-kubernetes-api` = project
whitesourceIgnore := true,
libraryDependencies := Dependencies.BootstrapDemos
)
.dependsOn(
`akka-management`,
`cluster-http`,
`cluster-bootstrap`,
`akka-discovery-kubernetes-api`
)
.dependsOn(`akka-management`, `cluster-http`, `cluster-bootstrap`, `akka-discovery-kubernetes-api`)

lazy val `integration-test-kubernetes-api-java` = project
.in(file("integration-test/kubernetes-api-java"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@ package akka.management.cluster.bootstrap.contactpoint

import java.net.InetAddress
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ BiConsumer, BiFunction }
import java.util.function.BiConsumer
import java.util.function.BiFunction

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.discovery.Lookup
import akka.discovery.MockDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalactic.Tolerance
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.Future
import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -154,13 +160,12 @@ class ClusterBootstrapDiscoveryBackoffIntegrationSpec

"start listening with the http contact-points on 2 systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys = system

val bootstrap: ClusterBootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A")).futureValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

package akka.management.cluster.bootstrap.contactpoint

import akka.actor.{ ActorSystem, Address }
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.actor.Address
import akka.cluster.Cluster
import akka.discovery.MockDiscovery
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.remote.RARP
import akka.testkit.{ SocketUtil, TestKit }
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll

import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -107,13 +108,12 @@ class ClusterBootstrapExistingSeedNodesSpec(system: ActorSystem)

"start listening with the http contact-points on all systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys: ActorSystem = system

val bootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ package akka.management.cluster.bootstrap.contactpoint

import java.net.InetAddress

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.discovery.Lookup
import akka.discovery.MockDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -125,13 +131,12 @@ class ClusterBootstrapIntegrationSpec extends AnyWordSpecLike with Matchers {

"start listening with the http contact-points on 3 systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys = system

val bootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ package akka.management.cluster.bootstrap.contactpoint

import java.net.InetAddress

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.discovery.Lookup
import akka.discovery.MockDiscovery
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.RouteResult
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.testkit.{ SocketUtil, TestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.testkit.SocketUtil
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down Expand Up @@ -133,13 +139,12 @@ class ClusterBootstrapRetryUnreachableContactPointIntegrationSpec extends AnyWor

"start listening with the http contact-points on 3 systems" in {
def start(system: ActorSystem, contactPointPort: Int) = {
import system.dispatcher
implicit val sys = system

val bootstrap = ClusterBootstrap(system)
val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes
bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort")
Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort)
Http().newServerAt("127.0.0.1", contactPointPort).bind(routes)
}

start(systemA, contactPointPorts("A"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.http.scaladsl.model.sse.ServerSentEvent
import spray.json.{ DefaultJsonProtocol, JsArray, JsNumber, JsObject, JsString, JsValue }

/**
* Encodes a supplied [[ClusterEvent.ClusterDomainEvent]] into a [[ServerSentEvent]].
* Encodes a supplied `ClusterEvent.ClusterDomainEvent` into a `ServerSentEvent`.
*/
object ClusterDomainEventServerSentEventEncoder extends SprayJsonSupport with DefaultJsonProtocol {
def encode(event: ClusterEvent.ClusterDomainEvent): Option[ServerSentEvent] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import akka.util.Timeout

import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.control.NonFatal

object ClusterHttpManagementRoutes extends ClusterHttpManagementJsonProtocol {
import ClusterHttpManagementHelper._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class ClusterHttpManagementRoutesSpec

val clusterHttpManagement = ClusterHttpManagementRouteProvider(system)
val settings = ManagementRouteProviderSettings(selfBaseUri = "http://127.0.0.1:20100", readOnly = false)
val binding = Http().bindAndHandle(clusterHttpManagement.routes(settings), "127.0.0.1", 20100).futureValue
val binding = Http().newServerAt("127.0.0.1", 20100).bind(clusterHttpManagement.routes(settings)).futureValue

val responseGetShardDetails =
Http().singleRequest(HttpRequest(uri = s"http://127.0.0.1:20100/cluster/shards/$name")).futureValue(t)
Expand Down Expand Up @@ -450,7 +450,7 @@ class ClusterHttpManagementRoutesSpec

val clusterHttpManagement = ClusterHttpManagementRouteProvider(system)
val settings = ManagementRouteProviderSettings(selfBaseUri = "http://127.0.0.1:20100", readOnly = false)
val binding = Http().bindAndHandle(clusterHttpManagement.routes(settings), "127.0.0.1", 20100).futureValue
val binding = Http().newServerAt("127.0.0.1", 20100).bind(clusterHttpManagement.routes(settings)).futureValue

val responseGetDomainEvents =
Http()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class MultiDcSpec

try {
Http()
.bindAndHandle(ClusterHttpManagementRouteProvider(dcASystem).routes(routeSettings), "127.0.0.1", httpPortA)
.newServerAt("127.0.0.1", httpPortA)
.bind(ClusterHttpManagementRouteProvider(dcASystem).routes(routeSettings))
.futureValue

eventually {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,36 @@
package akka.discovery.kubernetes

import java.net.InetAddress
import java.nio.file.{ Files, Paths }

import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.discovery._
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
import akka.http.scaladsl.unmarshalling.Unmarshal
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import com.typesafe.sslconfig.ssl.TrustStoreConfig
import java.nio.file.Files
import java.nio.file.Paths
import java.security.KeyStore
import java.security.SecureRandom

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import JsonFormat._
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal

import scala.util.control.{ NoStackTrace, NonFatal }
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.discovery._
import akka.discovery.kubernetes.JsonFormat._
import akka.event.Logging
import akka.http.scaladsl.HttpsConnectionContext
import akka.http.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pki.kubernetes.PemManagersProvider
import javax.net.ssl.KeyManager
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManager

object KubernetesApiServiceDiscovery {

Expand Down Expand Up @@ -92,14 +101,23 @@ class KubernetesApiServiceDiscovery(implicit system: ActorSystem) extends Servic

private val log = Logging(system, getClass)

private val httpsTrustStoreConfig =
TrustStoreConfig(data = None, filePath = Some(settings.apiCaPath)).withStoreType("PEM")

private val httpsConfig =
AkkaSSLConfig()(system).mapSettings(s =>
s.withTrustManagerConfig(s.trustManagerConfig.withTrustStoreConfigs(Seq(httpsTrustStoreConfig))))
private val sslContext = {
val certificate = PemManagersProvider.loadCertificate(settings.apiCaPath)

val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(null)
factory.init(keyStore, Array.empty)
val km: Array[KeyManager] = factory.getKeyManagers
val tm: Array[TrustManager] =
PemManagersProvider.buildTrustManagers(certificate)
val random: SecureRandom = new SecureRandom
val sslContext = SSLContext.getInstance("TLSv1.2")
sslContext.init(km, tm, random)
sslContext
}

private val httpsContext = http.createClientHttpsContext(httpsConfig)
private val clientSslContext: HttpsConnectionContext = ConnectionContext.httpsClient(sslContext)

log.debug("Settings {}", settings)

Expand All @@ -118,7 +136,7 @@ class KubernetesApiServiceDiscovery(implicit system: ActorSystem) extends Servic
s"Unable to form request; check Kubernetes environment (expecting env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})"
)

response <- http.singleRequest(request, httpsContext)
response <- http.singleRequest(request, clientSslContext)

entity <- response.entity.toStrict(resolveTimeout)

Expand Down
2 changes: 1 addition & 1 deletion integration-test/kubernetes-api-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<akka.version>2.6.14</akka.version>
<akka.http.version>10.1.11</akka.http.version>
<akka.http.version>10.2.0</akka.http.version>
<akka-management.version>1.0.5</akka-management.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
Expand Down
Loading

0 comments on commit 1e83485

Please sign in to comment.