From 91d058ce6f26295affb7731539dc5ad8b7dc91a9 Mon Sep 17 00:00:00 2001 From: Ryan Breen Date: Tue, 14 Nov 2023 23:48:02 -0500 Subject: [PATCH] fix: Better relay handling --- nomen/src/config/cfg.rs | 13 ---------- nomen/src/db/index.rs | 3 +++ nomen/src/db/mod.rs | 3 ++- nomen/src/db/relay_index.rs | 10 +++++--- .../subcommands/index/events/relay_index.rs | 24 +++++++++++++------ 5 files changed, 29 insertions(+), 24 deletions(-) diff --git a/nomen/src/config/cfg.rs b/nomen/src/config/cfg.rs index 8cec781..43666b1 100644 --- a/nomen/src/config/cfg.rs +++ b/nomen/src/config/cfg.rs @@ -73,19 +73,6 @@ impl Config { Ok((keys, client)) } - pub async fn nostr_keys_client( - &self, - keys: &nostr_sdk::Keys, - ) -> anyhow::Result { - let client = nostr_sdk::Client::with_opts(keys, Options::new().wait_for_send(true)); - let relays = self.relays(); - for relay in relays { - client.add_relay(relay, None).await?; - } - client.connect().await; - Ok(client) - } - pub async fn nostr_random_client( &self, ) -> anyhow::Result<(nostr_sdk::Keys, nostr_sdk::Client)> { diff --git a/nomen/src/db/index.rs b/nomen/src/db/index.rs index 06d0ea5..7df20e4 100644 --- a/nomen/src/db/index.rs +++ b/nomen/src/db/index.rs @@ -191,5 +191,8 @@ pub async fn reindex( sqlx::query("DELETE FROM name_events;") .execute(conn) .await?; + sqlx::query("DELETE FROM relay_index_queue;") + .execute(conn) + .await?; Ok(()) } diff --git a/nomen/src/db/mod.rs b/nomen/src/db/mod.rs index ed21a2d..35f1688 100644 --- a/nomen/src/db/mod.rs +++ b/nomen/src/db/mod.rs @@ -9,7 +9,7 @@ pub mod raw; pub mod relay_index; pub mod stats; -static MIGRATIONS: [&str; 17] = [ +static MIGRATIONS: [&str; 18] = [ "CREATE TABLE event_log (id INTEGER PRIMARY KEY, created_at, type, data);", "CREATE TABLE index_height (blockheight INTEGER PRIMARY KEY, blockhash);", "CREATE TABLE raw_blockchain (id INTEGER PRIMARY KEY, blockhash, txid, blocktime, blockheight, txheight, vout, data, indexed_at);", @@ -46,6 +46,7 @@ static MIGRATIONS: [&str; 17] = [ "CREATE TABLE relay_index_queue (name);", "ALTER TABLE blockchain_index ADD COLUMN v1_upgrade_blockheight;", "ALTER TABLE blockchain_index ADD COLUMN v1_upgrade_txid", + "CREATE UNIQUE INDEX riq_name_idx ON relay_index_queue (name)", ]; pub async fn initialize(config: &Config) -> anyhow::Result { diff --git a/nomen/src/db/relay_index.rs b/nomen/src/db/relay_index.rs index 02b9935..53f3841 100644 --- a/nomen/src/db/relay_index.rs +++ b/nomen/src/db/relay_index.rs @@ -4,7 +4,7 @@ pub async fn queue( conn: impl sqlx::Executor<'_, Database = Sqlite> + Copy, name: &str, ) -> anyhow::Result<()> { - sqlx::query("INSERT INTO relay_index_queue (name) VALUES (?)") + sqlx::query("INSERT OR IGNORE INTO relay_index_queue (name) VALUES (?)") .bind(name) .execute(conn) .await?; @@ -31,8 +31,12 @@ pub async fn fetch_all( Ok(results) } -pub async fn clear(conn: impl sqlx::Executor<'_, Database = Sqlite> + Copy) -> anyhow::Result<()> { - sqlx::query("DELETE FROM relay_index_queue;") +pub async fn delete( + conn: impl sqlx::Executor<'_, Database = Sqlite> + Copy, + name: &str, +) -> anyhow::Result<()> { + sqlx::query("DELETE FROM relay_index_queue WHERE name = ?;") + .bind(name) .execute(conn) .await?; Ok(()) diff --git a/nomen/src/subcommands/index/events/relay_index.rs b/nomen/src/subcommands/index/events/relay_index.rs index 222882f..95c0ea7 100644 --- a/nomen/src/subcommands/index/events/relay_index.rs +++ b/nomen/src/subcommands/index/events/relay_index.rs @@ -19,18 +19,19 @@ pub async fn publish(config: &Config, pool: &SqlitePool) -> anyhow::Result<()> { .expect("Missing config validation for secret") .into(); let keys = Keys::new(sk); - let client = config.nostr_keys_client(&keys).await?; + let (_, client) = config.nostr_random_client().await?; + tracing::info!("Publishing relay index."); let names = db::relay_index::fetch_all(pool).await?; + send_events(pool, names, keys, &client).await?; + tracing::info!("Publishing relay index complete."); - send_event(names, keys, &client).await?; - - db::relay_index::clear(pool).await?; client.disconnect().await.ok(); Ok(()) } -async fn send_event( +async fn send_events( + conn: &SqlitePool, names: Vec, keys: Keys, client: &nostr_sdk::Client, @@ -50,8 +51,17 @@ async fn send_event( ) .to_event(&keys)?; - if client.send_event(event).await.is_err() { - tracing::error!("Unable to broadcast event during relay index publish"); + match client.send_event(event.clone()).await { + Ok(s) => { + tracing::info!("Broadcast event id {s}"); + db::relay_index::delete(conn, &name.name).await?; + } + Err(e) => { + tracing::error!( + "Unable to broadcast event {} during relay index publish: {e}", + event.id + ); + } } } Ok(())