Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jshearer/update invoice generator #1670

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/billing-integrations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }
132 changes: 93 additions & 39 deletions crates/billing-integrations/src/stripe.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{bail, Context};
use chrono::{Duration, ParseError, Utc};
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::Postgres;
use sqlx::{postgres::PgPoolOptions, types::chrono::NaiveDate, Pool};
Expand Down Expand Up @@ -106,7 +107,7 @@ struct LineItem {

#[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
enum InvoiceResult {
Created,
Created(PaymentProvider),
Updated,
LessThanMinimum,
FreeTier,
Expand All @@ -117,26 +118,43 @@ enum InvoiceResult {
}

impl InvoiceResult {
pub fn message(&self) -> &str {
pub fn message(&self) -> String {
match self {
InvoiceResult::Created => "Published new invoice",
InvoiceResult::Updated => "Updated existing invoice",
InvoiceResult::Created(provider) => {
if provider == &PaymentProvider::Stripe {
"Published new invoice".to_string()
} else {
format!("Published new invoice for tenant using {provider:?} provider")
}
}
InvoiceResult::Updated => "Updated existing invoice".to_string(),
InvoiceResult::LessThanMinimum => {
"Skipping invoice for less than the minimum chargable amount ($0.50)"
"Skipping invoice for less than the minimum chargable amount ($0.50)".to_string()
}
InvoiceResult::FreeTier => "Skipping usage invoice for tenant in free tier",
InvoiceResult::FreeTier => "Skipping usage invoice for tenant in free tier".to_string(),
InvoiceResult::FutureTrialStart => {
"Skipping invoice ending before free trial start date"
"Skipping invoice ending before free trial start date".to_string()
}
InvoiceResult::NoDataMoved => {
"Skipping invoice for tenant with no data movement".to_string()
}
InvoiceResult::NoDataMoved => "Skipping invoice for tenant with no data movement",
InvoiceResult::NoFullPipeline => {
"Skipping invoice for tenant without an active pipeline"
"Skipping invoice for tenant without an active pipeline".to_string()
}
InvoiceResult::Error => "Error publishing invoices",
InvoiceResult::Error => "Error publishing invoices".to_string(),
}
}
}

#[derive(
Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Copy, sqlx::Type,
)]
#[sqlx(type_name = "payment_provider_type", rename_all = "lowercase")]
enum PaymentProvider {
Stripe,
External,
}

#[derive(Serialize, Deserialize, Debug, Clone, sqlx::FromRow)]
struct Invoice {
subtotal: i64,
Expand All @@ -149,6 +167,7 @@ struct Invoice {
has_payment_method: Option<bool>,
capture_hours: Option<f64>,
materialization_hours: Option<f64>,
payment_provider: PaymentProvider,
}

impl Invoice {
Expand Down Expand Up @@ -462,7 +481,7 @@ impl Invoice {
if maybe_invoice.is_some() {
return Ok(InvoiceResult::Updated);
} else {
return Ok(InvoiceResult::Created);
return Ok(InvoiceResult::Created(self.payment_provider));
}
}
}
Expand Down Expand Up @@ -492,8 +511,10 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
extra as "extra: sqlx::types::Json<Option<Extra>>",
customer.has_payment_method as has_payment_method,
dataflow_hours.capture_hours::float as capture_hours,
dataflow_hours.materialization_hours::float as materialization_hours
dataflow_hours.materialization_hours::float as materialization_hours,
tenants.payment_provider as "payment_provider!: PaymentProvider"
from invoices_ext
left join tenants on tenants.tenant = billed_prefix
inner join lateral(
select bool_or("invoice_settings/default_payment_method" is not null) as has_payment_method
from stripe.customers
Expand Down Expand Up @@ -539,8 +560,10 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
extra as "extra: sqlx::types::Json<Option<Extra>>",
customer.has_payment_method as has_payment_method,
dataflow_hours.capture_hours::float as capture_hours,
dataflow_hours.materialization_hours::float as materialization_hours
dataflow_hours.materialization_hours::float as materialization_hours,
tenants.payment_provider as "payment_provider!: PaymentProvider"
from invoices_ext
left join tenants on tenants.tenant = billed_prefix
inner join lateral(
select bool_or("invoice_settings/default_payment_method" is not null) as has_payment_method
from stripe.customers
Expand Down Expand Up @@ -601,10 +624,11 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
match res {
Err(err) => {
let formatted = format!(
"Error publishing invoice for {tenant}",
"Error publishing {invoice_type:?} invoice for {tenant}",
tenant = response.billed_prefix,
invoice_type = response.invoice_type
);
bail!("{}\n{err:?}", formatted, err = err);
bail!("{}: {err:?}", formatted, err = err);
}
Ok(res) => {
tracing::debug!(
Expand All @@ -615,7 +639,7 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
res.message()
);
match res {
InvoiceResult::Created
InvoiceResult::Created(_)
| InvoiceResult::Updated
| InvoiceResult::Error => {}
// Remove any incorrectly created invoices that are now skipped for whatever reason
Expand All @@ -629,7 +653,13 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
.await?
{
Some(c) => c,
None => return Ok((res, response.subtotal)),
None => {
return Ok((
res,
response.subtotal,
response.billed_prefix.to_owned(),
))
}
};

let customer_id = customer.id.to_string();
Expand All @@ -647,45 +677,69 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> {
}
}
}
Ok((res, response.subtotal))
Ok((res, response.subtotal, response.billed_prefix.to_owned()))
}
}
}
.boxed()
})
.collect();

