Skip to content

Commit

Permalink
move agents monitor task to go
Browse files Browse the repository at this point in the history
  • Loading branch information
wh1te909 committed Jan 30, 2021
1 parent ada627f commit 8aa11c5
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 25 deletions.
21 changes: 0 additions & 21 deletions api/tacticalrmm/agents/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,6 @@
logger.configure(**settings.LOG_CONFIG)


def _check_agent_service(pk: int) -> None:
    """Ping one agent over NATS and request a recovery if it answers.

    Sends a ``ping`` command with a 2 second timeout. When the reply is
    ``"pong"``, the event is logged and a fire-and-forget ``recover``
    command in ``tacagent`` mode is sent to the same agent.

    Args:
        pk: Primary key of the ``Agent`` to check.
    """
    agent = Agent.objects.get(pk=pk)
    ping_reply = asyncio.run(agent.nats_cmd({"func": "ping"}, timeout=2))

    if ping_reply != "pong":
        return

    # The agent is responding to ping from the rpc service but is not showing
    # as online (which is handled by the tacticalagent service), so the
    # tacticalagent service is hung: forcefully restart it.
    logger.info(
        f"Detected crashed tacticalagent service on {agent.hostname} v{agent.version}, attempting recovery"
    )
    recover_cmd = {"func": "recover", "payload": {"mode": "tacagent"}}
    asyncio.run(agent.nats_cmd(recover_cmd, wait=False))


@app.task
def monitor_agents_task() -> None:
    """Run the service check on every NATS-capable agent that is not online.

    The list of primary keys is collected in full before any agent is
    checked, then each one is passed to ``_check_agent_service`` in order.
    """
    candidates = Agent.objects.only("pk", "version", "last_seen", "overdue_time")

    agents: List[int] = []
    for candidate in candidates:
        if candidate.has_nats and candidate.status != "online":
            agents.append(candidate.pk)

    for agent_pk in agents:
        _check_agent_service(agent_pk)


def agent_update(pk: int) -> str:
agent = Agent.objects.get(pk=pk)
# skip if we can't determine the arch
Expand Down
2 changes: 2 additions & 0 deletions api/tacticalrmm/natsapi/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@
path("winupdates/", views.NatsWinUpdates.as_view()),
path("choco/", views.NatsChoco.as_view()),
path("wmi/", views.NatsWMI.as_view()),
path("offline/", views.OfflineAgents.as_view()),
path("logcrash/", views.LogCrash.as_view()),
]
28 changes: 28 additions & 0 deletions api/tacticalrmm/natsapi/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,31 @@ def get(self, request):
if pyver.parse(i.version) >= pyver.parse("1.2.0") and i.status == "online"
]
return Response({"agent_ids": online})


class OfflineAgents(APIView):
    """List the ids of NATS-capable agents whose status is not ``online``."""

    # No authentication or permission classes are applied to this view.
    authentication_classes = []
    permission_classes = []

    def get(self, request):
        """Return ``{"agent_ids": [...]}`` for every matching agent."""
        queryset = Agent.objects.only(
            "pk", "agent_id", "version", "last_seen", "overdue_time"
        )

        offline: List[str] = []
        for agent in queryset:
            if agent.has_nats and agent.status != "online":
                offline.append(agent.agent_id)

        return Response({"agent_ids": offline})


