Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first steps towards data-plane-gateway deprecation #1628

Merged
merged 8 commits into from
Sep 20, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ tokio-util = { version = "0.7", features = ["io", "compat"] }
tonic = { version = "0.12", features = ["tls", "tls-roots"] }
hyper-util = "0.1"
tower = { version = "0.5", features = ["util"] }
tower-http = { version = "0.5", features = ["trace"] }
tower-http = { version = "0.5", features = ["cors", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = [
"time",
Expand Down
9 changes: 5 additions & 4 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ os.putenv("DATABASE_URL", DATABASE_URL)
os.putenv("RUST_LOG", "info")
os.putenv("DOCKER_DEFAULT_PLATFORM", "linux/amd64")

# Secret used to sign Authorizations within a local data plane, as base64("secret").
# Secret used to sign Authorizations within a local data plane, as base64("supersecret").
# Also allow requests without an Authorization (to not break data-plane-gateway just yet).
AUTH_KEYS="c2VjcmV0,AA=="
AUTH_KEYS="c3VwZXJzZWNyZXQ=,AA=="
os.putenv("CONSUMER_AUTH_KEYS", AUTH_KEYS)
os.putenv("BROKER_AUTH_KEYS", AUTH_KEYS)

Expand Down Expand Up @@ -83,8 +83,9 @@ local_resource('reactor', serve_cmd='%s/flow/.build/package/bin/flowctl-go serve
local_resource('agent', serve_cmd='%s/flow/.build/package/bin/agent \
--connector-network supabase_network_flow \
--allow-local \
--builds-root %s \
--allow-origin http://localhost:3000 \
--api-port 8675 \
--builds-root %s \
--serve-handlers \
--bin-dir %s/flow/.build/package/bin' % (REPO_BASE, FLOW_BUILDS_ROOT, REPO_BASE),
resource_deps=['reactor', 'gazette']
Expand All @@ -101,7 +102,7 @@ local_resource('create-data-plane-local-cluster',
"manual": {\
"brokerAddress": "http://localhost:8080",\
"reactorAddress": "http://localhost:9000",\
"hmacKeys": ["c2VjcmV0"]\
"hmacKeys": ["c3VwZXJzZWNyZXQ="]\
}\
}\
}\' http://localhost:8675/admin/create-data-plane' % SYSTEM_USER_TOKEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,8 @@ use super::{App, Snapshot};
use anyhow::Context;
use std::sync::Arc;

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Request {
// # JWT token to be authorized and signed.
token: String,
}

#[derive(Default, Debug, serde::Serialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Response {
// # JWT token which has been authorized for use.
token: String,
// # Address of Gazette brokers for the issued token.
broker_address: String,
// # Number of milliseconds to wait before retrying the request.
// Non-zero if and only if token is not set.
retry_millis: u64,
}
type Request = models::authorizations::TaskAuthorizationRequest;
type Response = models::authorizations::TaskAuthorization;

#[axum::debug_handler]
pub async fn authorize_task(
Expand Down Expand Up @@ -108,12 +92,11 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re

fn evaluate_authorization(
Snapshot {
taken: _,
collections,
data_planes,
role_grants,
tasks,
refresh_tx: _,
..
}: &Snapshot,
shard_id: &str,
shard_data_plane_fqdn: &str,
Expand Down Expand Up @@ -188,22 +171,12 @@ fn evaluate_authorization(
);
};

let ops_kind = match task.spec_type {
models::CatalogType::Capture => "capture",
models::CatalogType::Collection => "derivation",
models::CatalogType::Materialization => "materialization",
models::CatalogType::Test => "test",
};

// As a special case outside of the RBAC system, allow a task to write
// to its designated partition within its ops collections.
if required_role == models::Capability::Write
&& (collection.collection_name == task_data_plane.ops_logs_name
|| collection.collection_name == task_data_plane.ops_stats_name)
&& journal_name_or_prefix.ends_with(&format!(
"/kind={ops_kind}/name={}/pivot=00",
labels::percent_encoding(&task.task_name).to_string(),
))
&& journal_name_or_prefix.ends_with(&super::ops_suffix(task))
{
// Authorized write into designated ops partition.
} else if tables::RoleGrant::is_authorized(
Expand All @@ -214,18 +187,14 @@ fn evaluate_authorization(
) {
// Authorized access through RBAC.
} else {
let ops_suffix = format!(
"/kind={ops_kind}/name={}/pivot=00",
labels::percent_encoding(&task.task_name).to_string(),
);
tracing::warn!(
%task.spec_type,
%shard_id,
%journal_name_or_prefix,
?required_role,
ops_logs=%task_data_plane.ops_logs_name,
ops_stats=%task_data_plane.ops_stats_name,
%ops_suffix,
ops_suffix=%super::ops_suffix(task),
"task authorization rejection context"
);
anyhow::bail!(
Expand All @@ -244,6 +213,9 @@ fn evaluate_authorization(
Ok((
encoding_key,
collection_data_plane.data_plane_fqdn.clone(),
collection_data_plane.broker_address.clone(),
super::maybe_rewrite_address(
task.data_plane_id != collection.data_plane_id,
&collection_data_plane.broker_address,
),
))
}
122 changes: 122 additions & 0 deletions crates/agent/src/api/authorize_user_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use super::{App, Snapshot};
use anyhow::Context;
use std::sync::Arc;

type Request = models::authorizations::UserCollectionAuthorizationRequest;
type Response = models::authorizations::UserCollectionAuthorization;

#[tracing::instrument(
skip(snapshot),
err(level = tracing::Level::WARN),
)]
async fn do_authorize_user_collection(
App { snapshot, .. }: &App,
super::ControlClaims {
sub: user_id,
email,
..
}: super::ControlClaims,
Request {
collection: collection_name,
started_unix,
}: Request,
) -> anyhow::Result<Response> {
let (has_started, started_unix) = if started_unix == 0 {
(false, jsonwebtoken::get_current_timestamp())
} else {
(true, started_unix)
};

loop {
match Snapshot::evaluate(snapshot, started_unix, |snapshot: &Snapshot| {
evaluate_authorization(snapshot, user_id, email.as_ref(), &collection_name)
}) {
Ok(response) => return Ok(response),
Err(Ok(retry_millis)) if has_started => {
return Ok(Response {
retry_millis,
..Default::default()
})
}
Err(Ok(retry_millis)) => {
() = tokio::time::sleep(std::time::Duration::from_millis(retry_millis)).await;
}
Err(Err(err)) => return Err(err),
}
}
}

fn evaluate_authorization(
snapshot: &Snapshot,
user_id: uuid::Uuid,
user_email: Option<&String>,
collection_name: &models::Collection,
) -> anyhow::Result<Response> {
if !tables::UserGrant::is_authorized(
&snapshot.role_grants,
&snapshot.user_grants,
user_id,
collection_name,
models::Capability::Read,
) {
anyhow::bail!(
"{} is not authorized to {collection_name}",
user_email.map(String::as_str).unwrap_or("user")
);
}

let Some(collection) = snapshot.collection_by_catalog_name(collection_name) else {
anyhow::bail!("collection {collection_name} is not known")
};
let Some(data_plane) = snapshot.data_planes.get_by_key(&collection.data_plane_id) else {
anyhow::bail!("couldn't resolve collection {collection_name} data-plane")
};
let Some(encoding_key) = data_plane.hmac_keys.first() else {
anyhow::bail!(
"collection data-plane {} has no configured HMAC keys",
data_plane.data_plane_name
);
};
let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key)?;

let iat = jsonwebtoken::get_current_timestamp();
let exp = iat + super::exp_seconds();
let header = jsonwebtoken::Header::default();

let claims = super::DataClaims {
inner: proto_gazette::Claims {
cap: proto_gazette::capability::LIST | proto_gazette::capability::READ,
exp,
iat,
iss: data_plane.data_plane_fqdn.clone(),
sub: user_id.to_string(),
sel: proto_gazette::broker::LabelSelector {
include: Some(labels::build_set([
("name:prefix", collection.journal_template_name.as_str()),
(labels::COLLECTION, collection_name.as_str()),
])),
exclude: None,
},
},
// TODO(johnny): Temporary support for data-plane-gateway.
prefixes: vec![collection.journal_template_name.clone()],
};
let token = jsonwebtoken::encode(&header, &claims, &encoding_key)
.context("failed to encode authorized JWT")?;

Ok(Response {
broker_address: super::maybe_rewrite_address(true, &data_plane.broker_address),
broker_token: token,
journal_name_prefix: collection.journal_template_name.clone(),
retry_millis: 0,
})
}

#[axum::debug_handler]
pub async fn authorize_user_collection(
axum::extract::State(app): axum::extract::State<Arc<App>>,
axum::Extension(claims): axum::Extension<super::ControlClaims>,
super::Request(request): super::Request<Request>,
) -> axum::response::Response {
super::wrap(async move { do_authorize_user_collection(&app, claims, request).await }).await
}
Loading