diff --git a/build.sbt b/build.sbt index 7850be9da..7402f66a3 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ lazy val `akka-management-root` = project `akka-discovery-kubernetes-api`, `akka-discovery-marathon-api`, `akka-management`, + `akka-management-pki`, `loglevels-logback`, `integration-test-aws-api-ec2-tag-based`, `integration-test-local`, @@ -51,6 +52,7 @@ lazy val `akka-discovery-kubernetes-api` = project libraryDependencies := Dependencies.DiscoveryKubernetesApi, mimaPreviousArtifactsSet ) + .dependsOn(`akka-management-pki`) lazy val `akka-discovery-marathon-api` = project .in(file("discovery-marathon-api")) @@ -102,6 +104,16 @@ lazy val `akka-management` = project mimaPreviousArtifactsSet ) +lazy val `akka-management-pki` = project + .in(file("management-pki")) + .enablePlugins(AutomateHeaderPlugin) + .settings( + name := "akka-management-pki", + libraryDependencies := Dependencies.ManagementPki, + // Don't enable mima until 1.1.1 + mimaPreviousArtifacts := Set.empty + ) + lazy val `loglevels-logback` = project .in(file("loglevels-logback")) .enablePlugins(AutomateHeaderPlugin) @@ -144,6 +156,7 @@ lazy val `lease-kubernetes` = project Defaults.itSettings ) .configs(IntegrationTest) + .dependsOn(`akka-management-pki`) lazy val `lease-kubernetes-int-test` = project .in(file("lease-kubernetes-int-test")) @@ -181,12 +194,7 @@ lazy val `integration-test-kubernetes-api` = project whitesourceIgnore := true, libraryDependencies := Dependencies.BootstrapDemos ) - .dependsOn( - `akka-management`, - `cluster-http`, - `cluster-bootstrap`, - `akka-discovery-kubernetes-api` - ) + .dependsOn(`akka-management`, `cluster-http`, `cluster-bootstrap`, `akka-discovery-kubernetes-api`) lazy val `integration-test-kubernetes-api-java` = project .in(file("integration-test/kubernetes-api-java")) diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapDiscoveryBackoffIntegrationSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapDiscoveryBackoffIntegrationSpec.scala index 4a7deda27..7b396c2a5 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapDiscoveryBackoffIntegrationSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapDiscoveryBackoffIntegrationSpec.scala @@ -6,23 +6,29 @@ package akka.management.cluster.bootstrap.contactpoint import java.net.InetAddress import java.util.concurrent.ConcurrentHashMap -import java.util.function.{ BiConsumer, BiFunction } +import java.util.function.BiConsumer +import java.util.function.BiFunction + +import scala.concurrent.Future +import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.cluster.Cluster -import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp } -import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } -import akka.discovery.{ Lookup, MockDiscovery } +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberUp +import akka.discovery.Lookup +import akka.discovery.MockDiscovery +import akka.discovery.ServiceDiscovery.Resolved +import akka.discovery.ServiceDiscovery.ResolvedTarget import akka.http.scaladsl.Http -import akka.http.scaladsl.server.RouteResult import akka.management.cluster.bootstrap.ClusterBootstrap -import akka.testkit.{ SocketUtil, TestKit, TestProbe } -import com.typesafe.config.{ Config, ConfigFactory } +import akka.testkit.SocketUtil +import akka.testkit.TestKit +import akka.testkit.TestProbe +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import org.scalactic.Tolerance import org.scalatest.concurrent.ScalaFutures - -import scala.concurrent.Future -import scala.concurrent.duration._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -154,13 +160,12 @@ class ClusterBootstrapDiscoveryBackoffIntegrationSpec "start listening with the http contact-points on 2 systems" in { def start(system: ActorSystem, contactPointPort: Int) = { - import system.dispatcher implicit val sys = system val bootstrap: ClusterBootstrap = ClusterBootstrap(system) val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort") - Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort) + Http().newServerAt("127.0.0.1", contactPointPort).bind(routes) } start(systemA, contactPointPorts("A")).futureValue diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapExistingSeedNodesSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapExistingSeedNodesSpec.scala index 5c642ba24..b20b3244e 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapExistingSeedNodesSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapExistingSeedNodesSpec.scala @@ -4,19 +4,20 @@ package akka.management.cluster.bootstrap.contactpoint -import akka.actor.{ ActorSystem, Address } +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.actor.Address import akka.cluster.Cluster import akka.discovery.MockDiscovery import akka.event.Logging import akka.http.scaladsl.Http -import akka.http.scaladsl.server.RouteResult import akka.management.cluster.bootstrap.ClusterBootstrap import akka.remote.RARP -import akka.testkit.{ SocketUtil, TestKit } +import akka.testkit.SocketUtil +import akka.testkit.TestKit import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll - -import scala.concurrent.duration._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -107,13 +108,12 @@ class ClusterBootstrapExistingSeedNodesSpec(system: ActorSystem) "start listening with the http contact-points on all systems" in { def start(system: ActorSystem, contactPointPort: Int) = { - import system.dispatcher implicit val sys: ActorSystem = system val bootstrap = ClusterBootstrap(system) val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort") - Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort) + Http().newServerAt("127.0.0.1", contactPointPort).bind(routes) } start(systemA, contactPointPorts("A")) diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala index 8cf61c77a..0bf852e30 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapIntegrationSpec.scala @@ -6,18 +6,24 @@ package akka.management.cluster.bootstrap.contactpoint import java.net.InetAddress +import scala.concurrent.Future +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.cluster.Cluster -import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp } -import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } -import akka.discovery.{ Lookup, MockDiscovery } +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberUp +import akka.discovery.Lookup +import akka.discovery.MockDiscovery +import akka.discovery.ServiceDiscovery.Resolved +import akka.discovery.ServiceDiscovery.ResolvedTarget import akka.http.scaladsl.Http -import akka.http.scaladsl.server.RouteResult import akka.management.cluster.bootstrap.ClusterBootstrap -import akka.testkit.{ SocketUtil, TestKit, TestProbe } -import com.typesafe.config.{ Config, ConfigFactory } -import scala.concurrent.Future -import scala.concurrent.duration._ +import akka.testkit.SocketUtil +import akka.testkit.TestKit +import akka.testkit.TestProbe +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -125,13 +131,12 @@ class ClusterBootstrapIntegrationSpec extends AnyWordSpecLike with Matchers { "start listening with the http contact-points on 3 systems" in { def start(system: ActorSystem, contactPointPort: Int) = { - import system.dispatcher implicit val sys = system val bootstrap = ClusterBootstrap(system) val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort") - Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort) + Http().newServerAt("127.0.0.1", contactPointPort).bind(routes) } start(systemA, contactPointPorts("A")) diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapRetryUnreachableContactPointIntegrationSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapRetryUnreachableContactPointIntegrationSpec.scala index 1180e4081..da8f4b3a2 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapRetryUnreachableContactPointIntegrationSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapRetryUnreachableContactPointIntegrationSpec.scala @@ -6,18 +6,24 @@ package akka.management.cluster.bootstrap.contactpoint import java.net.InetAddress +import scala.concurrent.Future +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.cluster.Cluster -import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp } -import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } -import akka.discovery.{ Lookup, MockDiscovery } +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberUp +import akka.discovery.Lookup +import akka.discovery.MockDiscovery +import akka.discovery.ServiceDiscovery.Resolved +import akka.discovery.ServiceDiscovery.ResolvedTarget import akka.http.scaladsl.Http -import akka.http.scaladsl.server.RouteResult import akka.management.cluster.bootstrap.ClusterBootstrap -import akka.testkit.{ SocketUtil, TestKit, TestProbe } -import com.typesafe.config.{ Config, ConfigFactory } -import scala.concurrent.Future -import scala.concurrent.duration._ +import akka.testkit.SocketUtil +import akka.testkit.TestKit +import akka.testkit.TestProbe +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -133,13 +139,12 @@ class ClusterBootstrapRetryUnreachableContactPointIntegrationSpec extends AnyWor "start listening with the http contact-points on 3 systems" in { def start(system: ActorSystem, contactPointPort: Int) = { - import system.dispatcher implicit val sys = system val bootstrap = ClusterBootstrap(system) val routes = new HttpClusterBootstrapRoutes(bootstrap.settings).routes bootstrap.setSelfContactPoint(s"http://127.0.0.1:$contactPointPort") - Http().bindAndHandle(RouteResult.route2HandlerFlow(routes), "127.0.0.1", contactPointPort) + Http().newServerAt("127.0.0.1", contactPointPort).bind(routes) } start(systemA, contactPointPorts("A")) diff --git a/cluster-http/src/main/scala/akka/management/cluster/ClusterDomainEventServerSentEventEncoder.scala b/cluster-http/src/main/scala/akka/management/cluster/ClusterDomainEventServerSentEventEncoder.scala index 31a025b14..b8a8fc665 100644 --- a/cluster-http/src/main/scala/akka/management/cluster/ClusterDomainEventServerSentEventEncoder.scala +++ b/cluster-http/src/main/scala/akka/management/cluster/ClusterDomainEventServerSentEventEncoder.scala @@ -11,7 +11,7 @@ import akka.http.scaladsl.model.sse.ServerSentEvent import spray.json.{ DefaultJsonProtocol, JsArray, JsNumber, JsObject, JsString, JsValue } /** - * Encodes a supplied [[ClusterEvent.ClusterDomainEvent]] into a [[ServerSentEvent]]. + * Encodes a supplied `ClusterEvent.ClusterDomainEvent` into a `ServerSentEvent`. */ object ClusterDomainEventServerSentEventEncoder extends SprayJsonSupport with DefaultJsonProtocol { def encode(event: ClusterEvent.ClusterDomainEvent): Option[ServerSentEvent] = { diff --git a/cluster-http/src/main/scala/akka/management/cluster/scaladsl/ClusterHttpManagementRoutes.scala b/cluster-http/src/main/scala/akka/management/cluster/scaladsl/ClusterHttpManagementRoutes.scala index 2850a60fa..19b0e61e6 100644 --- a/cluster-http/src/main/scala/akka/management/cluster/scaladsl/ClusterHttpManagementRoutes.scala +++ b/cluster-http/src/main/scala/akka/management/cluster/scaladsl/ClusterHttpManagementRoutes.scala @@ -16,7 +16,6 @@ import akka.util.Timeout import scala.annotation.tailrec import scala.concurrent.duration._ -import scala.util.control.NonFatal object ClusterHttpManagementRoutes extends ClusterHttpManagementJsonProtocol { import ClusterHttpManagementHelper._ diff --git a/cluster-http/src/test/scala/akka/cluster/http/management/scaladsl/ClusterHttpManagementRoutesSpec.scala b/cluster-http/src/test/scala/akka/cluster/http/management/scaladsl/ClusterHttpManagementRoutesSpec.scala index 42ac1a0be..76895cb65 100644 --- a/cluster-http/src/test/scala/akka/cluster/http/management/scaladsl/ClusterHttpManagementRoutesSpec.scala +++ b/cluster-http/src/test/scala/akka/cluster/http/management/scaladsl/ClusterHttpManagementRoutesSpec.scala @@ -387,7 +387,7 @@ class ClusterHttpManagementRoutesSpec val clusterHttpManagement = ClusterHttpManagementRouteProvider(system) val settings = ManagementRouteProviderSettings(selfBaseUri = "http://127.0.0.1:20100", readOnly = false) - val binding = Http().bindAndHandle(clusterHttpManagement.routes(settings), "127.0.0.1", 20100).futureValue + val binding = Http().newServerAt("127.0.0.1", 20100).bind(clusterHttpManagement.routes(settings)).futureValue val responseGetShardDetails = Http().singleRequest(HttpRequest(uri = s"http://127.0.0.1:20100/cluster/shards/$name")).futureValue(t) @@ -450,7 +450,7 @@ class ClusterHttpManagementRoutesSpec val clusterHttpManagement = ClusterHttpManagementRouteProvider(system) val settings = ManagementRouteProviderSettings(selfBaseUri = "http://127.0.0.1:20100", readOnly = false) - val binding = Http().bindAndHandle(clusterHttpManagement.routes(settings), "127.0.0.1", 20100).futureValue + val binding = Http().newServerAt("127.0.0.1", 20100).bind(clusterHttpManagement.routes(settings)).futureValue val responseGetDomainEvents = Http() diff --git a/cluster-http/src/test/scala/akka/management/cluster/MultiDcSpec.scala b/cluster-http/src/test/scala/akka/management/cluster/MultiDcSpec.scala index 6ffd4b095..20eda0da4 100644 --- a/cluster-http/src/test/scala/akka/management/cluster/MultiDcSpec.scala +++ b/cluster-http/src/test/scala/akka/management/cluster/MultiDcSpec.scala @@ -64,7 +64,8 @@ class MultiDcSpec try { Http() - .bindAndHandle(ClusterHttpManagementRouteProvider(dcASystem).routes(routeSettings), "127.0.0.1", httpPortA) + .newServerAt("127.0.0.1", httpPortA) + .bind(ClusterHttpManagementRouteProvider(dcASystem).routes(routeSettings)) .futureValue eventually { diff --git a/discovery-kubernetes-api/src/main/scala/akka/discovery/kubernetes/KubernetesApiServiceDiscovery.scala b/discovery-kubernetes-api/src/main/scala/akka/discovery/kubernetes/KubernetesApiServiceDiscovery.scala index a26ea2fca..69cddc297 100644 --- a/discovery-kubernetes-api/src/main/scala/akka/discovery/kubernetes/KubernetesApiServiceDiscovery.scala +++ b/discovery-kubernetes-api/src/main/scala/akka/discovery/kubernetes/KubernetesApiServiceDiscovery.scala @@ -5,27 +5,36 @@ package akka.discovery.kubernetes import java.net.InetAddress -import java.nio.file.{ Files, Paths } - -import akka.actor.ActorSystem -import akka.annotation.InternalApi -import akka.discovery._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken } -import akka.http.scaladsl.unmarshalling.Unmarshal -import com.typesafe.sslconfig.akka.AkkaSSLConfig -import com.typesafe.sslconfig.ssl.TrustStoreConfig +import java.nio.file.Files +import java.nio.file.Paths +import java.security.KeyStore +import java.security.SecureRandom import scala.collection.immutable.Seq import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.util.Try -import JsonFormat._ -import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import scala.util.control.NoStackTrace +import scala.util.control.NonFatal -import scala.util.control.{ NoStackTrace, NonFatal } +import akka.actor.ActorSystem +import akka.annotation.InternalApi +import akka.discovery.ServiceDiscovery.Resolved +import akka.discovery.ServiceDiscovery.ResolvedTarget +import akka.discovery._ +import akka.discovery.kubernetes.JsonFormat._ import akka.event.Logging +import akka.http.scaladsl.HttpsConnectionContext +import akka.http.scaladsl._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.Authorization +import akka.http.scaladsl.model.headers.OAuth2BearerToken +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.pki.kubernetes.PemManagersProvider +import javax.net.ssl.KeyManager +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SSLContext +import javax.net.ssl.TrustManager object KubernetesApiServiceDiscovery { @@ -92,14 +101,23 @@ class KubernetesApiServiceDiscovery(implicit system: ActorSystem) extends Servic private val log = Logging(system, getClass) - private val httpsTrustStoreConfig = - TrustStoreConfig(data = None, filePath = Some(settings.apiCaPath)).withStoreType("PEM") - - private val httpsConfig = - AkkaSSLConfig()(system).mapSettings(s => - s.withTrustManagerConfig(s.trustManagerConfig.withTrustStoreConfigs(Seq(httpsTrustStoreConfig)))) + private val sslContext = { + val certificate = PemManagersProvider.loadCertificate(settings.apiCaPath) + + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val tm: Array[TrustManager] = + PemManagersProvider.buildTrustManagers(certificate) + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(km, tm, random) + sslContext + } - private val httpsContext = http.createClientHttpsContext(httpsConfig) + private val clientSslContext: HttpsConnectionContext = ConnectionContext.httpsClient(sslContext) log.debug("Settings {}", settings) @@ -118,7 +136,7 @@ class KubernetesApiServiceDiscovery(implicit system: ActorSystem) extends Servic s"Unable to form request; check Kubernetes environment (expecting env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})" ) - response <- http.singleRequest(request, httpsContext) + response <- http.singleRequest(request, clientSslContext) entity <- response.entity.toStrict(resolveTimeout) diff --git a/integration-test/kubernetes-api-java/pom.xml b/integration-test/kubernetes-api-java/pom.xml index b79fdd996..d63f2bc02 100644 --- a/integration-test/kubernetes-api-java/pom.xml +++ b/integration-test/kubernetes-api-java/pom.xml @@ -19,7 +19,7 @@ 1.8 UTF-8 2.6.14 - 10.1.11 + 10.2.0 1.0.5 2.12 diff --git a/integration-test/kubernetes-api/src/main/scala/akka/cluster/bootstrap/DemoApp.scala b/integration-test/kubernetes-api/src/main/scala/akka/cluster/bootstrap/DemoApp.scala index fc0e894a7..de67e7a3a 100644 --- a/integration-test/kubernetes-api/src/main/scala/akka/cluster/bootstrap/DemoApp.scala +++ b/integration-test/kubernetes-api/src/main/scala/akka/cluster/bootstrap/DemoApp.scala @@ -48,7 +48,7 @@ object DemoApp extends App { ) } } - Http().bindAndHandle(routes, "0.0.0.0", 8080) + Http().newServerAt("0.0.0.0", 8080).bind(routes) Cluster(system).registerOnMemberUp({ log.info("Cluster member is up!") diff --git a/integration-test/kubernetes-dns/src/main/scala/akka/cluster/bootstrap/ClusterApp.scala b/integration-test/kubernetes-dns/src/main/scala/akka/cluster/bootstrap/ClusterApp.scala index 9c6d07c22..2d3a2924e 100644 --- a/integration-test/kubernetes-dns/src/main/scala/akka/cluster/bootstrap/ClusterApp.scala +++ b/integration-test/kubernetes-dns/src/main/scala/akka/cluster/bootstrap/ClusterApp.scala @@ -19,7 +19,6 @@ object ClusterApp { def main(args: Array[String]): Unit = { implicit val system = ActorSystem() - implicit val executionContext = system.dispatcher val cluster = Cluster(system) system.log.info("Starting Akka Management") @@ -49,7 +48,7 @@ object ClusterApp { } } - Http().bindAndHandle(routes, "0.0.0.0", 8080) + Http().newServerAt("0.0.0.0", 8080).bind(routes) system.log.info( s"Server online at http://localhost:8080/\nPress RETURN to stop..." diff --git a/integration-test/marathon-api-docker/src/main/scala/akka/cluster/bootstrap/MarathonApiDockerDemoApp.scala b/integration-test/marathon-api-docker/src/main/scala/akka/cluster/bootstrap/MarathonApiDockerDemoApp.scala index 71b340e10..b45cf3977 100644 --- a/integration-test/marathon-api-docker/src/main/scala/akka/cluster/bootstrap/MarathonApiDockerDemoApp.scala +++ b/integration-test/marathon-api-docker/src/main/scala/akka/cluster/bootstrap/MarathonApiDockerDemoApp.scala @@ -37,8 +37,7 @@ object MarathonApiDockerDemoApp extends App { AkkaManagement(system).start() ClusterBootstrap(system).start() - Http().bindAndHandle( - route, - sys.env.get("HOST").getOrElse("127.0.0.1"), - sys.env.get("PORT_HTTP").map(_.toInt).getOrElse(8080)) + private val host: String = sys.env.get("HOST").getOrElse("127.0.0.1") + private val port: Int = sys.env.get("PORT_HTTP").map(_.toInt).getOrElse(8080) + Http().newServerAt(host, port).bind(route) } diff --git a/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/KubernetesApiIntegrationTest.scala b/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/KubernetesApiIntegrationTest.scala index 642d63fed..b49d6a27b 100644 --- a/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/KubernetesApiIntegrationTest.scala +++ b/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/KubernetesApiIntegrationTest.scala @@ -79,6 +79,7 @@ class KubernetesApiIntegrationTest extends TestKit(ActorSystem("KubernetesApiInt val leaseRecord = underTest.updateLeaseResource(leaseName, client1, currentVersion).futureValue val success: LeaseResource = leaseRecord match { case Right(lr) => lr + case Left(_) => fail("There shouldn't be anyone else updating the resource.") } success.version shouldNot equal(currentVersion) currentVersion = success.version @@ -90,6 +91,7 @@ class KubernetesApiIntegrationTest extends TestKit(ActorSystem("KubernetesApiInt val leaseRecord = underTest.updateLeaseResource(leaseName, client1, currentVersion, time = timeUpdate).futureValue val success: LeaseResource = leaseRecord match { case Right(lr) => lr + case Left(_) => fail("There shouldn't be anyone else updating the resource.") } success.version shouldNot equal(currentVersion) currentVersion = success.version @@ -101,6 +103,7 @@ class KubernetesApiIntegrationTest extends TestKit(ActorSystem("KubernetesApiInt val timeUpdate = System.currentTimeMillis() val leaseRecord = underTest.updateLeaseResource(leaseName, client1, "10", time = timeUpdate).futureValue val failure: LeaseResource = leaseRecord match { + case Right(_) => fail("Expected update failure (we've used an invalid version!).") case Left(lr) => lr } failure.version shouldEqual currentVersion @@ -113,6 +116,7 @@ class KubernetesApiIntegrationTest extends TestKit(ActorSystem("KubernetesApiInt val leaseRecord = underTest.updateLeaseResource(leaseName, "", currentVersion).futureValue val success: LeaseResource = leaseRecord match { case Right(lr) => lr + case Left(_) => fail("There shouldn't be anyone else updating the resource.") } success.version shouldNot equal(currentVersion) currentVersion = success.version @@ -123,6 +127,7 @@ class KubernetesApiIntegrationTest extends TestKit(ActorSystem("KubernetesApiInt val leaseRecord = underTest.updateLeaseResource(leaseName, client2, currentVersion).futureValue val success: LeaseResource = leaseRecord match { case Right(lr) => lr + case Left(_) => fail("There shouldn't be anyone else updating the resource.") } success.version shouldNot equal(currentVersion) currentVersion = success.version diff --git a/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/LeaseContentionSpec.scala b/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/LeaseContentionSpec.scala index c7f1719fd..502b83075 100644 --- a/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/LeaseContentionSpec.scala +++ b/lease-kubernetes/src/it/scala/akka/coordination/lease/kubernetes/LeaseContentionSpec.scala @@ -2,6 +2,10 @@ package akka.coordination.lease.kubernetes import java.util.concurrent.Executors +import scala.collection.immutable +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + import akka.actor.ActorSystem import akka.coordination.lease.TimeoutSettings import akka.coordination.lease.kubernetes.internal.KubernetesApiImpl @@ -9,10 +13,9 @@ import akka.coordination.lease.scaladsl.LeaseProvider import akka.testkit.TestKit import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import scala.collection.immutable -import scala.concurrent.{ExecutionContext, Future} +import org.scalatest.wordspec.AnyWordSpecLike +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers /** * This test requires an API server available on localhost:8080 with a namespace called lease @@ -35,7 +38,7 @@ class LeaseContentionSpec extends TestKit(ActorSystem("LeaseContentionSpec", Con } """ -))) with WordSpecLike with Matchers with ScalaFutures with BeforeAndAfterAll { +))) with AnyWordSpecLike with Matchers with ScalaFutures with BeforeAndAfterAll { implicit val patience: PatienceConfig = PatienceConfig(testKitSettings.DefaultTimeout.duration) diff --git a/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/LeaseActor.scala b/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/LeaseActor.scala index a0010c0df..02c0a07e4 100644 --- a/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/LeaseActor.scala +++ b/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/LeaseActor.scala @@ -169,7 +169,7 @@ private[akka] class LeaseActor( who ! LeaseAcquired // Try again as lock version has moved on but is not taken pipe(k8sApi.updateLeaseResource(leaseName, ownerName, version).map(r => WriteResponse(r))).to(self) - stay + stay() case Event(WriteResponse(Left(LeaseResource(Some(_), _, _))), OperationInProgress(who, _, _, _)) => // The audacity, someone else has taken the lease :( who ! LeaseTaken @@ -180,14 +180,14 @@ private[akka] class LeaseActor( case Event(Heartbeat, GrantedVersion(version, _)) => log.debug("Heartbeat: updating lease time. Version {}", version) pipe(k8sApi.updateLeaseResource(leaseName, ownerName, version).map(WriteResponse)).to(self) - stay + stay() case Event(WriteResponse(Right(resource)), gv: GrantedVersion) => require( resource.owner.contains(ownerName), "response from API server has different owner for success: " + resource) log.debug("Heartbeat: lease time updated: Version {}", resource.version) startSingleTimer("heartbeat", Heartbeat, settings.timeoutSettings.heartbeatInterval) - stay.using(gv.copy(version = resource.version)) + stay().using(gv.copy(version = resource.version)) case Event(WriteResponse(Left(lr @ _)), GrantedVersion(_, leaseLost)) => log.warning("Conflict during heartbeat to lease {}. Lease assumed to be released.", lr) granted.set(false) @@ -204,7 +204,7 @@ private[akka] class LeaseActor( goto(Releasing).using(OperationInProgress(sender(), version, leaseLost)) case Event(Acquire(leaseLostCallback), gv: GrantedVersion) => sender() ! LeaseAcquired - stay.using(gv.copy(leaseLostCallback = leaseLostCallback)) + stay().using(gv.copy(leaseLostCallback = leaseLostCallback)) } private def executeLeaseLockCallback(callback: Option[Throwable] => Unit, result: Option[Throwable]): Unit = @@ -244,7 +244,7 @@ private[akka] class LeaseActor( ownerName, leaseName, stateName) - stay.using(data) + stay().using(data) case Event(Release(), data @ _) => log.info( "Release request for owner {} lease {} while previous acquire/release still in progress. Current state: {}", @@ -252,7 +252,7 @@ private[akka] class LeaseActor( leaseName, stateName) sender() ! InvalidRequest("Tried to release a lease that is not acquired") - stay.using(data) + stay().using(data) case Event(Failure(t), replyRequired: ReplyRequired) => log.warning( "Failure communicating with the API server for owner {} lease {}: [{}]. Current state: {}", diff --git a/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala b/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala index 0b6562120..dd4401e22 100644 --- a/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala +++ b/lease-kubernetes/src/main/scala/akka/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala @@ -4,25 +4,38 @@ package akka.coordination.lease.kubernetes.internal -import java.nio.file.{ Files, Paths } +import java.nio.file.Files +import java.nio.file.Paths +import java.security.KeyStore +import java.security.SecureRandom + +import scala.collection.immutable +import scala.concurrent.Future +import scala.util.control.NonFatal import akka.Done import akka.actor.ActorSystem import akka.annotation.InternalApi +import akka.coordination.lease.LeaseException +import akka.coordination.lease.LeaseTimeoutException +import akka.coordination.lease.kubernetes.KubernetesApi +import akka.coordination.lease.kubernetes.KubernetesSettings +import akka.coordination.lease.kubernetes.LeaseResource import akka.event.Logging +import akka.http.scaladsl.ConnectionContext import akka.http.scaladsl.Http +import akka.http.scaladsl.HttpsConnectionContext import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken } +import akka.http.scaladsl.model.headers.Authorization +import akka.http.scaladsl.model.headers.OAuth2BearerToken import akka.http.scaladsl.unmarshalling.Unmarshal import akka.pattern.after -import akka.coordination.lease.kubernetes.{ KubernetesApi, KubernetesSettings, LeaseResource } -import akka.coordination.lease.{ LeaseException, LeaseTimeoutException } -import com.typesafe.sslconfig.akka.AkkaSSLConfig -import com.typesafe.sslconfig.ssl.TrustStoreConfig -import scala.collection.immutable -import scala.concurrent.Future -import scala.util.control.NonFatal +import akka.pki.kubernetes.PemManagersProvider +import javax.net.ssl.KeyManager +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SSLContext +import javax.net.ssl.TrustManager /** * Could be shared between leases: https://github.com/akka/akka-management/issues/680 @@ -37,13 +50,23 @@ import scala.util.control.NonFatal private implicit val sys = system private val log = Logging(system, getClass) private val http = Http()(system) - private val httpsTrustStoreConfig = - TrustStoreConfig(data = None, filePath = Some(settings.apiCaPath)).withStoreType("PEM") - private lazy val httpsConfig = - AkkaSSLConfig()(system).mapSettings(s => - s.withTrustManagerConfig(s.trustManagerConfig.withTrustStoreConfigs(immutable.Seq(httpsTrustStoreConfig)))) - private lazy val httpsContext = http.createClientHttpsContext(httpsConfig) + private lazy val sslContext = { + val certificate = PemManagersProvider.loadCertificate(settings.apiCaPath) + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val tm: Array[TrustManager] = + PemManagersProvider.buildTrustManagers(certificate) + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(km, tm, random) + sslContext + } + + private lazy val clientSslContext: HttpsConnectionContext = ConnectionContext.httpsClient(sslContext) private val namespace = settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath, "namespace")).getOrElse("default") @@ -105,9 +128,11 @@ PUTs must contain resourceVersions. Response: * Must [[readOrCreateLeaseResource]] to first to get a resource version. * * Can return one of three things: - * - Failure, e.g. timed out waiting for k8s api server to respond - * - Update failed due to version not matching current in the k8s api server. In this case resource is returned so the version can be used for subsequent calls - * - Success. Returns the LeaseResource that contains the clientName and new version. The new version should be used for any subsequent calls + * - Future.Failure, e.g. timed out waiting for k8s api server to respond + * - Future.sucess[Left(resource)]: the update failed due to version not matching current in the k8s api server. + * In this case the current resource is returned so the version can be used for subsequent calls + * - Future.sucess[Right(resource)]: Returns the LeaseResource that contains the clientName and new version. + * The new version should be used for any subsequent calls */ override def updateLeaseResource( leaseName: String, @@ -239,7 +264,7 @@ PUTs must contain resourceVersions. Response: private def makeRequest(request: HttpRequest, timeoutMsg: String): Future[HttpResponse] = { val response = if (settings.secure) - http.singleRequest(request, httpsContext) + http.singleRequest(request, clientSslContext) else http.singleRequest(request) diff --git a/lease-kubernetes/src/test/scala/akka/coordination/lease/kubernetes/KubernetesApiSpec.scala b/lease-kubernetes/src/test/scala/akka/coordination/lease/kubernetes/KubernetesApiSpec.scala index a7fb302ae..a4d04261f 100644 --- a/lease-kubernetes/src/test/scala/akka/coordination/lease/kubernetes/KubernetesApiSpec.scala +++ b/lease-kubernetes/src/test/scala/akka/coordination/lease/kubernetes/KubernetesApiSpec.scala @@ -4,19 +4,20 @@ package akka.coordination.lease.kubernetes +import scala.concurrent.duration._ + import akka.Done import akka.actor.ActorSystem -import akka.http.scaladsl.model.StatusCodes import akka.coordination.lease.kubernetes.internal.KubernetesApiImpl +import akka.http.scaladsl.model.StatusCodes import akka.testkit.TestKit import com.github.tomakehurst.wiremock.WireMockServer import com.github.tomakehurst.wiremock.client.WireMock -import org.scalatest.concurrent.ScalaFutures import com.github.tomakehurst.wiremock.client.WireMock._ import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig -import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } - -import scala.concurrent.duration._ +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -30,6 +31,7 @@ class KubernetesApiSpec val wireMockServer = new WireMockServer(wireMockConfig().port(0)) wireMockServer.start() + val settings = new KubernetesSettings( "", "", diff --git a/management-pki/src/main/scala/akka/pki/kubernetes/PemManagersProvider.scala b/management-pki/src/main/scala/akka/pki/kubernetes/PemManagersProvider.scala new file mode 100644 index 000000000..7061a9eba --- /dev/null +++ b/management-pki/src/main/scala/akka/pki/kubernetes/PemManagersProvider.scala @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2017-2021 Lightbend Inc. + */ + +package akka.pki.kubernetes + +import java.io.ByteArrayInputStream +import java.io.File +import java.nio.charset.Charset +import java.nio.file.Files +import java.security.KeyStore +import java.security.PrivateKey +import java.security.cert.Certificate +import java.security.cert.CertificateFactory + +import scala.concurrent.blocking + +import akka.annotation.InternalApi +import akka.pki.pem.DERPrivateKeyLoader +import akka.pki.pem.PEMDecoder +import javax.net.ssl.TrustManager +import javax.net.ssl.TrustManagerFactory + +/** + * INTERNAL API + * Convenience methods to ease building an SSLContext from k8s-provided PEM files. + */ +// Duplicate from https://github.com/akka/akka/blob/31f654768f86db68f4c22daa2cbd0bae28fc1fad/akka-remote/src/main/scala/akka/remote/artery/tcp/ssl/PemManagersProvider.scala#L35 +// Eventually that will be a bit more open and we can reuse the class from akka in akka-management. +// See also https://github.com/akka/akka-http/issues/3772 +@InternalApi +private[akka] object PemManagersProvider { + + /** + * INTERNAL API + */ + @InternalApi def buildTrustManagers(cacert: Certificate): Array[TrustManager] = { + val trustStore = KeyStore.getInstance("JKS") + trustStore.load(null) + trustStore.setCertificateEntry("cacert", cacert) + + val tmf = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + tmf.init(trustStore) + tmf.getTrustManagers + } + + /** + * INTERNAL API + */ + @InternalApi def loadPrivateKey(filename: String): PrivateKey = blocking { + val bytes = Files.readAllBytes(new File(filename).toPath) + val pemData = new String(bytes, Charset.forName("UTF-8")) + DERPrivateKeyLoader.load(PEMDecoder.decode(pemData)) + } + + private val certFactory = CertificateFactory.getInstance("X.509") + + /** + * INTERNAL API + */ + @InternalApi def loadCertificate(filename: String): Certificate = blocking { + val bytes = Files.readAllBytes(new File(filename).toPath) + certFactory.generateCertificate(new ByteArrayInputStream(bytes)) + } + +} diff --git a/management/src/main/scala/akka/management/javadsl/AkkaManagement.scala b/management/src/main/scala/akka/management/javadsl/AkkaManagement.scala index 710a09257..04242edb5 100644 --- a/management/src/main/scala/akka/management/javadsl/AkkaManagement.scala +++ b/management/src/main/scala/akka/management/javadsl/AkkaManagement.scala @@ -31,7 +31,7 @@ final class AkkaManagement(delegate: scaladsl.AkkaManagement) { * Get the routes for the HTTP management endpoint. * * This method can be used to embed the Akka management routes in an existing Akka HTTP server. - * @throws IllegalArgumentException if routes not configured for akka management + * @throws java.lang.IllegalArgumentException if routes not configured for akka management */ def getRoutes: akka.http.javadsl.server.Route = RouteAdapter(delegate.routes) @@ -42,7 +42,7 @@ final class AkkaManagement(delegate: scaladsl.AkkaManagement) { * Use this when adding authentication and HTTPS. * * This method can be used to embed the Akka management routes in an existing Akka HTTP server. - * @throws IllegalArgumentException if routes not configured for akka management + * @throws java.lang.IllegalArgumentException if routes not configured for akka management */ def getRoutes(transformSettings: JFunction[ManagementRouteProviderSettings, ManagementRouteProviderSettings]) : akka.http.javadsl.server.Route = diff --git a/management/src/main/scala/akka/management/javadsl/HealthChecks.scala b/management/src/main/scala/akka/management/javadsl/HealthChecks.scala index 067955baa..01ee1bb1f 100644 --- a/management/src/main/scala/akka/management/javadsl/HealthChecks.scala +++ b/management/src/main/scala/akka/management/javadsl/HealthChecks.scala @@ -66,7 +66,7 @@ object ReadinessCheckSetup { } /** - * Setup for readiness checks, constructor is *Internal API*, use factories in [[ReadinessCheckSetup()]] + * Setup for readiness checks, constructor is *Internal API*, use factories in [[ReadinessCheckSetup]] */ final class ReadinessCheckSetup private ( val createHealthChecks: JFunction[ActorSystem, JList[Supplier[CompletionStage[java.lang.Boolean]]]] @@ -85,7 +85,7 @@ object LivenessCheckSetup { } /** - * Setup for liveness checks, constructor is *Internal API*, use factories in [[LivenessCheckSetup()]] + * Setup for liveness checks, constructor is *Internal API*, use factories in [[LivenessCheckSetup]] */ final class LivenessCheckSetup private ( val createHealthChecks: JFunction[ActorSystem, JList[Supplier[CompletionStage[java.lang.Boolean]]]] diff --git a/management/src/main/scala/akka/management/scaladsl/AkkaManagement.scala b/management/src/main/scala/akka/management/scaladsl/AkkaManagement.scala index 5331e5231..d81433f1d 100644 --- a/management/src/main/scala/akka/management/scaladsl/AkkaManagement.scala +++ b/management/src/main/scala/akka/management/scaladsl/AkkaManagement.scala @@ -34,7 +34,6 @@ import akka.http.scaladsl.server.Directives.pathPrefix import akka.http.scaladsl.server.Directives.rawPathPrefix import akka.http.scaladsl.server.PathMatchers import akka.http.scaladsl.server.Route -import akka.http.scaladsl.server.RouteResult import akka.http.scaladsl.server.directives.Credentials import akka.http.scaladsl.settings.ServerSettings import akka.management.AkkaManagementSettings @@ -96,7 +95,7 @@ final class AkkaManagement(implicit private[akka] val system: ExtendedActorSyste * * This method can be used to embed the Akka management routes in an existing Akka HTTP server. * - * @throws IllegalArgumentException if routes not configured for akka management + * @throws java.lang.IllegalArgumentException if routes not configured for akka management */ def routes: Route = prepareCombinedRoutes(providerSettings) @@ -107,7 +106,7 @@ final class AkkaManagement(implicit private[akka] val system: ExtendedActorSyste * * This method can be used to embed the Akka management routes in an existing Akka HTTP server. * - * @throws IllegalArgumentException if routes not configured for akka management + * @throws java.lang.IllegalArgumentException if routes not configured for akka management */ def routes(transformSettings: ManagementRouteProviderSettings => ManagementRouteProviderSettings): Route = prepareCombinedRoutes(transformSettings(providerSettings)) @@ -144,17 +143,15 @@ final class AkkaManagement(implicit private[akka] val system: ExtendedActorSyste val combinedRoutes = prepareCombinedRoutes(effectiveProviderSettings) - val connectionContext = - effectiveProviderSettings.httpsConnectionContext.getOrElse(Http().defaultServerHttpContext) + val baseBuilder = Http() + .newServerAt(effectiveBindHostname, effectiveBindPort) + .withSettings(ServerSettings(system).withRemoteAddressHeader(true)) - val serverFutureBinding = - Http().bindAndHandle( - RouteResult.route2HandlerFlow(combinedRoutes), - effectiveBindHostname, - effectiveBindPort, - connectionContext = connectionContext, - settings = ServerSettings(system).withRemoteAddressHeader(true) - ) + val securedBuilder = effectiveProviderSettings.httpsConnectionContext match { + case Some(httpsContext) => baseBuilder.enableHttps(httpsContext) + case None => baseBuilder + } + val serverFutureBinding = securedBuilder.bind(combinedRoutes) serverBindingPromise.completeWith(serverFutureBinding).future.flatMap { binding => val boundPort = binding.localAddress.getPort diff --git a/management/src/main/scala/akka/management/scaladsl/HealthChecks.scala b/management/src/main/scala/akka/management/scaladsl/HealthChecks.scala index 5b5eaefe5..433ad2049 100644 --- a/management/src/main/scala/akka/management/scaladsl/HealthChecks.scala +++ b/management/src/main/scala/akka/management/scaladsl/HealthChecks.scala @@ -65,7 +65,7 @@ object ReadinessCheckSetup { } /** - * Setup for readiness checks, constructor is *Internal API*, use factories in [[ReadinessCheckSetup()]] + * Setup for readiness checks, constructor is *Internal API*, use factories in [[ReadinessCheckSetup]] */ final class ReadinessCheckSetup private ( val createHealthChecks: ActorSystem => immutable.Seq[HealthChecks.HealthCheck] @@ -83,7 +83,7 @@ object LivenessCheckSetup { } /** - * Setup for liveness checks, constructor is *Internal API*, use factories in [[LivenessCheckSetup()]] + * Setup for liveness checks, constructor is *Internal API*, use factories in [[LivenessCheckSetup]] */ final class LivenessCheckSetup private ( val createHealthChecks: ActorSystem => immutable.Seq[HealthChecks.HealthCheck] diff --git a/management/src/test/scala/akka/management/AkkaManagementHttpEndpointSpec.scala b/management/src/test/scala/akka/management/AkkaManagementHttpEndpointSpec.scala index 41032bd1f..8571c56dd 100644 --- a/management/src/test/scala/akka/management/AkkaManagementHttpEndpointSpec.scala +++ b/management/src/test/scala/akka/management/AkkaManagementHttpEndpointSpec.scala @@ -177,25 +177,41 @@ class AkkaManagementHttpEndpointSpec extends AnyWordSpecLike with Matchers { require(keystore != null, "Keystore required!") ks.load(keystore, password) - val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance("SunX509") + val keyManagerFactory: KeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) keyManagerFactory.init(ks, password) - val tmf: TrustManagerFactory = TrustManagerFactory.getInstance("SunX509") + val tmf: TrustManagerFactory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) tmf.init(ks) val sslContext: SSLContext = SSLContext.getInstance("TLS") sslContext.init(keyManagerFactory.getKeyManagers, tmf.getTrustManagers, new SecureRandom) + + // A custom httpsClient for tests (without endpoint verification) + val httpsClient: HttpsConnectionContext = ConnectionContext.httpsClient((host, port) => { + val engine = sslContext.createSSLEngine(host, port) + engine.setUseClientMode(true) + // disable endpoint verification for tests + engine.setSSLParameters({ + val params = engine.getSSLParameters + params.setEndpointIdentificationAlgorithm(null) + params + }) + + engine + }) + //#start-akka-management-with-https-context val management = AkkaManagement(system) - val https: HttpsConnectionContext = ConnectionContext.https(sslContext) - val started = management.start(_.withHttpsConnectionContext(https)) + val httpsServer: HttpsConnectionContext = ConnectionContext.httpsServer(sslContext) + + val started = management.start(_.withHttpsConnectionContext(httpsServer)) //#start-akka-management-with-https-context Await.result(started, 10.seconds) val httpRequest = HttpRequest(uri = s"https://127.0.0.1:$httpPort/scaladsl") - val responseGetMembersFuture = Http().singleRequest(httpRequest, connectionContext = https) + val responseGetMembersFuture = Http().singleRequest(httpRequest, connectionContext = httpsClient) val responseGetMembers = Await.result(responseGetMembersFuture, 5.seconds) responseGetMembers.status shouldEqual StatusCodes.OK diff --git a/project/Common.scala b/project/Common.scala index 8cc4ccec7..6021de657 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -36,7 +36,8 @@ object Common extends AutoPlugin { crossScalaVersions := Dependencies.CrossScalaVersions, projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value), crossVersion := CrossVersion.binary, - scalacOptions ++= Seq( + scalacOptions ++= { + var scalacOptionsBase = Seq( "-encoding", "UTF-8", "-feature", @@ -44,9 +45,13 @@ object Common extends AutoPlugin { "-deprecation", "-Xlint", "-Ywarn-dead-code", - "-Xfuture", "-target:jvm-1.8" - ), + ) + if (scalaVersion.value == Dependencies.Scala212) + scalacOptionsBase ++: Seq("-Xfuture", "-Xfatal-warnings") + else + scalacOptionsBase + }, javacOptions ++= Seq( "-Xlint:unchecked" ), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 029336b38..46dc5027d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,8 +3,6 @@ import Keys._ object Dependencies { - val CronBuild = sys.env.get("TRAVIS_EVENT_TYPE").contains("cron") - val Scala212 = "2.12.13" val Scala213 = "2.13.5" val CrossScalaVersions = Seq(Dependencies.Scala212, Dependencies.Scala213) @@ -13,10 +11,8 @@ object Dependencies { val AkkaVersion = "2.6.14" val AkkaBinaryVersion = "2.6" // Align the versions in integration-test/kubernetes-api-java/pom.xml - val AkkaHttp101 = "10.1.11" - val AkkaHttp102 = "10.2.0" - val AkkaHttpVersion = if (CronBuild) AkkaHttp102 else AkkaHttp101 - val AkkaHttpBinaryVersion = if (CronBuild) "10.2" else "10.1" + val AkkaHttpVersion = "10.2.0" + val AkkaHttpBinaryVersion = "10.2" val ScalaTestVersion = "3.1.4" val ScalaTestPlusJUnitVersion = ScalaTestVersion + ".0" @@ -96,6 +92,12 @@ object Dependencies { "org.scalatestplus" %% "junit-4-13" % ScalaTestPlusJUnitVersion % Test ) + val ManagementPki = Seq( + "com.typesafe.akka" %% "akka-pki" % AkkaVersion, + "org.scalatest" %% "scalatest" % ScalaTestVersion % Test, + "org.scalatestplus" %% "junit-4-13" % ScalaTestPlusJUnitVersion % Test + ) + val LoglevelsLogback = Seq( "com.typesafe.akka" %% "akka-actor" % AkkaVersion, "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,