-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
825 lines (715 loc) · 29.6 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
# coding=utf-8
"""
Scripty's Dashboard
Tested with Python 3.8.
"""
import datetime
import threading
import time
import typing
import uuid
import flask_discord
import jwt
import oauthlib.oauth2.rfc6749.errors
import requests
from babel.numbers import format_currency
from flask import Flask, render_template, request, redirect, url_for, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_discord import DiscordOAuth2Session, requires_authorization
import stripe
from sqlalchemy import text
import config
import pycountry
import discord_webhook
from werkzeug.exceptions import BadRequest, MethodNotAllowed, Forbidden
# Flask application plus a shared HTTP session for outbound requests.
app = Flask(__name__)
session = requests.session()

if config.DEBUG:
    import os

    # Debug deployments may run without HTTPS; this flag tells oauthlib to allow it.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

# The database URI depends on the mode; the Discord OAuth settings do not.
app.config.update(
    SQLALCHEMY_DATABASE_URI=(
        config.DATABASE_URI_DEBUG if config.DEBUG else config.DATABASE_URI_PROD
    ),
    DISCORD_CLIENT_ID=config.DISCORD_CLIENT_ID,
    DISCORD_CLIENT_SECRET=config.DISCORD_SECRET_KEY,
    DISCORD_REDIRECT_URI=config.SITE_URL + "/oauth/callback",
)
app.secret_key = config.SECRET_KEY
stripe.api_key = config.STRIPE_PRIVATE_API_KEY

# Extensions are bound only after the configuration above is in place.
discord = DiscordOAuth2Session(app)
db = SQLAlchemy(app)
# Every country pycountry knows about, as {"name", "code"} dicts ordered by name.
COUNTRY_LIST = sorted(
    ({"name": entry.name, "code": entry.alpha_2} for entry in pycountry.countries),
    key=lambda entry: entry["name"],
)

# Maps each alpha-2 country code to the sorted names of its subdivisions.
PROVINCE_LIST = {
    entry["code"]: sorted(
        subdivision.name
        for subdivision in pycountry.subdivisions.get(country_code=entry["code"])
    )
    for entry in COUNTRY_LIST
}
# Invite flows are tracked by a random ID so the callback can tell whether a
# given flow was actually completed.
AUTH_FLOW_MAP_LOCK = threading.Lock()

# Maps auth flow ID -> creation timestamp (seconds since the epoch).
# Entries older than 30 minutes are discarded by the janitor thread below.
AUTH_FLOW_MAP: typing.Dict[str, float] = {}

# Used both as the maximum entry age and as the pause between sweeps.
_AUTH_FLOW_TTL_SECONDS = 30 * 60


def auth_flow_map_cleanup():
    """Loop forever, purging expired auth flow IDs once per TTL period."""
    while True:
        time.sleep(_AUTH_FLOW_TTL_SECONDS)
        with AUTH_FLOW_MAP_LOCK:
            cutoff = time.time() - _AUTH_FLOW_TTL_SECONDS
            expired = [
                flow_id
                for flow_id, created_at in AUTH_FLOW_MAP.items()
                if created_at < cutoff
            ]
            for flow_id in expired:
                del AUTH_FLOW_MAP[flow_id]


# Daemon thread, so it never keeps the interpreter alive at shutdown.
threading.Thread(target=auth_flow_map_cleanup, daemon=True).start()
def DISCORD_INVITE_SUCCESS_WEBHOOK():
    """Return a new webhook client for the invite-success channel.

    A fresh object is built on every call; callers set the content and
    execute it themselves.
    """
    return discord_webhook.DiscordWebhook(
        url=config.DISCORD_INVITE_SUCCESS_WEBHOOK_URL,
        username="Scripty Invites",
    )
