"""Simple API key authentication for Mem0 Interface.""" from typing import Optional from fastapi import HTTPException, Security, status, Header from fastapi.security import APIKeyHeader import structlog from config import settings logger = structlog.get_logger(__name__) # API Key header api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True) class AuthService: """Simple authentication service using API keys mapped to users.""" def __init__(self): """Initialize auth service with API key to user mapping.""" self.api_key_to_user = settings.api_key_mapping logger.info( f"Auth service initialized with {len(self.api_key_to_user)} API keys" ) def verify_api_key(self, api_key: str) -> str: """ Verify API key and return associated user_id. Args: api_key: The API key from request header Returns: str: The user_id associated with this API key Raises: HTTPException: If API key is invalid """ if api_key not in self.api_key_to_user: logger.warning(f"Invalid API key attempted: {api_key[:10]}...") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key" ) user_id = self.api_key_to_user[api_key] logger.debug(f"API key verified for user: {user_id}") return user_id def verify_user_access(self, api_key: str, requested_user_id: str) -> str: """ Verify that the API key owner is accessing their own data. Args: api_key: The API key from request header requested_user_id: The user_id being accessed in the request Returns: str: The authenticated user_id Raises: HTTPException: If user tries to access another user's data """ authenticated_user_id = self.verify_api_key(api_key) if authenticated_user_id != requested_user_id: logger.warning( f"Unauthorized access attempt: user {authenticated_user_id} " f"tried to access {requested_user_id}'s data" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Access denied: You can only access your own memories", ) return authenticated_user_id # Global auth service instance auth_service = AuthService() async def get_current_user(api_key: str = Security(api_key_header)) -> str: """ FastAPI dependency to get current authenticated user. Args: api_key: API key from X-API-Key header Returns: str: Authenticated user_id """ return auth_service.verify_api_key(api_key) async def get_current_user_openai( authorization: Optional[str] = Header(None), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), ) -> str: """ FastAPI dependency for OpenAI-compatible authentication. Supports both Authorization: Bearer and X-API-Key headers. Args: authorization: Authorization header (Bearer token) x_api_key: X-API-Key header Returns: str: Authenticated user_id Raises: HTTPException: If no valid API key is provided """ api_key = None # Try Bearer token first (OpenAI standard) if authorization and authorization.startswith("Bearer "): api_key = authorization[7:] # Remove "Bearer " prefix logger.debug("Extracted API key from Authorization Bearer token") # Fall back to X-API-Key header elif x_api_key: api_key = x_api_key logger.debug("Extracted API key from X-API-Key header") else: logger.warning("No API key provided in Authorization or X-API-Key headers") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key. Provide either 'Authorization: Bearer ' or 'X-API-Key: ' header", ) return auth_service.verify_api_key(api_key) async def verify_user_access( api_key: str = Security(api_key_header), user_id: Optional[str] = None ) -> str: """ FastAPI dependency to verify user can access the requested user_id. Args: api_key: API key from X-API-Key header user_id: The user_id being accessed (from path or body) Returns: str: Authenticated user_id """ authenticated_user_id = auth_service.verify_api_key(api_key) # If user_id is provided, verify access if user_id and authenticated_user_id != user_id: logger.warning( f"Access denied: {authenticated_user_id} tried to access {user_id}'s data" ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied: You can only access your own memories", ) return authenticated_user_id