Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix interactions between missing/deleted params and param subscription (get_param_cached) #9

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,19 @@ impl Handler for RegisterServiceHandler {
}
}

async fn register_node(nodes : &RwLock<Nodes>, caller_id: &str, caller_api : &str) -> () {
async fn register_node(nodes: &RwLock<Nodes>, caller_id: &str, caller_api: &str) -> () {
let shutdown_api_url;
{
let mut nodes = nodes.write().unwrap();
match nodes.entry(caller_id.to_owned()) {
Entry::Vacant(v) => {
v.insert(caller_api.to_owned());
return
},
return;
}
Entry::Occupied(mut e) => {
let e = e.get_mut();
if e == caller_api {
return
return;
} else {
shutdown_api_url = std::mem::replace(e, caller_api.to_owned());
}
Expand All @@ -200,9 +200,14 @@ async fn register_node(nodes : &RwLock<Nodes>, caller_id: &str, caller_api : &st
}
}

async fn shutdown_node(client_api_url: &str, node_id : &str) -> anyhow::Result<()> {
async fn shutdown_node(client_api_url: &str, node_id: &str) -> anyhow::Result<()> {
let client_api = ClientApi::new(client_api_url);
let res = client_api.shutdown("/master", &format!("[{}] Reason: new node registered with same name", node_id)).await;
let res = client_api
.shutdown(
"/master",
&format!("[{}] Reason: new node registered with same name", node_id),
)
.await;
res
}

Expand Down Expand Up @@ -296,7 +301,7 @@ impl Handler for RegisterSubscriberHandler {
.entry(topic.clone())
.or_default()
.insert(caller_id.clone());

register_node(&self.data.nodes, &caller_id, &caller_api).await;

let publishers = self
Expand Down Expand Up @@ -459,9 +464,12 @@ impl Handler for RegisterPublisherHandler {
.await;
match r {
Err(e) => log::warn!("publisherUpdate call to {} failed: {}", client_api_url, e),
Ok(v) => log::debug!("publisherUpdate call to {} succeeded, returning: {:?}", client_api_url, v)
Ok(v) => log::debug!(
"publisherUpdate call to {} succeeded, returning: {:?}",
client_api_url,
v
),
}

}

return Ok((1, "", subscribers_api_urls).try_to_value()?);
Expand Down Expand Up @@ -848,8 +856,63 @@ impl Handler for DeleteParamHandler {
type Request = (String, String);
let (caller_id, key) = Request::try_from_params(params)?;
let key = resolve(&caller_id, &key);
let key = key.strip_prefix('/').unwrap_or(&key).split('/');
self.data.parameters.write().unwrap().remove(key);
let key_split = key.strip_prefix('/').unwrap_or(&key).split('/');

let mut update_futures = JoinSet::new();

{
let mut params = self.data.parameters.write().unwrap();
params.remove(key_split);
let param_subscriptions = self.data.parameter_subscriptions.read().unwrap();
log::info!("updating subscribers of deleted param {}", &key);
for subscription in param_subscriptions.iter() {
log::debug!(
"subscriber {:?} has subscription? {}",
&subscription,
one_is_prefix_of_the_other(&key, &subscription.param)
);
if one_is_prefix_of_the_other(&key, &subscription.param) {
let subscribed_key_spit = subscription
.param
.strip_prefix('/')
.unwrap_or(&subscription.param)
.split('/');
let new_value = params
.get(subscribed_key_spit)
.unwrap_or_else(|| crate::empty_struct());
update_futures.spawn(update_client_with_new_param_value(
subscription.api_uri.clone(),
caller_id.clone(),
subscription.node_id.clone(),
subscription.param.clone(),
new_value,
));
}
}
}

while let Some(res) = update_futures.join_next().await {
match res {
Ok(Ok(v)) => {
log::debug!("a subscriber has been updated (res: {:#?})", &v);
}
Ok(Err(err)) => {
log::warn!(
"Error updating a subscriber of changed param {}:\n{:#?}",
&key,
err
);
}
Err(err) => {
log::warn!(
"Error updating a subscriber of changed param {}:\n{:#?}",
&key,
err
);
}
}
}

return Ok((1, "", 0).try_to_value()?);
}
}
Expand Down Expand Up @@ -1012,7 +1075,11 @@ impl Handler for GetParamHandler {

Ok(match params.get(key_path) {
Some(value) => (1, format!("Parameter [{}]", &key_full), value.to_owned()),
None => (-1, format!("Parameter [{}] is not set", &key_full), Value::i4(0)),
None => (
-1,
format!("Parameter [{}] is not set", &key_full),
Value::i4(0),
),
}
.try_to_value()?)
}
Expand Down Expand Up @@ -1128,8 +1195,9 @@ impl Handler for SubscribeParamHandler {
.read()
.unwrap()
.get(key_split)
.unwrap_or(Value::string("".to_owned()));
Ok((1, "", value).try_to_value()?)
.unwrap_or_else(|| crate::empty_struct());

Ok((1, &format!("Subscribed to parameter [{}]", &key), value).try_to_value()?)
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod client_api;
pub mod core;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use url::Url;
use dxr::{TryToValue, Value};

mod param_tree;

Expand All @@ -34,3 +35,7 @@ pub fn url_to_socket_addr(url: &Url) -> anyhow::Result<SocketAddr> {
let port = url.port().expect("Invalid URL: no port specified");
Ok(SocketAddr::new(ip_addr, port))
}

fn empty_struct() -> Value {
std::collections::HashMap::<String, i32>::new().try_to_value().unwrap()
}
Loading