This repository has been archived by the owner on May 18, 2021. It is now read-only.
Summary
The following PR implements a static application-level sharding strategy for graph requests based on their IDs. HTTP requests are forwarded to the appropriate node via a custom cluster-aware HTTP middleware. This is an initial approach to support clustering in the completer (see #80). Additional details are provided below.
Running in a Cluster
The simplest way to run the completer in clustered mode is using docker-compose. To run a two-node cluster:
Change the following environment variables to configure your cluster:
cluster_node_count: the total number of nodes in the cluster. This value is used to assign a node to a shard. In order to safely change this value, you must first stop the cluster.
cluster_node_id: the index of the current node in the cluster. Note that evaluating ${cluster_node_prefix}${cluster_node_id} must return the DNS-resolvable name of this node.
Additional properties supported:
cluster_node_prefix: the prefix of the DNS-resolvable node name (without the node index)
cluster_shard_count: defaults to 10 * cluster_node_count. Note that changing this value will result in new mappings of shards to nodes, and previously persisted graph information will no longer be accessible.
Clustering Design
The self-contained nature of graph processing (no cross-graph computation is allowed) makes the completer ideally suited for sharding at the graph level. Incoming HTTP requests can be inspected for a graph ID, from which we can compute a shard and ultimately the node where they should be processed.
Because graph computations may have side effects and carry at-most-once guarantees, a shard must be owned by at most one node in the cluster at any given time. By using hash-based partitioning to map graph IDs to shards, and then shards to nodes, we can ensure that all requests pertaining to the same graph are processed by the same node in the cluster.
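As a rough illustration of the two-step mapping described above, here is a minimal sketch. The FNV-1a hash, the modulo assignment of shards to nodes, and the function names shardFor and nodeFor are assumptions made for this example; the PR text does not specify which hash or assignment rule the completer uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a graph ID to a shard index in [0, shardCount).
// FNV-1a is an assumption; the actual hash used by the completer may differ.
func shardFor(graphID string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(graphID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(shardCount))
}

// nodeFor maps a shard index to a node index in [0, nodeCount).
// Plain modulo assignment is assumed here for illustration.
func nodeFor(shard, nodeCount int) int {
	return shard % nodeCount
}

func main() {
	const nodeCount = 2
	const shardCount = 10 * nodeCount // documented default for cluster_shard_count
	for _, id := range []string{"graph-a", "graph-b", "graph-c"} {
		s := shardFor(id, shardCount)
		fmt.Printf("graph %s -> shard %d -> node %d\n", id, s, nodeFor(s, nodeCount))
	}
}
```

Because both steps are pure functions of the graph ID and the two counts, every node computes the same owner for a given graph without any coordination, as long as all nodes share the same configuration.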
Upon receiving an HTTP request, the current completer node first determines whether to process it locally or forward it to another node. The forwarding logic is implemented inside an HTTP interceptor (or middleware in Gin parlance). Note that requests that are not cluster-aware (i.e. have no associated graph ID), such as ping, are not forwarded. The exception is requests to create new graphs (for which no graph ID exists yet). In this case, a special interceptor generates and assigns a new graph ID to the request before it reaches the forwarding interceptor.
At node startup, a supervisor is spawned for each shard owned by the local node, as determined by the shard-to-node hash-based partitioning. Each shard supervisor then spawns any graph actor children that were assigned to that shard and are still active. Note that this is a static strategy: changing the number of shards results in new shard-to-node mappings, and any previous graph data will no longer be accessible. It is still possible to modify the number of nodes in the cluster. Doing so requires all nodes to be stopped to ensure consistent allocation of shards to nodes, thus guaranteeing a single actor instance per graph across the cluster.