diff --git a/documentation/configuration.md b/documentation/configuration.md index 429e11e3..cf086678 100644 --- a/documentation/configuration.md +++ b/documentation/configuration.md @@ -24,7 +24,7 @@ USAGE: liftbridge [global options] command [command options] [arguments...] VERSION: - v1.2.0 + v1.3.0 COMMANDS: help, h Shows a list of commands or help for one command diff --git a/documentation/quick_start.md b/documentation/quick_start.md index 854bf2a4..c8c5935f 100644 --- a/documentation/quick_start.md +++ b/documentation/quick_start.md @@ -43,12 +43,13 @@ only be set on one server when bootstrapping a cluster.** ```shell $ liftbridge --raft-bootstrap-seed -INFO[2020-04-27 17:20:21] Liftbridge Version: v1.2.0 -INFO[2020-04-27 17:20:21] Server ID: agTv5PkzvgKygm688EMd4C -INFO[2020-04-27 17:20:21] Namespace: liftbridge-default -INFO[2020-04-27 17:20:21] Retention Policy: [Age: 1 week, Compact: false] -INFO[2020-04-27 17:20:21] Starting server on 0.0.0.0:9292... -INFO[2020-04-27 17:20:23] Server became metadata leader, performing leader promotion actions +INFO[2020-10-15 14:29:50] Liftbridge Version: v1.3.0 +INFO[2020-10-15 14:29:50] Server ID: 4nbhBr66WnRsy0I5oKF9bo +INFO[2020-10-15 14:29:50] Namespace: liftbridge-default +INFO[2020-10-15 14:29:50] Default Retention Policy: [Age: 1 week, Compact: false] +INFO[2020-10-15 14:29:50] Default Partition Pausing: disabled +INFO[2020-10-15 14:29:50] Starting server on 0.0.0.0:9292... +INFO[2020-10-15 14:29:51] Server became metadata leader, performing leader promotion actions ``` Once a leader has been elected, other servers will automatically join the cluster. @@ -56,11 +57,12 @@ We set the `--data-dir` and `--port` flags to avoid clobbering the first server. ```shell $ liftbridge --data-dir /tmp/liftbridge/server-2 --port=9293 -INFO[2020-04-27 17:21:10] Liftbridge Version: v1.2.0 -INFO[2020-04-27 17:21:10] Server ID: OtKIIhwxBztcSkgZD0xXQP -INFO[2020-04-27 17:21:10] Namespace: liftbridge-default -INFO[2020-04-27 17:21:10] Retention Policy: [Age: 1 week, Compact: false] -INFO[2020-04-27 17:21:10] Starting server on 0.0.0.0:9293... +INFO[2020-10-15 14:30:48] Liftbridge Version: v1.3.0 +INFO[2020-10-15 14:30:48] Server ID: lbW05esZTab3guEwcmWD9M +INFO[2020-10-15 14:30:48] Namespace: liftbridge-default +INFO[2020-10-15 14:30:48] Default Retention Policy: [Age: 1 week, Compact: false] +INFO[2020-10-15 14:30:48] Default Partition Pausing: disabled +INFO[2020-10-15 14:30:48] Starting server on 0.0.0.0:9293... ``` We can also bootstrap a cluster by providing the explicit cluster configuration. diff --git a/server/version.go b/server/version.go index 8bbdb1d0..07fbc788 100644 --- a/server/version.go +++ b/server/version.go @@ -1,4 +1,4 @@ package server // Version of the Liftbridge server. -const Version = "v1.2.0" +const Version = "v1.3.0" diff --git a/website/versioned_docs/version-v1.3.0/client_implementation.md b/website/versioned_docs/version-v1.3.0/client_implementation.md new file mode 100644 index 00000000..ae3e703f --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/client_implementation.md @@ -0,0 +1,2094 @@ +--- +id: version-v1.3.0-client-implementation +title: Client Implementation Guidance +original_id: client-implementation +--- + +This documentation provides guidance on implementing a [client +library](clients.md) for the Liftbridge server. If information is missing or +inaccurate, feel free to make a PR fixing it. + +## Generating gRPC API + +Liftbridge relies on gRPC and Protocol Buffers (protobufs) for its API. These +definitions are stored in the +[liftbridge-api](https://github.com/liftbridge-io/liftbridge-api) repository. +Refer to the [gRPC documentation](https://grpc.io/docs/) for guidance on +generating the gRPC code from the protobuf definitions for your particular +programming language. The liftbridge-api repository also contains generated +code for some languages which you can use in your client library (for example, +the official Go client [relies on](https://github.com/liftbridge-io/go-liftbridge/blob/6fbf530bb220797fd91174d9f858fad3114dbc48/go.mod#L8) +the generated Go code in liftbridge-api). If there is no generated code for +your language, feel free to make a PR adding it. Otherwise, you can simply +vendor the generated code alongside your client library. + +Generating the gRPC API results in a low-level client that can be used to talk +to Liftbridge. However, it's useful to implement a higher-level client that +provides a more user-friendly abstraction. The remainder of this guide relates +to implementing a high-level client. + +## Client Interface + +A high-level client has the following operations: + +| Operation | Description | +|:----|:----| +| [CreateStream](#createstream) | Creates a new stream attached to a NATS subject (or group of related NATS subjects if partitioned) | +| [DeleteStream](#deletestream) | Deletes a stream and all of its partitions | +| [PauseStream](#pausestream) | Pauses some or all of a stream's partitions until they are published to | +| [SetStreamReadonly](#setstreamreadonly) | Sets some or all of a stream's partitions as readonly or readwrite | +| [Subscribe](#subscribe) | Creates an ephemeral subscription for a given stream that messages are received on | +| [Publish](#publish) | Publishes a new message to a Liftbridge stream | +| [PublishAsync](#publishasync) | Publishes a new message to a Liftbridge stream asynchronously | +| [PublishToSubject](#publishtosubject) | Publishes a new message to a NATS subject | +| [FetchMetadata](#fetchmetadata) | Retrieves metadata from the cluster | +| [FetchPartitionMetadata](#fetchpartitionmetadata) | Retrieves partition metadata from the partition leader | +| [SetCursor](#setcursor) | Persists a cursor position for a particular stream partition. | +| [FetchCursor](#fetchcursor) | Retrieves a cursor position for a particular stream partition. | +| [Close](#close) | Closes any client connections to Liftbridge | + +Below is the interface definition of the Go Liftbridge client. We'll walk +through each of these operations in more detail below. + +```go +// Client is the main API used to communicate with a Liftbridge cluster. Call +// Connect to get a Client instance. +type Client interface { + // Close the client connection. + Close() error + + // CreateStream creates a new stream attached to a NATS subject. Subject is + // the NATS subject the stream is attached to, and name is the stream + // identifier, unique per subject. It returns ErrStreamExists if a stream + // with the given subject and name already exists. + CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error + + // DeleteStream deletes a stream and all of its partitions. Name is the + // stream identifier, globally unique. + DeleteStream(ctx context.Context, name string) error + + // PauseStream pauses a stream and some or all of its partitions. Name is + // the stream identifier, globally unique. It returns an ErrNoSuchPartition + // if the given stream or partition does not exist. By default, this will + // pause all partitions. A partition is resumed when it is published to via + // the Liftbridge Publish API or ResumeAll is enabled and another partition + // in the stream is published to. + PauseStream(ctx context.Context, name string, opts ...PauseOption) error + + // SetStreamReadonly sets the readonly flag on a stream and some or all of + // its partitions. Name is the stream identifier, globally unique. It + // returns an ErrNoSuchPartition if the given stream or partition does not + // exist. By default, this will set the readonly flag on all partitions. + // Subscribers to a readonly partition will see their subscription ended + // with a ErrReadonlyPartition error once all messages currently in the + // partition have been read. + SetStreamReadonly(ctx context.Context, name string, opts ...ReadonlyOption) error + + // Subscribe creates an ephemeral subscription for the given stream. It + // begins receiving messages starting at the configured position and waits + // for new messages when it reaches the end of the stream. The default + // start position is the end of the stream. It returns an + // ErrNoSuchPartition if the given stream or partition does not exist. Use + // a cancelable Context to close a subscription. + Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error + + // Publish publishes a new message to the Liftbridge stream. The partition + // that gets published to is determined by the provided partition or + // Partitioner passed through MessageOptions, if any. If a partition or + // Partitioner is not provided, this defaults to the base partition. This + // partition determines the underlying NATS subject that gets published to. + // To publish directly to a specific NATS subject, use the low-level + // PublishToSubject API. + // + // If the AckPolicy is not NONE, this will synchronously block until the + // ack is received. If the ack is not received in time, ErrAckTimeout is + // returned. If AckPolicy is NONE, this returns nil on success. + Publish(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error) + + // PublishAsync publishes a new message to the Liftbridge stream and + // asynchronously processes the ack or error for the message. + PublishAsync(ctx context.Context, stream string, value []byte, ackHandler AckHandler, opts ...MessageOption) error + + // PublishToSubject publishes a new message to the NATS subject. Note that + // because this publishes directly to a subject, there may be multiple (or + // no) streams that receive the message. As a result, MessageOptions + // related to partitioning will be ignored. To publish at the + // stream/partition level, use the high-level Publish API. + // + // If the AckPolicy is not NONE and a deadline is provided, this will + // synchronously block until the first ack is received. If an ack is not + // received in time, ErrAckTimeout is returned. If an AckPolicy and + // deadline are configured, this returns the first Ack on success, + // otherwise it returns nil. + PublishToSubject(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error) + + // FetchMetadata returns cluster metadata including broker and stream + // information. + FetchMetadata(ctx context.Context) (*Metadata, error) + + // SetCursor persists a cursor position for a particular stream partition. + // This can be used to checkpoint a consumer's position in a stream to + // resume processing later. + SetCursor(ctx context.Context, id, stream string, partition int32, offset int64) error + + // FetchCursor retrieves a cursor position for a particular stream + // partition. It returns -1 if the cursor does not exist. + FetchCursor(ctx context.Context, id, stream string, partition int32) (int64, error) +} +``` + +### CreateStream + +```go +// CreateStream creates a new stream attached to a NATS subject. Subject is +// the NATS subject the stream is attached to, and name is the stream +// identifier, unique per subject. It returns ErrStreamExists if a stream +// with the given subject and name already exists. +func CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error +``` + +`CreateStream` creates a new stream attached to a NATS subject or set of NATS +subjects. Every stream is uniquely identified by a name. + +Streams are partitioned for increased parallelism. Each partition maps to a +separate NATS subject. For example, a stream created for the subject `foo` with +three partitions has partitions that map to the NATS subjects `foo`, `foo.1`, +and `foo.2`, respectively. By default, streams consist of just a single +partition. + +Each stream partition has a server in the cluster that acts as the leader and +some set of followers which replicate the partition depending on the stream's +replication factor. Liftbridge handles the cases where leaders and followers +fail to maximize availability. + +In the Go client example above, `CreateStream` takes four arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| subject | string | A subject string which is the NATS subject to attach the stream to. If the stream has more than one partition, this is used as the base subject for each partition. | yes | +| name | string | A name string which uniquely identifies the stream in the Liftbridge cluster. Attempting to create another stream with the same name will result in an error. | yes | +| options | stream options | Zero or more stream options. These are used to pass in optional settings for the stream. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `CreateStream` options are described below. | language-dependent | + +The stream options are the equivalent of optional named arguments used to +configure a stream. Supported options are: + +| Option | Type | Description | Default | +|:----|:----|:----|:----| +| Group | string | The name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages on the subject will be distributed randomly among them. | | +| MaxReplication | bool | Sets the stream replication factor equal to the current number of servers in the cluster. This means all partitions for the stream will be fully replicated within the cluster. | false | +| ReplicationFactor | int | Sets the replication factor for the stream. The replication factor controls the number of servers a stream's partitions should be replicated to. For example, a value of 1 would mean only 1 server would have the data, and a value of 3 would mean 3 servers would have it. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster (i.e. MaxReplication). | 1 | +| Partitions | int | Sets the number of partitions for the stream. | 1 | +| RetentionMaxBytes | int64 | The maximum size a stream's log can grow to, in bytes, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it takes the server default. | | +| RetentionMaxMessages | int64 | The maximum size a stream's log can grow to, in number of messages, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it takes the server default. | | +| RetentionMaxAge | time duration | The TTL for stream log segment files, after which they are deleted. A value of 0 indicates no TTL. If this is not set, it takes the server default. | | +| CleanerInterval | time duration | The frequency to check if a new stream log segment file should be rolled and whether any segments are eligible for deletion based on the retention policy or compaction if enabled. If this is not set, it takes the server default. | | +| SegmentMaxBytes | int64 | The maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. If this is not set, it takes the server default. | | +| SegmentMaxAge | time duration | The maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when segment.max.bytes is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. If this is not set, it takes the server default. | | +| CompactMaxGoroutines | int32 | The maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if compact.enabled is true). If this is not set, it takes the server default. | | +| CompactEnabled | bool | Enable message compaction by key on the server for this stream. If this is not set, it takes the server default. | | + +`CreateStream` returns/throws an error if the operation fails, specifically +`ErrStreamExists` if a stream with the given name already exists. + +[Implementation Guidance](#createstream-implementation) + +### DeleteStream + +```go +// DeleteStream deletes a stream and all of its partitions. Name is the stream +// identifier, globally unique. +func (c *client) DeleteStream(ctx context.Context, name string) error +``` + +`DeleteStream` deletes a stream and all of its partitions. This will remove any +data stored on disk for the stream and all of its partitions. + +In the Go client example above, `DeleteStream` takes two arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| name | string | The name of the stream to delete. | yes | + +`DeleteStream` returns/throws an error if the operation fails, specifically +`ErrNoSuchStream` if the stream doesn't exist. + +[Implementation Guidance](#deletestream-implementation) + +### PauseStream + +```go +// PauseStream pauses a stream and some or all of its partitions. Name is the +// stream identifier, globally unique. It returns an ErrNoSuchPartition if the +// given stream or partition does not exist. By default, this will pause all +// partitions. A partition is resumed when it is published to via the +// Liftbridge Publish API or ResumeAll is enabled and another partition in the +// stream is published to. +func (c *client) PauseStream(ctx context.Context, name string, options ...PauseOption) +``` + +`PauseStream` pauses some or all of a stream's partitions. In effect, thie +means closing NATS subscriptions, stopping replication, closing any associated +file handles, and generally releasing any resources used in maintaining a +partition. This can be done selectively on partitions or the entirety of a +stream. + +A partition is paused until it is published to via the Liftbridge +[`Publish`](#publish) API or `ResumeAll` is enabled and another partition in +the stream is published to. + +Different sets of partitions can be paused independently with subsequent +`PauseStream` requests to the same stream, and pausing partitions is an +idempotent operation. However, note that the stream will be updated with the +latest `ResumeAll` value specified on the request. + +In the Go client example above, `PauseStream` takes three arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| name | string | The name of the stream to pause partitions for. | yes | +| options | pause options | Zero or more pause options. These are used to pass in optional settings for pausing the stream. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `PauseStream` options are described below. | language-dependent | + +The pause options are the equivalent of optional named arguments used to +configure stream pausing. Supported options are: + +| Option | Type | Description | Default | +|:----|:----|:----|:----| +| PausePartitions | list of ints | Specifies the stream partitions to pause. If not set, all partitions will be paused. | | +| ResumeAll | bool | Resume all partitions in the stream if any are published to instead of resuming only the partition published to. | false | + +`PauseStream` returns/throws an error if the operation fails, specifically +`ErrNoSuchPartition` if the stream or partition(s) do not exist. + +[Implementation Guidance](#pausestream-implementation) + +### SetStreamReadonly + +```go +// SetStreamReadonly sets the readonly flag on a stream and some or all of +// its partitions. Name is the stream identifier, globally unique. It +// returns an ErrNoSuchPartition if the given stream or partition does not +// exist. By default, this will set the readonly flag on all partitions. +// Subscribers to a readonly partition will see their subscription ended +// with a ErrReadonlyPartition error once all messages currently in the +// partition have been read. +func (c *client) SetStreamReadonly(ctx context.Context, name string, options ...ReadonlyOption) error +``` + +`SetStreamReadonly` sets some or all of a stream's partitions as readonly or +readwrite. Attempting to publish to a readonly partition will result in an +`ErrReadonlyPartition` error/exception. Subscribers consuming a readonly +partition will read up to the end of the partition and then receive an +`ErrReadonlyPartition` error. + +In the Go client example above, `SetStreamReadonly` takes three arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| name | string | The name of the stream whose partitions to set the readonly flag for. | yes | +| options | readonly options | Zero or more readonly options. These are used to pass in optional settings for setting the readonly flag. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `SetStreamReadonly` options are described below. | language-dependent | + +The readonly options are the equivalent of optional named arguments used to +configure readonly requests. Supported options are: + +| Option | Type | Description | Default | +|:----|:----|:----|:----| +| ReadonlyPartitions | list of ints | Specifies the stream partitions to set the readonly flag for. If not set, all partitions will be changed. | | +| Readonly | bool | Defines if the partitions should be set to readonly or readwrite. | false | + +`SetStreamReadonly` returns/throws an error if the operation fails, +specifically `ErrNoSuchPartition` if the stream or partition(s) do not exist. + +[Implementation Guidance](#setstreamreadonly-implementation) + +### Subscribe + +```go +// Subscribe creates an ephemeral subscription for the given stream. It +// begins receiving messages starting at the configured position and waits +// for new messages when it reaches the end of the stream. The default +// start position is the end of the stream. It returns an ErrNoSuchStream +// if the given stream does not exist. Use a cancelable Context to close a +// subscription. +func Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error +``` + +`Subscribe` is used to consume streams. By default, it begins reading messages +from the end of the stream and blocks for new messages. + +By default, the client automatically binds to the partition's leader for a +subscribe request. However, it is also possible to bind a subscribe request to +a random ISR replica, including partition followers, with an additional opt-in +flag. Like the leader, an ISR follower will only expose committed messages. +However, there may be some lag behind what is committed on the leader before +it's committed on a follower. + +In the Go client example above, `Subscribe` takes four arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. Cancellation is particularly important for `Subscribe` because it's how subscriptions are closed. Messages will continue to received on a subscription until it is closed. In Go, this is done using a cancellable context. In other languages, this might be replaced by an explicit argument or other language-specific idiom (e.g. returning an object with a `Cancel` API). Timeouts might be implemented using an explicit argument, an optional named argument, or some other idiom. | language-dependent | +| stream | string | A stream name string which is the stream to read messages from. | yes | +| handler | callback | A handler callback used to receive messages from the stream, described below. | yes | +| options | subscription options | Zero or more subscription options. These are used to pass in optional settings for the subscription. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `Subscribe` options are described below. | language-dependent | + +The subscribe handler is a callback function that takes a message and an error. +If the error is not null, the subscription will be terminated and no more +messages will be received. The Go signature of the handler callback is detailed +below. In other languages, this might be a stream mechanism instead of a +callback. See [below](#messages-and-acks) for guidance on implementing +messages. + +```go +// Handler is the callback invoked by Subscribe when a message is received on +// the specified stream. If err is not nil, the subscription will be terminated +// and no more messages will be received. +type Handler func(msg *Message, err error) +``` + +The subscription options are the equivalent of optional named arguments used to +configure a subscription. Supported options are: + +| Option | Type | Description | Default | +|:----|:----|:----|:----| +| Partition | int | Specifies the stream partition to consume. | 0 | +| StartAtEarliestReceived | bool | Sets the subscription start position to the earliest message received in the stream. | false | +| StartAtLatestReceived | bool | Sets the subscription start position to the last message received in the stream. | false | +| StartAtOffset | int | Sets the subscription start position to the first message with an offset greater than or equal to the given offset. | | +| StartAtTime | timestamp | Sets the subscription start position to the first message with a timestamp greater than or equal to the given time. | | +| StartAtTimeDelta | time duration | Sets the subscription start position to the first message with a timestamp greater than or equal to `now - delta`. | | +| ReadISRReplica | bool | Sets the subscription to one of a random ISR replica instead of subscribing to the partition's leader. | false | +| Resume | bool | Specifies whether a paused partition should be resumed before subscribing. | false | + +Currently, `Subscribe` can only subscribe to a single partition. In the future, +there will be functionality for consuming all partitions. + +`Subscribe` returns/throws an error if the operation fails, specifically +`ErrNoSuchPartition` if the specified stream or partition does not exist. + +[Implementation Guidance](#subscribe-implementation) + +### Publish + +```go +// Publish publishes a new message to the Liftbridge stream. The partition +// that gets published to is determined by the provided partition or +// Partitioner passed through MessageOptions, if any. If a partition or +// Partitioner is not provided, this defaults to the base partition. This +// partition determines the underlying NATS subject that gets published to. +// To publish directly to a specific NATS subject, use the low-level +// PublishToSubject API. +// +// If the AckPolicy is not NONE, this will synchronously block until the +// ack is received. If the ack is not received in time, ErrAckTimeout is +// returned. If AckPolicy is NONE, this returns nil on success. +Publish(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error) +``` + +`Publish` sends a message to a Liftbridge stream. Since Liftbridge streams +attach to normal NATS subjects, it's also possible to [publish messages +directly to NATS](https://github.com/liftbridge-io/go-liftbridge#publishing-directly-with-nats) +using a [NATS client](https://nats.io/download/). Liftbridge works fine with +plain, opaque NATS messages, but it also extends NATS with a protobuf-based +[envelope protocol](./envelope_protocol.md). This allows publishers to add +metadata to messages like a key, headers, and acking information. Liftbridge +client libraries may provide helper methods to make it easy to create envelopes +and deal with acks yourself using a NATS client directly ([described +below](#low-level-publish-helpers)). However, the `Publish` API is intended to +abstract this work away from you. + +`Publish` is a synchronous operation, meaning when it returns, the message has +been successfully published. `Publish` can also be configured to block until a +message acknowledgement (ack) is returned from the cluster. This is useful for +ensuring a message has been stored and replicated, guaranteeing at-least-once +delivery. The default ack policy is `LEADER`, meaning the ack is sent once the +partition leader has stored the message. See [below](#messages-and-acks) for +guidance on implementing acks. + +If the ack policy is not `NONE`, `Publish` will synchronously block until the +ack is received. If the ack is not received in time, an `ErrAckTimeout` +error/exception is thrown. If the ack policy is `NONE`, this returns +immediately. + +`Publish` can send messages to particular stream partitions using a +`Partitioner` or an explicitly provided partition. By default, it publishes to +the base partition. The partition determines the underlying NATS subject that +gets published to. To publish directly to a specific NATS subject, use the +low-level [`PublishToSubject`](#publishtosubject) API. + +In the Go client example above, `Publish` takes four arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. With `Publish`, the timeout is particularly important as it relates to acking. If an ack policy is set (see below) and a timeout is provided, `Publish` will block until the first ack is received. If the ack is not received in time, a timeout error is returned. If the ack policy is `NONE` or a timeout is not set, `Publish` returns as soon as the message has been published. | language-dependent | +| stream | string | The stream to publish to. | yes | +| value | bytes | The message value to publish consisting of opaque bytes. | yes | +| options | message options | Zero or more message options. These are used to pass in optional settings for the message. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `Publish` options are described below. | language-dependent | + +The publish message options are the equivalent of optional named arguments used +to configure a message. Supported options are: + +| Option | Type | Description | Default | +|:----|:----|:----|:----| +| AckInbox | string | Sets the NATS subject Liftbridge should publish the message ack to. If this is not set, the server will generate a random inbox. This generally does not need to be configured when using the `Publish` API since the server will handle acks for you. Instead, it's used if, for some reason, you wish to handle the ack yourself. | | +| CorrelationID | string | Sets the identifier used to correlate an ack with the published message. If it's not set, the ack will not have a correlation ID. This generally does not need to be configured when using the `Publish` API since the server will handle acks for you. Instead, it's used if, for some reason, you wish to handle the ack yourself. | | +| AckPolicyAll | bool | Sets the ack policy of the message to `ALL`. This means the ack will be sent when the message has been stored by all partition replicas. | false | +| AckPolicyLeader | bool | Sets the ack policy of the message to `LEADER`. This means the ack will be sent when the partition leader has stored the message. This is the default ack policy if not otherwise set. | true | +| AckPolicyNone | bool | Sets the ack policy of the message to `NONE`. This means no ack will be sent. | false | +| Header | string and bytes | String and opaque bytes representing a key-value pair to set as a header on the message. This may overwrite previously set headers. Client libraries may choose to forgo this option in favor of the bulk `Headers` option below. This is a convenience for setting a single header. | | +| Headers | map of strings to bytes | Map of strings to opaque bytes representing key-value pairs to set as headers on the message. This may overwrite previously set headers. | | +| Key | bytes | Opaque bytes used for the message key. If Liftbridge has stream compaction enabled, the stream will retain only the last message for each key (if not set, the message is always retained). | | +| PartitionByKey | bool | Flag which maps the message to a stream partition based on a hash of the message key. This computes the partition number for the message by hashing the key and modding by the number of partitions for the first stream found with the subject of the published message. This does not work with streams containing wildcards in their subjects, e.g. `foo.*`, since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject. This is used to derive the actual NATS subject the message is published to, e.g. `foo`, `foo.1`, `foo.2`, etc. By default, it's published to the subject provided. | false | +| PartitionByRoundRobin | bool | Flag which maps the message to stream partitions in a round-robin fashion. This computes the partition number for the message by atomically incrementing a counter for the message subject and modding it by the number of partitions for the first stream found with the subject. This does not work with streams containing wildcards in their subjects, e.g. `foo.*`, since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject. This is used to derive the actual NATS subject the message is published to, e.g. `foo`, `foo.1`, `foo.2`, etc. By default, it's published to the subject provided. | false | +| PartitionBy | partitioner | `Partitioner` (detailed below) which sets the strategy used to map the message to a stream partition. This is used to derive the actual NATS subject the message is published to, e.g. `foo`, `foo.1`, `foo.2`, etc. By default, it's published to the subject provided. | | +| ToPartition | int | Sets the partition to publish the message to. If this is set, any specified `Partitioner` will not be used. This is used to derive the actual NATS subject the message is published to, e.g. `foo`, `foo.1`, `foo.2`, etc. By default, it's published to the subject provided. | | + +`Partitioner` is an interface which implements logic for mapping a message to a +stream partition. It passes a `Metadata` object into `Partition`, which is +described later. The Go interface definition is shown below. + +```go +// Partitioner is used to map a message to a stream partition. +type Partitioner interface { + // Partition computes the partition number for a given message. + Partition(stream string, key, value []byte, metadata *Metadata) int32 +} +``` + +[Implementation Guidance](#publish-implementation) + +### PublishAsync + +```go +// PublishAsync publishes a new message to the Liftbridge stream and +// asynchronously processes the ack or error for the message. +PublishAsync(ctx context.Context, stream string, value []byte, ackHandler AckHandler, opts ...MessageOption) error +``` + +`PublishAsync` sends a message to a Liftbridge stream asynchronously. This is +similar to [`Publish`](#publish), but rather than waiting for the ack, it +dispatches the ack with an ack handler callback. + +If the ack policy is not `NONE`, `PublishAsync` will asynchronously dispatch +the ack when it is received. If the ack is not received in time, an +`ErrAckTimeout` error/exception is dispatched. If the ack policy is `NONE`, the +callback will not be invoked. If no ack handler is provided, any ack will be +ignored. + +In the Go client example above, `PublishAsync` takes five arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. With `PublishAsync`, the timeout is particularly important as it relates to acking. If an ack policy is set (see below) and a timeout is provided, `PublishAsync` will allow up to this much time to pass until the ack is received. If the ack is not received in time, a timeout error is returned. | language-dependent | +| stream | string | The stream to publish to. | yes | +| value | bytes | The message value to publish consisting of opaque bytes. | yes | +| ackHandler | callback | A handler callback used to receive the ack for the published message, described below. | yes | +| options | message options | Zero or more message options. These are used to pass in optional settings for the message. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `PublishAsync` options are described below. | language-dependent | + +The ack handler is a callback function that takes an ack and an error. If the +error is not null, this indicates the message was not successfully acked and +the corresponding ack argument will be null. The Go signature of the ack +handler callback is detailed below. In other languages, this might be a stream +mechanism instead of a callback. See [below](#messages-and-acks) for guidance +on implementing acks. + +```go +// AckHandler is used to handle the results of asynchronous publishes to a +// stream. If the AckPolicy on the published message is not NONE, the handler +// will receive the ack once it's received from the cluster or an error if the +// message was not received successfully. +type AckHandler func(ack *Ack, err error) +``` + +The `PublishAsync` message options are the equivalent of optional named +arguments used to configure a message. Supported options are the same as those +supported by [`Publish`](#publish). + +[Implementation Guidance](#publishasync-implementation) + +### PublishToSubject + +```go +// PublishToSubject publishes a new message to the NATS subject. Note that +// because this publishes directly to a subject, there may be multiple (or +// no) streams that receive the message. As a result, MessageOptions +// related to partitioning will be ignored. To publish at the +// stream/partition level, use the high-level Publish API. +// +// If the AckPolicy is not NONE and a deadline is provided, this will +// synchronously block until the first ack is received. If an ack is not +// received in time, ErrAckTimeout is returned. If an AckPolicy and +// deadline are configured, this returns the first Ack on success, +// otherwise it returns nil. +PublishToSubject(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error) +``` + +`PublishToSubject` sends a message to a NATS subject (and, in turn, any streams +that match the subject). Since Liftbridge streams attach to normal NATS +subjects, it's also possible to [publish messages directly to +NATS](https://github.com/liftbridge-io/go-liftbridge#publishing-directly-with-nats) +using a [NATS client](https://nats.io/download/). Liftbridge works fine with +plain, opaque NATS messages, but it also extends NATS with a protobuf-based +[envelope protocol](./envelope_protocol.md). This allows publishers to add +metadata to messages like a key, headers, and acking information. Liftbridge +client libraries may provide helper methods to make it easy to create envelopes +and deal with acks yourself using a NATS client directly ([described +below](#low-level-publish-helpers)). However, the `PublishToSubject` API is +intended to abstract this work away from you. + +`PublishToSubject` is a synchronous operation, meaning when it returns, the +message has been successfully published. `PublishToSubject` can also be +configured to block until a message acknowledgement (ack) is returned from the +cluster. This is useful for ensuring a message has been stored and replicated, +guaranteeing at-least-once delivery. The default ack policy is `LEADER`, +meaning the ack is sent once the partition leader has stored the message. +`PublishToSubject` only waits on the first ack received. This is important to +be aware of since there may be multiple streams attached to a subject. See +[below](#messages-and-acks) for guidance on implementing acks. + +In the Go client example above, `PublishToSubject` takes four arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. With `PublishToSubject`, the timeout is particularly important as it relates to acking. If an ack policy is set (see below) and a timeout is provided, `PublishToSubject` will block until the first ack is received. If the ack is not received in time, a timeout error is returned. If the ack policy is `NONE` or a timeout is not set, `PublishToSubject` returns as soon as the message has been published. | language-dependent | +| subject | string | The NATS subject to publish to. | yes | +| value | bytes | The message value to publish consisting of opaque bytes. | yes | +| options | message options | Zero or more message options. These are used to pass in optional settings for the message. This is a common [Go pattern](https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis) for implementing extensible APIs. In other languages, this might be replaced with a builder pattern or optional named arguments. These `PublishToSubject` options are described below. | language-dependent | + +The publish message options are the equivalent of optional named arguments used +to configure a message. Unlike `Publish`, `PublishToSubject` does not support +message options relating to partitioning since it operates at a lower level. +Supported options are: + +| Option | Type | Description | Default | +|:----|:----|:----|:----| +| AckInbox | string | Sets the NATS subject Liftbridge should publish the message ack to. If this is not set, the server will generate a random inbox. This generally does not need to be configured when using the `PublishToSubject` API since the server will handle acks for you. Instead, it's used if, for some reason, you wish to handle the ack yourself. | | +| CorrelationID | string | Sets the identifier used to correlate an ack with the published message. If it's not set, the ack will not have a correlation ID. This generally does not need to be configured when using the `PublishToSubject` API since the server will handle acks for you. Instead, it's used if, for some reason, you wish to handle the ack yourself. | | +| AckPolicyAll | bool | Sets the ack policy of the message to `ALL`. This means the ack will be sent when the message has been stored by all partition replicas. | false | +| AckPolicyLeader | bool | Sets the ack policy of the message to `LEADER`. This means the ack will be sent when the partition leader has stored the message. This is the default ack policy if not otherwise set. | true | +| AckPolicyNone | bool | Sets the ack policy of the message to `NONE`. This means no ack will be sent. | false | +| Header | string and bytes | String and opaque bytes representing a key-value pair to set as a header on the message. This may overwrite previously set headers. Client libraries may choose to forgo this option in favor of the bulk `Headers` option below. This is a convenience for setting a single header. | | +| Headers | map of strings to bytes | Map of strings to opaque bytes representing key-value pairs to set as headers on the message. This may overwrite previously set headers. | | +| Key | bytes | Opaque bytes used for the message key. If Liftbridge has stream compaction enabled, the stream will retain only the last message for each key (if not set, the message is always retained). | | + +[Implementation Guidance](#publishtosubject-implementation) + +#### Low-Level Publish Helpers + +Because Liftbridge is simply a consumer of NATS, it's also possible to publish +messages directly to NATS rather than using Liftbridge's `Publish` and +`PublishToSubject` APIs. This can be useful for extending the capabilities of +NATS in existing systems. This means we can publish NATS messages as normal, as +shown in the example Go code below, and Liftbridge streams will be able to +receive them. + +```go +package main + +import "github.com/nats-io/nats.go" + +func main() { + // Connect to NATS. + nc, _ := nats.Connect(nats.DefaultURL) + + // Publish a message. + nc.Publish("foo.bar", []byte("Hello, world!")) + nc.Flush() +} +``` + +However, these low-level publishes lose out on some of the additional +capabilities of Liftbridge provided by message +[envelopes](./envelope_protocol.md), such as message headers, keys, etc. As a +result, client libraries may provide helper methods to facilitate publishing +message envelopes directly to NATS as well as handling acks. These include +`NewMessage`, `UnmarshalAck`, and `UnmarshalMessage` described below. + +##### NewMessage + +```go +// NewMessage returns a serialized message for the given payload and options. +func NewMessage(value []byte, options ...MessageOption) []byte +``` + +`NewMessage` creates a Liftbridge message envelope serialized to bytes ready +for publishing to NATS. This consists of an [envelope +header](envelope_protocol.md#liftbridge-envelope-header) followed by the +serialized message protobuf. It takes the same arguments as `Publish` (see +above) with the exception of the context and subject. + +Note that the envelope protocol does not need to be implemented in the +`Publish` API since the envelope serialization is handled by the server. + +##### UnmarshalMessage + +```go +// UnmarshalMessage deserializes a message from the given byte slice. It +// returns an error if the given data is not actually a Message. +func UnmarshalMessage(data []byte) (Message, error) +``` + +`UnmarshalMessage` is a helper method which effectively does the reverse of +`NewMessage`, taking a serialized message envelope and returning a deserialized +message object or indication the data is not actually a message. + +##### UnmarshalAck + +```go +// UnmarshalAck deserializes an Ack from the given byte slice. It returns an +// error if the given data is not actually an Ack. +func UnmarshalAck(data []byte) (*Ack, error) +``` + +`UnmarshalAck` is used to deserialize a message ack received on a NATS +subscription. It takes a single argument consisting of the ack bytes as +received from NATS and throws an error/exception if the bytes are not actually +a serialized ack envelope. + +This is useful for handling acks for messages published with a set ack inbox +(i.e. the NATS subject Liftbridge will publish message acks to). For example: + +```go +// Connect to NATS. +nc, _ := nats.Connect(nats.DefaultURL) + +// Subscribe to ack inbox. +nc.Subscribe("my.ack.inbox", func(msg *nats.Msg) { + ack, _ := lift.UnmarshalAck(msg.Data) + // CorrelationId can be used to correlate an ack with a particular message. + if ack.CorrelationID() == "123" { + println("message acked!") + } +}) + +// Publish a message envelope. +nc.Publish("foo.bar", lift.NewMessage( + []byte("hello"), + lift.AckInbox("my.ack.inbox"), + lift.CorrelationID("123"), +)) +``` + +### FetchMetadata + +```go +// FetchMetadata returns cluster metadata including broker and stream +// information. +func FetchMetadata(ctx context.Context) (*Metadata, error) +``` + +Liftbridge provides a metadata API which clients can use to retrieve cluster +metadata including server (broker) and stream information. There are a few key +use cases for this metadata: + +- The client can connect to a single seed server, fetch the cluster metadata, + and then populate a local cache of known servers in order to allow for + failover of subsequent RPCs. This should be handled internally by client + libraries. +- Subscribe requests must be sent to the leader of the requested stream + partition, so the client can fetch metadata from the cluster to determine + which server to send particular requests to. This should be handled + internally by client libraries. +- Stream partitioning requires knowing how many partitions exist for a stream, + so the client can fetch metadata from the cluster to determine partitions. + This should be handled internally by client libraries for things such as + `PartitionByKey` and `PartitionByRoundRobin` but may be handled by users for + more advanced use cases. + +`FetchMetadata` retrieves an immutable object containing this cluster metadata. +As noted in the above use cases, most uses of this metadata is handled +internally by client libraries. Generally, `FetchMetadata` is only needed by +users in cases where the number of stream partitions is required. + +In the Go client example above, `FetchMetadata` takes one argument: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | + +[Implementation Guidance](#fetchmetadata-implementation) + +### FetchPartitionMetadata + +```go +// FetchPartitionMetadata retrieves the latest partition metadata from partition leader +// The main interest is to retrieve Highest Watermark and Newest Offset +FetchPartitionMetadata(ctx context.Context, stream string, partition int32) (*PartitionMetadataResponse, error) +``` + +Liftbridge provides a partition metadata API which can be used to retrieve +information for a partition. Most importantly, there are use cases where it is +useful to retrieve the high watermark and newest offset of the partition for +tighter control over subscriptions and published messages. A few key points to +take into account: + +- The client must connect to the partition leader to fetch partition metadata. + Only the partition leader maintains the most up-to-date version of the high + watermark and newest offset. Thus, the request must be sent to the partition + leader. +- The client should maintain an internal version of the metadata to know, for a + given partition of a stream, which broker is currently the leader. It should + be noted that the partition leader is different from the cluster's metadata + leader. + +In the Go client example above, `FetchPartitionMetadata` takes three arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| stream | string | Stream name | yes | +| partition | int | ID of the partition | yes | + +[Implementation Guidance](#fetchpartitionmetadata-implementation) + +### SetCursor + +```go +// SetCursor persists a cursor position for a particular stream partition. +// This can be used to checkpoint a consumer's position in a stream to +// resume processing later. +SetCursor(ctx context.Context, id, stream string, partition int32, offset int64) error +``` + +`SetCursor` is used to checkpoint a consumer's position in a stream partition. +This is useful to allow consumer's to resume processing a stream while +remaining stateless by using [`FetchCursor`](#fetchcursor). + +Because cursors are stored in an internal stream that is partitioned, clients +need to send this request to the appropriate partition leader. The internal +cursors stream is partitioned by cursor key. A cursor key consists of +`,,`. To map this key to a partition, hash it with +an IEEE CRC32 and then mod by the number of partitions in the `__cursors` +stream. This request mapping should be handled by the client implementation. + +`SetCursor` throws an error/exception if the cursors stream does not exist, the +particular cursors partition does not exist, the server is not the leader for +the cursors partition, or the server failed to durably store the cursor. + +In the Go client example above, `SetCursor` takes five arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| id | string | Unique cursor identifier | yes | +| stream | string | Name of stream cursor belongs to | yes | +| partition | int | ID of the stream partition cursor belongs to | yes | +| offset | int | Cursor offset position | yes | + +[Implementation Guidance](#setcursor-implementation) + +### FetchCursor + +```go +// FetchCursor retrieves a cursor position for a particular stream +// partition. It returns -1 if the cursor does not exist. +FetchCursor(ctx context.Context, id, stream string, partition int32) (int64, error) +``` + +`FetchCursor` is used to retrieve a consumer's position in a stream partition +stored by [`SetCursor`](#setcursor). This is useful to allow consumer's to +resume processing a stream while remaining stateless. + +Because cursors are stored in an internal stream that is partitioned, clients +need to send this request to the appropriate partition leader. The internal +cursors stream is partitioned by cursor key. A cursor key consists of +`,,`. To map this key to a partition, hash it with +an IEEE CRC32 and then mod by the number of partitions in the `__cursors` +stream. This request mapping should be handled by the client implementation. + +`FetchCursor` throws an error/exception if the cursors stream does not exist, +the particular cursors partition does not exist, the server is not the leader +for the cursors partition, or the server failed to load the cursor. + +In the Go client example above, `FetchCursor` takes four arguments: + +| Argument | Type | Description | Required | +|:----|:----|:----|:----| +| context | context | A [context](https://golang.org/pkg/context/#Context) which is a Go idiom for passing things like a timeout, cancellation signal, and other values across API boundaries. For Liftbridge, this is primarily used for two things: request timeouts and cancellation. In other languages, this might be replaced by explicit arguments, optional named arguments, or other language-specific idioms. | language-dependent | +| id | string | Unique cursor identifier | yes | +| stream | string | Name of stream cursor belongs to | yes | +| partition | int | ID of the stream partition cursor belongs to | yes | + +[Implementation Guidance](#fetchcursor-implementation) + +### Close + +```go +// Close the client connection. +func Close() error +``` + +`Close` simply releases all resources associated with the client, namely all +gRPC connections to the server. There might be multiple connections since a +client might have connections to multiple servers in a cluster and/or +connection pooling. `Close` takes no arguments and returns/throws an error if +the operation fails. + +[Implementation Guidance](#close-implementation) + +## Client Implementation + +Below is implementation guidance for the client interface described above. Like +the interface, specific implementation details may differ depending on the +programming language. + +### Messages and Acks + +It's recommended to not expose transport and protocol details to users in +client library implementations. Currently, Liftbridge relies on gRPC and +Protocol Buffers, but it's possible that this could change in the future. These +should be considered implementation details. Thus, it's recommended to provide +wrapping types around `Message`, `Ack`, `AckPolicy`, and any other API objects. +Implementations should handle converting protobufs into these wrappers. The Go +implementation of these is shown below to demonstrate: + +```go +// Message received from a Liftbridge stream. +type Message struct { + offset int64 + key []byte + value []byte + partition int32 + timestamp time.Time + stream string + subject string + replySubject string + headers map[string][]byte +} + +func messageFromProto(wireMsg *proto.Message) *Message { + if wireMsg == nil { + return nil + } + msg := &Message{ + offset: wireMsg.GetOffset(), + key: wireMsg.GetKey(), + value: wireMsg.GetValue(), + partition: wireMsg.GetPartition(), + timestamp: time.Unix(0, wireMsg.GetTimestamp()), + stream: wireMsg.GetStream(), + subject: wireMsg.GetSubject(), + replySubject: wireMsg.GetReplySubject(), + headers: wireMsg.GetHeaders(), + } + return msg +} + +// Offset is a monotonic message sequence in the stream partition. +func (m *Message) Offset() int64 { + return m.offset +} + +// Key is an optional label set on a Message, useful for partitioning and +// stream compaction. +func (m *Message) Key() []byte { + return m.key +} + +// Value is the Message payload. +func (m *Message) Value() []byte { + return m.value +} + +// Timestamp is the time the Message was received by the server. +func (m *Message) Timestamp() time.Time { + return m.timestamp +} + +// Subject is the NATS subject the Message was received on. +func (m *Message) Subject() string { + return m.subject +} + +// ReplySubject is the NATS reply subject on the Message, if any. +func (m *Message) ReplySubject() string { + return m.replySubject +} + +// Headers is a set of key-value pairs. +func (m *Message) Headers() map[string][]byte { + headers := make(map[string][]byte, len(m.headers)) + for key, value := range m.headers { + headers[key] = value + } + return headers +} + +// Stream the Message was received on. +func (m *Message) Stream() string { + return m.stream +} + +// Partition the Message was received on. +func (m *Message) Partition() int32 { + return m.partition +} + +// AckPolicy controls the behavior of message acknowledgements. +type AckPolicy int32 + +func (a AckPolicy) toProto() proto.AckPolicy { + return proto.AckPolicy(a) +} + +// Ack represents an acknowledgement that a message was committed to a stream +// partition. +type Ack struct { + stream string + partitionSubject string + messageSubject string + offset int64 + ackInbox string + correlationID string + ackPolicy AckPolicy + receptionTimestamp time.Time + commitTimestamp time.Time +} + +func ackFromProto(wireAck *proto.Ack) *Ack { + if wireAck == nil { + return nil + } + ack := &Ack{ + stream: wireAck.GetStream(), + partitionSubject: wireAck.GetPartitionSubject(), + messageSubject: wireAck.GetMsgSubject(), + offset: wireAck.GetOffset(), + ackInbox: wireAck.GetAckInbox(), + correlationID: wireAck.GetCorrelationId(), + ackPolicy: AckPolicy(wireAck.GetAckPolicy()), + receptionTimestamp: time.Unix(0, wireAck.GetReceptionTimestamp()), + commitTimestamp: time.Unix(0, wireAck.GetCommitTimestamp()), + } + return ack +} + +// Stream the Message was received on. +func (a *Ack) Stream() string { + return a.stream +} + +// PartitionSubject is the NATS subject the partition is attached to. +func (a *Ack) PartitionSubject() string { + return a.partitionSubject +} + +// MessageSubject is the NATS subject the message was received on. +func (a *Ack) MessageSubject() string { + return a.messageSubject +} + +// Offset is the partition offset the message was committed to. +func (a *Ack) Offset() int64 { + return a.offset +} + +// AckInbox is the NATS subject the ack was published to. +func (a *Ack) AckInbox() string { + return a.ackInbox +} + +// CorrelationID is the user-supplied value from the message. +func (a *Ack) CorrelationID() string { + return a.correlationID +} + +// AckPolicy sent on the message. +func (a *Ack) AckPolicy() AckPolicy { + return a.ackPolicy +} + +// ReceptionTimestamp is the timestamp the message was received by the server. +func (a *Ack) ReceptionTimestamp() time.Time { + return a.receptionTimestamp +} + +// CommitTimestamp is the timestamp the message was committed. +func (a *Ack) CommitTimestamp() time.Time { + return a.commitTimestamp +} +``` + +With [`PublishAsync`](#publishasync), the server can also send back `PublishResponse` messages +containing a `PublishAsyncError` in the event of a failed publish. Similar to +`Ack`, client implementations should wrap this error object before exposing it +to the user. An example of this is shown in the Go implementation +`asyncErrorFromProto` below. + +```go +func asyncErrorFromProto(asyncError *proto.PublishAsyncError) error { + if asyncError == nil { + return nil + } + switch asyncError.Code { + case proto.PublishAsyncError_NOT_FOUND: + return ErrNoSuchPartition + case proto.PublishAsyncError_READONLY: + return ErrReadonlyPartition + default: + return errors.New(asyncError.Message) + } +} +``` + +### RPCs + +While Liftbridge has a concept of a metadata leader, or controller, which is +responsible for handling updates to cluster metadata, most RPCs can be made to +any server in the cluster. The only exception to this is `Subscribe`, which +requires the RPC to be made to the leader of the partition being subscribed to. +Metadata RPCs will be automatically forwarded to the metadata leader in the +cluster. + +With this in mind, we recommend implementing a reusable method for making +resilient RPCs by retrying requests on certain failures and cycling through +known servers. It's important to be mindful of which errors are retried for +idempotency reasons. This resilient RPC method can be used for RPCs such as: + +- `CreateStream` +- `DeleteStream` +- `PauseStream` +- `SetStreamReadonly` +- `PublishToSubject` +- `FetchMetadata` + +The Go implementation of this, called `doResilientRPC`, is shown below along +with the `dialBroker` helper. It takes an RPC closure and retries on gRPC +`Unavailable` errors while trying different servers in the cluster. +`dialBroker` relies on the internally managed [metadata +object](#fetchmetadata-implementation) to track server addresses in the +cluster. + +```go +// doResilientRPC executes the given RPC and performs retries if it fails due +// to the broker being unavailable, cycling through the known broker list. +func (c *client) doResilientRPC(rpc func(client proto.APIClient) error) (err error) { + c.mu.RLock() + client := c.apiClient + c.mu.RUnlock() + + for i := 0; i < 10; i++ { + err = rpc(client) + if status.Code(err) == codes.Unavailable { + conn, err := c.dialBroker() + if err != nil { + return err + } + client = proto.NewAPIClient(conn) + c.mu.Lock() + c.apiClient = client + c.conn.Close() + c.conn = conn + c.mu.Unlock() + } else { + break + } + } + return +} + +// dialBroker dials each broker in the cluster, in random order, returning a +// gRPC ClientConn to the first one that is successful. +func (c *client) dialBroker() (*grpc.ClientConn, error) { + var ( + conn *grpc.ClientConn + err error + addrs = c.metadata.getAddrs() + perm = rand.Perm(len(addrs)) + ) + for _, i := range perm { + conn, err = grpc.Dial(addrs[i], c.dialOpts...) + if err == nil { + break + } + } + if conn == nil { + return nil, err + } + return conn, nil +} +``` + +Additionally, RPCs such as `SetCursor`, `FetchCursor`, and +`FetchPartitionMetadata` cannot be sent to any random server but, rather, need +to be sent to leaders of particular partitions. Thus, it's also useful to +implement a variant of `doResilientRPC` for sending RPCs to partition leaders +in a fault-tolerant fashion. The Go implementation of this is called +`doResilientLeaderRPC` and is shown below. + +```go +// doResilientLeaderRPC sends the given RPC to the partition leader and +// performs retries if it fails due to the broker being unavailable. +func (c *client) doResilientLeaderRPC(ctx context.Context, rpc func(client proto.APIClient) error, + stream string, partition int32) (err error) { + + var ( + pool *connPool + addr string + conn *conn + ) + for i := 0; i < 5; i++ { + pool, addr, err = c.getPoolAndAddr(stream, partition, false) + if err != nil { + time.Sleep(50 * time.Millisecond) + c.metadata.update(ctx) + continue + } + conn, err = pool.get(c.connFactory(addr)) + if err != nil { + time.Sleep(50 * time.Millisecond) + c.metadata.update(ctx) + continue + } + err = rpc(conn) + pool.put(conn) + if err != nil { + if status.Code(err) == codes.Unavailable { + time.Sleep(50 * time.Millisecond) + c.metadata.update(ctx) + continue + } + } + break + } + return err +} +``` + +`getPoolAndAddr` is a helper which returns a connection pool and the address +for the leader of the given partition. + +### Connection Pooling + +A single client might have multiple connections to different servers in a +Liftbridge cluster or even multiple connections to the same server. For +efficiency purposes, it's recommended client libraries implement connection +pooling to reuse and limit gRPC connections. For simplicity, we recommend +having a dedicated connection for [all RPCs with the exception of +`Subscribe`](#rpcs) and using connection pooling for `Subscribe` RPCs which are +long-lived and asynchronous. + +There are two important client options that control pooling behavior: +`KeepAliveTime` and `MaxConnsPerBroker`. `KeepAliveTime` is the amount of time +a pooled connection can be idle (unused) before it is closed and removed from +the pool. A suggested default for this is 30 seconds. `MaxConnsPerBroker` is +the maximum number of connections to pool for a given server in the cluster. A +suggested default for this is 2. + +Since a connection pool is generally scoped to a server, a client will +typically have a map of server addresses to connection pools. + +A connection pool generally has three key operations: + +- `get`: return a gRPC connection from the pool, if available, or create one + using a provided connection factory. If the connection was taken from a pool, + an expiration timer should be canceled on it to prevent it from being closed. +- `put`: return a gRPC connection to the pool if there is capacity or close it + if not. If the connection is added to the pool, a timer should be set to + close the connection if it's unused for `KeepAliveTime`. +- `close`: clean up the connection pool by closing all active gRPC connections + and stopping all timers. + +### CreateStream Implementation + +The `CreateStream` implementation simply constructs a gRPC request and executes +it using the [resilient RPC method](#rpcs) described above. If the +`AlreadyExists` gRPC error is returned, an `ErrStreamExists` error/exception is +thrown. Otherwise, any other error/exception is thrown if the operation failed. + +Also, clients can set custom configurations for the stream to be created. The +exhaustive list of supported stream configurations are: + +```plaintext +RetentionMaxBytes +RetentionMaxMessages +RetentionMaxAge +CleanerInterval +SegmentMaxBytes +SegmentMaxAge +CompactEnabled +CompactMaxGoroutines +``` + +Refer to [Stream Configuration](configuration.md#streams-configuration-settings) +for more details. Note that these settings are optional. If not provided, the +default configurations of the broker will be used instead. + +In order to differentiate between custom configuration specified by the user +and the server's default configuration, we use 3 custom `NullableType` wrappers +in setting options for the `CreateStreamRequest`. These custom types are: + +```proto +message NullableInt64 { + int64 value = 1; +} + +message NullableInt32 { + int32 value = 1; +} + +message NullableBool { + bool value = 1; +} + +``` + +Note: if `CompactMaxGoroutines` is configured, you have to make sure manually +that `CompactEnabled` is also set. The reason is that if this is not enabled +explicitly, the servier will use the default configuration and that may be to +disable compaction on the service side, which renders `CompactMaxGoroutines` to +be unused. + +```go +// CreateStream creates a new stream attached to a NATS subject. Subject is the +// NATS subject the stream is attached to, and name is the stream identifier, +// unique per subject. It returns ErrStreamExists if a stream with the given +// subject and name already exists. +func (c *client) CreateStream(ctx context.Context, subject, name string, options ...StreamOption) error { + opts := &StreamOptions{} + for _, opt := range options { + if err := opt(opts); err != nil { + return err + } + } + + req := opts.newRequest(subject, name) + err := c.doResilientRPC(func(client proto.APIClient) error { + _, err := client.CreateStream(ctx, req) + return err + }) + if status.Code(err) == codes.AlreadyExists { + return ErrStreamExists + } + return err +} +``` + +`StreamOptions.newRequest` creates a `CreateStreamRequest` protobuf and applies +the specified options to it. + +### DeleteStream Implementation + +The `DeleteStream` implementation simply constructs a gRPC request and executes +it using the [resilient RPC method](#rpcs) described above. If the +`NotFound` gRPC error is returned, an `ErrNoSuchStream` error/exception is +thrown. Otherwise, any other error/exception is thrown if the operation failed. + +```go +// DeleteStream deletes a stream and all of its partitions. Name is the stream +// identifier, globally unique. +func (c *client) DeleteStream(ctx context.Context, name string) error { + req := &proto.DeleteStreamRequest{Name: name} + err := c.doResilientRPC(func(client proto.APIClient) error { + _, err := client.DeleteStream(ctx, req) + return err + }) + if status.Code(err) == codes.NotFound { + return ErrNoSuchStream + } + return err +} +``` + +### PauseStream Implementation + +The `PauseStream` implementation simply constructs a gRPC request and executes +it using the [resilient RPC method](#rpcs) described above. If the `NotFound` +gRPC error is returned, an `ErrNoSuchPartition` error/exception is thrown. +Otherwise, any other error/exception is thrown if the operation failed. + +```go +// PauseStream pauses a stream and some or all of its partitions. Name is the +// stream identifier, globally unique. It returns an ErrNoSuchPartition if the +// given stream or partition does not exist. By default, this will pause all +// partitions. A partition is resumed when it is published to via the +// Liftbridge Publish API or ResumeAll is enabled and another partition in the +// stream is published to. +func (c *client) PauseStream(ctx context.Context, name string, options ...PauseOption) error { + opts := &PauseOptions{} + for _, opt := range options { + if err := opt(opts); err != nil { + return err + } + } + + req := &proto.PauseStreamRequest{ + Name: name, + Partitions: opts.Partitions, + ResumeAll: opts.ResumeAll, + } + err := c.doResilientRPC(func(client proto.APIClient) error { + _, err := client.PauseStream(ctx, req) + return err + }) + if status.Code(err) == codes.NotFound { + return ErrNoSuchPartition + } + return err +} +``` + +### SetStreamReadonly Implementation + +The `SetStreamReadonly` implementation simply constructs a gRPC request and +executes it using the [resilient RPC method](#rpcs) described above. If the +`NotFound` gRPC error is returned, an `ErrNoSuchPartition` error/exception is +thrown. Otherwise, any other error/exception is thrown if the operation +failed. + +```go +// SetStreamReadonly sets the readonly flag on a stream and some or all of +// its partitions. Name is the stream identifier, globally unique. It +// returns an ErrNoSuchPartition if the given stream or partition does not +// exist. By default, this will set the readonly flag on all partitions. +// Subscribers to a readonly partition will see their subscription ended +// with a ErrReadonlyPartition error once all messages currently in the +// partition have been read. +func (c *client) SetStreamReadonly(ctx context.Context, name string, options ...ReadonlyOption) error { + opts := &ReadonlyOptions{} + for _, opt := range options { + if err := opt(opts); err != nil { + return err + } + } + + req := &proto.SetStreamReadonlyRequest{ + Name: name, + Partitions: opts.Partitions, + Readonly: !opts.Readwrite, + } + err := c.doResilientRPC(func(client proto.APIClient) error { + _, err := client.SetStreamReadonly(ctx, req) + return err + }) + if status.Code(err) == codes.NotFound { + return ErrNoSuchPartition + } + return err +} +``` + +### Subscribe Implementation + +`Subscribe` works by making a gRPC request which returns a stream that sends +back messages on a partition. As such, there are essentially two main +implementation components: making the subscribe request and starting a +dispatcher which asynchronously sends messages received on the gRPC stream to +the user, either using a callback or some other message-passing mechanism. The +Go implementation of this is shown below: + +```go +// Subscribe creates an ephemeral subscription for the given stream. It begins +// receiving messages starting at the configured position and waits for new +// messages when it reaches the end of the stream. The default start position +// is the end of the stream. It returns an ErrNoSuchPartition if the given +// stream or partition does not exist. Use a cancelable Context to close a +// subscription. +func (c *client) Subscribe(ctx context.Context, streamName string, handler Handler, + options ...SubscriptionOption) (err error) { + + opts := &SubscriptionOptions{} + for _, opt := range options { + if err := opt(opts); err != nil { + return err + } + } + + stream, releaseConn, err := c.subscribe(ctx, streamName, opts) + if err != nil { + return err + } + + go c.dispatchStream(ctx, streamName, stream, releaseConn, handler) + return nil +} +``` + +The `Subscribe` RPC can be sent to either the leader or a random ISR replica of +the partition the client wants to subscribe to. By default, it should subscribe +to the leader. A `ReadISRReplica` option must be enabled on the subscribe +request in order to subscribe to an ISR replica. + +Whether subscribing to the leader (by default) or a random ISR replica, the +address should be determined by [fetching the metadata](#fetchmetadata) from +the cluster. This metadata can (and should) be cached in the client. It's +recommended `Subscribe` use [connection pooling](#connection-pooling) since +each `Subscribe` call will involve a long-lived gRPC connection to a different +server. + +When the subscription stream is created, the server sends an empty message to +indicate the subscription was successfully created. Otherwise, an error is sent +on the stream if the subscribe failed. This handshake message must be handled +and should not be exposed to the user. A gRPC `FailedPrecondition` error +indicates the server is not the partition leader, perhaps because the leader +has since changed. In this case, the client should refresh the metadata and +retry. It's recommended retries also wait a bit, e.g. between 10 and 500 ms, +ideally with some jitter. A gRPC `NotFound` error is returned if the requested +partition does not exist. + +After the subscription is created and the server has returned a gRPC stream for +the client to receive messages on, `Subscribe` should start an asynchronous +thread, coroutine, or equivalent to send messages to the user. For example, +this might invoke a callback or place messages in a buffer. Alternatively, +`Subscribe` may be blocking, in which case it simply yields a stream of +messages to the user in a synchronous manner. There are a few important pieces +of behavior to point out with the dispatching component: + +- If an error is received on the gRPC stream, it should be propagated to the + user. This should signal that the subscription is terminated and no more + messages are to be received. +- If an error indicating a transient network issue occurs, e.g. a gRPC + `Unavailable` error, the client may opt to resubscribe to the partition + starting at the last received offset. It's recommended to provide a + configuration, `ResubscribeWaitTime`, which is the amount of time to attempt + to re-establish a subscription after being disconnected. A suggested default + for this is 30 seconds. It's recommended to have some wait time with jitter + between resubscribe attempts. This resubscribe logic implies `Subscribe` is + tracking the last received offset from the partition in order to know where + to start a resubscribe from. Note that the internal resubscribe logic is + purely best effort and, at the moment, it's up to users to track their + position in the stream if needed. +- When a subscription is closed, either explicitly or due to an error, its + connection should be returned to the connection pool. + +### Publish Implementation + +`Publish` originally used the `Publish` RPC endpoint. This endpoint will be +deprecated in a future version of Liftbridge. Instead, `Publish` should rely on +the `PublishAsync` streaming endpoint and wrap it with synchronous logic. Refer +to the [`PublishAsync` implementation](#publishasync-implementation) for +guidance on implementing the asynchronous component. The remainder of this +builds on that implementation. + +`Publish` relies on a private `publishAsync` helper function by synchronously +waiting for the dispatched ack. If the ack policy is `NONE`, the message is +published while ignoring the ack. Otherwise, `Publish` waits for the ack or +error dispatched by `publishAsync`. The Go implementation of this is shown +below. Refer to the [`PublishAsync` implementation](#publishasync-implementation) +for the implementation of `publishAsync`. + +```go +// Publish publishes a new message to the Liftbridge stream. The partition that +// gets published to is determined by the provided partition or Partitioner +// passed through MessageOptions, if any. If a partition or Partitioner is not +// provided, this defaults to the base partition. This partition determines the +// underlying NATS subject that gets published to. To publish directly to a +// spedcific NATS subject, use the low-level PublishToSubject API. +// +// If the AckPolicy is not NONE, this will synchronously block until the ack is +// received. If the ack is not received in time, ErrAckTimeout is returned. If +// AckPolicy is NONE, this returns nil on success. +func (c *client) Publish(ctx context.Context, stream string, value []byte, + options ...MessageOption) (*Ack, error) { + + opts := &MessageOptions{Headers: make(map[string][]byte)} + for _, opt := range options { + opt(opts) + } + + if opts.AckPolicy == AckPolicy(proto.AckPolicy_NONE) { + // Fire and forget. + err := c.publishAsync(ctx, stream, value, nil, opts) + return nil, err + } + + // Publish and wait for ack. + var ( + ackCh = make(chan *Ack, 1) + errorCh = make(chan error, 1) + ) + err := c.publishAsync(ctx, stream, value, func(ack *Ack, err error) { + if err != nil { + errorCh <- err + return + } + ackCh <- ack + }, opts) + if err != nil { + return nil, err + } + + select { + case ack := <-ackCh: + return ack, nil + case err := <-errorCh: + return nil, err + } +} +``` + +### PublishAsync Implementation + +The `PublishAsync` RPC endpoint is a bidirectional streaming API. This means +both the client can continually send new messages to the server and the server +can continually send acks back to the client. This requires the client to have +a background connection to a server in the cluster for handling asynchronous +publishes. This also requires a long-running process for dispatching acks +received from the server. We recommend initializing a dedicated gRPC connection +for `PublishAsync` and starting a background thread for dispatching acks when +the client is initialized. In the Go implementation, we start a `dispatchAcks` +goroutine, shown below, which receives acks from the server and dispatches the +appropriate callback. `PublishResponse`, the message sent by the server, also +includes a `PublishAsyncError`. This is set if an async publish failed, e.g. +because the partition published to does not exist. + +Since the `PublishAsync` RPC is a long-lived streaming endpoint, it's possible +for the connection to be disrupted, such as in the case of a server failure. +Therefore, it's important that this connection be re-established in the event +of a disconnection. Note that there are two different threads of execution +which use the `PublishAsync` connection concurrently, one for publishing +messages and one for receiving acks. To simplify the reconnect logic, we +recommend handling the reconnect only on the ack dispatch side. This also has +the added benefit of avoiding duplicate deliveries in the event of a disconnect +during publish. Thus, it's up to end users to implement retries around publish +in order to handle these types of transient failures. The Go implementation of +`dispatchAcks` along with a helper function for re-establishing the +`PublishAsync` connection called `newAsyncStream` is shown below. + +```go +func (c *client) dispatchAcks() { + c.mu.RLock() + asyncStream := c.asyncStream + c.mu.RUnlock() + for { + resp, err := asyncStream.Recv() + if err == io.EOF { + return + } + if err != nil { + stream, ok := c.newAsyncStream() + if !ok { + return + } + asyncStream = stream + c.mu.Lock() + c.asyncStream = stream + c.mu.Unlock() + continue + } + + var correlationID string + if resp.AsyncError != nil { + correlationID = resp.CorrelationId + } else if resp.Ack != nil { + // TODO: Use resp.CorrelationId once Ack.CorrelationId is removed. + correlationID = resp.Ack.CorrelationId + } + ctx := c.removeAckContext(correlationID) + if ctx != nil && ctx.handler != nil { + ctx.handler(ackFromProto(resp.Ack), asyncErrorFromProto(resp.AsyncError)) + } + } +} + +func (c *client) newAsyncStream() (stream proto.API_PublishAsyncClient, ok bool) { + for { + err := c.doResilientRPC(func(client proto.APIClient) error { + resp, err := client.PublishAsync(context.Background()) + if err != nil { + return err + } + stream = resp + return nil + }) + if err == nil { + return stream, true + } + if c.isClosed() { + return nil, false + } + time.Sleep(50 * time.Millisecond) + } +} +``` + +Recall that `dispatchAcks` is invoked as a long-running goroutine on client +initialization. The client also maintains a mapping of correlation ID to +`ackContext`, which is a struct used to store a timer and ack handler for a +given message. When an ack is received from the server, its `ackContext` is +removed from the map and the handler callback is invoked. The timer is used to +implement an ack timeout, which similarly invokes the handler callback but with +an error instead of the ack. + +```go +// ackContext tracks state for an in-flight message expecting an ack. +type ackContext struct { + handler AckHandler + timer *time.Timer +} + +func (c *client) removeAckContext(cid string) *ackContext { + var timer *time.Timer + c.mu.Lock() + ctx := c.ackContexts[cid] + if ctx != nil { + timer = ctx.timer + delete(c.ackContexts, cid) + } + c.mu.Unlock() + // Cancel ack timeout if any. + if timer != nil { + timer.Stop() + } + return ctx +} +``` + +So far, we have covered how the `PublishAsync` streaming connection is managed +and how acks are dispatched. Now we will look at the actual publish +implementation. Both [`Publish`](#publish-implementation) and `PublishAsync` +rely on a private `publishAsync` helper function. + +```go +// PublishAsync publishes a new message to the Liftbridge stream and +// asynchronously processes the ack or error for the message. +func (c *client) PublishAsync(ctx context.Context, stream string, value []byte, + ackHandler AckHandler, options ...MessageOption) error { + + opts := &MessageOptions{Headers: make(map[string][]byte)} + for _, opt := range options { + opt(opts) + } + return c.publishAsync(ctx, stream, value, ackHandler, opts) +} +``` + +This `publishAsync` helper function handles assigning a correlation ID to the +message options (if one is not specified) and constructs a `PublishRequest` for +sending to the server. It then sets up the `ackContext` and sends the +`PublishRequest` to the server using the `PublishAsync` gRPC stream. There is a +client option for configuring a global ack timeout for publishes called +`AckWaitTime`. This can be overridden on individual publishes. In the Go +implementation, shown below, this is done via the `Context` deadline. + +```go +func (c *client) publishAsync(ctx context.Context, stream string, value []byte, + ackHandler AckHandler, opts *MessageOptions) error { + + if opts.CorrelationID == "" { + opts.CorrelationID = nuid.Next() + } + + req, err := c.newPublishRequest(ctx, stream, value, opts) + if err != nil { + return err + } + + c.mu.Lock() + asyncStream := c.asyncStream + if ackHandler != nil { + // Setup ack timeout. + var ( + timeout = c.opts.AckWaitTime + deadline, ok = ctx.Deadline() + ) + if ok { + timeout = time.Until(deadline) + } + ack := &ackContext{ + handler: ackHandler, + timer: time.AfterFunc(timeout, func() { + ackCtx := c.removeAckContext(req.CorrelationId) + // Ack was processed before timeout finished. + if ackCtx == nil { + return + } + if ackCtx.handler != nil { + ackCtx.handler(nil, ErrAckTimeout) + } + }), + } + c.ackContexts[req.CorrelationId] = ack + } + c.mu.Unlock() + + if err := asyncStream.Send(req); err != nil { + c.removeAckContext(req.CorrelationId) + return err + } + + return nil +} +``` + +`newPublishRequest` determines which stream partition to publish to using the +`partition` helper and then constructs the `PublishRequest`. + +```go +func (c *client) newPublishRequest(ctx context.Context, stream string, value []byte, + opts *MessageOptions) (*proto.PublishRequest, error) { + + // Determine which partition to publish to. + partition, err := c.partition(ctx, stream, opts.Key, value, opts) + if err != nil { + return nil, err + } + + return &proto.PublishRequest{ + Stream: stream, + Partition: partition, + Key: opts.Key, + Value: value, + AckInbox: opts.AckInbox, + CorrelationId: opts.CorrelationID, + AckPolicy: opts.AckPolicy.toProto(), + }, nil +} + +// partition determines the partition ID to publish the message to. If a +// partition was explicitly provided, it will be returned. If a Partitioner was +// provided, it will be used to compute the partition. Otherwise, 0 will be +// returned. +func (c *client) partition(ctx context.Context, stream string, key, value []byte, + opts *MessageOptions) (int32, error) { + + var partition int32 + // If a partition is explicitly provided, use it. + if opts.Partition != nil { + partition = *opts.Partition + } else if opts.Partitioner != nil { + // Make sure we have metadata for the stream and, if not, update it. + metadata, err := c.waitForStreamMetadata(ctx, stream) + if err != nil { + return 0, err + } + partition = opts.Partitioner.Partition(stream, key, value, metadata) + } + return partition, nil +} +``` + +The partition to publish to is determined in the following way: + +1. If a partition is explicitly provided using the `ToPartition` message + option, use it. +2. If a `Partitioner` is provided, use it to compute the partition. +3. If neither of the above apply, use partition 0 (the default partition). + +#### Partitioner Implementation + +Clients should provide at least two `Partitioner` implementations as described +above: `PartitionByKey` and `PartitionByRoundRobin`. `PartitionByKey` should +partition messages based on a hash of the message key. `PartitionByRoundRobin` +should partition messages in a round-robin fashion, i.e. cycling through the +partitions. The Go implementations of these are shown below: + +```go +var hasher = crc32.ChecksumIEEE + +// keyPartitioner is an implementation of Partitioner which partitions messages +// based on a hash of the key. +type keyPartitioner struct{} + +// Partition computes the partition number for a given message by hashing the +// key and modding by the number of stream partitions. +func (k *keyPartitioner) Partition(stream string, key, value []byte, metadata *Metadata) int32 { + if key == nil { + key = []byte("") + } + + partitions := metadata.PartitionCountForStream(stream) + if partitions == 0 { + return 0 + } + + return int32(hasher(key)) % partitions +} + +type subjectCounter struct { + sync.Mutex + count int32 +} + +// roundRobinPartitioner is an implementation of Partitioner which partitions +// messages in a round-robin fashion. +type roundRobinPartitioner struct { + sync.Mutex + subjectCounterMap map[string]*subjectCounter +} + +// Partition computes the partition number for a given message in a round-robin +// fashion by atomically incrementing a counter for the message stream and +// modding by the number of stream partitions. +func (r *roundRobinPartitioner) Partition(stream string, key, value []byte, metadata *Metadata) int32 { + partitions := metadata.PartitionCountForStream(stream) + if partitions == 0 { + return 0 + } + r.Lock() + counter, ok := r.streamCounterMap[stream] + if !ok { + counter = new(streamCounter) + r.streamCounterMap[stream] = counter + } + r.Unlock() + counter.Lock() + count := counter.count + counter.count++ + counter.Unlock() + return count % partitions +} +``` + +### PublishToSubject Implementation + +`PublishToSubject` sends a `PublishToSubjectRequest` to the `PublishToSubject` +endpoint using the using the [resilient RPC method](#rpcs) described above. +Unlike [`Publish`](#publish-implementation) and [`PublishAsync`](#publishasync-implementation), +this sets the publish subject directly rather than computing a partition for a +stream. The Go implementation of this is shown below: + +```go +// PublishToSubject publishes a new message to the NATS subject. Note that +// because this publishes directly to a subject, there may be multiple (or no) +// streams that receive the message. As a result, MessageOptions related to +// partitioning will be ignored. To publish at the stream/partition level, use +// the high-level Publish API. +// +// If the AckPolicy is not NONE and a deadline is provided, this will +// synchronously block until the first ack is received. If an ack is not +// received in time, ErrAckTimeout is returned. If an AckPolicy and deadline +// are configured, this returns the first Ack on success, otherwise it returns +// nil. +func (c *client) PublishToSubject(ctx context.Context, subject string, value []byte, + options ...MessageOption) (*Ack, error) { + + opts := &MessageOptions{Headers: make(map[string][]byte)} + for _, opt := range options { + opt(opts) + } + + req := &proto.PublishToSubjectRequest{ + Subject: subject, + Key: opts.Key, + Value: value, + AckInbox: opts.AckInbox, + CorrelationId: opts.CorrelationID, + AckPolicy: opts.AckPolicy.toProto(), + } + + // Setup ack timeout. + var ( + cancel func() + _, ok = ctx.Deadline() + ) + if !ok { + ctx, cancel = context.WithTimeout(ctx, c.opts.AckWaitTime) + defer cancel() + } + + var ack *proto.Ack + err := c.doResilientRPC(func(client proto.APIClient) error { + resp, err := client.PublishToSubject(ctx, req) + if err == nil { + ack = resp.Ack + } + return err + }) + if status.Code(err) == codes.DeadlineExceeded { + err = ErrAckTimeout + } + return ackFromProto(ack), err +} +``` + +### FetchMetadata Implementation + +`FetchMetadata` should return an immutable object which exposes cluster +metadata. This object is passed in to `Partitioner` implementations, but users +can also use it to retrieve stream and server information. Below are the +function signatures for the Go implementation of this object. This may vary +between language implementations. + +```go +// LastUpdated returns the time when this metadata was last updated from the +// server. +func (m *Metadata) LastUpdated() time.Time {} + +// Brokers returns a list of the cluster nodes. +func (m *Metadata) Brokers() []*BrokerInfo {} + +// Addrs returns the list of known broker addresses. +func (m *Metadata) Addrs() []string {} + +// GetStream returns the given stream or nil if unknown. +func (m *Metadata) GetStream(name string) *StreamInfo {} + +// PartitionCountForStream returns the number of partitions for the given +// stream. +func (m *Metadata) PartitionCountForStream(stream string) int32 {} +``` + +`BrokerInfo` is an immutable object containing server information like ID, +host, and port. `StreamInfo` is an immutable object containing stream +information like subject, name, and partitions. + +To simplify client logic, we recommend building an internal abstraction for +dealing with metadata. This internal API should include logic for fetching +metadata from the cluster using the [resilient RPC method](#rpcs) described +above, converting it into the immutable metadata object, and caching it. It +should also provide helper methods for retrieving server addresses (all +addresses, which helps simplify logic for resilient RPCs, and the address for +the leader of a given stream partition, which helps simplify logic for +`Subscribe`). + +With this internal metadata abstraction, the implementation of `FetchMetadata` +is should be trivial, as shown in the Go implementation below. In this case, +the logic of making the metadata RPC, converting it into an immutable metadata +object, and caching it is handled by the `update` function. + +```go +// FetchMetadata returns cluster metadata including broker and stream +// information. +func (c *client) FetchMetadata(ctx context.Context) (*Metadata, error) { + return c.metadata.update(ctx) +} +``` + +While metadata should be cached internally to minimize RPCs, the public +`FetchMetadata` should always force a refresh by going to the server. +Implementations may choose to provide an option for using the cached metadata. +Internally, calls such as `Subscribe` should attempt to use the cached metadata +and refresh it if it appears to be stale, e.g. because the server is no longer +the partition leader. It may also be prudent for clients to periodically +refresh metadata irrespective of these types of errors, perhaps with a +configurable interval. However, the Go client does not currently implement +this. + +### FetchPartitionMetadata Implementation + +`FetchPartitionMetadata` should return an immutable object which exposes +partition metadata. + +*NOTE*: *It is important to keep in mind that the +`FetchPartitionMetadataRequest` should be sent only to the partition leader. It +is the job of the client to figure out which broker is currently the partition +leader. `Metadata` can be used to figure out which broker is currently the +leader of the requested partition*. In Go, this is implemented using a variant +of the [resilient RPC method](#rpcs) described above. + +The object contains information of the partition, notably the high watermark +and newest offset, which is necessary in case the client wants tighter control +over the subscription/publication of messages. + +### SetCursor Implementation + +The `SetCursor` implementation simply constructs a gRPC request and +executes it using the [resilient leader RPC method](#rpcs) described above. +This requires mapping the cursor to a partition in the internal `__cursors` +stream. A cursor is mapped to a partition using its key. The cursor key +consists of `..`. This key is then hashed using +IEEE CRC32 and modded by the number of partitions in the `__cursors` stream. +The Go implementation of this is shown below. + +```go +// SetCursor persists a cursor position for a particular stream partition. +// This can be used to checkpoint a consumer's position in a stream to resume +// processing later. +func (c *client) SetCursor(ctx context.Context, id, stream string, partition int32, offset int64) error { + req := &proto.SetCursorRequest{ + Stream: stream, + Partition: partition, + CursorId: id, + Offset: offset, + } + cursorsPartition, err := c.getCursorsPartition(ctx, c.getCursorKey(id, stream, partition)) + if err != nil { + return err + } + return c.doResilientLeaderRPC(ctx, func(client proto.APIClient) error { + _, err := client.SetCursor(ctx, req) + return err + }, cursorsStream, cursorsPartition) +} + +func (c *client) getCursorKey(cursorID, streamName string, partitionID int32) []byte { + return []byte(fmt.Sprintf("%s,%s,%d", cursorID, streamName, partitionID)) +} + +func (c *client) getCursorsPartition(ctx context.Context, cursorKey []byte) (int32, error) { + // Make sure we have metadata for the cursors stream and, if not, update it. + metadata, err := c.waitForStreamMetadata(ctx, cursorsStream) + if err != nil { + return 0, err + } + + stream := metadata.GetStream(cursorsStream) + if stream == nil { + return 0, errors.New("cursors stream does not exist") + } + + return int32(hasher(cursorKey) % uint32(len(stream.Partitions()))), nil +} +``` + +### FetchCursor Implementation + +The `FetchCursor` implementation simply constructs a gRPC request and +executes it using the [resilient leader RPC method](#rpcs) described above. +This requires mapping the cursor to a partition in the internal `__cursors` +stream. A cursor is mapped to a partition using its key. The cursor key +consists of `..`. This key is then hashed using +IEEE CRC32 and modded by the number of partitions in the `__cursors` stream. +The Go implementation of this is shown below. + +```go +// FetchCursor retrieves a cursor position for a particular stream partition. +// It returns -1 if the cursor does not exist. +func (c *client) FetchCursor(ctx context.Context, id, stream string, partition int32) (int64, error) { + var ( + req = &proto.FetchCursorRequest{ + Stream: stream, + Partition: partition, + CursorId: id, + } + offset int64 + ) + cursorsPartition, err := c.getCursorsPartition(ctx, c.getCursorKey(id, stream, partition)) + if err != nil { + return 0, err + } + err = c.doResilientLeaderRPC(ctx, func(client proto.APIClient) error { + resp, err := client.FetchCursor(ctx, req) + if err != nil { + return err + } + offset = resp.Offset + return nil + }, cursorsStream, cursorsPartition) + return offset, err +} +``` + +The `getCursorKey` and `getCursorsPartition` helper implementations are shown +[above](#setcursor-implementation). + +### Close Implementation + +`Close` should be an idempotent operation which closes any gRPC connections and +connection pools associated with the client. The Go implementation of this is +shown below: + +```go +// Close the client connection. +func (c *client) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return nil + } + for _, pool := range c.pools { + if err := pool.close(); err != nil { + return err + } + } + if err := c.conn.Close(); err != nil { + return err + } + c.closed = true + return nil +} +``` diff --git a/website/versioned_docs/version-v1.3.0/clients.md b/website/versioned_docs/version-v1.3.0/clients.md new file mode 100644 index 00000000..0c8fd928 --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/clients.md @@ -0,0 +1,34 @@ +--- +id: version-v1.3.0-clients +title: Clients +original_id: clients +--- + +Currently, there are limited client libraries available for Liftbridge. +However, Liftbridge uses gRPC for its client API, so low-level client libraries +can be generated quite easily using the [Liftbridge protobuf +definitions](https://github.com/liftbridge-io/liftbridge-api). See the [client +implementation guidance](client_implementation.md) documentation for help on +implementing a Liftbridge client library. + +Available client libraries are listed below. Libraries with an asterisk are +official clients. + +## Go + +[go-liftbridge](https://github.com/liftbridge-io/go-liftbridge)* + +## Java +[java-liftbridge](https://github.com/liftbridge-io/java-liftbridge)* + +## Rust + +[liftbridge-rs](https://github.com/liftbridge-io/liftbridge-rs)* + +## Node.js + +[node-liftbridge](https://github.com/paambaati/node-liftbridge) + +## Python + +[python-liftbridge](https://github.com/dgzlopes/python-liftbridge) diff --git a/website/versioned_docs/version-v1.3.0/concepts.md b/website/versioned_docs/version-v1.3.0/concepts.md new file mode 100644 index 00000000..413c489e --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/concepts.md @@ -0,0 +1,378 @@ +--- +id: version-v1.3.0-concepts +title: Concepts +original_id: concepts +--- + +Liftbridge is a durable stream augmentation for NATS, so it's important to +have a good grasp of the [key concepts in +NATS](https://nats-io.github.io/docs/developer/concepts/intro.html). NATS is a +pub/sub messaging system that centers around the concept of *subjects*. Clients +publish messages to subjects and receive messages from *subscriptions* to +subjects. + +Liftbridge is currently compatible with [version 1.x.x of the NATS +protocol](https://github.com/nats-io/nats-general/blob/master/VERSIONING.md). + +## Streams and Partitions + +Fundamentally, Liftbridge is just a consumer of NATS subjects. It receives +messages received from NATS subjects and records them in a durable log which +is then exposed to subscribers. Specifically, Liftbridge centers around the +concept of a *stream*, which is a durable message stream attached to a NATS +subject. A stream consists of one or more *partitions*, which are ordered, +replicated, and durably stored on disk and serve as the unit of storage and +parallelism in Liftbridge. + +Liftbridge relies heavily on the filesystem for storing and caching stream +messages. While disks are generally perceived as slow, they are actually quite +fast in the case of linear reads and writes which is how Liftbridge operates. +As shown in this [ACM Queue article](https://queue.acm.org/detail.cfm?id=1563874), +[sequential disk access can be faster than random memory +access](https://deliveryimages.acm.org/10.1145/1570000/1563874/jacobs3.jpg). +Liftbridge also uses memory mapping for message indexing to allow for efficient +lookups. + +By default, partition data is stored in the `/tmp/liftbridge/` +directory where `namespace` is the cluster namespace used to implement +multi-tenancy for Liftbridge clusters sharing the same NATS cluster. The +default namespace is `liftbridge-default`. It can be changed with the +[`clustering.namespace`](./configuration.md#clustering-configuration-settings) +configuration. Additionally, the full data directory can be overridden with the +[`data.dir`](./configuration.md#configuration-settings) configuration. + +Streams have a few key properties: a subject, which is the corresponding NATS +subject, a name, which is a human-readable identifier for the stream, and a +replication factor, which is the number of nodes the stream's partitions should +be replicated to for redundancy. Optionally, there is a group which is the name +of a load-balance group for the stream to join. When there are multiple streams +in the same group, messages will be balanced among them. + +> **Use Case Note** +> +> The common use case is as follows. A typical subject is the command subject +> (comparable to a Kafka topic) of the CQRS pattern. The corresponding log +> created by this subject is the implementation of the event-sourcing pattern. +> The response of a command being put on a subject is a microservice worker +> reading that command off the subject and executing the command. Subsequently, +> the result of this activity is then posted to another subject, perhaps for +> downstream analytical reporting purposes. This enables the Query in the CQRS +> pattern. A careful reader saw the above remarks on partitioning, replication +> factor, redundancy, load-balance groups. These choices will impact this +> example with microservice workers, since the order of messages and guarantees +> on replication will be affected by them. +> +> Refer to these references for further information on +> [CQRS](https://martinfowler.com/bliki/CQRS.html) and [event +> sourcing](https://martinfowler.com/eaaDev/EventSourcing.html). + +There can be multiple streams attached to the same NATS subject, but stream +names must be unique within a cluster. + +By default, streams have a single partition. This partition maps directly to +the stream's NATS subject. If a stream has multiple partitions, each one maps +to a different NATS subject derived from the stream subject. For example, if a +stream with three partitions is attached to the subject "foo", the partitions +will map to the subjects "foo", "foo.1", and "foo.2", respectively. Please note +the naming convention on these subjects linked to partitions. + +Each partition has its own message log, leader, and set of followers. To reduce +resource consumption, partitions can be [paused](./pausing_streams.md). Paused +partitions are subsequently resumed once they are published to. + +### Write-Ahead Log + +Each stream partition is backed by a durable write-ahead log. All reads and +writes to the log go through the partition *leader*, which is selected by the +cluster [controller](#controller). The leader sequences each message in the +partition and sends back an acknowledgement to publishers upon committing a +message to the log. A message is committed to the log once it has been +replicated to the partition's [in-sync replica set (ISR)](#in-sync-replica-set-isr). + +> **Architect's Note** +> +> A note on configuring for durability and preventing message loss. If a +> message is published with the `LEADER` [ack policy](./ha_and_consistency_configuration.md#ack-policy) +> (the default policy), an ack is sent back to the client as soon as the leader +> has stored the message. If a message is published with the `ALL` ack +> policy, the ack is sent only after all members of the ISR have stored it. +> Thus, if the ISR is unavailable, no ack will be sent indicating there's no +> guarantee the message was committed. A minimum ISR size can also be +> configured to provide a high level of durability, but this creates an +> implicit trade-off with availability. See documentation +> [here](./ha_and_consistency_configuration.md) on configuring for high +> availability and consistency. + +Consumers read committed messages from the log through a subscription on the +partition. They can read back from the log at any arbitrary position, or +*offset*. Additionally, consumers can wait for new messages to be appended +to the log. + +> **Use Case Note** +> +> The interested reader will identify a typical consumer to be a stateless +> microservice worker. The *offset* parameter is of special interest should +> one have consumers with different and independent purposes. For example, a +> reporting consumer could have lower priority when loads are high and an +> operational consumer have higher priority, resulting in different offsets on +> the same subject. +> +> Consequently, different consumers are able to process at their own speed. +> Also, a paused or starved consumer, potentially a Pod in Kubernetes, like the +> potential reporting consumer, could easily pick up where it left off when +> things slow down. Consumers may use [cursors](./cursors.md) to track their +> state, i.e. the offset. In the future, Liftbridge will support durable +> consumer groups which will allow consumers to eschew cursor management. + +### Scalability + +Liftbridge is designed to be clustered and horizontally scalable. The +[controller](#controller) is responsible for creating stream partitions. +When a partition is created, the controller selects replicas based on the +stream's replication factor and replicates the partition to the cluster. +Once this replication completes, the partition has been created and the +leader begins processing messages. + +As mentioned above, there can exist multiple streams attached to the same NATS +subject or even subjects that are semantically equivalent e.g. "foo.bar" and +"foo.*". Each of these streams will receive a copy of the message as NATS +handles this fan-out. + +> **Use Case Note** +> +> Since multiple streams can be attached to the same subject or overlapping +> subjects, this lends itself to creating streams on the same data for +> different purposes or use cases where there might be different operational +> concerns. +> +> An example of this might be creating a stream attached to the subject +> `request.*` which acts as an audit log of all operations. Other streams would +> then be attached to more granular subjects used to perform the actual +> operations, such as `request.temperature`, `request.humidity`, and +> `request.precipitation`. + +With this in mind, we can scale linearly by adding more nodes to the Liftbridge +cluster and creating more streams which will be distributed amongst the +cluster members. This has the advantage that we don't need to worry about +partitioning so long as NATS is able to withstand the load. The downside of +this is that it results in redundant processing of messages. Consumers of each +stream are all processing the same set of messages. The other issue is because +each stream is independent of each other, they have separate guarantees. +Separate leaders/followers, separate ISRs, and separate acking means the logs +for each stream are not guaranteed to be identical, even though they are bound +to the same NATS subject. + +To accommodate this, streams are partitioned. By default, a stream consists of +just a single partition, but multiple partitions can be created for increased +parallelism. Messages can then be delivered to partitions based on their key, +in a round-robin fashion, randomly, or with some other partitioning strategy +on the client. + +> **Architect's Note** +> +> Please note that ordering within the partition is upheld but not across +> partitions. This means that the partitioning strategy is of high importance +> since an aggregated state cannot be achieved for an order-dependent consumer +> subscribing to many streams. A consumer needing total ordering of events +> can only subscribe to many streams if and only if the events are +> uncorrelated/independent between the partitions (and thus have order within a +> partition). +> +> An architect would pay particular interest to ensuring independent and +> stateless workers when applying [domain-driven design](https://dddcommunity.org/book/evans_2003/). +> As a general rule, requiring a strict global ordering of messages will +> severely limit the scalability and performance of a system. + +> **Use Case Note** +> +> One of the main use cases for partitions is implementing horizontal +> scalability. For example, imagine we are implementing analytical processing +> of clickstream events for a website. To scale this system, we could partition +> the stream by user id which would allow us to distribute load across the +> cluster while retaining event ordering per user. + +Additionally, streams can join a named load-balance group, which load balances +messages on a NATS subject amongst the streams in the group. Load-balance +groups do not affect message delivery to other streams not participating in +the group. Load-balance groups are for distributing messages from a subject +amongst a group of streams. + +> **Architect's Note** +> +> Note that the intent of load-balance groups could also be achieved using a +> partitioned stream with a random or round-robin partitioning strategy. +> However, partitioning assumes a user has already "bought in" to Liftbridge +> streams as an architectural component. But imagine a case where there is a +> pre-existing NATS subject that Liftbridge-agnostic services are already +> publishing to, and we want to turn that NATS subject into a durable log that +> Liftbridge-aware services can consume. We could attach a single stream to the +> subject, but if it's high volume, we might need a load-balance group to +> distribute the load across a set of streams. + +Partition replicas in Liftbridge primarily serve as a mechanism for high +availability by providing redundancy of stream data. By default, reads go +through the partition leader. However, consumers can opt-in to reading from +_any_ member of the partition's ISR, including followers, for further scale +out. + +The diagram below shows a cluster of three servers with a set of streams. +Partitions in yellow indicate the server is the leader for the partition. + +![cluster](assets/cluster.png) + +### In-Sync Replica Set (ISR) + +The In-Sync Replica set (ISR) is a key aspect of the replication protocol in +Liftbridge. The ISR consists of the set of partition replicas that are +currently caught up with the leader. It is equivalent to the [ISR concept in +Kafka](https://kafka.apache.org/documentation/#design_replicatedlog), and the +[replication protocol](./replication_protocol.md) works very similarly. + +In order for a message to be committed to a partition's write-ahead log, it +must be acknowledged by all brokers in the ISR. To prevent a single slow +broker from blocking progress, replicas that fall too far behind the leader +are removed from the ISR. The leader does this by making a request to the +controller. In this case, the cluster enters an under-replicated state for +the partition. + +Being "too far behind" is controlled by the `replica.max.lag.time` +configuration. This refers to both the maximum amount of time a replica can go +without making a replication request before it's removed and the amount of time +that can pass without being fully caught up with the leader before it's +removed. When a removed replica catches back up with the leader's log, it is +added back into the ISR and the cluster goes back into its fully replicated +state. + +Under normal conditions, only a replica from the ISR can be elected the leader +of a partition. This favors data consistency over availability since if the ISR +shrinks too far, there is a risk of being unable to elect a new leader. + +### Acknowledgement + +Acknowledgements are an opt-in mechanism to guarantee message delivery. If a +[message envelope](#message-envelope) has an `AckInbox`, Liftbridge will send +an ack to this NATS inbox once the message has been committed. This is used to +ensure at-least-once delivery. + +Messages can also have an optional `CorrelationId`, which is a user-defined +value which is also set on the server ack to correlate it to a published +message. + +There are a couple of things to be aware of with message acknowledgements. +First, if the publisher doesn't care about ensuring its message is stored, it +need not set an `AckInbox`. Second, because there are potentially multiple +(or no) streams attached to a NATS subject (and creation of streams is +dynamic), it's not possible for the publisher to know how many acks to expect. +This is a trade-off we make for enabling subject fan-out and wildcards while +remaining scalable and fast. We make the assertion that if guaranteed delivery +is important, the publisher should be responsible for determining the +destination streams a priori. This allows attaching streams to a subject for +use cases that do not require strong guarantees without the publisher having to +be aware. Note that this might be an area for future improvement to increase +usability. However, this is akin to other similar systems, like Kafka, where +you must first create a topic and then you publish to that topic. + + +> **Use Case Note** +> +> A common use case for a producer not caring if the ack is returned or not +> is an IoT device or sensor. This means that for the sensor, it is not +> important to know if Liftbridge indeed recorded the event. For a more +> regulated system, one could assume acknowledgements are important to the +> producer since the recorded truth now resides within Liftbridge, as is the +> case in an event-sourced system. + +### Subscription + +Subscriptions are how Liftbridge streams are consumed. A client subscribes to a +stream partition and specifies a starting offset to begin consuming from. At +this point, the server creates an ephemeral data stream for the client and +begins sending messages to it. Once it consumes up to the end of the log, the +server will wait for more messages to be published until the subscription is +closed by the client. + +Subscriptions are not stateful objects. When a subscription is created, there +is no bookkeeping done by the server, aside from the in-memory objects tied to +the lifecycle of the subscription. As a result, the server does not track the +position of a client in the log beyond the scope of a subscription. Instead, +Liftbridge provides a [cursors](./cursors.md) API which allows consumers to +checkpoint their position in the log and pick up where they left off. Stateful +consumer groups will be coming in the near future which will provide a more +managed solution to fault-tolerant consumption of streams. + +> **Architect's Note** +> +> This ties back to the previously described reporting worker starved but +> clinging to an *offset* so as not to lose probable state. With _cursors_, the +> reporting worker can be restarted without state but can resume from where it +> left off due to state stored by the server via the cursors API. When stateful +> consumer groups are implemented, this will be entirely transparent to the +> consumer. + +### Stream Retention and Compaction + +Streams support multiple log-retention rules: age-based, message-based, and +size-based. This, for example, allows semantics like "retain messages for 24 +hours", "retain 100GB worth of messages", or "retain 1,000,000 messages". + +Additionally, Liftbridge supports log *compaction*. Publishers can, optionally, +set a *key* on a [message envelope](#message-envelope). A stream can be +configured to compact by key. In this case, it retains only the last message +for each unique key. Messages that do not have a key are always retained. + +> **Architect's Note** +> +> From an architectural point of view, the choice here is to compact as much as +> possible without losing state (aggregation of events). Lineage is taken care +> of by the stream log if stored, for example, in an S3 bucket. + +## Activity Stream + +The activity stream is a Liftbridge stream that exposes internal meta-events +that have occurred in the cluster such as streams being created, deleted, +paused, or resumed. This allows clients to dynamically react to changes in +cluster state. See the activity stream [documentation](./activity.md) for more +information. + +## Controller + +The controller is the metadata leader for the cluster. Specifically, it is the +*Raft* leader. All operations which require cluster coordination, such as +creating streams, expanding ISRs, shrinking ISRs, or electing stream leaders, +go through the controller and, subsequently, Raft to ensure linearizability. +Raft automatically handles failing over the controller in the event of a +failure for high availability. + +Note that in order for the controller to make progress, a quorum (majority) of +the brokers must be running. + +Controller is also referred to as "metadata leader" in some contexts. There is +only a single controller (i.e. leader) at a given time which is elected by the +Liftbridge cluster. + +> **Architect's Note** +> +> Guidance on cluster size depends, but one important point here is that, +> currently, all servers in the cluster participate in the Raft consensus +> group. This has implications on the scalability of the cluster control plane, +> which there are [plans to address](https://github.com/liftbridge-io/liftbridge/issues/41) +> in the future. +> +> General advice is to run an odd number of servers in the cluster, e.g. 3 or +> 5, depending on scaling needs. Ideally, cluster members are run in different +> availability zones or racks for improved fault-tolerance. + +## Message Envelope + +Liftbridge extends NATS by allowing regular NATS messages to flow into durable +streams. This can be completely transparent to publishers. However, it also +allows publishers to *enhance* messages by providing additional metadata and +serializing their messages into [*envelopes*](./envelope_protocol.md). An +envelope allows publishers to set things like the `AckInbox`, `Key`, `Headers`, +and other pieces of metadata. + +## Further Reading + +A final note is to read ["Designing Event-Driven +Systems"](http://www.benstopford.com/2018/04/27/book-designing-event-driven-systems/) +by Ben Stopford for inspiration. diff --git a/website/versioned_docs/version-v1.3.0/configuration.md b/website/versioned_docs/version-v1.3.0/configuration.md new file mode 100644 index 00000000..539d534e --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/configuration.md @@ -0,0 +1,220 @@ +--- +id: version-v1.3.0-configuration +title: Configuration +original_id: configuration +--- + +Liftbridge provides limited configuration through command-line flags and full +configuration using a configuration file. Flags will always take precedent +over settings in the configuration file. + +The configuration file is passed in using the `--config` flag: + +```shell +$ liftbridge --config liftbridge.yaml +``` + +To get a full list of the CLI flags, use the `--help` flag: + +```shell +$ liftbridge --help +NAME: + liftbridge - Lightweight, fault-tolerant message streams + +USAGE: + liftbridge [global options] command [command options] [arguments...] + +VERSION: + v1.3.0 + +COMMANDS: + help, h Shows a list of commands or help for one command + +GLOBAL OPTIONS: + --config FILE, -c FILE load configuration from FILE + --server-id value, --id value ID of the server in the cluster if there is no stored ID (default: random ID) + --namespace value, --ns value cluster namespace (default: "liftbridge-default") + --nats-servers ADDR[,ADDR], -n ADDR[,ADDR] connect to NATS cluster at ADDR[,ADDR] (default: "nats://127.0.0.1:4222") + --data-dir DIR, -d DIR store data in DIR (default: "/tmp/liftbridge/") + --port value, -p value port to bind to (default: 9292) + --tls-cert value server certificate file + --tls-key value private key for server certificate + --level value, -l value logging level [debug|info|warn|error] (default: "info") + --raft-bootstrap-seed bootstrap the Raft cluster by electing self as leader if there is no existing state + --raft-bootstrap-peers value bootstrap the Raft cluster with the provided list of peer IDs if there is no existing state + --help, -h show help + --version, -v print the version +``` + +## Configuration File Format + +The configuration file uses a YAML format. The configuration settings are +described below. Settings follow a hierarchical pattern delimited by periods. +The full, flattened name or nested names can be used. For example: + +```yaml +logging.level: debug +logging.recovery: true +logging.raft: true +``` + +The above configuration is equivalent to: + +```yaml +logging: + level: debug + recovery: true + raft: true +``` + +## Example Configuration File + +An example configuration file is shown below. + +```yaml +--- +listen: localhost:9293 +host: localhost +data.dir: /tmp/liftbridge/server-2 +activity.stream.enabled: true + +# Configure logging. +logging: + level: debug + raft: true + +# Define NATS cluster to connect to. +nats.servers: + - nats://localhost:4300 + - nats://localhost:4301 + +# Specify message stream settings. +streams: + retention.max: + age: 24h + messages: 100 + compact.enabled: true + +# Specify cluster settings. +clustering: + server.id: server-2 + raft.bootstrap.seed: true + replica.max.lag.time: 20s +``` + +## Overriding configuration settings with environment variables + +For configuration set in the configuration file the value can be overridden +with environment variables prefixed with `LIFTBRIDGE_`. The key must exist in +the config file to be overridden. + +For example using the config file from above one could override the host and +logging level with: + +```sh +env LIFTBRIDGE_HOST=liftbridge.example.com \ + LIFTBRIDGE_LOGGING_LEVEL=error \ + liftbridge --config config.yaml +``` + +## Configuration Settings + +Below is the list of Liftbridge configuration settings, including the name of +the setting in the configuration file and the CLI flag if it exists. + +| Name | Flag | Description | Type | Default | Valid Values | +|:----|:----|:----|:----|:----|:----| +| listen | | The server listen host/port. This is the host and port the server will bind to. If this is not specified but `host` and `port` are specified, these values will be used. If neither `listen` nor `host`/`port` are specified, the default listen address will be used. | string | 0:0:0:0:9292 | | +| host | | The server host that is advertised to clients, i.e. the address clients will attempt to connect to based on metadata API responses. If not set, `listen` will be returned to clients. This value may differ from `listen` in situations where the external address differs from the internal address, e.g. when running in a container. If `listen` is not specified, the server will also bind to this host. | string | localhost | | +| port | port | The server port that is advertised to clients. See `host` for more information on how this behaves. | int | 9292 | | +| tls.key | tls-key | The private key file for server certificate. This must be set in combination with `tls.cert` to enable TLS. | string | | +| tls.cert | tls-cert | The server certificate file. This must be set in combination with `tls.key` to enable TLS. | string | | +| tls.client.auth.enabled | tls-client-auth | Enforce client-side authentication via certificate. | bool | false | +| tls.client.auth.ca | tls-client-auth-ca | The CA certificate file to use when authenticating clients. | string | | +| logging.level | level | The logging level. | string | info | [debug, info, warn, error] | +| logging.recovery | | Log messages resulting from the replay of the Raft log on server recovery. | bool | false | | +| logging.raft | | Enables logging in the Raft subsystem. | bool | false | | +| data.dir | data-dir | The directory to store data in. | string | /tmp/liftbridge/namespace | | +| batch.max.messages | | The maximum number of messages to batch when writing to disk. | int | 1024 | +| batch.max.time | | The maximum time to wait to batch more messages when writing to disk. | duration | 0 | | +| metadata.cache.max.age | | The maximum age of cached broker metadata. | duration | 2m | | +| nats | | NATS configuration. | map | | [See below](#nats-configuration-settings) | +| streams | | Write-ahead log configuration for message streams. | map | | [See below](#streams-configuration-settings) | +| clustering | | Broker cluster configuration. | map | | [See below](#clustering-configuration-settings) | +| activity | | Meta activity event stream configuration. | map | | [See below](#activity-configuration-settings) | +| cursors | | Cursor management configuration. | map | | [See below](#cursors-configuration-settings) | + +### NATS Configuration Settings + +Below is the list of the configuration settings for the `nats` section of +the configuration file. + +| Name | Flag | Description | Type | Default | Valid Values | +|:----|:----|:----|:----|:----|:----| +| servers | nats-servers | List of NATS hosts to connect to. | list | nats://localhost:4222 | | +| user | | Username to use to connect to NATS servers. | string | | | +| password | | Password to use to connect to NATS servers. | string | | | +| tls.cert | | Path to NATS certificate file. | string | | | +| tls.key | | Path to NATS key file. | string | | | +| tls.ca | | Path to NATS CA Root file. | string | | | + +### Streams Configuration Settings + +Below is the list of the configuration settings for the `streams` section of the +configuration file. These settings are applied globally to all streams. +However, streams can be individually configured when they are created, +overriding these settings. + +| Name | Flag | Description | Type | Default | Valid Values | +|:----|:----|:----|:----|:----|:----| +| retention.max.bytes | | The maximum size a stream's log can grow to, in bytes, before we will discard old log segments to free up space. A value of 0 indicates no limit. | int64 | 0 | | +| retention.max.messages | | The maximum size a stream's log can grow to, in number of messages, before we will discard old log segments to free up space. A value of 0 indicates no limit. | int64 | 0 | | +| retention.max.age | | The TTL for stream log segment files, after which they are deleted. A value of 0 indicates no TTL. | duration | 168h | | +| cleaner.interval | | The frequency to check if a new stream log segment file should be rolled and whether any segments are eligible for deletion based on the retention policy or compaction if enabled. | duration | 5m | | +| segment.max.bytes | | The maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. | int64 | 268435456 | | +| segment.max.age | | The maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when `segment.max.bytes` is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. | duration | value of `retention.max.age` | | +| compact.enabled | | Enables stream log compaction. Compaction works by retaining only the latest message for each key and discarding older messages. The frequency in which compaction runs is controlled by `cleaner.interval`. | bool | false | | +| compact.max.goroutines | | The maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if `compact.enabled` is `true`). | int | 10 | | +| auto.pause.time | | The amount of time a stream partition can go idle, i.e. not receive a message, before it is automatically paused. A value of 0 disables auto pausing. | duration | 0 | | +| auto.pause.disable.if.subscribers | | Disables automatic stream partition pausing when there are subscribers. | bool | false | | + +### Clustering Configuration Settings + +Below is the list of the configuration settings for the `clustering` section of +the configuration file. + +| Name | Flag | Description | Type | Default | Valid Values | +|:----|:----|:----|:----|:----|:----| +| server.id | server-id | ID of the server in the cluster. | string | random id | string with no spaces or periods | +| namespace | namespace | Cluster namespace. | string | liftbridge-default | string with no spaces or periods | +| raft.snapshot.retain | | The number Raft log snapshots to retain on disk. | int | 2 | | +| raft.snapshot.threshold | | Controls how many outstanding logs there must be before taking a snapshot. This prevents excessive snapshots when a small set of logs can be replayed. | int | 8192 | | +| raft.cache.size | | The number of Raft logs to hold in memory for quick lookup. | int | 512 | | +| raft.bootstrap.seed | raft-bootstrap-seed | Bootstrap the Raft cluster by electing self as leader if there is no existing state. If this is enabled, `raft.bootstrap.peers` should generally not be used, either on this node or peer nodes, since cluster topology is not being explicitly defined. Instead, peers should be started without bootstrap flags which will cause them to automatically discover the bootstrapped leader and join the cluster. | bool | false | | +| raft.bootstrap.peers | raft-bootstrap-peers | Bootstrap the Raft cluster with the provided list of peer IDs if there is no existing state. This should generally not be used in combination with `raft.bootstrap.seed` since it is explicitly defining cluster topology and the configured topology will elect a leader. Note that once the cluster is established, new nodes can join without setting bootstrap flags since they will automatically discover the elected leader and join the cluster. | list | | | +| replica.max.lag.time | | If a follower hasn't sent any replication requests or hasn't caught up to the leader's log end offset for at least this time, the leader will remove the follower from ISR. | duration | 15s | | +| replica.max.leader.timeout | | If a leader hasn't sent any replication responses for at least this time, the follower will report the leader to the controller. If a majority of the replicas report the leader, a new leader is selected by the controller. | duration | 15s | | +| replica.max.idle.wait | | The maximum amount of time a follower will wait before making a replication request once the follower is caught up with the leader. This value should always be less than `replica.max.lag.time` to avoid frequent shrinking of ISR for low-throughput streams. | duration | 10s | | +| replica.fetch.timeout | | Timeout duration for follower replication requests. | duration | 3s | | +| min.insync.replicas | | Specifies the minimum number of replicas that must acknowledge a stream write before it can be committed. If the ISR drops below this size, messages cannot be committed. | int | 1 | [1,...] | + +### Activity Configuration Settings + +Below is the list of the configuration settings for the `activity` section of +the configuration file. + +| Name | Flag | Description | Type | Default | Valid Values | +|:----|:----|:----|:----|:----|:----| +| stream.enabled | | Enables the activity stream. This will create an internal stream called `__activity` which events will be published to. | bool | false | | +| stream.publish.timeout | | The timeout for publishes to the activity stream. This is the time to wait for an ack from the activity stream, which means it's related to `stream.publish.ack.policy`. If the ack policy is `none`, this has no effect. | duration | 5s | | +| stream.publish.ack.policy | | The ack policy to use for publishes to the activity stream. The value `none` means publishes will not wait for an ack, `leader` means publishes will wait for the ack sent when the leader has committed the event, and `all` means publishes will wait for the ack sent when all replicas have committed the event. | string | all | [none, leader, all] | + +### Cursors Configuration Settings + +Below is the list of the configuration settings for the `cursors` section of +the configuration file. + +| Name | Flag | Description | Type | Default | Valid Values | +|:----|:----|:----|:----|:----|:----| +| stream.partitions | | Sets the number of partitions for the internal `__cursors` stream which stores consumer cursors. A value of 0 disables the cursors stream. This cannot be changed once it is set. | int | 0 | | +| stream.auto.pause.time | | The amount of time a partition in the internal `__cursors` stream can go idle, i.e. not receive a cursor update or fetch, before it is automatically paused. A value of 0 disables auto pausing. | duration | 1m | | diff --git a/website/versioned_docs/version-v1.3.0/cursors.md b/website/versioned_docs/version-v1.3.0/cursors.md new file mode 100644 index 00000000..55bb9d40 --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/cursors.md @@ -0,0 +1,99 @@ +--- +id: version-v1.3.0-cursors +title: Cursors +original_id: cursors +--- + +> **NOTE:** The cursors API is currently in beta. This includes the `SetCursor` +> and `FetchCursor` endpoints. + +Liftbridge partitions do not maintain any state on consumers such as tracking +where consumers are in the log. This reduces complexity on the server and makes +subscriptions more efficient. However, Liftbridge provides an API for storing +consumer partition cursors. A _cursor_ is simply an offset position into a log +which is uniquely identified by an ID. This allows consumers to checkpoint +their position in a partition such that they can resume processing later or +pick up where they left off in the event of a failure. + +Cursor management is exposed through the Liftbridge [gRPC +API](https://github.com/liftbridge-io/liftbridge-api/blob/master/api.proto). +The `SetCursor` endpoint takes a `SetCursorRequest` which specifies the +identifier, stream, partition, and offset for the cursor. On success, this will +have durably stored the partition cursor. `FetchCursor` is then used to +retrieve an offset for a given cursor. + +An example of this is shown below, in which a cursor is retrieved to resume the +subscription and is subsequently updated each time a message is processed. + +```go +var ( + ctx = context.Background() + cursorID = "mycursor" + stream = "foo" +) +// Retrieve offset to resume processing from. +offset, err := client.FetchCursor(ctx, cursorID, stream, 0) +if err != nil { + panic(err) +} +if err := client.Subscribe(ctx, stream, func(msg *lift.Message, err error) { + if err != nil { + panic(err) + } + fmt.Println(msg.Timestamp(), msg.Offset(), string(msg.Key()), string(msg.Value())) + // Checkpoint current position. + if err := client.SetCursor(ctx, cursorID, stream, msg.Partition(), msg.Offset()); err != nil { + panic(err) + } +}, lift.StartAtOffset(offset+1)); err != nil { + panic(err) +} +<-ctx.Done() +``` + +This is a low-level API that is used to durably store a partition cursor. Users +must determine how often to checkpoint cursors. This is a balance between +optimizing for processing performance (frequent checkpointing will reduce +performance) and optimizing for reprocessing of messages in the event of a +consumer failover (infrequent checkpointing will result in more messages being +reprocessed upon consumer recovery). + +This low-level API will serve as a building block for higher-level consumer +functionality such as consumer groups. This will allow consumers to reliably +consume streams without having to manage cursors at all. + +## Configuring Cursor Management + +Cursors are stored in an internal Liftbridge stream named `__cursors`. This +stream is not intended to be accessed by consumers directly. The stream is +partitioned by cursor key. Keys are of the following form: + +```plaintext +,, +``` + +This means that cursors for the same ID are stored in the same partition. The +`__cursors` stream is also compacted by key such that only the most recent +versions of cursors are retained. Due to this partitioning, clients must send +`SetCursor` and `FetchCursor` requests to the respective partition leader. This +mapping should be implemented by client libraries. + +Configuration settings for cursor management are grouped under the `cursors` +namespace. See the full list of configuration options +[here](./configuration.md#cursors-configuration-settings). + +By default, the internal `__cursors` stream is disabled. To enable it, you must +set the number of partitions to something greater than 0: + +```yaml +cursors.stream.partitions: 10 +``` + +Additionally, partitions in the `__cursors` stream automatically pause when +they go idle (i.e. do not receive a cursor update or fetch) for 1 minute by +default. This can be changed or disabled with the +`cursors.stream.auto.pause.time` setting: + +```yaml +cursors.stream.auto.pause.time: 0 +``` diff --git a/website/versioned_docs/version-v1.3.0/feature_comparison.md b/website/versioned_docs/version-v1.3.0/feature_comparison.md new file mode 100644 index 00000000..fb6eb33a --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/feature_comparison.md @@ -0,0 +1,41 @@ +--- +id: version-v1.3.0-feature-comparison +title: Feature Comparison +original_id: feature-comparison +--- + +Liftbridge shares some similarities with other stream-oriented pub/sub +messaging systems. The below feature-comparison matrix shows how it compares to +a few of these systems. + +> **Architect's Note** +> +> These are highly nuanced pieces of infrastructure, so a straight comparison +> of features is often of minimal value when evaluating solutions. This matrix +> is primarily meant to serve as a reference point for those attempting to +> develop a mental model of Liftbridge. + +| | Liftbridge | NATS Streaming | Apache Kafka | Apache Pulsar | +|:----|:----|:----|:----|:----| +| Runtime | self-contained binary | self-contained binary | JVM | JVM | +| At-least-once delivery | ✓ | ✓ | ✓ | ✓ | +| Transactions | | | ✓ | | +| Partitioning | ✓ | | ✓ | ✓ | +| Linear Horizontal Scalability | ✓ | | ✓ | ✓ | +| Clustering/Replication | ✓ | ✓ | ✓ | ✓ | +| External Coordination | | | Apache ZooKeeper | Apache ZooKeeper | +| Message Replay | ✓ | ✓ | ✓ | ✓ | +| Message Queueing | | ✓ | | ✓ | +| Wildcard Topic Matching | ✓ | | | | +| Topic Pausing | ✓ | | | | +| Rate Limiting | | opt-in rate matching | quotas | publish rate limiting | +| Log Retention | ✓ | ✓ | ✓ | ✓ | +| Log Compaction | ✓ | | ✓ | ✓ | +| Message Headers | ✓ | | ✓ | ✓ | +| Multiple Consumers | independent subscribers | queue subscriptions | consumer groups | subscriptions | +| Consumer Position Tracking | cursors | durable subscriptions | consumer groups | cursors | +| Event-enabled | activity stream | | | | +| Multitenancy | namespaced clusters | namespaced clusters | topic-level ACLs | multitenant shared cluster | +| Authentication | TLS | User authentication, TLS | TLS, Kerberos, SASL | TLS, Kerberos, JWT, Athenz | +| Authorization | | | ACLs | ACLs | +| Storage | filesystem | memory, filesystem, SQL| filesystem | filesystem (Apache BookKeeper), tiered storage (Amazon S3, Google Cloud Storage) | diff --git a/website/versioned_docs/version-v1.3.0/pausing_streams.md b/website/versioned_docs/version-v1.3.0/pausing_streams.md new file mode 100644 index 00000000..a1fb3f52 --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/pausing_streams.md @@ -0,0 +1,52 @@ +--- +id: version-v1.3.0-pausing-streams +title: Pausing Streams +original_id: pausing-streams +--- + +Liftbridge streams can be *paused* to conserve system resources such as CPU, +memory, and file descriptors. While pausing is performed on streams, the +granularity is specified at the partition level. We can pause all or a subset +of a stream's partitions. A partition is resumed when it is published to via +the Liftbridge `Publish` or `PublishAsync` gRPC endpoints or if the stream was +paused with `ResumeAll` enabled and another partition in the stream was +published to. + +> **Use Case Note** +> +> Stream pausing can be useful for cases that involve a significant number of +> streams with only a small fraction being active at any given point in time, +> i.e. "sparse" streams. With the [activity stream](./activity.md), consumers +> can dynamically spin down when partitions are paused and spin back up once +> they are resumed. + +Pause functionality is exposed through the Liftbridge [gRPC +API](https://github.com/liftbridge-io/liftbridge-api/blob/master/api.proto). +The `PauseStream` endpoint takes a `PauseStreamRequest` which specifies the +stream and set of partitions to pause. If no partitions are specified, the +operation will pause _all_ of the stream's partitions. Additionally, the +request includes a `ResumeAll` flag which indicates if all partitions should be +resumed when one is published to or only the partition that was published to. + +When a partition is paused, the server will step down as leader or follower, +unsubscribe from the NATS subject, and close the commit log. This means +replication will stop, messages will not be received on the NATS subject, and +any file handles associated with the partition will be closed. + +Pausing is maintained across server restarts. + +## Auto Pausing + +In addition to the pause API, streams can be configured to automatically pause +partitions when they go idle, meaning no messages are received on the partition +within a specified period of time. This is configured globally using the +`streams.auto.pause.time` setting which applies the pause timeout for all +streams. By default, this is disabled. This can also be overridden on +individual streams when they are created. + +The `auto.pause.disable.if.subscribers` setting controls if the automatic +partition pausing should be disabled if there is any subscribers to the +partition. This is disabled by default. + +Only the idle partitions within a stream are paused. These partitions are +resumed when published to via the Liftbridge API. diff --git a/website/versioned_docs/version-v1.3.0/quick_start.md b/website/versioned_docs/version-v1.3.0/quick_start.md new file mode 100644 index 00000000..20ee121c --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/quick_start.md @@ -0,0 +1,105 @@ +--- +id: version-v1.3.0-quick-start +title: Quick Start +original_id: quick-start +--- + +There are three ways to get started running Liftbridge on your machine: +[downloading a pre-built binary](#binary), [building from source](#building-from-source), +or [running a Docker container](#docker-container). There are also several +options for running a Liftbridge cluster [described below](#running-a-liftbridge-cluster-locally). + +## Binary + +A pre-built Liftbridge binary can be downloaded for a specific platform from +the [releases](https://github.com/liftbridge-io/liftbridge/releases) page. Once +you have installed the binary, refer to the steps under [Building From +Source](#building-from-source) for running the server. + +## Building From Source + +A Liftbridge binary can be built and installed from source using +[Go](https://golang.org/doc/install). Follow the below step to install from +source. + +```shell +$ go get github.com/liftbridge-io/liftbridge +``` +*Liftbridge uses [Go modules](https://github.com/golang/go/wiki/Modules), so +ensure this is enabled, e.g. `export GO111MODULE=on`.* + +Liftbridge currently relies on an externally running +[NATS server](https://github.com/nats-io/gnatsd). By default, it will connect +to a NATS server running on localhost. The `--nats-servers` flag allows +configuring the NATS server(s) to connect to. + +Also note that Liftbridge is clustered by default and relies on Raft for +coordination. This means a cluster of three or more servers is normally run +for high availability, and Raft manages electing a leader. A single server is +actually a cluster of size 1. For safety purposes, the server cannot elect +itself as leader without using the `--raft-bootstrap-seed` flag, which will +indicate to the server to elect itself as leader. This will start a single +server that can begin handling requests. **Use this flag with caution as it should +only be set on one server when bootstrapping a cluster.** + +```shell +$ liftbridge --raft-bootstrap-seed +INFO[2020-10-15 14:29:50] Liftbridge Version: v1.3.0 +INFO[2020-10-15 14:29:50] Server ID: 4nbhBr66WnRsy0I5oKF9bo +INFO[2020-10-15 14:29:50] Namespace: liftbridge-default +INFO[2020-10-15 14:29:50] Default Retention Policy: [Age: 1 week, Compact: false] +INFO[2020-10-15 14:29:50] Default Partition Pausing: disabled +INFO[2020-10-15 14:29:50] Starting server on 0.0.0.0:9292... +INFO[2020-10-15 14:29:51] Server became metadata leader, performing leader promotion actions +``` + +Once a leader has been elected, other servers will automatically join the cluster. +We set the `--data-dir` and `--port` flags to avoid clobbering the first server. + +```shell +$ liftbridge --data-dir /tmp/liftbridge/server-2 --port=9293 +INFO[2020-10-15 14:30:48] Liftbridge Version: v1.3.0 +INFO[2020-10-15 14:30:48] Server ID: lbW05esZTab3guEwcmWD9M +INFO[2020-10-15 14:30:48] Namespace: liftbridge-default +INFO[2020-10-15 14:30:48] Default Retention Policy: [Age: 1 week, Compact: false] +INFO[2020-10-15 14:30:48] Default Partition Pausing: disabled +INFO[2020-10-15 14:30:48] Starting server on 0.0.0.0:9293... +``` + +We can also bootstrap a cluster by providing the explicit cluster configuration. +To do this, we provide the IDs of the participating peers in the cluster using the +`--raft-bootstrap-peers` flag. Raft will then handle electing a leader. + +```shell +$ liftbridge --raft-bootstrap-peers server-2,server-3 +``` + +## Docker Container + +Instead of running a binary, you can run Liftbridge using a container. There is +a [container image](https://hub.docker.com/r/liftbridge/standalone-dev) +available which runs an instance of Liftbridge and NATS inside a single Docker +container. This is meant for development and testing purposes. Use the +following Docker commands to run this container: + +```shell +$ docker pull liftbridge/standalone-dev +$ docker run -d --name=liftbridge-main -p 4222:4222 -p 9292:9292 -p 8222:8222 -p 6222:6222 liftbridge/standalone-dev +``` + +This will run the container which will start both the NATS and Liftbridge +servers. To check the logs to see if the container started properly, run: + +```shell +$ docker logs liftbridge-main +``` + +See the [deployment guide](./deployment.md) for more information. + +## Running a Liftbridge Cluster Locally + +The quickest way to get a multi-node Liftbridge cluster up and running on your +machine is with either [Docker Compose](https://docs.docker.com/compose) or +[Kind](https://kind.sigs.k8s.io) (Kubernetes in Docker). Follow the +[deployment guide](./deployment.md) for help running a cluster locally for +development or testing. diff --git a/website/versioned_docs/version-v1.3.0/roadmap.md b/website/versioned_docs/version-v1.3.0/roadmap.md new file mode 100644 index 00000000..b264d9aa --- /dev/null +++ b/website/versioned_docs/version-v1.3.0/roadmap.md @@ -0,0 +1,157 @@ +--- +id: version-v1.3.0-roadmap +title: Product Roadmap +original_id: roadmap +--- + +This roadmap is not intended to provide a firm timeline or commitment of +features and capabilities, nor does it include the comprehensive set of planned +work. Its purpose is to offer visibility into the major items that are planned +and the rough sequencing of them. It also is intended to lay out the vision and +direction of the project. Items not included in the roadmap may still be +implemented. Similarly, items in the roadmap may end up not being implemented +or may look different than as described here. As such, the roadmap exists as a +living document and will be updated as things evolve. + +Please [create an issue](https://github.com/liftbridge-io/liftbridge/issues/new) +to provide input on the roadmap or refer to [existing issues](https://github.com/liftbridge-io/liftbridge/issues) +to comment on particular roadmap items. + +## H2 2020 + +### ~~Stream-Level Configuration ([#75](https://github.com/liftbridge-io/liftbridge/issues/75))~~ + +Allow users to configure and override server defaults for individual streams +via the gRPC API. To date, streams are configured globally using the Liftbridge +configuration. + +### ~~Asynchronous Publish API ([#215](https://github.com/liftbridge-io/liftbridge/issues/215))~~ + +The current publish endpoint is synchronous, meaning each publish is a full +server roundtrip. To enable higher throughput of messages, implement an +asynchronous publish endpoint that allows for pipelined publishes. This +endpoint will ultimately replace the synchronous endpoint since the synchronous +behavior can be implemented on top of the async API. At this point, the +synchronous endpoint will be deprecated. + +### ~~Automatic Partition Pausing ([#218](https://github.com/liftbridge-io/liftbridge/issues/218))~~ + +Allow partitions to be automatically paused when they go idle to support large +numbers of sparse streams. Users can configure the max time with no messages +before a partition is paused at the stream level or globally. This is needed to +enable the [Consumer Position Tracking API](#consumer-position-tracking-api-214httpsgithubcomliftbridge-ioliftbridgeissues214) +such that consumer offsets can be stored in an internal, partitioned stream +efficiently. + +### ~~Command-Line Tool ([#249](https://github.com/liftbridge-io/liftbridge/issues/249))~~ + +Provide a CLI tool for interacting with a Liftbridge cluster in order to +perform basic operational tasks including performance evaluation, maintenance, +and statistics gathering. + +### ~~Consumer Position Tracking API ([#214](https://github.com/liftbridge-io/liftbridge/issues/214))~~ + +Provide a means for “durable” stream subscriptions which allows consumers to +maintain their position within a stream without having to manage the positions +themselves. This initial work involves implementing a low-level API for storing +and retrieving stream cursors to allow manual checkpointing. This set of +primitives will enable higher-level functionality like [Consumer +Groups](#consumer-groups-46httpsgithubcomliftbridge-ioliftbridgeissues46), +which will provide managed checkpointing and consumer coordination. + +### Consumer Groups ([#46](https://github.com/liftbridge-io/liftbridge/issues/46)) + +Provide high-level consumer functionality to allow for durable subscriptions, +balanced stream consumption, and fault-tolerant consumers. The first step to +this will be providing support for single-member consumer groups, which will +effectively provide a durable subscription, then generalizing to multi-member +groups. The latter will require a mechanism for group coordination. + +### ~~Expose Offset Information in Metadata ([#111](https://github.com/liftbridge-io/liftbridge/issues/111))~~ + +Include partition offset information, such as the log-end offset (LEO) and high +watermark (HW), in cluster metadata. This information is useful in a variety of +use cases, such as allowing single-producer streams to know when the +end-of-stream has been reached and to support [Optimistic Concurrency +Control](#optimistic-concurrency-control-54httpsgithubcomliftbridge-ioliftbridgeissues54). + +### Optimistic Concurrency Control ([#54](https://github.com/liftbridge-io/liftbridge/issues/54)) + +Implement a CAS primitive for optimistic concurrency control on publishes. This +enables several use cases where there is one (or a small number) of publishers +for a given partition, such as idempotent publishes, monotonic writes, and +transaction logs. + +## H1 2021 + +### Monitoring API ([#222](https://github.com/liftbridge-io/liftbridge/issues/222)) + +Provide an API that exposes monitoring information and metrics about the server +to better support Liftbridge operations. This paves the way for future +monitoring and observability integrations. + +### Embedded NATS Server ([#19](https://github.com/liftbridge-io/liftbridge/issues/19)) + +Allow the Liftbridge server to run an embedded instance of NATS rather than +relying on an external NATS server. This allows Liftbridge to run as a fully +self-contained process without the need for external dependencies. It also lays +the groundwork for [Optional NATS API](#optional-nats-api-221httpsgithubcomliftbridge-ioliftbridgeissues221) +which turns NATS into an opt-in implementation detail. Eventually, Liftbridge +can be used fully on its own without NATS by allowing clients to transition to +the Liftbridge API. + +### Quorum Size Limit ([#41](https://github.com/liftbridge-io/liftbridge/issues/41)) + +Currently, all servers in a cluster participate in the metadata Raft quorum. +This severely limits scalability of the cluster. Allow having a subset of +servers form the Raft quorum to increase scalability. This involves having +non-quorum servers run as non-voting members of the cluster. + +### Authentication and Authorization ([#36](https://github.com/liftbridge-io/liftbridge/issues/36)) + +Provide mechanisms for client identity and authentication as well as granular +access control. These will likely be delivered as separate units of work with +authentication providing the groundwork for fine-grained authorization. As a +result, these may be broken out into separate roadmap items in the near future. + +### Encryption at Rest ([#217](https://github.com/liftbridge-io/liftbridge/issues/217)) + +Support encryption of stream data stored on disk. This may also include +encrypting metadata such as the Raft log. + + +## H2 2021 + +### Tiered Storage ([#110](https://github.com/liftbridge-io/liftbridge/issues/110)) + +Provide support for transparent offloading of log segments to object storage, +such as Amazon S3, so that brokers only need to keep open (or some configurable +tail of) segments locally on disk. When a consumer needs to read older +segments, the broker or, potentially, the client transparently reads from +object storage. + +### Kafka Bridge Connector ([#220](https://github.com/liftbridge-io/liftbridge/issues/220)) + +Implement a bridge process that maps Kafka topics to Liftbridge streams to +better support the migration from Kafka to Liftbridge or to support hybrid +scenarios. + +### Federated Clustering ([#219](https://github.com/liftbridge-io/liftbridge/issues/219)) + +Implement federated Liftbridge clustering to better support geo-replication. +This may involve implementing a system similar to Kafka MirrorMaker for +replicating data between clusters and/or leveraging NATS superclustering and +geo-aware subscribers. + +### Optional NATS API ([#221](https://github.com/liftbridge-io/liftbridge/issues/221)) + +With the introduction of [Embedded NATS Server](#embedded-nats-server-19httpsgithubcomliftbridge-ioliftbridgeissues19), +allow the NATS API to be exposed optionally. This, in effect, turns NATS into +an implementation detail and allows users to just interact with the Liftbridge +API as a standalone service. + +### Flatbuffers and Zero-Copy Support ([#87](https://github.com/liftbridge-io/liftbridge/issues/87), [#185](https://github.com/liftbridge-io/liftbridge/issues/185)) + +Provide opt-in support for Flatbuffers and a zero-copy API for high-performance +use cases. + diff --git a/website/versioned_sidebars/version-v1.3.0-sidebars.json b/website/versioned_sidebars/version-v1.3.0-sidebars.json new file mode 100644 index 00000000..fa0fb7e8 --- /dev/null +++ b/website/versioned_sidebars/version-v1.3.0-sidebars.json @@ -0,0 +1,32 @@ +{ + "version-v1.3.0-docs": { + "Overview": [ + "version-v1.3.0-overview", + "version-v1.3.0-faq", + "version-v1.3.0-concepts", + "version-v1.3.0-quick-start", + "version-v1.3.0-feature-comparison", + "version-v1.3.0-roadmap" + ], + "Configuration": [ + "version-v1.3.0-configuration", + "version-v1.3.0-ha-and-consistency-configuration" + ], + "Deployment": [ + "version-v1.3.0-deployment" + ], + "Developing With Liftbridge": [ + "version-v1.3.0-activity", + "version-v1.3.0-pausing-streams", + "version-v1.3.0-cursors" + ], + "Technical Deep Dive": [ + "version-v1.3.0-replication-protocol", + "version-v1.3.0-envelope-protocol" + ], + "Client Libraries": [ + "version-v1.3.0-clients", + "version-v1.3.0-client-implementation" + ] + } +} diff --git a/website/versions.json b/website/versions.json index de3d2a9f..e59ce7f6 100644 --- a/website/versions.json +++ b/website/versions.json @@ -1,4 +1,5 @@ [ + "v1.3.0", "v1.2.0", "v1.1.0", "v1.0.0",