Skip to content

Commit

Permalink
agent: add /authorize/user/task and /authorize/user/collection ro…
Browse files Browse the repository at this point in the history
…utes

`/authorize/user/task` enables UI shard listings/status and retrieval
of task logs, as well as access to private connector networking.

`/authorize/user/collection` enables UI journal listing and data preview.

Both offer temporary support for the current data-plane-gateway,
which implements legacy authorization checks using claimed prefixes.

Also introduce an address rewrite mechanism for mapping an internal
data-plane legacy service address into the data-plane-gateway address in
external call contexts.

Issue #1627
  • Loading branch information
jgraettinger committed Sep 20, 2024
1 parent 52fac3c commit a161e70
Show file tree
Hide file tree
Showing 14 changed files with 598 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ tokio-util = { version = "0.7", features = ["io", "compat"] }
tonic = { version = "0.12", features = ["tls", "tls-roots"] }
hyper-util = "0.1"
tower = { version = "0.5", features = ["util"] }
tower-http = { version = "0.5", features = ["trace"] }
tower-http = { version = "0.5", features = ["cors", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = [
"time",
Expand Down
9 changes: 5 additions & 4 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ os.putenv("DATABASE_URL", DATABASE_URL)
os.putenv("RUST_LOG", "info")
os.putenv("DOCKER_DEFAULT_PLATFORM", "linux/amd64")

# Secret used to sign Authorizations within a local data plane, as base64("secret").
# Secret used to sign Authorizations within a local data plane, as base64("supersecret").
# Also allow requests without an Authorization (to not break data-plane-gateway just yet).
AUTH_KEYS="c2VjcmV0,AA=="
AUTH_KEYS="c3VwZXJzZWNyZXQ=,AA=="
os.putenv("CONSUMER_AUTH_KEYS", AUTH_KEYS)
os.putenv("BROKER_AUTH_KEYS", AUTH_KEYS)

Expand Down Expand Up @@ -83,8 +83,9 @@ local_resource('reactor', serve_cmd='%s/flow/.build/package/bin/flowctl-go serve
local_resource('agent', serve_cmd='%s/flow/.build/package/bin/agent \
--connector-network supabase_network_flow \
--allow-local \
--builds-root %s \
--allow-origin http://localhost:3000 \
--api-port 8675 \
--builds-root %s \
--serve-handlers \
--bin-dir %s/flow/.build/package/bin' % (REPO_BASE, FLOW_BUILDS_ROOT, REPO_BASE),
resource_deps=['reactor', 'gazette']
Expand All @@ -101,7 +102,7 @@ local_resource('create-data-plane-local-cluster',
"manual": {\
"brokerAddress": "http://localhost:8080",\
"reactorAddress": "http://localhost:9000",\
"hmacKeys": ["c2VjcmV0"]\
"hmacKeys": ["c3VwZXJzZWNyZXQ="]\
}\
}\
}\' http://localhost:8675/admin/create-data-plane' % SYSTEM_USER_TOKEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,8 @@ use super::{App, Snapshot};
use anyhow::Context;
use std::sync::Arc;

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Request {
// # JWT token to be authorized and signed.
token: String,
}

#[derive(Default, Debug, serde::Serialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Response {
// # JWT token which has been authorized for use.
token: String,
// # Address of Gazette brokers for the issued token.
broker_address: String,
// # Number of milliseconds to wait before retrying the request.
// Non-zero if and only if token is not set.
retry_millis: u64,
}
type Request = models::authorizations::TaskAuthorizationRequest;
type Response = models::authorizations::TaskAuthorization;

#[axum::debug_handler]
pub async fn authorize_task(
Expand Down Expand Up @@ -108,12 +92,11 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re

fn evaluate_authorization(
Snapshot {
taken: _,
collections,
data_planes,
role_grants,
tasks,
refresh_tx: _,
..
}: &Snapshot,
shard_id: &str,
shard_data_plane_fqdn: &str,
Expand Down Expand Up @@ -188,22 +171,12 @@ fn evaluate_authorization(
);
};

let ops_kind = match task.spec_type {
models::CatalogType::Capture => "capture",
models::CatalogType::Collection => "derivation",
models::CatalogType::Materialization => "materialization",
models::CatalogType::Test => "test",
};

// As a special case outside of the RBAC system, allow a task to write
// to its designated partition within its ops collections.
if required_role == models::Capability::Write
&& (collection.collection_name == task_data_plane.ops_logs_name
|| collection.collection_name == task_data_plane.ops_stats_name)
&& journal_name_or_prefix.ends_with(&format!(
"/kind={ops_kind}/name={}/pivot=00",
labels::percent_encoding(&task.task_name).to_string(),
))
&& journal_name_or_prefix.ends_with(&super::ops_suffix(task))
{
// Authorized write into designated ops partition.
} else if tables::RoleGrant::is_authorized(
Expand All @@ -214,18 +187,14 @@ fn evaluate_authorization(
) {
// Authorized access through RBAC.
} else {
let ops_suffix = format!(
"/kind={ops_kind}/name={}/pivot=00",
labels::percent_encoding(&task.task_name).to_string(),
);
tracing::warn!(
%task.spec_type,
%shard_id,
%journal_name_or_prefix,
?required_role,
ops_logs=%task_data_plane.ops_logs_name,
ops_stats=%task_data_plane.ops_stats_name,
%ops_suffix,
ops_suffix=%super::ops_suffix(task),
"task authorization rejection context"
);
anyhow::bail!(
Expand All @@ -244,6 +213,9 @@ fn evaluate_authorization(
Ok((
encoding_key,
collection_data_plane.data_plane_fqdn.clone(),
collection_data_plane.broker_address.clone(),
super::maybe_rewrite_address(
task.data_plane_id != collection.data_plane_id,
&collection_data_plane.broker_address,
),
))
}
122 changes: 122 additions & 0 deletions crates/agent/src/api/authorize_user_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use super::{App, Snapshot};
use anyhow::Context;
use std::sync::Arc;

type Request = models::authorizations::UserCollectionAuthorizationRequest;
type Response = models::authorizations::UserCollectionAuthorization;

#[tracing::instrument(
skip(snapshot),
err(level = tracing::Level::WARN),
)]
async fn do_authorize_user_collection(
App { snapshot, .. }: &App,
super::ControlClaims {
sub: user_id,
email,
..
}: super::ControlClaims,
Request {
collection: collection_name,
started_unix,
}: Request,
) -> anyhow::Result<Response> {
let (has_started, started_unix) = if started_unix == 0 {
(false, jsonwebtoken::get_current_timestamp())
} else {
(true, started_unix)
};

loop {
match Snapshot::evaluate(snapshot, started_unix, |snapshot: &Snapshot| {
evaluate_authorization(snapshot, user_id, email.as_ref(), &collection_name)
}) {
Ok(response) => return Ok(response),
Err(Ok(retry_millis)) if has_started => {
return Ok(Response {
retry_millis,
..Default::default()
})
}
Err(Ok(retry_millis)) => {
() = tokio::time::sleep(std::time::Duration::from_millis(retry_millis)).await;
}
Err(Err(err)) => return Err(err),
}
}
}

fn evaluate_authorization(
snapshot: &Snapshot,
user_id: uuid::Uuid,
user_email: Option<&String>,
collection_name: &models::Collection,
) -> anyhow::Result<Response> {
if !tables::UserGrant::is_authorized(
&snapshot.role_grants,
&snapshot.user_grants,
user_id,
collection_name,
models::Capability::Read,
) {
anyhow::bail!(
"{} is not authorized to {collection_name}",
user_email.map(String::as_str).unwrap_or("user")
);
}

let Some(collection) = snapshot.collection_by_catalog_name(collection_name) else {
anyhow::bail!("collection {collection_name} is not known")
};
let Some(data_plane) = snapshot.data_planes.get_by_key(&collection.data_plane_id) else {
anyhow::bail!("couldn't resolve collection {collection_name} data-plane")
};
let Some(encoding_key) = data_plane.hmac_keys.first() else {
anyhow::bail!(
"collection data-plane {} has no configured HMAC keys",
data_plane.data_plane_name
);
};
let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key)?;

let iat = jsonwebtoken::get_current_timestamp();
let exp = iat + super::exp_seconds();
let header = jsonwebtoken::Header::default();

let claims = super::DataClaims {
inner: proto_gazette::Claims {
cap: proto_gazette::capability::LIST | proto_gazette::capability::READ,
exp,
iat,
iss: data_plane.data_plane_fqdn.clone(),
sub: user_id.to_string(),
sel: proto_gazette::broker::LabelSelector {
include: Some(labels::build_set([
("name:prefix", collection.journal_template_name.as_str()),
(labels::COLLECTION, collection_name.as_str()),
])),
exclude: None,
},
},
// TODO(johnny): Temporary support for data-plane-gateway.
prefixes: vec![collection.journal_template_name.clone()],
};
let token = jsonwebtoken::encode(&header, &claims, &encoding_key)
.context("failed to encode authorized JWT")?;

Ok(Response {
broker_address: super::maybe_rewrite_address(true, &data_plane.broker_address),
broker_token: token,
journal_name_prefix: collection.journal_template_name.clone(),
retry_millis: 0,
})
}

#[axum::debug_handler]
pub async fn authorize_user_collection(
axum::extract::State(app): axum::extract::State<Arc<App>>,
axum::Extension(claims): axum::Extension<super::ControlClaims>,
super::Request(request): super::Request<Request>,
) -> axum::response::Response {
super::wrap(async move { do_authorize_user_collection(&app, claims, request).await }).await
}
Loading

0 comments on commit a161e70

Please sign in to comment.