Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first steps towards data-plane-gateway deprecation #1628

Merged
merged 8 commits into from
Sep 20, 2024
39 changes: 7 additions & 32 deletions crates/flowctl/src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ mod roles;

use anyhow::Context;

use crate::controlplane;

#[derive(Debug, clap::Args)]
#[clap(rename_all = "kebab-case")]
pub struct Auth {
Expand Down Expand Up @@ -47,21 +45,6 @@ pub enum Command {
/// Unlike 'read' or 'write', the subject of an 'admin' grant also inherits
/// capabilities granted to the object role from still-other roles.
Roles(roles::Roles),

/// Fetches and prints an auth token that can be used to access a Flow data plane.
///
/// The returned token can be used to access the Flow data plane with 3rd party tools.
/// For example, you can use curl to access a private port of a running task by running:
/// ```ignore
/// curl -H "Authorization: Bearer $(flowctl auth data-plane-access-token --prefix myTenant/)" https://myPort.myHost.data-plane.example/
/// ```
DataPlaneAccessToken(DataPlaneAccessToken),
}

#[derive(Debug, clap::Args)]
pub struct DataPlaneAccessToken {
#[clap(long, required = true)]
prefix: Vec<String>,
}

#[derive(Debug, clap::Args)]
Expand All @@ -76,20 +59,23 @@ impl Auth {
match &self.cmd {
Command::Login => do_login(ctx).await,
Command::Token(Token { token }) => {
controlplane::configure_new_access_token(ctx, token.clone()).await?;
ctx.config.user_access_token = Some(token.clone());
println!("Configured access token.");
Ok(())
}
Command::Roles(roles) => roles.run(ctx).await,
Command::DataPlaneAccessToken(args) => do_data_plane_access_token(ctx, args).await,
}
}
}

async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
use crossterm::tty::IsTty;

let url = ctx.config().get_dashboard_url("/admin/api")?.to_string();
let url = ctx
.config
.get_dashboard_url()
.join("/admin/api")?
.to_string();

println!("\nOpening browser to: {url}");
if let Err(_) = open::that(&url) {
Expand Down Expand Up @@ -118,7 +104,7 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> {

// Copied credentials will often accidentally contain extra whitespace characters.
let token = token.trim().to_string();
controlplane::configure_new_access_token(ctx, token).await?;
ctx.config.user_access_token = Some(token);
println!("\nConfigured access token.");
Ok(())
} else {
Expand All @@ -131,14 +117,3 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
);
}
}

