feat: project skeleton

- infra (k8s, kind, helm, docker) backbone is implemented
- security: implementation + unit tests are done
This commit is contained in:
2025-11-21 12:00:00 -05:00
commit fbe9fbba6e
46 changed files with 3450 additions and 0 deletions

0
app/core/__init__.py Normal file
View File

59
app/core/exceptions.py Normal file
View File

@@ -0,0 +1,59 @@
"""Custom HTTP exceptions for the API."""
from fastapi import HTTPException, status
class NotFoundError(HTTPException):
"""Resource not found."""
def __init__(self, detail: str = "Resource not found") -> None:
super().__init__(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
class ConflictError(HTTPException):
"""Conflict with current state (e.g., version mismatch)."""
def __init__(self, detail: str = "Conflict with current state") -> None:
super().__init__(status_code=status.HTTP_409_CONFLICT, detail=detail)
class UnauthorizedError(HTTPException):
"""Authentication required or failed."""
def __init__(self, detail: str = "Not authenticated") -> None:
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
headers={"WWW-Authenticate": "Bearer"},
)
class ForbiddenError(HTTPException):
"""Insufficient permissions."""
def __init__(self, detail: str = "Insufficient permissions") -> None:
super().__init__(status_code=status.HTTP_403_FORBIDDEN, detail=detail)
class BadRequestError(HTTPException):
"""Invalid request data."""
def __init__(self, detail: str = "Invalid request") -> None:
super().__init__(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
class ValidationError(HTTPException):
"""Validation failed."""
def __init__(self, detail: str = "Validation failed") -> None:
super().__init__(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail)
__all__ = [
"BadRequestError",
"ConflictError",
"ForbiddenError",
"NotFoundError",
"UnauthorizedError",
"ValidationError",
]

106
app/core/security.py Normal file
View File

@@ -0,0 +1,106 @@
"""Security utilities for JWT and password hashing."""
import hashlib
import secrets
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID, uuid4
import bcrypt
from jose import JWTError, jwt
from app.config import settings
def hash_password(password: str) -> str:
"""Hash a password using bcrypt."""
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return bcrypt.checkpw(plain_password.encode(), hashed_password.encode())
def create_access_token(
sub: str,
org_id: str,
org_role: str,
expires_delta: timedelta | None = None,
) -> str:
"""Create a JWT access token with org context."""
if expires_delta is None:
expires_delta = timedelta(minutes=settings.access_token_expire_minutes)
now = datetime.now(UTC)
expire = now + expires_delta
payload = {
"sub": sub,
"org_id": org_id,
"org_role": org_role,
"iss": settings.jwt_issuer,
"aud": settings.jwt_audience,
"jti": str(uuid4()),
"iat": now,
"exp": expire,
}
return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
def decode_access_token(token: str) -> dict[str, Any]:
"""Decode and validate a JWT access token.
Raises:
JWTError: If token is invalid or expired.
"""
return jwt.decode(
token,
settings.jwt_secret_key,
algorithms=[settings.jwt_algorithm],
issuer=settings.jwt_issuer,
audience=settings.jwt_audience,
)
def generate_refresh_token() -> str:
"""Generate a secure random refresh token."""
return secrets.token_urlsafe(32)
def hash_token(token: str) -> str:
"""Hash a refresh token for storage."""
return hashlib.sha256(token.encode()).hexdigest()
def get_refresh_token_expiry() -> datetime:
"""Get expiry datetime for a new refresh token."""
return datetime.now(UTC) + timedelta(days=settings.refresh_token_expire_days)
class TokenPayload:
"""Parsed JWT token payload."""
def __init__(self, payload: dict[str, Any]) -> None:
self.user_id = UUID(payload["sub"])
self.org_id = UUID(payload["org_id"])
self.org_role = payload["org_role"]
self.issuer = payload["iss"]
self.audience = payload["aud"]
self.jti = UUID(payload["jti"])
self.issued_at = payload["iat"]
self.expires_at = payload["exp"]
__all__ = [
"JWTError",
"TokenPayload",
"create_access_token",
"decode_access_token",
"generate_refresh_token",
"get_refresh_token_expiry",
"hash_password",
"hash_token",
"verify_password",
]