Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add redis based reliability reporting #778

Open
wants to merge 48 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
a8b64b7
feat: Add database tracking and report for Push Reliability
jrconlin Sep 23, 2024
dc1763a
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Sep 25, 2024
14bd4fb
f lint
jrconlin Sep 25, 2024
e914b14
f remove extra from pending pr
jrconlin Sep 25, 2024
4f25172
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 2, 2024
e9ff9f5
f r's
jrconlin Oct 2, 2024
c9a3512
f r's
jrconlin Oct 2, 2024
f8c7ee9
f lint
jrconlin Oct 2, 2024
1dd624f
f post test
jrconlin Oct 8, 2024
05e73cc
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 8, 2024
8291039
f r's
jrconlin Oct 9, 2024
ed67eb4
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 11, 2024
6c2c7d4
feat: Add redis based reliability reporting
jrconlin Oct 8, 2024
3a0beef
f isort
jrconlin Oct 17, 2024
efb1cf5
f add documentation
jrconlin Oct 17, 2024
c658a09
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 17, 2024
588a4ea
Merge branch 'master' into feat/SYNC-4324_track_db
jrconlin Oct 17, 2024
1c7e26a
f r's
jrconlin Oct 18, 2024
07e7db3
f r's
jrconlin Oct 18, 2024
950c996
Merge branch 'feat/SYNC-4324_track_db' into feat/SYNC-4327_redis
jrconlin Oct 21, 2024
853268c
f post merge fix
jrconlin Oct 21, 2024
28bd921
f add metric logging to reliability_cron.py
jrconlin Oct 22, 2024
e465b36
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 24, 2024
148ac9b
Merge branch 'master' into feat/SYNC-4327_redis
jrconlin Oct 31, 2024
b42ea79
Merge branch 'master' into feat/SYNC-4327_redis
rachaelcrook Nov 6, 2024
7fea96c
Merge branch 'master' into feat/SYNC-4327_redis
pjenvey Nov 13, 2024
d9824f3
f r's
jrconlin Nov 22, 2024
62ecdc7
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Nov 22, 2024
3de403d
f fix tests
jrconlin Nov 22, 2024
51d7f7f
f r's
jrconlin Dec 6, 2024
b86ef6b
f autocomplete
jrconlin Dec 6, 2024
b0d1984
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 17, 2024
1f15f74
f r's
jrconlin Dec 17, 2024
76b02dd
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 17, 2024
1bf49d2
f fix up pymarks
jrconlin Dec 18, 2024
92eb64f
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 18, 2024
9c7b13c
f switch to pytest.ini
jrconlin Dec 18, 2024
2243e8a
f force the config file
jrconlin Dec 18, 2024
15c5e46
f move pytest to integration root
jrconlin Dec 19, 2024
e7939f6
f use the one in /code?
jrconlin Dec 19, 2024
20956fd
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 30, 2024
0f20043
f copy up pytest.ini
jrconlin Dec 30, 2024
10a6de3
f remove pytest.ini (in favor or just pyproject.toml)
jrconlin Dec 30, 2024
c799b00
f clean up integration test
jrconlin Dec 30, 2024
bb580b1
Merge branch 'master' into feat/SYNC-4327_redis
jrconlin Jan 13, 2025
6aaf27f
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Jan 14, 2025
dd4231d
f r's
jrconlin Jan 15, 2025
6752916
Merge branch 'feat/SYNC-4327_redis' of github.com:mozilla-services/au…
jrconlin Jan 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ CARGO = cargo
# Let's be very explicit about it for now.
TESTS_DIR := `pwd`/tests
TEST_RESULTS_DIR ?= workspace/test-results
PYTEST_ARGS ?= $(if $(SKIP_SENTRY),-m "not sentry") $(if $(TEST_STUB),,-m "not stub") # Stub tests do not work in CI
# NOTE: Do not be clever.
# The integration tests (and a few others) use pytest markers to control
# the tests that are being run. These markers are set and defined within
# the `./pyproject.toml`. That is the single source of truth.
PYTEST_ARGS := ${PYTEST_ARGS}
INTEGRATION_TEST_DIR := $(TESTS_DIR)/integration
INTEGRATION_TEST_FILE := $(INTEGRATION_TEST_DIR)/test_integration_all_rust.py
NOTIFICATION_TEST_DIR := $(TESTS_DIR)/notification
Expand Down Expand Up @@ -46,17 +50,17 @@ integration-test-clean:
$(DOCKER_COMPOSE) -f $(INTEGRATION_TEST_DIR)/docker-compose.yml down
docker rm integration-tests

