diff --git a/Cargo.lock b/Cargo.lock index 74087a040..fbcebc3ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,7 +1615,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" +source = "git+https://github.com/helium/proto?branch=master#ad4db785573778069b559f916b9329ab40854700" dependencies = [ "base64 0.21.7", "byteorder", @@ -3821,7 +3821,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" +source = "git+https://github.com/helium/proto?branch=master#ad4db785573778069b559f916b9329ab40854700" dependencies = [ "bytes", "prost", diff --git a/mobile_config/src/client/gateway_client.rs b/mobile_config/src/client/gateway_client.rs index a1a4a381f..7c028d652 100644 --- a/mobile_config/src/client/gateway_client.rs +++ b/mobile_config/src/client/gateway_client.rs @@ -107,6 +107,7 @@ impl GatewayInfoResolver for GatewayClient { let mut req = mobile_config::GatewayInfoStreamReqV1 { batch_size: self.batch_size, device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), + min_refreshed_at: 0, signer: self.signing_key.public_key().into(), signature: vec![], }; diff --git a/mobile_config/src/client/mod.rs b/mobile_config/src/client/mod.rs index f9ecd7d18..723bdda42 100644 --- a/mobile_config/src/client/mod.rs +++ b/mobile_config/src/client/mod.rs @@ -27,6 +27,8 @@ pub enum ClientError { LocationParseError(#[from] std::num::ParseIntError), #[error("unknown service provider {0}")] UnknownServiceProvider(String), + #[error("Invalid GatewayInfo proto response {0}")] + InvalidGatewayInfoProto(#[from] crate::gateway_info::GatewayInfoProtoParseError), } macro_rules! call_with_retry { diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index ada624a81..30a49b9e4 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, TimeZone, Utc}; use futures::stream::BoxStream; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::{ @@ -17,6 +18,8 @@ pub struct GatewayInfo { pub address: PublicKeyBinary, pub metadata: Option, pub device_type: DeviceType, + pub refreshed_at: DateTime, + pub created_at: DateTime, } impl GatewayInfo { @@ -25,8 +28,18 @@ impl GatewayInfo { } } +#[derive(thiserror::Error, Debug)] +pub enum GatewayInfoProtoParseError { + #[error("Invalid location string: {0}")] + InvalidLocation(#[from] std::num::ParseIntError), + #[error("Invalid created_at: {0}")] + InvalidCreatedAt(u64), + #[error("Invalid refreshed_at: {0}")] + InvalidRefreshedAt(u64), +} + impl TryFrom for GatewayInfo { - type Error = std::num::ParseIntError; + type Error = GatewayInfoProtoParseError; fn try_from(info: GatewayInfoProto) -> Result { let metadata = if let Some(ref metadata) = info.metadata { @@ -38,10 +51,27 @@ impl TryFrom for GatewayInfo { None }; let device_type = info.device_type().into(); + + let created_at = Utc + .timestamp_opt(info.created_at as i64, 0) + .single() + .ok_or(GatewayInfoProtoParseError::InvalidCreatedAt( + info.created_at, + ))?; + + let refreshed_at = Utc + .timestamp_opt(info.refreshed_at as i64, 0) + .single() + .ok_or(GatewayInfoProtoParseError::InvalidRefreshedAt( + info.refreshed_at, + ))?; + Ok(Self { address: info.address.into(), metadata, device_type, + created_at, + refreshed_at, }) } } @@ -61,6 +91,8 @@ impl TryFrom for GatewayInfoProto { address: info.address.into(), metadata, device_type: info.device_type as i32, + created_at: info.created_at.timestamp() as u64, + refreshed_at: info.created_at.timestamp() as u64, }) } } @@ -115,22 +147,28 @@ impl std::str::FromStr for DeviceType { pub(crate) mod db { use super::{DeviceType, GatewayInfo, GatewayMetadata}; + use chrono::{DateTime, Utc}; use futures::stream::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::{types::Json, PgExecutor, Row}; use std::str::FromStr; const GET_METADATA_SQL: &str = r#" - select kta.entity_key, infos.location::bigint, infos.device_type + select kta.entity_key, infos.location::bigint, infos.device_type, + infos.refreshed_at, infos.created_at from mobile_hotspot_infos infos join key_to_assets kta on infos.asset = kta.asset "#; const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; - const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) "; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); + static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL} + where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#); + + static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); + } pub async fn get_info( @@ -166,13 +204,16 @@ pub(crate) mod db { pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceType], + min_refreshed_at: DateTime, ) -> impl Stream + 'a { match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL) + true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT) + .bind(min_refreshed_at) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed(), false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL) + .bind(min_refreshed_at) .bind( device_types .iter() @@ -200,6 +241,13 @@ pub(crate) mod db { .as_ref(), ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; + let created_at = row.get::, &str>("created_at"); + // `refreshed_at` can be NULL in the database schema. + // If so, fallback to using `created_at` as the default value of `refreshed_at`. + let refreshed_at = row + .get::>, &str>("refreshed_at") + .unwrap_or(created_at); + Ok(Self { address: PublicKeyBinary::from_str( &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), @@ -207,6 +255,8 @@ pub(crate) mod db { .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, metadata, device_type, + refreshed_at, + created_at, }) } } diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 90f1a11f9..9932f4341 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -3,7 +3,7 @@ use crate::{ key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::Utc; +use chrono::{TimeZone, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ stream::{Stream, StreamExt, TryStreamExt}, @@ -163,6 +163,12 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); let device_types: Vec = request.device_types().map(|v| v.into()).collect(); + let min_refreshed_at = Utc + .timestamp_opt(request.min_refreshed_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; tracing::debug!( "fetching all gateways' info. Device types: {:?} ", @@ -170,7 +176,7 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream(&pool, &device_types); + let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index c1eb71dd3..0ffaa2cc5 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use futures::stream::StreamExt; use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -82,16 +83,151 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { Ok(()) } +async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> ( + String, + tokio::task::JoinHandle>, +) { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Start the gateway server + let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]); + let (_key_cache_tx, key_cache) = KeyCache::new(keys); + let gws = GatewayService::new(key_cache, pool, server_key); + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + (format!("http://{addr}"), handle) +} + +#[sqlx::test] +async fn gateway_stream_info_refreshed_at(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let asset2_hex_idx = 631711286145955327_i64; + let asset2_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); + let now_plus_10 = now + chrono::Duration::seconds(10); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + ) + .await; + add_db_record( + &pool, + "asset2", + asset2_hex_idx, + "\"wifiDataOnly\"", + asset2_pubkey.clone().into(), + now_plus_10, + Some(now_plus_10), + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + // Regression test + let req = make_gateway_stream_signed_req(&admin_key, &[], 0); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 2); + + // No device types but filter by refreshed_at + let req = make_gateway_stream_signed_req(&admin_key, &[], now_plus_10.timestamp() as u64); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceType::WifiDataOnly) + ); + + // No refreshed_at but filter by device_type + let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor], 0); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceType::WifiIndoor) + ); + + // Filter by device_type and refreshed_at + let req = make_gateway_stream_signed_req( + &admin_key, + &[DeviceType::WifiIndoor], + now.timestamp() as u64, + ); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + assert_eq!( + resp.gateways.first().unwrap().device_type, + Into::::into(DeviceType::WifiIndoor) + ); +} + +#[sqlx::test] +async fn gateway_stream_info_refreshed_at_is_null(pool: PgPool) { + let admin_key = make_keypair(); + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + let now = Utc::now(); + + create_db_tables(&pool).await; + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + None, + ) + .await; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await.unwrap(); + + let req = make_gateway_stream_signed_req(&admin_key, &[], now.timestamp() as u64); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + + // Make sure the gateway was returned + let resp = stream.next().await.unwrap().unwrap(); + assert_eq!(resp.gateways.len(), 1); + + let req = make_gateway_stream_signed_req(&admin_key, &[], (now.timestamp() + 1) as u64); + let mut stream = client.info_stream(req).await.unwrap().into_inner(); + // Response is empty + assert!(stream.next().await.is_none()); +} + #[sqlx::test] async fn gateway_stream_info_data_types(pool: PgPool) { let admin_key = make_keypair(); - let server_key = make_keypair(); let asset1_pubkey = make_keypair().public_key().clone(); let asset1_hex_idx = 631711281837647359_i64; let asset2_hex_idx = 631711286145955327_i64; let asset3_hex_idx = 631711286145006591_i64; let asset2_pubkey = make_keypair().public_key().clone(); let asset3_pubkey = make_keypair().public_key().clone(); + let now = Utc::now(); create_db_tables(&pool).await; add_db_record( @@ -100,6 +236,8 @@ async fn gateway_stream_info_data_types(pool: PgPool) { asset1_hex_idx, "\"wifiIndoor\"", asset1_pubkey.clone().into(), + now, + Some(now), ) .await; add_db_record( @@ -108,6 +246,8 @@ async fn gateway_stream_info_data_types(pool: PgPool) { asset2_hex_idx, "\"wifiDataOnly\"", asset2_pubkey.clone().into(), + now, + Some(now), ) .await; add_db_record( @@ -116,27 +256,17 @@ async fn gateway_stream_info_data_types(pool: PgPool) { asset3_hex_idx, "\"wifiDataOnly\"", asset3_pubkey.clone().into(), + now, + Some(now), ) .await; - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; - // Start the gateway server - let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]); - let (_key_cache_tx, key_cache) = KeyCache::new(keys); - let gws = GatewayService::new(key_cache, pool.clone(), server_key); - let _handle = tokio::spawn( - transport::Server::builder() - .add_service(proto::GatewayServer::new(gws)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), - ); - let mut client = GatewayClient::connect(format!("http://{addr}")) - .await - .unwrap(); + let mut client = GatewayClient::connect(addr).await.unwrap(); // Check wifi indoor - let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor]); + let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor], 0); let mut stream = client.info_stream(req).await.unwrap().into_inner(); let res = stream.next().await.unwrap().unwrap(); let gw_info = res.gateways.first().unwrap(); @@ -153,7 +283,7 @@ async fn gateway_stream_info_data_types(pool: PgPool) { assert!(stream.next().await.is_none()); // Check wifi data only - let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiDataOnly]); + let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiDataOnly], 0); let stream = client.info_stream(req).await.unwrap().into_inner(); let resp = stream @@ -174,7 +304,7 @@ async fn gateway_stream_info_data_types(pool: PgPool) { ); // Check all - let req = make_gateway_stream_signed_req(&admin_key, &[]); + let req = make_gateway_stream_signed_req(&admin_key, &[], 0); let stream = client.info_stream(req).await.unwrap().into_inner(); let resp = stream @@ -191,23 +321,34 @@ async fn add_db_record( location: i64, device_type: &str, key: PublicKeyBinary, + created_at: DateTime, + refreshed_at: Option>, ) { - add_mobile_hotspot_infos(pool, asset, location, device_type).await; + add_mobile_hotspot_infos(pool, asset, location, device_type, created_at, refreshed_at).await; add_asset_key(pool, asset, key).await; } -async fn add_mobile_hotspot_infos(pool: &PgPool, asset: &str, location: i64, device_type: &str) { +async fn add_mobile_hotspot_infos( + pool: &PgPool, + asset: &str, + location: i64, + device_type: &str, + created_at: DateTime, + refreshed_at: Option>, +) { sqlx::query( r#" INSERT INTO -"mobile_hotspot_infos" ("asset", "location", "device_type") +"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at") VALUES -($1, $2, $3::jsonb); +($1, $2, $3::jsonb, $4, $5); "#, ) .bind(asset) .bind(location) .bind(device_type) + .bind(created_at) + .bind(refreshed_at) .execute(pool) .await .unwrap(); @@ -235,7 +376,9 @@ async fn create_db_tables(pool: &PgPool) { CREATE TABLE mobile_hotspot_infos ( asset character varying(255) NULL, location numeric NULL, - device_type jsonb NOT NULL + device_type jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + refreshed_at timestamptz );"#, ) .execute(pool) @@ -260,12 +403,14 @@ fn make_keypair() -> Keypair { fn make_gateway_stream_signed_req( signer: &Keypair, device_types: &[DeviceType], + min_refreshed_at: u64, ) -> proto::GatewayInfoStreamReqV1 { let mut req = GatewayInfoStreamReqV1 { batch_size: 10000, signer: signer.public_key().to_vec(), signature: vec![], device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), + min_refreshed_at, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index 5c6e9b9e8..91ad8cde3 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -34,6 +34,8 @@ impl GatewayInfoResolver for MockGatewayInfoResolver { address: address.clone(), metadata: None, device_type: DeviceType::Cbrs, + refreshed_at: Utc::now(), + created_at: Utc::now(), })) }