-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnvml_system.go
147 lines (136 loc) · 4.29 KB
/
nvml_system.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
pb "GCS/proto"
"encoding/json"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log/slog"
"strings"
)
// nvml_sys_handler fetches NVML GPU metrics for the node c.rm.NodeAddress via
// grpcHandler, which also forwards the result to the websocket client.
//
// For RESOUECE_GET_TYPE_ALL every GPU on the node is queried
// (GPU_ALL_INDEX_STRING). For RESOUECE_GET_TYPE_PARTIAL only the GPU index of
// the first c.rm.OccupiedList entry whose NodeAddress equals c.rm.NodeAddress
// is queried; when no entry matches, or the list pointer is nil, nothing is
// queried or sent. Any other request type is a no-op.
//
// It returns the error from grpcHandler, if any.
func (c *ResourceClient) nvml_sys_handler() error {
	var gpuIdx string
	switch c.rm.Type {
	case RESOUECE_GET_TYPE_ALL:
		gpuIdx = GPU_ALL_INDEX_STRING
	case RESOUECE_GET_TYPE_PARTIAL:
		// Dereferencing a nil list pointer below would panic the handler.
		if c.rm.OccupiedList == nil {
			return nil
		}
		found := false
		for _, v := range *c.rm.OccupiedList {
			if v.NodeAddress == c.rm.NodeAddress {
				// Only the first matching entry is used.
				gpuIdx, found = v.GPUIndex, true
				break
			}
		}
		if !found {
			return nil
		}
	default:
		return nil
	}
	if err := c.grpcHandler(c.rm.NodeAddress, gpuIdx); err != nil {
		slog.Error("resource grpcHandler error",
			"ERR_MSG", err.Error())
		return err
	}
	return nil
}
// grpcHandler queries the GCS info-catch gRPC service on the node at addr (the
// port suffix GCS_INFO_CATCH_GRPC_PORT is appended) for NVML metrics of the GPU
// index string gpuIdx. It copies the streamed fields into c.sm and sends c.sm
// to the websocket peer c.conn as one JSON text message.
//
// If the server streams several responses, each overwrites the previous one in
// c.sm, so only the last response is sent. Every error is logged before it is
// returned, including a failed websocket Write.
func (c *ResourceClient) grpcHandler(addr string, gpuIdx string) error {
	// Connect to the gRPC server.
	conn, err := grpc.Dial(addr+GCS_INFO_CATCH_GRPC_PORT, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		slog.Error("resource grpc.Dial get error",
			"ERR_MSG", err.Error())
		return err
	}
	// Closing the ClientConn also tears down the stream on early returns.
	defer conn.Close()

	rpcClient := pb.NewGcsInfoCatchServiceDockerClient(conn)
	// NOTE(review): the request has no deadline, so a stalled server blocks this
	// call indefinitely. A 1 s timeout was tried earlier and disabled; confirm
	// the intended bound before adding one back.
	stream, err := rpcClient.NvmlUtilizationRate(context.Background(), &pb.NvmlInfoReuqestMsg{IndexID: gpuIdx})
	if err != nil {
		slog.Error(" client.NvmlUtilizationRate stream get err",
			"ERR_MSG", err.Error())
		return err
	}
	for {
		// Receive each message pushed by the server's Send().
		resp, err := stream.Recv()
		if err == io.EOF {
			// The server closed the stream: all data was read successfully.
			break
		}
		if err != nil {
			slog.Error("resource rpc stream server receive error",
				"ERR_MSG", err.Error())
			return err
		}
		// Copy the response into the outgoing message.
		c.sm.GPUIndex = AssembleToRespondString(resp.GetIndexID())
		c.sm.Occupied = AssembleToRespondString(resp.GetOccupied())
		c.sm.Temperature = AssembleToRespondString(resp.GetTemperature())
		c.sm.MemUtilize = AssembleToRespondString(resp.GetMemRate())
		c.sm.Utilize = AssembleToRespondString(resp.GetUtilizationRate())
	}
	// NOTE(review): if the stream ends without any response, c.sm still holds
	// whatever it held before this call and is sent unchanged.

	// Marshal before opening the websocket writer so that a serialisation
	// failure does not emit an empty frame.
	sdmsg, err := json.Marshal(c.sm)
	if err != nil {
		slog.Error("json.Marshal error",
			"ERR_MSG", err.Error())
		return err
	}
	w, err := c.conn.NextWriter(websocket.TextMessage)
	if err != nil {
		slog.Error("c.conn.NextWriter error",
			"ERR_MSG", err.Error())
		return err
	}
	_, writeErr := w.Write(sdmsg)
	if writeErr != nil {
		slog.Error("w.Write error",
			"ERR_MSG", writeErr.Error())
	}
	// Close even after a failed Write: Close flushes the message and releases
	// the writer.
	if err := w.Close(); err != nil {
		slog.Error("w.Close error",
			"ERR_MSG", err.Error())
		return err
	}
	// Report a failed Write to the caller instead of claiming success.
	return writeErr
}
// checkGPUOccupiedOrNot reports whether the GPU(s) selected by gpuIndex on the
// node at addr must be treated as occupied (i.e. not usable).
//
// It opens the node's NvmlUtilizationRate stream, reads a single response, and
// returns true when the assembled Occupied field contains "1". Every failure
// path returns true (fail closed), so an unreachable or misbehaving node is
// never reported as free:
//   - a Dial failure;
//   - a failure to open the stream;
//   - a Recv error;
//   - an immediate EOF.
func checkGPUOccupiedOrNot(addr string, gpuIndex string) bool {
	conn, err := grpc.Dial(addr+GCS_INFO_CATCH_GRPC_PORT, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		slog.Error("resource grpc.Dial get error",
			"ERR_MSG", err.Error())
		// Previously returned false here, which marked an unreachable node's
		// GPU as free; fail closed like the Recv error paths below.
		return true
	}
	// Closing the ClientConn also tears down the stream after the single read.
	defer conn.Close()

	rpcClient := pb.NewGcsInfoCatchServiceDockerClient(conn)
	stream, err := rpcClient.NvmlUtilizationRate(context.Background(), &pb.NvmlInfoReuqestMsg{IndexID: gpuIndex})
	if err != nil {
		slog.Error(" client.NvmlUtilizationRate stream get err",
			"ERR_MSG", err.Error())
		// Connection failures of a non-blocking Dial typically surface here;
		// treat them as "cannot be used".
		return true
	}
	// Only one message is read.
	resp, err := stream.Recv()
	if err == io.EOF {
		// EOF on the very first read means no data was returned, so the GPU
		// cannot be used.
		return true
	}
	if err != nil {
		slog.Error("resource rpc stream server receive error",
			"ERR_MSG", err.Error())
		// Reading the stream failed, so the GPU cannot be used.
		return true
	}
	return strings.Contains(AssembleToRespondString(resp.GetOccupied()), "1")
}