Skip to content

Commit

Permalink
Scaffold async runner that creates index and push dummy data
Browse files Browse the repository at this point in the history
  • Loading branch information
papey committed Mar 11, 2023
1 parent 31e6391 commit 6a41e0f
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/exporters/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use elasticsearch::{
http::transport::{SingleNodeConnectionPool, Transport, TransportBuilder},
CreateParts, Elasticsearch, Error,
};
use hyper::StatusCode;
use serde::{Deserialize, Serialize};
use url::Url;

/// Default url for Elastic endpoint
Expand Down Expand Up @@ -41,7 +43,9 @@ impl Exporter for ElasticExporter {
Err(e) => panic!("{}", e),
};

client.create(CreateParts::IndexId("test", "test")).send();
if let Err(e) = self.runner(client) {
error!("{}", e)
}
}

fn get_options() -> Vec<clap::Arg<'static, 'static>> {
Expand Down Expand Up @@ -110,12 +114,67 @@ impl Exporter for ElasticExporter {
}
}

const ES_INDEX_NAME: &str = "scaphandre";

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ScaphandreData {
pub wip: i32,
}

impl ElasticExporter {
/// Instantiates and returns a new ElasticExporter
// TODO: make sensor mutable
pub fn new(sensor: Box<dyn Sensor>) -> ElasticExporter {
ElasticExporter { _sensor: sensor }
}

#[tokio::main]
pub async fn runner(&self, client: Elasticsearch) -> Result<(), Error> {
self.ensure_index(&client).await?;

// WIP
let create_test_resp = client
.create(CreateParts::IndexId(ES_INDEX_NAME, "42"))
.body(ScaphandreData { wip: 42 })
.send()
.await?;

println!("create test resp {}", create_test_resp.status_code());

Ok(())
}

async fn ensure_index(&self, client: &Elasticsearch) -> Result<(), Error> {
let index_exist_resp = client
.indices()
.exists(elasticsearch::indices::IndicesExistsParts::Index(&[
ES_INDEX_NAME,
]))
.send()
.await?;

if index_exist_resp.status_code() == StatusCode::OK {
return Ok(());
}

let index_create_resp = client
.indices()
.create(elasticsearch::indices::IndicesCreateParts::Index(
ES_INDEX_NAME,
))
.send()
.await?;

// WIP
if !index_create_resp.status_code().is_success() {
println!(
"Error while creating index: status_code {}",
index_create_resp.status_code()
)
}

Ok(())
}
}

/// Inits a new elastic client
Expand Down

0 comments on commit 6a41e0f

Please sign in to comment.