Commit

added option to send event message payloads directly to integration endpoint targets
krystianity committed Oct 7, 2024
1 parent 85e2a4f commit 7bf08b5
Showing 14 changed files with 88 additions and 31 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# ilagent CHANGELOG

## 2024-10-07, Version 0.5.1

* added option to send event message payloads directly to integration endpoint targets

## 2024-10-04, Version 0.5.0

* upgraded dependencies
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilagent"
version = "0.5.0"
version = "0.5.1"
authors = ["Chris Froehlingsdorf <[email protected]>"]
edition = "2021"

@@ -16,7 +16,7 @@ json = "0.12"
reqwest = "0.12"
chrono = "0.4"
clap = "2"
ilert = "4.0.1"
ilert = "4.1.1"
uuid = { version = "1.10", features = ["v4"] }
ctrlc = { version = "3.4" }
rusqlite = { version = "0.32", features = ["bundled"] } # SQLite 3.46.0
2 changes: 2 additions & 0 deletions docs/mosquitto.conf
@@ -0,0 +1,2 @@
listener 1883
allow_anonymous true
8 changes: 8 additions & 0 deletions docs/mqtt.md
@@ -0,0 +1,8 @@
### Testing MQTT locally

Run the broker:

```sh
docker run -d -p 1883:1883 -p 9001:9001 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
```
Recommended UI: https://mqttx.app
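
To push a test event through the agent, publish a message to the event topic used in examples/_mqtt (`ilert/e`). A minimal sketch against the broker started above; the JSON field names are an assumption based on `EventQueueItemJson`, so adjust the key and fields to your integration:

```sh
mosquitto_pub -h localhost -p 1883 -t 'ilert/e' \
  -m '{"apiKey": "il1api123...", "eventType": "ALERT", "summary": "test alert via mqtt"}'
```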
2 changes: 1 addition & 1 deletion examples/_mqtt
@@ -1,3 +1,3 @@
#!/bin/bash
cargo run -- daemon -p 8977 -v -v --heartbeat il1hbt123 \
-m 192.168.1.14 -q 1883 -n ilbro -e 'ilert/e' -r 'ilert/h'
-m localhost -q 1883 -n ilbro -e 'ilert/e' -r 'ilert/h'
2 changes: 1 addition & 1 deletion examples/_mqtt_overwrite
@@ -1,6 +1,6 @@
#!/bin/bash
cargo run -- daemon -v -v \
-m 127.0.0.1 -q 1883 -n ilagent -e '#' \
-m localhost -q 1883 -n ilagent -e '#' \
--event_key 'il1api123...' \
--map_key_alert_key 'mCode' \
--map_key_summary 'comment' \
15 changes: 9 additions & 6 deletions src/consumers/kafka.rs
@@ -150,12 +150,15 @@ async fn handle_event_message(daemon_context: Arc<DaemonContext>, key: &str, pay
let parsed = EventQueueItemJson::parse_event_json(&daemon_context.config, payload, topic);
if let Some(mut event) = parsed {
// info!("Event queue item: {:?}", event);
event.customDetails = Some(json!({
"kafka_key": key,
"kafka_topic": topic
}));
let db_event_format = EventQueueItemJson::to_db(event);
let should_retry = poll::process_queued_event(&daemon_context.ilert_client, &db_event_format).await;
if event.customDetails.is_none() {
event.customDetails = Some(json!({
"messageKey": key,
"topic": topic
}));
}
let event_api_path = format!("/v1/events/kafka/{}", event.apiKey.as_str());
let db_event_format = EventQueueItemJson::to_db(event, Some(event_api_path));
let should_retry = poll::send_queued_event(&daemon_context.ilert_client, &db_event_format).await;
should_retry
} else {
false
12 changes: 9 additions & 3 deletions src/consumers/mqtt.rs
@@ -5,7 +5,7 @@ use std::{str, thread};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use ilert::ilert::ILert;

use serde_json::json;
use crate::db::ILDatabase;
use crate::config::ILConfig;
use crate::{hbt, DaemonContext};
@@ -132,8 +132,14 @@ fn handle_heartbeat_message(payload: &str) -> () {

fn handle_event_message(config: &ILConfig, db: &ILDatabase, payload: &str, topic: &str) -> () {
let parsed = crate::models::event::EventQueueItemJson::parse_event_json(&config, payload, topic);
if let Some(event) = parsed {
let db_event = EventQueueItemJson::to_db(event);
if let Some(mut event) = parsed {
if event.customDetails.is_none() {
event.customDetails = Some(json!({
"topic": topic
}));
}
let event_api_path = format!("/v1/events/mqtt/{}", event.apiKey.as_str());
let db_event = EventQueueItemJson::to_db(event, Some(event_api_path));
let insert_result = db.create_il_event(&db_event);
match insert_result {
Ok(res) => match res {
30 changes: 24 additions & 6 deletions src/db.rs
@@ -10,6 +10,7 @@ use crate::models::event_db::EventQueueItem;
const DB_MIGRATION_VAL: &str = "1";
const DB_MIGRATION_V1: &str = "mig_1";
const DB_MIGRATION_V2: &str = "mig_2";
const DB_MIGRATION_V3: &str = "mig_3";

#[derive(Debug)]
struct ILAgentItem {
@@ -96,6 +97,19 @@ impl ILDatabase {
info!("Database migrated to {}", DB_MIGRATION_V2);
}

let mig_3 = self.get_il_value(DB_MIGRATION_V3);
if mig_3.is_none() {

self.conn.execute(
"ALTER TABLE event_items ADD COLUMN event_api_path TEXT NULL",
[],
).expect("Database migration failed (v3, 1)");

self.set_il_val(DB_MIGRATION_V3, DB_MIGRATION_VAL)
.expect("Database migration failed (v3, set)");
info!("Database migrated to {}", DB_MIGRATION_V3);
}

/*
Run simple db migrations, if needed, like this:
@@ -186,13 +200,15 @@ impl ILDatabase {
images: row.get(7).unwrap_or(None),
links: row.get(8).unwrap_or(None),
custom_details: row.get(9).unwrap_or(None),
details: row.get(10).unwrap_or(None)
details: row.get(10).unwrap_or(None),
event_api_path: row.get(11).unwrap_or(None)
})
}

pub fn get_il_event(&self, event_id: &str) -> Result<Option<EventQueueItem>, rusqlite::Error> {

let mut stmt = self.conn.prepare("SELECT * FROM event_items WHERE id = ?1").unwrap();
let mut stmt = self.conn.prepare("SELECT id, api_key, event_type, alert_key, summary, created_at,
priority, images, links, custom_details, details, event_api_path FROM event_items WHERE id = ?1")?;
let query_result = stmt
.query_map(&[&event_id], |row| {
ILDatabase::convert_db_row_to_event(row)
@@ -231,7 +247,8 @@

pub fn get_il_events(&self, limit: i32) -> Result<Vec<EventQueueItem>, rusqlite::Error> {

let mut stmt = self.conn.prepare("SELECT * FROM event_items ORDER BY inserted_at ASC LIMIT ?1").unwrap();
let mut stmt = self.conn.prepare("SELECT id, api_key, event_type, alert_key, summary, created_at,
priority, images, links, custom_details, details, event_api_path FROM event_items ORDER BY inserted_at ASC LIMIT ?1")?;
let query_result = stmt
.query_map(&[&limit], |row| {
ILDatabase::convert_db_row_to_event(row)
@@ -276,11 +293,12 @@

let insert_result = self.conn.execute(
"INSERT INTO event_items (api_key, event_type, alert_key, summary, created_at, id,
priority, images, links, custom_details, details)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
priority, images, links, custom_details, details, event_api_path)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
&[&item.api_key as &dyn ToSql, &item.event_type, &item.alert_key,
&item.summary, created_at, &item_id,
&item.priority, &item.images, &item.links, &item.custom_details, &item.details],
&item.priority, &item.images, &item.links, &item.custom_details, &item.details,
&item.event_api_path],
);

match insert_result {
2 changes: 1 addition & 1 deletion src/http_server.rs
@@ -54,7 +54,7 @@ async fn post_event(_req: HttpRequest, container: web::Data<Mutex<WebContextCont
let container = container.lock().await;

let event = event.into_inner();
let event = EventQueueItemJson::to_db(event);
let event = EventQueueItemJson::to_db(event, None);

if ILertEventType::from_str(event.event_type.as_str()).is_err() {
return HttpResponse::BadRequest().json(json!({ "error": "Unsupported value for field 'eventType'." }));
2 changes: 1 addition & 1 deletion src/main.rs
@@ -623,7 +623,7 @@ async fn run_event(matches: ArgMatches<'_>) -> () {
event.images = images;
event.links = links;

poll::process_queued_event(&ilert_client, &event).await;
poll::send_queued_event(&ilert_client, &event).await;
}

/**
5 changes: 3 additions & 2 deletions src/models/event.rs
@@ -51,7 +51,7 @@ impl EventQueueItemJson {
}
}

pub fn to_db(item: EventQueueItemJson) -> EventQueueItem {
pub fn to_db(item: EventQueueItemJson, event_api_path: Option<String>) -> EventQueueItem {

let images = match item.images {
Some(v) => {
@@ -91,7 +91,8 @@
priority: item.priority,
images,
links,
custom_details
custom_details,
event_api_path
}
}

9 changes: 6 additions & 3 deletions src/models/event_db.rs
@@ -13,7 +13,8 @@ pub struct EventQueueItem {
pub priority: Option<String>,
pub images: Option<String>,
pub links: Option<String>,
pub custom_details: Option<String>
pub custom_details: Option<String>,
pub event_api_path: Option<String>
}

impl EventQueueItem {
@@ -30,7 +31,8 @@ impl EventQueueItem {
priority: None,
images: None,
links: None,
custom_details: None
custom_details: None,
event_api_path: None
}
}

@@ -47,7 +49,8 @@
priority: None,
images: None,
links: None,
custom_details: None
custom_details: None,
event_api_path: None
}
}
}
22 changes: 17 additions & 5 deletions src/poll.rs
@@ -37,7 +37,7 @@ pub async fn run_poll_job(daemon_ctx: Arc<DaemonContext>) -> () {
async fn process_queued_events(daemon_ctx: Arc<DaemonContext>, events: Vec<EventQueueItem>) -> () {

for event in events.iter() {
let should_retry = process_queued_event(&daemon_ctx.ilert_client, event).await;
let should_retry = send_queued_event(&daemon_ctx.ilert_client, event).await;
let event_id = event.id.clone().unwrap_or("".to_string());
if !should_retry {
let del_result = daemon_ctx.db.lock().await.delete_il_event(event_id.as_str());
@@ -51,7 +51,7 @@ async fn process_queued_events(daemon_ctx: Arc<DaemonContext>, events: Vec<Event
}
}

pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem) -> bool {
pub async fn send_queued_event(ilert_client: &ILert, event: &EventQueueItem) -> bool {

let parsed_event = EventQueueItemJson::from_db(event.clone());

@@ -79,8 +79,15 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem)
None => None
};

let post_result = ilert_client
.create()
let mut post_request = ilert_client.create();

if let Some(event_api_path) = event.event_api_path.as_ref() {
post_request.builder.options.path = Some(event_api_path.to_string());
} else {
post_request.builder.options.path = Some("/events".to_string());
}

let post_result = post_request
.event_with_details(
event.api_key.as_str(),
event_type,
@@ -92,7 +99,7 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem)
parsed_event.links,
parsed_event.customDetails,
None
)
)
.execute()
.await;

@@ -117,6 +124,11 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem)
return true; // too many requests, retry
}

if status == 404 {
warn!("Event {} failed with bad URL {}, potentially due to bad api key value", event_id, response.url);
return false; // no point in retrying
}

if status > 499 {
warn!("Event {} failed server side exception", event_id);
return true; // 500 exceptions, retry