integration-test-legacy:
integration-test-legacy: ## pytest markers are stored in `tests/pytest.ini`
$(POETRY) -V
$(POETRY) install --without dev,load,notification --no-root
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
--junit-xml=$(TEST_RESULTS_DIR)/integration_test_legacy_results.xml \
-v $(PYTEST_ARGS)

integration-test-local:
integration-test-local: ## pytest markers are stored in `tests/pytest.ini`
$(POETRY) -V
$(POETRY) install --without dev,load,notification --no-root
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
--junit-xml=$(TEST_RESULTS_DIR)/integration_test_results.xml \
-v $(PYTEST_ARGS)

Expand Down
8 changes: 7 additions & 1 deletion autoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ actix-service = "2.0"
docopt = "1.1"

[features]
default = ["bigtable"]
default = ["bigtable", "reliable_report"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
emulator = ["bigtable"]
log_vapid = []
reliable_report = [
"autoconnect_settings/reliable_report",
"autoconnect_web/reliable_report",
"autoconnect_ws/reliable_report",
"autopush_common/reliable_report",
]
4 changes: 2 additions & 2 deletions autoconnect/autoconnect-common/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ impl BroadcastChangeTracker {
}
*ver = broadcast.version;
} else {
trace!("📢 Not found: {}", &b_id);
trace!("📢 Not found: {b_id}");
return Err(ApcErrorKind::BroadcastError("Broadcast not found".into()).into());
}

trace!("📢 New version of {}", &b_id);
trace!("📢 New version of {b_id}");
// Check to see if this broadcast has been updated since initialization
let bcast_index = self
.broadcast_list
Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-settings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ autopush_common.workspace = true
# specify the default via the calling crate, in order to simplify default chains.
bigtable = ["autopush_common/bigtable"]
emulator = ["bigtable"]
reliable_report = ["autopush_common/reliable_report"]
14 changes: 14 additions & 0 deletions autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use autoconnect_common::{
registry::ClientRegistry,
};
use autopush_common::db::{client::DbClient, DbSettings, StorageType};
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;

use crate::{Settings, ENV_PREFIX};

Expand All @@ -32,6 +34,9 @@ pub struct AppState {
pub settings: Settings,
pub router_url: String,
pub endpoint_url: String,

#[cfg(feature = "reliable_report")]
pub reliability: Arc<PushReliability>,
}

impl AppState {
Expand Down Expand Up @@ -84,6 +89,13 @@ impl AppState {
ENV_PREFIX.to_uppercase()
),
};

#[cfg(feature = "reliable_report")]
let reliability = Arc::new(
PushReliability::new(&settings.reliability_dsn, db.clone()).map_err(|e| {
ConfigError::Message(format!("Could not start Reliability connection: {:?}", e))
})?,
);
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()
Expand All @@ -103,6 +115,8 @@ impl AppState {
settings,
router_url,
endpoint_url,
#[cfg(feature = "reliable_report")]
reliability,
})
}

