From e91224482460c85ec91362bd3341351a45d402b7 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 17 Sep 2024 15:17:02 -0500 Subject: [PATCH] flowctl: refactor config and support new collection & task authorizations This change introduces the agent API to `flowctl`, which is the proverbial straw which motivated a deeper refactor of flowctl configuration. As a headline feature, `flowctl` supports the new task and collection authorization APIs and uses them in support of serving existing subcommands for reading collections, previews, and read ops logs or stats. Clean up management of access and refresh tokens by obtaining access tokens or generating refresh tokens prior to calling into a particular sub-command. Preserve the ability to run `flowctl` in an unauthenticated mode. Make it easier to use `flowctl` against a local stack by introducing alternative defaults when running under a "local" profile. Also fix handling of single-use refresh tokens, where we must retain the updated secret after using it to generate a new access token. We could now consider having `flowctl` create single-use refresh tokens rather than multi-use ones, but I didn't want to take that step just yet. Also fix mis-ordering of output when reading journals. Also fix OffsetNotYetAvailable error when reading a journal in non-blocking mode. 
Issue #1627 --- crates/flowctl/src/auth/mod.rs | 39 +--- crates/flowctl/src/auth/roles.rs | 15 +- crates/flowctl/src/catalog/delete.rs | 10 +- crates/flowctl/src/catalog/mod.rs | 23 +- crates/flowctl/src/catalog/publish.rs | 24 +- crates/flowctl/src/catalog/pull_specs.rs | 5 +- crates/flowctl/src/catalog/test.rs | 12 +- crates/flowctl/src/client.rs | 209 ++++++++++++++++++ crates/flowctl/src/collection/mod.rs | 46 ++-- crates/flowctl/src/collection/read/mod.rs | 73 +++--- crates/flowctl/src/config.rs | 220 +++++++++++-------- crates/flowctl/src/controlplane.rs | 170 -------------- crates/flowctl/src/dataplane.rs | 66 ------ crates/flowctl/src/draft/author.rs | 15 +- crates/flowctl/src/draft/develop.rs | 7 +- crates/flowctl/src/draft/mod.rs | 46 ++-- crates/flowctl/src/generate/mod.rs | 3 +- crates/flowctl/src/lib.rs | 111 +++++++--- crates/flowctl/src/local_specs.rs | 16 +- crates/flowctl/src/ops.rs | 186 ++-------------- crates/flowctl/src/output.rs | 2 +- crates/flowctl/src/poll.rs | 2 +- crates/flowctl/src/preview/journal_reader.rs | 60 +++-- crates/flowctl/src/preview/mod.rs | 44 +++- crates/flowctl/src/raw/mod.rs | 28 +-- crates/flowctl/src/raw/oauth.rs | 18 +- 26 files changed, 671 insertions(+), 779 deletions(-) create mode 100644 crates/flowctl/src/client.rs delete mode 100644 crates/flowctl/src/controlplane.rs delete mode 100644 crates/flowctl/src/dataplane.rs diff --git a/crates/flowctl/src/auth/mod.rs b/crates/flowctl/src/auth/mod.rs index 4f4c9acb38..17ead583a8 100644 --- a/crates/flowctl/src/auth/mod.rs +++ b/crates/flowctl/src/auth/mod.rs @@ -2,8 +2,6 @@ mod roles; use anyhow::Context; -use crate::controlplane; - #[derive(Debug, clap::Args)] #[clap(rename_all = "kebab-case")] pub struct Auth { @@ -47,21 +45,6 @@ pub enum Command { /// Unlike 'read' or 'write', the subject of an 'admin' grant also inherits /// capabilities granted to the object role from still-other roles. 
Roles(roles::Roles), - - /// Fetches and prints an auth token that can be used to access a Flow data plane. - /// - /// The returned token can be used to access the Flow data plane with 3rd party tools. - /// For example, you can use curl to access a private port of a running task by running: - /// ```ignore - /// curl -H "Authorization: Bearer $(flowctl auth data-plane-access-token --prefix myTenant/)" https://myPort.myHost.data-plane.example/ - /// ``` - DataPlaneAccessToken(DataPlaneAccessToken), -} - -#[derive(Debug, clap::Args)] -pub struct DataPlaneAccessToken { - #[clap(long, required = true)] - prefix: Vec, } #[derive(Debug, clap::Args)] @@ -76,12 +59,11 @@ impl Auth { match &self.cmd { Command::Login => do_login(ctx).await, Command::Token(Token { token }) => { - controlplane::configure_new_access_token(ctx, token.clone()).await?; + ctx.config.user_access_token = Some(token.clone()); println!("Configured access token."); Ok(()) } Command::Roles(roles) => roles.run(ctx).await, - Command::DataPlaneAccessToken(args) => do_data_plane_access_token(ctx, args).await, } } } @@ -89,7 +71,11 @@ impl Auth { async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> { use crossterm::tty::IsTty; - let url = ctx.config().get_dashboard_url("/admin/api")?.to_string(); + let url = ctx + .config + .get_dashboard_url() + .join("/admin/api")? + .to_string(); println!("\nOpening browser to: {url}"); if let Err(_) = open::that(&url) { @@ -118,7 +104,7 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> { // Copied credentials will often accidentally contain extra whitespace characters. 
let token = token.trim().to_string(); - controlplane::configure_new_access_token(ctx, token).await?; + ctx.config.user_access_token = Some(token); println!("\nConfigured access token."); Ok(()) } else { @@ -131,14 +117,3 @@ async fn do_login(ctx: &mut crate::CliContext) -> anyhow::Result<()> { ); } } - -async fn do_data_plane_access_token( - ctx: &mut crate::CliContext, - args: &DataPlaneAccessToken, -) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let access = - crate::dataplane::fetch_data_plane_access_token(client, args.prefix.clone()).await?; - println!("{}", access.auth_token); - Ok(()) -} diff --git a/crates/flowctl/src/auth/roles.rs b/crates/flowctl/src/auth/roles.rs index 132da95a8e..41710db471 100644 --- a/crates/flowctl/src/auth/roles.rs +++ b/crates/flowctl/src/auth/roles.rs @@ -138,8 +138,7 @@ pub async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("combined_grants_ext") .select( vec![ @@ -177,8 +176,7 @@ pub async fn do_grant( // Upsert user grants to `user_grants` and role grants to `role_grants`. let rows: Vec = if let Some(subject_user_id) = subject_user_id { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("user_grants") .select(grant_revoke_columns()) .upsert( @@ -195,8 +193,7 @@ pub async fn do_grant( .await? } else if let Some(subject_role) = subject_role { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("role_grants") .select(grant_revoke_columns()) .upsert( @@ -231,8 +228,7 @@ pub async fn do_revoke( // Revoke user grants from `user_grants` and role grants from `role_grants`. let rows: Vec = if let Some(subject_user_id) = subject_user_id { api_exec_paginated( - ctx.controlplane_client() - .await? 
+ ctx.client .from("user_grants") .select(grant_revoke_columns()) .eq("user_id", subject_user_id.to_string()) @@ -242,8 +238,7 @@ pub async fn do_revoke( .await? } else if let Some(subject_role) = subject_role { api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("role_grants") .select(grant_revoke_columns()) .eq("subject_role", subject_role) diff --git a/crates/flowctl/src/catalog/delete.rs b/crates/flowctl/src/catalog/delete.rs index 6cb84be8fd..e501377632 100644 --- a/crates/flowctl/src/catalog/delete.rs +++ b/crates/flowctl/src/catalog/delete.rs @@ -69,9 +69,8 @@ pub async fn do_delete( type_selector: type_selector.clone(), }; - let client = ctx.controlplane_client().await?; let specs = catalog::fetch_live_specs::( - client.clone(), + &ctx.client, &list_args, vec![ "id", @@ -98,7 +97,7 @@ pub async fn do_delete( anyhow::bail!("delete operation cancelled"); } - let draft = draft::create_draft(client.clone()) + let draft = draft::create_draft(&ctx.client) .await .context("failed to create draft")?; println!( @@ -121,8 +120,7 @@ pub async fn do_delete( .collect::>(); api_exec::>( - ctx.controlplane_client() - .await? 
+ ctx.client .from("draft_specs") //.select("catalog_name,spec_type") .upsert(serde_json::to_string(&draft_specs).unwrap()) @@ -131,7 +129,7 @@ pub async fn do_delete( .await?; tracing::debug!("added deletions to draft"); - draft::publish(client.clone(), "", draft.id, false).await?; + draft::publish(&ctx.client, "", draft.id, false).await?; // extra newline before, since `publish` will output a bunch of logs println!("\nsuccessfully deleted {} spec(s)", draft_specs.len()); diff --git a/crates/flowctl/src/catalog/mod.rs b/crates/flowctl/src/catalog/mod.rs index 2fc3ebd70d..4b27ad6c3c 100644 --- a/crates/flowctl/src/catalog/mod.rs +++ b/crates/flowctl/src/catalog/mod.rs @@ -4,7 +4,7 @@ mod pull_specs; mod test; use crate::{ - api_exec, api_exec_paginated, controlplane, + api_exec, api_exec_paginated, output::{to_table_row, CliOutput, JsonCell}, }; use anyhow::Context; @@ -226,7 +226,7 @@ impl Catalog { /// # Panics /// If the name_selector `name` and `prefix` are both non-empty. pub async fn fetch_live_specs( - cp_client: controlplane::Client, + client: &crate::Client, list: &List, columns: Vec<&'static str>, ) -> anyhow::Result> @@ -242,7 +242,7 @@ where panic!("cannot specify both 'name' and 'prefix' for filtering live specs"); } - let builder = cp_client.from("live_specs_ext").select(columns.join(",")); + let builder = client.from("live_specs_ext").select(columns.join(",")); let builder = list.type_selector.add_spec_type_filters(builder); // Drive the actual request(s) based on the name selector, since the arguments there may @@ -448,8 +448,7 @@ async fn do_list(ctx: &mut crate::CliContext, list_args: &List) -> anyhow::Resul columns.push("reads_from"); columns.push("writes_to"); } - let client = ctx.controlplane_client().await?; - let rows = fetch_live_specs::(client, list_args, columns).await?; + let rows = fetch_live_specs::(&ctx.client, list_args, columns).await?; ctx.write_all(rows, list_args.flows) } @@ -499,8 +498,7 @@ async fn do_history(ctx: &mut 
crate::CliContext, History { name }: &History) -> } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("publication_specs_ext") .like("catalog_name", format!("{name}%")) .select( @@ -531,7 +529,7 @@ async fn do_draft( publication_id, }: &Draft, ) -> anyhow::Result<()> { - let draft_id = ctx.config().cur_draft()?; + let draft_id = ctx.config.selected_draft()?; #[derive(Deserialize)] struct Row { @@ -550,8 +548,7 @@ async fn do_draft( spec_type, } = if let Some(publication_id) = publication_id { api_exec( - ctx.controlplane_client() - .await? + ctx.client .from("publication_specs_ext") .eq("catalog_name", name) .eq("pub_id", publication_id.to_string()) @@ -561,8 +558,7 @@ async fn do_draft( .await? } else { api_exec( - ctx.controlplane_client() - .await? + ctx.client .from("live_specs") .eq("catalog_name", name) .not("is", "spec_type", "null") @@ -596,8 +592,7 @@ async fn do_draft( tracing::debug!(?draft_spec, "inserting draft"); let rows: Vec = api_exec( - ctx.controlplane_client() - .await? + ctx.client .from("draft_specs") .select("catalog_name,spec_type") .upsert(serde_json::to_string(&draft_spec).unwrap()) diff --git a/crates/flowctl/src/catalog/publish.rs b/crates/flowctl/src/catalog/publish.rs index 2e620235e4..ec3b6f9ec7 100644 --- a/crates/flowctl/src/catalog/publish.rs +++ b/crates/flowctl/src/catalog/publish.rs @@ -1,4 +1,4 @@ -use crate::{catalog::SpecSummaryItem, controlplane, draft, local_specs, CliContext}; +use crate::{catalog::SpecSummaryItem, draft, local_specs, CliContext}; use anyhow::Context; #[derive(Debug, clap::Args)] @@ -24,19 +24,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result< // in common error scenarios. For example, we don't create the draft until after bundling, because // then we'd have to clean up the empty draft if the bundling fails. The very first thing is to create the client, // since that can fail due to missing/expired credentials. 
- let client = ctx.controlplane_client().await?; - anyhow::ensure!(args.auto_approve || std::io::stdin().is_tty(), "The publish command must be run interactively unless the `--auto-approve` flag is provided"); let (draft_catalog, _validations) = - local_specs::load_and_validate(client.clone(), &args.source).await?; + local_specs::load_and_validate(&ctx.client, &args.source).await?; - let draft = draft::create_draft(client.clone()).await?; + let draft = draft::create_draft(&ctx.client).await?; println!("Created draft: {}", &draft.id); tracing::info!(draft_id = %draft.id, "created draft"); - draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?; + draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?; - let removed = draft::remove_unchanged(&client, draft.id).await?; + let removed = draft::remove_unchanged(&ctx.client, draft.id).await?; if !removed.is_empty() { println!("The following specs are identical to the currently published specs, and have been pruned from the draft:"); for name in removed.iter() { @@ -50,7 +48,7 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result< if summary.is_empty() { println!("No specs would be changed by this publication, nothing to publish."); - try_delete_draft(client, draft.id).await; + try_delete_draft(&ctx.client, draft.id).await; return Ok(()); } @@ -59,17 +57,17 @@ pub async fn do_publish(ctx: &mut CliContext, args: &Publish) -> anyhow::Result< if !(args.auto_approve || prompt_to_continue().await) { println!("\nCancelling"); - try_delete_draft(client.clone(), draft.id).await; + try_delete_draft(&ctx.client, draft.id).await; anyhow::bail!("publish cancelled"); } println!("Proceeding to publish..."); let publish_result = - draft::publish(client.clone(), &args.default_data_plane, draft.id, false).await; + draft::publish(&ctx.client, &args.default_data_plane, draft.id, false).await; // The draft will have been deleted automatically if the publish was successful. 
if let Err(err) = publish_result.as_ref() { tracing::error!(draft_id = %draft.id, error = %err, "publication error"); - try_delete_draft(client, draft.id).await; + try_delete_draft(&ctx.client, draft.id).await; } publish_result.context("Publish failed")?; println!("\nPublish successful"); @@ -90,8 +88,8 @@ async fn prompt_to_continue() -> bool { } } -async fn try_delete_draft(client: controlplane::Client, draft_id: models::Id) { - if let Err(del_err) = draft::delete_draft(client.clone(), draft_id).await { +async fn try_delete_draft(client: &crate::Client, draft_id: models::Id) { + if let Err(del_err) = draft::delete_draft(client, draft_id).await { tracing::error!(draft_id = %draft_id, error = %del_err, "failed to delete draft"); } } diff --git a/crates/flowctl/src/catalog/pull_specs.rs b/crates/flowctl/src/catalog/pull_specs.rs index 45ab0d0936..1edf371893 100644 --- a/crates/flowctl/src/catalog/pull_specs.rs +++ b/crates/flowctl/src/catalog/pull_specs.rs @@ -23,10 +23,9 @@ pub struct PullSpecs { } pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; // Retrieve identified live specifications. let live_specs = fetch_live_specs::( - client.clone(), + &ctx.client, &List { flows: false, name_selector: args.name_selector.clone(), @@ -58,7 +57,7 @@ pub async fn do_pull_specs(ctx: &mut CliContext, args: &PullSpecs) -> anyhow::Re let sources = local_specs::indirect_and_write_resources(sources)?; println!("Wrote {count} specifications under {target}."); - let () = local_specs::generate_files(client, sources).await?; + let () = local_specs::generate_files(&ctx.client, sources).await?; Ok(()) } diff --git a/crates/flowctl/src/catalog/test.rs b/crates/flowctl/src/catalog/test.rs index 41e9126c89..b5a01f7f53 100644 --- a/crates/flowctl/src/catalog/test.rs +++ b/crates/flowctl/src/catalog/test.rs @@ -16,24 +16,22 @@ pub struct TestArgs { /// and discoverable to users. 
There's also no need for any confirmation steps, since we're not /// actually modifying the published specs. pub async fn do_test(ctx: &mut CliContext, args: &TestArgs) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let (draft_catalog, _validations) = - local_specs::load_and_validate(client.clone(), &args.source).await?; + local_specs::load_and_validate(&ctx.client, &args.source).await?; - let draft = draft::create_draft(client.clone()).await?; + let draft = draft::create_draft(&ctx.client).await?; println!("Created draft: {}", &draft.id); tracing::info!(draft_id = %draft.id, "created draft"); - let spec_rows = draft::upsert_draft_specs(client.clone(), draft.id, &draft_catalog).await?; + let spec_rows = draft::upsert_draft_specs(&ctx.client, draft.id, &draft_catalog).await?; println!("Running tests for catalog items:"); ctx.write_all(spec_rows, ())?; println!("Starting tests..."); // Technically, test is just a publish with the dry-run flag set to true. let publish_result = - draft::publish(client.clone(), &args.default_data_plane, draft.id, true).await; + draft::publish(&ctx.client, &args.default_data_plane, draft.id, true).await; - if let Err(del_err) = draft::delete_draft(client.clone(), draft.id).await { + if let Err(del_err) = draft::delete_draft(&ctx.client, draft.id).await { tracing::error!(draft_id = %draft.id, error = %del_err, "failed to delete draft"); } publish_result.context("Tests failed")?; diff --git a/crates/flowctl/src/client.rs b/crates/flowctl/src/client.rs new file mode 100644 index 0000000000..2f1fe540c3 --- /dev/null +++ b/crates/flowctl/src/client.rs @@ -0,0 +1,209 @@ +/// Client encapsulates sub-clients for various control-plane +/// and data-plane services that `flowctl` interacts with. +#[derive(Clone)] +pub struct Client { + // URL of the control-plane agent API. + agent_endpoint: url::Url, + // HTTP client to use for REST requests. + http_client: reqwest::Client, + // PostgREST client. 
+ pg_client: postgrest::Postgrest, + // User's access token, if authenticated. + user_access_token: Option, + // Base shard client which is cloned to build token-specific clients. + shard_client: gazette::shard::Client, + // Base journal client which is cloned to build token-specific clients. + journal_client: gazette::journal::Client, +} + +impl Client { + /// Build a new Client from the Config. + pub fn new(config: &crate::config::Config) -> Self { + let user_access_token = config.user_access_token.clone(); + + let mut pg_client = postgrest::Postgrest::new(config.get_pg_url().as_str()) + .insert_header("apikey", config.get_pg_public_token()); + + if let Some(token) = user_access_token.as_ref() { + pg_client = pg_client.insert_header("Authorization", &format!("Bearer {token}")); + } + + // Build journal and shard clients with an empty default service address. + // We'll use their with_endpoint_and_metadata() routines to cheaply clone + // new clients using dynamic addresses and access tokens, while re-using + // underlying connections. 
+ let router = gazette::Router::new("local"); + + let journal_client = gazette::journal::Client::new( + String::new(), + gazette::Metadata::default(), + router.clone(), + ); + let shard_client = gazette::shard::Client::new( + String::new(), + gazette::Metadata::default(), + router.clone(), + ); + + Self { + agent_endpoint: config.get_agent_url().clone(), + http_client: reqwest::Client::new(), + journal_client, + pg_client, + shard_client, + user_access_token, + } + } + + pub fn from(&self, table: &str) -> postgrest::Builder { + self.pg_client.from(table) + } + + pub fn rpc(&self, function: &str, params: String) -> postgrest::Builder { + self.pg_client.rpc(function, params) + } + + pub fn is_authenticated(&self) -> bool { + self.user_access_token.is_some() + } + + pub async fn agent_unary( + &self, + path: &str, + request: &Request, + ) -> anyhow::Result + where + Request: serde::Serialize, + Response: serde::de::DeserializeOwned, + { + let mut builder = self + .http_client + .post(self.agent_endpoint.join(path)?) + .json(request); + + if let Some(token) = &self.user_access_token { + builder = builder.bearer_auth(token); + } + + let response = self + .http_client + .execute(builder.build()?) + .await? + .error_for_status()? 
+ .json() + .await?; + + Ok(response) + } +} + +#[tracing::instrument(skip(client), err)] +pub async fn fetch_task_authorization( + client: &Client, + task: &str, +) -> anyhow::Result<( + String, + String, + String, + gazette::shard::Client, + gazette::journal::Client, +)> { + let started_unix = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let models::authorizations::UserTaskAuthorization { + broker_address, + broker_token, + ops_logs_journal, + ops_stats_journal, + reactor_address, + reactor_token, + shard_id_prefix, + retry_millis: _, + } = loop { + let response: models::authorizations::UserTaskAuthorization = client + .agent_unary( + "/authorize/user/task", + &models::authorizations::UserTaskAuthorizationRequest { + started_unix, + task: models::Name::new(task), + }, + ) + .await?; + + if response.retry_millis != 0 { + tracing::debug!(response.retry_millis, "sleeping before retrying request"); + () = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await; + continue; + } + break response; + }; + + let mut md = gazette::Metadata::default(); + md.bearer_token(&reactor_token)?; + + let shard_client = client + .shard_client + .with_endpoint_and_metadata(reactor_address, md); + + let mut md = gazette::Metadata::default(); + md.bearer_token(&broker_token)?; + + let journal_client = client + .journal_client + .with_endpoint_and_metadata(broker_address, md); + + Ok(( + shard_id_prefix, + ops_logs_journal, + ops_stats_journal, + shard_client, + journal_client, + )) +} + +#[tracing::instrument(skip(client), err)] +pub async fn fetch_collection_authorization( + client: &Client, + collection: &str, +) -> anyhow::Result<(String, gazette::journal::Client)> { + let started_unix = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let models::authorizations::UserCollectionAuthorization { + broker_address, + broker_token, + 
journal_name_prefix, + retry_millis: _, + } = loop { + let response: models::authorizations::UserCollectionAuthorization = client + .agent_unary( + "/authorize/user/collection", + &models::authorizations::UserCollectionAuthorizationRequest { + started_unix, + collection: models::Collection::new(collection), + }, + ) + .await?; + + if response.retry_millis != 0 { + tracing::debug!(response.retry_millis, "sleeping before retrying request"); + () = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await; + continue; + } + break response; + }; + + let mut md = gazette::Metadata::default(); + md.bearer_token(&broker_token)?; + + let journal_client = client + .journal_client + .with_endpoint_and_metadata(broker_address, md); + + Ok((journal_name_prefix, journal_client)) +} diff --git a/crates/flowctl/src/collection/mod.rs b/crates/flowctl/src/collection/mod.rs index 80adf777ca..6088a947f3 100644 --- a/crates/flowctl/src/collection/mod.rs +++ b/crates/flowctl/src/collection/mod.rs @@ -6,7 +6,6 @@ use proto_flow::flow; use proto_gazette::broker; use time::OffsetDateTime; -use crate::dataplane::journal_client_for; use crate::output::{to_table_row, CliOutput, JsonCell}; use self::read::ReadArgs; @@ -30,12 +29,13 @@ fn parse_partition_selector(arg: &str) -> Result broker::LabelSelector { + pub fn build_label_selector(&self, journal_name_prefix: String) -> broker::LabelSelector { assemble::journal_selector( + // Synthesize a minimal CollectionSpec to satisfy `journal_selector()`. &flow::CollectionSpec { name: self.collection.to_string(), partition_template: Some(broker::JournalSpec { - name: self.collection.to_string(), + name: journal_name_prefix, ..Default::default() }), ..Default::default() @@ -110,13 +110,15 @@ pub struct ListFragmentsArgs { #[clap(flatten)] pub selector: CollectionJournalSelector, - /// If provided, then the frament listing will include a pre-signed URL for each fragment, which is valid for the given duration. 
+ /// If provided, then the fragment listing will include a pre-signed URL for each fragment, + /// which is valid for the given duration. /// This can be used to fetch fragment data directly from cloud storage. #[clap(long)] pub signature_ttl: Option, /// Only include fragments which were written within the provided duration from the present. - /// For example, `--since 10m` will only output fragments that have been written within the last 10 minutes. + /// For example, `--since 10m` will only output fragments that have been written within + /// the last 10 minutes. #[clap(long)] pub since: Option, } @@ -176,22 +178,23 @@ impl CliOutput for broker::fragments_response::Fragment { async fn do_list_fragments( ctx: &mut crate::CliContext, - args: &ListFragmentsArgs, + ListFragmentsArgs { + selector, + signature_ttl, + since, + }: &ListFragmentsArgs, ) -> Result<(), anyhow::Error> { - let client = journal_client_for( - ctx.controlplane_client().await?, - vec![args.selector.collection.clone()], - ) - .await?; + let (journal_name_prefix, client) = + crate::client::fetch_collection_authorization(&ctx.client, &selector.collection).await?; let list_resp = client .list(broker::ListRequest { - selector: Some(args.selector.build_label_selector()), + selector: Some(selector.build_label_selector(journal_name_prefix)), ..Default::default() }) .await?; - let start_time = if let Some(since) = args.since { + let start_time = if let Some(since) = *since { let timepoint = OffsetDateTime::now_utc() - *since; tracing::debug!(%since, begin_mod_time = %timepoint, "resolved --since to begin_mod_time"); timepoint.unix_timestamp() @@ -199,9 +202,7 @@ async fn do_list_fragments( 0 }; - let signature_ttl = args - .signature_ttl - .map(|ttl| std::time::Duration::from(*ttl).into()); + let signature_ttl = signature_ttl.map(|ttl| std::time::Duration::from(*ttl).into()); let mut fragments = Vec::with_capacity(32); for journal in list_resp.journals { let req = broker::FragmentsRequest { @@ -216,22 
+217,19 @@ async fn do_list_fragments( fragments.extend(frag_resp.fragments); } - ctx.write_all(fragments, args.signature_ttl.is_some()) + ctx.write_all(fragments, signature_ttl.is_some()) } async fn do_list_journals( ctx: &mut crate::CliContext, - args: &CollectionJournalSelector, + selector: &CollectionJournalSelector, ) -> Result<(), anyhow::Error> { - let client = journal_client_for( - ctx.controlplane_client().await?, - vec![args.collection.clone()], - ) - .await?; + let (journal_name_prefix, client) = + crate::client::fetch_collection_authorization(&ctx.client, &selector.collection).await?; let list_resp = client .list(broker::ListRequest { - selector: Some(args.build_label_selector()), + selector: Some(selector.build_label_selector(journal_name_prefix)), ..Default::default() }) .await?; diff --git a/crates/flowctl/src/collection/read/mod.rs b/crates/flowctl/src/collection/read/mod.rs index 2ebc4ae953..81171247bd 100644 --- a/crates/flowctl/src/collection/read/mod.rs +++ b/crates/flowctl/src/collection/read/mod.rs @@ -1,11 +1,10 @@ -use crate::dataplane::{self}; use crate::{collection::CollectionJournalSelector, output::OutputType}; use anyhow::Context; use futures::StreamExt; use gazette::journal::ReadJsonLine; use proto_gazette::broker; +use std::io::Write; use time::OffsetDateTime; -use tokio::io::AsyncWriteExt; #[derive(clap::Args, Default, Debug, Clone)] pub struct ReadArgs { @@ -20,8 +19,6 @@ pub struct ReadArgs { /// the default. #[clap(long)] pub uncommitted: bool, - #[clap(skip)] - pub auth_prefixes: Vec, } /// Common definition for arguments specifying the begin and and bounds of a read command. @@ -42,18 +39,21 @@ pub struct ReadBounds { /// - Only uncommitted reads are supported /// - Any acknowledgements (documents with `/_meta/ack` value `true`) are also printed /// These limitations should all be addressed in the future when we add support for committed reads. 
-pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> anyhow::Result<()> { - if !args.uncommitted { +pub async fn read_collection( + ctx: &mut crate::CliContext, + ReadArgs { + selector, + bounds, + uncommitted, + }: &ReadArgs, +) -> anyhow::Result<()> { + if !uncommitted { anyhow::bail!("missing the `--uncommitted` flag. This flag is currently required, though a future release will add support for committed reads, which will be the default."); } // output can be either None or Some(OutputType::Json), but cannot be explicitly set to // anything else. _Eventually_, we may want to support outputting collection data as yaml // or a table, but certainly not right now. - if let Some(naughty_output_type) = ctx - .output_args() - .output - .filter(|ot| *ot != OutputType::Json) - { + if let Some(naughty_output_type) = ctx.output.output.filter(|ot| *ot != OutputType::Json) { let clap_enum = clap::ValueEnum::to_possible_value(&naughty_output_type) .expect("possible value cannot be None"); let name = clap_enum.get_name(); @@ -62,17 +62,12 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an ); } - let auth_prefixes = if args.auth_prefixes.is_empty() { - vec![args.selector.collection.clone()] - } else { - args.auth_prefixes.clone() - }; - let cp_client = ctx.controlplane_client().await?; - let client = dataplane::journal_client_for(cp_client, auth_prefixes).await?; + let (journal_name_prefix, journal_client) = + crate::client::fetch_collection_authorization(&ctx.client, &selector.collection).await?; - let list_resp = client + let list_resp = journal_client .list(broker::ListRequest { - selector: Some(args.selector.build_label_selector()), + selector: Some(selector.build_label_selector(journal_name_prefix)), ..Default::default() }) .await @@ -84,7 +79,7 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an .map(|j| j.spec.unwrap()) .collect::>(); - tracing::debug!(journal_count = 
journals.len(), collection = %args.selector.collection, "listed journals"); + tracing::debug!(journal_count = journals.len(), collection = %selector.collection, "listed journals"); let maybe_journal = journals.pop(); if !journals.is_empty() { // TODO: implement a sequencer and allow reading from multiple journals @@ -94,11 +89,19 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an let journal = maybe_journal.ok_or_else(|| { anyhow::anyhow!( "collection '{}' does not exist or has never been written to (it has no journals)", - args.selector.collection + selector.collection ) })?; - let begin_mod_time = if let Some(since) = args.bounds.since { + read_collection_journal(journal_client, &journal.name, bounds).await +} + +pub async fn read_collection_journal( + journal_client: gazette::journal::Client, + journal_name: &str, + bounds: &ReadBounds, +) -> anyhow::Result<()> { + let begin_mod_time = if let Some(since) = bounds.since { let start_time = OffsetDateTime::now_utc() - *since; tracing::debug!(%since, begin_mod_time = %start_time, "resolved --since to begin_mod_time"); (start_time - OffsetDateTime::UNIX_EPOCH).as_seconds_f64() as i64 @@ -106,27 +109,23 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an 0 }; - let mut lines = client.read_json_lines( + let mut lines = journal_client.read_json_lines( broker::ReadRequest { - journal: journal.name.clone(), + journal: journal_name.to_string(), offset: 0, - block: args.bounds.follow, + block: bounds.follow, begin_mod_time, ..Default::default() }, 1, ); - tracing::debug!(journal = %journal.name, "starting read of journal"); + tracing::debug!(%journal_name, "starting read of journal"); let policy = doc::SerPolicy::noop(); + let mut stdout = std::io::stdout(); while let Some(line) = lines.next().await { match line { - Err(err) if err.is_transient() => { - tracing::warn!(%err, "error reading collection (will retry)"); - 
tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - Err(err) => anyhow::bail!(err), Ok(ReadJsonLine::Meta(_)) => (), Ok(ReadJsonLine::Doc { root, @@ -134,8 +133,16 @@ pub async fn read_collection(ctx: &mut crate::CliContext, args: &ReadArgs) -> an }) => { let mut v = serde_json::to_vec(&policy.on(root.get())).unwrap(); v.push(b'\n'); - tokio::io::stdout().write_all(&v).await?; + () = stdout.write_all(&v)?; + } + Err(gazette::Error::BrokerStatus(broker::Status::OffsetNotYetAvailable)) => { + break; // Graceful EOF of non-blocking read. } + Err(err) if err.is_transient() => { + tracing::warn!(%err, "error reading collection (will retry)"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + Err(err) => anyhow::bail!(err), } } diff --git a/crates/flowctl/src/config.rs b/crates/flowctl/src/config.rs index 6a7959c560..c03d4a6c9f 100644 --- a/crates/flowctl/src/config.rs +++ b/crates/flowctl/src/config.rs @@ -1,29 +1,114 @@ use anyhow::Context; -use serde::{Deserialize, Serialize}; use std::path::PathBuf; -lazy_static::lazy_static! { - static ref DEFAULT_DASHBOARD_URL: url::Url = url::Url::parse("https://dashboard.estuary.dev/").unwrap(); -} - -#[derive(Debug, Serialize, Deserialize, Default)] +/// Configuration of `flowctl`. +/// +/// We generally keep this minimal and prefer to use built-in default +/// or local value fallbacks, because that means we can update these +/// defaults in future releases of flowctl without breaking local +/// User configuration. +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] pub struct Config { + /// URL endpoint of the Flow control-plane Agent API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub agent_url: Option, /// URL of the Flow UI, which will be used as a base when flowctl generates links to it. + #[serde(default, skip_serializing_if = "Option::is_none")] pub dashboard_url: Option, /// ID of the current draft, or None if no draft is configured. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] pub draft: Option, - // Current access token, or None if no token is set. - pub api: Option, + /// Public (shared) anonymous token of the control-plane API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pg_public_token: Option, + /// URL endpoint of the Flow control-plane PostgREST API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pg_url: Option, + /// Users's access token for the control-plane API. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user_access_token: Option, + /// User's refresh token for the control-plane API, + /// used to generate access_token when it's unset or expires. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user_refresh_token: Option, + + #[serde(skip)] + is_local: bool, + + // Legacy API stanza, which is being phased out. + #[serde(default, skip_serializing)] + api: Option, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct RefreshToken { + pub id: models::Id, + pub secret: String, +} + +#[derive(Debug, serde::Deserialize)] +struct DeprecatedAPISection { + #[allow(dead_code)] + endpoint: url::Url, + #[allow(dead_code)] + public_token: String, + access_token: String, + refresh_token: Option, } impl Config { + pub fn selected_draft(&self) -> anyhow::Result { + self.draft + .ok_or(anyhow::anyhow!("No draft is currently selected")) + } + + pub fn get_agent_url(&self) -> &url::Url { + if let Some(agent_url) = &self.agent_url { + agent_url + } else if self.is_local { + &LOCAL_AGENT_URL + } else { + &DEFAULT_AGENT_URL + } + } + + pub fn get_dashboard_url(&self) -> &url::Url { + if let Some(dashboard_url) = &self.dashboard_url { + dashboard_url + } else if self.is_local { + &LOCAL_DASHBOARD_URL + } else { + &DEFAULT_DASHBOARD_URL + } + } + + pub fn get_pg_public_token(&self) -> &str { + if let Some(pg_public_token) = &self.pg_public_token { + pg_public_token + } else if 
self.is_local { + LOCAL_PG_PUBLIC_TOKEN + } else { + DEFAULT_PG_PUBLIC_TOKEN + } + } + + pub fn get_pg_url(&self) -> &url::Url { + if let Some(pg_url) = &self.pg_url { + pg_url + } else if self.is_local { + &LOCAL_PG_URL + } else { + &DEFAULT_PG_URL + } + } + /// Loads the config corresponding to the given named `profile`. /// This loads from: /// - $HOME/.config/flowctl/${profile}.json on linux /// - $HOME/Library/Application Support/flowctl/${profile}.json on macos pub fn load(profile: &str) -> anyhow::Result { let config_file = Config::file_path(profile)?; - let config = match std::fs::read(&config_file) { + let mut config = match std::fs::read(&config_file) { Ok(v) => { let cfg = serde_json::from_slice(&v).with_context(|| { format!( @@ -43,9 +128,35 @@ impl Config { Config::default() } Err(err) => { - return Err(err).context("opening config"); + return Err(err).context("failed to read config"); } }; + + // Migrate legacy portions of the config. + if let Some(DeprecatedAPISection { + endpoint: _, + public_token: _, + access_token, + refresh_token, + }) = config.api.take() + { + config.user_access_token = Some(access_token); + config.user_refresh_token = refresh_token; + } + + // If a refresh token is not defined, attempt to parse one from the environment. 
+ if config.user_refresh_token.is_none() { + if let Ok(env_token) = std::env::var(FLOW_AUTH_TOKEN) { + let decoded = base64::decode(env_token).context("FLOW_AUTH_TOKEN is not base64")?; + let token: RefreshToken = + serde_json::from_slice(&decoded).context("FLOW_AUTH_TOKEN is invalid JSON")?; + + tracing::info!("using refresh token from environment variable {FLOW_AUTH_TOKEN}"); + config.user_refresh_token = Some(token); + } + } + config.is_local = profile == "local"; + Ok(config) } @@ -83,86 +194,21 @@ impl Config { let path = Config::config_dir()?.join(format!("{profile}.json")); Ok(path) } - - pub fn cur_draft(&self) -> anyhow::Result { - match self.draft { - Some(draft) => Ok(draft), - None => { - anyhow::bail!("You must create or select a draft"); - } - } - } - - pub fn set_access_token(&mut self, access_token: String) { - // Don't overwrite the other fields of api if they are already present. - if let Some(api) = self.api.as_mut() { - api.access_token = access_token; - } else { - self.api = Some(API::managed(access_token)); - } - } - - pub fn set_refresh_token(&mut self, refresh_token: RefreshToken) { - // Don't overwrite the other fields of api if they are already present. 
- if let Some(api) = self.api.as_mut() { - api.refresh_token = Some(refresh_token); - } - } - - pub fn get_dashboard_url(&self, path: &str) -> anyhow::Result { - let base = self - .dashboard_url - .as_ref() - .unwrap_or(&*DEFAULT_DASHBOARD_URL); - let url = base.join(path).context( - "failed to join path to configured dashboard_url, the dashboard_url is likely invalid", - )?; - Ok(url) - } -} - -#[derive(Deserialize, Serialize, Debug)] -pub struct RefreshToken { - pub id: String, - pub secret: String, } -impl RefreshToken { - pub fn from_base64(encoded_token: &str) -> anyhow::Result { - let decoded = base64::decode(encoded_token).context("invalid base64")?; - let tk: RefreshToken = serde_json::from_slice(&decoded)?; - Ok(tk) - } - - pub fn to_base64(&self) -> anyhow::Result { - let ser = serde_json::to_vec(self)?; - Ok(base64::encode(&ser)) - } -} +lazy_static::lazy_static! { + static ref DEFAULT_AGENT_URL: url::Url = url::Url::parse("https://agent-api-1084703453822.us-central1.run.app").unwrap(); + static ref DEFAULT_DASHBOARD_URL: url::Url = url::Url::parse("https://dashboard.estuary.dev/").unwrap(); + static ref DEFAULT_PG_URL: url::Url = url::Url::parse("https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1").unwrap(); -#[derive(Debug, Serialize, Deserialize)] -pub struct API { - // URL endpoint of the Flow control-plane Rest API. - pub endpoint: url::Url, - // Public (shared) anonymous token of the control-plane API. - pub public_token: String, - // Secret access token of the control-plane API. - pub access_token: String, - // Secret refresh token of the control-plane API, used to generate access_token when it expires. - pub refresh_token: Option, + // Used only when profile is "local". 
+ static ref LOCAL_AGENT_URL: url::Url = url::Url::parse("http://localhost:8675/").unwrap(); + static ref LOCAL_DASHBOARD_URL: url::Url = url::Url::parse("http://localhost:3000/").unwrap(); + static ref LOCAL_PG_URL: url::Url = url::Url::parse("http://localhost:5431/rest/v1").unwrap(); } -pub const PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco"; - -pub const ENDPOINT: &str = "https://eyrcnmuzzyriypdajwdk.supabase.co/rest/v1"; +const DEFAULT_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco"; +const LOCAL_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0"; -impl API { - fn managed(access_token: String) -> Self { - Self { - endpoint: url::Url::parse(ENDPOINT).unwrap(), - public_token: PUBLIC_TOKEN.to_string(), - access_token, - refresh_token: None, - } - } -} +// Environment variable which is inspected for a base64-encoded refresh token. +const FLOW_AUTH_TOKEN: &str = "FLOW_AUTH_TOKEN"; diff --git a/crates/flowctl/src/controlplane.rs b/crates/flowctl/src/controlplane.rs deleted file mode 100644 index bfaf4cb9ee..0000000000 --- a/crates/flowctl/src/controlplane.rs +++ /dev/null @@ -1,170 +0,0 @@ -use crate::config::{RefreshToken, ENDPOINT, PUBLIC_TOKEN}; -use crate::{api_exec, CliContext}; -use anyhow::Context; -use serde::Deserialize; -use std::fmt::{self, Debug}; -use std::ops::Deref; -use std::sync::Arc; - -/// A wafer-thin wrapper around a `Postgrest` client that makes it -/// cheaply cloneable and implements `Debug`. 
This allows us to create -/// a client once and then store it in the `CliContext` for future re-use. -/// This client implements `Deref`, so you can use it -/// just like you would the normal `Postgrest` client. -#[derive(Clone)] -pub struct Client(Arc, bool); - -impl Client { - /// Is this client authenticated (versus being an anonymous user)? - pub fn is_authenticated(&self) -> bool { - self.1 - } -} - -impl Debug for Client { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // We can't really give a better debug impl since Postgrest - // keeps all of its members private. - f.write_str("controlplane::Client") - } -} - -impl Deref for Client { - type Target = postgrest::Postgrest; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} - -#[derive(Deserialize)] -struct AccessTokenResponse { - access_token: String, -} - -/// Creates a new client. **you should instead call `CliContext::controlplane_client(&mut Self)`**, which -/// will re-use the existing client if possible. -// TODO(johnny): This really needs a deep overhaul. We're not updating refresh -// tokens as we should be, and the structure of the Config is problematic. -// I'm resisting refactoring it more substantially right now, but it needs it. -pub(crate) async fn new_client(ctx: &mut CliContext) -> anyhow::Result { - match ctx.config_mut().api { - Some(ref mut api) => { - let client = postgrest::Postgrest::new(api.endpoint.as_str()); - let client = client.insert_header("apikey", &api.public_token); - - // Try to give users a more friendly error message if we know their credentials are expired. 
- if let Err(e) = check_access_token(&api.access_token) { - if let Some(refresh_token) = &api.refresh_token { - let response = api_exec::(client.rpc( - "generate_access_token", - format!( - r#"{{"refresh_token_id": "{}", "secret": "{}"}}"#, - refresh_token.id, refresh_token.secret - ), - )) - .await?; - api.access_token = response.access_token; - } else { - return Err(e); - } - } - let client = - client.insert_header("Authorization", format!("Bearer {}", &api.access_token)); - Ok(Client(Arc::new(client), true)) - } - None => { - // If there has been no prior login, but FLOW_AUTH_TOKEN is available, we use that to - // generate an access_token and automatically login the user - if let Ok(env_token) = std::env::var(FLOW_AUTH_TOKEN) { - let client = postgrest::Postgrest::new(ENDPOINT); - let client = client.insert_header("apikey", PUBLIC_TOKEN); - - let refresh_token = RefreshToken::from_base64(&env_token)?; - let response = api_exec::(client.rpc( - "generate_access_token", - format!( - r#"{{"refresh_token_id": "{}", "secret": "{}"}}"#, - refresh_token.id, refresh_token.secret - ), - )) - .await?; - - let _jwt = check_access_token(&response.access_token)?; - ctx.config_mut() - .set_access_token(response.access_token.clone()); - - let client = client - .insert_header("Authorization", format!("Bearer {}", response.access_token)); - Ok(Client(Arc::new(client), true)) - } else { - tracing::warn!("You are not authenticated. 
Run `auth login` to login to Flow."); - - let client = postgrest::Postgrest::new(ENDPOINT); - let client = client.insert_header("apikey", PUBLIC_TOKEN); - Ok(Client(Arc::new(client), false)) - } - } - } -} - -pub async fn configure_new_access_token( - ctx: &mut CliContext, - access_token: String, -) -> anyhow::Result<()> { - let jwt = check_access_token(&access_token)?; - ctx.config_mut().set_access_token(access_token); - let client = ctx.controlplane_client().await?; - - let refresh_token = api_exec::(client.rpc( - "create_refresh_token", - r#"{"multi_use": true, "valid_for": "90d", "detail": "Created by flowctl"}"#, - )) - .await?; - ctx.config_mut().set_refresh_token(refresh_token); - - let message = if let Some(email) = jwt.email { - format!("Configured access token for user '{email}'") - } else { - "Configured access token".to_string() - }; - println!("{}", message); - Ok(()) -} - -fn check_access_token(access_token: &str) -> anyhow::Result { - let jwt = parse_jwt(access_token).context("invalid access_token")?; - // Try to give users a more friendly error message if we know their credentials are expired. 
- if jwt.is_expired() { - anyhow::bail!("access token is expired, please re-authenticate and then try again"); - } - Ok(jwt) -} - -const FLOW_AUTH_TOKEN: &str = "FLOW_AUTH_TOKEN"; -#[derive(Deserialize)] -struct JWT { - exp: i64, - email: Option, -} - -impl JWT { - fn is_expired(&self) -> bool { - let exp = time::OffsetDateTime::from_unix_timestamp(self.exp).unwrap_or_else(|err| { - tracing::error!(exp = self.exp, error = %err, "invalid exp in JWT"); - time::OffsetDateTime::UNIX_EPOCH - }); - time::OffsetDateTime::now_utc() >= exp - } -} - -fn parse_jwt(jwt: &str) -> anyhow::Result { - let payload = jwt - .split('.') - .nth(1) - .ok_or_else(|| anyhow::anyhow!("invalid JWT"))?; - let json_data = - base64::decode_config(payload, base64::URL_SAFE_NO_PAD).context("invalid JWT")?; - let data: JWT = serde_json::from_slice(&json_data).context("parsing JWT data")?; - Ok(data) -} diff --git a/crates/flowctl/src/dataplane.rs b/crates/flowctl/src/dataplane.rs deleted file mode 100644 index c10e0c5313..0000000000 --- a/crates/flowctl/src/dataplane.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::controlplane; -use anyhow::Context; -use serde::Deserialize; - -#[derive(Deserialize, Clone, PartialEq, Debug)] -pub struct DataPlaneAccess { - #[serde(rename = "token")] - pub auth_token: String, - pub gateway_url: String, -} - -/// Fetches connection info for accessing a data plane for the given catalog namespace prefixes. 
-pub async fn fetch_data_plane_access_token( - client: controlplane::Client, - prefixes: Vec, -) -> anyhow::Result { - tracing::debug!(?prefixes, "requesting data-plane access token for prefixes"); - - let body = serde_json::to_string(&serde_json::json!({ - "prefixes": prefixes, - })) - .context("serializing prefix parameters")?; - - let req = client.rpc("gateway_auth_token", body).build(); - tracing::trace!(?req, "built request to execute"); - let resp = req - .send() - .await - .and_then(|r| r.error_for_status()) - .context("requesting data plane gateway auth token")?; - let json: serde_json::Value = resp.json().await?; - tracing::trace!(response_body = ?json, "got response from control-plane"); - let mut auths: Vec = - serde_json::from_value(json).context("failed to decode response")?; - let access = auths.pop().ok_or_else(|| { - anyhow::anyhow!( - "no data-plane access tokens were returned for the given prefixes, access is denied" - ) - })?; - if !auths.is_empty() { - let num_tokens = auths.len() + 1; - anyhow::bail!("received {} tokens for the given set of prefixes: {:?}. This is not yet implemented in flowctl", num_tokens, prefixes); - } - Ok(access) -} - -/// Returns an authenticated journal client that's authorized to the given prefixes. 
-pub async fn journal_client_for( - cp_client: controlplane::Client, - prefixes: Vec, -) -> anyhow::Result { - let DataPlaneAccess { - auth_token, - gateway_url, - } = fetch_data_plane_access_token(cp_client, prefixes).await?; - tracing::debug!(%gateway_url, "acquired data-plane-gateway access token"); - - let mut metadata = gazette::Metadata::default(); - metadata.bearer_token(&auth_token)?; - - let router = gazette::Router::new(&gateway_url, "local")?; - let client = gazette::journal::Client::new(Default::default(), router, metadata); - - tracing::debug!(%gateway_url, "connected data-plane client"); - Ok(client) -} diff --git a/crates/flowctl/src/draft/author.rs b/crates/flowctl/src/draft/author.rs index b65b105244..1b628bad82 100644 --- a/crates/flowctl/src/draft/author.rs +++ b/crates/flowctl/src/draft/author.rs @@ -1,4 +1,4 @@ -use crate::{api_exec, catalog::SpecSummaryItem, controlplane, local_specs}; +use crate::{api_exec, catalog::SpecSummaryItem, local_specs}; use anyhow::Context; use futures::{stream::FuturesOrdered, StreamExt}; use serde::Serialize; @@ -11,7 +11,7 @@ pub struct Author { source: String, } -pub async fn clear_draft(client: controlplane::Client, draft_id: models::Id) -> anyhow::Result<()> { +pub async fn clear_draft(client: &crate::Client, draft_id: models::Id) -> anyhow::Result<()> { tracing::info!(%draft_id, "clearing existing specs from draft"); api_exec::>( client @@ -25,7 +25,7 @@ pub async fn clear_draft(client: controlplane::Client, draft_id: models::Id) -> } pub async fn upsert_draft_specs( - client: controlplane::Client, + client: &crate::Client, draft_id: models::Id, draft: &tables::DraftCatalog, ) -> anyhow::Result> { @@ -130,12 +130,11 @@ pub async fn do_author( ctx: &mut crate::CliContext, Author { source }: &Author, ) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let draft_id = ctx.config().cur_draft()?; - let (draft, _) = local_specs::load_and_validate(client.clone(), &source).await?; + let 
draft_id = ctx.config.selected_draft()?; + let (draft, _) = local_specs::load_and_validate(&ctx.client, &source).await?; - clear_draft(client.clone(), draft_id).await?; - let rows = upsert_draft_specs(client, draft_id, &draft).await?; + clear_draft(&ctx.client, draft_id).await?; + let rows = upsert_draft_specs(&ctx.client, draft_id, &draft).await?; ctx.write_all(rows, ()) } diff --git a/crates/flowctl/src/draft/develop.rs b/crates/flowctl/src/draft/develop.rs index 0d24d8f143..03c46d2d47 100644 --- a/crates/flowctl/src/draft/develop.rs +++ b/crates/flowctl/src/draft/develop.rs @@ -24,10 +24,9 @@ pub async fn do_develop( flat, }: &Develop, ) -> anyhow::Result<()> { - let draft_id = ctx.config().cur_draft()?; - let client = ctx.controlplane_client().await?; + let draft_id = ctx.config.selected_draft()?; let rows: Vec = api_exec_paginated( - client + ctx.client .from("draft_specs") .select("catalog_name,spec,spec_type,expect_pub_id") .not("is", "spec_type", "null") @@ -46,7 +45,7 @@ pub async fn do_develop( let sources = local_specs::indirect_and_write_resources(sources)?; println!("Wrote {count} specifications under {target}."); - let () = local_specs::generate_files(client, sources).await?; + let () = local_specs::generate_files(&ctx.client, sources).await?; Ok(()) } diff --git a/crates/flowctl/src/draft/mod.rs b/crates/flowctl/src/draft/mod.rs index f24960b23f..c577146272 100644 --- a/crates/flowctl/src/draft/mod.rs +++ b/crates/flowctl/src/draft/mod.rs @@ -2,7 +2,6 @@ use std::collections::BTreeSet; use crate::{ api_exec, api_exec_paginated, - controlplane::Client, output::{to_table_row, CliOutput, JsonCell}, }; use anyhow::Context; @@ -128,7 +127,7 @@ impl CliOutput for DraftRow { } } -pub async fn create_draft(client: Client) -> Result { +pub async fn create_draft(client: &crate::Client) -> Result { let row: DraftRow = api_exec( client .from("drafts") @@ -141,7 +140,10 @@ pub async fn create_draft(client: Client) -> Result { Ok(row) } -pub async fn 
delete_draft(client: Client, draft_id: models::Id) -> Result { +pub async fn delete_draft( + client: &crate::Client, + draft_id: models::Id, +) -> Result { let row: DraftRow = api_exec( client .from("drafts") @@ -156,10 +158,9 @@ pub async fn delete_draft(client: Client, draft_id: models::Id) -> Result anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let row = create_draft(client).await?; + let row = create_draft(&ctx.client).await?; - ctx.config_mut().draft = Some(row.id.clone()); + ctx.config.draft = Some(row.id.clone()); ctx.write_all(Some(row), ()) } @@ -181,11 +182,10 @@ async fn do_delete(ctx: &mut crate::CliContext) -> anyhow::Result<()> { to_table_row(self, &["/id", "/updated_at"]) } } - let client = ctx.controlplane_client().await?; - let draft_id = ctx.config().cur_draft()?; - let row = delete_draft(client, draft_id).await?; + let draft_id = ctx.config.selected_draft()?; + let row = delete_draft(&ctx.client, draft_id).await?; - ctx.config_mut().draft.take(); + ctx.config.draft.take(); ctx.write_all(Some(row), ()) } @@ -223,8 +223,7 @@ async fn do_describe(ctx: &mut crate::CliContext) -> anyhow::Result<()> { } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("draft_specs_ext") .select( vec![ @@ -237,7 +236,7 @@ async fn do_describe(ctx: &mut crate::CliContext) -> anyhow::Result<()> { ] .join(","), ) - .eq("draft_id", ctx.config().cur_draft()?.to_string()), + .eq("draft_id", ctx.config.selected_draft()?.to_string()), ) .await?; @@ -269,8 +268,7 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { } } let rows: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? 
+ ctx.client .from("drafts_ext") .select("created_at,detail,id,num_specs,updated_at"), ) @@ -278,7 +276,7 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { // Decorate the id to mark the selected draft, but only if we're outputting a table let cur_draft = ctx - .config() + .config .draft .map(|id| id.to_string()) .unwrap_or_default(); @@ -298,7 +296,7 @@ async fn do_list(ctx: &mut crate::CliContext) -> anyhow::Result<()> { /// that are identical to their live specs, accounting for changes to inferred schemas. /// Returns the set of specs that were removed from the draft (as a `BTreeSet` so they're ordered). pub async fn remove_unchanged( - client: &Client, + client: &crate::Client, draft_id: models::Id, ) -> anyhow::Result> { #[derive(Deserialize)] @@ -321,8 +319,7 @@ async fn do_select( Select { id: select_id }: &Select, ) -> anyhow::Result<()> { let matched: Vec = api_exec_paginated( - ctx.controlplane_client() - .await? + ctx.client .from("drafts") .eq("id", select_id.to_string()) .select("id"), @@ -333,7 +330,7 @@ async fn do_select( anyhow::bail!("draft {select_id} does not exist"); } - ctx.config_mut().draft = Some(select_id.clone()); + ctx.config.draft = Some(select_id.clone()); do_list(ctx).await } @@ -342,19 +339,18 @@ async fn do_publish( data_plane_name: &str, dry_run: bool, ) -> anyhow::Result<()> { - let draft_id = ctx.config().cur_draft()?; - let client = ctx.controlplane_client().await?; + let draft_id = ctx.config.selected_draft()?; - publish(client, data_plane_name, draft_id, dry_run).await?; + publish(&ctx.client, data_plane_name, draft_id, dry_run).await?; if !dry_run { - ctx.config_mut().draft.take(); + ctx.config.draft.take(); } Ok(()) } pub async fn publish( - client: Client, + client: &crate::Client, default_data_plane_name: &str, draft_id: models::Id, dry_run: bool, diff --git a/crates/flowctl/src/generate/mod.rs b/crates/flowctl/src/generate/mod.rs index bdf4ea9d59..f30142ce3e 100644 --- 
a/crates/flowctl/src/generate/mod.rs +++ b/crates/flowctl/src/generate/mod.rs @@ -44,8 +44,7 @@ impl Generate { build::write_files(&project_root, files)?; - let client = ctx.controlplane_client().await?; - let () = local_specs::generate_files(client, draft).await?; + let () = local_specs::generate_files(&ctx.client, draft).await?; Ok(()) } } diff --git a/crates/flowctl/src/lib.rs b/crates/flowctl/src/lib.rs index e967436543..59649aaf11 100644 --- a/crates/flowctl/src/lib.rs +++ b/crates/flowctl/src/lib.rs @@ -5,10 +5,9 @@ use clap::Parser; mod auth; mod catalog; +mod client; mod collection; mod config; -mod controlplane; -mod dataplane; mod draft; mod generate; mod local_specs; @@ -19,8 +18,8 @@ mod poll; mod preview; mod raw; +use client::Client; use output::{Output, OutputType}; -use pagination::into_items; use poll::poll_while_queued; /// A command-line tool for working with Estuary Flow. @@ -97,38 +96,14 @@ pub enum Command { Raw(raw::Advanced), } -#[derive(Debug)] pub struct CliContext { + client: Client, config: config::Config, output: output::Output, - controlplane_client: Option, } impl CliContext { - /// Returns a client to the controlplane, creating a new one if necessary. - /// This function will return an error if the authentication credentials - /// are missing, invalid, or expired. 
- pub async fn controlplane_client(&mut self) -> anyhow::Result { - if self.controlplane_client.is_none() { - let client = controlplane::new_client(self).await?; - self.controlplane_client = Some(client.clone()) - } - Ok(self.controlplane_client.clone().unwrap()) - } - - pub fn config_mut(&mut self) -> &mut config::Config { - &mut self.config - } - - pub fn config(&self) -> &config::Config { - &self.config - } - - pub fn output_args(&self) -> &output::Output { - &self.output - } - - pub fn write_all(&mut self, items: I, table_alt: T::TableAlt) -> anyhow::Result<()> + fn write_all(&mut self, items: I, table_alt: T::TableAlt) -> anyhow::Result<()> where T: output::CliOutput, I: IntoIterator, @@ -140,7 +115,7 @@ impl CliContext { } } - pub fn get_output_type(&mut self) -> OutputType { + fn get_output_type(&mut self) -> OutputType { use crossterm::tty::IsTty; if let Some(ty) = self.output.output { @@ -157,12 +132,71 @@ impl CliContext { impl Cli { pub async fn run(&self) -> anyhow::Result<()> { - let config = config::Config::load(&self.profile)?; + let mut config = config::Config::load(&self.profile)?; let output = self.output.clone(); + + // If the configured access token has expired then remove it before continuing. + if let Some(token) = &config.user_access_token { + let claims: models::authorizations::ControlClaims = + parse_jwt_claims(token).context("failed to parse control-plane access token")?; + + let now = time::OffsetDateTime::now_utc(); + let exp = time::OffsetDateTime::from_unix_timestamp(claims.exp as i64).unwrap(); + + if now + std::time::Duration::from_secs(60) > exp { + tracing::info!(expired=%exp, "removing expired user access token from configuration"); + config.user_access_token = None; + } + } + + if config.user_access_token.is_some() && config.user_refresh_token.is_some() { + // Authorization is current: nothing to do. + } else if config.user_access_token.is_some() { + // We have an access token but no refresh token. Create one. 
+ let refresh_token = api_exec::( + Client::new(&config).rpc( + "create_refresh_token", + serde_json::json!({"multi_use": true, "valid_for": "90d", "detail": "Created by flowctl"}) + .to_string(), + ), + ) + .await?; + + config.user_refresh_token = Some(refresh_token); + + tracing::info!("created new refresh token"); + } else if let Some(config::RefreshToken { id, secret }) = &config.user_refresh_token { + // We have a refresh token but no access token. Generate one. + + #[derive(serde::Deserialize)] + struct Response { + access_token: String, + refresh_token: Option, // Set iff the token was single-use. + } + let Response { + access_token, + refresh_token: next_refresh_token, + } = api_exec::(Client::new(&config).rpc( + "generate_access_token", + serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(), + )) + .await + .context("failed to obtain access token")?; + + if next_refresh_token.is_some() { + config.user_refresh_token = next_refresh_token; + } + config.user_access_token = Some(access_token); + + tracing::info!("generated a new access token"); + } else { + tracing::warn!("You are not authenticated. 
Run `auth login` to login to Flow."); + } + let mut context = CliContext { + client: Client::new(&config), config, output, - controlplane_client: None, }; match &self.cmd { @@ -176,7 +210,7 @@ impl Cli { Command::Raw(advanced) => advanced.run(&mut context).await, }?; - context.config().write(&self.profile)?; + context.config.write(&self.profile)?; Ok(()) } @@ -213,7 +247,7 @@ where { use futures::TryStreamExt; - let pages = into_items(b).try_collect().await?; + let pages = pagination::into_items(b).try_collect().await?; Ok(pages) } @@ -258,3 +292,12 @@ fn format_user(email: Option, full_name: Option, id: Option(token: &str) -> anyhow::Result { + let claims = token + .split('.') + .nth(1) + .ok_or_else(|| anyhow::anyhow!("malformed token"))?; + let claims = base64::decode_config(claims, base64::URL_SAFE_NO_PAD)?; + anyhow::Result::Ok(serde_json::from_slice(&claims)?) +} diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs index 9ea708bf16..1f4ef366df 100644 --- a/crates/flowctl/src/local_specs.rs +++ b/crates/flowctl/src/local_specs.rs @@ -6,7 +6,7 @@ use tables::CatalogResolver; /// Load and validate sources and derivation connectors (only). /// Capture and materialization connectors are not validated. pub(crate) async fn load_and_validate( - client: crate::controlplane::Client, + client: &crate::Client, source: &str, ) -> anyhow::Result<(tables::DraftCatalog, tables::Validations)> { let source = build::arg_source_to_url(source, false)?; @@ -17,7 +17,7 @@ pub(crate) async fn load_and_validate( /// Load and validate sources and all connectors. pub(crate) async fn load_and_validate_full( - client: crate::controlplane::Client, + client: &crate::Client, source: &str, network: &str, ) -> anyhow::Result<(tables::DraftCatalog, tables::Validations)> { @@ -29,7 +29,7 @@ pub(crate) async fn load_and_validate_full( /// Generate connector files by validating sources with derivation connectors. 
pub(crate) async fn generate_files( - client: crate::controlplane::Client, + client: &crate::Client, sources: tables::DraftCatalog, ) -> anyhow::Result<()> { let (mut draft, built) = validate(client, true, false, true, sources, "").await; @@ -67,7 +67,7 @@ pub(crate) async fn load(source: &url::Url) -> tables::DraftCatalog { } async fn validate( - client: crate::controlplane::Client, + client: &crate::Client, noop_captures: bool, noop_derivations: bool, noop_materializations: bool, @@ -77,7 +77,11 @@ async fn validate( let source = &draft.fetches[0].resource.clone(); let project_root = build::project_root(source); - let mut live = Resolver { client }.resolve(draft.all_catalog_names()).await; + let mut live = Resolver { + client: client.clone(), + } + .resolve(draft.all_catalog_names()) + .await; let output = if !live.errors.is_empty() { // If there's a live catalog resolution error, surface it through built tables. @@ -191,7 +195,7 @@ pub(crate) fn pick_policy( } pub(crate) struct Resolver { - pub client: crate::controlplane::Client, + pub client: crate::Client, } impl tables::CatalogResolver for Resolver { diff --git a/crates/flowctl/src/ops.rs b/crates/flowctl/src/ops.rs index beb0b5fc9d..7649323d68 100644 --- a/crates/flowctl/src/ops.rs +++ b/crates/flowctl/src/ops.rs @@ -1,9 +1,4 @@ -use serde_json::Value; - -use crate::collection::{ - read::{read_collection, ReadArgs, ReadBounds}, - CollectionJournalSelector, -}; +use crate::collection::read::ReadBounds; #[derive(clap::Args, Debug)] pub struct Logs { @@ -14,17 +9,23 @@ pub struct Logs { pub bounds: ReadBounds, } +/// Selects a Flow task. +#[derive(clap::Args, Debug, Default, Clone)] +pub struct TaskSelector { + /// The name of the task + #[clap(long)] + pub task: String, +} + impl Logs { pub async fn run(&self, ctx: &mut crate::CliContext) -> anyhow::Result<()> { - let uncommitted = true; // logs reads are always 'uncommitted' because logs aren't written inside transactions. 
- let read_args = read_args( + read_task_ops_journal( + &ctx.client, &self.task.task, OpsCollection::Logs, &self.bounds, - uncommitted, - ); - read_collection(ctx, &read_args).await?; - Ok(()) + ) + .await } } @@ -34,161 +35,18 @@ pub enum OpsCollection { Stats, } -pub fn read_args( +pub async fn read_task_ops_journal( + client: &crate::Client, task_name: &str, collection: OpsCollection, bounds: &ReadBounds, - uncommitted: bool, -) -> ReadArgs { - let logs_or_stats = match collection { - OpsCollection::Logs => "logs", - OpsCollection::Stats => "stats", - }; - // Once we implement federated data planes, we'll need to update this to - // fetch the name of the data plane based on the tenant. - let collection = format!("ops.us-central1.v1/{logs_or_stats}"); +) -> anyhow::Result<()> { + let (_shard_id_prefix, ops_logs_journal, ops_stats_journal, _shard_client, journal_client) = + crate::client::fetch_task_authorization(client, task_name).await?; - let mut include = std::collections::BTreeMap::new(); - include.insert( - "name".to_string(), - vec![Value::String(task_name.to_string())], - ); - let selector = CollectionJournalSelector { - collection, - partitions: Some(models::PartitionSelector { - include, - exclude: Default::default(), - }), - ..Default::default() + let journal_name = match collection { + OpsCollection::Logs => ops_logs_journal, + OpsCollection::Stats => ops_stats_journal, }; - ReadArgs { - selector, - uncommitted, - bounds: bounds.clone(), - auth_prefixes: vec![task_name.to_string()], - } -} - -/// Selects one or more Flow tasks within a single tenant. 
-#[derive(clap::Args, Debug, Default, Clone)] -pub struct TaskSelector { - /// The name of the task - #[clap(long)] - pub task: String, - // Selects all tasks with the given type - // - // Requires the `--tenant ` argument - //#[clap(long, arg_enum, requires("tenant"))] - //pub task_type: Option, - - // Selects all tasks within the given tenant - // - // The `--task-type` may also be specified to further limit the selection to only tasks of the given - // type. - //#[clap(long)] - //pub tenant: Option, -} - -/* -#[derive(Debug, clap::ArgEnum, PartialEq, Eq, Clone, Copy)] -pub enum TaskType { - Capture, - Derivation, - Materialization, -} - -impl TaskType { - fn label_value(&self) -> &'static str { - match self { - TaskType::Capture => "capture", - TaskType::Derivation => "derivation", - TaskType::Materialization => "materialization", - } - } -} - -impl TaskSelector { - fn tenant_name(&self) -> Result<&str, anyhow::Error> { - self.tenant - .as_deref() - .or_else(|| self.task.as_deref().map(tenant)) - .ok_or_else(|| anyhow::anyhow!("missing required task selector argument")) - } -} - -*/ - -#[cfg(test)] -mod test { - // use super::*; - - /* - #[test] - fn logs_translates_into_journals_read_commands() { - assert_logs_command( - TaskSelector { - task: Some(String::from("acmeCo/test/capture")), - ..Default::default() - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/name=acmeCo%2Ftest%2Fcapture", - ); - assert_logs_command( - TaskSelector { - task_type: Some(TaskType::Capture), - tenant: Some("acmeCo".to_owned()), - task: None, - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/kind=capture", - ); - assert_logs_command( - TaskSelector { - task_type: Some(TaskType::Derivation), - tenant: Some("acmeCo".to_owned()), - task: None, - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/kind=derivation", - ); - assert_logs_command( - TaskSelector { - task_type: Some(TaskType::Materialization), - tenant: Some("acmeCo".to_owned()), - task: 
None, - }, - "estuary.dev/collection=ops/acmeCo/logs,estuary.dev/field/kind=materialization", - ); - assert_logs_command( - TaskSelector { - tenant: Some(String::from("acmeCo")), - ..Default::default() - }, - "estuary.dev/collection=ops/acmeCo/logs", - ); - } - - fn assert_logs_command(selector: TaskSelector, expected_label_selector: &str) { - let args = Args { - task: selector.clone(), - // Any extra arguments should be appended to whatever is generated - other: vec![String::from("an extra arg")], - }; - let cmd = args - .try_into_exec_external() - .expect("failed to convert args"); - let expected = ExecExternal::from(( - GO_FLOWCTL, - vec![ - "journals", - "read", - "--selector", - expected_label_selector, - "an extra arg", - ], - )); - assert_eq!( - expected, cmd, - "expected selector: {:?} to return journal selector: '{}', but got: {:?}", - selector, expected_label_selector, cmd - ); - } - */ + crate::collection::read::read_collection_journal(journal_client, &journal_name, bounds).await } diff --git a/crates/flowctl/src/output.rs b/crates/flowctl/src/output.rs index b43366cccb..e911668db0 100644 --- a/crates/flowctl/src/output.rs +++ b/crates/flowctl/src/output.rs @@ -15,7 +15,7 @@ pub enum OutputType { Json, /// Format output as YAML Yaml, - /// Format the output as a prett-printed table + /// Format the output as a pretty-printed table Table, } diff --git a/crates/flowctl/src/poll.rs b/crates/flowctl/src/poll.rs index 5159159052..07db9b1764 100644 --- a/crates/flowctl/src/poll.rs +++ b/crates/flowctl/src/poll.rs @@ -3,7 +3,7 @@ use serde::Deserialize; // Poll an async task in `table` having `id` until it's no longer queued. // While we're waiting print out logs under `logs_token`. 
pub async fn poll_while_queued( - client: &postgrest::Postgrest, + client: &crate::Client, table: &str, id: models::Id, logs_token: &str, diff --git a/crates/flowctl/src/preview/journal_reader.rs b/crates/flowctl/src/preview/journal_reader.rs index 9fa27d9234..2181bf2192 100644 --- a/crates/flowctl/src/preview/journal_reader.rs +++ b/crates/flowctl/src/preview/journal_reader.rs @@ -7,7 +7,7 @@ use proto_gazette::{broker, consumer}; /// collection journals. #[derive(Clone)] pub struct Reader { - control_plane: crate::controlplane::Client, + client: crate::Client, delay: std::time::Duration, } @@ -25,9 +25,9 @@ impl Reader { /// /// `delay` is an artificial, injected delay between a read and a subsequent checkpoint. /// It emulates back-pressure and encourages amortized transactions and reductions. - pub fn new(control_plane: crate::controlplane::Client, delay: std::time::Duration) -> Self { + pub fn new(client: &crate::Client, delay: std::time::Duration) -> Self { Self { - control_plane, + client: client.clone(), delay, } } @@ -38,50 +38,42 @@ impl Reader { mut resume: proto_gazette::consumer::Checkpoint, ) -> mpsc::Receiver> { let reader = coroutines::try_coroutine(move |mut co| async move { - // We must be able to access all sourced collections. - let access_prefixes = sources - .iter() - .map(|source| source.collection.clone()) - .collect(); - - let data_plane_client = - crate::dataplane::journal_client_for(self.control_plane, access_prefixes).await?; + // Concurrently fetch authorizations for all sourced collections. + let sources = futures::future::try_join_all(sources.iter().map(|source| { + crate::client::fetch_collection_authorization(&self.client, &source.collection) + .map_ok(move |(_journal_name_prefix, client)| (source, client)) + })) + .await?; // Concurrently list the journals of every Source. 
- let journals: Vec<(&Source, Vec)> = - futures::future::try_join_all(sources.iter().map(|source| { - Self::list_journals(source, data_plane_client.clone()) - .map_ok(move |l| (source, l)) + let journals: Vec<(&Source, Vec, &gazette::journal::Client)> = + futures::future::try_join_all(sources.iter().map(|(source, client)| { + Self::list_journals(*source, client).map_ok(move |l| (*source, l, client)) })) .await?; - // Flatten into (binding, source, journal). - let journals: Vec<(u32, &Source, String)> = journals - .into_iter() + // Flatten into (binding, source, journal, client). + let journals: Vec<(u32, &Source, String, &gazette::journal::Client)> = journals + .iter() .enumerate() - .flat_map(|(binding, (source, journals))| { + .flat_map(|(binding, (source, journals, client))| { journals.into_iter().map(move |journal| { ( binding as u32, - source, + *source, format!("{};{}", journal.name, source.read_suffix), + *client, ) }) }) .collect(); // Map into a stream that yields lines from across all journals, as they're ready. - let mut journals = - futures::stream::select_all(journals.iter().map(|(binding, source, journal)| { - Self::read_journal_lines( - *binding, - data_plane_client.clone(), - journal, - &resume, - source, - ) - .boxed() - })); + let mut journals = futures::stream::select_all(journals.iter().map( + |(binding, source, journal, client)| { + Self::read_journal_lines(*binding, client, journal, &resume, source).boxed() + }, + )); // Reset-able timer for delivery of delayed checkpoints. 
let deadline = tokio::time::sleep(std::time::Duration::MAX); @@ -147,7 +139,7 @@ impl Reader { async fn list_journals( source: &Source, - client: gazette::journal::Client, + client: &gazette::journal::Client, ) -> anyhow::Result> { let resp = client .list(broker::ListRequest { @@ -179,7 +171,7 @@ impl Reader { fn read_journal_lines<'s>( binding: u32, - client: gazette::journal::Client, + client: &gazette::journal::Client, journal: &'s String, resume: &consumer::Checkpoint, source: &Source, @@ -198,7 +190,7 @@ impl Reader { .map(|b| b.seconds) .unwrap_or_default(); - let mut lines = client.read_json_lines( + let mut lines = client.clone().read_json_lines( broker::ReadRequest { journal: journal.clone(), offset, diff --git a/crates/flowctl/src/preview/mod.rs b/crates/flowctl/src/preview/mod.rs index 9568d11aa9..3d295d72ec 100644 --- a/crates/flowctl/src/preview/mod.rs +++ b/crates/flowctl/src/preview/mod.rs @@ -90,11 +90,10 @@ impl Preview { } = self; let source = build::arg_source_to_url(source, false)?; - let client = ctx.controlplane_client().await?; // TODO(johnny): validate only `name`, if presented. let (_sources, validations) = - local_specs::load_and_validate_full(client, source.as_str(), &network).await?; + local_specs::load_and_validate_full(&ctx.client, source.as_str(), &network).await?; let runtime = runtime::Runtime::new( true, // Allow local. 
@@ -134,7 +133,7 @@ impl Preview { } else { None }; - let journal_reader = journal_reader::Reader::new(ctx.controlplane_client().await?, delay); + let journal_reader = journal_reader::Reader::new(&ctx.client, delay); let initial_state = models::RawValue::from_str(initial_state).context("initial state is not valid JSON")?; @@ -274,8 +273,16 @@ async fn preview_capture( output_state: bool, output_apply: bool, ) -> anyhow::Result<()> { - let responses_rx = - runtime::harness::run_capture(delay, runtime, sessions, &spec, state, state_dir, timeout, output_apply); + let responses_rx = runtime::harness::run_capture( + delay, + runtime, + sessions, + &spec, + state, + state_dir, + timeout, + output_apply, + ); tokio::pin!(responses_rx); while let Some(response) = responses_rx.try_next().await? { @@ -303,7 +310,11 @@ async fn preview_capture( internal.checkpoint.unwrap_or_default(); let collection = "connectorState"; - let state_json = state.as_ref().map(|s| serde_json::to_string(s)).transpose()?.unwrap_or("{}".to_string()); + let state_json = state + .as_ref() + .map(|s| serde_json::to_string(s)) + .transpose()? + .unwrap_or("{}".to_string()); if output_state { print!("[{collection:?},{state_json}]\n"); @@ -350,7 +361,11 @@ async fn preview_derivation( tracing::debug!(stats=?ops::DebugJson(stats), "flushed"); } else if let Some(derive::response::StartedCommit { state }) = response.started_commit { let collection = "connectorState"; - let state_json = state.as_ref().map(|s| serde_json::to_string(s)).transpose()?.unwrap_or("{}".to_string()); + let state_json = state + .as_ref() + .map(|s| serde_json::to_string(s)) + .transpose()? 
+ .unwrap_or("{}".to_string()); if output_state { print!("[{collection:?},{state_json}]\n"); } @@ -373,7 +388,14 @@ async fn preview_materialization( output_apply: bool, ) -> anyhow::Result<()> { let responses_rx = runtime::harness::run_materialize( - reader, runtime, sessions, &spec, state, state_dir, timeout, output_apply, + reader, + runtime, + sessions, + &spec, + state, + state_dir, + timeout, + output_apply, ); tokio::pin!(responses_rx); @@ -389,7 +411,11 @@ async fn preview_materialization( } else if let Some(materialize::response::StartedCommit { state }) = response.started_commit { let collection = "connectorState"; - let state_json = state.as_ref().map(|s| serde_json::to_string(s)).transpose()?.unwrap_or("{}".to_string()); + let state_json = state + .as_ref() + .map(|s| serde_json::to_string(s)) + .transpose()? + .unwrap_or("{}".to_string()); if output_state { print!("[{collection:?},{state_json}]\n"); } diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs index 9214e11c12..e977e950d1 100644 --- a/crates/flowctl/src/raw/mod.rs +++ b/crates/flowctl/src/raw/mod.rs @@ -1,5 +1,5 @@ use crate::{ - collection::read::{read_collection, ReadBounds}, + collection::read::ReadBounds, local_specs, ops::{OpsCollection, TaskSelector}, }; @@ -163,14 +163,13 @@ pub struct Stats { impl Stats { pub async fn run(&self, ctx: &mut crate::CliContext) -> anyhow::Result<()> { - let read_args = crate::ops::read_args( + crate::ops::read_task_ops_journal( + &ctx.client, &self.task.task, OpsCollection::Stats, &self.bounds, - self.uncommitted, - ); - read_collection(ctx, &read_args).await?; - Ok(()) + ) + .await } } @@ -199,8 +198,7 @@ impl Advanced { } async fn do_get(ctx: &mut crate::CliContext, Get { table, query }: &Get) -> anyhow::Result<()> { - let client = ctx.controlplane_client().await?; - let req = client.from(table).build().query(query); + let req = ctx.client.from(table).build().query(query); tracing::debug!(?req, "built request to execute"); 
println!("{}", req.send().await?.text().await?);

@@ -211,8 +209,7 @@ async fn do_update(
     ctx: &mut crate::CliContext,
     Update { table, query, body }: &Update,
 ) -> anyhow::Result<()> {
-    let client = ctx.controlplane_client().await?;
-    let req = client.from(table).update(body).build().query(query);
+    let req = ctx.client.from(table).update(body).build().query(query);
     tracing::debug!(?req, "built request to execute");

     println!("{}", req.send().await?.text().await?);
@@ -227,8 +224,7 @@ async fn do_rpc(
         body,
     }: &Rpc,
 ) -> anyhow::Result<()> {
-    let client = ctx.controlplane_client().await?;
-    let req = client.rpc(function, body).build().query(query);
+    let req = ctx.client.rpc(function, body.clone()).build().query(query);
     tracing::debug!(?req, "built request to execute");

     println!("{}", req.send().await?.text().await?);
@@ -236,8 +232,9 @@ async fn do_rpc(
 }

 async fn do_build(ctx: &mut crate::CliContext, build: &Build) -> anyhow::Result<()> {
-    let client = ctx.controlplane_client().await?;
-    let resolver = local_specs::Resolver { client };
+    let resolver = local_specs::Resolver {
+        client: ctx.client.clone(),
+    };

     let Build {
         db_path,
@@ -299,8 +296,7 @@ async fn do_combine(
     ctx: &mut crate::CliContext,
     Combine { source, collection }: &Combine,
 ) -> anyhow::Result<()> {
-    let (_sources, validations) =
-        local_specs::load_and_validate(ctx.controlplane_client().await?, source).await?;
+    let (_sources, validations) = local_specs::load_and_validate(&ctx.client, source).await?;

     let collection = match validations
         .built_collections
diff --git a/crates/flowctl/src/raw/oauth.rs b/crates/flowctl/src/raw/oauth.rs
index 112e262e91..1736827d34 100644
--- a/crates/flowctl/src/raw/oauth.rs
+++ b/crates/flowctl/src/raw/oauth.rs
@@ -54,6 +54,9 @@ pub async fn do_oauth(
         injected_values,
     }: &Oauth,
 ) -> anyhow::Result<()> {
+    let Some(user_access_token) = &ctx.config.user_access_token else {
+        anyhow::bail!("This command can only be run when authenticated");
+    };
     let source =
build::arg_source_to_url(source, false)?; let draft = local_specs::surface_errors(local_specs::load(&source).await.into_result())?; @@ -175,13 +178,8 @@ pub async fn do_oauth( tracing::warn!( "Make sure that your application has {redirect_uri} set as an allowed redirect URL" ); - let api = ctx - .config - .api - .as_ref() - .expect("Cannot connect to edge functions"); - let mut oauth_endpoint = api.endpoint.clone(); + let mut oauth_endpoint = ctx.config.get_pg_url().clone(); oauth_endpoint.set_path("functions/v1/oauth"); #[derive(serde::Deserialize, serde::Serialize)] @@ -192,8 +190,8 @@ pub async fn do_oauth( let authorize_response_bytes = reqwest::Client::new() .post(oauth_endpoint.clone()) - .bearer_auth(api.access_token.to_owned()) - .header("apikey", api.public_token.to_owned()) + .bearer_auth(user_access_token) + .header("apikey", ctx.config.get_pg_public_token()) .json(&serde_json::json!({ "operation": "auth-url", "connector_config": { @@ -253,8 +251,8 @@ pub async fn do_oauth( let code_response = reqwest::Client::new() .post(oauth_endpoint) - .bearer_auth(api.access_token.to_owned()) - .header("apikey", api.public_token.to_owned()) + .bearer_auth(user_access_token) + .header("apikey", ctx.config.get_pg_public_token()) .json(&code_request_body) .send() .await?