class User(db.Model):
    """
    Database row for one Discord-authenticated user.

    # Fields
    discord_id: int (primary key)
    stripe_customer_id: Option[str]
    stripe_subscription_id: Option[str]
    subscribed: bool
    free_trial_pending: bool
    """

    # noinspection SpellCheckingInspection
    __tablename__ = "users"

    # The Discord ID doubles as the primary key.
    discord_id = db.Column(db.BigInteger, primary_key=True)

    # Stripe identifiers; both are nullable.
    stripe_customer_id = db.Column(db.String)
    stripe_subscription_id = db.Column(db.String)

    # Flags default to false both in Python and at the database level.
    subscribed = db.Column(
        db.Boolean, nullable=False, default=False, server_default=text("false")
    )
    free_trial_pending = db.Column(
        db.Boolean, nullable=False, default=False, server_default=text("false")
    )

    def __repr__(self):
        return f"<User {self.discord_id}>"
@app.route("/oauth/redirect")
def oauth_redirect():
    """
    Start the Discord OAuth2 login flow.

    Requests the ``identify`` and ``email`` scopes and returns whatever
    response ``discord.create_session`` builds.
    """
    login_scopes = ["identify", "email"]
    return discord.create_session(scope=login_scopes)
@app.route("/oauth/invite")
def oauth_invite():
    """
    Start the bot-invite OAuth2 flow on Discord.

    Query parameters:
        no_flow: when "1", no tracking ID is generated and the literal
            string "None" is sent as the flow ID instead.
        support_server: when "1", the ``guilds.join`` scope is also
            requested and the choice is recorded in the session data.
    """
    skip_tracking = request.args.get("no_flow") == "1"
    wants_support_server = request.args.get("support_server") == "1"

    if skip_tracking:
        flow_token = "None"
    else:
        # Remember when this flow started so the callback can recognise it.
        flow_token = uuid.uuid4().hex
        with AUTH_FLOW_MAP_LOCK:
            AUTH_FLOW_MAP[flow_token] = time.time()

    requested_scopes = ["bot", "identify"]
    if wants_support_server:
        requested_scopes.append("guilds.join")

    # This payload is handed back to us by discord.callback().
    state_payload = {
        "flow_id": flow_token,
        "support_server": "1" if wants_support_server else "0",
    }
    return discord.create_session(
        scope=requested_scopes,
        permissions=config.BOT_PERMISSIONS_INTEGER,
        data=state_payload,
    )
@app.route("/oauth/callback")
def oauth_callback():
    """
    Callback from the Discord OAuth2 authorization page.

    Handles both flows that share this redirect URI:

    * Bot invite (session data carries a ``flow_id``): optionally adds the
      user to the support server, fires the invite webhook the first time a
      tracked flow ID is seen, then redirects to the setup page.
    * Plain login (no ``flow_id``): redirects to the dashboard. In debug
      mode every user except the hard-coded owner ID is rejected.

    Raises:
        BadRequest: the OAuth2 state or token data could not be validated.
        Forbidden: debug mode is on and the user is not the owner.
    """
    try:
        cb = discord.callback()
    except (jwt.exceptions.PyJWTError, oauthlib.oauth2.rfc6749.errors.OAuth2Error) as err:
        raise BadRequest("Invalid OAuth2 data: don't mess with the URL parameters!") from err
    except flask_discord.AccessDenied:
        return render_template("oauth_rejected.html")

    # check if the user requested being added to the support server
    if cb.get("support_server") == "1":
        discord.fetch_user().add_to_guild(config.SUPPORT_SERVER_ID)

    if flow_id := cb.get("flow_id"):
        if flow_id != "None":
            # Only the map mutation happens under the lock. The webhook is a
            # blocking HTTP request and must not stall other invite requests
            # or the cleanup thread while it runs.
            with AUTH_FLOW_MAP_LOCK:
                first_completion = AUTH_FLOW_MAP.pop(flow_id, None) is not None
            if first_completion:
                hook = DISCORD_INVITE_SUCCESS_WEBHOOK()
                hook.set_content(
                    f"Got invited to a new server! Server ID {request.args.get('guild_id')}."
                )
                hook.execute()

        # The user is coming from the bot invite: send them to the setup page,
        # forwarding the extra query parameters Discord sends us.
        return redirect(
            url_for(
                "bot_setup",
                guild_id=request.args.get("guild_id"),
                permissions=request.args.get("permissions"),
            )
        )

    if config.DEBUG:
        user = discord.fetch_user()
        if user.id != 661660243033456652:
            discord.revoke()
            raise Forbidden("In test mode, rejecting all other users except the bot owner.")

    return redirect("/dashboard")
