120 lines
3.6 KiB
Python
120 lines
3.6 KiB
Python
"""Simple API key authentication for Mem0 Interface."""
|
|
|
|
from typing import Optional
|
|
from fastapi import HTTPException, Security, status
|
|
from fastapi.security import APIKeyHeader
|
|
import structlog
|
|
|
|
from config import settings
|
|
|
|
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
|
|
|
|
# API Key header
|
|
# Security scheme that reads the key from the "X-API-Key" request header.
# NOTE(review): with auto_error=True FastAPI is expected to reject requests
# that omit the header before any dependency body runs - confirm against the
# installed FastAPI version.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)
|
|
|
|
|
|
class AuthService:
    """Simple authentication service using API keys mapped to users.

    The key-to-user mapping is read from ``settings.api_key_mapping`` when
    the service is constructed.
    """

    def __init__(self):
        """Initialize auth service with API key to user mapping."""
        # Holds a reference to the configured mapping; no copy is made.
        self.api_key_to_user = settings.api_key_mapping
        logger.info(f"Auth service initialized with {len(self.api_key_to_user)} API keys")

    def verify_api_key(self, api_key: str) -> str:
        """
        Verify API key and return associated user_id.

        Args:
            api_key: The API key from request header

        Returns:
            str: The user_id associated with this API key

        Raises:
            HTTPException: 401 if the API key is not present in the mapping
        """
        try:
            user_id = self.api_key_to_user[api_key]
        except KeyError:
            # Deliberately log no part of the submitted key: a rejected key
            # is often a mistyped valid one, so even a prefix is credential
            # material that must not end up in log storage.
            logger.warning("Invalid API key attempted")
            # "from None" keeps the KeyError (whose message is the key
            # itself) out of any chained traceback.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key",
            ) from None

        logger.debug(f"API key verified for user: {user_id}")
        return user_id

    def verify_user_access(self, api_key: str, requested_user_id: str) -> str:
        """
        Verify that the API key owner is accessing their own data.

        Args:
            api_key: The API key from request header
            requested_user_id: The user_id being accessed in the request

        Returns:
            str: The authenticated user_id

        Raises:
            HTTPException: 401 if the API key is invalid (raised by
                ``verify_api_key``); 403 if the key's user differs from
                ``requested_user_id``
        """
        authenticated_user_id = self.verify_api_key(api_key)

        if authenticated_user_id != requested_user_id:
            logger.warning(
                f"Unauthorized access attempt: user {authenticated_user_id} "
                f"tried to access {requested_user_id}'s data"
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied: You can only access your own memories",
            )

        return authenticated_user_id
|
|
|
|
|
|
# Global auth service instance
|
|
# Created at import time: AuthService.__init__ reads settings.api_key_mapping
# and emits an info log, so importing this module triggers both.
auth_service = AuthService()
|
|
|
|
|
|
async def get_current_user(api_key: str = Security(api_key_header)) -> str:
    """
    Resolve the caller's user_id from the supplied API key.

    Intended for use as a FastAPI dependency; the key is obtained through
    the ``api_key_header`` security scheme.

    Args:
        api_key: API key taken from the X-API-Key header

    Returns:
        str: The user_id that ``auth_service.verify_api_key`` maps the key to

    Raises:
        HTTPException: Propagated from ``auth_service.verify_api_key`` when
            the key is not recognised
    """
    current_user_id = auth_service.verify_api_key(api_key)
    return current_user_id
|
|
|
|
|
|
async def verify_user_access(
    api_key: str = Security(api_key_header),
    user_id: Optional[str] = None,
) -> str:
    """
    FastAPI dependency to verify user can access the requested user_id.

    The API key is always validated. The ownership check runs whenever a
    ``user_id`` is supplied, including an empty string; it is skipped only
    when ``user_id`` is ``None``.

    Args:
        api_key: API key from X-API-Key header
        user_id: The user_id being accessed, or ``None`` to validate the
            key only

    Returns:
        str: Authenticated user_id

    Raises:
        HTTPException: Propagated from ``auth_service.verify_api_key`` when
            the key is invalid; 403 when ``user_id`` is supplied and differs
            from the authenticated user
    """
    authenticated_user_id = auth_service.verify_api_key(api_key)

    # Compare against None rather than truthiness: an empty-string user_id
    # is still a supplied value and must not bypass the ownership check.
    if user_id is not None and authenticated_user_id != user_id:
        logger.warning(
            f"Access denied: {authenticated_user_id} tried to access {user_id}'s data"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied: You can only access your own memories",
        )

    return authenticated_user_id
|