Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added some readmes and doc comments #377

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ namespace DurableTask.Netherite
using DurableTask.Core;
using DurableTask.Core.History;

/// <summary>
/// Update event that executes when a orchestration work item is completed.
/// </summary>
Comment on lines +13 to +15
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on my assumptions, I presume a batch refers to multiple WIs.

Suggested change
/// <summary>
/// Update event that executes when a orchestration work item is completed.
/// </summary>
/// <summary>
/// Update event that executes when a orchestration work items are completed.
/// </summary>

[DataContract]
class BatchProcessed : PartitionUpdateEvent, IRequiresPrefetch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ abstract class PartitionUpdateEvent : PartitionEvent
[IgnoreDataMember]
public OutboxState.Batch OutboxBatch { get; set; }

/// <summary>
/// Used to determine which partition state objects should be modified by an event.
/// </summary>
public abstract void DetermineEffects(EffectTracker effects);

public abstract void ApplyTo(TrackedObject trackedObject, EffectTracker effectTracker);
Expand Down
53 changes: 53 additions & 0 deletions src/DurableTask.Netherite/Events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Netherite Events

This directory contains the event definitions for the Netherite engine. All components in Netherite communicate via events over an event bus (i.e. Azure Event Hubs). The events are defined as .NET classes that inherit from the `DurableTask.Netherite.Event` abstract base class.

Event classes are organized on the file system based on where they execute. For example `ClientEvents` are events that execute on the client and `LoadMonitor` events are executed on the global load monitor component.

## Event types

The following are the different event types supported by Netherite.

### [Client events](./ClientEvents/)

Client events are events that are sent from the partition and *executed on the client*. In most cases, they are responses to client-initiated requests. For example, a client sends a [StateRequestReceived](./PartitionEvents/External/FromClients/StateRequestReceived.cs) event to a partition to request the state of an orchestration, and the partition responds back to the client with a [StateResponseReceived](./ClientEvents/StateResponseReceived.cs) event, containing that result.
Copy link
Member

@davidmrdavid davidmrdavid Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something confuses me here: This says that client events are sent from the partition and executed on the client, but then you explain that StateRequestReceived is sent from the client and executed in the partition which seems contradictory.

Perhaps the way to make sense of this is that StateRequestReceived is not considered a client event? If so - I recommend we call that out for clarity's sake :-) .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was worried that readers might be confused by this, and you confirmed it. :) Let me try to clarify this...


### [Partition events](./PartitionEvents/)

*Partition events* are executed on the partition (i.e. on the worker that owns the partition). They come from clients, the load monitor, or other partitions. Partitions also send events to themselves (internal events). The partition event class definitions are organized on the file system based on where they come from.

There are three types of partition events, which are used to interact with partition state:

1. **Update events**: Atomically read/update one or more tracked objects. These are deterministically replay-able - i.e. it must always have the same effect.
2. **Read events**: Reads a single object.
3. **Query events**: Scans all `InstanceState` objects. These implement the query functionality.

### [Load monitor events](./LoadMonitorEvents/)

Load monitor events are sent by the partition and executed on the global load monitor component.

### [Event fragments](./Fragments/)

(Appears to be a special type of event that allows taking large events and breaking them up into smaller events - more research is needed to confirm)
Comment on lines +29 to +31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this correctly, I believe you're right. If I'm not mistaken - sometimes an event is split into multiple blobs and as such it's events are received in fragments that need to be re-assembled.

Related: https://github.com/microsoft/durabletask-netherite/blob/main/src/DurableTask.Netherite/Util/FragmentationAndReassembly.cs


## Event processing

Netherite operations that modify state are often composed of multiple events in a sequence. This is typically done because partition state needs to be loaded into the cache before it can be modified. For example, the process of creating a new orchestration or entity (instance) involves three steps:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm intrigued by the word "cache" here - is that particularly meaningful here? We can't we just say "loaded into memory"?

Copy link
Member Author

@cgillum cgillum Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the terminology that Sebastian used pretty consistently throughout the part 2 presentation, so I was trying to be consistent. That said, I too don't know if there's any distinction between "cached" and "in-memory" data and prefer the latter if there isn't a useful distinction.


