diff --git a/Cargo.lock b/Cargo.lock index 050886ce84..7de2c31534 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3770,6 +3770,7 @@ dependencies = [ "time 0.3.36", "unicode-normalization", "url", + "uuid 1.10.0", "validator", ] diff --git a/Cargo.toml b/Cargo.toml index c3ba737ac2..1449c35b93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -176,7 +176,7 @@ tokio-util = { version = "0.7", features = ["io", "compat"] } tonic = { version = "0.12", features = ["tls", "tls-roots"] } hyper-util = "0.1" tower = { version = "0.5", features = ["util"] } -tower-http = { version = "0.5", features = ["trace"] } +tower-http = { version = "0.5", features = ["cors", "trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = [ "time", diff --git a/Tiltfile b/Tiltfile index 3d776326e6..6025b231ef 100644 --- a/Tiltfile +++ b/Tiltfile @@ -4,9 +4,9 @@ os.putenv("DATABASE_URL", DATABASE_URL) os.putenv("RUST_LOG", "info") os.putenv("DOCKER_DEFAULT_PLATFORM", "linux/amd64") -# Secret used to sign Authorizations within a local data plane, as base64("secret"). +# Secret used to sign Authorizations within a local data plane, as base64("supersecret"). # Also allow requests without an Authorization (to not break data-plane-gateway just yet). -AUTH_KEYS="c2VjcmV0,AA==" +AUTH_KEYS="c3VwZXJzZWNyZXQ=,AA==" os.putenv("CONSUMER_AUTH_KEYS", AUTH_KEYS) os.putenv("BROKER_AUTH_KEYS", AUTH_KEYS) @@ -83,8 +83,9 @@ local_resource('reactor', serve_cmd='%s/flow/.build/package/bin/flowctl-go serve local_resource('agent', serve_cmd='%s/flow/.build/package/bin/agent \ --connector-network supabase_network_flow \ --allow-local \ - --builds-root %s \ + --allow-origin http://localhost:3000 \ --api-port 8675 \ + --builds-root %s \ --serve-handlers \ --bin-dir %s/flow/.build/package/bin' % (REPO_BASE, FLOW_BUILDS_ROOT, REPO_BASE), resource_deps=['reactor', 'gazette'] @@ -101,7 +102,7 @@ local_resource('create-data-plane-local-cluster', "manual": {\ "brokerAddress": "http://localhost:8080",\ "reactorAddress": "http://localhost:9000",\ - "hmacKeys": ["c2VjcmV0"]\ + "hmacKeys": ["c3VwZXJzZWNyZXQ="]\ }\ }\ }\' http://localhost:8675/admin/create-data-plane' % SYSTEM_USER_TOKEN, diff --git a/crates/agent/src/api/authorize.rs b/crates/agent/src/api/authorize_task.rs similarity index 84% rename from crates/agent/src/api/authorize.rs rename to crates/agent/src/api/authorize_task.rs index 6cebd58ad3..d93242e7b5 100644 --- a/crates/agent/src/api/authorize.rs +++ b/crates/agent/src/api/authorize_task.rs @@ -2,24 +2,8 @@ use super::{App, Snapshot}; use anyhow::Context; use std::sync::Arc; -#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct Request { - // # JWT token to be authorized and signed. - token: String, -} - -#[derive(Default, Debug, serde::Serialize, schemars::JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct Response { - // # JWT token which has been authorized for use. - token: String, - // # Address of Gazette brokers for the issued token. - broker_address: String, - // # Number of milliseconds to wait before retrying the request. - // Non-zero if and only if token is not set. 
- retry_millis: u64, -} +type Request = models::authorizations::TaskAuthorizationRequest; +type Response = models::authorizations::TaskAuthorization; #[axum::debug_handler] pub async fn authorize_task( @@ -108,12 +92,11 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re fn evaluate_authorization( Snapshot { - taken: _, collections, data_planes, role_grants, tasks, - refresh_tx: _, + .. }: &Snapshot, shard_id: &str, shard_data_plane_fqdn: &str, @@ -188,22 +171,12 @@ fn evaluate_authorization( ); }; - let ops_kind = match task.spec_type { - models::CatalogType::Capture => "capture", - models::CatalogType::Collection => "derivation", - models::CatalogType::Materialization => "materialization", - models::CatalogType::Test => "test", - }; - // As a special case outside of the RBAC system, allow a task to write // to its designated partition within its ops collections. if required_role == models::Capability::Write && (collection.collection_name == task_data_plane.ops_logs_name || collection.collection_name == task_data_plane.ops_stats_name) - && journal_name_or_prefix.ends_with(&format!( - "/kind={ops_kind}/name={}/pivot=00", - labels::percent_encoding(&task.task_name).to_string(), - )) + && journal_name_or_prefix.ends_with(&super::ops_suffix(task)) { // Authorized write into designated ops partition. } else if tables::RoleGrant::is_authorized( @@ -214,10 +187,6 @@ fn evaluate_authorization( ) { // Authorized access through RBAC. } else { - let ops_suffix = format!( - "/kind={ops_kind}/name={}/pivot=00", - labels::percent_encoding(&task.task_name).to_string(), - ); tracing::warn!( %task.spec_type, %shard_id, @@ -225,7 +194,7 @@ fn evaluate_authorization( ?required_role, ops_logs=%task_data_plane.ops_logs_name, ops_stats=%task_data_plane.ops_stats_name, - %ops_suffix, + ops_suffix=%super::ops_suffix(task), "task authorization rejection context" ); anyhow::bail!( @@ -244,6 +213,9 @@ fn evaluate_authorization( Ok(( encoding_key, collection_data_plane.data_plane_fqdn.clone(), - collection_data_plane.broker_address.clone(), + super::maybe_rewrite_address( + task.data_plane_id != collection.data_plane_id, + &collection_data_plane.broker_address, + ), )) } diff --git a/crates/agent/src/api/authorize_user_collection.rs b/crates/agent/src/api/authorize_user_collection.rs new file mode 100644 index 0000000000..40e7d1633c --- /dev/null +++ b/crates/agent/src/api/authorize_user_collection.rs @@ -0,0 +1,122 @@ +use super::{App, Snapshot}; +use anyhow::Context; +use std::sync::Arc; + +type Request = models::authorizations::UserCollectionAuthorizationRequest; +type Response = models::authorizations::UserCollectionAuthorization; + +#[tracing::instrument( + skip(snapshot), + err(level = tracing::Level::WARN), +)] +async fn do_authorize_user_collection( + App { snapshot, .. }: &App, + super::ControlClaims { + sub: user_id, + email, + .. 
+ }: super::ControlClaims, + Request { + collection: collection_name, + started_unix, + }: Request, +) -> anyhow::Result { + let (has_started, started_unix) = if started_unix == 0 { + (false, jsonwebtoken::get_current_timestamp()) + } else { + (true, started_unix) + }; + + loop { + match Snapshot::evaluate(snapshot, started_unix, |snapshot: &Snapshot| { + evaluate_authorization(snapshot, user_id, email.as_ref(), &collection_name) + }) { + Ok(response) => return Ok(response), + Err(Ok(retry_millis)) if has_started => { + return Ok(Response { + retry_millis, + ..Default::default() + }) + } + Err(Ok(retry_millis)) => { + () = tokio::time::sleep(std::time::Duration::from_millis(retry_millis)).await; + } + Err(Err(err)) => return Err(err), + } + } +} + +fn evaluate_authorization( + snapshot: &Snapshot, + user_id: uuid::Uuid, + user_email: Option<&String>, + collection_name: &models::Collection, +) -> anyhow::Result { + if !tables::UserGrant::is_authorized( + &snapshot.role_grants, + &snapshot.user_grants, + user_id, + collection_name, + models::Capability::Read, + ) { + anyhow::bail!( + "{} is not authorized to {collection_name}", + user_email.map(String::as_str).unwrap_or("user") + ); + } + + let Some(collection) = snapshot.collection_by_catalog_name(collection_name) else { + anyhow::bail!("collection {collection_name} is not known") + }; + let Some(data_plane) = snapshot.data_planes.get_by_key(&collection.data_plane_id) else { + anyhow::bail!("couldn't resolve collection {collection_name} data-plane") + }; + let Some(encoding_key) = data_plane.hmac_keys.first() else { + anyhow::bail!( + "collection data-plane {} has no configured HMAC keys", + data_plane.data_plane_name + ); + }; + let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key)?; + + let iat = jsonwebtoken::get_current_timestamp(); + let exp = iat + super::exp_seconds(); + let header = jsonwebtoken::Header::default(); + + let claims = super::DataClaims { + inner: proto_gazette::Claims { + cap: proto_gazette::capability::LIST | proto_gazette::capability::READ, + exp, + iat, + iss: data_plane.data_plane_fqdn.clone(), + sub: user_id.to_string(), + sel: proto_gazette::broker::LabelSelector { + include: Some(labels::build_set([ + ("name:prefix", collection.journal_template_name.as_str()), + (labels::COLLECTION, collection_name.as_str()), + ])), + exclude: None, + }, + }, + // TODO(johnny): Temporary support for data-plane-gateway. 
+ prefixes: vec![collection.journal_template_name.clone()], + }; + let token = jsonwebtoken::encode(&header, &claims, &encoding_key) + .context("failed to encode authorized JWT")?; + + Ok(Response { + broker_address: super::maybe_rewrite_address(true, &data_plane.broker_address), + broker_token: token, + journal_name_prefix: collection.journal_template_name.clone(), + retry_millis: 0, + }) +} + +#[axum::debug_handler] +pub async fn authorize_user_collection( + axum::extract::State(app): axum::extract::State>, + axum::Extension(claims): axum::Extension, + super::Request(request): super::Request, +) -> axum::response::Response { + super::wrap(async move { do_authorize_user_collection(&app, claims, request).await }).await +} diff --git a/crates/agent/src/api/authorize_user_task.rs b/crates/agent/src/api/authorize_user_task.rs new file mode 100644 index 0000000000..06bdb7ac55 --- /dev/null +++ b/crates/agent/src/api/authorize_user_task.rs @@ -0,0 +1,162 @@ +use super::{App, Snapshot}; +use anyhow::Context; +use std::sync::Arc; + +type Request = models::authorizations::UserTaskAuthorizationRequest; +type Response = models::authorizations::UserTaskAuthorization; + +#[tracing::instrument( + skip(snapshot), + err(level = tracing::Level::WARN), +)] +async fn do_authorize_user_task( + App { snapshot, .. }: &App, + super::ControlClaims { + sub: user_id, + email, + .. + }: super::ControlClaims, + Request { + task: task_name, + started_unix, + }: Request, +) -> anyhow::Result { + let (has_started, started_unix) = if started_unix == 0 { + (false, jsonwebtoken::get_current_timestamp()) + } else { + (true, started_unix) + }; + + loop { + match Snapshot::evaluate(snapshot, started_unix, |snapshot: &Snapshot| { + evaluate_authorization(snapshot, user_id, email.as_ref(), &task_name) + }) { + Ok(response) => return Ok(response), + Err(Ok(retry_millis)) if has_started => { + return Ok(Response { + retry_millis, + ..Default::default() + }) + } + Err(Ok(retry_millis)) => { + () = tokio::time::sleep(std::time::Duration::from_millis(retry_millis)).await; + } + Err(Err(err)) => return Err(err), + } + } +} + +fn evaluate_authorization( + snapshot: &Snapshot, + user_id: uuid::Uuid, + user_email: Option<&String>, + task_name: &models::Name, +) -> anyhow::Result { + if !tables::UserGrant::is_authorized( + &snapshot.role_grants, + &snapshot.user_grants, + user_id, + task_name, + models::Capability::Read, + ) { + anyhow::bail!( + "{} is not authorized to {task_name}", + user_email.map(String::as_str).unwrap_or("user") + ); + } + + let Some(task) = snapshot.task_by_catalog_name(task_name) else { + anyhow::bail!("task {task_name} is not known") + }; + let Some(data_plane) = snapshot.data_planes.get_by_key(&task.data_plane_id) else { + anyhow::bail!("couldn't resolve task {task_name} data-plane") + }; + let Some(encoding_key) = data_plane.hmac_keys.first() else { + anyhow::bail!( + "task data-plane {} has no configured HMAC keys", + data_plane.data_plane_name + ); + }; + let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key)?; + + let (Some(ops_logs), Some(ops_stats)) = ( + snapshot.collection_by_catalog_name(&data_plane.ops_logs_name), + snapshot.collection_by_catalog_name(&data_plane.ops_stats_name), + ) else { + anyhow::bail!( + "couldn't resolve data-plane {} ops collections", + task.data_plane_id + ) + }; + + let ops_suffix = super::ops_suffix(task); + let ops_logs_journal = format!("{}{}", ops_logs.journal_template_name, &ops_suffix[1..]); + let ops_stats_journal = format!("{}{}", 
ops_stats.journal_template_name, &ops_suffix[1..]); + + let iat = jsonwebtoken::get_current_timestamp(); + let exp = iat + super::exp_seconds(); + let header = jsonwebtoken::Header::default(); + + let claims = super::DataClaims { + inner: proto_gazette::Claims { + cap: proto_gazette::capability::LIST | proto_gazette::capability::READ, + exp, + iat, + iss: data_plane.data_plane_fqdn.clone(), + sub: user_id.to_string(), + sel: proto_gazette::broker::LabelSelector { + include: Some(labels::build_set([ + ("name", ops_logs_journal.as_str()), + ("name", ops_stats_journal.as_str()), + ])), + exclude: None, + }, + }, + // TODO(johnny): Temporary support for data-plane-gateway. + prefixes: vec![ops_logs_journal.clone(), ops_stats_journal.clone()], + }; + let broker_token = jsonwebtoken::encode(&header, &claims, &encoding_key) + .context("failed to encode authorized JWT")?; + + let claims = super::DataClaims { + inner: proto_gazette::Claims { + cap: proto_gazette::capability::LIST + | proto_gazette::capability::READ + | proto_flow::capability::NETWORK_PROXY, + exp, + iat, + iss: claims.inner.iss, + sub: claims.inner.sub, + sel: proto_gazette::broker::LabelSelector { + include: Some(labels::build_set([( + "id:prefix", + task.shard_template_id.as_str(), + )])), + exclude: None, + }, + }, + prefixes: vec![task.task_name.to_string()], + }; + let reactor_token = jsonwebtoken::encode(&header, &claims, &encoding_key) + .context("failed to encode authorized JWT")?; + + Ok(Response { + broker_address: super::maybe_rewrite_address(true, &data_plane.broker_address), + broker_token, + ops_logs_journal, + ops_stats_journal, + reactor_address: super::maybe_rewrite_address(true, &data_plane.reactor_address), + reactor_token, + retry_millis: 0, + shard_id_prefix: task.shard_template_id.clone(), + }) +} + +#[axum::debug_handler] +pub async fn authorize_user_task( + axum::extract::State(app): axum::extract::State>, + axum::Extension(claims): axum::Extension, + super::Request(request): super::Request, +) -> axum::response::Response { + super::wrap(async move { do_authorize_user_task(&app, claims, request).await }).await +} diff --git a/crates/agent/src/api/create_data_plane.rs b/crates/agent/src/api/create_data_plane.rs index 623470ba50..79a1450b1a 100644 --- a/crates/agent/src/api/create_data_plane.rs +++ b/crates/agent/src/api/create_data_plane.rs @@ -56,7 +56,7 @@ async fn do_create_data_plane( id_generator, .. }: &App, - super::Claims { sub: user_id, .. }: super::Claims, + super::ControlClaims { sub: user_id, .. 
}: super::Claims, + super::ControlClaims { sub: user_id, .. }: super::ControlClaims, Request { name, private, @@ -228,7 +228,7 @@ async fn do_create_data_plane( #[axum::debug_handler] pub async fn create_data_plane( axum::extract::State(app): axum::extract::State<Arc<App>>, - axum::Extension(claims): axum::Extension<super::Claims>, + axum::Extension(claims): axum::Extension<super::ControlClaims>, super::Request(request): super::Request<Request>, ) -> axum::response::Response { super::wrap(async move { do_create_data_plane(&app, claims, request).await }).await diff --git a/crates/agent/src/api/mod.rs b/crates/agent/src/api/mod.rs index 8f48337bf6..60e635d845 100644 --- a/crates/agent/src/api/mod.rs +++ b/crates/agent/src/api/mod.rs @@ -1,11 +1,14 @@ use axum::{http::StatusCode, response::IntoResponse}; use std::sync::{Arc, Mutex}; -mod authorize; +mod authorize_task; +mod authorize_user_collection; +mod authorize_user_task; mod create_data_plane; mod snapshot; mod update_l2_reporting; +use anyhow::Context; use snapshot::Snapshot; /// Request wraps a JSON-deserialized request type T which @@ -13,15 +16,19 @@ use snapshot::Snapshot; #[derive(Debug, Clone, Copy, Default)] pub struct Request<T>(pub T); -/// Claims are the JWT claims attached to control-plane access tokens. -#[derive(Debug, Clone, serde::Deserialize)] -pub struct Claims { - // Note that many more fields, such as additional user metadata, - // are available if we choose to parse them. - pub sub: uuid::Uuid, - pub email: String, - pub iat: usize, - pub exp: usize, +/// ControlClaims are claims encoded within control-plane access tokens. +type ControlClaims = models::authorizations::ControlClaims; + +/// DataClaims are claims encoded within data-plane access tokens. +/// TODO(johnny): This should be a bare alias for proto_gazette::Claims. +/// We can do this once data-plane-gateway is updated to be a "dumb" proxy +/// which requires / forwards authorizations but doesn't inspect them. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct DataClaims { + #[serde(flatten)] + inner: proto_gazette::Claims, + // prefixes exclusively used by legacy auth checks in data-plane-gateway. + prefixes: Vec<String>, } /// Rejection is an error type of reasons why an API request may fail.
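Editor's aside on the DataClaims shape introduced above: because the inner proto_gazette::Claims are marked #[serde(flatten)], they serialize at the top level of the JWT payload, with the legacy `prefixes` field riding alongside them; that is what lets data-plane-gateway keep doing its old prefix checks while a plain Gazette verifier reads the same token. A minimal, self-contained sketch of that behavior, for illustration only: `Inner` is a stand-in for proto_gazette::Claims (reduced to two fields), the field values are invented, and it assumes the serde and serde_json crates.

use serde::{Deserialize, Serialize};

// Stand-in for proto_gazette::Claims, reduced to two illustrative fields.
#[derive(Serialize, Deserialize)]
struct Inner {
    iss: String,
    sub: String,
}

// Mirrors the DataClaims shape: flattened inner claims plus the legacy `prefixes`.
#[derive(Serialize, Deserialize)]
struct DataClaims {
    #[serde(flatten)]
    inner: Inner,
    prefixes: Vec<String>,
}

fn main() {
    let claims = DataClaims {
        inner: Inner {
            iss: "local-cluster".to_string(),
            sub: "some-user-id".to_string(),
        },
        prefixes: vec!["example/collection/".to_string()],
    };
    // Serializes as a single flat object: {"iss":...,"sub":...,"prefixes":[...]}.
    let json = serde_json::to_string(&claims).unwrap();
    println!("{json}");
    // A verifier that only understands the bare inner claims still decodes it,
    // which is why DataClaims can later become a plain alias for the inner type.
    let bare: Inner = serde_json::from_str(&json).unwrap();
    assert_eq!(bare.iss, "local-cluster");
}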
@@ -48,7 +55,8 @@ pub fn build_router( jwt_secret: Vec<u8>, pg_pool: sqlx::PgPool, publisher: crate::publications::Publisher, -) -> axum::Router<()> { + allow_origin: &[String], +) -> anyhow::Result<axum::Router<()>> { let jwt_secret = jsonwebtoken::DecodingKey::from_secret(&jwt_secret); let mut jwt_validation = jsonwebtoken::Validation::default(); @@ -68,8 +76,45 @@ pub fn build_router( use axum::routing::post; + let allow_origin = allow_origin + .into_iter() + .map(|o| o.parse()) + .collect::<Result<Vec<_>, _>>() + .context("failed to parse allowed origins")?; + + let allow_headers = [ + "Cache-Control", + "Content-Language", + "Content-Length", + "Content-Type", + "Expires", + "Last-Modified", + "Pragma", + "Authorization", + ] + .into_iter() + .map(|h| h.parse().unwrap()) + .collect::<Vec<axum::http::HeaderName>>(); + + let cors = tower_http::cors::CorsLayer::new() + .allow_methods(tower_http::cors::AllowMethods::mirror_request()) + .allow_origin(tower_http::cors::AllowOrigin::list(allow_origin)) + .allow_headers(allow_headers); + let schema_router = axum::Router::new() - .route("/authorize/task", post(authorize::authorize_task)) + .route("/authorize/task", post(authorize_task::authorize_task)) + .route( + "/authorize/user/task", + post(authorize_user_task::authorize_user_task) + .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)) + .options(preflight_handler), + ) + .route( + "/authorize/user/collection", + post(authorize_user_collection::authorize_user_collection) + .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)) + .options(preflight_handler), + ) .route( "/admin/create-data-plane", post(create_data_plane::create_data_plane) @@ -81,9 +126,14 @@ pub fn build_router( .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)), ) .layer(tower_http::trace::TraceLayer::new_for_http()) + .layer(cors) .with_state(app); - schema_router + Ok(schema_router) +} + +async fn preflight_handler() -> impl IntoResponse { + (StatusCode::NO_CONTENT, "") } #[axum::async_trait] @@ -140,7 +190,7 @@ async fn authorize( mut req: axum::http::Request, next: axum::middleware::Next, ) -> axum::response::Response { - let token = match jsonwebtoken::decode::<Claims>( + let token = match jsonwebtoken::decode::<ControlClaims>( bearer.token(), &app.jwt_secret, &app.jwt_validation, @@ -166,3 +216,26 @@ fn exp_seconds() -> u64 { // which spreads out load from re-authorization requests over time. rand::thread_rng().gen_range(40 * 60..80 * 60) } + +fn ops_suffix(task: &snapshot::SnapshotTask) -> String { + let ops_kind = match task.spec_type { + models::CatalogType::Capture => "capture", + models::CatalogType::Collection => "derivation", + models::CatalogType::Materialization => "materialization", + models::CatalogType::Test => "test", + }; + format!( + "/kind={ops_kind}/name={}/pivot=00", + labels::percent_encoding(&task.task_name).to_string(), + ) +} + +// Support the legacy data-plane by re-writing its internal service +// addresses to use the data-plane-gateway in external contexts. +fn maybe_rewrite_address(external: bool, address: &str) -> String { + if external && address.contains("svc.cluster.local:") { + "https://us-central1.v1.estuary-data.dev".to_string() + } else { + address.to_string() + } +} diff --git a/crates/agent/src/api/snapshot.rs b/crates/agent/src/api/snapshot.rs index e37507b23c..f49a587be1 100644 --- a/crates/agent/src/api/snapshot.rs +++ b/crates/agent/src/api/snapshot.rs @@ -9,12 +9,18 @@ pub struct Snapshot { pub taken: std::time::SystemTime, // Platform collections, indexed on `journal_template_name`.
pub collections: Vec<SnapshotCollection>, + // Indices of `collections`, indexed on `collection_name`. + pub collections_idx_name: Vec<usize>, // Platform data-planes. pub data_planes: tables::DataPlanes, // Platform role grants. pub role_grants: tables::RoleGrants, + // Platform user grants. + pub user_grants: tables::UserGrants, // Platform tasks, indexed on `shard_template_id`. pub tasks: Vec<SnapshotTask>, + // Indices of `tasks`, indexed on `task_name`. + pub tasks_idx_name: Vec<usize>, // `refresh` is take()-en when the current snapshot should be refreshed. pub refresh_tx: Option<futures::channel::oneshot::Sender<()>>, } @@ -90,6 +96,28 @@ impl Snapshot { } } + pub fn task_by_catalog_name<'s>(&'s self, name: &models::Name) -> Option<&'s SnapshotTask> { + self.tasks_idx_name + .binary_search_by(|i| self.tasks[*i].task_name.as_str().cmp(name)) + .ok() + .map(|index| { + let task = &self.tasks[self.tasks_idx_name[index]]; + assert_eq!(&task.task_name, name); + task + }) + } + + pub fn collection_by_catalog_name<'s>(&'s self, name: &str) -> Option<&'s SnapshotCollection> { + self.collections_idx_name + .binary_search_by(|i| self.collections[*i].collection_name.as_str().cmp(name)) + .ok() + .map(|index| { + let collection = &self.collections[self.collections_idx_name[index]]; + assert_eq!(collection.collection_name.as_str(), name); + collection + }) + } + fn begin_refresh<'m>( guard: std::sync::RwLockReadGuard<'_, Self>, mu: &'m std::sync::RwLock<Self>, @@ -113,9 +141,12 @@ pub fn seed() -> (Snapshot, futures::channel::oneshot::Receiver<()>) { Snapshot { taken: std::time::SystemTime::UNIX_EPOCH, collections: Vec::new(), + collections_idx_name: Vec::new(), data_planes: tables::DataPlanes::default(), role_grants: tables::RoleGrants::default(), + user_grants: tables::UserGrants::default(), tasks: Vec::new(), + tasks_idx_name: Vec::new(), refresh_tx: Some(next_tx), }, next_rx, @@ -193,6 +224,20 @@ async fn try_fetch(pg_pool: &sqlx::PgPool) -> anyhow::Result<Snapshot> { .await .context("failed to fetch role_grants")?; + let user_grants = sqlx::query_as!( + tables::UserGrant, + r#" + select + user_id as "user_id: uuid::Uuid", + object_role as "object_role: models::Prefix", + capability as "capability: models::Capability" + from user_grants + "#, + ) + .fetch_all(pg_pool) + .await + .context("failed to fetch user_grants")?; + let mut tasks = sqlx::query_as!( SnapshotTask, r#" @@ -211,6 +256,7 @@ async fn try_fetch(pg_pool: &sqlx::PgPool) -> anyhow::Result<Snapshot> { let data_planes = tables::DataPlanes::from_iter(data_planes); let role_grants = tables::RoleGrants::from_iter(role_grants); + let user_grants = tables::UserGrants::from_iter(user_grants); // Shard ID and journal name templates are prefixes which are always // extended with a slash-separated suffix.
Avoid inadvertent matches @@ -225,20 +271,33 @@ async fn try_fetch(pg_pool: &sqlx::PgPool) -> anyhow::Result<Snapshot> { tasks.sort_by(|t1, t2| t1.shard_template_id.cmp(&t2.shard_template_id)); collections.sort_by(|c1, c2| c1.journal_template_name.cmp(&c2.journal_template_name)); + let mut collections_idx_name = Vec::from_iter(0..collections.len()); + collections_idx_name.sort_by(|i1, i2| { + collections[*i1] + .collection_name + .cmp(&collections[*i2].collection_name) + }); + let mut tasks_idx_name = Vec::from_iter(0..tasks.len()); + tasks_idx_name.sort_by(|i1, i2| tasks[*i1].task_name.cmp(&tasks[*i2].task_name)); + tracing::info!( collections = collections.len(), data_planes = data_planes.len(), role_grants = role_grants.len(), tasks = tasks.len(), + user_grants = user_grants.len(), "fetched authorization snapshot", ); Ok(Snapshot { taken, collections, + collections_idx_name, data_planes, role_grants, + user_grants, tasks, + tasks_idx_name, refresh_tx: None, }) } diff --git a/crates/agent/src/api/update_l2_reporting.rs b/crates/agent/src/api/update_l2_reporting.rs index b2fe5e4993..9b122607ae 100644 --- a/crates/agent/src/api/update_l2_reporting.rs +++ b/crates/agent/src/api/update_l2_reporting.rs @@ -28,7 +28,7 @@ async fn do_update_l2_reporting( id_generator, .. }: &App, - super::Claims { sub: user_id, .. }: super::Claims, + super::ControlClaims { sub: user_id, .. }: super::ControlClaims, Request { default_data_plane, dry_run, @@ -228,7 +228,7 @@ export class Derivation extends Types.IDerivation {"# #[axum::debug_handler] pub async fn update_l2_reporting( axum::extract::State(app): axum::extract::State<Arc<App>>, - axum::Extension(claims): axum::Extension<super::Claims>, + axum::Extension(claims): axum::Extension<super::ControlClaims>, super::Request(request): super::Request<Request>, ) -> axum::response::Response { super::wrap(async move { do_update_l2_reporting(&app, claims, request).await }).await diff --git a/crates/agent/src/main.rs b/crates/agent/src/main.rs index 5be54e0a72..874b5bb521 100644 --- a/crates/agent/src/main.rs +++ b/crates/agent/src/main.rs @@ -42,8 +42,12 @@ struct Args { /// The port to listen on for API requests. #[clap(long, default_value = "8080", env = "API_PORT")] api_port: u16, + /// Whether to serve job handlers within this agent instance. #[clap(long = "serve-handlers", env = "SERVE_HANDLERS")] serve_handlers: bool, + /// Origin to allow in CORS contexts. May be specified multiple times. + #[clap(long = "allow-origin")] + allow_origin: Vec<String>, } fn main() -> Result<(), anyhow::Error> { @@ -147,7 +151,8 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> { jwt_secret.into_bytes(), pg_pool.clone(), publisher.clone(), - ); + &args.allow_origin, + )?; let api_server = axum::serve(api_listener, api_router).with_graceful_shutdown(shutdown.clone()); let api_server = async move { anyhow::Result::Ok(api_server.await?) }; diff --git a/crates/models/Cargo.toml b/crates/models/Cargo.toml index f71c681e1e..f2bc214d63 100644 --- a/crates/models/Cargo.toml +++ b/crates/models/Cargo.toml @@ -23,6 +23,7 @@ superslice = { workspace = true } time = { workspace = true } unicode-normalization = { workspace = true } url = { workspace = true } +uuid = { workspace = true } validator = { workspace = true } [dev-dependencies] diff --git a/crates/models/src/authorizations.rs b/crates/models/src/authorizations.rs new file mode 100644 index 0000000000..53bf2d66d5 --- /dev/null +++ b/crates/models/src/authorizations.rs @@ -0,0 +1,140 @@ +use validator::Validate; + +/// ControlClaims are claims encoded within control-plane access tokens.
+#[derive(Debug, Clone, serde::Deserialize)] +pub struct ControlClaims { + // Note that many more fields, such as additional user metadata, + // are available if we choose to parse them. + pub sub: uuid::Uuid, + pub email: Option, + pub iat: u64, + pub exp: u64, +} + +// Data-plane claims are represented by proto_gazette::Claims, +// which is not re-exported by this crate. + +/// TaskAuthorizationRequest is sent by data-plane reactors to request +/// an authorization to a collection which is sourced or produced. +#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskAuthorizationRequest { + /// # JWT token to be authorized and signed. + /// JWT is signed by the requesting data-plane for authorization of a + /// task to a collection. + pub token: String, +} + +/// TaskAuthorization is an authorization granted to a task for the purpose of +/// interacting with collection journals which it sources or produces. +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct TaskAuthorization { + /// # JWT token which has been authorized for use. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub token: String, + /// # Address of Gazette brokers for the issued token. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_address: String, + /// # Number of milliseconds to wait before retrying the request. + /// Non-zero if and only if token is not set. + pub retry_millis: u64, +} + +/// UserCollectionAuthorizationRequest requests an authorization to interact +/// with a collection within its data-plane on behalf of a user. +/// It must be accompanied by a control-plane Authorization token. +#[derive( + Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema, validator::Validate, +)] +#[serde(rename_all = "camelCase")] +pub struct UserCollectionAuthorizationRequest { + /// # Collection name to be authorized. + #[validate] + pub collection: crate::Collection, + /// # Unix timestamp, in seconds, at which the operation started. + /// If this is non-zero, it lower-bounds the time of an authorization + /// snapshot required to definitively reject an authorization. + /// + /// Snapshots taken prior to this time point that reject the request + /// will return a Response asking for the operation to be retried. + /// + /// If zero, the request will block server-side until it can be + /// definitively rejected. + #[serde(default)] + pub started_unix: u64, +} + +/// UserCollectionAuthorization is an authorization granted to a user for the +/// purpose of interacting with collection journals within its data-plane. +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct UserCollectionAuthorization { + /// # Address of Gazette brokers for the issued token. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_address: String, + /// # JWT token which has been authorized for use with brokers. + /// The token is capable of LIST and READ for journals + /// of the requested collection. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_token: String, + /// # Prefix of collection Journal names. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub journal_name_prefix: String, + /// # Number of milliseconds to wait before retrying the request. 
+ /// Non-zero if and only if other fields are not set. + #[serde(default)] + pub retry_millis: u64, +} + +#[derive( + Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema, validator::Validate, +)] +#[serde(rename_all = "camelCase")] +pub struct UserTaskAuthorizationRequest { + /// # Task name to be authorized. + #[validate] + pub task: crate::Name, + /// # Unix timestamp, in seconds, at which the operation started. + /// If this is non-zero, it lower-bounds the time of an authorization + /// snapshot required to definitively reject an authorization. + /// + /// Snapshots taken prior to this time point that reject the request + /// will return a Response asking for the operation to be retried. + /// + /// If zero, the request will block server-side until it can be + /// definitively rejected. + #[serde(default)] + pub started_unix: u64, +} + +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct UserTaskAuthorization { + /// # Address of Gazette brokers for the issued token. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_address: String, + /// # JWT token which has been authorized for use with brokers. + /// The token is capable of LIST and READ of task ops journals. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_token: String, + /// # Name of the journal holding task logs. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub ops_logs_journal: String, + /// # Name of the journal holding task stats. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub ops_stats_journal: String, + /// # Address of Reactors for the issued token. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub reactor_address: String, + /// # JWT token which has been authorized for use with reactors. + /// The token is capable of LIST, READ, and NETWORK_PROXY of task shards. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub reactor_token: String, + /// # Number of milliseconds to wait before retrying the request. + /// Non-zero if and only if token is not set. + pub retry_millis: u64, + /// # Prefix of task Shard IDs. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub shard_id_prefix: String, +} diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index 3ebf5a3984..715970dfd5 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -1,5 +1,6 @@ use std::collections::BTreeSet; +pub mod authorizations; mod captures; mod catalogs; pub mod collate;
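Editor's note: to make the new request/response models concrete, here is a rough client-side sketch of calling the agent's /authorize/user/collection route with them. It is illustrative only; the use of reqwest (with its json feature), the localhost:8675 agent address (the port the Tiltfile serves the agent API on), and the caller-supplied control-plane token are assumptions, and a real caller would also honor a non-zero retry_millis in the response.

use models::authorizations::{UserCollectionAuthorization, UserCollectionAuthorizationRequest};

// Hypothetical agent endpoint; locally the Tiltfile serves the agent API on port 8675.
const AGENT_URL: &str = "http://localhost:8675";

async fn user_collection_auth(
    client: &reqwest::Client,
    control_plane_token: &str,
    collection: models::Collection,
) -> anyhow::Result<UserCollectionAuthorization> {
    let request = UserCollectionAuthorizationRequest {
        collection,
        started_unix: 0, // Zero asks the server to block until it can definitively answer.
    };
    let response: UserCollectionAuthorization = client
        .post(format!("{AGENT_URL}/authorize/user/collection"))
        .bearer_auth(control_plane_token) // control-plane JWT, checked by the `authorize` middleware
        .json(&request)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    Ok(response)
}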