let collected: HashMap<_, _> = futures::stream::iter(invoice_futures)
// Let's run 10 `upsert_invoice()`s at a time
.buffer_unordered(10)
.or_else(|err| async move {
if !cmd.fail_fast {
tracing::error!("{}", err.to_string());
Ok((InvoiceResult::Error, 0))
} else {
Err(err)
}
})
.try_fold(HashMap::new(), |mut map, (res, subtotal)| async move {
let (subtotal_sum, count) = map.entry(res).or_insert((0, 0));
*subtotal_sum += subtotal;
*count += 1;
Ok(map)
})
.await?;
let collected: HashMap<InvoiceResult, (i64, i32, Vec<(String, i64)>)> =
futures::stream::iter(invoice_futures)
.buffer_unordered(5)
.or_else(|err| async move {
if !cmd.fail_fast {
tracing::error!("{}", err.to_string());
Ok((InvoiceResult::Error, 0, "".to_string()))
} else {
Err(err)
}
})
.try_fold(
HashMap::new(),
|mut map, (res, subtotal, tenant)| async move {
let (subtotal_sum, count, tenants) = map.entry(res).or_insert((0, 0, vec![]));
*subtotal_sum += subtotal;
*count += 1;
tenants.push((tenant, subtotal));
Ok(map)
},
)
.await?;

for (status, (subtotal_agg, count)) in collected.iter() {
for (status, (subtotal_agg, count, tenants)) in collected.iter() {
tracing::info!(
"[{:4} invoices]: {:70}${:.2}",
count,
status.message(),
*subtotal_agg as f64 / 100.0
);
let limit = match status {
InvoiceResult::Created(_) | InvoiceResult::Updated => 30,
InvoiceResult::NoDataMoved
| InvoiceResult::NoFullPipeline
| InvoiceResult::LessThanMinimum
| InvoiceResult::FreeTier => 0,
_ => 4,
};
let sorted_tenants = tenants
.iter()
.sorted_by(|(_, a), (_, b)| b.cmp(a))
.collect_vec();

let (displayed_tenants, remainder_tenants) =
sorted_tenants.split_at(limit.min(tenants.len()));
for (tenant, subtotal) in displayed_tenants {
tracing::info!(" - {:} ${:.2}", tenant, *subtotal as f64 / 100.0);
}
if limit > 0 && remainder_tenants.len() > 0 {
tracing::info!(" - ... {} Others", remainder_tenants.len(),);
}
}

Ok(())
}

#[tracing::instrument(skip(db_client))]
async fn get_tenant_trial_date(
db_client: &Pool<Postgres>,
Expand Down Expand Up @@ -721,7 +775,7 @@ async fn get_or_create_customer_for_tenant(
},
)
.await
.context("Searching for a customer")?;
.context(format!("Searching for tenant {tenant}"))?;

let customer = if let Some(customer) = customers.data.into_iter().next() {
tracing::debug!("Found existing customer {id}", id = customer.id.to_string());
Expand Down
Loading