Skip to content

Commit

Permalink
feat: refactor and datamine (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
davideimola authored Mar 7, 2023
1 parent e11bd1a commit 211889e
Show file tree
Hide file tree
Showing 15 changed files with 2,301 additions and 269 deletions.
26 changes: 26 additions & 0 deletions .proto/redcarbon/external_api/agents/api/v1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ message AgentConfigurationData {
oneof data {
SentinelOneData sentinel_one = 1;
GrayLogImpossibleTravelData graylog_impossible_travel = 2;
GrayLogDataMineData graylog_datamine = 3;
}
}

Expand All @@ -35,8 +36,33 @@ message GrayLogImpossibleTravelData {
google.protobuf.Duration time_window = 4;
}

message GrayLogDataMineData {
string token = 1;
string url = 2;
bool skip_ssl = 3;
}

message GrayLogImpossibleTravelLog {
map<string,string> logs = 1;
}

message GrayLogDataMineResult {
google.protobuf.Timestamp timestamp = 1;
string uuid = 2;
string source = 3;
string message = 4;
}

enum DataType {
UNSUPPORTED = 0;
SENTINEL_ONE = 1;
GRAYLOG_IMPOSSIBLE_TRAVEL = 2;
GRAYLOG_DATAMINE = 3;
}

message GrayLogDataMineQuery {
string id = 1;
google.protobuf.Timestamp search_start_time = 2;
google.protobuf.Timestamp search_stop_time = 3;
string query = 4;
}
62 changes: 55 additions & 7 deletions .proto/redcarbon/external_api/agents/api/v1/v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@ service AgentsExternalV1Srv {
rpc PullConfigurations(PullConfigurationsReq)
returns (PullConfigurationsRes) {}

rpc SendData(SendDataReq)
returns (SendDataRes) {}
rpc SendSentinelOneData(SendSentinelOneDataReq)
returns (SendSentinelOneDataRes) {}
rpc SendGrayLogImpossibleTravelData(SendGrayLogImpossibleTravelDataReq)
returns (SendGrayLogImpossibleTravelDataRes) {}
rpc SendGrayLogDatamineQueryResultsData(SendGrayLogDatamineQueryResultsDataReq)
returns (SendGrayLogDatamineQueryResultsDataRes) {}
rpc SendGrayLogDatamineQueryErrorData (SendGrayLogDatamineQueryErrorDataReq)
returns (SendGrayLogDatamineQueryErrorDataRes) {}

rpc RefreshToken(RefreshTokenReq)
returns (RefreshTokenRes) {}

rpc GetGrayLogDataMinePendingQueries(GetGrayLogDataMinePendingQueriesReq)
returns (GetGrayLogDataMinePendingQueriesRes) {}
}

message HZReq {
Expand All @@ -37,13 +46,44 @@ message PullConfigurationsRes {
repeated AgentConfiguration agent_configurations = 1;
}

message SendDataReq {
string data = 1;
string agent_configuration_id = 2;
DataType data_type = 3;
message SendSentinelOneDataReq {
string agent_configuration_id = 1;
string data = 2;
}

message SendSentinelOneDataRes {
google.protobuf.Timestamp received_at = 1;
}

message SendGrayLogImpossibleTravelDataReq {
string agent_configuration_id = 1;
string user = 2;
repeated string ips = 3;
repeated string countries = 4;
repeated GrayLogImpossibleTravelLog impossible_travel_logs = 5;
}

message SendGrayLogImpossibleTravelDataRes {
google.protobuf.Timestamp received_at = 1;
}

message SendGrayLogDatamineQueryResultsDataReq {
string agent_configuration_id = 1;
string query_id = 2;
repeated GrayLogDataMineResult results = 3;
}

message SendGrayLogDatamineQueryResultsDataRes {
google.protobuf.Timestamp received_at = 1;
}

message SendGrayLogDatamineQueryErrorDataReq {
string agent_configuration_id = 1;
string query_id = 2;
string error = 3;
}

message SendDataRes {
message SendGrayLogDatamineQueryErrorDataRes {
google.protobuf.Timestamp received_at = 1;
}

Expand All @@ -54,3 +94,11 @@ message RefreshTokenRes {
string token = 1;
string refresh_token = 2;
}

message GetGrayLogDataMinePendingQueriesReq {
string agent_configuration_id = 1;
}

message GetGrayLogDataMinePendingQueriesRes {
repeated GrayLogDataMineQuery graylog_datamine_queries = 1;
}
113 changes: 113 additions & 0 deletions internal/graylog-datamine/graylog-datamine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package graylog_datamine

import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

"pkg.redcarbon.ai/internal/graylog"
agentsExternalApiV1 "pkg.redcarbon.ai/proto/redcarbon/external_api/agents/api/v1"
)

type ServiceGrayLogDataMine struct {
ac *agentsExternalApiV1.AgentConfiguration
gdmConf *agentsExternalApiV1.GrayLogDataMineData
aCli agentsExternalApiV1.AgentsExternalV1SrvClient
glCli graylog.Client
}

func NewGrayLogDataMineService(conf *agentsExternalApiV1.AgentConfiguration, cli agentsExternalApiV1.AgentsExternalV1SrvClient) *ServiceGrayLogDataMine {
gdmConf := conf.Data.GetGraylogDatamine()

return &ServiceGrayLogDataMine{
ac: conf,
aCli: cli,
gdmConf: gdmConf,
glCli: graylog.NewGrayLogClient(gdmConf.Token, gdmConf.Url, gdmConf.SkipSsl),
}
}

func (s ServiceGrayLogDataMine) RunService(ctx context.Context) {
l := logrus.WithField("configurationId", s.ac.AgentConfigurationId)

l.Infof("Retrieving pending queries...")

qs, err := s.aCli.GetGrayLogDataMinePendingQueries(ctx, &agentsExternalApiV1.GetGrayLogDataMinePendingQueriesReq{
AgentConfigurationId: s.ac.AgentConfigurationId,
})
if err != nil {
l.Errorf("Error while retrieving the pending queries for error %v", err)
return
}

var wg sync.WaitGroup

for _, q := range qs.GraylogDatamineQueries {
wg.Add(1)

go func(query *agentsExternalApiV1.GrayLogDataMineQuery) {
defer wg.Done()

err := s.runQuery(ctx, query, l)
if err == nil {
return
}

l.Errorf("Error while executing the query %s for error %v", query.Id, err)

_, err = s.aCli.SendGrayLogDatamineQueryErrorData(ctx, &agentsExternalApiV1.SendGrayLogDatamineQueryErrorDataReq{
AgentConfigurationId: s.ac.AgentConfigurationId,
QueryId: query.Id,
Error: err.Error(),
})
if err != nil {
l.Errorf("Error while sending the error due to %v for query %s", err, query.Id)
}
}(q)
}

wg.Wait()

l.Infof("Done")
}

func (s ServiceGrayLogDataMine) runQuery(ctx context.Context, q *agentsExternalApiV1.GrayLogDataMineQuery, l *logrus.Entry) error {
l.Infof("Starting query %s...", q.Id)

res, err := s.glCli.QueryData(ctx, q.Query, q.SearchStartTime.AsTime(), q.SearchStopTime.AsTime(), []string{"timestamp", "gl2_message_id", "source", "message"})
if err != nil {
return err
}

var results []*agentsExternalApiV1.GrayLogDataMineResult

for _, v := range res {
t, err := time.Parse(time.RFC3339Nano, v["timestamp"])
if err != nil {
return err
}

results = append(results, &agentsExternalApiV1.GrayLogDataMineResult{
Uuid: v["gl2_message_id"],
Source: v["source"],
Message: v["message"],
Timestamp: timestamppb.New(t),
})
}

_, err = s.aCli.SendGrayLogDatamineQueryResultsData(ctx, &agentsExternalApiV1.SendGrayLogDatamineQueryResultsDataReq{
QueryId: q.Id,
AgentConfigurationId: s.ac.AgentConfigurationId,
Results: results,
})
if err != nil {
return err
}

l.Infof("Query %s successfully executed", q.Id)

return nil
}
Loading

0 comments on commit 211889e

Please sign in to comment.