Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove PubSub #233

Merged
merged 15 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions backend/.env-template
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,4 @@ OAUTH_REDIRECT_URI=abc123

GOOGLE_APPLICATION_CREDENTIALS=abc123

PUBSUB_TOKEN=abc123

MONGODB_PASSWORD=abc123
6 changes: 3 additions & 3 deletions backend/delete_old_data.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
from datetime import datetime
from typing import Any

import asyncio
from dotenv import find_dotenv, load_dotenv
from datetime import datetime

load_dotenv(find_dotenv())

Expand All @@ -29,7 +29,7 @@ async def count_old_rows(cutoff_date: datetime) -> int:

async def delete_old_rows(cutoff_date: datetime):
filters = get_filters(cutoff_date)
result = await USER_MONTHS.delete_many(filters) # type: ignore
result = await USER_MONTHS.delete_many(filters)
print(f"Deleted {result.deleted_count} rows")


Expand Down
1 change: 0 additions & 1 deletion backend/deploy/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ steps:
args: ["run", "create-env"]
dir: "backend"
env:
- "PUBSUB_TOKEN=${_PUBSUB_TOKEN}"
- "OAUTH_CLIENT_ID=${_OAUTH_CLIENT_ID}"
- "OAUTH_CLIENT_SECRET=${_OAUTH_CLIENT_SECRET}"
- "OAUTH_REDIRECT_URI=${_OAUTH_REDIRECT_URI}"
Expand Down
11 changes: 0 additions & 11 deletions backend/deploy/pubsub.Dockerfile

This file was deleted.

616 changes: 195 additions & 421 deletions backend/poetry.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ motor = "^3.3.1"
aiofiles = "^23.2.1"
aiounittest = "^1.4.2"
coveralls = "^3.3.1"
google-cloud-pubsub = "^2.18.4"
grpcio = "^1.59.2"
gunicorn = "^21.2.0"
pymongo = {extras = ["srv"], version = "^4.6.0"}
Expand Down
27 changes: 7 additions & 20 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ aiofiles==23.2.1
aiounittest==1.4.2
annotated-types==0.6.0
anyio==3.7.1
cachetools==5.3.2
certifi==2023.7.22
certifi==2023.11.17
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
Expand All @@ -12,39 +11,27 @@ coveralls==3.3.1
dnspython==2.4.2
docopt==0.6.2
fastapi==0.104.1
google-api-core[grpc]==2.14.0
google-auth==2.23.4
google-cloud-pubsub==2.18.4
googleapis-common-protos==1.61.0
googleapis-common-protos[grpc]==1.61.0
grpc-google-iam-v1==0.12.7
grpcio-status==1.59.2
grpcio==1.59.2
grpcio==1.59.3
gunicorn==21.2.0
h11==0.14.0
httptools==0.6.1
idna==3.4
motor==3.3.1
motor==3.3.2
packaging==23.2
proto-plus==1.22.3
protobuf==4.25.0
pyasn1-modules==0.3.0
pyasn1==0.5.0
pydantic-core==2.10.1
pydantic==2.4.2
pydantic-core==2.14.5
pydantic==2.5.2
pymongo==4.6.0
pymongo[srv]==4.6.0
python-dotenv==1.0.0
pytz==2023.3.post1
pyyaml==6.0.1
requests==2.31.0
rsa==4.9
sentry-sdk==1.34.0
sentry-sdk==1.36.0
sniffio==1.3.0
starlette==0.27.0
svgwrite==1.4.3
typing-extensions==4.8.0
urllib3==2.0.7
urllib3==2.1.0
uvicorn[standard]==0.24.0.post1
uvloop==0.19.0
watchfiles==0.21.0
Expand Down
3 changes: 3 additions & 0 deletions backend/src/aggregation/layer0/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from src.aggregation.layer0.package import get_user_data

__all__ = ["get_user_data"]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytz

from src.aggregation.layer0.languages import CommitLanguages, get_commit_languages
from src.constants import (
GRAPHQL_NODE_CHUNK_SIZE,
GRAPHQL_NODE_THREADS,
Expand All @@ -29,10 +30,6 @@
get_repo_commits,
)
from src.models import UserContributions
from src.subscriber.aggregation.user.languages import (
CommitLanguages,
get_commit_languages,
)
from src.utils import date_to_datetime, gather


Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import date
from typing import Optional

from src.aggregation.layer0.contributions import get_contributions
from src.models import UserPackage
from src.subscriber.aggregation.user.contributions import get_contributions

# from src.subscriber.aggregation.user.follows import get_user_follows

