Skip to content

Commit

Permalink
fix(rpc): cleanup, docs, default perform rpc timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
anunaym14 committed Feb 21, 2025
1 parent cef14ed commit 7fbc9e8
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 40 deletions.
6 changes: 3 additions & 3 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,14 +585,14 @@ func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) {
}
case *livekit.DataPacket_RpcResponse:
if e.OnRpcResponse != nil {
if _, ok := msg.RpcResponse.Value.(*livekit.RpcResponse_Payload); ok {
switch msg.RpcResponse.Value.(type) {
case *livekit.RpcResponse_Payload:
e.OnRpcResponse(msg.RpcResponse.RequestId, &msg.RpcResponse.Value.(*livekit.RpcResponse_Payload).Payload, nil)
} else if _, ok := msg.RpcResponse.Value.(*livekit.RpcResponse_Error); ok {
case *livekit.RpcResponse_Error:
e.OnRpcResponse(msg.RpcResponse.RequestId, nil, fromProto(msg.RpcResponse.Value.(*livekit.RpcResponse_Error).Error))
}
}
}

}

func (e *RTCEngine) readDataPacket(msg webrtc.DataChannelMessage) (*livekit.DataPacket, error) {
Expand Down
5 changes: 0 additions & 5 deletions examples/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func performGreeting(room *lksdk.Room) {
DestinationIdentity: "greeter",
Method: "arrival",
Payload: "Hello",
ResponseTimeout: 10000 * time.Millisecond,
})

if err != nil {
Expand All @@ -51,7 +50,6 @@ func performDisconnection(room *lksdk.Room) {
DestinationIdentity: "greeter",
Method: "arrival",
Payload: "You still there?",
ResponseTimeout: 10000 * time.Millisecond,
})

if err != nil {
Expand Down Expand Up @@ -80,7 +78,6 @@ func performSquareRoot(room *lksdk.Room) {
DestinationIdentity: "math-genius",
Method: "square-root",
Payload: string(payload),
ResponseTimeout: 10000 * time.Millisecond,
})

if err != nil {
Expand Down Expand Up @@ -111,7 +108,6 @@ func performQuantumHypergeometricSeries(room *lksdk.Room) {
DestinationIdentity: "math-genius",
Method: "quantum-hypergeometric-series",
Payload: string(payload),
ResponseTimeout: 10000 * time.Millisecond,
})

if err != nil {
Expand Down Expand Up @@ -155,7 +151,6 @@ func performDivide(room *lksdk.Room) {
DestinationIdentity: "math-genius",
Method: "divide",
Payload: string(payload),
ResponseTimeout: 10000 * time.Millisecond,
})

