Skip to content

Commit

Permalink
feat(datadog): add tokio as optional dependency
Browse files Browse the repository at this point in the history
Signed-off-by: Jérémie Drouet <[email protected]>
  • Loading branch information
jdrouet committed Mar 10, 2021
1 parent 7747f75 commit 8e6e2ae
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 29 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"

[features]
default = ["datadog"]
datadog = ["datadog-client"]
datadog = ["datadog-client", "tokio"]

[dependencies]
loggerv = "0.7.2"
Expand All @@ -30,6 +30,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

datadog-client = { version = "0.1", optional = true }
tokio = { version = "1", features = ["full"], optional = true }

[profile.release]
lto = true
Expand Down
22 changes: 15 additions & 7 deletions src/exporters/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use datadog_client::metrics::{Point, Serie, Type};
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;

fn merge<A>(first: Vec<A>, second: Vec<A>) -> Vec<A> {
second.into_iter().fold(first, |mut res, item| {
Expand Down Expand Up @@ -32,7 +33,10 @@ pub struct DatadogExporter {
impl Exporter for DatadogExporter {
/// Lanches runner()
fn run(&mut self, parameters: ArgMatches) {
self.runner(&parameters);
let rt = Runtime::new().unwrap();
rt.block_on(async move {
self.runner(&parameters).await;
});
}

/// Returns options needed for that exporter, as a HashMap
Expand Down Expand Up @@ -87,7 +91,7 @@ impl DatadogExporter {
Client::new(config)
}

fn runner(&mut self, parameters: &ArgMatches) {
async fn runner(&mut self, parameters: &ArgMatches<'_>) {
if let Some(timeout) = parameters.value_of("timeout") {
let now = Instant::now();
let timeout = timeout
Expand All @@ -110,18 +114,22 @@ impl DatadogExporter {
info!("Measurement step is: {}s", step_duration);

while now.elapsed().as_secs() <= timeout {
self.iterate(parameters);
self.iterate(parameters).await;
thread::sleep(Duration::new(step_duration, step_duration_nano));
}
} else {
self.iterate(parameters);
self.iterate(parameters).await;
}
}

fn iterate(&mut self, parameters: &ArgMatches) {
async fn iterate(&mut self, parameters: &ArgMatches<'_>) {
self.topology.refresh();
let _series = self.collect_series();
let _client = Self::build_client(parameters);
let series = self.collect_series();
let client = Self::build_client(parameters);
match client.post_metrics(&series).await {
Ok(_) => log::debug!("metrics sent"),
Err(_) => log::warn!("unable to send metrics"),
};
}

fn create_consumption_serie(&self) -> Serie {
Expand Down
59 changes: 38 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ pub mod exporters;
pub mod sensors;
use clap::ArgMatches;
use exporters::{
json::JSONExporter, prometheus::PrometheusExporter, qemu::QemuExporter,
riemann::RiemannExporter, stdout::StdoutExporter, Exporter, ExporterOption,
datadog::DatadogExporter, json::JSONExporter, prometheus::PrometheusExporter,
qemu::QemuExporter, riemann::RiemannExporter, stdout::StdoutExporter, Exporter, ExporterOption,
};
use sensors::{powercap_rapl::PowercapRAPLSensor, Sensor};
use std::collections::HashMap;
Expand Down Expand Up @@ -52,32 +52,44 @@ fn get_sensor(matches: &ArgMatches) -> Box<dyn Sensor> {
pub fn run(matches: ArgMatches) {
loggerv::init_with_verbosity(matches.occurrences_of("v")).unwrap();

let sensor_boxed = get_sensor(&matches);
let exporter_parameters;

if let Some(stdout_exporter_parameters) = matches.subcommand_matches("stdout") {
exporter_parameters = stdout_exporter_parameters.clone();
let mut exporter = StdoutExporter::new(sensor_boxed);
let exporter_parameters = stdout_exporter_parameters.clone();
let mut exporter = StdoutExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
exporter_parameters = json_exporter_parameters.clone();
let mut exporter = JSONExporter::new(sensor_boxed);
return;
}
if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
let exporter_parameters = json_exporter_parameters.clone();
let mut exporter = JSONExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
exporter_parameters = riemann_exporter_parameters.clone();
let mut exporter = RiemannExporter::new(sensor_boxed);
return;
}
if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
let exporter_parameters = riemann_exporter_parameters.clone();
let mut exporter = RiemannExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
exporter_parameters = prometheus_exporter_parameters.clone();
let mut exporter = PrometheusExporter::new(sensor_boxed);
return;
}
if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
let exporter_parameters = prometheus_exporter_parameters.clone();
let mut exporter = PrometheusExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
exporter_parameters = qemu_exporter_parameters.clone();
let mut exporter = QemuExporter::new(sensor_boxed);
return;
}
if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
let exporter_parameters = qemu_exporter_parameters.clone();
let mut exporter = QemuExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
} else {
error!("Couldn't determine which exporter has been chosen.");
return;
}
#[cfg(feature = "datadog")]
if let Some(datadog_exporter_parameters) = matches.subcommand_matches("datadog") {
let exporter_parameters = datadog_exporter_parameters.clone();
let mut exporter = DatadogExporter::new(get_sensor(&matches));
exporter.run(exporter_parameters);
return;
}
error!("Couldn't determine which exporter has been chosen.");
}

/// Returns options needed for each exporter as a HashMap.
Expand All @@ -104,6 +116,11 @@ pub fn get_exporters_options() -> HashMap<String, HashMap<String, ExporterOption
String::from("qemu"),
exporters::qemu::QemuExporter::get_options(),
);
#[cfg(feature = "datadog")]
options.insert(
String::from("datadog"),
exporters::datadog::DatadogExporter::get_options(),
);
options
}

Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ fn main() {
"prometheus" => "Prometheus exporter exposes power consumption metrics on an http endpoint (/metrics is default) in prometheus accepted format",
"riemann" => "Riemann exporter sends power consumption metrics to a Riemann server",
"qemu" => "Qemu exporter watches all Qemu/KVM virtual machines running on the host and exposes metrics of each of them in a dedicated folder",
#[cfg(feature = "datadog")]
"datadog" => "Datadog exporter sends power consumption metrics to Datadog",
_ => "Unknown exporter",
}
);
Expand Down

0 comments on commit 8e6e2ae

Please sign in to comment.