@app.route("/setup")
def bot_setup():
    """
    Gives the user initial "getting started" steps to using the bot.

    Reads the optional ``permissions`` query parameter (the permission
    integer granted during the invite) and compares it with
    ``config.BOT_PERMISSIONS_INTEGER`` to tell the template which
    permissions, if any, are missing.

    Raises:
        BadRequest: ``permissions`` is present but not an integer.
    """
    # (permission bit, human-readable name) for the permissions treated as critical
    critical_permissions = (
        (536870912, "Manage Webhooks"),
        (1024, "Read Messages"),
        (2048, "Send Messages"),
        (16384, "Embed Links"),
        (1048576, "Connect"),
    )

    permissions = request.args.get("permissions")
    if permissions is None:
        # Nothing to compare against: warn instead of guessing.
        missing_permissions_flag = False
        major_missing = []
        warn_no_permissions = True
    else:
        try:
            granted_permissions = int(permissions)
        except ValueError as err:
            raise BadRequest("Invalid permissions value.") from err
        # Bits that were requested but not granted. AND-NOT rather than XOR,
        # so an extra granted permission is never reported as missing.
        missing_permissions = config.BOT_PERMISSIONS_INTEGER & ~granted_permissions
        missing_permissions_flag = missing_permissions != 0
        major_missing = [
            label for bit, label in critical_permissions if missing_permissions & bit
        ]
        warn_no_permissions = False

    no_issues = not (missing_permissions_flag or major_missing or warn_no_permissions)
    return render_template(
        "setup.html",
        missing_permissions_flag=missing_permissions_flag,
        major_missing=major_missing,
        warn_no_permissions=warn_no_permissions,
        no_issues=no_issues,
    )
@app.route("/dashboard")
@requires_authorization
def dashboard_index():
    """Send authenticated users on to the premium page."""
    premium_url = url_for("premium_index")
    return redirect(premium_url)
@app.route("/premium")
@requires_authorization
def premium_index():
    """
    Render the premium page with per-tier prices in the requested currency.

    The currency comes from the ``currency`` query parameter (default
    ``usd``). A database row is created for the user if none exists.

    Raises:
        BadRequest: Stripe rejected the currency, or a tiered product has
            no active price in that currency.
    """
    user = discord.fetch_user()
    if user.email is None:
        # need an email to continue
        return redirect(url_for("oauth_redirect"))

    # fetch the user from the DB, creating a row with no Stripe customer ID if needed
    user_db = User.query.filter_by(discord_id=user.id).first()
    if user_db is None:
        user_db = User(discord_id=user.id)
        db.session.add(user_db)
        db.session.commit()

    # Normalised to lowercase for the Stripe calls; shown in uppercase below.
    currency = request.args.get("currency", "usd").lower()

    # only products carrying a "tier" metadata field, lowest tier first
    products = [
        product
        for product in stripe.Product.list().auto_paging_iter()
        if "tier" in product.metadata
    ]
    products.sort(key=lambda product: int(product.metadata["tier"]))

    prices = []
    for product in products:
        try:
            product_prices = stripe.Price.list(product=product.id, currency=currency, active=True)
        except stripe.error.InvalidRequestError as err:
            raise BadRequest("(Likely) Invalid currency.") from err
        if len(product_prices) == 0:
            raise BadRequest("No prices for this currency, try picking a supported one.")

        # billing interval -> display price and Stripe price ID
        product_price_map = {}
        for price in product_prices.auto_paging_iter():
            recurring = price.get("recurring")
            if recurring is None:
                # a one-time price has no billing interval to key on
                continue
            product_price_map[recurring.interval] = {
                "formatted_price": format_currency(price.unit_amount / 100, currency.upper()),
                "price_id": price.id,
            }

        # "a;b;c" -> ["a", "b", "c"]; blank entries are dropped so missing
        # metadata yields an empty list rather than [""]
        features = [
            feature
            for feature in product.metadata.get("features", "").split(";")
            if feature.strip()
        ]
        prices.append({"name": product.name, "prices": product_price_map, "features": features})

    return render_template(
        "premium.html",
        currencies=["USD", "EUR", "GBP", "CAD"],
        active_currency=currency.upper(),
        user=f"{user.username}#{user.discriminator}",
        stripe_subscription_id=user_db.stripe_subscription_id,
        prices=prices,
        pending_free_trial=user_db.free_trial_pending,
    )
