Skip to content

Commit

Permalink
support scala3 for amqp connector (#154)
Browse files Browse the repository at this point in the history
format
  • Loading branch information
pjfanning committed Jun 26, 2023
1 parent 2a07730 commit 822ac3d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,14 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
def withAutomaticRelease(automaticRelease: Boolean): AmqpCachedConnectionProvider =
copy(automaticRelease = automaticRelease)

override def get: Connection = getRecursive(provider)

@tailrec
override def get: Connection = state.get match {
private def getRecursive(amqpConnectionProvider: AmqpConnectionProvider): Connection = state.get match {
case Empty =>
if (state.compareAndSet(Empty, Connecting)) {
try {
val connection = provider.get
val connection = amqpConnectionProvider.get
if (!state.compareAndSet(Connecting, Connected(connection, 1)))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while creating the connection.")
Expand All @@ -391,34 +393,38 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
state.compareAndSet(Connecting, Empty)
throw e
}
} else get
case Connecting => get
} else getRecursive(amqpConnectionProvider)
case Connecting => getRecursive(amqpConnectionProvider)
case c @ Connected(connection, clients) =>
if (state.compareAndSet(c, Connected(connection, clients + 1))) connection
else get
case Closing => get
else getRecursive(amqpConnectionProvider)
case Closing => getRecursive(amqpConnectionProvider)
}

override def release(connection: Connection): Unit = releaseRecursive(provider, connection)

@tailrec
override def release(connection: Connection): Unit = state.get match {
case Empty => throw new IllegalStateException("There is no connection to release.")
case Connecting => release(connection)
case c @ Connected(cachedConnection, clients) =>
if (cachedConnection != connection)
throw new IllegalArgumentException("Can't release a connection that's not owned by this provider")

if (clients == 1 || !automaticRelease) {
if (state.compareAndSet(c, Closing)) {
provider.release(connection)
if (!state.compareAndSet(Closing, Empty))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection.")
private def releaseRecursive(amqpConnectionProvider: AmqpConnectionProvider, connection: Connection): Unit =
state.get match {
case Empty => throw new IllegalStateException("There is no connection to release.")
case Connecting => releaseRecursive(amqpConnectionProvider, connection)
case c @ Connected(cachedConnection, clients) =>
if (cachedConnection != connection)
throw new IllegalArgumentException("Can't release a connection that's not owned by this provider")

if (clients == 1 || !automaticRelease) {
if (state.compareAndSet(c, Closing)) {
amqpConnectionProvider.release(connection)
if (!state.compareAndSet(Closing, Empty))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection.")
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) release(connection)
}
case Closing => release(connection)
}
case Closing => releaseRecursive(amqpConnectionProvider, connection)
}

private def copy(automaticRelease: Boolean): AmqpCachedConnectionProvider =
new AmqpCachedConnectionProvider(provider, automaticRelease)
Expand Down
1 change: 0 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ object Dependencies {
"com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion)

val Amqp = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % "5.14.2" // APLv2
) ++ Mockito)
Expand Down

0 comments on commit 822ac3d

Please sign in to comment.