async fn do_data_plane_access_token(
ctx: &mut crate::CliContext,
args: &DataPlaneAccessToken,
) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;
let access =
crate::dataplane::fetch_data_plane_access_token(client, args.prefix.clone()).await?;
println!("{}", access.auth_token);
Ok(())
}
15 changes: 5 additions & 10 deletions crates/flowctl/src/auth/roles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ pub async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> {
}
}
let rows: Vec<Row> = api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("combined_grants_ext")
.select(
vec![
Expand Down Expand Up @@ -177,8 +176,7 @@ pub async fn do_grant(
// Upsert user grants to `user_grants` and role grants to `role_grants`.
let rows: Vec<GrantRevokeRow> = if let Some(subject_user_id) = subject_user_id {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("user_grants")
.select(grant_revoke_columns())
.upsert(
Expand All @@ -195,8 +193,7 @@ pub async fn do_grant(
.await?
} else if let Some(subject_role) = subject_role {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("role_grants")
.select(grant_revoke_columns())
.upsert(
Expand Down Expand Up @@ -231,8 +228,7 @@ pub async fn do_revoke(
// Revoke user grants from `user_grants` and role grants from `role_grants`.
let rows: Vec<GrantRevokeRow> = if let Some(subject_user_id) = subject_user_id {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("user_grants")
.select(grant_revoke_columns())
.eq("user_id", subject_user_id.to_string())
Expand All @@ -242,8 +238,7 @@ pub async fn do_revoke(
.await?
} else if let Some(subject_role) = subject_role {
api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("role_grants")
.select(grant_revoke_columns())
.eq("subject_role", subject_role)
Expand Down
10 changes: 4 additions & 6 deletions crates/flowctl/src/catalog/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ pub async fn do_delete(
type_selector: type_selector.clone(),
};

let client = ctx.controlplane_client().await?;
let specs = catalog::fetch_live_specs::<catalog::LiveSpecRow>(
client.clone(),
&ctx.client,
&list_args,
vec![
"id",
Expand All @@ -98,7 +97,7 @@ pub async fn do_delete(
anyhow::bail!("delete operation cancelled");
}

let draft = draft::create_draft(client.clone())
let draft = draft::create_draft(&ctx.client)
.await
.context("failed to create draft")?;
println!(
Expand All @@ -121,8 +120,7 @@ pub async fn do_delete(
.collect::<Vec<DraftSpec>>();

api_exec::<Vec<serde_json::Value>>(
ctx.controlplane_client()
.await?
ctx.client
.from("draft_specs")
//.select("catalog_name,spec_type")
.upsert(serde_json::to_string(&draft_specs).unwrap())
Expand All @@ -131,7 +129,7 @@ pub async fn do_delete(
.await?;
tracing::debug!("added deletions to draft");

draft::publish(client.clone(), "", draft.id, false).await?;
draft::publish(&ctx.client, "", draft.id, false).await?;

// extra newline before, since `publish` will output a bunch of logs
println!("\nsuccessfully deleted {} spec(s)", draft_specs.len());
Expand Down
23 changes: 9 additions & 14 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod pull_specs;
mod test;

use crate::{
api_exec, api_exec_paginated, controlplane,
api_exec, api_exec_paginated,
output::{to_table_row, CliOutput, JsonCell},
};
use anyhow::Context;
Expand Down Expand Up @@ -226,7 +226,7 @@ impl Catalog {
/// # Panics
/// If the name_selector `name` and `prefix` are both non-empty.
pub async fn fetch_live_specs<T>(
cp_client: controlplane::Client,
client: &crate::Client,
list: &List,
columns: Vec<&'static str>,
) -> anyhow::Result<Vec<T>>
Expand All @@ -242,7 +242,7 @@ where
panic!("cannot specify both 'name' and 'prefix' for filtering live specs");
}

let builder = cp_client.from("live_specs_ext").select(columns.join(","));
let builder = client.from("live_specs_ext").select(columns.join(","));
let builder = list.type_selector.add_spec_type_filters(builder);

// Drive the actual request(s) based on the name selector, since the arguments there may
Expand Down Expand Up @@ -448,8 +448,7 @@ async fn do_list(ctx: &mut crate::CliContext, list_args: &List) -> anyhow::Resul
columns.push("reads_from");
columns.push("writes_to");
}
let client = ctx.controlplane_client().await?;
let rows = fetch_live_specs::<LiveSpecRow>(client, list_args, columns).await?;
let rows = fetch_live_specs::<LiveSpecRow>(&ctx.client, list_args, columns).await?;

ctx.write_all(rows, list_args.flows)
}
Expand Down Expand Up @@ -499,8 +498,7 @@ async fn do_history(ctx: &mut crate::CliContext, History { name }: &History) ->
}
}
let rows: Vec<Row> = api_exec_paginated(
ctx.controlplane_client()
.await?
ctx.client
.from("publication_specs_ext")
.like("catalog_name", format!("{name}%"))
.select(
Expand Down Expand Up @@ -531,7 +529,7 @@ async fn do_draft(
publication_id,
}: &Draft,
) -> anyhow::Result<()> {
let draft_id = ctx.config().cur_draft()?;
let draft_id = ctx.config.selected_draft()?;

#[derive(Deserialize)]
struct Row {
Expand All @@ -550,8 +548,7 @@ async fn do_draft(
spec_type,
} = if let Some(publication_id) = publication_id {
api_exec(
ctx.controlplane_client()
.await?
ctx.client
.from("publication_specs_ext")
.eq("catalog_name", name)
.eq("pub_id", publication_id.to_string())
Expand All @@ -561,8 +558,7 @@ async fn do_draft(
.await?
} else {
api_exec(
ctx.controlplane_client()
.await?
ctx.client
.from("live_specs")
.eq("catalog_name", name)
.not("is", "spec_type", "null")
Expand Down Expand Up @@ -596,8 +592,7 @@ async fn do_draft(
tracing::debug!(?draft_spec, "inserting draft");

let rows: Vec<SpecSummaryItem> = api_exec(
ctx.controlplane_client()
.await?
ctx.client
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(serde_json::to_string(&draft_spec).unwrap())
Expand Down
24 changes: 11 additions & 13 deletions crates/flowctl/src/catalog/publish.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{catalog::SpecSummaryItem, controlplane, draft, local_specs, CliContext};
use crate::{catalog::SpecSummaryItem, draft, local_specs, CliContext};
use anyhow::Context;

#[derive(Debug, clap::Args)]
Expand All @@ -24,19 +24,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<
// in common error scenarios. For example, we don't create the draft until after bundling, because
// then we'd have to clean up the empty draft if the bundling fails. The very first thing is to create the client,
// since that can fail due to missing/expired credentials.
let client = ctx.controlplane_client().await?;

anyhow::ensure!(args.auto_approve || std::io::stdin().is_tty(), "The publish command must be run interactively unless the `--auto-approve` flag is provided");

let (draft_catalog, _validations) =
local_specs::load_and_validate(client.clone(), &args.source).await?;
local_specs::load_and_validate(&ctx.client, &args.source).await?;

let draft = draft::create_draft(client.clone()).await?;
let draft = draft::create_draft(&ctx.client).await?;
println!("Created draft: {}", &draft.id);
tracing::info!(draft_id = %draft.id, "created draft");
draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?;
draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?;

let removed = draft::remove_unchanged(&client, draft.id).await?;
let removed = draft::remove_unchanged(&ctx.client, draft.id).await?;
if !removed.is_empty() {
println!("The following specs are identical to the currently published specs, and have been pruned from the draft:");
for name in removed.iter() {
Expand All @@ -50,7 +48,7 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<

if summary.is_empty() {
println!("No specs would be changed by this publication, nothing to publish.");
try_delete_draft(client, draft.id).await;
try_delete_draft(&ctx.client, draft.id).await;
return Ok(());
}

Expand All @@ -59,17 +57,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result<

if !(args.auto_approve || prompt_to_continue().await) {
println!("\nCancelling");
try_delete_draft(client.clone(), draft.id).await;
try_delete_draft(&ctx.client, draft.id).await;
anyhow::bail!("publish cancelled");
}
println!("Proceeding to publish...");

let publish_result =
draft::publish(client.clone(), &args.default_data_plane, draft.id, false).await;
draft::publish(&ctx.client, &args.default_data_plane, draft.id, false).await;
// The draft will have been deleted automatically if the publish was successful.
if let Err(err) = publish_result.as_ref() {
tracing::error!(draft_id = %draft.id, error = %err, "publication error");
try_delete_draft(client, draft.id).await;
try_delete_draft(&ctx.client, draft.id).await;
}
publish_result.context("Publish failed")?;
println!("\nPublish successful");
Expand All @@ -90,8 +88,8 @@ async fn prompt_to_continue() -> bool {
}
}

async fn try_delete_draft(client: controlplane::Client, draft_id: models::Id) {
if let Err(del_err) = draft::delete_draft(client.clone(), draft_id).await {
async fn try_delete_draft(client: &crate::Client, draft_id: models::Id) {
if let Err(del_err) = draft::delete_draft(client, draft_id).await {
tracing::error!(draft_id = %draft_id, error = %del_err, "failed to delete draft");
}
}
5 changes: 2 additions & 3 deletions crates/flowctl/src/catalog/pull_specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ pub struct PullSpecs {
}

pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;
// Retrieve identified live specifications.
let live_specs = fetch_live_specs::<LiveSpecRow>(
client.clone(),
&ctx.client,
&List {
flows: false,
name_selector: args.name_selector.clone(),
Expand Down Expand Up @@ -58,7 +57,7 @@ pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Re
let sources = local_specs::indirect_and_write_resources(sources)?;

println!("Wrote {count} specifications under {target}.");
let () = local_specs::generate_files(client, sources).await?;
let () = local_specs::generate_files(&ctx.client, sources).await?;

Ok(())
}
12 changes: 5 additions & 7 deletions crates/flowctl/src/catalog/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@ pub struct TestArgs {
/// and discoverable to users. There's also no need for any confirmation steps, since we're not
/// actually modifying the published specs.
pub async fn do_test(ctx: &mut CliContext, args: &TestArgs) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;

let (draft_catalog, _validations) =
local_specs::load_and_validate(client.clone(), &args.source).await?;
local_specs::load_and_validate(&ctx.client, &args.source).await?;

let draft = draft::create_draft(client.clone()).await?;
let draft = draft::create_draft(&ctx.client).await?;
println!("Created draft: {}", &draft.id);
tracing::info!(draft_id = %draft.id, "created draft");
let spec_rows = draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?;
let spec_rows = draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?;
println!("Running tests for catalog items:");
ctx.write_all(spec_rows, ())?;
println!("Starting tests...");

// Technically, test is just a publish with the dry-run flag set to true.
let publish_result =
draft::publish(client.clone(), &args.default_data_plane, draft.id, true).await;
draft::publish(&ctx.client, &args.default_data_plane, draft.id, true).await;

if let Err(del_err) = draft::delete_draft(client.clone(), draft.id).await {
if let Err(del_err) = draft::delete_draft(&ctx.client, draft.id).await {
tracing::error!(draft_id = %draft.id, error = %del_err, "failed to delete draft");
}
publish_result.context("Tests failed")?;
Expand Down
Loading