class LogCrash(APIView):
    """Record that an agent's tacticalagent service was detected as crashed."""

    # No authentication or permission classes are applied to this view.
    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Log the crash and refresh the agent's ``last_seen`` timestamp.

        Expects the request body to carry an ``agentid`` key.

        Returns:
            ``"ok"`` on success, HTTP 400 when ``agentid`` is missing from
            the body, and HTTP 404 (via ``get_object_or_404``) when no agent
            has that id.
        """
        try:
            agent_id = request.data["agentid"]
        except (KeyError, TypeError):
            # A malformed body is a client error; answer 400 instead of
            # letting the lookup failure surface as a server error.
            return Response("agentid is required", status=400)

        agent = get_object_or_404(Agent, agent_id=agent_id)
        logger.info(
            f"Detected crashed tacticalagent service on {agent.hostname} v{agent.version}, attempting recovery"
        )
        agent.last_seen = djangotime.now()
        # Only write the column that changed.
        agent.save(update_fields=["last_seen"])
        return Response("ok")
4 changes: 0 additions & 4 deletions api/tacticalrmm/tacticalrmm/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
"task": "agents.tasks.auto_self_agent_update_task",
"schedule": crontab(minute=35, hour="*"),
},
"check-agentservice": {
"task": "agents.tasks.monitor_agents_task",
"schedule": crontab(minute="*/15"),
},
"remove-salt": {
"task": "agents.tasks.remove_salt_task",
"schedule": crontab(minute=14, hour="*/2"),
Expand Down
1 change: 1 addition & 0 deletions natsapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func Listen(apihost, natshost, version string, debug bool) {
}

go getWMI(rClient, nc)
go monitorAgents(rClient, nc)

nc.Subscribe("*", func(msg *nats.Msg) {
var mh codec.MsgpackHandle
Expand Down
Binary file modified natsapi/bin/nats-api
Binary file not shown.
37 changes: 37 additions & 0 deletions natsapi/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,43 @@ import (
"github.com/ugorji/go/codec"
)

// monitorAgents periodically looks for agents whose tacticalagent service
// appears to be hung and asks them to recover.
//
// Every 10 minutes it fetches the ids the API reports as offline
// (GET /offline/), sends each one a msgpack-encoded "ping" over NATS with a
// 2 second timeout and, for every agent that answers "pong", publishes a
// "recover" command in "tacagent" mode and reports the event to the API
// (POST /logcrash/).
//
// The ticker is never stopped, so this function does not return; run it in
// its own goroutine.
func monitorAgents(c *resty.Client, nc *nats.Conn) {
	var payload, recPayload []byte
	var mh codec.MsgpackHandle
	mh.RawToString = true

	// Both outgoing messages are constant, so they are encoded once up front.
	ret := codec.NewEncoderBytes(&payload, new(codec.MsgpackHandle))
	ret.Encode(map[string]string{"func": "ping"})

	rec := codec.NewEncoderBytes(&recPayload, new(codec.MsgpackHandle))
	rec.Encode(Recovery{
		Func: "recover",
		Data: map[string]string{"mode": "tacagent"},
	})

	tick := time.NewTicker(10 * time.Minute)
	for range tick.C {
		agentids, err := c.R().SetResult(&AgentIDS{}).Get("/offline/")
		if err != nil || agentids == nil {
			// The API could not be reached; skip this cycle and retry on the
			// next tick rather than dereferencing an unusable response.
			continue
		}
		result, ok := agentids.Result().(*AgentIDS)
		if !ok || result == nil {
			continue
		}

		for _, id := range result.IDs {
			out, err := nc.Request(id, payload, 2*time.Second)
			if err != nil {
				// No reply within the timeout: nothing to recover here.
				continue
			}

			var resp string
			dec := codec.NewDecoderBytes(out.Data, &mh)
			if err := dec.Decode(&resp); err != nil {
				continue
			}

			// If the agent is responding to ping from the rpc service but is
			// not showing as online (handled by the tacticalagent service),
			// then the tacticalagent service is hung. Forcefully restart it.
			if resp != "pong" {
				continue
			}
			nc.Publish(id, recPayload)
			p := map[string]string{"agentid": id}
			c.R().SetBody(p).Post("/logcrash/")
		}
	}
}

func getWMI(c *resty.Client, nc *nats.Conn) {
var payload []byte
var mh codec.MsgpackHandle
Expand Down
5 changes: 5 additions & 0 deletions natsapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ type NatsInfo struct {
// AgentIDS holds a list of agent ids, read from the "agent_ids" field of a
// JSON document.
type AgentIDS struct {
IDs []string `json:"agent_ids"`
}

// Recovery describes a recovery command: Func names the function to invoke
// and Data carries its string arguments. The fields are tagged "func" and
// "payload" respectively.
type Recovery struct {
Func string `json:"func"`
Data map[string]string `json:"payload"`
}

0 comments on commit 8aa11c5

Please sign in to comment.