Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Local environment mode / integration testing setup #200

Merged
merged 5 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 15 additions & 30 deletions fission-server/src/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
//! The Axum Application State

use crate::{db::Pool, dns::DnsServer, routes::ws::WsPeerMap, setups::ServerSetup};
use anyhow::{anyhow, Result};
use axum::extract::ws;
use dashmap::DashMap;
use fission_core::ed_did_key::EdDidKey;
use futures::channel::mpsc::Sender;
use std::{net::SocketAddr, sync::Arc};

use crate::{db::Pool, dns::DnsServer, traits::ServerSetup};

/// A channel for transmitting messages to a websocket peer
pub type WsPeer = Sender<ws::Message>;

/// A map of all websocket peers connected to each DID-specific channel
pub type WsPeerMap = Arc<DashMap<String, DashMap<SocketAddr, WsPeer>>>;
use std::sync::Arc;

#[derive(Clone)]
/// Global application route state.
Expand All @@ -27,21 +17,23 @@ pub struct AppState<S: ServerSetup> {
/// The service that sends account verification codes
pub verification_code_sender: S::VerificationCodeSender,
/// The currently connected websocket peers
pub ws_peer_map: WsPeerMap,
pub ws_peer_map: Arc<WsPeerMap>,
/// The server's decentralized identity (signing/private key)
pub server_keypair: Arc<EdDidKey>,
/// The DNS server state. Used for answering DoH queries
pub dns_server: DnsServer,
}

/// Builder for [`AppState`]
#[derive(Debug)]
pub struct AppStateBuilder<S: ServerSetup> {
db_pool: Option<Pool>,
ipfs_peers: Vec<String>,
ipfs_db: Option<S::IpfsDatabase>,
verification_code_sender: Option<S::VerificationCodeSender>,
server_keypair: Option<EdDidKey>,
dns_server: Option<DnsServer>,
ws_peer_map: Arc<WsPeerMap>,
}

impl<S: ServerSetup> Default for AppStateBuilder<S> {
Expand All @@ -53,6 +45,7 @@ impl<S: ServerSetup> Default for AppStateBuilder<S> {
verification_code_sender: None,
server_keypair: None,
dns_server: None,
ws_peer_map: Default::default(),
}
}
}
Expand All @@ -78,12 +71,14 @@ impl<S: ServerSetup> AppStateBuilder<S> {
.dns_server
.ok_or_else(|| anyhow!("dns_server is required"))?;

let ws_peer_map = self.ws_peer_map;

Ok(AppState {
db_pool,
ipfs_peers,
ipfs_db,
verification_code_sender,
ws_peer_map: Default::default(),
ws_peer_map,
server_keypair: Arc::new(did),
dns_server,
})
Expand Down Expand Up @@ -127,6 +122,12 @@ impl<S: ServerSetup> AppStateBuilder<S> {
self.dns_server = Some(dns_server);
self
}

/// Set the websocket peer map
pub fn with_ws_peer_map(mut self, ws_peer_map: Arc<WsPeerMap>) -> Self {
self.ws_peer_map = ws_peer_map;
self
}
}

impl<S> std::fmt::Debug for AppState<S>
Expand All @@ -145,19 +146,3 @@ where
.finish()
}
}

impl<S> std::fmt::Debug for AppStateBuilder<S>
where
S: ServerSetup,
S::IpfsDatabase: std::fmt::Debug,
S::VerificationCodeSender: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppStateBuilder")
.field("db_pool", &self.db_pool)
.field("ipfs_peers", &self.ipfs_peers)
.field("ipfs_db", &self.ipfs_db)
.field("verification_code_sender", &self.verification_code_sender)
.finish()
}
}
4 changes: 2 additions & 2 deletions fission-server/src/extract/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde_json::json;

// 🧬

