try scala 3 (apache#123)
pjfanning authored and mdedetrich committed Jul 3, 2023
1 parent a7e23f7 commit 1ec219d
Showing 109 changed files with 416 additions and 349 deletions.
@@ -28,7 +28,7 @@ import scala.concurrent.{ Future, Promise }
  * the queue named in the replyTo options of the message instead of from settings declared at construction.
  */
 @InternalApi
-private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings)
+private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToSinkSettings)
     extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>

   val in = Inlet[WriteMessage]("AmqpReplyToSink.in")
@@ -41,7 +41,7 @@ private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings
   override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
     val streamCompletion = Promise[Done]()
     (new GraphStageLogic(shape) with AmqpConnectorLogic {
-      override val settings = stage.settings
+      override val settings: AmqpReplyToSinkSettings = stage.replyToSinkSettings

       override def whenConnected(): Unit = pull(in)
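This rename-plus-annotation pattern repeats across the AMQP stages below: the constructor parameter stops being called settings, so the anonymous logic's `override val settings` no longer shares a name with it, and the override now states its type explicitly. A minimal sketch of the resulting shape (hypothetical trait and names, not the real connector API; the presumed motivation is the name collision, which Scala 3 resolves more strictly than Scala 2):

    trait ConnectorLogic {
      def settings: String // abstract member each logic must provide
    }

    final class ReplyToStage(replyToSettings: String) { stage =>
      // With distinct names, `stage.replyToSettings` is unambiguous from
      // inside the nested logic, and the override spells out its type.
      val logic: ConnectorLogic = new ConnectorLogic {
        override val settings: String = stage.replyToSettings
      }
    }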
@@ -37,7 +37,8 @@ import scala.util.Success
  * can be overridden per message by including `expectedReplies` in the the header of the [[pekko.stream.connectors.amqp.WriteMessage]]
  */
 @InternalApi
-private[amqp] final class AmqpRpcFlowStage(settings: AmqpWriteSettings, bufferSize: Int, responsesPerMessage: Int = 1)
+private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, bufferSize: Int,
+    responsesPerMessage: Int = 1)
     extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] {
   stage =>
@@ -53,7 +54,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpWriteSettings, bufferSi
     val streamCompletion = Promise[String]()
     (new GraphStageLogic(shape) with AmqpConnectorLogic {

-      override val settings = stage.settings
+      override val settings: AmqpWriteSettings = stage.writeSettings
       private val exchange = settings.exchange.getOrElse("")
       private val routingKey = settings.routingKey.getOrElse("")
       private val queue = mutable.Queue[CommittableReadResult]()
@@ -32,7 +32,7 @@ import scala.concurrent.{ Future, Promise }
  * instead of complete [[WriteResult]] (possibly it would be less confusing for users), but [[WriteResult]] is used
  * for consistency with other variants and to make the flow ready for any possible future [[WriteResult]] extensions.
  */
-@InternalApi private[amqp] final class AmqpSimpleFlowStage[T](settings: AmqpWriteSettings)
+@InternalApi private[amqp] final class AmqpSimpleFlowStage[T](writeSettings: AmqpWriteSettings)
     extends GraphStageWithMaterializedValue[FlowShape[(WriteMessage, T), (WriteResult, T)], Future[Done]] { stage =>

   private val in: Inlet[(WriteMessage, T)] = Inlet(Logging.simpleName(this) + ".in")
@@ -45,7 +45,7 @@ import scala.concurrent.{ Future, Promise }

   override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
     val streamCompletion = Promise[Done]()
-    (new AbstractAmqpFlowStageLogic[T](settings, streamCompletion, shape) {
+    (new AbstractAmqpFlowStageLogic[T](writeSettings, streamCompletion, shape) {
       override def publish(message: WriteMessage, passThrough: T): Unit = {
         log.debug("Publishing message {}.", message)
@@ -20,8 +20,10 @@ import pekko.stream.connectors.amqp.impl
 import pekko.stream.connectors.amqp.{ AmqpSourceSettings, ReadResult }
 import pekko.stream.scaladsl.Source

+import scala.concurrent.ExecutionContext
+
 object AmqpSource {
-  private implicit val executionContext = ExecutionContexts.parasitic
+  private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

   /**
    * Scala API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
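Most of the annotations in this commit follow a single rule: Scala 3 requires an explicit result type on implicit definitions, which Scala 2 only encouraged. A standalone illustration of the rule (plain ExecutionContext.global here, not the connector's parasitic context):

    import scala.concurrent.ExecutionContext

    object ImplicitTypes {
      // Scala 2 inferred the type; Scala 3 rejects the untyped form with an
      // error to the effect that an implicit definition needs an explicit type.
      // implicit val ec = ExecutionContext.global               // error in Scala 3
      implicit val ec: ExecutionContext = ExecutionContext.global // fine in both
    }

The same fix recurs throughout the files below for implicit ActorSystem, PatienceConfig, ExecutionContext, and client instances.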
2 changes: 1 addition & 1 deletion amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
@@ -33,7 +33,7 @@ import scala.collection.immutable
  */
 class AmqpDocsSpec extends AmqpSpec {

-  override implicit val patienceConfig = PatienceConfig(10.seconds)
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)

   val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_)
@@ -22,10 +22,12 @@ import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec

+import scala.concurrent.ExecutionContext
+
 abstract class AmqpSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with LogCapturing {

-  implicit val system = ActorSystem(this.getClass.getSimpleName)
-  implicit val executionContext = ExecutionContexts.parasitic
+  implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)
+  implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

   override protected def afterAll(): Unit =
     system.terminate()
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.connectors.amqp.scaladsl

 import java.util.concurrent.ExecutorService
 import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.pekko
 import pekko.actor.ActorSystem
 import pekko.dispatch.ExecutionContexts
@@ -33,7 +32,7 @@ import com.rabbitmq.client.{ AddressResolver, Connection, ConnectionFactory, Shu
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.BeforeAndAfterEach

-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 import org.scalatest.matchers.should.Matchers
@@ -50,8 +49,8 @@ class AmqpGraphStageLogicConnectionShutdownSpec
     with BeforeAndAfterEach
     with LogCapturing {

-  override implicit val patienceConfig = PatienceConfig(10.seconds)
-  private implicit val executionContext = ExecutionContexts.parasitic
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
+  private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

   val shutdownsAdded = new AtomicInteger()
   val shutdownsRemoved = new AtomicInteger()
@@ -76,7 +75,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
     "registers and unregisters a single connection shutdown hook per graph" in {
       // actor system is within this test as it has to be shut down in order
       // to verify graph stage termination
-      implicit val system = ActorSystem(this.getClass.getSimpleName + System.currentTimeMillis())
+      implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName + System.currentTimeMillis())

       val connectionFactory = new ConnectionFactory() {
         override def newConnection(es: ExecutorService, ar: AddressResolver, name: String): Connection =
@@ -28,7 +28,7 @@ import scala.concurrent.duration._

 class EventBridgePublisherSpec extends AnyFlatSpec with Matchers with ScalaFutures with IntegrationTestContext {

-  implicit val defaultPatience =
+  implicit val defaultPatience: PatienceConfig =
     PatienceConfig(timeout = 15.seconds, interval = 100.millis)

   "EventBridge Publisher sink" should "send PutEventsEntry message" in {
@@ -14,7 +14,6 @@
 package docs.scaladsl

 import java.util.concurrent.CompletableFuture
-
 import org.apache.pekko
 import pekko.actor.ActorSystem
 import pekko.stream.connectors.awslambda.scaladsl.AwsLambdaFlow
@@ -36,7 +35,7 @@ import software.amazon.awssdk.core.SdkBytes
 import software.amazon.awssdk.services.lambda.LambdaAsyncClient
 import software.amazon.awssdk.services.lambda.model.{ InvokeRequest, InvokeResponse }

-import scala.concurrent.Await
+import scala.concurrent.{ Await, ExecutionContext }
 import scala.concurrent.duration._

 class AwsLambdaFlowSpec
@@ -49,9 +48,9 @@ class AwsLambdaFlowSpec
     with MockitoSugar
     with LogCapturing {

-  implicit val ec = system.dispatcher
+  implicit val ec: ExecutionContext = system.dispatcher

-  implicit val awsLambdaClient = mock[LambdaAsyncClient]
+  implicit val awsLambdaClient: LambdaAsyncClient = mock[LambdaAsyncClient]

   override protected def afterEach(): Unit = {
     reset(awsLambdaClient)
@@ -51,7 +51,7 @@ object AzureQueueWithTimeoutsSink {
    * of a [[com.microsoft.azure.storage.queue.CloudQueueMessage]] a [[MessageWithTimeouts]].
    */
   def create(cloudQueue: Supplier[CloudQueue]): Sink[MessageWithTimeouts, CompletionStage[Done]] =
-    AzureQueueSink.fromFunction { input: MessageWithTimeouts =>
+    AzureQueueSink.fromFunction[MessageWithTimeouts] { input =>
       AzureQueueSinkFunctions
         .addMessage(() => cloudQueue.get)(input.message, input.timeToLive, input.initialVisibility)
     }
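The Scala 2 closure `{ input: MessageWithTimeouts => ... }` ascribes a type to the parameter without parentheses, which Scala 3 no longer parses; pinning the type argument on `fromFunction` lets the parameter stay bare and the closure body stay untouched. A standalone sketch of the accepted spellings, using List.map as a stand-in for the connector API:

    object LambdaSyntax {
      val words = List("one", "two", "three")
      // words.map { w: String => w.length }          // Scala 2 only
      val a = words.map { (w: String) => w.length }   // parenthesized parameter
      val b = words.map[Int] { w => w.length }        // type fixed at the call site
    }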
@@ -83,7 +83,7 @@ final class CassandraSession(@InternalApi private[pekko] val delegate: scaladsl.
       onClose: java.lang.Runnable) =
     this(system.classicSystem, sessionProvider, executionContext, log, metricsCategory, init, onClose)

-  implicit private val ec = delegate.ec
+  implicit private val ec: ExecutionContext = delegate.ec

   /**
    * Closes the underlying Cassandra session.
@@ -15,7 +15,6 @@ package org.apache.pekko.stream.connectors.cassandra.scaladsl

 import java.util.concurrent.CompletionStage
 import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.pekko
 import pekko.Done
 import pekko.testkit.TestKitBase
@@ -27,7 +26,7 @@ import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }

 import scala.collection.immutable
 import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.{ Await, ExecutionContext, Future }
 import scala.util.control.NonFatal

 trait CassandraLifecycleBase {
@@ -71,7 +70,7 @@ trait CassandraLifecycleBase {
     executeCql(lifecycleSession, statements.asScala.toList).asJava

   def withSchemaMetadataDisabled(block: => Future[Done]): Future[Done] = {
-    implicit val ec = lifecycleSession.ec
+    implicit val ec: ExecutionContext = lifecycleSession.ec
     lifecycleSession.underlying().flatMap { cqlSession =>
       cqlSession.setSchemaMetadataEnabled(false)
       val blockResult =
@@ -29,6 +29,8 @@ import pekko.stream.connectors.couchbase.{
 import pekko.stream.scaladsl.Flow
 import com.couchbase.client.java.document.{ Document, JsonDocument }

+import scala.concurrent.ExecutionContext
+
 /**
  * Scala API: Factory methods for Couchbase flows.
  */
@@ -104,7 +106,7 @@ object CouchbaseFlow {
       val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
       Flow[T]
         .mapAsync(writeSettings.parallelism)(doc => {
-          implicit val executor = materializer.system.dispatcher
+          implicit val executor: ExecutionContext = materializer.system.dispatcher
           session
             .flatMap(_.upsertDoc(doc, writeSettings))
             .map(_ => CouchbaseWriteSuccess(doc))
@@ -157,7 +159,7 @@ object CouchbaseFlow {
       val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
       Flow[T]
         .mapAsync(writeSettings.parallelism)(doc => {
-          implicit val executor = materializer.system.dispatcher
+          implicit val executor: ExecutionContext = materializer.system.dispatcher
           session
             .flatMap(_.replaceDoc(doc, writeSettings))
             .map(_ => CouchbaseWriteSuccess(doc))
@@ -179,7 +181,7 @@ object CouchbaseFlow {
       val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
       Flow[String]
         .mapAsync(writeSettings.parallelism)(id => {
-          implicit val executor = materializer.system.dispatcher
+          implicit val executor: ExecutionContext = materializer.system.dispatcher
           session
             .flatMap(_.remove(id, writeSettings))
             .map(_ => id)
@@ -198,7 +200,7 @@ object CouchbaseFlow {
       val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
       Flow[String]
         .mapAsync(writeSettings.parallelism)(id => {
-          implicit val executor = materializer.system.dispatcher
+          implicit val executor: ExecutionContext = materializer.system.dispatcher
           session
             .flatMap(_.remove(id, writeSettings))
             .map(_ => CouchbaseDeleteSuccess(id))
@@ -25,7 +25,7 @@ import pekko.util.FutureConverters._
 import com.typesafe.config.Config

 import scala.collection.immutable
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.FiniteDuration

 /**
@@ -39,7 +39,7 @@ sealed class DiscoverySupport private {
   private def readNodes(
       serviceName: String,
       lookupTimeout: FiniteDuration)(implicit system: ClassicActorSystemProvider): Future[immutable.Seq[String]] = {
-    implicit val ec = system.classicSystem.dispatcher
+    implicit val ec: ExecutionContext = system.classicSystem.dispatcher
     val discovery = Discovery(system).discovery
     discovery.lookup(serviceName, lookupTimeout).map { resolved =>
       resolved.addresses.map(_.host)
@@ -63,7 +63,7 @@ sealed class DiscoverySupport private {
   def nodes(
       config: Config)(
       implicit system: ClassicActorSystemProvider): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] = {
-    implicit val ec = system.classicSystem.dispatcher
+    implicit val ec: ExecutionContext = system.classicSystem.dispatcher
     settings =>
       readNodes(config)
         .map { nodes =>
@@ -5,7 +5,6 @@
 package org.apache.pekko.stream.connectors.csv.scaladsl

 import java.util.concurrent.TimeUnit
-
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.ActorSystem
@@ -15,7 +14,7 @@ import pekko.util.ByteString
 import org.openjdk.jmh.annotations._
 import org.openjdk.jmh.infra.Blackhole

-import scala.concurrent.Await
+import scala.concurrent.{ Await, ExecutionContext }
 import scala.concurrent.duration.Duration

 /**
@@ -50,9 +49,9 @@ import scala.concurrent.duration.Duration
 @State(Scope.Benchmark)
 class CsvBench {

-  implicit val system = ActorSystem()
-  implicit val executionContext = system.dispatcher
-  implicit val mat = ActorMaterializer()
+  implicit val system: ActorSystem = ActorSystem()
+  implicit val executionContext: ExecutionContext = system.dispatcher
+  implicit val mat: ActorMaterializer = ActorMaterializer()

   /**
    * Size of [[ByteString]] chunks in bytes.
2 changes: 1 addition & 1 deletion csv/src/test/scala/docs/scaladsl/CsvSpec.scala
@@ -30,7 +30,7 @@ abstract class CsvSpec
     with ScalaFutures
     with LogCapturing {

-  implicit val system = ActorSystem(this.getClass.getSimpleName)
+  implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)

   override protected def afterAll(): Unit =
     TestKit.shutdownActorSystem(system)
@@ -29,7 +29,7 @@ import org.scalatest.wordspec.AnyWordSpec

 class PassThroughExamples extends AnyWordSpec with BeforeAndAfterAll with Matchers with ScalaFutures {

-  implicit val system = ActorSystem("Test")
+  implicit val system: ActorSystem = ActorSystem("Test")

   "PassThroughFlow" should {
     " original message is maintained " in {
@@ -103,7 +103,7 @@ object PassThroughFlow {
 //#PassThrough

 object PassThroughFlowKafkaCommitExample {
-  implicit val system = ActorSystem("Test")
+  implicit val system: ActorSystem = ActorSystem("Test")

   def dummy(): Unit = {
     // #passThroughKafkaFlow
3 changes: 1 addition & 2 deletions dynamodb/src/test/scala/docs/scaladsl/ExampleSpec.scala
@@ -17,15 +17,14 @@ import java.net.URI

 import org.apache.pekko
 import pekko.NotUsed
-import pekko.actor.ActorSystem
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.{ FlowWithContext, SourceWithContext }

 import scala.util.{ Failure, Success, Try }
 //#init-client
+import org.apache.pekko.actor.ActorSystem

 //#init-client
 import org.apache.pekko
 import pekko.stream.connectors.dynamodb.DynamoDbOp._
 import pekko.stream.connectors.dynamodb.scaladsl._
 import pekko.stream.scaladsl.{ Sink, Source }
@@ -364,7 +364,7 @@ trait ElasticsearchConnectorBehaviour {

       val indexName = "sink7"
       val createBooks = Source(books)
-        .map { book: (String, Book) =>
+        .map { (book: (String, Book)) =>
           WriteMessage.createUpsertMessage(id = book._1, source = book._2)
         }
         .via(
@@ -391,7 +391,7 @@

       // Update sink7/_doc with the second dataset
       val upserts = Source(updatedBooks)
-        .map { book: (String, JsObject) =>
+        .map { (book: (String, JsObject)) =>
           WriteMessage.createUpsertMessage(id = book._1, source = book._2)
         }
         .via(
@@ -453,7 +453,7 @@ trait ElasticsearchConnectorBehaviour {
       "read and write document-version if configured to do so" in {

         case class VersionTestDoc(id: String, name: String, value: Int)
-        implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc)
+        implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc.apply)

         val indexName = "version-test-scala"
         val typeName = "_doc"
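Same lambda rule as in the Azure change above, with one extra wrinkle: each stream element is a pair, so the parameter's type is itself a tuple and the Scala 3 form needs two layers of parentheses, one for the parameter list and one for the tuple type. A quick standalone check (hypothetical data):

    object TupleParam {
      val pairs = List("a" -> 1, "b" -> 2)
      // pairs.map { p: (String, Int) => p._2 }             // Scala 2 only
      val values = pairs.map { (p: (String, Int)) => p._2 } // accepted by Scala 3
    }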
@@ -45,7 +45,7 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with ScalaFutures =>

   case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10)

-  implicit val format: JsonFormat[Book] = jsonFormat3(Book)
+  implicit val format: JsonFormat[Book] = jsonFormat3(Book.apply)
   // #define-class

   def register(connectionSettings: ElasticsearchConnectionSettings,
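`jsonFormat3` expects a function. In Scala 2 a case-class companion such as `Book` extends `Function3`, so it could be passed directly; Scala 3 companions no longer extend `FunctionN`, so the commit passes the `apply` method, which eta-expands cleanly. A minimal spray-json-style sketch (two-field case class for brevity):

    import spray.json.DefaultJsonProtocol._
    import spray.json.JsonFormat

    object BookProtocol {
      case class Book(title: String, price: Int)
      // jsonFormat2(Book) compiled on Scala 2 via the companion's Function2
      // parent; under Scala 3, name the apply method instead:
      implicit val bookFormat: JsonFormat[Book] = jsonFormat2(Book.apply)
    }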