- Add /v1/chat/completions and /chat/completions endpoints (OpenAI SDK compatible) - Add streaming support with SSE for chat completions - Add get_current_user_openai auth supporting Bearer token and X-API-Key - Add OpenAI-compatible request/response models (OpenAIChatCompletionRequest, etc.) - Cherry-pick improved login UI from cloud branch (styled login screen, logout button)
158 lines
4.8 KiB
Python
158 lines
4.8 KiB
Python
"""Simple API key authentication for Mem0 Interface."""
|
|
|
|
from typing import Optional
|
|
from fastapi import HTTPException, Security, status, Header
|
|
from fastapi.security import APIKeyHeader
|
|
import structlog
|
|
|
|
from config import settings
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
# API Key header
|
|
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)
|
|
|
|
|
|
class AuthService:
|
|
"""Simple authentication service using API keys mapped to users."""
|
|
|
|
def __init__(self):
|
|
"""Initialize auth service with API key to user mapping."""
|
|
self.api_key_to_user = settings.api_key_mapping
|
|
logger.info(
|
|
f"Auth service initialized with {len(self.api_key_to_user)} API keys"
|
|
)
|
|
|
|
def verify_api_key(self, api_key: str) -> str:
|
|
"""
|
|
Verify API key and return associated user_id.
|
|
|
|
Args:
|
|
api_key: The API key from request header
|
|
|
|
Returns:
|
|
str: The user_id associated with this API key
|
|
|
|
Raises:
|
|
HTTPException: If API key is invalid
|
|
"""
|
|
if api_key not in self.api_key_to_user:
|
|
logger.warning(f"Invalid API key attempted: {api_key[:10]}...")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key"
|
|
)
|
|
|
|
user_id = self.api_key_to_user[api_key]
|
|
logger.debug(f"API key verified for user: {user_id}")
|
|
return user_id
|
|
|
|
def verify_user_access(self, api_key: str, requested_user_id: str) -> str:
|
|
"""
|
|
Verify that the API key owner is accessing their own data.
|
|
|
|
Args:
|
|
api_key: The API key from request header
|
|
requested_user_id: The user_id being accessed in the request
|
|
|
|
Returns:
|
|
str: The authenticated user_id
|
|
|
|
Raises:
|
|
HTTPException: If user tries to access another user's data
|
|
"""
|
|
authenticated_user_id = self.verify_api_key(api_key)
|
|
|
|
if authenticated_user_id != requested_user_id:
|
|
logger.warning(
|
|
f"Unauthorized access attempt: user {authenticated_user_id} "
|
|
f"tried to access {requested_user_id}'s data"
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Access denied: You can only access your own memories",
|
|
)
|
|
|
|
return authenticated_user_id
|
|
|
|
|
|
# Global auth service instance
|
|
auth_service = AuthService()
|
|
|
|
|
|
async def get_current_user(api_key: str = Security(api_key_header)) -> str:
|
|
"""
|
|
FastAPI dependency to get current authenticated user.
|
|
|
|
Args:
|
|
api_key: API key from X-API-Key header
|
|
|
|
Returns:
|
|
str: Authenticated user_id
|
|
"""
|
|
return auth_service.verify_api_key(api_key)
|
|
|
|
|
|
async def get_current_user_openai(
|
|
authorization: Optional[str] = Header(None),
|
|
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
|
) -> str:
|
|
"""
|
|
FastAPI dependency for OpenAI-compatible authentication.
|
|
Supports both Authorization: Bearer and X-API-Key headers.
|
|
|
|
Args:
|
|
authorization: Authorization header (Bearer token)
|
|
x_api_key: X-API-Key header
|
|
|
|
Returns:
|
|
str: Authenticated user_id
|
|
|
|
Raises:
|
|
HTTPException: If no valid API key is provided
|
|
"""
|
|
api_key = None
|
|
|
|
# Try Bearer token first (OpenAI standard)
|
|
if authorization and authorization.startswith("Bearer "):
|
|
api_key = authorization[7:] # Remove "Bearer " prefix
|
|
logger.debug("Extracted API key from Authorization Bearer token")
|
|
# Fall back to X-API-Key header
|
|
elif x_api_key:
|
|
api_key = x_api_key
|
|
logger.debug("Extracted API key from X-API-Key header")
|
|
else:
|
|
logger.warning("No API key provided in Authorization or X-API-Key headers")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Missing API key. Provide either 'Authorization: Bearer <key>' or 'X-API-Key: <key>' header",
|
|
)
|
|
|
|
return auth_service.verify_api_key(api_key)
|
|
|
|
|
|
async def verify_user_access(
|
|
api_key: str = Security(api_key_header), user_id: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
FastAPI dependency to verify user can access the requested user_id.
|
|
|
|
Args:
|
|
api_key: API key from X-API-Key header
|
|
user_id: The user_id being accessed (from path or body)
|
|
|
|
Returns:
|
|
str: Authenticated user_id
|
|
"""
|
|
authenticated_user_id = auth_service.verify_api_key(api_key)
|
|
|
|
# If user_id is provided, verify access
|
|
if user_id and authenticated_user_id != user_id:
|
|
logger.warning(
|
|
f"Access denied: {authenticated_user_id} tried to access {user_id}'s data"
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Access denied: You can only access your own memories",
|
|
)
|
|
|
|
return authenticated_user_id
|