Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport_service: Improve connection stability by downgrading connections on substream inactivity #260

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8495a9b
transport_service: Add logs when the receiver is closed
lexnv Sep 25, 2024
70511ae
tcp/connection: Move handling of yamux substream to dedicated function
lexnv Sep 25, 2024
94ac0f5
tcp/connection: Move handling of negotiated substreams to dedicated fn
lexnv Sep 25, 2024
2b87e86
tcp/connection: Move handling of protocol commands to dedicated fn
lexnv Sep 25, 2024
2f917e1
tcp: Add endpoint to logs
lexnv Sep 25, 2024
50cbd34
tcp: Adjust handler fns to return Result<bool>
lexnv Sep 25, 2024
0004aea
protocol-set: Extend OpenSubstream with connection ID
lexnv Sep 27, 2024
0071b71
transport-service: Add keepalive timeout tracker for connections
lexnv Sep 27, 2024
4e20728
transport-service: Use connection tracker
lexnv Sep 27, 2024
e2ad0c7
tests: Adjust to new interface and downgrade a few logs
lexnv Sep 27, 2024
a327ac4
transport-service/tests: Check connections are downgraded
lexnv Sep 30, 2024
67cc183
transport-service/tests: Check substream opening resets keep alive
lexnv Sep 30, 2024
9ad7f7b
transport-service/tests: Ensure service is polled enough times
lexnv Sep 30, 2024
30ba5be
tcp: TcpConnection no longer needs the connection ID field
lexnv Sep 30, 2024
e32b925
Merge remote-tracking branch 'origin/master' into lexnv/stable-connec…
lexnv Oct 1, 2024
177cc92
transport-service/tests: Test assumptions for dropping connections on
lexnv Sep 30, 2024
52cd146
transport-service/tests: Extend test for multiple substreams being
lexnv Sep 30, 2024
35c67f0
transport-service: Simplify keep alive hashmap tracking
lexnv Sep 30, 2024
38665fd
transport-service: Replace FuturesUnordered with more efficient
lexnv Sep 30, 2024
71571a8
transport-service: Upgrade connection on substream activity
lexnv Sep 30, 2024
b28aa67
transport_service: Ensure proper ordering
lexnv Oct 1, 2024
d3058eb
transport_service: Break down logic to multiple fns
lexnv Oct 1, 2024
fa9ab26
transport-service/tests: Keep track of connection ordering
lexnv Oct 3, 2024
eedad30
transport-service: Remove next stale connections
lexnv Oct 3, 2024
4aebc0d
transport-service/tests: Ensure the connection is upgraded
lexnv Oct 3, 2024
ff13252
transport-service/tests: Ensure next stale connections are removed
lexnv Oct 3, 2024
7ff145a
transport-service/tests: Ensure downgrading works after timeout
lexnv Oct 3, 2024
5dfdc69
transport_service: Simplify keep alive tracker by using futuresunordered
lexnv Oct 28, 2024
bb889df
transport_service: Populate futures on substream activity
lexnv Oct 28, 2024
e0c66b7
transport_service: Wake the cx.waker on filtering out Poll::Ready events
lexnv Oct 28, 2024
ee21a8d
Merge remote-tracking branch 'origin/master' into lexnv/stable-connec…
lexnv Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl ConnectionHandle {
}
}

/// Try to upgrade the connection to active state.
pub fn try_open(&mut self) {
if let ConnectionType::Inactive(inactive) = &self.connection {
if let Some(active) = inactive.upgrade() {
self.connection = ConnectionType::Active(active);
}
}
}

/// Attempt to acquire permit which will keep the connection open for indefinite time.
pub fn try_get_permit(&self) -> Option<Permit> {
match &self.connection {
Expand All @@ -120,6 +129,7 @@ impl ConnectionHandle {
protocol: protocol.clone(),
fallback_names,
substream_id,
connection_id: self.connection_id.clone(),
permit,
})
.map_err(|error| match error {
Expand All @@ -141,10 +151,15 @@ impl ConnectionHandle {
TrySendError::Closed(_) => Error::ConnectionClosed,
})
}

/// Check if the connection is active.
pub fn is_active(&self) -> bool {
matches!(self.connection, ConnectionType::Active(_))
}
}

/// Type which allows the connection to be kept open.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Permit {
/// Active connection.
_connection: Sender<ProtocolCommand>,
Expand Down
2 changes: 2 additions & 0 deletions src/protocol/notification/tests/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ async fn remote_opens_multiple_inbound_substreams() {
SubstreamId::from(0usize),
Box::new(DummySubstream::new()),
),
connection_id: ConnectionId::from(0usize),
})
.await
.unwrap();
Expand Down Expand Up @@ -511,6 +512,7 @@ async fn remote_opens_multiple_inbound_substreams() {
SubstreamId::from(0usize),
Box::new(substream),
),
connection_id: ConnectionId::from(0usize),
})
.await
.unwrap();
Expand Down
10 changes: 9 additions & 1 deletion src/protocol/protocol_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ pub enum InnerTransportEvent {
/// distinguish between different outbound substreams.
direction: Direction,

/// Connection ID.
connection_id: ConnectionId,

/// Substream.
substream: Substream,
},
Expand Down Expand Up @@ -149,6 +152,7 @@ impl From<InnerTransportEvent> for TransportEvent {
fallback,
direction,
substream,
..
} => TransportEvent::SubstreamOpened {
peer,
protocol,
Expand All @@ -164,7 +168,7 @@ impl From<InnerTransportEvent> for TransportEvent {
}

/// Events emitted by the installed protocols to transport.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ProtocolCommand {
/// Open substream.
OpenSubstream {
Expand Down Expand Up @@ -192,6 +196,9 @@ pub enum ProtocolCommand {
/// and associate incoming substreams with whatever logic it has.
substream_id: SubstreamId,

/// Connection ID.
connection_id: ConnectionId,

/// Connection permit.
///
/// `Permit` allows the connection to be kept open while the permit is held and it is given
Expand Down Expand Up @@ -300,6 +307,7 @@ impl ProtocolSet {
fallback,
direction,
substream,
connection_id: self.connection.connection_id().clone(),
};

protocol_context
Expand Down
Loading