Message Bus
The `MessageBus` is a fundamental part of the platform, facilitating communication between various system components through message passing. This approach enables a loosely coupled architecture, where components can interact without strong dependencies. Messages exchanged via the message bus can be categorized into three distinct types:

- Data
- Events
- Commands
While the message bus is considered a lower-level component, users typically interact with it indirectly. `Actor` and `Strategy` classes provide two convenience methods built on top of the underlying `MessageBus`, for easier custom data and signal publishing:
```python
def publish_data(self, data_type: DataType, data: Data) -> None:
    ...

def publish_signal(self, name: str, value, ts_event: int | None = None) -> None:
    ...
```
For advanced users and specific use cases, direct access to the message bus is available from within `Actor` and `Strategy` classes through the `self.msgbus` reference, which exposes the message bus interface directly. To publish a custom message, simply provide a topic as a `str` and any Python object as the message payload, for example:
```python
self.msgbus.publish("MyTopic", "MyMessage")
```
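Any component subscribed to that topic receives the payload via its registered handler. As a rough, self-contained sketch of the topic-based publish/subscribe pattern (an illustrative stand-in, not the actual Nautilus implementation):

```python
from collections import defaultdict
from typing import Any, Callable

class SimpleMessageBus:
    """A minimal topic-based pub/sub sketch (illustrative only)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[Any], None]) -> None:
        # Register a handler to be invoked for each message on `topic`
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: Any) -> None:
        # Deliver the payload to every handler subscribed to the topic
        for handler in self._subscribers[topic]:
            handler(message)

msgbus = SimpleMessageBus()
received = []
msgbus.subscribe("MyTopic", received.append)
msgbus.publish("MyTopic", "MyMessage")
print(received)  # ['MyMessage']
```

Because the publisher only names a topic, it carries no dependency on whichever components happen to be subscribed, which is the loose coupling described above.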
The `MessageBus` can be backed with any database or message broker technology for which an integration has been written; this enables external publishing of messages.
Redis is currently supported for all serializable messages which are published externally. The minimum supported Redis version is 6.2.0.
Under the hood, when a backing database (or any other compatible technology) is configured, all outgoing messages are first serialized. These serialized messages are then transmitted via a Multiple-Producer Single-Consumer (MPSC) channel to a separate thread, which is implemented in Rust. In this separate thread, the message is written to its final destination, which is presently Redis streams.
This design is primarily driven by performance considerations. By offloading the I/O operations to a separate thread, we ensure that the main thread remains unblocked and can continue its tasks without being hindered by the potentially time-consuming operations involved in interacting with a database or client.
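The offload pattern can be sketched in Python with a queue feeding a dedicated writer thread (the real implementation uses a Rust MPSC channel; all names below are illustrative):

```python
import queue
import threading

def start_writer(sink: list) -> tuple[queue.Queue, threading.Thread]:
    """Spawn a writer thread that drains serialized messages into `sink`."""
    channel: queue.Queue = queue.Queue()  # producers put; a single consumer gets

    def writer() -> None:
        while True:
            msg = channel.get()
            if msg is None:  # sentinel: shut down
                break
            sink.append(msg)  # stand-in for the (slow) Redis stream write

    thread = threading.Thread(target=writer, daemon=True)
    thread.start()
    return channel, thread

written: list[bytes] = []
channel, thread = start_writer(written)
# The main thread only enqueues and returns immediately; I/O happens off-thread
channel.put(b'{"topic": "MyTopic", "payload": "MyMessage"}')
channel.put(None)  # signal shutdown
thread.join()
print(written)
```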
Most Nautilus built-in objects are serializable, as are dictionaries `dict[str, Any]` containing serializable primitives, and the primitive types themselves such as `str`, `int`, `float`, `bool`, and `bytes`. Additional custom types can be registered by calling the following registration function from the `serialization` subpackage:
```python
def register_serializable_type(
    cls,
    to_dict: Callable[[Any], dict[str, Any]],
    from_dict: Callable[[dict[str, Any]], Any],
):
    ...
```
- `cls`: The type to register.
- `to_dict`: The delegate to instantiate a dict of primitive types from the object.
- `from_dict`: The delegate to instantiate the object from a dict of primitive types.
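As an illustration of the pattern, here is a hypothetical `GreeksData` type registered against a self-contained stand-in registry (the registry mirrors the signature above but is not the actual Nautilus function):

```python
from typing import Any, Callable

# Stand-in registry mirroring the register_serializable_type signature
_REGISTRY: dict[type, tuple[Callable, Callable]] = {}

def register_serializable_type(
    cls,
    to_dict: Callable[[Any], dict[str, Any]],
    from_dict: Callable[[dict[str, Any]], Any],
):
    _REGISTRY[cls] = (to_dict, from_dict)

class GreeksData:  # hypothetical custom type, for illustration only
    def __init__(self, delta: float, gamma: float) -> None:
        self.delta = delta
        self.gamma = gamma

register_serializable_type(
    GreeksData,
    to_dict=lambda obj: {"delta": obj.delta, "gamma": obj.gamma},
    from_dict=lambda values: GreeksData(values["delta"], values["gamma"]),
)

# Round-trip the object through the registered delegates
to_dict, from_dict = _REGISTRY[GreeksData]
restored = from_dict(to_dict(GreeksData(0.5, 0.1)))
print(restored.delta, restored.gamma)  # 0.5 0.1
```

The key requirement is that `to_dict` emits only serializable primitives, so the resulting dict can pass through either supported encoding.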
The message bus external backing technology can be configured by importing the `MessageBusConfig` object and passing this to your `TradingNodeConfig`. Each of these config options is described below.
```python
...  # Other config omitted
message_bus=MessageBusConfig(
    database=DatabaseConfig(),
    encoding="json",
    timestamps_as_iso8601=True,
    buffer_interval_ms=100,
    autotrim_mins=30,
    use_trader_prefix=True,
    use_trader_id=True,
    use_instance_id=False,
    streams_prefix="streams",
    types_filter=[QuoteTick, TradeTick],
)
...
```
A `DatabaseConfig` must be provided. For a default Redis setup on the local loopback, you can pass a `DatabaseConfig()`, which will use matching defaults.
Two encodings are currently supported by the built-in `Serializer` used by the `MessageBus`:

- JSON (`json`)
- MessagePack (`msgpack`)
Use the `encoding` config option to control the message writing encoding.
The `msgpack` encoding is used by default as it offers the most optimal serialization and memory performance. It's recommended to use the `json` encoding for human readability when performance is not a primary concern.
By default, timestamps are formatted as UNIX epoch nanosecond integers. Alternatively, you can configure ISO 8601 string formatting by setting `timestamps_as_iso8601` to `True`.
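For example, the same event time in both representations, converted with the standard library (Nautilus's actual ISO 8601 output may differ in precision; this just illustrates the two formats):

```python
from datetime import datetime, timezone

# Default format: UNIX epoch nanosecond integer
ts_event_ns = 1_700_000_000_000_000_000

# With timestamps_as_iso8601=True, the same instant as an ISO 8601 string
dt = datetime.fromtimestamp(ts_event_ns / 1_000_000_000, tz=timezone.utc)
iso = dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
print(iso)  # 2023-11-14T22:13:20.000000Z
```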
Message stream keys are essential for identifying individual trader nodes and organizing messages within streams. They can be tailored to meet your specific requirements and use cases. In the context of message bus streams, a trader key is typically structured as follows:
```
trader:{trader_id}:{instance_id}:{streams_prefix}
```
The following options are available for configuring message stream keys:
- `use_trader_prefix`: If the key should begin with the `trader` string.
- `use_trader_id`: If the key should include the trader ID for the node.
Each trader node is assigned a unique 'instance ID', which is a UUIDv4. This instance ID helps distinguish individual traders when messages are distributed across multiple streams. You can include the instance ID in the trader key by setting the `use_instance_id` configuration option to `True`. This is particularly useful when you need to track and identify traders across various streams in a multi-node trading system.
The `streams_prefix` string enables you to group all streams for a single trader instance or organize messages for multiple instances. Configure this by passing a string to the `streams_prefix` configuration option, ensuring the other prefix options are set to `False`.
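A sketch of how these options combine into a stream key (a hypothetical helper, for illustration only):

```python
def make_stream_key(
    trader_id: str,
    instance_id: str,
    streams_prefix: str,
    use_trader_prefix: bool = True,
    use_trader_id: bool = True,
    use_instance_id: bool = False,
) -> str:
    # Assemble the colon-delimited key from the enabled parts
    parts = []
    if use_trader_prefix:
        parts.append("trader")
    if use_trader_id:
        parts.append(trader_id)
    if use_instance_id:
        parts.append(instance_id)
    parts.append(streams_prefix)
    return ":".join(parts)

print(make_stream_key("TESTER-001", "a1b2c3", "streams"))
# trader:TESTER-001:streams

print(make_stream_key("TESTER-001", "a1b2c3", "binance",
                      use_trader_prefix=False, use_trader_id=False))
# binance
```

The second call shows why disabling the other prefix options yields the simple, predictable key that downstream consumers can register for.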
The `stream_per_topic` option indicates whether the producer will write a separate stream for each topic. This is particularly useful for Redis backings, which do not support wildcard topics when listening to streams. If set to `False`, all messages will be written to the same stream.
Redis does not support wildcard stream topics. For better compatibility with Redis, it is recommended to set this option to `False`.
When messages are published on the message bus, they are serialized and written to a stream if a backing for the message bus is configured and enabled. To prevent flooding the stream with data like high-frequency quotes, you may filter out certain types of messages from external publication.
To enable this filtering mechanism, pass a list of type objects to the `types_filter` parameter in the message bus configuration, specifying which types of messages should be excluded from external publication.
```python
from nautilus_trader.config import MessageBusConfig
from nautilus_trader.data import TradeTick
from nautilus_trader.data import QuoteTick

# Create a MessageBusConfig instance with types filtering
message_bus = MessageBusConfig(
    types_filter=[QuoteTick, TradeTick],
)
```
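The effect of the filter can be sketched as a simple type check before the external write (a stand-in for the internal logic, with stub classes in place of the Nautilus data types):

```python
class QuoteTick: ...      # stand-ins for the Nautilus data types
class TradeTick: ...
class OrderFilled: ...

types_filter = [QuoteTick, TradeTick]

def should_publish_externally(message: object) -> bool:
    # Messages whose type appears in the filter stay internal only
    return type(message) not in types_filter

print(should_publish_externally(QuoteTick()))    # False: filtered out
print(should_publish_externally(OrderFilled()))  # True: written to the stream
```

Filtered types still flow normally on the internal message bus; only the external stream write is skipped.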
The `autotrim_mins` configuration parameter allows you to specify the lookback window in minutes for automatic stream trimming in your message streams. Automatic stream trimming helps manage the size of your message streams by removing older messages, ensuring that the streams remain manageable in terms of storage and performance.
The current Redis implementation will maintain `autotrim_mins` as a maximum width (plus roughly a minute, as streams are trimmed no more than once per minute), rather than as a maximum lookback window based on the current wall clock time.
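In Redis terms, trimming by age amounts to computing a minimum stream ID from the window, since Redis stream IDs begin with a millisecond timestamp (e.g. for `XTRIM ... MINID`, available from Redis 6.2). How Nautilus performs the trim internally is an implementation detail; this sketch just illustrates the cutoff arithmetic:

```python
def trim_min_id(now_ms: int, autotrim_mins: int) -> str:
    # Entries with stream IDs below this threshold are eligible for removal
    cutoff_ms = now_ms - autotrim_mins * 60_000
    return f"{cutoff_ms}-0"  # Redis stream ID: <ms-timestamp>-<sequence>

# e.g. XTRIM <stream> MINID <threshold>
print(trim_min_id(now_ms=1_700_000_000_000, autotrim_mins=30))
# 1699998200000-0
```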
The minimum supported Redis version is 6.2.0.
The message bus within a `TradingNode` (node) is referred to as the "internal message bus". A producer node is one which publishes messages onto an external stream (see external publishing). A consumer node listens to external streams to receive deserialized message payloads and publish them on its internal message bus.
┌───────────────────────────┐
│ │
│ │
│ │
│ Producer Node │
│ │
│ │
│ │
│ │
│ │
└─────────────┬─────────────┘
│
│
┌───────────────────────────────▼──────────────────────────────┐
│ │
│ Stream │
│ │
└─────────────┬────────────────────────────────────┬───────────┘
│ │
│ │
┌─────────────▼───────────┐ ┌─────────────▼───────────┐
│ │ │ │
│ │ │ │
│ Consumer Node 1 │ │ Consumer Node 2 │
│ │ │ │
│ │ │ │
│ │ │ │
│ │ │ │
│ │ │ │
│ │ │ │
│ │ │ │
└─────────────────────────┘ └─────────────────────────┘
Set the `LiveDataEngineConfig.external_clients` with the list of `client_id`s intended to represent the external streaming clients. The `DataEngine` will filter out subscription commands for these clients, ensuring that the external streaming provides the necessary data for any subscriptions to these clients.
The following example details a streaming setup where a producer node publishes Binance data externally, and a downstream consumer node publishes these data messages onto its internal message bus.
We configure the `MessageBus` of the producer node to publish to a "binance" stream. The settings `use_trader_id`, `use_trader_prefix`, and `use_instance_id` are all set to `False` to ensure a simple and predictable stream key that the consumer nodes can register for.
```python
message_bus=MessageBusConfig(
    database=DatabaseConfig(timeout=2),
    use_trader_id=False,
    use_trader_prefix=False,
    use_instance_id=False,
    streams_prefix="binance",  # <---
    stream_per_topic=False,
    autotrim_mins=30,
),
```
We configure the `MessageBus` of the consumer node to receive messages from the same "binance" stream. The node will listen to the external stream keys and publish these messages onto its internal message bus. Additionally, we declare the client ID "BINANCE_EXT" as an external client. This ensures that the `DataEngine` does not attempt to send data commands to this client ID, as we expect these messages to be published onto the internal message bus from the external stream, to which the node has subscribed the relevant topics.
```python
data_engine=LiveDataEngineConfig(
    external_clients=[ClientId("BINANCE_EXT")],
),
message_bus=MessageBusConfig(
    database=DatabaseConfig(timeout=2),
    external_streams=["binance"],  # <---
),
```