1. **CreateRequestReceived** (phase=`Read`): This *update* event adds itself to the partition's [`PrefetchState`](../PartitionState/PrefetchState.cs) object (which is always in memory) in the target partition. This update ensures that the create request is not lost if the partition goes down.
1. **InstancePrefetch**: This asynchronously fetches the state into the cache and then submits an *update* event to do the actual state update.
1. **StateRequestReceived** (phase=`ConfirmAndProcess`): This *update* event creates the new instance state object in the partition state, sets its status to `Pending`, removes the prior event from the `PrefetchState`, and then enqueues an `ExecutionStartedEvent` which is used by DTFx to actually start the orchestration or entity.

Each of the above steps are atomic, allowing us to reliably recover intermediate state after a crash.

The reason for this design is that the partition state is not always in memory. It is stored in **FasterKV** and is only loaded into memory when needed. If an attempt is made to update some state when it is not in memory, the update will be queued until the state is loaded into memory, stalling the pipeline of (ordered) events the follows the update.

### Partition event pipeline

Events in a Netherite partition are processed in a pipeline that's composed of various "workers" once they are submitted to `IPartitionState.Submit(...)`.

* `IntakeWorker`: Assigns commit log position to each event and decides whether to send the event to a `LogWorker` or a `StoreWorker`.
* `LogWorker`: Continuously persists update events to the commit log using **FasterLog**.
* `StoreWorker`: Handles read/update/query events on the partition state, and periodically/asynchronously checkpoints to **FasterKV**.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something worth calling out as we get into the nitty gritty - FasterKV also has an inner log (called the hybrid log) and this is important to remember in particularly hairy FASTER investigations. So when we talk about "the FASTER log" it's not always immediately clear if we mean FasterLog of FasterKV's "hybrid log". Just FYI, in case we want to sneak that info somewhere in here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's interesting to know that FasterKV is internally implemented using a log, but I'll defer to Sebastian on whether we need additional clarity on the terminology here. I wouldn't expect the average Netherite developer to know the internals of FASTER, but then again, every Netherite contributor I know does know the internals of FASTER so the data already contradicts my assumptions. :)


This particular design is interesting because it allows events to be processed and persisted in parallel, allowing the system to go faster.
9 changes: 9 additions & 0 deletions src/DurableTask.Netherite/PartitionState/ActivitiesState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ namespace DurableTask.Netherite
[DataContract]
class ActivitiesState : TrackedObject
{
/// <summary>
/// Activities currently executing on the current partition.
/// </summary>
[DataMember]
public Dictionary<long, ActivityInfo> Pending { get; private set; }

/// <summary>
/// Activities that aren't yet executing. These activities are candidates for offloading to other partitions.
/// </summary>
[DataMember]
public Queue<ActivityInfo> LocalBacklog { get; private set; }

/// <summary>
/// Activities that were enqueued onto this partition from another partition.
/// </summary>
[DataMember]
public Queue<ActivityInfo> QueuedRemotes { get; private set; }

Expand Down
32 changes: 32 additions & 0 deletions src/DurableTask.Netherite/PartitionState/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Partition state

A partition is a collection of `TrackedObject` objects whose state is tracked by event sourcing. There are two types of objects stored in a partition:

1. **Singleton state objects**: These are large objects that internally contain collections. For example, `TimerState` contains a collection of all timers in the partition.
2. **Per-instance state objects**: These are defined per instance ID and contain the instance state and the history state.

## Partition state objects

The following are the partition state objects managed by Netherite:

| Object | Type | Description |
|-------------------|-----------|------------------------------------------------------------------|
| `InstanceState` | Instance | Contains the state of an orchestration or entity instance. |
| `HistoryState` | Instance | Stores the history of one orchestration or entity instance. |
| `ActivitiesState` | Singleton | Contains the state of all activities in the partition (in-progress or queued). |
| `DedupState` | Singleton | Stores deduplication vector for messages from other partitions. |
| `OutboxState` | Singleton | Buffers all outgoing messages and responses. |
| `PrefetchState` | Singleton | Buffers client requests until state is in memory. |
| `QueriesState` | Singleton | Buffers client query requests. |
| `ReassemblyState` | Singleton | Buffers received fragments until reassembled. |
| `SessionState` | Singleton | Stores all orchestration and entity instance messages (in-progress or queued). |
| `StatsState` | Singleton | Stores the list and counts of all instances in the partition. |
| `TimersState` | Singleton | Buffers instance messages scheduled for future delivery. |

Each partition has its own isolated collection of these state objects.

## Updating partition state

Partition state is updated by submitting events to the partition state object. The partition state object is responsible for applying the event to the state and updating the state in memory. The in-memory state is then periodically persisted to FasterKV.

See [../Events/README.md](../Events/README.md) for more information on the different types of events that can be submitted to the partition state.