This repository has been archived by the owner on Oct 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraft.go
72 lines (66 loc) · 2.09 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package arctonyx
import (
"context"
"fmt"
"github.com/kataras/go-errors"
"github.com/kataras/golog"
"github.com/readystock/raft"
"google.golang.org/grpc"
"sync"
)
// clusterClient lets a node talk to the cluster service of another node
// (normally the raft leader) over gRPC. It embeds Store, so the store's
// fields and methods (IsLeader, raft, nodeId, listen) are available on it.
type clusterClient struct {
Store
// conn is the underlying gRPC connection; validateConnection closes it
// before dialing again when it is non-nil.
conn *grpc.ClientConn
// sync is locked by validateConnection for the duration of a connection check.
sync *sync.Mutex
// addr is presumably the address of the server that conn targets — TODO confirm.
addr raft.ServerAddress
// cluster is the service client used to issue SendCommand, GetSequenceChunk
// and Join calls.
cluster *clusterServiceClient
}
// validateConnection ensures the client has a usable gRPC connection to the
// raft leader at leaderAddr before a request is forwarded.
//
// If this node is the leader (Store.IsLeader) no connection is needed and nil
// is returned. Otherwise the existing connection is kept when it already
// points at leaderAddr; when the leader has changed, or nothing has been
// dialed yet, the previous connection is closed and a new one is dialed. The
// dial uses grpc.WithBlock, so it does not return until the connection is up.
//
// It returns an error when leaderAddr is empty, when dialing fails, or when a
// panic is recovered while connecting.
func (client *clusterClient) validateConnection(leaderAddr raft.ServerAddress) (err error) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered from ", r)
			// Surface the panic to the caller; returning nil here would let
			// it carry on as if the connection had been validated.
			err = fmt.Errorf("validating leader connection: %v", r)
		}
	}()
	client.sync.Lock()
	defer client.sync.Unlock()

	if client.Store.IsLeader() {
		return nil
	}
	if leaderAddr == "" {
		// With no known leader a blocking dial could wait indefinitely
		// while holding the lock, so fail fast instead.
		return fmt.Errorf("no raft leader address to connect to")
	}
	// Reuse the current connection when it already targets this leader.
	if client.conn != nil && client.cluster != nil && client.addr == leaderAddr {
		return nil
	}
	// The leader has changed (or nothing was dialed yet): drop the stale
	// connection before reconnecting.
	if client.conn != nil {
		if closeErr := client.conn.Close(); closeErr != nil {
			golog.Debugf("[%d] Closing previous leader connection: %v", client.Store.nodeId, closeErr)
		}
		client.conn = nil
		client.addr = ""
	}
	golog.Debugf("[%d] Connecting to leader at `%s`", client.Store.nodeId, leaderAddr)
	conn, dialErr := grpc.Dial(string(leaderAddr), grpc.WithInsecure(), grpc.WithBlock())
	if dialErr != nil {
		return dialErr
	}
	golog.Debugf("[%d] Connected to leader", client.Store.nodeId)
	// Remember the connection and its target so it can be reused on the next
	// call and closed when the leader changes.
	client.conn = conn
	client.addr = leaderAddr
	client.cluster = &clusterServiceClient{cc: conn}
	return nil
}
// sendCommand forwards command to the raft leader through the cluster
// service client.
//
// The leader connection is validated first. A call error is returned
// unchanged; a response whose IsSuccess flag is false is converted into an
// error carrying the response's ErrorMessage. Only a successful response is
// returned to the caller.
func (client *clusterClient) sendCommand(command *Command) (*CommandResponse, error) {
	leader := client.Store.raft.Leader()
	if err := client.validateConnection(leader); err != nil {
		return nil, err
	}
	golog.Debugf("[%d] Sending command to leader", client.Store.nodeId)
	response, err := client.cluster.SendCommand(context.Background(), command)
	switch {
	case err != nil:
		return nil, err
	case !response.IsSuccess:
		return nil, errors.New(response.ErrorMessage)
	}
	return response, nil
}
// getNextChunkInSequence requests a sequence chunk for sequenceName from the
// raft leader through the cluster service client.
//
// The leader connection is validated first; any error from validation or
// from the call itself is returned with a nil response.
func (client *clusterClient) getNextChunkInSequence(sequenceName string) (*SequenceChunkResponse, error) {
	leader := client.Store.raft.Leader()
	if err := client.validateConnection(leader); err != nil {
		return nil, err
	}
	request := &SequenceChunkRequest{SequenceName: sequenceName}
	chunk, err := client.cluster.GetSequenceChunk(context.Background(), request)
	if err != nil {
		return nil, err
	}
	return chunk, nil
}
// joinCluster issues a Join call on the cluster service client, sending this
// node's listen address (as RaftAddress) and node id (as Id).
//
// NOTE(review): addr is not used in the body; the request goes through
// whatever client.cluster already points at — confirm the caller sets that up
// before calling.
func (client *clusterClient) joinCluster(addr string) (*JoinResponse, error) {
	request := &JoinRequest{
		RaftAddress: client.listen,
		Id:          client.nodeId,
	}
	return client.cluster.Join(context.Background(), request)
}