Skip to content

Commit

Permalink
Release proper InetSocketAddress
Browse files Browse the repository at this point in the history
  • Loading branch information
divinae committed Nov 15, 2024
1 parent 6dd3a97 commit e78ee9b
Showing 1 changed file with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ internal class Endpoint(
private val deliveryPoint: Channel<RequestTask> = Channel()
private val maxEndpointIdleTime: Long = 2 * config.endpoint.connectTimeout

private var connectionAddress: InetSocketAddress? = null

private val timeout = launch(coroutineContext + CoroutineName("Endpoint timeout($host:$port)")) {
try {
while (true) {
Expand Down Expand Up @@ -100,7 +98,7 @@ internal class Endpoint(
callContext: CoroutineContext
): HttpResponseData {
try {
val connection = connect(request)
val (address, connection) = connect(request)
val input = connection.input
val originOutput = connection.output

Expand All @@ -118,7 +116,7 @@ internal class Endpoint(
} catch (cause: Throwable) {
LOGGER.debug("An error occurred while closing connection", cause)
} finally {
releaseConnection()
releaseConnection(address)
}
}

Expand Down Expand Up @@ -182,7 +180,7 @@ internal class Endpoint(
}

private suspend fun createPipeline(request: HttpRequestData) {
val connection = connect(request)
val (address, connection) = connect(request)

val pipeline = ConnectionPipeline(
config.endpoint.keepAliveTime,
Expand All @@ -193,11 +191,11 @@ internal class Endpoint(
coroutineContext
)

pipeline.pipelineContext.invokeOnCompletion { releaseConnection() }
pipeline.pipelineContext.invokeOnCompletion { releaseConnection(address) }
}

@Suppress("UNUSED_EXPRESSION")
private suspend fun connect(requestData: HttpRequestData): Connection {
private suspend fun connect(requestData: HttpRequestData): Pair<InetSocketAddress, Connection> {
val connectAttempts = config.endpoint.connectAttempts
val (connectTimeout, socketTimeout) = retrieveTimeouts(requestData)
var timeoutFails = 0
Expand All @@ -211,7 +209,7 @@ internal class Endpoint(
val connect: suspend CoroutineScope.() -> Socket = {
connectionFactory.connect(address) {
this.socketTimeout = socketTimeout
}.also { connectionAddress = address }
}
}

val socket = when (connectTimeout) {
Expand All @@ -227,7 +225,7 @@ internal class Endpoint(
}

val connection = socket.connection()
if (!secure) return@connect connection
if (!secure) return@connect address to connection

try {
if (proxy?.type == ProxyType.HTTP) {
Expand All @@ -241,7 +239,7 @@ internal class Endpoint(
takeFrom(config.https)
serverName = serverName ?: realAddress.hostname
}
return tlsSocket.connection()
return address to tlsSocket.connection()
} catch (cause: Throwable) {
try {
socket.close()
Expand Down Expand Up @@ -288,8 +286,7 @@ internal class Endpoint(
return connectTimeout to socketTimeout
}

private fun releaseConnection() {
val address = connectionAddress ?: return
private fun releaseConnection(address: InetSocketAddress) {
connectionFactory.release(address)
connections.decrementAndGet()
}
Expand Down

0 comments on commit e78ee9b

Please sign in to comment.