Skip to content

Commit

Permalink
Merge pull request #10 from Rhahi/async_stream
Browse files Browse the repository at this point in the history
Enable asynchronous usage of KRPC
  • Loading branch information
BenChung authored Oct 17, 2022
2 parents dbf32b6 + 064c7f9 commit 5f98358
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ LightXML = "9c8b4983-aa76-5018-a973-4c85ecc9e179"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
ProtoBuf = "3349acd9-ac6a-5e09-bcdb-63829b23a429"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
julia = "1.6"
LightXML = "^0.9"
MacroTools = "^0.5"
ProtoBuf = "^0.11"
julia = "1.6"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Expand Down
13 changes: 12 additions & 1 deletion src/KRPC.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module KRPC
using Sockets
using ProtoBuf
using LightXML
using UUIDs
import MacroTools

include("proto/krpc.jl")
Expand All @@ -16,6 +17,7 @@ struct Listener{T}
connection::T
current_value
channel::Channel
uuid::UUID
end

"""
Expand All @@ -27,8 +29,17 @@ mutable struct KRPCConnection
identifier::Array{UInt8, 1}

str_listener::Union{Nothing, Task}
listeners::Dict{UInt64, Listener}
one_to_many::Dict{UInt64, Array{UUID, 1}}
listeners::Dict{UUID, Listener}
active::Channel
semaphore::Base.Semaphore

function KRPCConnection(conn::TCPSocket, stream_conn::TCPSocket, identifier::Array{UInt8, 1}, active::Channel)
new(
conn, stream_conn, identifier,
Nothing(), Dict{UInt8, Array{UUID, 1}}(), Dict{UUID, Listener}(), active, Base.Semaphore(1)
)
end
end

function Base.show(io::IO, conn::KRPCConnection)
Expand Down
2 changes: 1 addition & 1 deletion src/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function kerbal_connect(client_name::String, host::String="localhost", port::Int
connect_or_error(str_conn, krpc.schema.ConnectionRequest(client_identifier=resp.client_identifier,_type=krpc.schema.ConnectionRequest_Type.STREAM))

active = Channel(0)
conn = KRPCConnection(conn, str_conn, resp.client_identifier, Nothing(), Dict{UInt8, Listener}(), active)
conn = KRPCConnection(conn, str_conn, resp.client_identifier, active)
conn.str_listener = @async stream_listener(conn)
bind(active, conn.str_listener)

Expand Down
2 changes: 2 additions & 0 deletions src/raw.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ function RecvRawProto(inp::IO)
end

function SendBiMessage(conn::KRPCConnection, req::KRPC.krpc.schema.Request)
Base.acquire(conn.semaphore)
iob = PipeBuffer()
SendRawProto(conn.conn, req)
res = readproto(RecvRawProto(conn.conn), krpc.schema.Response())

if hasproperty(res, :error)
throw(make_error(res.error))
end
Base.release(conn.semaphore)
return res
end

Expand Down
34 changes: 25 additions & 9 deletions src/streams.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,16 @@ function add_stream(conn::KRPCConnection, calls::T) where {K, T<:Tuple{Vararg{RT
default_value = Any[Nothing() for i=1:K]
out = Channel{Vector}(0)
bind(out, conn.str_listener)
listener = Listener(request_map, conn, default_value, out)
setindex!.((conn.listeners, ), (listener, ), getproperty.(handles, :id))
uuid = uuid4()
listener = Listener(request_map, conn, default_value, out, uuid)
conn.listeners[uuid] = listener
for id in getproperty.(handles, :id)
if id in keys(conn.one_to_many)
push!(conn.one_to_many[id], uuid)
else
conn.one_to_many[id] = [uuid]
end
end
return listener
end

Expand Down Expand Up @@ -71,9 +79,15 @@ function Base.close(channel::Listener)
if !(id in keys(channel.streams))
error("Attempted to remove unbound stream. Check if the stream has already been removed.")
end
delete!(channel.connection.listeners, id)
push!(req, RemoveStream_Phantom(id))
lmap = channel.connection.one_to_many[id]
index = findfirst(x -> x==channel.uuid, lmap)
deleteat!(lmap, index)
if length(lmap) == 0
delete!(channel.connection.one_to_many, id)
push!(req, RemoveStream_Phantom(id))
end
end
delete!(channel.connection.listeners, channel.uuid)
kerbal(channel.connection, (req..., ))
end

Expand Down Expand Up @@ -117,13 +131,15 @@ function stream_listener(conn::KRPCConnection)
while (!isready(conn.active) || take!(conn.active))
updated = Set{Listener}()
data = readproto(RecvRawProto(conn.stream_conn), krpc.schema.StreamUpdate())
for result in data.results
if !(result.id in keys(conn.listeners))
for result in data.results
if !(result.id in keys(conn.one_to_many))
continue
end
listener = conn.listeners[result.id]
update_value(listener, result.id, result.result.value)
push!(updated, listener)
for uuid in conn.one_to_many[result.id]
listener = conn.listeners[uuid]
update_value(listener, result.id, result.result.value)
push!(updated, listener)
end
end
send_current_values.(updated)
end
Expand Down

0 comments on commit 5f98358

Please sign in to comment.