@app.route("/premium/stripe_redirect")
@requires_authorization
def premium_stripe_redirect():
    """
    Send the user to their Stripe billing portal.

    Raises:
        BadRequest: the user has no database row or no Stripe customer ID.
    """
    discord_user = discord.fetch_user()
    record = User.query.filter_by(discord_id=discord_user.id).first()

    has_customer = record is not None and record.stripe_customer_id is not None
    if not has_customer:
        raise BadRequest("You must have a subscription to access the billing portal.")

    # The portal returns the user to the premium page when they are done.
    portal = stripe.billing_portal.Session.create(
        customer=record.stripe_customer_id,
        return_url=f"{config.SITE_URL}/premium",
    )
    return redirect(portal.url, 303)
@app.route("/premium/checkout/redirect", methods=["POST"])
@requires_authorization
def premium_checkout_redirect():
    """
    Create a Stripe Checkout session for the posted ``price_id`` and
    redirect the user to it.

    If the user has a pending free trial and picked a tier 1 product, a
    3-day trial is attached to the subscription. The pending flag itself is
    cleared later by the Stripe webhook, not here.

    Raises:
        BadRequest: ``price_id`` is missing or unknown to Stripe, or the
            product's ``tier`` metadata is missing or not an integer.
    """
    user = discord.fetch_user()
    user_email = user.email
    price_id = request.form["price_id"]

    # fetch the user from the DB, creating the row if needed
    user_db = User.query.filter_by(discord_id=user.id).first()
    if user_db is None:
        user_db = User(discord_id=user.id)
        db.session.add(user_db)
        db.session.commit()

    subscription_data = {
        "metadata": {
            "discord_id": user.id,
        }
    }

    if user_db.free_trial_pending:
        # The trial is only granted for tier 1 products.
        try:
            price = stripe.Price.retrieve(
                price_id,
                expand=["product"],
            )
        except stripe.error.InvalidRequestError as err:
            raise BadRequest("Invalid price.") from err
        tier = price.product.metadata.get("tier")
        try:
            tier = int(tier)
        except (TypeError, ValueError) as err:
            # TypeError covers a product with no "tier" metadata (tier is None)
            raise BadRequest("Invalid tier.") from err
        if tier == 1:
            subscription_data["trial_period_days"] = 3
            subscription_data["trial_settings"] = {
                "end_behavior": {
                    "missing_payment_method": "cancel"
                },
            }
        # we unset the free trial pending flag in the webhook

    checkout_session = stripe.checkout.Session.create(
        line_items=[
            {
                "price": price_id,
                "quantity": 1,
            },
        ],
        customer_email=user_email,
        automatic_tax={
            'enabled': True,
        },
        mode="subscription",
        allow_promotion_codes=True,
        billing_address_collection="required",
        consent_collection={
            "terms_of_service": "required",
        },
        success_url=config.SITE_URL + "/premium/success?session_id={CHECKOUT_SESSION_ID}",
        cancel_url=config.SITE_URL + "/premium",
        subscription_data=subscription_data,
        metadata={
            "discord_id": user.id,
        }
    )
    return redirect(checkout_session.url, code=303)