use crate::{app_state::AppState, authority::Authority, error::AppError, traits::ServerSetup};
use crate::{app_state::AppState, authority::Authority, error::AppError, setups::ServerSetup};
use fission_core::{
authority,
authority::Error::{InvalidUcan, MissingCredentials},
Expand Down Expand Up @@ -157,7 +157,7 @@ async fn do_extract_authority<F: Clone + DeserializeOwned>(
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::test_context::{TestContext, TestSetup};
use crate::{setups::test::TestSetup, test_utils::test_context::TestContext};
use axum::{
body::BoxBody,
extract::State,
Expand Down
2 changes: 1 addition & 1 deletion fission-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ pub mod models;
pub mod router;
pub mod routes;
pub mod settings;
pub mod setups;
pub mod tracer;
pub mod tracing_layers;
pub mod traits;

#[cfg(test)]
mod test_utils;
71 changes: 55 additions & 16 deletions fission-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@ use fission_server::{
docs::ApiDoc,
metrics::{process, prom::setup_metrics_recorder},
middleware::{self, request_ulid::MakeRequestUlid, runtime},
models::email_verification::EmailVerificationCodeSender,
router,
routes::fallback::notfound_404,
settings::{Otel, Settings},
routes::{fallback::notfound_404, ws::WsPeerMap},
settings::{AppEnvironment, Otel, Settings},
setups::{
local::{LocalSetup, WebsocketCodeSender},
prod::{EmailVerificationCodeSender, IpfsHttpApiDatabase, ProdSetup},
ServerSetup,
},
tracer::init_tracer,
tracing_layers::{
format_layer::LogFmtLayer,
metrics_layer::{MetricsLayer, METRIC_META_PREFIX},
storage_layer::StorageLayer,
},
traits::{IpfsHttpApiDatabase, ServerSetup},
};
use http::header;
use metrics_exporter_prometheus::PrometheusHandle;
Expand All @@ -36,6 +39,7 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
process::exit,
sync::Arc,
time::Duration,
};
use tokio::{
Expand Down Expand Up @@ -64,14 +68,6 @@ use utoipa_swagger_ui::SwaggerUi;
/// Request identifier field.
const REQUEST_ID: &str = "request_id";

#[derive(Clone, Debug, Default)]
pub struct ProdSetup;

impl ServerSetup for ProdSetup {
type IpfsDatabase = IpfsHttpApiDatabase;
type VerificationCodeSender = EmailVerificationCodeSender;
}

#[tokio::main]
async fn main() -> Result<()> {
let (stdout_writer, _stdout_guard) = tracing_appender::non_blocking(io::stdout());
Expand All @@ -88,7 +84,22 @@ async fn main() -> Result<()> {
"starting server",
);

let app_state = setup_app_state(&settings, db_pool.clone()).await?;
match settings.server.environment {
AppEnvironment::Staging | AppEnvironment::Prod => {
let app_state = setup_prod_app_state(&settings, db_pool).await?;
run_with_app_state(settings, app_state).await
}
AppEnvironment::Local | AppEnvironment::Dev => {
let app_state = setup_local_app_state(&settings, db_pool).await?;
run_with_app_state(settings, app_state).await
}
}
}

async fn run_with_app_state<S: ServerSetup + 'static>(
settings: Settings,
app_state: AppState<S>,
) -> Result<()> {
let dns_server = app_state.dns_server.clone();
let recorder_handle = setup_metrics_recorder()?;
let cancellation_token = CancellationToken::new();
Expand Down Expand Up @@ -165,7 +176,7 @@ async fn serve_metrics(
Ok(())
}

async fn setup_app_state(settings: &Settings, db_pool: Pool) -> Result<AppState<ProdSetup>> {
async fn setup_prod_app_state(settings: &Settings, db_pool: Pool) -> Result<AppState<ProdSetup>> {
let server_keypair = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("config")
.join(&settings.server.keypair_path);
Expand All @@ -190,8 +201,36 @@ async fn setup_app_state(settings: &Settings, db_pool: Pool) -> Result<AppState<
Ok(app_state)
}

async fn serve_app(
app_state: AppState<ProdSetup>,
async fn setup_local_app_state(settings: &Settings, db_pool: Pool) -> Result<AppState<LocalSetup>> {
let server_keypair = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("config")
.join(&settings.server.keypair_path);

let server_keypair = fs::read_to_string(&server_keypair)
.await
.map_err(|e| anyhow!(e))
.and_then(|pem| EdDidKey::from_pkcs8_pem(&pem).map_err(|e| anyhow!(e)))
.map_err(|e| anyhow!("Couldn't load server DID from {}: {}. Make sure to generate a key by running `openssl genpkey -algorithm ed25519 -out {}`", server_keypair.to_string_lossy(), e, server_keypair.to_string_lossy()))?;

let dns_server = DnsServer::new(&settings.dns, db_pool.clone(), server_keypair.did())?;

let ws_peer_map = Arc::new(WsPeerMap::default());

let app_state = AppStateBuilder::<LocalSetup>::default()
.with_db_pool(db_pool)
.with_ipfs_peers(settings.ipfs.peers.clone())
.with_ws_peer_map(Arc::clone(&ws_peer_map))
.with_verification_code_sender(WebsocketCodeSender::new(ws_peer_map))
.with_ipfs_db(IpfsHttpApiDatabase::default())
.with_server_keypair(server_keypair)
.with_dns_server(dns_server)
.finalize()?;

Ok(app_state)
}

async fn serve_app<S: ServerSetup + 'static>(
app_state: AppState<S>,
settings: Settings,
token: CancellationToken,
) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion fission-server/src/models/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::{
db::{schema::accounts, Conn},
models::volume::{NewVolumeRecord, Volume},
traits::IpfsDatabase,
setups::IpfsDatabase,
};
use anyhow::{bail, Result};
use chrono::NaiveDateTime;
Expand Down
83 changes: 1 addition & 82 deletions fission-server/src/models/email_verification.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Email Verification Model
use crate::db::{schema::email_verifications, Conn};
use anyhow::Result;
use async_trait::async_trait;
use chrono::NaiveDateTime;
use diesel::{
dsl::{now, IntervalDsl},
Expand All @@ -10,88 +10,7 @@ use diesel::{
use diesel_async::RunQueryDsl;
use fission_core::common::EmailVerifyRequest;
use hex::ToHex;
use mailgun_rs::{EmailAddress, Mailgun, MailgunRegion, Message};
use rand::Rng;
use std::collections::HashMap;
use tracing::log;

use crate::{
db::{schema::email_verifications, Conn},
settings,
traits::VerificationCodeSender,
};

#[derive(Debug, Clone)]
/// Sends verification codes over email
pub struct EmailVerificationCodeSender {
settings: settings::Mailgun,
}

impl EmailVerificationCodeSender {
/// Create a new EmailVerificationCodeSender
pub fn new(settings: settings::Mailgun) -> Self {
Self { settings }
}

fn sender(&self) -> EmailAddress {
EmailAddress::name_address(&self.settings.from_name, &self.settings.from_address)
}

fn subject(&self) -> &str {
self.settings.subject.as_str()
}

fn template(&self) -> &str {
self.settings.template.as_str()
}

fn api_key(&self) -> &str {
self.settings.api_key.as_str()
}

fn domain(&self) -> &str {
self.settings.domain.as_str()
}

fn message(&self, email: &str, code: &str) -> Message {
let delivery_address = EmailAddress::address(email);
let template_vars = HashMap::from_iter([("code".to_string(), code.to_string())]);

Message {
to: vec![delivery_address],
subject: self.subject().to_string(),
template: self.template().to_string(),
template_vars,
..Default::default()
}
}
}

#[async_trait]
impl VerificationCodeSender for EmailVerificationCodeSender {
/// Sends the code to the user
async fn send_code(&self, email: &str, code: &str) -> Result<()> {
let message = self.message(email, code);

log::debug!(
"Sending verification email:\nTo: {}\nSubject: {}\nTemplate: {}\nTemplate Vars: {:?}",
email,
message.subject,
message.template,
message.template_vars
);

let client = Mailgun {
message,
api_key: self.api_key().to_string(),
domain: self.domain().to_string(),
};

client.async_send(MailgunRegion::US, &self.sender()).await?;

Ok(())
}
}

/// Email Verification Request
#[derive(Insertable, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion fission-server/src/models/volume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use diesel_async::RunQueryDsl;

use crate::{
db::{schema::volumes, Conn},
traits::IpfsDatabase,
setups::IpfsDatabase,
};

#[derive(Debug, Queryable, Insertable, Clone, Identifiable, Selectable, ToSchema)]
Expand Down
4 changes: 2 additions & 2 deletions fission-server/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
routes::{
account, auth, capability_indexing, doh, fallback::notfound_404, health, ipfs, ping, ws,
},
traits::ServerSetup,
setups::ServerSetup,
};
use axum::{
routing::{delete, get, patch, post},
Expand Down Expand Up @@ -35,7 +35,7 @@ pub fn setup_app_router<S: ServerSetup + 'static>(app_state: AppState<S>) -> Rou
.with_state(app_state.clone());

let api_router = Router::new()
.route("/relay/:did", get(ws::handler))
.route("/relay/:topic", get(ws::handler))
.route("/auth/email/verify", post(auth::request_token))
.route("/server-did", get(auth::server_did))
.route("/account", post(account::create_account))
Expand Down
Loading
Loading