if err != nil {
Expand Down
31 changes: 23 additions & 8 deletions localparticipant.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (p *LocalParticipant) handleParticipantDisconnected(identity string) {

p.rpcPendingResponses.Range(func(key, value interface{}) bool {
if value.(rpcPendingResponseHandler).participantIdentity == identity {
value.(rpcPendingResponseHandler).resolve(nil, RpcErrorFromBuiltInCodes(RpcRecipientDisconnected, nil))
value.(rpcPendingResponseHandler).resolve(nil, rpcErrorFromBuiltInCodes(RpcRecipientDisconnected, nil))
p.rpcPendingResponses.Delete(key)
}
return true
Expand All @@ -583,37 +583,52 @@ func (p *LocalParticipant) handleIncomingRpcResponse(requestId string, payload *
}
}

// TODO: fix default timeout
// Initiate an RPC call to a remote participant
// - @param params - For parameters for initiating the RPC call, see PerformRpcParams
// - @returns A string payload or an error
func (p *LocalParticipant) PerformRpc(params PerformRpcParams) (*string, error) {
responseTimeout := 10000 * time.Millisecond
if params.ResponseTimeout != nil {
responseTimeout = *params.ResponseTimeout
}

resultChan := make(chan *string, 1)
errorChan := make(chan error, 1)

maxRoundTripLatency := 2000 * time.Millisecond

go func() {
if byteLength(params.Payload) > MaxPayloadBytes {
errorChan <- RpcErrorFromBuiltInCodes(RpcRequestPayloadTooLarge, nil)
errorChan <- rpcErrorFromBuiltInCodes(RpcRequestPayloadTooLarge, nil)
return
}

if p.serverInfo != nil && compareVersions(p.serverInfo.Version, "1.8.0") < 0 {
errorChan <- RpcErrorFromBuiltInCodes(RpcUnsupportedServer, nil)
errorChan <- rpcErrorFromBuiltInCodes(RpcUnsupportedServer, nil)
return
}

id := uuid.New().String()
p.engine.publishRpcRequest(params.DestinationIdentity, id, params.Method, params.Payload, params.ResponseTimeout-maxRoundTripLatency)
p.engine.publishRpcRequest(params.DestinationIdentity, id, params.Method, params.Payload, responseTimeout-maxRoundTripLatency)

responseTimer := time.AfterFunc(params.ResponseTimeout, func() {
responseTimer := time.AfterFunc(responseTimeout, func() {
p.rpcPendingResponses.Delete(id)
errorChan <- RpcErrorFromBuiltInCodes(RpcResponseTimeout, nil)

select {
case errorChan <- rpcErrorFromBuiltInCodes(RpcResponseTimeout, nil):
default:
}
})

ackTimer := time.AfterFunc(maxRoundTripLatency, func() {
p.rpcPendingAcks.Delete(id)
errorChan <- RpcErrorFromBuiltInCodes(RpcConnectionTimeout, nil)
p.rpcPendingResponses.Delete(id)
responseTimer.Stop()

select {
case errorChan <- rpcErrorFromBuiltInCodes(RpcConnectionTimeout, nil):
default:
}
})

p.rpcPendingAcks.Store(id, rpcPendingAckHandler{
Expand Down
33 changes: 29 additions & 4 deletions room.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,29 @@ func (r *Room) getLocalParticipantSID() string {
return r.LocalParticipant.SID()
}

// Establishes the participant as a receiver for calls of the specified RPC method.
// Will overwrite any existing callback for the same method.
//
// - @param method - The name of the indicated RPC method
// - @param handler - Will be invoked when an RPC request for this method is received
// - @returns A promise that resolves when the method is successfully registered
//
// Example:
//
// room.LocalParticipant?.registerRpcMethod(
// "greet",
// func (data: RpcInvocationData) => {
// fmt.Println("Received greeting from ", data.callerIdentity, "with payload ", data.payload)
// return "Hello, " + data.callerIdentity + "!";
// }
// );
//
// The handler should return either a string or an error.
// If unable to respond within `responseTimeout`, the request will result in an error on the caller's side.
//
// You may throw errors of type `RpcError` with a string `message` in the handler,
// and they will be received on the caller's side with the message intact.
// Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
func (r *Room) RegisterRpcMethod(method string, handler RpcHandlerFunc) error {
_, ok := r.rpcHandlers.Load(method)
if ok {
Expand All @@ -943,6 +966,8 @@ func (r *Room) RegisterRpcMethod(method string, handler RpcHandlerFunc) error {
return nil
}

// Unregisters a previously registered RPC method.
// - @param method - The name of the RPC method to unregister
func (r *Room) UnregisterRpcMethod(method string) {
r.rpcHandlers.Delete(method)
}
Expand All @@ -951,13 +976,13 @@ func (r *Room) handleIncomingRpcRequest(callerIdentity, requestId, method, paylo
r.engine.publishRpcAck(callerIdentity, requestId)

if version != 1 {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcUnsupportedVersion, nil))
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcUnsupportedVersion, nil))
return
}

handler, ok := r.rpcHandlers.Load(method)
if !ok {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcUnsupportedMethod, nil))
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcUnsupportedMethod, nil))
return
}

Expand All @@ -973,13 +998,13 @@ func (r *Room) handleIncomingRpcRequest(callerIdentity, requestId, method, paylo
r.engine.publishRpcResponse(callerIdentity, requestId, nil, err.(*RpcError))
} else {
r.engine.log.Warnw("Uncaught error returned by RPC handler for method, using application error instead", err, "method", method)
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcApplicationError, nil))
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcApplicationError, nil))
}
return
}

