Skip to content

Commit

Permalink
fix busy cpu loop
Browse files Browse the repository at this point in the history
  • Loading branch information
timglabisch committed Nov 17, 2023
1 parent e2785b9 commit c25a4c2
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 24 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ build-linux-on-osx:
build: build-linux-on-osx

run:
RUST_BACKTRACE=1 cargo run -- \
cargo run --release -- \
--udp-bind=127.0.0.1:1113 \
--http-bind=127.0.0.1:8080 \
--token foo\
--agent-key foo\
--account-url foo\
--debug

example_php_client:
Expand Down
11 changes: 5 additions & 6 deletions openmetrics_udpserver/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl Processor {
let regex_allowed_chars = Regex::new(r"^[^a-zA-Z_:]|[^a-zA-Z0-9_:]")
.expect("Unable to compile metrics naming regex, should not happen");
loop {
match receiver.try_recv() {

This comment has been minimized.

Copy link
@timglabisch

timglabisch Nov 17, 2023

Author Member

try_recv darf man nicht so verwenden.
das verbraucht 100% CPU weil es nicht blockiert sondern guckt ob was da ist und ansonsten weiter läuft.

Ich glaube es passt nun überall, guck dir ruhig den fehler noch mal genau an und schau, ob du ähnliche stellen hast.
das profiling sieht nun aber okay aus.


match receiver.recv().await {
Ok(inbound_metric) => {
let metric_name = regex_allowed_chars
.replace_all(&inbound_metric.name.replace('.', "_"), "")
Expand Down Expand Up @@ -75,11 +76,9 @@ impl Processor {
}
}
}
Err(TryRecvError::Empty | TryRecvError::Lagged(_)) => {
yield_now().await;
}
Err(TryRecvError::Closed) => {
panic!("All metric senders were dropped, should not happen")
Err(e) => {
eprintln!("processor recv error {:#?}, investigate!", e);
::tokio::time::sleep(Duration::from_millis(300)).await;
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions openmetrics_udpserver/src/serverdensity/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ServerDensityAggregator {
loop {
let mut i = 0;

match receiver.try_recv() {
match receiver.recv().await {
Ok(metric) => {
let metric_name = regex.replace_all(&metric.name, "").trim().to_string();

Expand Down Expand Up @@ -164,11 +164,9 @@ impl ServerDensityAggregator {
);
}
}
Err(TryRecvError::Closed) => {
panic!("channel disconnected, should never happen.");
}
Err(TryRecvError::Empty | TryRecvError::Lagged(_)) => {
break;
Err(e) => {
eprintln!("aggregator recv error {:#?}, investigate!", e);
::tokio::time::sleep(Duration::from_millis(300)).await;
}
};
}
Expand Down
21 changes: 10 additions & 11 deletions openmetrics_udpserver/src/udp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@ impl UdpServer {
.await
.expect("Unable to bind UDP Server");
loop {
if udp_socket.readable().await.is_ok() {
let mut buf = [0; 300];
if let Ok(read_bytes) = udp_socket.try_recv(&mut buf) {
match self.decode_buffer(&buf, read_bytes) {
Ok(inbound_metric) => {
if let Err(err) = self.metric_sender.send(inbound_metric) {
eprintln!("Unable to process inbound metric: {}", err);
}
}
Err(err) => {
eprintln!("could not decode message from socket: {}", err);
let mut buf = [0; 300];
if let Ok(read_bytes) = udp_socket.recv(&mut buf).await {
match self.decode_buffer(&buf, read_bytes) {
Ok(inbound_metric) => {
if let Err(err) = self.metric_sender.send(inbound_metric) {
eprintln!("Unable to process inbound metric: {}", err);
}
}
Err(err) => {
// it could be, that we are so fast that we read a part of the message, may we need to improve this code.
eprintln!("could not decode message from socket: {}", err);
}
}
}
}
Expand Down

0 comments on commit c25a4c2

Please sign in to comment.