@app.route("/premium/success")
@requires_authorization
def premium_success():
    """
    Landing page after a successful Stripe Checkout.

    Reads ``session_id`` from the query string, checks that the checkout
    session was created for the logged-in user, and stores the session's
    Stripe customer ID on that user's database row.

    Raises:
        BadRequest: ``session_id`` is missing or unknown to Stripe.
        Forbidden: the checkout session belongs to a different Discord user.
    """
    checkout_session_id = request.args.get("session_id")
    if checkout_session_id is None:
        raise BadRequest("Missing session ID.")
    try:
        checkout_session = stripe.checkout.Session.retrieve(checkout_session_id)
    except stripe.error.InvalidRequestError as err:
        raise BadRequest("Invalid session ID.") from err

    user = discord.fetch_user()

    # session_id is attacker-controllable. The checkout route stamps every
    # session with the buyer's Discord ID, so reject sessions that were not
    # created for this user.
    session_metadata = checkout_session.get("metadata") or {}
    if str(session_metadata.get("discord_id")) != str(user.id):
        raise Forbidden("This checkout session belongs to a different user.")

    # add the customer ID to the DB
    user_db = User.query.filter_by(discord_id=user.id).first()
    if user_db is None:
        user_db = User(discord_id=user.id, stripe_customer_id=checkout_session.customer)
        db.session.add(user_db)
    else:
        user_db.stripe_customer_id = checkout_session.customer
    db.session.commit()
    return render_template("premium_success.html")
@app.route("/premium/create_free_trial", methods=["POST"])
@requires_authorization
def premium_create_free_trial():
    """
    Admin-only: mark the posted ``discord_id`` as having a pending free trial.

    Creates the target's database row if it does not exist yet, then
    redirects to the admin dashboard.

    Raises:
        Forbidden: the caller is not listed in ``config.ADMIN_IDS``.
        BadRequest: ``discord_id`` is missing or not an integer.
    """
    # check if the authenticated user has admin permissions
    user = discord.fetch_user()
    if user.id not in config.ADMIN_IDS:
        raise Forbidden("You are not an admin.")

    # The column is a BigInteger, so reject anything that is not an integer
    # here instead of handing it to the database.
    raw_target_id = request.form["discord_id"]
    try:
        target_id = int(raw_target_id)
    except ValueError as err:
        raise BadRequest("Invalid Discord ID.") from err

    # TODO: make API call to bot to check if the user has already been given a free trial
    target_user = User.query.filter_by(discord_id=target_id).first()
    if target_user is None:
        target_user = User(discord_id=target_id, free_trial_pending=True)
        db.session.add(target_user)
    else:
        target_user.free_trial_pending = True
    db.session.commit()
    return redirect("/admin/dashboard", 303)
def _webhook_ack():
    """Build the JSON response that acknowledges a Stripe webhook delivery."""
    return jsonify({'status': 'success'})


