Upgrade a JS socket to TLS directly if possible #3341
base: main

Changes from all commits: f3091ce, 40d2113, 7d7d543, 02e788a, 7874843, 297fc7d, 12e95aa, b49edb5
@@ -38,16 +38,29 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>

   private[tls] def forAsync[F[_]](
       socket: Socket[F],
       clientMode: Boolean,
       upgrade: fs2.io.Duplex => facade.tls.TLSSocket
   )(implicit F: Async[F]): Resource[F, TLSSocket[F]] =
     for {
-      duplexOut <- mkDuplex(socket.reads)
-      (duplex, out) = duplexOut
-      _ <- out.through(socket.writes).compile.drain.background
-      tlsSockReadable <- suspendReadableAndRead(
-        destroyIfNotEnded = false,
-        destroyIfCanceled = false
-      )(upgrade(duplex))
+      tlsSockReadable <- socket match {
+        case Socket.AsyncSocket(sock, _) if clientMode =>
+          for {
+            tlsSockReadable <- suspendReadableAndRead(
+              destroyIfNotEnded = false,
+              destroyIfCanceled = false
+            )(upgrade(sock))
+          } yield tlsSockReadable
+        case _ =>
+          for {
+            duplexOut <- mkDuplex(socket.reads)
+            (duplex, out) = duplexOut
+            _ <- out.through(socket.writes).compile.drain.background
+            tlsSockReadable <- suspendReadableAndRead(
+              destroyIfNotEnded = false,
+              destroyIfCanceled = false
+            )(upgrade(duplex))
+          } yield tlsSockReadable
+      }
       (tlsSock, readable) = tlsSockReadable
       readStream <- SuspendedStream(readable)
     } yield new AsyncTLSSocket(

Review thread on `forAsync`:

Reviewer: Should we rename this …

Reviewer: Or couldn't we just pass …

Reviewer: Oh, I see, it was bincompat. But this is all private anyway. We can just add the exclusion.

Author: Yeah, I didn't want to just add the exclusion unilaterally. Since it was fairly easy to avoid, I did, but I've gone back to what it was before and ignored the error.

Reviewer: Yeah, this is good practice, so thanks for doing that. But in this case I think it's cleaner like this.

Author's note on the `Socket.AsyncSocket` client branch:

In theory the same logic should work for server sockets as well, but it results in a hang. I think this is connected to the pause or readable state of the socket vs. the duplex, but I can't figure out how to unpause the socket at the right moment to allow things to flow.
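For context on how the diff above is reached from the call site, here is a hedged usage sketch against fs2's public API; the object name, host, and port are purely illustrative. On the Node.js platform this client path presumably reaches `forAsync` with `clientMode = true`, so after this change the underlying socket is upgraded to TLS directly instead of first being bridged through `mkDuplex`.

```scala
import cats.effect.{IO, Resource}
import com.comcast.ip4s._
import fs2.io.net.Network
import fs2.io.net.tls.{TLSContext, TLSSocket}

object TlsClientSketch {
  // Hedged usage sketch; host and port are illustrative. On Node.js this client
  // path presumably reaches forAsync with clientMode = true, so the underlying
  // socket is handed to the TLS upgrade directly rather than wrapped in a Duplex.
  def tlsClient(tls: TLSContext[IO]): Resource[IO, TLSSocket[IO]] =
    for {
      socket <- Network[IO].client(SocketAddress(host"example.com", port"443"))
      tlsSocket <- tls.client(socket)
    } yield tlsSocket
}
```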
Review thread on the `SuspendedStream` line:

Reviewer: I've been thinking about this. Do we know what happens to this `SuspendedStream` after the `sock` is directly upgraded?

What I mean is that this `readStream` has already established listeners on the `sock`. So if events are firing on those listeners as `sock` is being used, but nobody is consuming from the `SuspendedStream`, then I am concerned that this is actually a memory leak.
Author: Good question.

As far as I can tell the listeners are all on control events, rather than data, i.e. they trigger changes of behaviour rather than push any data anywhere.

The read loop boils down to stream pull -> `readable.read()`, so if the stream isn't being consumed then `read()` is never called and there is no data to leak.

This also means that we're relying on the JavaScript runtime to propagate any error or close events, which I think is a reasonable assumption, but we could add additional listeners for them if we feel it's needed.
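A minimal sketch of that pull-driven shape (hypothetical helper, not fs2's internals): `readChunk` stands in for a suspended `readable.read()` that yields `None` at end of stream, and it only runs when a downstream consumer pulls, so an unconsumed stream never reads anything.

```scala
import fs2.{Chunk, Stream}

object PullDrivenReadSketch {
  // Hypothetical helper, not fs2's internals: readChunk stands in for a suspended
  // readable.read() that yields None at end of stream. Because the stream is
  // pull-based, readChunk only runs when a downstream consumer asks for bytes;
  // an unconsumed stream never evaluates it at all.
  def pullDrivenReads[F[_]](readChunk: F[Option[Chunk[Byte]]]): Stream[F, Byte] =
    Stream.repeatEval(readChunk).unNoneTerminate.unchunks
}
```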
Reviewer: Right, sure. But those triggers still invoke callbacks, which typically put things in `Queue`s. If they are not consumed, they accumulate and leak.

fs2/io/js/src/main/scala/fs2/io/ioplatform.scala, line 83 in 00fb259
Author: I don't think we're going to leak any data anywhere, but I guess we will end up with a channel full of `()`.

The only `Queue` that I can see in the process is `synchronous`, so it won't accumulate anything.

It seems like it ought to be possible for that channel not to be a channel, since it's only acting as an indicator that something can be read or not (that is, we need to communicate that something can be read, but duplicate events prior to a read are meaningless).
Reviewer: Yes, sorry, this is what I meant by "memory leak". Not specifically a data leak, as in bytes of data.

This is a great point. I wonder if we can replace it with a `Signal`.
Reviewer: Ah, this is a misconception. Unless whoever is pushing to the `synchronous` queue respects backpressure and stops trying to push, the attempts to push will queue up unboundedly.
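To make the backpressure point concrete, a hedged illustration (not fs2's actual listener code): a synchronous `Channel` has no buffer, so each `send` only completes once a consumer takes the element, and a fire-and-forget send per `readable` event parks one fiber per event when nothing is consuming.

```scala
import cats.effect.IO
import fs2.concurrent.Channel

object SynchronousChannelSketch {
  // Hedged illustration, not fs2's actual listener code: a synchronous channel
  // has no buffer, so each send only completes once a consumer takes the element.
  val mkEvents: IO[Channel[IO, Unit]] = Channel.synchronous[IO, Unit]

  // A fire-and-forget send per "readable" event parks one fiber per event; if
  // nothing is consuming channel.stream, those parked fibers accumulate.
  def onReadableEvent(events: Channel[IO, Unit]): IO[Unit] =
    events.send(()).start.void
}
```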
Author: Yes, sorry, I actually thought that we wouldn't leak anything (including `()`). I had to reread the Node docs to correct myself about how the `readable` event actually works (I thought it was only fired once, rather than repeatedly).

I'm happy to switch over to a `Signal` here, but I wonder if it's worth it with #3348 now, which should solve the issue as well. Don't know if you've got any preference for handling related PRs like these?

I admit I'd assumed that the internals of fs2 would get that right 😉
Reviewer: It is very confusing. While working on #3348 I discovered that the `readable` event may be fired multiple times in a row (honestly it seemed a bit buggy) without `read()`s in between, which is why we needed a counter for readable events and not simply a boolean toggle ...

I think if you merge #3348 into your PR, then my concerns about memory leaks would be addressed. But I need to think about this more. The "dangling" listeners are difficult to reason about.
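A hedged sketch of the counter idea being described (illustrative names, not fs2's internals, and not necessarily what #3348 does): keeping the count in a `SignallingRef` conflates repeated `readable` events into the latest value instead of queueing one unit per event, while comparing against the last observed count keeps an event that fires mid-read from being lost.

```scala
import cats.effect.IO
import fs2.concurrent.SignallingRef

object ReadableCounterSketch {
  val mkCounter: IO[SignallingRef[IO, Long]] = SignallingRef[IO, Long](0L)

  // Illustrative names, not fs2's internals: listeners bump the counter on every
  // Node.js "readable" event; SignallingRef conflates rapid updates into the
  // latest value, so repeated events do not accumulate anywhere.
  def onReadable(counter: SignallingRef[IO, Long]): IO[Unit] =
    counter.update(_ + 1)

  // The read loop remembers the last count it acted on and waits for the count
  // to move before calling read() again, so an event firing mid-read is not lost.
  def awaitReadable(counter: SignallingRef[IO, Long], lastSeen: Long): IO[Long] =
    counter.discrete.dropWhile(_ <= lastSeen).head.compile.lastOrError
}
```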
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would agree that sounds like a bug. I thought that there wouldn't be a new
readable
event until you've done aread()
. I guess maybe it can half fill the internal buffer emitreadable
, then fill it and emit anotherreadable
? 🤷At the worst the second
read()
will just always returnnull
and then be flattened away immediately.Will do that merge 👍 We're happy running off snapshot builds for a while so we've got time to get it right