Skip to content

Commit

Permalink
Handle min_refreshed_at argument
Browse files Browse the repository at this point in the history
  • Loading branch information
kurotych committed Nov 12, 2024
1 parent 6691728 commit e1bf9f4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
13 changes: 9 additions & 4 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,14 @@ pub(crate) mod db {
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";
const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) ";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!("{GET_METADATA_SQL} where infos.refreshed_at > $1");

static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET);

}

pub async fn get_info(
Expand Down Expand Up @@ -180,14 +183,16 @@ pub(crate) mod db {
pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
_min_refreshed_at: i64, // TODO
min_refreshed_at: DateTime<Utc>,
) -> impl Stream<Item = GatewayInfo> + 'a {
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT)
.bind(min_refreshed_at)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(min_refreshed_at)
.bind(
device_types
.iter()
Expand Down
12 changes: 6 additions & 6 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
use chrono::Utc;
use chrono::{DateTime, Utc};
use file_store::traits::{MsgVerify, TimestampEncode};
use futures::{
stream::{Stream, StreamExt, TryStreamExt},
Expand Down Expand Up @@ -163,18 +163,18 @@ impl mobile_config::Gateway for GatewayService {
let (tx, rx) = tokio::sync::mpsc::channel(100);

let device_types: Vec<DeviceType> = request.device_types().map(|v| v.into()).collect();
let min_refreshed_at = DateTime::<Utc>::from_timestamp(request.min_refreshed_at as i64, 0)
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

tracing::debug!(
"fetching all gateways' info. Device types: {:?} ",
device_types
);

tokio::spawn(async move {
let stream = gateway_info::db::all_info_stream(
&pool,
&device_types,
request.min_refreshed_at as i64,
);
let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at);
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down

0 comments on commit e1bf9f4

Please sign in to comment.