Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No functional changes; (re)ordering imports and fixing clippy warnings #402

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{path::PathBuf, time::Duration};
use std::path::PathBuf;

use tokio::time::Duration;

use crate::server::prelude::Config;

Expand All @@ -16,6 +18,7 @@ impl ClientBuilder {
}

/// Creates a `ClientBuilder` using a configuration file as the initial state.
#[expect(clippy::result_unit_err)]
pub fn from_config(path: impl Into<PathBuf>) -> Result<ClientBuilder, ()> {
Ok(ClientBuilder {
config: ClientConfig::load(&path.into())?,
Expand Down
26 changes: 14 additions & 12 deletions lib/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use std::{
collections::BTreeMap,
path::{Path, PathBuf},
str::FromStr,
time::Duration,
};

use tokio::time::Duration;

use crate::{
core::config::Config,
crypto::SecurityPolicy,
Expand Down Expand Up @@ -82,17 +83,18 @@ impl ClientUserToken {
);
valid = false;
}
} else {
if self.cert_path.is_none() && self.private_key_path.is_none() {
error!(
"User token {} fails to provide a password or certificate info.",
self.user
);
valid = false;
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
error!("User token {} fails to provide both a certificate path and a private key path.", self.user);
valid = false;
}
} else if self.cert_path.is_none() && self.private_key_path.is_none() {
error!(
"User token {} fails to provide a password or certificate info.",
self.user
);
valid = false;
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
error!(
"User token {} fails to provide both a certificate path and a private key path.",
self.user
);
valid = false;
}
valid
}
Expand Down
3 changes: 2 additions & 1 deletion lib/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@
//!
//! ```no_run
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! use opcua::client::{ClientBuilder, IdentityToken, Session, DataChangeCallback, MonitoredItem};
//! use opcua::types::{
//! EndpointDescription, MessageSecurityMode, UserTokenPolicy, StatusCode,
//! NodeId, TimestampsToReturn, MonitoredItemCreateRequest, DataValue
//! };
//! use tokio::time::Duration;
//!
//! #[tokio::main]
//! async fn main() {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/client/retry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use tokio::time::Duration;

pub(crate) struct ExponentialBackoff {
max_sleep: Duration,
Expand Down Expand Up @@ -26,7 +26,7 @@ impl Iterator for ExponentialBackoff {
return None;
}

let next_sleep = self.current_sleep.clone();
let next_sleep = self.current_sleep;
self.current_sleep = self.max_sleep.min(self.current_sleep * 2);
self.retry_count += 1;

Expand Down
28 changes: 10 additions & 18 deletions lib/src/client/session/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{path::PathBuf, str::FromStr, sync::Arc};

use chrono::Duration;
use tokio::{pin, select};

use crate::{
client::{
Expand Down Expand Up @@ -165,9 +164,8 @@ impl Client {
let server_endpoints = self
.get_server_endpoints_from_url(server_url)
.await
.map_err(|status_code| {
.inspect_err(|status_code| {
error!("Cannot get endpoints for server, error - {}", status_code);
status_code
})?;

// Find the server endpoint that matches the one desired
Expand All @@ -180,12 +178,11 @@ impl Client {
endpoint.security_mode,
)
.ok_or(StatusCode::BadTcpEndpointUrlInvalid)
.map_err(|status_code| {
.inspect_err(|_| {
error!(
"Cannot find matching endpoint for {}",
endpoint.endpoint_url.as_ref()
);
status_code
})?;

Ok(self
Expand Down Expand Up @@ -509,10 +506,10 @@ impl Client {
let mut evt_loop = channel.connect().await?;

let send_fut = self.get_server_endpoints_inner(&endpoint, &channel);
pin!(send_fut);
tokio::pin!(send_fut);

let res = loop {
select! {
tokio::select! {
r = evt_loop.poll() => {
if let TransportPollResult::Closed(e) = r {
return Err(e);
Expand Down Expand Up @@ -549,12 +546,7 @@ impl Client {
let response = channel.send(request, self.config.request_timeout).await?;
if let SupportedMessage::FindServersResponse(response) = response {
process_service_result(&response.response_header)?;
let servers = if let Some(servers) = response.servers {
servers
} else {
Vec::new()
};
Ok(servers)
Ok(response.servers.unwrap_or_default())
} else {
Err(process_unexpected_response(response))
}
Expand Down Expand Up @@ -588,10 +580,10 @@ impl Client {
let mut evt_loop = channel.connect().await?;

let send_fut = self.find_servers_inner(discovery_endpoint_url, &channel);
pin!(send_fut);
tokio::pin!(send_fut);

let res = loop {
select! {
tokio::select! {
r = evt_loop.poll() => {
if let TransportPollResult::Closed(e) = r {
return Err(e);
Expand Down Expand Up @@ -731,7 +723,7 @@ impl Client {

let Some(endpoint) = endpoints
.iter()
.filter(|e| self.is_supported_endpoint(*e))
.filter(|e| self.is_supported_endpoint(e))
.max_by(|a, b| a.security_level.cmp(&b.security_level))
else {
error!("Cannot find an endpoint that we call register server on");
Expand All @@ -753,10 +745,10 @@ impl Client {
let mut evt_loop = channel.connect().await?;

let send_fut = self.register_server_inner(server, &channel);
pin!(send_fut);
tokio::pin!(send_fut);

let res = loop {
select! {
tokio::select! {
r = evt_loop.poll() => {
if let TransportPollResult::Closed(e) = r {
return Err(e);
Expand Down
6 changes: 2 additions & 4 deletions lib/src/client/session/connect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::sync::Arc;

use tokio::{pin, select};

use crate::{
client::transport::{SecureChannelEventLoop, TransportPollResult},
types::{NodeId, StatusCode},
Expand Down Expand Up @@ -42,10 +40,10 @@ impl SessionConnector {
let mut event_loop = self.inner.channel.connect_no_retry().await?;

let activate_fut = self.ensure_and_activate_session();
pin!(activate_fut);
tokio::pin!(activate_fut);

let res = loop {
select! {
tokio::select! {
r = event_loop.poll() => {
if let TransportPollResult::Closed(c) = r {
return Err(c);
Expand Down
25 changes: 13 additions & 12 deletions lib/src/client/session/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::sync::Arc;

use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use tokio::{
sync::watch,
time::{interval, sleep_until, Duration, Instant, Interval, MissedTickBehavior},
};

use crate::{
client::{
Expand Down Expand Up @@ -57,23 +58,23 @@ enum SessionEventLoopState {
#[must_use = "The session event loop must be started for the session to work"]
pub struct SessionEventLoop {
inner: Arc<Session>,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
retry: SessionRetryPolicy,
keep_alive_interval: Duration,
trigger_publish_rx: watch::Receiver<Instant>,
}

impl SessionEventLoop {
pub(crate) fn new(
inner: Arc<Session>,
retry: SessionRetryPolicy,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
keep_alive_interval: Duration,
trigger_publish_rx: watch::Receiver<Instant>,
) -> Self {
Self {
inner,
retry,
trigger_publish_recv,
keep_alive_interval,
trigger_publish_rx,
}
}

Expand Down Expand Up @@ -183,7 +184,7 @@ impl SessionEventLoop {
))
}
SessionEventLoopState::Connecting(connector, mut backoff, next_try) => {
tokio::time::sleep_until(next_try.into()).await;
sleep_until(next_try).await;

match connector.try_connect().await {
Ok((channel, result)) => {
Expand All @@ -200,7 +201,7 @@ impl SessionEventLoop {
.boxed(),
SubscriptionEventLoop::new(
slf.inner.clone(),
slf.trigger_publish_recv.clone(),
slf.trigger_publish_rx.clone(),
)
.run()
.boxed(),
Expand Down Expand Up @@ -245,13 +246,13 @@ enum SessionTickEvent {
}

struct SessionIntervals {
keep_alive: tokio::time::Interval,
keep_alive: Interval,
}

impl SessionIntervals {
pub fn new(keep_alive_interval: Duration) -> Self {
let mut keep_alive = tokio::time::interval(keep_alive_interval);
keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut keep_alive = interval(keep_alive_interval);
keep_alive.set_missed_tick_behavior(MissedTickBehavior::Skip);

Self { keep_alive }
}
Expand Down
2 changes: 2 additions & 0 deletions lib/src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod client;
mod connect;
mod event_loop;
mod services;

#[expect(clippy::module_inception)]
mod session;

/// Information about the server endpoint, security policy, security mode and user identity that the session will
Expand Down
Loading
Loading