Skip to content

Commit

Permalink
merge bitcoin#28165: transport abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Aug 27, 2024
1 parent debc6b1 commit cafef26
Show file tree
Hide file tree
Showing 8 changed files with 578 additions and 144 deletions.
2 changes: 1 addition & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxconnections=<n>", strprintf("Maintain at most <n> connections to peers (temporary service connections excluded) (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxreceivebuffer=<n>", strprintf("Maximum per-connection receive buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection memory usage for the send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxtimeadjustment", strprintf("Maximum allowed median peer time offset adjustment. Local perspective of time may be influenced by peers forward or backward by this amount. (default: %u seconds)", DEFAULT_MAX_TIME_ADJUSTMENT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxuploadtarget=<n>", strprintf("Tries to keep outbound traffic under the given target (in MiB per 24h). Limit does not apply to peers with 'download' permission. 0 = no limit (default: %d)", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-onion=<ip:port>", "Use separate SOCKS5 proxy to reach peers via Tor onion services, set -noonion to disable (default: -proxy)", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
Expand Down
209 changes: 150 additions & 59 deletions src/net.cpp

Large diffs are not rendered by default.

203 changes: 139 additions & 64 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ struct CSerializedNetMsg {

std::vector<unsigned char> data;
std::string m_type;

/** Compute total memory usage of this object (own memory + any dynamic memory). */
size_t GetMemoryUsage() const noexcept;
};

/** Different types of connections to a peer. This enum encapsulates the
Expand Down Expand Up @@ -350,42 +353,105 @@ class CNetMessage {
}
};

/** The TransportDeserializer takes care of holding and deserializing the
* network receive buffer. It can deserialize the network buffer into a
* transport protocol agnostic CNetMessage (message type & payload)
*/
class TransportDeserializer {
/** The Transport converts one connection's sent messages to wire bytes, and received bytes back. */
class Transport {
public:
// returns true if the current deserialization is complete
virtual bool Complete() const = 0;
// set the serialization context version
virtual void SetVersion(int version) = 0;
/** read and deserialize data, advances msg_bytes data pointer */
virtual int Read(Span<const uint8_t>& msg_bytes) = 0;
// decomposes a message from the context
virtual CNetMessage GetMessage(std::chrono::microseconds time, bool& reject_message) = 0;
virtual ~TransportDeserializer() {}
virtual ~Transport() {}

// 1. Receiver side functions, for decoding bytes received on the wire into transport protocol
// agnostic CNetMessage (message type & payload) objects.

/** Returns true if the current message is complete (so GetReceivedMessage can be called). */
virtual bool ReceivedMessageComplete() const = 0;
/** Set the deserialization context version for objects returned by GetReceivedMessage. */
virtual void SetReceiveVersion(int version) = 0;

/** Feed wire bytes to the transport.
*
* @return false if some bytes were invalid, in which case the transport can't be used anymore.
*
* Consumed bytes are chopped off the front of msg_bytes.
*/
virtual bool ReceivedBytes(Span<const uint8_t>& msg_bytes) = 0;

/** Retrieve a completed message from transport.
*
* This can only be called when ReceivedMessageComplete() is true.
*
* If reject_message=true is returned the message itself is invalid, but (other than false
* returned by ReceivedBytes) the transport is not in an inconsistent state.
*/
virtual CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) = 0;

// 2. Sending side functions, for converting messages into bytes to be sent over the wire.

/** Set the next message to send.
*
* If no message can currently be set (perhaps because the previous one is not yet done being
* sent), returns false, and msg will be unmodified. Otherwise msg is enqueued (and
* possibly moved-from) and true is returned.
*/
virtual bool SetMessageToSend(CSerializedNetMsg& msg) noexcept = 0;

/** Return type for GetBytesToSend, consisting of:
* - Span<const uint8_t> to_send: span of bytes to be sent over the wire (possibly empty).
* - bool more: whether there will be more bytes to be sent after the ones in to_send are
* all sent (as signaled by MarkBytesSent()).
* - const std::string& m_type: message type on behalf of which this is being sent.
*/
using BytesToSend = std::tuple<
Span<const uint8_t> /*to_send*/,
bool /*more*/,
const std::string& /*m_type*/
>;

/** Get bytes to send on the wire.
*
* As a const function, it does not modify the transport's observable state, and is thus safe
* to be called multiple times.
*
* The bytes returned by this function act as a stream which can only be appended to. This
* means that with the exception of MarkBytesSent, operations on the transport can only append
* to what is being returned.
*
* Note that m_type and to_send refer to data that is internal to the transport, and calling
* any non-const function on this object may invalidate them.
*/
virtual BytesToSend GetBytesToSend() const noexcept = 0;

/** Report how many bytes returned by the last GetBytesToSend() have been sent.
*
* bytes_sent cannot exceed to_send.size() of the last GetBytesToSend() result.
*
* If bytes_sent=0, this call has no effect.
*/
virtual void MarkBytesSent(size_t bytes_sent) noexcept = 0;

/** Return the memory usage of this transport attributable to buffered data to send. */
virtual size_t GetSendMemoryUsage() const noexcept = 0;
};

class V1TransportDeserializer final : public TransportDeserializer
class V1Transport final : public Transport
{
private:
const CChainParams& m_chain_params;
CMessageHeader::MessageStartChars m_magic_bytes;
const NodeId m_node_id; // Only for logging
mutable CHash256 hasher;
mutable uint256 data_hash;
bool in_data; // parsing header (false) or data (true)
CDataStream hdrbuf; // partially received header
CMessageHeader hdr; // complete header
CDataStream vRecv; // received message data
unsigned int nHdrPos;
unsigned int nDataPos;

const uint256& GetMessageHash() const;
int readHeader(Span<const uint8_t> msg_bytes);
int readData(Span<const uint8_t> msg_bytes);

void Reset() {
mutable Mutex m_recv_mutex; //!< Lock for receive state
mutable CHash256 hasher GUARDED_BY(m_recv_mutex);
mutable uint256 data_hash GUARDED_BY(m_recv_mutex);
bool in_data GUARDED_BY(m_recv_mutex); // parsing header (false) or data (true)
CDataStream hdrbuf GUARDED_BY(m_recv_mutex); // partially received header
CMessageHeader hdr GUARDED_BY(m_recv_mutex); // complete header
CDataStream vRecv GUARDED_BY(m_recv_mutex); // received message data
unsigned int nHdrPos GUARDED_BY(m_recv_mutex);
unsigned int nDataPos GUARDED_BY(m_recv_mutex);

const uint256& GetMessageHash() const EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
int readHeader(Span<const uint8_t> msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
int readData(Span<const uint8_t> msg_bytes) EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);

void Reset() EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex) {
AssertLockHeld(m_recv_mutex);
vRecv.clear();
hdrbuf.clear();
hdrbuf.resize(24);
Expand All @@ -396,52 +462,60 @@ class V1TransportDeserializer final : public TransportDeserializer
hasher.Reset();
}

public:
V1TransportDeserializer(const CChainParams& chain_params, const NodeId node_id, int nTypeIn, int nVersionIn)
: m_chain_params(chain_params),
m_node_id(node_id),
hdrbuf(nTypeIn, nVersionIn),
vRecv(nTypeIn, nVersionIn)
bool CompleteInternal() const noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex)
{
Reset();
AssertLockHeld(m_recv_mutex);
if (!in_data) return false;
return hdr.nMessageSize == nDataPos;
}

bool Complete() const override
/** Lock for sending state. */
mutable Mutex m_send_mutex;
/** The header of the message currently being sent. */
std::vector<uint8_t> m_header_to_send GUARDED_BY(m_send_mutex);
/** The data of the message currently being sent. */
CSerializedNetMsg m_message_to_send GUARDED_BY(m_send_mutex);
/** Whether we're currently sending header bytes or message bytes. */
bool m_sending_header GUARDED_BY(m_send_mutex) {false};
/** How many bytes have been sent so far (from m_header_to_send, or from m_message_to_send.data). */
size_t m_bytes_sent GUARDED_BY(m_send_mutex) {0};

public:
V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept;

bool ReceivedMessageComplete() const override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
if (!in_data)
return false;
return (hdr.nMessageSize == nDataPos);
AssertLockNotHeld(m_recv_mutex);
return WITH_LOCK(m_recv_mutex, return CompleteInternal());
}
void SetVersion(int nVersionIn) override

void SetReceiveVersion(int nVersionIn) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
AssertLockNotHeld(m_recv_mutex);
LOCK(m_recv_mutex);
hdrbuf.SetVersion(nVersionIn);
vRecv.SetVersion(nVersionIn);
}
int Read(Span<const uint8_t>& msg_bytes) override

