Skip to content

Commit

Permalink
squash connect event
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Feb 5, 2025
1 parent 9ef7b8a commit 324620e
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 204 deletions.
277 changes: 73 additions & 204 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ extern crate bitfield;
mod error;
pub mod ffi;
pub use error::{Status, StatusCode};
mod types;
pub use types::ConnectionEvent;

//
// The following starts the C interop layer of MsQuic API.
Expand Down Expand Up @@ -802,173 +804,6 @@ pub struct ListenerEvent {
pub payload: ListenerEventPayload,
}

pub type ConnectionEventType = u32;
pub const CONNECTION_EVENT_CONNECTED: ConnectionEventType = 0;
pub const CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: ConnectionEventType = 1;
pub const CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: ConnectionEventType = 2;
pub const CONNECTION_EVENT_SHUTDOWN_COMPLETE: ConnectionEventType = 3;
pub const CONNECTION_EVENT_LOCAL_ADDRESS_CHANGED: ConnectionEventType = 4;
pub const CONNECTION_EVENT_PEER_ADDRESS_CHANGED: ConnectionEventType = 5;
pub const CONNECTION_EVENT_PEER_STREAM_STARTED: ConnectionEventType = 6;
pub const CONNECTION_EVENT_STREAMS_AVAILABLE: ConnectionEventType = 7;
pub const CONNECTION_EVENT_PEER_NEEDS_STREAMS: ConnectionEventType = 8;
pub const CONNECTION_EVENT_IDEAL_PROCESSOR_CHANGED: ConnectionEventType = 9;
pub const CONNECTION_EVENT_DATAGRAM_STATE_CHANGED: ConnectionEventType = 10;
pub const CONNECTION_EVENT_DATAGRAM_RECEIVED: ConnectionEventType = 11;
pub const CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED: ConnectionEventType = 12;
pub const CONNECTION_EVENT_RESUMED: ConnectionEventType = 13;
pub const CONNECTION_EVENT_RESUMPTION_TICKET_RECEIVED: ConnectionEventType = 14;
pub const CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED: ConnectionEventType = 15;

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventConnected {
pub session_resumed: BOOLEAN,
pub negotiated_alpn_length: u8,
pub negotiated_alpn: *const u8,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventConnectionShutdownByTransport {
pub status: u32,
pub error_code: u62,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventConnectionShutdownByPeer {
pub error_code: u62,
}

bitfield! {
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct ConnectionEventShutdownCompleteBitfields(BOOLEAN);
// The fields default to BOOLEAN
pub handshake_completed, _: 0, 0;
pub peer_acknowledged_shutdown, _: 1, 1;
pub app_close_in_progress, _: 2, 2;
_reserved, _: 7, 3;
}

#[repr(C, packed)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventShutdownComplete {
pub bit_flags: ConnectionEventShutdownCompleteBitfields,
}

#[repr(C, packed)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventLocalAddressChanged {
pub address: *const Addr,
}

