"""Simple API key authentication for Mem0 Interface.""" from typing import Optional from fastapi import HTTPException, Security, status from fastapi.security import APIKeyHeader import structlog from config import settings logger = structlog.get_logger(__name__) # API Key header api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True) class AuthService: """Simple authentication service using API keys mapped to users.""" def __init__(self): """Initialize auth service with API key to user mapping.""" self.api_key_to_user = settings.api_key_mapping logger.info(f"Auth service initialized with {len(self.api_key_to_user)} API keys") def verify_api_key(self, api_key: str) -> str: """ Verify API key and return associated user_id. Args: api_key: The API key from request header Returns: str: The user_id associated with this API key Raises: HTTPException: If API key is invalid """ if api_key not in self.api_key_to_user: logger.warning(f"Invalid API key attempted: {api_key[:10]}...") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key" ) user_id = self.api_key_to_user[api_key] logger.debug(f"API key verified for user: {user_id}") return user_id def verify_user_access(self, api_key: str, requested_user_id: str) -> str: """ Verify that the API key owner is accessing their own data. Args: api_key: The API key from request header requested_user_id: The user_id being accessed in the request Returns: str: The authenticated user_id Raises: HTTPException: If user tries to access another user's data """ authenticated_user_id = self.verify_api_key(api_key) if authenticated_user_id != requested_user_id: logger.warning( f"Unauthorized access attempt: user {authenticated_user_id} " f"tried to access {requested_user_id}'s data" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Access denied: You can only access your own memories" ) return authenticated_user_id # Global auth service instance auth_service = AuthService() async def get_current_user(api_key: str = Security(api_key_header)) -> str: """ FastAPI dependency to get current authenticated user. Args: api_key: API key from X-API-Key header Returns: str: Authenticated user_id """ return auth_service.verify_api_key(api_key) async def verify_user_access( api_key: str = Security(api_key_header), user_id: Optional[str] = None ) -> str: """ FastAPI dependency to verify user can access the requested user_id. Args: api_key: API key from X-API-Key header user_id: The user_id being accessed (from path or body) Returns: str: Authenticated user_id """ authenticated_user_id = auth_service.verify_api_key(api_key) # If user_id is provided, verify access if user_id and authenticated_user_id != user_id: logger.warning( f"Access denied: {authenticated_user_id} tried to access {user_id}'s data" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You can only access your own memories" ) return authenticated_user_id