Skip to content

Commit

Permalink
More tests and reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
bomoko committed Feb 7, 2024
1 parent 56ae401 commit 2c7f67e
Show file tree
Hide file tree
Showing 5 changed files with 617 additions and 84 deletions.
159 changes: 78 additions & 81 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts

// processMessageQueue reads in a rabbitMQ item and dispatches it to the appropriate function to process
func (h *Messaging) processMessageQueue(message mq.Message) {
var insights InsightsData
var resource ResourceDestination

acknowledgeMessage := func(message mq.Message) func() {
return func() {
// Ack to remove from queue
Expand All @@ -59,6 +58,9 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
}
}(message)

// here we unmarshal the initial incoming message body
// notice how there is a "type" associated with the detail,
// this is the primary driver used to determine which subsystem this message will be processed by.
incoming := &InsightsMessage{}
err := json.Unmarshal(message.Body(), incoming)

Expand All @@ -68,16 +70,13 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
return
}

// if we have direct problems or facts, we process them differently - skipping all
// the extra processing below.
if incoming.Type == "direct.facts" {
switch incoming.Type {
case "direct.facts":
resp := processFactsDirectly(message, h)
slog.Debug(resp)
acknowledgeMessage()
return
}

if incoming.Type == "direct.problems" {
case "direct.problems":
resp, _ := processProblemsDirectly(message, h)
if h.EnableDebug {
for _, d := range resp {
Expand All @@ -86,20 +85,15 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
}
acknowledgeMessage()
return
}

// We also directly process deletion of problems and facts
if incoming.Type == "direct.delete.problems" {
case "direct.delete.problems":
slog.Debug("Deleting problems")
_, err := deleteProblemsDirectly(message, h)
if err != nil {
slog.Error(err.Error())
}
acknowledgeMessage() // Should we be acknowledging this error?
return
}

if incoming.Type == "direct.delete.facts" {
case "direct.delete.facts":
_, err := deleteFactsDirectly(message, h)
if err != nil {
slog.Error(err.Error())
Expand All @@ -108,6 +102,62 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
return
}

// If we get here, we don't have an assigned type - which means we process the data via inferrence.
// there are essentially two steps that happen there
// First - we preprocess and clean up the incoming data
// resource = contains details about where this came from
// insights = contains details about the actual insights data itself
resource, insights, err := preprocessIncomingMessageData(incoming)

if err != nil {
slog.Error("Error preprocessing - rejecting message and exiting", "Error", err.Error())
rejectMessage(false)
}

slog.Debug("Insights", "data", fmt.Sprint(insights))
slog.Debug("Target", "data", fmt.Sprint(resource))

// Process s3 upload
if !h.S3Config.Disabled {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
slog.Error("Unable to send to S3", "Error", err.Error())
}
}
}

// Process Lagoon API integration
if !h.LagoonAPI.Disabled {
if insights.InsightsType != Sbom &&
insights.InsightsType != Image &&
insights.InsightsType != Raw &&
insights.InsightsType != Direct {
slog.Error("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing")
} else {
err := h.sendToLagoonAPI(incoming, resource, insights)

if err != nil {
slog.Error("Unable to send to the API", "Error", err.Error())
rejectMessage(false)
return
}
}
}
acknowledgeMessage()
}

// preprocessIncomingMessageData deals with what are now legacy types, where most of the insight information
// used for further downstream processing is extracted from the message.
func preprocessIncomingMessageData(incoming *InsightsMessage) (ResourceDestination, InsightsData, error) {
var resource ResourceDestination
// Set some insight data defaults
insights := InsightsData{
LagoonType: Facts,
OutputFileExt: "json",
OutputFileMIMEType: "application/json",
}

// Check labels for insights data from message
if incoming.Labels != nil {
labelKeys := make([]string, 0, len(incoming.Labels))
Expand All @@ -116,38 +166,25 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
}
sort.Strings(labelKeys)

// Set some insight data defaults
insights = InsightsData{
LagoonType: Facts,
OutputFileExt: "json",
OutputFileMIMEType: "application/json",
}

for _, label := range labelKeys {
if label == "lagoon.sh/project" {
switch label {
case "lagoon.sh/project":
resource.Project = incoming.Labels["lagoon.sh/project"]
}
if label == "lagoon.sh/environment" {
case "lagoon.sh/environment":
resource.Environment = incoming.Labels["lagoon.sh/environment"]
}
if label == "lagoon.sh/service" {
case "lagoon.sh/service":
resource.Service = incoming.Labels["lagoon.sh/service"]
}

if label == "lagoon.sh/insightsType" {
case "lagoon.sh/insightsType":
insights.InputType = incoming.Labels["lagoon.sh/insightsType"]
}
if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" {
insights.LagoonType = ImageFacts
}
if label == "lagoon.sh/insightsOutputCompressed" {
if incoming.Labels["lagoon.sh/insightsType"] == "image-gz" {
insights.LagoonType = ImageFacts
}
case "lagoon.sh/insightsOutputCompressed":
compressed, _ := strconv.ParseBool(incoming.Labels["lagoon.sh/insightsOutputCompressed"])
insights.OutputCompressed = compressed
}
if label == "lagoon.sh/insightsOutputFileMIMEType" {
case "lagoon.sh/insightsOutputFileMIMEType":
insights.OutputFileMIMEType = incoming.Labels["lagoon.sh/insightsOutputFileMIMEType"]
}
if label == "lagoon.sh/insightsOutputFileExt" {
case "lagoon.sh/insightsOutputFileExt":
insights.OutputFileExt = incoming.Labels["lagoon.sh/insightsOutputFileExt"]
}
}
Expand All @@ -169,9 +206,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {

// Determine incoming payload type
if incoming.Payload == nil && incoming.BinaryPayload == nil {
slog.Debug("No payload was found - rejecting message and exiting")
rejectMessage(false)
return
return resource, insights, fmt.Errorf("No payload was found")
}
if len(incoming.Payload) != 0 {
insights.InputPayload = Payload
Expand All @@ -180,43 +215,5 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
insights.InputPayload = BinaryPayload
}

// Debug
//if h.EnableDebug {
// log.Println("[DEBUG] insights:", insights)
// log.Println("[DEBUG] target:", resource)
//}
slog.Debug("Insights", "data", fmt.Sprint(insights))
slog.Debug("Target", "data", fmt.Sprint(resource))

// Process s3 upload
if !h.S3Config.Disabled {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
//log.Printf("Unable to send to S3: %s", err.Error())
slog.Error("Unable to send to S3", "Error", err.Error())

// TODO: BETTER ERROR HANDLING
}
}
}

// Process Lagoon API integration
if !h.LagoonAPI.Disabled {
if insights.InsightsType != Sbom &&
insights.InsightsType != Image &&
insights.InsightsType != Raw &&
insights.InsightsType != Direct {
slog.Error("only 'sbom', 'direct', 'raw', and 'image' types are currently supported for api processing")
} else {
err := h.sendToLagoonAPI(incoming, resource, insights)

if err != nil {
slog.Error("Unable to send to the API", "Error", err.Error())
rejectMessage(false)
return
}
}
}
acknowledgeMessage()
return resource, insights, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Asset description

* test1_envtesting.json - contains a fully stacked "env" section that is used to ensure that parsing and checking environments works.
Loading

0 comments on commit 2c7f67e

Please sign in to comment.