from datetime import datetime, timedelta, timezone from typing import Annotated, Optional from fastapi import APIRouter, Depends, HTTPException, Request, Form from fastapi.responses import HTMLResponse, RedirectResponse from jose import jwt, JWTError from pydantic import BaseModel from sqlalchemy.orm import Session from starlette import status from config import SECRET_KEY, ALGORITHM, templates, bcrypt_context, oauth2_bearer from database import SessionLocal from models import User router = APIRouter( prefix="/auth", tags=["auth"] ) def get_db(): db = SessionLocal() try: yield db finally: db.close() db_dependency = Annotated[Session, Depends(get_db)] class CreateUserRequest(BaseModel): username: str password: str email: str = "" full_name: str = "" class Token(BaseModel): access_token: str token_type: str def authenticate_user(username: str, password: str, db: Session): user = db.query(User).filter(User.username == username).first() if not user: return False if not bcrypt_context.verify(password, user.hashed_password): return False return user def create_access_token(username: str, user_id: int, expires_delta: timedelta): encode = {"sub": username, "id": user_id} expires = datetime.now(timezone.utc) + expires_delta encode.update({"exp": expires}) return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM) async def get_current_user( request: Request, token: Annotated[Optional[str], Depends(oauth2_bearer)], ): cookie_token = request.cookies.get("access_token") effective_token = cookie_token or token if not effective_token: return None try: payload = jwt.decode(effective_token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") user_id: int = payload.get("id") if username is None or user_id is None: return None return {"username": username, "id": user_id} except JWTError: return None @router.post("/", status_code=status.HTTP_201_CREATED) async def create_user(db: db_dependency, create_user_request: CreateUserRequest): create_user_model = User( username=create_user_request.username, email=create_user_request.email or None, full_name=create_user_request.full_name or None, hashed_password=bcrypt_context.hash(create_user_request.password), ) db.add(create_user_model) db.commit() @router.post("/token", response_model=Token) async def login_for_access_token( username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db), ): user = authenticate_user(username, password, db) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user.", ) token = create_access_token(user.username, user.id, timedelta(minutes=60)) return {"access_token": token, "token_type": "bearer"} @router.get("/login", response_class=HTMLResponse) async def login_page(request: Request): return templates.TemplateResponse(request, "auth/login.html", {}) @router.post("/login") async def login_submit(request: Request, db: db_dependency): form = await request.form() username = form.get("username", "") password = form.get("password", "") user = authenticate_user(username, password, db) if not user: return templates.TemplateResponse(request, "auth/login.html", { "error": "Invalid username or password", }) token = create_access_token(user.username, user.id, timedelta(minutes=60)) response = RedirectResponse(url="/", status_code=303) response.set_cookie(key="access_token", value=token, httponly=True, max_age=3600, samesite="lax") return response @router.get("/register", response_class=HTMLResponse) async def register_page(request: Request): return templates.TemplateResponse(request, "auth/register.html", {}) @router.post("/register") async def register_submit(request: Request, db: db_dependency): form = await request.form() username = form.get("username", "").strip() email = form.get("email", "").strip() full_name = form.get("full_name", "").strip() password = form.get("password", "") confirm = form.get("confirm_password", "") errors = [] if not username: errors.append("Username is required.") if not password: errors.append("Password is required.") if password != confirm: errors.append("Passwords do not match.") if len(password) < 6: errors.append("Password must be at least 6 characters.") if db.query(User).filter(User.username == username).first(): errors.append("Username already taken.") if email and db.query(User).filter(User.email == email).first(): errors.append("Email already registered.") if errors: return templates.TemplateResponse(request, "auth/register.html", { "errors": errors, "username": username, "email": email, "full_name": full_name, }) new_user = User( username=username, email=email or None, full_name=full_name or None, hashed_password=bcrypt_context.hash(password), ) db.add(new_user) db.commit() return RedirectResponse(url="/auth/login?registered=1", status_code=303) @router.get("/logout") async def logout(): response = RedirectResponse(url="/", status_code=303) response.delete_cookie("access_token") return response