Expand Down
7 changes: 7 additions & 0 deletions autoconnect/autoconnect-settings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ pub struct Settings {
///
/// By default, the number of available physical CPUs is used as the worker count.
pub actix_workers: Option<usize>,
#[cfg(feature = "reliable_report")]
/// The DNS for the reliability data store. This is normally a Redis compatible
/// storage system. See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters)
/// for details.
pub reliability_dsn: Option<String>,
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for Settings {
Expand Down Expand Up @@ -139,6 +144,8 @@ impl Default for Settings {
msg_limit: 150,
actix_max_connections: None,
actix_workers: None,
#[cfg(feature = "reliable_report")]
reliability_dsn: None,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ tokio.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws/reliable_report",
]
32 changes: 28 additions & 4 deletions autoconnect/autoconnect-web/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,45 @@ pub async fn ws_route(
}

/// Deliver a Push notification directly to a connected client
#[allow(unused_mut)]
pub async fn push_route(
uaid: web::Path<Uuid>,
notif: web::Json<Notification>,
mut notif: web::Json<Notification>,
app_state: web::Data<AppState>,
) -> HttpResponse {
trace!(
"⏩ push_route, uaid: {} channel_id: {}",
"⏩ in push_route, uaid: {} channel_id: {}",
uaid,
notif.channel_id
notif.channel_id,
);
#[cfg(feature = "reliable_report")]
{
notif
.record_reliability(
&app_state.reliability,
autopush_common::reliability::ReliabilityState::IntAccepted,
)
.await;
notif
.record_reliability(
&app_state.reliability,
autopush_common::reliability::ReliabilityState::Transmitted,
)
.await;
}
// Attempt to send the notification to the UA using WebSocket protocol, or store on failure.
let result = app_state
.clients
.notify(uaid.into_inner(), notif.into_inner())
.notify(uaid.into_inner(), notif.clone())
.await;
if result.is_ok() {
#[cfg(feature = "reliable_report")]
notif
.record_reliability(
&app_state.reliability,
autopush_common::reliability::ReliabilityState::Accepted,
)
.await;
HttpResponse::Ok().finish()
} else {
HttpResponse::NotFound().body("Client not available")
Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ ctor.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws_sm/reliable_report",
]
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ serde_json.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = []
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl WebPushClient {
&self.app_state.settings
}

#[cfg(feature = "reliable_report")]
pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability {
&self.app_state.reliability
}

/// Connect this `WebPushClient` to the `ClientRegistry`
///
/// Returning a `Stream` of `ServerNotification`s from the `ClientRegistry`
Expand Down Expand Up @@ -233,6 +238,7 @@ impl WebPushClient {
let connected_at = self.connected_at;
rt::spawn(async move {
app_state.db.save_messages(&uaid, notifs).await?;
// XXX: record reliability
debug!("Finished saving unacked direct notifs, checking for reconnect");
let Some(user) = app_state.db.get_user(&uaid).await? else {
return Err(SMErrorKind::Internal(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl WebPushClient {
if msg.sortkey_timestamp.is_none() {
expired_topic_sort_keys.push(msg.chidmessageid());
}
// XXX: record ReliabilityState::Expired?
false
});
// TODO: A batch remove_messages would be nicer
Expand Down Expand Up @@ -163,6 +164,22 @@ impl WebPushClient {
Ok(smsgs)
}

#[cfg(feature = "reliable_report")]
/// Record and transition the state for trackable messages.
async fn record_state(
&self,
messages: &mut Vec<Notification>,
state: autopush_common::reliability::ReliabilityState,
) {
// *Note* because `.map()` is sync
// we can't call the async func without additional hoops.
for message in messages {
message
.record_reliability(&self.app_state.reliability, state)
.await;
}
}

/// Read a chunk (max count 10 returned) of Notifications from storage
///
/// This alternates between reading Topic Notifications and Timestamp
Expand All @@ -186,10 +203,20 @@ impl WebPushClient {
let topic_resp = if self.flags.include_topic {
trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
// Get the most recent max 11 messages.
self.app_state
#[allow(unused_mut)]
let mut messages = self
.app_state
.db
.fetch_topic_messages(&self.uaid, 11)
.await?
.await?;
#[cfg(feature = "reliable_report")]
// Since we pulled these from storage, mark them as "retrieved"
self.record_state(
&mut messages.messages,
autopush_common::reliability::ReliabilityState::Retrieved,
)
.await;
messages
} else {
Default::default()
};
Expand Down Expand Up @@ -226,7 +253,8 @@ impl WebPushClient {
"🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
timestamp
);
let timestamp_resp = self
#[allow(unused_mut)]
let mut timestamp_resp = self
.app_state
.db
.fetch_timestamp_messages(&self.uaid, timestamp, 10)
Expand All @@ -244,6 +272,13 @@ impl WebPushClient {
)
.with_tag("topic", "false")
.send();
#[cfg(feature = "reliable_report")]
// Since we pulled these from storage, mark them as "retrieved"
self.record_state(
&mut timestamp_resp.messages,
autopush_common::reliability::ReliabilityState::Retrieved,
)
.await;
}

Ok(CheckStorageResponse {
Expand Down
Loading