bool ReceivedBytes(Span<const uint8_t>& msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
AssertLockNotHeld(m_recv_mutex);
LOCK(m_recv_mutex);
int ret = in_data ? readData(msg_bytes) : readHeader(msg_bytes);
if (ret < 0) {
Reset();
} else {
msg_bytes = msg_bytes.subspan(ret);
}
return ret;
return ret >= 0;
}
CNetMessage GetMessage(std::chrono::microseconds time, bool& reject_message) override;
};

/** The TransportSerializer prepares messages for the network transport
*/
class TransportSerializer {
public:
// prepare message for transport (header construction, error-correction computation, payload encryption, etc.)
virtual void prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const = 0;
virtual ~TransportSerializer() {}
};
CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);

class V1TransportSerializer : public TransportSerializer {
public:
void prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const override;
bool SetMessageToSend(CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
BytesToSend GetBytesToSend() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
};

/** Information about a peer */
Expand All @@ -451,8 +525,9 @@ class CNode
friend struct ConnmanTestMsg;

public:
const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
const std::unique_ptr<const TransportSerializer> m_serializer;
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
* the sending side functions are only called under cs_vSend. */
const std::unique_ptr<Transport> m_transport;

NetPermissionFlags m_permissionFlags{NetPermissionFlags::None}; // treated as const outside of fuzz tester

Expand All @@ -466,12 +541,12 @@ class CNode
*/
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);

/** Total size of all vSendMsg entries */
size_t nSendSize GUARDED_BY(cs_vSend){0};
/** Offset inside the first vSendMsg already sent */
size_t nSendOffset GUARDED_BY(cs_vSend){0};
/** Sum of GetMemoryUsage of all vSendMsg entries. */
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
/** Total number of bytes sent on the wire to this peer. */
uint64_t nSendBytes GUARDED_BY(cs_vSend){0};
std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend);
/** Messages still to be fed to m_transport->SetMessageToSend. */
std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
std::atomic<size_t> nSendMsgSize{0};
Mutex cs_vSend;
Mutex m_sock_mutex;
Expand Down
7 changes: 5 additions & 2 deletions src/test/denialofservice_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
dummyNode1.vSendMsg.clear();
dummyNode1.nSendMsgSize = 0;
}
connman.FlushSendBuffer(dummyNode1);
{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.empty());
}

int64_t nStartTime = GetTime();
Expand Down
Loading

0 comments on commit cafef26

Please sign in to comment.