From 6a41e0fd6b6697c4bdb5cc4ba4361833cd692c1b Mon Sep 17 00:00:00 2001 From: Wilfried OLLIVIER Date: Fri, 11 Nov 2022 17:18:11 +0100 Subject: [PATCH] Scaffold async runner that creates index and push dummy data --- src/exporters/elastic.rs | 61 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/exporters/elastic.rs b/src/exporters/elastic.rs index e60dd8b4..dacd3016 100644 --- a/src/exporters/elastic.rs +++ b/src/exporters/elastic.rs @@ -11,6 +11,8 @@ use elasticsearch::{ http::transport::{SingleNodeConnectionPool, Transport, TransportBuilder}, CreateParts, Elasticsearch, Error, }; +use hyper::StatusCode; +use serde::{Deserialize, Serialize}; use url::Url; /// Default url for Elastic endpoint @@ -41,7 +43,9 @@ impl Exporter for ElasticExporter { Err(e) => panic!("{}", e), }; - client.create(CreateParts::IndexId("test", "test")).send(); + if let Err(e) = self.runner(client) { + error!("{}", e) + } } fn get_options() -> Vec> { @@ -110,12 +114,67 @@ impl Exporter for ElasticExporter { } } +const ES_INDEX_NAME: &str = "scaphandre"; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct ScaphandreData { + pub wip: i32, +} + impl ElasticExporter { /// Instantiates and returns a new ElasticExporter // TODO: make sensor mutable pub fn new(sensor: Box) -> ElasticExporter { ElasticExporter { _sensor: sensor } } + + #[tokio::main] + pub async fn runner(&self, client: Elasticsearch) -> Result<(), Error> { + self.ensure_index(&client).await?; + + // WIP + let create_test_resp = client + .create(CreateParts::IndexId(ES_INDEX_NAME, "42")) + .body(ScaphandreData { wip: 42 }) + .send() + .await?; + + println!("create test resp {}", create_test_resp.status_code()); + + Ok(()) + } + + async fn ensure_index(&self, client: &Elasticsearch) -> Result<(), Error> { + let index_exist_resp = client + .indices() + .exists(elasticsearch::indices::IndicesExistsParts::Index(&[ + ES_INDEX_NAME, + ])) + .send() + .await?; + + if index_exist_resp.status_code() == StatusCode::OK { + return Ok(()); + } + + let index_create_resp = client + .indices() + .create(elasticsearch::indices::IndicesCreateParts::Index( + ES_INDEX_NAME, + )) + .send() + .await?; + + // WIP + if !index_create_resp.status_code().is_success() { + println!( + "Error while creating index: status_code {}", + index_create_resp.status_code() + ) + } + + Ok(()) + } } /// Inits a new elastic client