if byteLength(response) > MaxDataBytes {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcResponsePayloadTooLarge, nil))
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcResponsePayloadTooLarge, nil))
return
}

Expand Down
48 changes: 28 additions & 20 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
const (
MaxMessageBytes = 256
MaxDataBytes = 15360 // 15KiB

// Maximum payload size for RPC requests and responses. If a payload exceeds this size,
// the RPC call will fail with a RpcRequestPayloadTooLarge(1402) or RpcResponsePayloadTooLarge(1504) error.
MaxPayloadBytes = 15360 // 15KiB
)

Expand All @@ -47,11 +50,16 @@ var rpcErrorMessages = map[RpcErrorCode]string{
RpcUnsupportedVersion: "Unsupported RPC version",
}

// Parameters for initiating an RPC call
type PerformRpcParams struct {
// The identity of the destination participant
DestinationIdentity string
Method string
Payload string
ResponseTimeout time.Duration
// The name of the method to call
Method string
// The method payload
Payload string
// Timeout for receiving a response after initial connection. Default: 10000ms
ResponseTimeout *time.Duration
}

// Data passed to method handler for incoming RPC invocations
Expand All @@ -60,33 +68,32 @@ type RpcInvocationData struct {
RequestID string
// The unique participant identity of the caller.
CallerIdentity string
// The payload of the request. User-definable format, typically JSON.
// The payload of the request. User-definable format, could be JSON for example.
Payload string
// The maximum time the caller will wait for a response.
ResponseTimeout time.Duration
}

/**
* Specialized error handling for RPC methods.
*
* Instances of this type, when thrown in a method handler, will have their `message`
* serialized and sent across the wire. The sender will receive an equivalent error on the other side.
*
* Built-in types are included but developers may use any string, with a max length of 256 bytes.
*/
// Specialized error handling for RPC methods.
//
// Instances of this type, when thrown in a method handler, will have their `message`
// serialized and sent across the wire. The sender will receive an equivalent error on the other side.
//
// Built-in types are included but developers may use any string, with a max length of 256 bytes.
type RpcError struct {
Code RpcErrorCode
Message string
Data *string
}

/**
* Creates an error object with the given code and message, plus an optional data payload.
*
* If thrown in an RPC method handler, the error will be sent back to the caller.
*
* Error codes 1001-1999 are reserved for built-in errors.
*/
// Creates an error object with the given code and message, plus an optional data payload.
//
// If thrown in an RPC method handler, the error will be sent back to the caller.
//
// Error codes 1001-1999 are reserved for built-in errors.
//
// Maximum message length is 256 bytes, and maximum data payload length is 15KiB.
// If a payload exceeds these limits, it will be truncated.
func NewRpcError(code RpcErrorCode, message string, data *string) *RpcError {
err := &RpcError{Code: code, Message: truncateBytes(message, MaxMessageBytes)}

Expand Down Expand Up @@ -123,7 +130,8 @@ func (e *RpcError) Error() string {
return fmt.Sprintf("RpcError %d: %s", e.Code, e.Message)
}

func RpcErrorFromBuiltInCodes(code RpcErrorCode, data *string) *RpcError {
// Creates an error object with a built-in (or reserved) code and optional data payload.
func rpcErrorFromBuiltInCodes(code RpcErrorCode, data *string) *RpcError {
return NewRpcError(code, rpcErrorMessages[code], data)
}

Expand Down

0 comments on commit 7fbc9e8

Please sign in to comment.