@app.route('/stripe_webhook', methods=['POST'])
def webhook_received():
    """
    Receive a Stripe webhook, update local user state and notify the bot.

    The event is verified against ``config.STRIPE_WEBHOOK_SECRET``. For the
    handled event types a JSON model is built and POSTed to the bot API;
    event types that are not handled are acknowledged and ignored.

    When no matching user exists, live-mode events are acknowledged without
    notifying the bot, while test-mode events fall back to a debug Discord
    ID so the bot integration can still be exercised.

    Returns:
        ``{"status": "success"}`` once the event has been handled or
        skipped, or a 400 response if the signature header is missing or
        the payload fails verification.
    """
    # Retrieve the event by verifying the signature using the raw body and secret
    signature = request.headers.get('stripe-signature')
    if signature is None:
        return jsonify({'status': 'error', 'error': 'missing signature'}), 400
    try:
        root_event = stripe.Webhook.construct_event(
            payload=request.data, sig_header=signature, secret=config.STRIPE_WEBHOOK_SECRET)
    except (ValueError, stripe.error.SignatureVerificationError) as e:
        # An exception object is not a valid Flask response; answer 400 explicitly.
        print('Rejected webhook:', e)
        return jsonify({'status': 'error', 'error': 'invalid payload or signature'}), 400

    data = root_event['data']
    event_type = root_event['type']
    data_object = data['object']
    data_previous = data.get('previous_attributes', {})
    is_live = root_event["livemode"]
    print('event ' + event_type)

    # Stays None for events that need no bot notification.
    json_model = None

    if event_type == 'customer.subscription.trial_will_end':
        print('Subscription trial will end at', data_object["current_period_end"])
        print("Customer ID", data_object['customer'])
        print("Status", data_object["status"])

        user_db = User.query.filter_by(stripe_customer_id=data_object['customer']).first()
        if user_db is None:
            if is_live:
                return _webhook_ack()
            # NOTE(review): the other branches use config.DEBUG_FALLBACK_ID for
            # this fallback; confirm whether this literal should match it.
            discord_id = 661660243033456652
        else:
            discord_id = user_db.discord_id

        json_model = {
            "user_id": discord_id,
            "live_mode": is_live,
            "event": {
                "t": "customer.subscription.trial_will_end",
                "d": {
                    "trial_end": data_object["trial_end"]
                }
            }
        }

    elif event_type == 'customer.subscription.created':
        print('Subscription created')
        print("Product ID", data_object['plan']['product'])
        print("Customer ID", data_object['customer'])
        print("Subscription ID", data_object["items"]["data"][0]["id"])
        print("Status", data_object["status"])

        # Look the user up by customer ID first, then by the Discord ID stored
        # in the subscription's metadata.
        user_db = User.query.filter_by(stripe_customer_id=data_object['customer']).first()
        if user_db is None:
            try:
                discord_id = int(data_object["metadata"]["discord_id"])
            except KeyError:
                if is_live:
                    # no metadata, so we can't do anything
                    return _webhook_ack()
                discord_id = config.DEBUG_FALLBACK_ID
            else:
                user_db = User.query.filter_by(discord_id=discord_id).first()
                if user_db is None:
                    user_db = User(discord_id=discord_id, stripe_customer_id=data_object['customer'])
                    db.session.add(user_db)
        else:
            discord_id = user_db.discord_id

        if user_db is not None:
            user_db.free_trial_pending = False
            user_db.subscribed = True
            user_db.stripe_customer_id = data_object['customer']  # avoid bad state
            # NOTE(review): this stores the ID of the first subscription *item*,
            # not data_object["id"]; confirm which one consumers expect.
            user_db.stripe_subscription_id = data_object["items"]["data"][0]["id"]
            db.session.commit()

        # if the subscription is not active, we don't care about it yet
        if data_object["status"] not in ["active", "trialing"]:
            return _webhook_ack()

        # The tier lives in the metadata of the plan's product.
        product = stripe.Product.retrieve(data_object["plan"]["product"])
        try:
            tier = int(product["metadata"]["tier"])
        except KeyError:
            # not a tiered product (handled the same way as in updated/deleted)
            return _webhook_ack()
        is_trial = data_object["status"] == "trialing"

        json_model = {
            "user_id": discord_id,
            "live_mode": is_live,
            "event": {
                "t": "customer.subscription.created",
                "c": {
                    "tier": tier,
                    "is_trial": is_trial,
                    "trial_end": data_object.get("trial_end"),
                }
            }
        }

    elif event_type == 'customer.subscription.updated':
        print('Subscription updated', root_event.id)
        print("Product ID", data_object['plan']['product'])
        print("Customer ID", data_object['customer'])
        print("Status", data_object["status"])

        # Renewal: previous_attributes contains *exactly* the period bounds
        # and the latest invoice.
        keys = ["current_period_start", "current_period_end", "latest_invoice"]
        is_renewed = len(data_previous) == len(keys) and all(
            key in data_previous for key in keys)

        # Length change: the plan interval differs from the previous one.
        current_interval = data_object["plan"]["interval"]
        try:
            is_length_change = data_previous["plan"]["interval"] != current_interval
        except KeyError:
            is_length_change = False

        # New: the previous status was not active/trialing and the current one is.
        currently_active = data_object["status"] in ["active", "trialing"]
        try:
            is_new = data_previous["status"] not in ["active", "trialing"] and currently_active
        except KeyError:
            is_new = False

        # Trial finished: the status went from trialing to active.
        try:
            trial_finished = data_previous["status"] == "trialing" and data_object["status"] == "active"
        except KeyError:
            trial_finished = False

        # Tier change: the plan's product differs from the previous one.
        current_tid = data_object["plan"]["product"]
        try:
            is_tier_change = data_previous["plan"]["product"] != current_tid
        except KeyError:
            is_tier_change = False

        user_db = User.query.filter_by(stripe_customer_id=data_object['customer']).first()
        if user_db is None:
            if not is_live:
                if is_renewed:
                    print("subscription was renewed")
                discord_id = config.DEBUG_FALLBACK_ID
            else:
                # try falling back to metadata
                try:
                    discord_id = int(data_object["metadata"]["discord_id"])
                except KeyError:
                    # no metadata, so we can't do anything
                    return _webhook_ack()
        else:
            discord_id = user_db.discord_id

        # The tier lives in the metadata of the plan's product.
        product = stripe.Product.retrieve(data_object["plan"]["product"])
        try:
            tier = int(product["metadata"]["tier"])
        except KeyError:
            # not a tiered product
            return _webhook_ack()

        json_model = {
            "user_id": discord_id,
            "live_mode": is_live,
            "event": {
                "t": "customer.subscription.updated",
                "c": {
                    "tier": tier,
                    "status": data_object["status"],
                    "cancel_at_period_end": data_object["cancel_at_period_end"],
                    "current_period_start": data_object["current_period_start"],
                    "current_period_end": data_object["current_period_end"],
                    "trial_end": data_object["trial_end"],
                    "is_renewal": is_renewed,
                    "is_length_change": is_length_change,
                    "is_new": is_new,
                    "is_tier_change": is_tier_change,
                    "trial_finished": trial_finished,
                    "interval": current_interval,
                }
            }
        }

    elif event_type == 'customer.subscription.deleted':
        print('Subscription canceled', root_event.id)
        print("Product ID", data_object['plan']['product'])
        print("Customer ID", data_object['customer'])
        print("Ends at", data_object["cancel_at"])
        print("Status", data_object["status"])

        user_db = User.query.filter_by(stripe_customer_id=data_object['customer']).first()
        if user_db is None:
            if is_live:
                return _webhook_ack()
            discord_id = config.DEBUG_FALLBACK_ID
        else:
            # Delete the user's subscription ID from the DB
            # NOTE(review): `subscribed` is left untouched here; confirm that is intended.
            user_db.stripe_subscription_id = None
            db.session.commit()
            discord_id = user_db.discord_id

        # The tier lives in the metadata of the plan's product.
        product = stripe.Product.retrieve(data_object["plan"]["product"])
        try:
            tier = int(product["metadata"]["tier"])
        except KeyError:
            # not a tiered product
            return _webhook_ack()

        json_model = {
            "user_id": discord_id,
            "live_mode": is_live,
            "event": {
                "t": "customer.subscription.deleted",
                "c": {
                    "tier": tier,
                }
            }
        }

    elif event_type == "radar.early_fraud_warning":
        print('Early fraud warning', root_event.id)
        print("Charge ID", data_object['charge'])
        print("Reason", data_object["fraud_type"])
        if data_object["actionable"]:
            # reverse the charge and cancel the customer's subscription
            charge = stripe.Charge.retrieve(data_object['charge'], expand=["customer", "customer.subscriptions"])
            stripe.Refund.create(charge=charge.id)
            print("Charge reversed")

            # `subscriptions.data` is a plain list, so take the first element
            # by index. A charge without a customer has nothing to cancel.
            customer = charge.customer
            subscriptions = customer.subscriptions.data if customer is not None else []
            subscription = subscriptions[0] if subscriptions else None
            if subscription is not None:
                stripe.Subscription.delete(subscription.id)
                print("Subscription cancelled")
                # delete the subscription ID from the DB too
                user_db = User.query.filter_by(stripe_customer_id=customer.id).first()
                if user_db is not None:
                    user_db.stripe_subscription_id = None
                    db.session.commit()
        # no event to fire here, as Stripe will fire subscription.deleted

    elif event_type == "customer.source.expiring":
        print('Card expiring', root_event.id)
        print("Customer ID", data_object['customer'])

        user_db = User.query.filter_by(stripe_customer_id=data_object['customer']).first()
        if user_db is None:
            if is_live:
                return _webhook_ack()
            discord_id = config.DEBUG_FALLBACK_ID
        else:
            discord_id = user_db.discord_id

        json_model = {
            "user_id": discord_id,
            "live_mode": is_live,
            "event": {
                "t": "customer.source.expiring",
                "d": {
                    "brand": data_object["source"]["brand"],
                    "last4": data_object["source"]["last4"],
                }
            }
        }

    elif event_type == "charge.dispute.created":
        print('Dispute created', root_event.id)
        print("Charge ID", data_object['charge'])

        # The dispute carries no customer ID, so resolve it through the charge.
        charge = stripe.Charge.retrieve(data_object['charge'])
        stripe_customer_id = charge.customer
        if stripe_customer_id is None:
            print("Got a dispute for a charge with no customer ID")
            return _webhook_ack()

        user_db = User.query.filter_by(stripe_customer_id=stripe_customer_id).first()
        if user_db is None:
            if is_live:
                return _webhook_ack()
            discord_id = config.DEBUG_FALLBACK_ID
        else:
            discord_id = user_db.discord_id

        json_model = {
            "user_id": discord_id,
            "live_mode": is_live,
            "event": {
                "t": "charge.dispute.created",
                "d": {}  # no data to send here since we don't need it
            }
        }

    elif event_type == "customer.deleted":
        # delete the matching user from the DB
        stripe_customer_id = data_object["id"]
        user_db = User.query.filter_by(stripe_customer_id=stripe_customer_id).first()
        if user_db is not None:
            db.session.delete(user_db)
            db.session.commit()
        # no event to fire here, as Stripe will fire customer.subscription.deleted

    # any other event type needs no handling

    if json_model is not None:
        print("Firing bot notification")
        print(json_model)
        resp = session.post(
            f"{config.BOT_API_URL}/premium/stripe_webhook",
            json=json_model,
            headers={"Authorization": config.BOT_API_TOKEN},
            # Without a timeout an unresponsive bot API would hang this worker.
            timeout=10,
        )
        # if not successful, log the error
        if resp.status_code != 200:
            print(resp.status_code, resp.text)
    else:
        print("No webhook fired")

    return _webhook_ack()
