Skip to content

Commit

Permalink
implement semaphore for SendBiMessage
Browse files Browse the repository at this point in the history
During async applications, when multiple SendBiMessage requests are
made while one is not completely resolved, KRPC hangs or errors.
This change makes sure that the exchange is complete before accepting new one.
  • Loading branch information
Rhahi committed Oct 17, 2022
1 parent 1a07e16 commit 064c7f9
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/KRPC.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ mutable struct KRPCConnection
one_to_many::Dict{UInt64, Array{UUID, 1}}
listeners::Dict{UUID, Listener}
active::Channel
semaphore::Base.Semaphore

function KRPCConnection(conn::TCPSocket, stream_conn::TCPSocket, identifier::Array{UInt8, 1}, active::Channel)
new(
conn, stream_conn, identifier,
Nothing(), Dict{UInt8, Array{UUID, 1}}(), Dict{UUID, Listener}(), active, Base.Semaphore(1)
)
end
end

function Base.show(io::IO, conn::KRPCConnection)
Expand Down
2 changes: 1 addition & 1 deletion src/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function kerbal_connect(client_name::String, host::String="localhost", port::Int
connect_or_error(str_conn, krpc.schema.ConnectionRequest(client_identifier=resp.client_identifier,_type=krpc.schema.ConnectionRequest_Type.STREAM))

active = Channel(0)
conn = KRPCConnection(conn, str_conn, resp.client_identifier, Nothing(), Dict{UInt8, Array{UUID, 1}}(), Dict{UUID, Listener}(), active)
conn = KRPCConnection(conn, str_conn, resp.client_identifier, active)
conn.str_listener = @async stream_listener(conn)
bind(active, conn.str_listener)

Expand Down
2 changes: 2 additions & 0 deletions src/raw.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ function RecvRawProto(inp::IO)
end

function SendBiMessage(conn::KRPCConnection, req::KRPC.krpc.schema.Request)
Base.acquire(conn.semaphore)
iob = PipeBuffer()
SendRawProto(conn.conn, req)
res = readproto(RecvRawProto(conn.conn), krpc.schema.Response())

if hasproperty(res, :error)
throw(make_error(res.error))
end
Base.release(conn.semaphore)
return res
end

Expand Down

0 comments on commit 064c7f9

Please sign in to comment.