Expand Down
3 changes: 3 additions & 0 deletions backend/src/aggregation/layer1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from src.aggregation.layer1.user import query_user

__all__ = ["query_user"]
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import List
from typing import List, Tuple

from src.constants import OWNER, REPO
from src.data.github.rest import (
Expand Down Expand Up @@ -35,7 +35,7 @@ async def get_valid_db_user(user_id: str) -> bool:
@alru_cache(ttl=timedelta(minutes=15))
async def get_repo_stargazers(
owner: str = OWNER, repo: str = REPO, no_cache: bool = False
) -> List[str]:
) -> Tuple[bool, List[str]]:
access_token = get_access_token()
data: List[str] = []
page = 0
Expand All @@ -45,7 +45,7 @@ async def get_repo_stargazers(
data.extend(temp_data)
page += 1

return (True, data) # type: ignore
return (True, data)


async def get_user_stars(user_id: str) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
from calendar import monthrange
from datetime import date, datetime, timedelta
from typing import List, Optional
from typing import List, Optional, Tuple

import requests

from src.constants import API_VERSION, BACKEND_URL, DOCKER, LOCAL_PUBLISHER, PROD
from src.aggregation.layer0.package import get_user_data
from src.constants import API_VERSION, BACKEND_URL, PROD
from src.data.github.graphql import GraphQLErrorRateLimit
from src.data.mongo.secret import update_keys
from src.data.mongo.user_months import UserMonth, get_user_months, set_user_month
from src.models.user.main import UserPackage
from src.subscriber.aggregation import get_user_data
from src.utils import alru_cache, date_to_datetime

s = requests.Session()

# NOTE: query user from PubSub, not from subscriber user router

# Formerly the subscriber, compute and save new data here


async def query_user_month(
Expand Down Expand Up @@ -72,7 +73,7 @@ async def query_user(
start_date: date = date.today() - timedelta(365),
end_date: date = date.today(),
no_cache: bool = False,
) -> UserPackage:
) -> Tuple[bool, UserPackage]:
# Return (possibly incomplete) within 45 seconds
start_time = datetime.now()
incomplete = False
Expand All @@ -85,17 +86,17 @@ async def query_user(
curr_months = [x.month for x in curr_data if x.complete]

month, year = start_date.month, start_date.year
months: List[date] = []
new_months: List[date] = []
while date(year, month, 1) <= end_date:
start = date(year, month, 1)
if date_to_datetime(start) not in curr_months:
months.append(start)
new_months.append(start)
month = month % 12 + 1
year = year + (month == 1)

# Start with complete months and add any incomplete months
all_user_packages: List[UserPackage] = [x.data for x in curr_data if x.complete]
for month in months:
for month in new_months:
if datetime.now() - start_time < timedelta(seconds=40):
temp = await query_user_month(user_id, access_token, private_access, month)
if temp is not None:
Expand All @@ -112,14 +113,12 @@ async def query_user(
out += user_package
out.incomplete = incomplete

if incomplete or len(months) > 1:
if incomplete or len(new_months) > 1:
# cache buster for publisher
if PROD:
s.get(f"{BACKEND_URL}/user/{user_id}?no_cache=True")
elif DOCKER:
s.get(f"{LOCAL_PUBLISHER}/user/{user_id}?no_cache=True")

return (False, out) # type: ignore
return (False, out)

# only cache if just the current month updated
return (True, out) # type: ignore
return (True, out)
4 changes: 4 additions & 0 deletions backend/src/aggregation/layer2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from src.aggregation.layer2.auth import get_is_valid_user
from src.aggregation.layer2.user import get_user, get_user_demo

__all__ = ["get_is_valid_user", "get_user", "get_user_demo"]
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from datetime import timedelta
from typing import Tuple

from src.constants import OWNER, REPO
from src.data.github.rest import RESTError
from src.subscriber.aggregation import (
from src.aggregation.layer1.auth import (
get_repo_stargazers,
get_user_stars,
get_valid_db_user,
get_valid_github_user,
)
from src.constants import OWNER, REPO
from src.data.github.rest import RESTError
from src.utils import alru_cache

USER_WHITELIST = [
Expand Down Expand Up @@ -43,17 +44,17 @@ async def check_user_starred_repo(


@alru_cache(ttl=timedelta(hours=1))
async def get_is_valid_user(user_id: str) -> str:
async def get_is_valid_user(user_id: str) -> Tuple[bool, str]:
if user_id in USER_WHITELIST:
return (True, "Valid user") # type: ignore
return (True, "Valid user")

valid_github_user = await check_github_user_exists(user_id)
if not valid_github_user:
return (False, "GitHub user not found") # type: ignore
return (False, "GitHub user not found")

valid_db_user = await check_db_user_exists(user_id)
user_starred = await check_user_starred_repo(user_id)
if not (user_starred or valid_db_user):
return (False, "Repo not starred") # type: ignore
return (False, "Repo not starred")

return (True, "Valid user") # type: ignore
return (True, "Valid user")
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
from datetime import date, timedelta
from typing import Optional
from typing import Optional, Tuple

from src.aggregation.layer0 import get_user_data
from src.data.mongo.secret.functions import update_keys
from src.data.mongo.user import PublicUserModel, get_public_user as db_get_public_user
from src.data.mongo.user_months import get_user_months
from src.models import UserPackage
from src.publisher.processing.pubsub import publish_user

# TODO: replace with call to subscriber so compute not on publisher
from src.subscriber.aggregation import get_user_data
from src.models.background import UpdateUserBackgroundTask
from src.utils import alru_cache


@alru_cache()
async def update_user(
user_id: str, access_token: Optional[str], private_access: bool
) -> bool:
"""Sends a message to pubsub to request a user update (auto cache updates)"""
await publish_user(user_id, access_token, private_access)
return (True, True) # type: ignore
# Formerly the publisher, loads existing data here


async def _get_user(
Expand All @@ -42,23 +33,26 @@ async def get_user(
start_date: date,
end_date: date,
no_cache: bool = False,
) -> Optional[UserPackage]:
) -> Tuple[bool, Tuple[Optional[UserPackage], Optional[UpdateUserBackgroundTask]]]:
user: Optional[PublicUserModel] = await db_get_public_user(user_id)
if user is None:
return (False, None) # type: ignore
return (False, (None, None))

private_access = user.private_access or False
await update_user(user_id, user.access_token, private_access)
user_data = await _get_user(user_id, private_access, start_date, end_date)
return (user_data is not None, user_data) # type: ignore
background_task = UpdateUserBackgroundTask(
user_id=user_id, access_token=user.access_token, private_access=private_access
)
return (user_data is not None, (user_data, background_task))


@alru_cache(ttl=timedelta(minutes=15))
async def get_user_demo(
user_id: str, start_date: date, end_date: date, no_cache: bool = False
) -> UserPackage:
) -> Tuple[bool, UserPackage]:
await update_keys()
timezone_str = "US/Eastern"
# recompute/cache but don't save to db
data = await get_user_data(
user_id=user_id,
start_date=start_date,
Expand All @@ -67,4 +61,4 @@ async def get_user_demo(
access_token=None,
catch_errors=True,
)
return (True, data) # type: ignore
return (True, data)
16 changes: 7 additions & 9 deletions backend/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
BLACKLIST = ["Jupyter Notebook", "HTML"] # languages to ignore

# OAUTH
OAUTH_CLIENT_ID = os.getenv("OAUTH_CLIENT_ID", "") # client ID for GitHub OAuth App
OAUTH_CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET", "") # client secret for App
OAUTH_REDIRECT_URI = os.getenv("OAUTH_REDIRECT_URI", "") # redirect uri for App

# PUBSUB
PUBSUB_TOKEN = os.getenv("PUBSUB_TOKEN", "")
# LOCAL_SUBSCRIBER = "http://" + ("subscriber" if DOCKER else "localhost") + ":8001"
LOCAL_SUBSCRIBER = "http://backend:8000" if DOCKER else BACKEND_URL
LOCAL_PUBLISHER = "http://backend:8000" if DOCKER else BACKEND_URL
prefix = "PROD" if PROD else "DEV"
# client ID for GitHub OAuth App
OAUTH_CLIENT_ID = os.getenv(f"{prefix}_OAUTH_CLIENT_ID", "")
# client secret for App
OAUTH_CLIENT_SECRET = os.getenv(f"{prefix}_OAUTH_CLIENT_SECRET", "")
# redirect uri for App
OAUTH_REDIRECT_URI = os.getenv(f"{prefix}_OAUTH_REDIRECT_URI", "")


# MONGODB
Expand Down
2 changes: 1 addition & 1 deletion backend/src/data/github/auth/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_unknown_user(access_token: str) -> Optional[str]:
}

r = s.get("https://api.github.com/user", params={}, headers=headers)
return r.json().get("login", None) # type: ignore
return r.json().get("login", None)


class OAuthError(Exception):
Expand Down
Loading
Loading