def _is_safe_redirect_target(target):
    """Return True if *target* stays on this site (local path or SITE_URL prefix)."""
    # Control characters and whitespace can be used to disguise a URL.
    if any(ch.isspace() or ord(ch) < 32 for ch in target):
        return False
    # Absolute URLs are accepted only when they point back at this site.
    if target.startswith(config.SITE_URL.rstrip("/") + "/"):
        return True
    # "//host" and "/\host" are treated by browsers as links to another host.
    return target.startswith("/") and not target.startswith(("//", "/\\"))


@app.route("/logout")
def logout():
    """
    Logout the user.

    Redirects to the ``redir`` query parameter when it points at this site,
    otherwise to ``/``. Targets on other hosts are ignored so the endpoint
    cannot be used as an open redirect.
    """
    discord.revoke()
    redir = request.args.get("redir")
    if redir is None or not _is_safe_redirect_target(redir):
        redir = "/"
    return redirect(redir)
@app.route("/")
def index():
    """Site root: forward visitors to the premium page."""
    premium_url = url_for("premium_index")
    return redirect(premium_url)
@app.route("/api/provinces")
def province_list():
    """
    Return the subdivision names for a country as a JSON array.

    The country is taken from the ``country`` query parameter and looked up
    in ``PROVINCE_LIST``.

    Raises:
        BadRequest: the parameter is missing or is not a known country code.
    """
    country_code = request.args.get("country")
    if country_code is None:
        raise BadRequest("No country found")

    provinces = PROVINCE_LIST.get(country_code)
    if provinces is None:
        raise BadRequest("Invalid country")
    return jsonify(provinces)
@app.errorhandler(flask_discord.Unauthorized)
def handle_unauthorized(_):
    """Send users who hit an authorization-required page to the login flow."""
    login_url = url_for("oauth_redirect")
    return redirect(login_url)
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, and this server
    # listens on every interface, so enable it only when config.DEBUG is set.
    app.run(debug=bool(config.DEBUG), host="0.0.0.0", port=3000)