#[repr(C, packed)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventPeerAddressChanged {
pub address: *const Addr,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventPeerStreamStarted {
pub stream: HQUIC,
pub flags: StreamOpenFlags,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventStreamsAvailable {
pub bidirectional_count: u16,
pub unidirectional_count: u16,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventPeerNeedsStreams {
pub bidirectional: BOOLEAN,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventIdealProcessorChanged {
pub ideal_processor: u16,
pub partition_index: u16,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventDatagramStateChanged {
pub send_enabled: BOOLEAN,
pub max_send_length: u16,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventDatagramReceived {
pub buffer: *const Buffer,
pub flags: ReceiveFlags,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventDatagramSendStateChanged {
pub client_context: *const c_void,
pub state: DatagramSendState,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventResumed {
pub resumption_state_length: u16,
pub resumption_state: *const u8,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventResumptionTicketReceived {
pub resumption_ticket_length: u32,
pub resumption_ticket: *const u8,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct ConnectionEventPeerCertificateReceived {
pub certificate: *const Certificate,
pub deferred_error_flags: u32,
pub deferred_status: u32,
pub chain: *const CertificateChain,
}

#[repr(C)]
#[derive(Copy, Clone)]
pub union ConnectionEventPayload {
pub connected: ConnectionEventConnected,
pub shutdown_initiated_by_transport: ConnectionEventConnectionShutdownByTransport,
pub shutdown_initiated_by_peer: ConnectionEventConnectionShutdownByPeer,
pub shutdown_complete: ConnectionEventShutdownComplete,
pub local_address_changed: ConnectionEventLocalAddressChanged,
pub peer_address_changed: ConnectionEventPeerAddressChanged,
pub peer_stream_started: ConnectionEventPeerStreamStarted,
pub streams_available: ConnectionEventStreamsAvailable,
pub peer_needs_streams: ConnectionEventPeerNeedsStreams,
pub ideal_processor_changed: ConnectionEventIdealProcessorChanged,
pub datagram_state_changed: ConnectionEventDatagramStateChanged,
pub datagram_received: ConnectionEventDatagramReceived,
pub datagram_send_state_changed: ConnectionEventDatagramSendStateChanged,
pub resumed: ConnectionEventResumed,
pub resumption_ticket_received: ConnectionEventResumptionTicketReceived,
pub peer_certificated_received: ConnectionEventPeerCertificateReceived,
}

#[repr(C)]
#[derive(Copy, Clone)]
pub struct ConnectionEvent {
pub event_type: ConnectionEventType,
pub payload: ConnectionEventPayload,
}

pub type StreamEventType = u32;
pub const STREAM_EVENT_START_COMPLETE: StreamEventType = 0;
pub const STREAM_EVENT_RECEIVE: StreamEventType = 1;
Expand Down Expand Up @@ -1178,6 +1013,11 @@ pub struct Stream {
unsafe impl Sync for Stream {}
unsafe impl Send for Stream {}

/// Same as Stream but does not own the handle.
/// Only used in callback wrapping where handle
/// should not be closed by default.
pub struct StreamRef(Stream);

impl From<&str> for Buffer {
fn from(data: &str) -> Buffer {
Buffer {
Expand Down Expand Up @@ -1587,18 +1427,6 @@ impl Connection {
};
}

/// # Safety
/// handler and context must be valid
/// TODO: handler needs to be Fn type.
pub unsafe fn set_stream_callback_handler(
&self,
stream_handle: HQUIC,
handler: StreamEventHandler,
context: *const c_void,
) {
unsafe { Api::set_callback_handler(stream_handle, handler as *const c_void, context) };
}

pub fn datagram_send(
&self,
buffer: &Buffer,
Expand Down Expand Up @@ -1839,6 +1667,29 @@ impl Stream {

define_quic_handle_impl!(Stream);

impl StreamRef {
/// For internal use only.
pub(crate) unsafe fn from_raw(handle: HQUIC) -> Self {
Self(Stream { handle })
}
}

impl Drop for StreamRef {
fn drop(&mut self) {
// clear the handle to prevent auto close.
self.0.handle = std::ptr::null_mut()
}
}

/// Make inner stream accessile
impl std::ops::Deref for StreamRef {
type Target = Stream;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {

Expand All @@ -1852,7 +1703,7 @@ mod tests {
use crate::ffi::{HQUIC, QUIC_STATUS};
use crate::{
ffi, Buffer, Configuration, Connection, ConnectionEvent, CredentialConfig, Registration,
Settings, Stream, StreamEvent,
Settings, StatusCode, Stream, StreamEvent,
};

extern "C" fn test_conn_callback(
Expand All @@ -1861,37 +1712,55 @@ mod tests {
event: *mut ffi::QUIC_CONNECTION_EVENT,
) -> QUIC_STATUS {
let connection = unsafe { &*(context as *const Connection) };
let event = unsafe { (event as *const ConnectionEvent).as_ref().unwrap() };
match event.event_type {
crate::CONNECTION_EVENT_CONNECTED => {
let ev_ref = unsafe { event.as_ref().unwrap() };
let event = ConnectionEvent::from(ev_ref);
match event {
ConnectionEvent::Connected {
session_resumed,
negotiated_alpn,
} => {
let local_addr = connection.get_local_addr().unwrap().as_socket().unwrap();
let remote_addr = connection.get_remote_addr().unwrap().as_socket().unwrap();
println!("Connected({}, {})", local_addr, remote_addr);
let alpn = String::from_utf8_lossy(negotiated_alpn);
println!("Connected({local_addr}, {remote_addr}), session_resumed:{session_resumed}, negotiated_alpn:{alpn}");
}
crate::CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT => {
println!("Transport shutdown 0x{:x}", unsafe {
event.payload.shutdown_initiated_by_transport.status
})
ConnectionEvent::ShutdownInitiatedByTransport { status, error_code } => {
println!("Transport shutdown {status}, {error_code}")
}
crate::CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER => {
println!("App shutdown {}", unsafe {
event.payload.shutdown_initiated_by_peer.error_code
})
ConnectionEvent::ShutdownInitiatedByPeer { error_code } => {
println!("App shutdown {error_code}")
}
crate::CONNECTION_EVENT_SHUTDOWN_COMPLETE => println!("Shutdown complete"),
crate::CONNECTION_EVENT_PEER_STREAM_STARTED => {
println!("Peer stream started");
unsafe {
connection.set_stream_callback_handler(
event.payload.peer_stream_started.stream as HQUIC,
test_stream_callback,
context,
);
}
ConnectionEvent::ShutdownComplete {
handshake_completed,
peer_acknowledged_shutdown,
app_close_in_progress,
} => {
println!("Shutdown complete: {handshake_completed}, {peer_acknowledged_shutdown}, {app_close_in_progress}")
}
_ => println!("Other callback {}", event.event_type),
ConnectionEvent::LocalAddressChanged { address } => {
println!("Local address changed: {:?}", address.as_socket().unwrap())
}
ConnectionEvent::PeerAddressChanged { address } => {
println!("Peer address changed: {:?}", address.as_socket().unwrap())
}
ConnectionEvent::PeerStreamStarted { stream, flags } => {
println!("Peer stream started: flags: {flags}");
unsafe { stream.set_callback_handler(test_stream_callback, context) };
}
ConnectionEvent::StreamsAvailable {
bidirectional_count,
unidirectional_count,
} => {
println!(
"Streams available: bi: {bidirectional_count}, uni: {unidirectional_count}"
)
}
ConnectionEvent::PeerNeedsStreams { bidirectional } => {
println!("Peer needs streams: bi: {bidirectional}");
}
_ => println!("Connection other callback {}", ev_ref.Type),
}
0
StatusCode::QUIC_STATUS_SUCCESS.into()
}

extern "C" fn test_stream_callback(
Expand Down
Loading

0 comments on commit 324620e

Please sign in to comment.