Skip to content

Commit

Permalink
peers: add message counter
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Jun 16, 2024
1 parent e64c8eb commit e7afe5a
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 10 deletions.
4 changes: 2 additions & 2 deletions CHECKLIST.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
- [x] Add new filters to the chain, verifying with the `FilterHash`
- [ ] Optimizations
- [x] Hashmap the `BlockHash` to `FilterHash` relationship in memory
- [ ] Persist SPKs that have already been proven to be in a filter
- [ ] Persist SPKs that have already been proven to be in a filter?

#### Main thread

Expand Down Expand Up @@ -81,7 +81,7 @@
- [x] Should serve CPF
- [ ] Set up "timer"
- [x] Check for DOS
- [ ] Message counter
- [x] Message counter
- [ ] `Ping` if peer has not been heard from
- [ ] `Disconnect` peers with high latency
- [ ] Add BIP-324 with V1 fallback
Expand Down
83 changes: 83 additions & 0 deletions src/peers/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Very simple denial of service protection so a peer cannot spam us with unsolicited messages.
#[derive(Debug, Clone)]
pub(crate) struct MessageCounter {
version: i8,
verack: i8,
header: i32,
filter_header: i32,
filters: i64,
addrs: i32,
block: i32,
}

impl MessageCounter {
pub(crate) fn new() -> Self {
Self {
version: 1,
verack: 1,
header: 0,
filter_header: 0,
filters: 0,
addrs: 0,
block: 0,
}
}

pub(crate) fn got_version(&mut self) {
self.version -= 1;
}

pub(crate) fn got_verack(&mut self) {
self.verack -= 1;
}

pub(crate) fn got_header(&mut self) {
self.header -= 1;
}

pub(crate) fn got_filter_header(&mut self) {
self.filter_header -= 1;
}

pub(crate) fn got_filter(&mut self) {
self.filters -= 1;
}

pub(crate) fn got_addrs(&mut self) {
self.addrs -= 1;
}

pub(crate) fn got_block(&mut self) {
self.block -= 1;
}

pub(crate) fn sent_header(&mut self) {
self.header += 1;
}

pub(crate) fn sent_filter_header(&mut self) {
self.filter_header += 1;
}

pub(crate) fn sent_filters(&mut self) {
self.filters += 1000;
}

pub(crate) fn sent_addrs(&mut self) {
self.addrs += 5;
}

pub(crate) fn sent_block(&mut self) {
self.block += 1;
}

pub(crate) fn unsolicited(&self) -> bool {
self.version < 0
|| self.header < 0
|| self.filters < 0
|| self.verack < 0
|| self.filter_header < 0
|| self.addrs < 0
|| self.block < 0
}
}
1 change: 1 addition & 0 deletions src/peers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod counter;
pub(crate) mod dns;
pub(crate) mod outbound_messages;
pub(crate) mod peer;
Expand Down
31 changes: 23 additions & 8 deletions src/peers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
prelude::default_port_from_network,
};

use super::reader::Reader;
use super::{counter::MessageCounter, reader::Reader};

pub(crate) struct Peer {
nonce: u32,
Expand All @@ -25,12 +25,7 @@ pub(crate) struct Peer {
main_thread_sender: Sender<PeerThreadMessage>,
main_thread_recv: Receiver<MainThreadMessage>,
network: Network,
}

enum State {
NotConnected,
Connected,
Verack,
message_counter: MessageCounter,
}

impl Peer {
Expand All @@ -43,13 +38,15 @@ impl Peer {
main_thread_recv: Receiver<MainThreadMessage>,
) -> Self {
let default_port = default_port_from_network(&network);
let message_counter = MessageCounter::new();
Self {
nonce,
ip_addr,
port: port.unwrap_or(default_port),
main_thread_sender,
main_thread_recv,
network,
message_counter,
}
}

Expand Down Expand Up @@ -92,6 +89,10 @@ impl Peer {
if read_handle.is_finished() {
return Ok(());
}
if self.message_counter.unsolicited() {
println!("oh no");
return Ok(());
}
select! {
// The peer sent us a message
peer_message = rx.recv() => {
Expand Down Expand Up @@ -141,6 +142,7 @@ impl Peer {
) -> Result<(), PeerError> {
match message {
PeerMessage::Version(version) => {
self.message_counter.got_version();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -160,6 +162,7 @@ impl Peer {
Ok(())
}
PeerMessage::Addr(addrs) => {
self.message_counter.got_addrs();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -170,6 +173,7 @@ impl Peer {
Ok(())
}
PeerMessage::Headers(headers) => {
self.message_counter.got_header();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -180,6 +184,7 @@ impl Peer {
Ok(())
}
PeerMessage::FilterHeaders(cf_headers) => {
self.message_counter.got_filter_header();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -190,6 +195,7 @@ impl Peer {
Ok(())
}
PeerMessage::Filter(filter) => {
self.message_counter.got_filter();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -200,6 +206,7 @@ impl Peer {
Ok(())
}
PeerMessage::Block(block) => {
self.message_counter.got_block();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -219,7 +226,10 @@ impl Peer {
.map_err(|_| PeerError::ThreadChannel)?;
Ok(())
}
PeerMessage::Verack => Ok(()),
PeerMessage::Verack => {
self.message_counter.got_verack();
Ok(())
}
PeerMessage::Ping(nonce) => {
writer
.write_all(&message_generator.new_pong(nonce))
Expand Down Expand Up @@ -249,33 +259,38 @@ impl Peer {
) -> Result<(), PeerError> {
match request {
MainThreadMessage::GetAddr => {
self.message_counter.sent_addrs();
writer
.write_all(&message_generator.new_get_addr())
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::GetHeaders(config) => {
self.message_counter.sent_header();
let message = message_generator.new_get_headers(config.locators, config.stop_hash);
writer
.write_all(&message)
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::GetFilterHeaders(config) => {
self.message_counter.sent_filter_header();
let message = message_generator.new_cf_headers(config);
writer
.write_all(&message)
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::GetFilters(config) => {
self.message_counter.sent_filters();
let message = message_generator.new_filters(config);
writer
.write_all(&message)
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::GetBlock(message) => {
self.message_counter.sent_block();
let message = message_generator.new_block(message);
writer
.write_all(&message)
Expand Down

0 comments on commit e7afe5a

Please sign in to comment.