From e1bf9f4e3cc28d408d3c72f72ae3fb95f61fa06c Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 12 Nov 2024 21:48:15 +0200 Subject: [PATCH] Handle min_refreshed_at argument --- mobile_config/src/gateway_info.rs | 13 +++++++++---- mobile_config/src/gateway_service.rs | 12 ++++++------ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index c5fed2a03..94c12c190 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -140,11 +140,14 @@ pub(crate) mod db { join key_to_assets kta on infos.asset = kta.asset "#; const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; - const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) "; + const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) "; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}"); + static ref GET_METADATA_SQL_REFRESHED_AT: String = format!("{GET_METADATA_SQL} where infos.refreshed_at > $1"); + + static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); + } pub async fn get_info( @@ -180,14 +183,16 @@ pub(crate) mod db { pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, device_types: &'a [DeviceType], - _min_refreshed_at: i64, // TODO + min_refreshed_at: DateTime, ) -> impl Stream + 'a { match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL) + true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT) + .bind(min_refreshed_at) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed(), false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL) + .bind(min_refreshed_at) .bind( device_types .iter() diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 1bc518f8c..0aaa52bd1 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -3,7 +3,7 @@ use crate::{ key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; -use chrono::Utc; +use chrono::{DateTime, Utc}; use file_store::traits::{MsgVerify, TimestampEncode}; use futures::{ stream::{Stream, StreamExt, TryStreamExt}, @@ -163,6 +163,10 @@ impl mobile_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(100); let device_types: Vec = request.device_types().map(|v| v.into()).collect(); + let min_refreshed_at = DateTime::::from_timestamp(request.min_refreshed_at as i64, 0) + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; tracing::debug!( "fetching all gateways' info. Device types: {:?} ", @@ -170,11 +174,7 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream( - &pool, - &device_types, - request.min_refreshed_at as i64, - ); + let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await });