knowledge-base/backend/auth.py
2025-10-23 22:22:07 +05:30

120 lines
3.6 KiB
Python

"""Simple API key authentication for Mem0 Interface."""
from typing import Optional
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader
import structlog
from config import settings
logger = structlog.get_logger(__name__)
# API Key header
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)
class AuthService:
"""Simple authentication service using API keys mapped to users."""
def __init__(self):
"""Initialize auth service with API key to user mapping."""
self.api_key_to_user = settings.api_key_mapping
logger.info(f"Auth service initialized with {len(self.api_key_to_user)} API keys")
def verify_api_key(self, api_key: str) -> str:
"""
Verify API key and return associated user_id.
Args:
api_key: The API key from request header
Returns:
str: The user_id associated with this API key
Raises:
HTTPException: If API key is invalid
"""
if api_key not in self.api_key_to_user:
logger.warning(f"Invalid API key attempted: {api_key[:10]}...")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
user_id = self.api_key_to_user[api_key]
logger.debug(f"API key verified for user: {user_id}")
return user_id
def verify_user_access(self, api_key: str, requested_user_id: str) -> str:
"""
Verify that the API key owner is accessing their own data.
Args:
api_key: The API key from request header
requested_user_id: The user_id being accessed in the request
Returns:
str: The authenticated user_id
Raises:
HTTPException: If user tries to access another user's data
"""
authenticated_user_id = self.verify_api_key(api_key)
if authenticated_user_id != requested_user_id:
logger.warning(
f"Unauthorized access attempt: user {authenticated_user_id} "
f"tried to access {requested_user_id}'s data"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Access denied: You can only access your own memories"
)
return authenticated_user_id
# Global auth service instance
auth_service = AuthService()
async def get_current_user(api_key: str = Security(api_key_header)) -> str:
"""
FastAPI dependency to get current authenticated user.
Args:
api_key: API key from X-API-Key header
Returns:
str: Authenticated user_id
"""
return auth_service.verify_api_key(api_key)
async def verify_user_access(
api_key: str = Security(api_key_header),
user_id: Optional[str] = None
) -> str:
"""
FastAPI dependency to verify user can access the requested user_id.
Args:
api_key: API key from X-API-Key header
user_id: The user_id being accessed (from path or body)
Returns:
str: Authenticated user_id
"""
authenticated_user_id = auth_service.verify_api_key(api_key)
# If user_id is provided, verify access
if user_id and authenticated_user_id != user_id:
logger.warning(
f"Access denied: {authenticated_user_id} tried to access {user_id}'s data"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied: You can only access your own memories"
)
return authenticated_user_id