-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
922783f
commit fa6a8ee
Showing
15 changed files
with
126 additions
and
116 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,106 +1,125 @@ | ||
# app/main.py | ||
from fastapi.middleware.cors import CORSMiddleware # Ensure this is included | ||
from fastapi import FastAPI, Depends, Request, Form, HTTPException | ||
from fastapi.responses import RedirectResponse, HTMLResponse | ||
from fastapi.templating import Jinja2Templates | ||
from sqlalchemy.orm import Session | ||
from starlette.middleware.base import BaseHTTPMiddleware | ||
from fastapi.staticfiles import StaticFiles | ||
from contextlib import asynccontextmanager # Add this line | ||
import logging | ||
|
||
# Import your auth, crud, database, and routers here | ||
from app.auth import authenticate_user, get_current_user, get_user_id_from_cookie | ||
from app.crud import create_user | ||
from app.init_db import init_db | ||
from app.dependencies import get_db | ||
from app.database import init_db, get_db | ||
from app.routers import user as user_router | ||
from app.routers import admin as admin_router | ||
from app.database import Base, engine | ||
from fastapi.staticfiles import StaticFiles | ||
from contextlib import asynccontextmanager | ||
from starlette.middleware.base import BaseHTTPMiddleware | ||
import logging | ||
|
||
# Logging configuration | ||
logging.basicConfig(level=logging.INFO) | ||
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) | ||
|
||
# Initialize Jinja2 templates | ||
templates = Jinja2Templates(directory="app/templates") | ||
|
||
# Create FastAPI app | ||
app = FastAPI() | ||
|
||
# Async context manager to handle the lifespan of the app (e.g., database connections) | ||
@asynccontextmanager | ||
async def lifespan(app: FastAPI): | ||
init_db() # Initialize database connection or other startup tasks | ||
init_db() # Initialize the database on app startup | ||
yield # This is where the app runs | ||
|
||
@asynccontextmanager | ||
async def get_db_session(): | ||
"""Session generator for database interactions""" | ||
db = next(get_db()) # Use the get_db() dependency for getting the session | ||
|
||
# Middleware to log and retrieve user from cookie | ||
@app.middleware("http") | ||
async def dispatch(request: Request, call_next): | ||
try: | ||
yield db | ||
finally: | ||
db.close() | ||
# Allow access to the login page | ||
if request.url.path == "/admin/" or request.url.path == "/login/": | ||
response = await call_next(request) | ||
return response | ||
|
||
# Attempt to get the user from cookies | ||
user_id = get_user_id_from_cookie(request.cookies.get("user_id")) | ||
request.state.user = await get_current_user(get_db(), user_id) | ||
|
||
app = FastAPI(lifespan=lifespan) | ||
response = await call_next(request) | ||
return response | ||
except Exception as e: | ||
logging.error(f"Error in middleware: {str(e)}") | ||
raise HTTPException(status_code=403, detail="Not authenticated") | ||
|
||
# Include routers | ||
# Include admin routes | ||
app.include_router(admin_router.router, prefix="/admin", tags=["admin"]) | ||
app.include_router(user_router.router, prefix="/users", tags=["users"]) | ||
|
||
class AddUserMiddleware(BaseHTTPMiddleware): | ||
async def dispatch(self, request: Request, call_next): | ||
async with get_db_session() as db: | ||
user_id = get_user_id_from_cookie(request.cookies.get("user_id")) | ||
if user_id: | ||
try: | ||
request.state.user = await get_current_user(db=db, user_id=user_id) # Use await here | ||
logging.info(f"User retrieved: {request.state.user.username}") | ||
except HTTPException as e: | ||
logging.error(f"HTTPException in middleware: {e.detail}") | ||
request.state.user = None | ||
else: | ||
logging.warning("No user_id cookie found.") | ||
request.state.user = None | ||
# Add CORS middleware | ||
app.add_middleware( | ||
CORSMiddleware, | ||
allow_origins=["*"], # Adjust this as necessary | ||
allow_credentials=True, | ||
allow_methods=["*"], | ||
allow_headers=["*"], | ||
) | ||
|
||
response = await call_next(request) | ||
return response | ||
|
||
app.add_middleware(AddUserMiddleware) | ||
# Mount static files (e.g., CSS, JavaScript) | ||
app.mount("/static", StaticFiles(directory="app/static"), name="static") | ||
templates = Jinja2Templates(directory="app/templates") | ||
|
||
# Root route | ||
@app.get("/") | ||
async def read_root(): | ||
return {"message": "Welcome to the Admin Dashboard"} | ||
|
||
# Admin login page | ||
@app.get("/admin/") | ||
async def login_page(request: Request): | ||
return templates.TemplateResponse("login.html", {"request": request, "user": request.state.user}) | ||
return templates.TemplateResponse("login.html", {"request": request, "title": "Admin Login"}) | ||
|
||
# Admin login form submission | ||
@app.post("/login/", response_model=None) | ||
async def login(request: Request, username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)): | ||
user = authenticate_user(username, password, db) | ||
if not user: | ||
logging.warning(f"Failed login attempt for user: {username}") | ||
return templates.TemplateResponse("login.html", { | ||
"request": request, "error": "Invalid credentials, please try again." | ||
"request": request, | ||
"title": "Admin Login", | ||
"error": "Invalid credentials, please try again." | ||
}) | ||
|
||
response = RedirectResponse(url="/dashboard/", status_code=302) | ||
response.set_cookie(key="user_id", value=str(user.id), httponly=True, secure=False, samesite="Lax") | ||
logging.info(f"User logged in: {username}, user_id set in cookie.") | ||
return response | ||
|
||
# Admin dashboard page | ||
@app.get("/dashboard/", response_model=None) | ||
async def dashboard(request: Request): | ||
if not request.state.user: | ||
if request.state.user is None: | ||
return RedirectResponse(url="/admin/") | ||
|
||
logging.info(f"Current user: {request.state.user.username}") | ||
return templates.TemplateResponse("dashboard.html", { | ||
"request": request, | ||
"user": request.state.user | ||
"user": request.state.user, | ||
"title": "Admin Dashboard" | ||
}) | ||
|
||
# Logout functionality | ||
@app.get("/logout/", response_model=None) | ||
async def logout(): | ||
response = RedirectResponse(url="/logout-confirmation/") | ||
response.delete_cookie("user_id") | ||
return response | ||
|
||
# Logout confirmation page | ||
@app.get("/logout-confirmation/", response_class=HTMLResponse) | ||
async def logout_confirmation(request: Request): | ||
return templates.TemplateResponse("logout_confirmation.html", { | ||
"request": request, | ||
"title": "Logout Confirmation", | ||
"user": request.state.user | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,55 @@ | ||
# app/models.py | ||
|
||
from sqlalchemy import Column, Integer, String | ||
from pydantic import BaseModel | ||
from sqlalchemy import create_engine, Column, Integer, String | ||
from sqlalchemy.ext.declarative import declarative_base | ||
from pydantic import BaseModel, constr | ||
from app.utils import hash_password | ||
from sqlalchemy.orm import sessionmaker, Session | ||
import os | ||
from passlib.context import CryptContext | ||
|
||
# Set up password hashing context | ||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | ||
|
||
# Define the path for the database file | ||
db_path = os.path.join(os.path.dirname(__file__), 'zola_admin.db') | ||
SQLALCHEMY_DATABASE_URL = f"sqlite:///{db_path}" # Correct format for SQLite | ||
|
||
# Create engine and session local | ||
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) | ||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | ||
|
||
Base = declarative_base() # Ensure this is created once | ||
# Base class for declarative models | ||
Base = declarative_base() | ||
|
||
# Pydantic schemas | ||
class UserCreate(BaseModel): | ||
username: str | ||
password: str | ||
|
||
class UserResponse(BaseModel): | ||
id: int | ||
username: str | ||
|
||
class Config: | ||
from_attributes = True # Enable ORM mode to convert SQLAlchemy models to Pydantic models | ||
|
||
# SQLAlchemy User model | ||
class User(Base): | ||
__tablename__ = "users" | ||
|
||
id = Column(Integer, primary_key=True, index=True) | ||
username = Column(String(150), unique=True, index=True, nullable=False) | ||
hashed_password = Column(String(255), nullable=False) | ||
username = Column(String, unique=True, index=True) | ||
hashed_password = Column(String) | ||
|
||
# Pydantic models for user creation and response | ||
class UserBase(BaseModel): | ||
username: str | ||
def set_password(self, password: str): | ||
self.hashed_password = pwd_context.hash(password) | ||
|
||
class UserCreate(UserBase): | ||
password: constr(min_length=6) # Minimum length for passwords | ||
def verify_password(self, password: str) -> bool: | ||
return pwd_context.verify(password, self.hashed_password) | ||
|
||
class UserResponse(UserBase): | ||
id: int | ||
# Add additional models as needed | ||
class AnotherModel(Base): | ||
__tablename__ = "another_table" | ||
|
||
class Config: | ||
orm_mode = True # Enable compatibility with ORM objects | ||
id = Column(Integer, primary_key=True, index=True) | ||
some_field = Column(String) | ||
|
||
# Function to hash the password and create a new User object | ||
def create_user_hashed(user: UserCreate): | ||
"""Hash the password for a new user.""" | ||
return User(username=user.username, hashed_password=hash_password(user.password)) | ||
# You can define more models here as required |
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.