From 1973a5772d9fa864a3f1b980f46626e5f434a9b2 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 19:20:23 +0200 Subject: [PATCH 1/9] Add refreshed_at, created_at --- Cargo.lock | 52 ++++++++++++++-------- Cargo.toml | 4 +- mobile_config/src/client/gateway_client.rs | 1 + mobile_config/src/gateway_info.rs | 26 +++++++++++ 4 files changed, 63 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74087a040..4abec4726 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,11 +1615,11 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" +source = "git+https://github.com/helium/proto?branch=gateway_info_modified#5c323895982fe75b6179719317c24700ef238457" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "prost", "rand 0.8.5", "rand_chacha 0.3.0", @@ -1788,7 +1788,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http 0.2.11", "http-serde", "humantime-serde", @@ -2630,7 +2630,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http 0.2.11", "notify", "serde", @@ -3212,7 +3212,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "hex-literal", "http 0.2.11", "lazy_static", @@ -3794,7 +3794,7 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", "hex-literal", "itertools", @@ -3818,6 +3818,22 @@ dependencies = [ "url", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=gateway_info_modified#5c323895982fe75b6179719317c24700ef238457" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum", + "strum_macros", + "tonic", + "tonic-build", +] + [[package]] name = "helium-proto" version = "0.1.0" @@ -3875,7 +3891,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4291,7 +4307,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http 0.2.11", "humantime-serde", "metrics", @@ -4360,7 +4376,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "hextree", "http 0.2.11", "http-serde", @@ -4402,7 +4418,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http 0.2.11", "http-serde", "humantime-serde", @@ -4444,7 +4460,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http-serde", "humantime-serde", "iot-config", @@ -5032,7 +5048,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "hextree", "http 0.2.11", "http-serde", @@ -5072,7 +5088,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "mobile-config", "prost", "rand 0.8.5", @@ -5108,7 +5124,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http 0.2.11", "http-serde", "humantime-serde", @@ -5152,7 +5168,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "hex-assignments", "hextree", "http-serde", @@ -5836,7 +5852,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5919,7 +5935,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6558,7 +6574,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", "humantime-serde", "lazy_static", "metrics", diff --git a/Cargo.toml b/Cargo.toml index 7388ff0ad..c6656b1ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "gateway_info_modified", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "gateway_info_modified" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18" diff --git a/mobile_config/src/client/gateway_client.rs b/mobile_config/src/client/gateway_client.rs index a1a4a381f..7c028d652 100644 --- a/mobile_config/src/client/gateway_client.rs +++ b/mobile_config/src/client/gateway_client.rs @@ -107,6 +107,7 @@ impl GatewayInfoResolver for GatewayClient { let mut req = mobile_config::GatewayInfoStreamReqV1 { batch_size: self.batch_size, device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), + min_refreshed_at: 0, signer: self.signing_key.public_key().into(), signature: vec![], }; diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index ada624a81..f57568c08 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use futures::stream::BoxStream; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::{ @@ -17,6 +18,8 @@ pub struct GatewayInfo { pub address: PublicKeyBinary, pub metadata: Option, pub device_type: DeviceType, + pub refreshed_at: DateTime, + pub created_at: DateTime, } impl GatewayInfo { @@ -38,10 +41,17 @@ impl TryFrom for GatewayInfo { None }; let device_type = info.device_type().into(); + + // TODO remove unwraps + let created_at = DateTime::::from_timestamp(info.created_at as i64, 0).unwrap(); + let refreshed_at = DateTime::::from_timestamp(info.refreshed_at as i64, 0).unwrap(); + Ok(Self { address: info.address.into(), metadata, device_type, + created_at, + refreshed_at, }) } } @@ -61,6 +71,8 @@ impl TryFrom for GatewayInfoProto { address: info.address.into(), metadata, device_type: info.device_type as i32, + created_at: info.created_at.timestamp() as u64, + refreshed_at: info.created_at.timestamp() as u64, }) } } @@ -115,6 +127,7 @@ impl std::str::FromStr for DeviceType { pub(crate) mod db { use super::{DeviceType, GatewayInfo, GatewayMetadata}; + use chrono::{DateTime, Utc}; use futures::stream::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; @@ -200,6 +213,17 @@ pub(crate) mod db { .as_ref(), ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; + let created_at = row.get::("created_at"); + // `refreshed_at` can be NULL in the database schema. + // If so, fallback to using `created_at` as the default value of `refreshed_at`. + let refreshed_at = row + .get::, &str>("refreshed_at") + .unwrap_or(created_at); + + // TODO remove unwraps + let created_at = DateTime::::from_timestamp(created_at, 0).unwrap(); + let refreshed_at = DateTime::::from_timestamp(refreshed_at, 0).unwrap(); + Ok(Self { address: PublicKeyBinary::from_str( &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), @@ -207,6 +231,8 @@ pub(crate) mod db { .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, metadata, device_type, + refreshed_at, + created_at, }) } } From 73048e411d60d030f7432ce602c6dbcc8060af92 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 19:28:21 +0200 Subject: [PATCH 2/9] Fix clippy --- mobile_config/tests/gateway_service.rs | 1 + mobile_verifier/tests/integrations/speedtests.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index c1eb71dd3..07e22dd61 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -266,6 +266,7 @@ fn make_gateway_stream_signed_req( signer: signer.public_key().to_vec(), signature: vec![], device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), + min_refreshed_at: 0, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index 5c6e9b9e8..91ad8cde3 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -34,6 +34,8 @@ impl GatewayInfoResolver for MockGatewayInfoResolver { address: address.clone(), metadata: None, device_type: DeviceType::Cbrs, + refreshed_at: Utc::now(), + created_at: Utc::now(), })) } From 1cded42ac18981747c3db0dc1d543ca3a02a5d4a Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 21:00:30 +0200 Subject: [PATCH 3/9] Fix test --- mobile_config/src/gateway_info.rs | 12 +++++------- mobile_config/src/gateway_service.rs | 6 +++++- mobile_config/tests/gateway_service.rs | 4 +++- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index f57568c08..c5fed2a03 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -134,7 +134,8 @@ pub(crate) mod db { use std::str::FromStr; const GET_METADATA_SQL: &str = r#" - select kta.entity_key, infos.location::bigint, infos.device_type + select kta.entity_key, infos.location::bigint, infos.device_type, + infos.refreshed_at, infos.created_at from mobile_hotspot_infos infos join key_to_assets kta on infos.asset = kta.asset "#; @@ -179,6 +180,7 @@ pub(crate) mod db { pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceType], + _min_refreshed_at: i64, // TODO ) -> impl Stream + 'a { match device_types.is_empty() { true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL) @@ -213,17 +215,13 @@ pub(crate) mod db { .as_ref(), ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; - let created_at = row.get::("created_at"); + let created_at = row.get::, &str>("created_at"); // `refreshed_at` can be NULL in the database schema. // If so, fallback to using `created_at` as the default value of `refreshed_at`. let refreshed_at = row - .get::, &str>("refreshed_at") + .get::>, &str>("refreshed_at") .unwrap_or(created_at); - // TODO remove unwraps - let created_at = DateTime::::from_timestamp(created_at, 0).unwrap(); - let refreshed_at = DateTime::::from_timestamp(refreshed_at, 0).unwrap(); - Ok(Self { address: PublicKeyBinary::from_str( &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 90f1a11f9..1bc518f8c 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -170,7 +170,11 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream(&pool, &device_types); + let stream = gateway_info::db::all_info_stream( + &pool, + &device_types, + request.min_refreshed_at as i64, + ); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 07e22dd61..9bddbfe28 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -235,7 +235,9 @@ async fn create_db_tables(pool: &PgPool) { CREATE TABLE mobile_hotspot_infos ( asset character varying(255) NULL, location numeric NULL, - device_type jsonb NOT NULL + device_type jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + refreshed_at timestamptz );"#, ) .execute(pool) From 66917289b195bdbc5b885f676e21daeda31597fe Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 21:38:27 +0200 Subject: [PATCH 4/9] Fix test --- mobile_config/tests/gateway_service.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 9bddbfe28..6a7a00588 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use futures::stream::StreamExt; use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -200,14 +201,15 @@ async fn add_mobile_hotspot_infos(pool: &PgPool, asset: &str, location: i64, dev sqlx::query( r#" INSERT INTO -"mobile_hotspot_infos" ("asset", "location", "device_type") +"mobile_hotspot_infos" ("asset", "location", "device_type", "refreshed_at") VALUES -($1, $2, $3::jsonb); +($1, $2, $3::jsonb, $4); "#, ) .bind(asset) .bind(location) .bind(device_type) + .bind(Utc::now()) .execute(pool) .await .unwrap(); From e1bf9f4e3cc28d408d3c72f72ae3fb95f61fa06c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 21:48:15 +0200 Subject: [PATCH 5/9] Handle min_refreshed_at argument --- mobile_config/src/gateway_info.rs | 13 +++++++++---- mobile_config/src/gateway_service.rs | 12 ++++++------ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index c5fed2a03..94c12c190 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -140,11 +140,14 @@ pub(crate) mod db { join key_to_assets kta on infos.asset = kta.asset "#; const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; - const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) "; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); + static ref GET_METADATA_SQL_REFRESHED_AT: String = format!("{GET_METADATA_SQL} where infos.refreshed_at > $1"); + + static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); + } pub async fn get_info( @@ -180,14 +183,16 @@ pub(crate) mod db { pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceType], - _min_refreshed_at: i64, // TODO + min_refreshed_at: DateTime, ) -> impl Stream + 'a { match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL) + true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT) + .bind(min_refreshed_at) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed(), false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL) + .bind(min_refreshed_at) .bind( device_types .iter() diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 1bc518f8c..0aaa52bd1 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -3,7 +3,7 @@ use crate::{ key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::Utc; +use chrono::{DateTime, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ stream::{Stream, StreamExt, TryStreamExt}, @@ -163,6 +163,10 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); let device_types: Vec = request.device_types().map(|v| v.into()).collect(); + let min_refreshed_at = DateTime::::from_timestamp(request.min_refreshed_at as i64, 0) + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; tracing::debug!( "fetching all gateways' info. Device types: {:?} ", @@ -170,11 +174,7 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream( - &pool, - &device_types, - request.min_refreshed_at as i64, - ); + let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); From 3454d5d850a5008e2ca7d7b84c16e202b334d01c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 22:34:47 +0200 Subject: [PATCH 6/9] Handle error in GatewayInfoProto converting --- mobile_config/src/client/mod.rs | 2 ++ mobile_config/src/gateway_info.rs | 27 ++++++++++++++++++++++----- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/mobile_config/src/client/mod.rs b/mobile_config/src/client/mod.rs index f9ecd7d18..723bdda42 100644 --- a/mobile_config/src/client/mod.rs +++ b/mobile_config/src/client/mod.rs @@ -27,6 +27,8 @@ pub enum ClientError { LocationParseError(#[from] std::num::ParseIntError), #[error("unknown service provider {0}")] UnknownServiceProvider(String), + #[error("Invalid GatewayInfo proto response {0}")] + InvalidGatewayInfoProto(#[from] crate::gateway_info::GatewayInfoProtoParseError), } macro_rules! call_with_retry { diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 94c12c190..6ce6eb7c7 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -28,23 +28,40 @@ impl GatewayInfo { } } +#[derive(thiserror::Error, Debug)] +pub enum GatewayInfoProtoParseError { + #[error("Invalid location string: {0}")] + InvalidLocation(String), + #[error("Invalid created_at: {0}")] + InvalidCreatedAt(u64), + #[error("Invalid refreshed_at: {0}")] + InvalidRefreshedAt(u64), +} + impl TryFrom for GatewayInfo { - type Error = std::num::ParseIntError; + type Error = GatewayInfoProtoParseError; fn try_from(info: GatewayInfoProto) -> Result { let metadata = if let Some(ref metadata) = info.metadata { Some( u64::from_str_radix(&metadata.location, 16) - .map(|location| GatewayMetadata { location })?, + .map(|location| GatewayMetadata { location }) + .map_err(|_| { + GatewayInfoProtoParseError::InvalidLocation(metadata.location.clone()) + })?, ) } else { None }; let device_type = info.device_type().into(); - // TODO remove unwraps - let created_at = DateTime::::from_timestamp(info.created_at as i64, 0).unwrap(); - let refreshed_at = DateTime::::from_timestamp(info.refreshed_at as i64, 0).unwrap(); + let created_at = DateTime::::from_timestamp(info.created_at as i64, 0).ok_or( + GatewayInfoProtoParseError::InvalidCreatedAt(info.created_at), + )?; + + let refreshed_at = DateTime::::from_timestamp(info.refreshed_at as i64, 0).ok_or( + GatewayInfoProtoParseError::InvalidRefreshedAt(info.refreshed_at), + )?; Ok(Self { address: info.address.into(), From 58f0b9e4c3696efe60945131c068153596be06d1 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 13 Nov 2024 14:08:49 +0200 Subject: [PATCH 7/9] Handle refreshed_at NULL case --- mobile_config/src/gateway_info.rs | 3 +- mobile_config/tests/gateway_service.rs | 190 +++++++++++++++++++++---- 2 files changed, 167 insertions(+), 26 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 6ce6eb7c7..b0327b336 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -161,7 +161,8 @@ pub(crate) mod db { lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref GET_METADATA_SQL_REFRESHED_AT: String = format!("{GET_METADATA_SQL} where infos.refreshed_at > $1"); + static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL} + where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#); static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 6a7a00588..0ffaa2cc5 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,4 +1,4 @@ -use chrono::Utc; +use chrono::{DateTime, Utc}; use futures::stream::StreamExt; use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -83,16 +83,151 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { Ok(()) } +async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> ( + String, + tokio::task::JoinHandle>, +) { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Start the gateway server + let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + let (_key_cache_tx, key_cache) = KeyCache::new(keys); + let gws = GatewayService::new(key_cache, pool, server_key); + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + (format!("http://{addr}"), handle) +} + +#[sqlx::test] +async fn gateway_stream_info_refreshed_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + ) + .await; + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_plus_10, + Some(now_plus_10), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + // Regression test + let req = make_gateway_stream_signed_req(&admin_key, &[], 0); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // No device types but filter by refreshed_at + let req = make_gateway_stream_signed_req(&admin_key, &[], now_plus_10.timestamp() as u64); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceType::WifiDataOnly) + ); + + // No refreshed_at but filter by device_type + let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor], 0); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceType::WifiIndoor) + ); + + // Filter by device_type and refreshed_at + let req = make_gateway_stream_signed_req( + &admin_key, + &[DeviceType::WifiIndoor], + now.timestamp() as u64, + ); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceType::WifiIndoor) + ); +} + +#[sqlx::test] +async fn gateway_stream_info_refreshed_at_is_null(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let now = Utc::now(); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + None, + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req(&admin_key, &[], now.timestamp() as u64); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + + // Make sure the gateway was returned + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let req = make_gateway_stream_signed_req(&admin_key, &[], (now.timestamp() + 1) as u64); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + // Response is empty + assert!(stream.next().await.is_none()); +} + #[sqlx::test] async fn gateway_stream_info_data_types(pool: PgPool) { let admin_key = make_keypair(); - let server_key = make_keypair(); let asset1_pubkey = make_keypair().public_key().clone(); let asset1_hex_idx = 631711281837647359_i64; let asset2_hex_idx = 631711286145955327_i64; let asset3_hex_idx = 631711286145006591_i64; let asset2_pubkey = make_keypair().public_key().clone(); let asset3_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); create_db_tables(&pool).await; add_db_record( @@ -101,6 +236,8 @@ async fn gateway_stream_info_data_types(pool: PgPool) { asset1_hex_idx, "\"wifiIndoor\"", asset1_pubkey.clone().into(), + now, + Some(now), ) .await; add_db_record( @@ -109,6 +246,8 @@ async fn gateway_stream_info_data_types(pool: PgPool) { asset2_hex_idx, "\"wifiDataOnly\"", asset2_pubkey.clone().into(), + now, + Some(now), ) .await; add_db_record( @@ -117,27 +256,17 @@ async fn gateway_stream_info_data_types(pool: PgPool) { asset3_hex_idx, "\"wifiDataOnly\"", asset3_pubkey.clone().into(), + now, + Some(now), ) .await; - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; - // Start the gateway server - let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); - let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key); - let _handle = tokio::spawn( - transport::Server::builder() - .add_service(proto::GatewayServer::new(gws)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), - ); - let mut client = GatewayClient::connect(format!("http://{addr}")) - .await - .unwrap(); + let mut client = GatewayClient::connect(addr).await.unwrap(); // Check wifi indoor - let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor]); + let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor], 0); let mut stream = client.info_stream(req).await.unwrap().into_inner(); let res = stream.next().await.unwrap().unwrap(); let gw_info = res.gateways.first().unwrap(); @@ -154,7 +283,7 @@ async fn gateway_stream_info_data_types(pool: PgPool) { assert!(stream.next().await.is_none()); // Check wifi data only - let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiDataOnly]); + let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiDataOnly], 0); let stream = client.info_stream(req).await.unwrap().into_inner(); let resp = stream @@ -175,7 +304,7 @@ async fn gateway_stream_info_data_types(pool: PgPool) { ); // Check all - let req = make_gateway_stream_signed_req(&admin_key, &[]); + let req = make_gateway_stream_signed_req(&admin_key, &[], 0); let stream = client.info_stream(req).await.unwrap().into_inner(); let resp = stream @@ -192,24 +321,34 @@ async fn add_db_record( location: i64, device_type: &str, key: PublicKeyBinary, + created_at: DateTime, + refreshed_at: Option>, ) { - add_mobile_hotspot_infos(pool, asset, location, device_type).await; + add_mobile_hotspot_infos(pool, asset, location, device_type, created_at, refreshed_at).await; add_asset_key(pool, asset, key).await; } -async fn add_mobile_hotspot_infos(pool: &PgPool, asset: &str, location: i64, device_type: &str) { +async fn add_mobile_hotspot_infos( + pool: &PgPool, + asset: &str, + location: i64, + device_type: &str, + created_at: DateTime, + refreshed_at: Option>, +) { sqlx::query( r#" INSERT INTO -"mobile_hotspot_infos" ("asset", "location", "device_type", "refreshed_at") +"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at") VALUES -($1, $2, $3::jsonb, $4); +($1, $2, $3::jsonb, $4, $5); "#, ) .bind(asset) .bind(location) .bind(device_type) - .bind(Utc::now()) + .bind(created_at) + .bind(refreshed_at) .execute(pool) .await .unwrap(); @@ -264,13 +403,14 @@ fn make_keypair() -> Keypair { fn make_gateway_stream_signed_req( signer: &Keypair, device_types: &[DeviceType], + min_refreshed_at: u64, ) -> proto::GatewayInfoStreamReqV1 { let mut req = GatewayInfoStreamReqV1 { batch_size: 10000, signer: signer.public_key().to_vec(), signature: vec![], device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), - min_refreshed_at: 0, + min_refreshed_at, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); From 7f71e36cc7a339a97b3d8aa18b692d3b1f82c849 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 14 Nov 2024 10:36:36 +0200 Subject: [PATCH 8/9] Fix review comments --- mobile_config/src/gateway_info.rs | 27 +++++++++++++++------------ mobile_config/src/gateway_service.rs | 6 ++++-- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index b0327b336..30a49b9e4 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use futures::stream::BoxStream; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::{ @@ -31,7 +31,7 @@ impl GatewayInfo { #[derive(thiserror::Error, Debug)] pub enum GatewayInfoProtoParseError { #[error("Invalid location string: {0}")] - InvalidLocation(String), + InvalidLocation(#[from] std::num::ParseIntError), #[error("Invalid created_at: {0}")] InvalidCreatedAt(u64), #[error("Invalid refreshed_at: {0}")] @@ -45,23 +45,26 @@ impl TryFrom for GatewayInfo { let metadata = if let Some(ref metadata) = info.metadata { Some( u64::from_str_radix(&metadata.location, 16) - .map(|location| GatewayMetadata { location }) - .map_err(|_| { - GatewayInfoProtoParseError::InvalidLocation(metadata.location.clone()) - })?, + .map(|location| GatewayMetadata { location })?, ) } else { None }; let device_type = info.device_type().into(); - let created_at = DateTime::::from_timestamp(info.created_at as i64, 0).ok_or( - GatewayInfoProtoParseError::InvalidCreatedAt(info.created_at), - )?; + let created_at = Utc + .timestamp_opt(info.created_at as i64, 0) + .single() + .ok_or(GatewayInfoProtoParseError::InvalidCreatedAt( + info.created_at, + ))?; - let refreshed_at = DateTime::::from_timestamp(info.refreshed_at as i64, 0).ok_or( - GatewayInfoProtoParseError::InvalidRefreshedAt(info.refreshed_at), - )?; + let refreshed_at = Utc + .timestamp_opt(info.refreshed_at as i64, 0) + .single() + .ok_or(GatewayInfoProtoParseError::InvalidRefreshedAt( + info.refreshed_at, + ))?; Ok(Self { address: info.address.into(), diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 0aaa52bd1..9932f4341 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -3,7 +3,7 @@ use crate::{ key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::{DateTime, Utc}; +use chrono::{TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ stream::{Stream, StreamExt, TryStreamExt}, @@ -163,7 +163,9 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); let device_types: Vec = request.device_types().map(|v| v.into()).collect(); - let min_refreshed_at = DateTime::::from_timestamp(request.min_refreshed_at as i64, 0) + let min_refreshed_at = Utc + .timestamp_opt(request.min_refreshed_at as i64, 0) + .single() .ok_or(Status::invalid_argument( "Invalid min_refreshed_at argument", ))?; From 56b1198a22a75eaf6771c4e81100bce7da8f22aa Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 14 Nov 2024 16:35:25 +0200 Subject: [PATCH 9/9] Use helium-proto master --- Cargo.lock | 54 +++++++++++++++++++----------------------------------- Cargo.toml | 4 ++-- 2 files changed, 21 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4abec4726..fbcebc3ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,11 +1615,11 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=gateway_info_modified#5c323895982fe75b6179719317c24700ef238457" +source = "git+https://github.com/helium/proto?branch=master#ad4db785573778069b559f916b9329ab40854700" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "prost", "rand 0.8.5", "rand_chacha 0.3.0", @@ -1788,7 +1788,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http 0.2.11", "http-serde", "humantime-serde", @@ -2630,7 +2630,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http 0.2.11", "notify", "serde", @@ -3212,7 +3212,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "hex-literal", "http 0.2.11", "lazy_static", @@ -3794,7 +3794,7 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", + "helium-proto", "hex", "hex-literal", "itertools", @@ -3821,23 +3821,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=gateway_info_modified#5c323895982fe75b6179719317c24700ef238457" -dependencies = [ - "bytes", - "prost", - "prost-build", - "serde", - "serde_json", - "strum", - "strum_macros", - "tonic", - "tonic-build", -] - -[[package]] -name = "helium-proto" -version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" +source = "git+https://github.com/helium/proto?branch=master#ad4db785573778069b559f916b9329ab40854700" dependencies = [ "bytes", "prost", @@ -3891,7 +3875,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4307,7 +4291,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http 0.2.11", "humantime-serde", "metrics", @@ -4376,7 +4360,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "hextree", "http 0.2.11", "http-serde", @@ -4418,7 +4402,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http 0.2.11", "http-serde", "humantime-serde", @@ -4460,7 +4444,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http-serde", "humantime-serde", "iot-config", @@ -5048,7 +5032,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "hextree", "http 0.2.11", "http-serde", @@ -5088,7 +5072,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "mobile-config", "prost", "rand 0.8.5", @@ -5124,7 +5108,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http 0.2.11", "http-serde", "humantime-serde", @@ -5168,7 +5152,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "hex-assignments", "hextree", "http-serde", @@ -5852,7 +5836,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5935,7 +5919,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6574,7 +6558,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=gateway_info_modified)", + "helium-proto", "humantime-serde", "lazy_static", "metrics", diff --git a/Cargo.toml b/Cargo.toml index c6656b1ca..7388ff0ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "gateway_info_modified", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "gateway_info_modified" } +beacon = { git = "https://github.com/helium/proto", branch = "master" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18"