knowledge-base/backend/models.py
Pratik Narola e5a4d1c7c2 feat: rewrite /v1/chat/completions as a real OpenAI-compat proxy
Part 1: strip leading whitespace (notably blank lines) from chat_with_memory's
LLM response (minimax-m2 reasoning output leaks blank lines; a single .lstrip()
at the source covers /chat, /v1/chat/completions, and the MCP chat tool).

Part 2: replace the /v1/chat/completions handler with an httpx-based
pass-through proxy that preserves every upstream field (tool_calls,
reasoning_tokens, system_fingerprint, finish_reason, etc.) and supports
end-to-end MCP-style tool calling.

What changed:
- models.py: OpenAIChatCompletionRequest is now permissive — typed for
  the common fields (tools, tool_choice, parallel_tool_calls,
  response_format, max_completion_tokens, seed, stream_options,
  reasoning_effort, modalities, etc.) and extra='allow' for forward-
  compat. The typed response models (OpenAIChatCompletionResponse and
  friends) are deleted — the handler returns upstream's JSON dict
  directly so unknown fields aren't silently dropped.
- mem0_manager.py: adds httpx.AsyncClient + an openai_proxy_completion()
  method that injects a "Relevant memories" system message only when
  the last role is 'user' AND no tool flow is in progress, then forwards
  to the upstream LLM. Non-stream returns upstream JSON; stream returns
  an async iterator that yields raw upstream SSE bytes verbatim while
  side-channel-parsing for the post-stream mem0.add. Codifies the
  Memori #434 lessons: never mutates existing messages (only prepends
  system), never touches tool_call_id, runs post-add even on mid-stream
  error via try/finally.
- main.py: handler is now ~50 lines — model_dump(exclude_unset) the
  request, hand off to openai_proxy_completion, return dict OR wrap in
  StreamingResponse. response_model=None so FastAPI doesn't validate.
  Deleted stream_openai_response (post-hoc word-chunking is gone).
  Lifespan shutdown closes mem0_manager.async_http.

Research confirmed mem0 itself does not ship an HTTP /v1/chat/completions
(only the in-process mem0.proxy.main.Mem0 SDK pattern), so we replicate
the pattern without adding a litellm dependency. SSE/tool_calls patterns
are modeled after microsoft/agent-lightning's llm_proxy.

Verified locally: ast.parse OK on all three files. End-to-end smoke tests
will run on beast.
2026-05-23 21:08:31 +05:30

285 lines
10 KiB
Python

"""Ultra-minimal Pydantic models for pure Mem0 API."""
from typing import List, Optional, Dict, Any, Union
from pydantic import BaseModel, ConfigDict, Field
import re
# Input-validation limits shared by the request models in this module.
MAX_MESSAGE_LENGTH = 50_000  # ~12k tokens max per message
MAX_QUERY_LENGTH = 10_000  # ~2.5k tokens max per query
MAX_USER_ID_LENGTH = 100  # Reasonable user ID length
MAX_MEMORY_ID_LENGTH = 100  # Memory IDs are typically UUIDs
MAX_CONTEXT_MESSAGES = 100  # Max conversation context messages
# One or more of: ASCII letters, digits, and the separators _ - . @
USER_ID_PATTERN = r"^[a-zA-Z0-9_\-\.@]+$"
# Request Models
class ChatMessage(BaseModel):
    """One conversational turn: the speaker's ``role`` and its text ``content``."""

    # Short free-form role label; length-capped but not restricted to a fixed set.
    role: str = Field(
        ...,
        description="Message role (user, assistant, system)",
        max_length=20,
    )
    content: str = Field(
        ...,
        description="Message content",
        max_length=MAX_MESSAGE_LENGTH,
    )
class ChatRequest(BaseModel):
    """Chat request: one new user message plus optional context and scoping.

    ``user_id`` defaults to ``"default"`` and is the only identifier checked
    against ``USER_ID_PATTERN``; ``agent_id`` / ``run_id`` are only
    length-capped.
    """

    message: str = Field(
        ..., description="User message", max_length=MAX_MESSAGE_LENGTH
    )
    user_id: Optional[str] = Field(
        default="default",
        description="User identifier (alphanumeric, _, -, ., @)",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    agent_id: Optional[str] = Field(
        default=None, description="Agent identifier", max_length=MAX_USER_ID_LENGTH
    )
    run_id: Optional[str] = Field(
        default=None, description="Run identifier", max_length=MAX_USER_ID_LENGTH
    )
    # Prior turns, capped at MAX_CONTEXT_MESSAGES entries.
    context: Optional[List[ChatMessage]] = Field(
        default=None,
        description="Previous conversation context (max 100 messages)",
        max_length=MAX_CONTEXT_MESSAGES,
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default=None, description="Additional metadata"
    )
class MemoryAddRequest(BaseModel):
    """Batch of messages to store as memories, scoped by user / agent / run.

    Kept compatible with the open-source Mem0 add interface.
    """

    # Capped at MAX_CONTEXT_MESSAGES entries per request.
    messages: List[ChatMessage] = Field(
        ...,
        description="Messages to process (max 100 messages)",
        max_length=MAX_CONTEXT_MESSAGES,
    )
    user_id: Optional[str] = Field(
        default="default",
        description="User identifier",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    agent_id: Optional[str] = Field(
        default=None, description="Agent identifier", max_length=MAX_USER_ID_LENGTH
    )
    run_id: Optional[str] = Field(
        default=None, description="Run identifier", max_length=MAX_USER_ID_LENGTH
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default=None, description="Additional metadata"
    )
class MemorySearchRequest(BaseModel):
    """Search query over stored memories, filterable by user / agent / run.

    ``limit`` is bounded to 1..100 (default 5); ``threshold``, when given,
    must lie in 0.0..1.0.
    """

    query: str = Field(
        ..., description="Search query", max_length=MAX_QUERY_LENGTH
    )
    user_id: Optional[str] = Field(
        default="default",
        description="User identifier",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    agent_id: Optional[str] = Field(
        default=None, description="Agent identifier", max_length=MAX_USER_ID_LENGTH
    )
    run_id: Optional[str] = Field(
        default=None, description="Run identifier", max_length=MAX_USER_ID_LENGTH
    )
    limit: int = Field(
        default=5, description="Maximum number of results (1-100)", ge=1, le=100
    )
    threshold: Optional[float] = Field(
        default=None, description="Minimum relevance score (0-1)", ge=0.0, le=1.0
    )
    filters: Optional[Dict[str, Any]] = Field(
        default=None, description="Additional filters"
    )
class MemoryUpdateRequest(BaseModel):
    """Replacement content (and optional metadata) for one existing memory.

    ``user_id`` is required here (no ``"default"`` fallback) because it is
    used for ownership verification, per its field description.
    """

    memory_id: str = Field(
        ..., description="Memory ID to update", max_length=MAX_MEMORY_ID_LENGTH
    )
    user_id: str = Field(
        ...,
        description="User identifier for ownership verification",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    content: str = Field(
        ..., description="New memory content", max_length=MAX_MESSAGE_LENGTH
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default=None, description="Updated metadata"
    )
# Response Models - Ultra-minimal
class MemoryItem(BaseModel):
    """A single memory record.

    Only ``id`` and ``memory`` are required; scope identifiers, metadata,
    timestamps and ``score`` are optional (``score`` is meant for search
    results).
    """

    id: str = Field(..., description="Memory unique identifier")
    memory: str = Field(..., description="Memory content")
    # Scope identifiers.
    user_id: Optional[str] = Field(default=None, description="Associated user ID")
    agent_id: Optional[str] = Field(default=None, description="Associated agent ID")
    run_id: Optional[str] = Field(default=None, description="Associated run ID")
    metadata: Optional[Dict[str, Any]] = Field(
        default=None, description="Memory metadata"
    )
    score: Optional[float] = Field(
        default=None, description="Relevance score (for search results)"
    )
    # Timestamps are carried as plain strings, not parsed datetimes.
    created_at: Optional[str] = Field(default=None, description="Creation timestamp")
    updated_at: Optional[str] = Field(
        default=None, description="Last update timestamp"
    )
class MemorySearchResponse(BaseModel):
    """Search result envelope: matched memories, their count, and the query."""

    memories: List[MemoryItem] = Field(
        ..., description="Found memories"
    )
    total_count: int = Field(
        ..., description="Total number of memories found"
    )
    query: str = Field(
        ..., description="Original search query"
    )
class MemoryAddResponse(BaseModel):
    """Result of an add: the raw added-memory dicts plus a status message."""

    added_memories: List[Dict[str, Any]] = Field(..., description="Memories that were added")
    message: str = Field(..., description="Success message")
class GraphRelationship(BaseModel):
    """A directed edge ``source --relationship--> target`` with optional properties."""

    source: str = Field(..., description="Source entity")
    relationship: str = Field(..., description="Relationship type")
    target: str = Field(..., description="Target entity")
    properties: Optional[Dict[str, Any]] = Field(default=None, description="Relationship properties")
class GraphResponse(BaseModel):
    """A user's graph view: relationship edges and the distinct entities involved."""

    relationships: List[GraphRelationship] = Field(..., description="Found relationships")
    entities: List[str] = Field(..., description="Unique entities")
    user_id: str = Field(..., description="User identifier")
class HealthResponse(BaseModel):
    """Health snapshot: overall status, per-service statuses, and a timestamp."""

    status: str = Field(
        ...,
        description="Service status",
    )
    # Maps service name to its status string.
    services: Dict[str, str] = Field(
        ...,
        description="Individual service statuses",
    )
    timestamp: str = Field(
        ...,
        description="Health check timestamp",
    )
class ErrorResponse(BaseModel):
    """Error payload: message, optional detail text, and the HTTP status code."""

    error: str = Field(
        ...,
        description="Error message",
    )
    detail: Optional[str] = Field(
        default=None,
        description="Detailed error information",
    )
    status_code: int = Field(
        ...,
        description="HTTP status code",
    )
# Statistics and Monitoring Models
class MemoryOperationStats(BaseModel):
    """Counters for each memory operation kind (add / search / update / delete)."""

    add: int = Field(
        ..., description="Number of add operations"
    )
    search: int = Field(
        ..., description="Number of search operations"
    )
    update: int = Field(
        ..., description="Number of update operations"
    )
    delete: int = Field(
        ..., description="Number of delete operations"
    )
class GlobalStatsResponse(BaseModel):
    """Application-wide statistics: totals, traffic, latency, and uptime."""

    # Totals.
    total_memories: int = Field(..., description="Total memories across all users")
    total_users: int = Field(..., description="Total number of users")
    # Traffic and latency.
    api_calls_today: int = Field(..., description="Total API calls today")
    avg_response_time_ms: float = Field(..., description="Average response time in milliseconds")
    memory_operations: MemoryOperationStats = Field(..., description="Memory operation breakdown")
    uptime_seconds: float = Field(..., description="Application uptime in seconds")
class UserStatsResponse(BaseModel):
    """Per-user statistics: stored data counts plus today's traffic and latency."""

    user_id: str = Field(..., description="User identifier")
    # Stored-data counts.
    memory_count: int = Field(..., description="Number of memories for this user")
    relationship_count: int = Field(..., description="Number of graph relationships for this user")
    last_activity: Optional[str] = Field(default=None, description="Last activity timestamp")
    # Traffic and latency.
    api_calls_today: int = Field(..., description="API calls made by this user today")
    avg_response_time_ms: float = Field(..., description="Average response time for this user's requests")
# OpenAI-Compatible API Models
#
# The /v1/chat/completions handler is a pass-through proxy: requests are
# forwarded to the upstream LLM and the upstream response is relayed verbatim
# (dict for non-stream, raw SSE bytes for stream). Hence only the REQUEST
# model lives here; the response is never re-typed, because a typed Pydantic
# response model silently drops every field it does not declare (e.g.
# tool_calls / refusal / reasoning_tokens) — see the audit in the plan file.
class OpenAIChatCompletionRequest(BaseModel):
    """OpenAI chat completion request — permissive schema, forwarded as-is.

    Only ``model`` and ``messages`` are required. All other standard OpenAI
    parameters are typed (for client IDE/docs benefit) but optional. Unknown
    fields are accepted via ``extra="allow"`` and forwarded to upstream, so
    new OpenAI parameters don't require a code change here.

    Validation is intentionally light (the upstream LLM remains the authority
    on value ranges), with two deliberate choices:

    * ``messages`` must contain at least one entry. OpenAI itself rejects an
      empty array, so checking here fails fast with a request-validation
      error instead of forwarding a request that cannot succeed; it also
      guarantees ``messages[-1]`` exists for any code that inspects the last
      message.
    * ``logit_bias`` values are ``Union[int, float]`` rather than ``float``.
      With plain ``float``, Pydantic coerced integer biases (OpenAI documents
      them as integers in -100..100) to floats, so ``{"50256": -100}`` was
      re-serialized as ``-100.0``, which is not "as-is". The union keeps ints
      as ints and still accepts fractional values for upstreams that allow
      them.
    """

    model_config = ConfigDict(extra="allow")

    model: str = Field(..., description="Model to use (forwarded to upstream)")
    messages: List[Dict[str, Any]] = Field(
        ...,
        min_length=1,  # an empty messages array is never a valid request
        description="Messages array (multi-part content supported)",
    )

    # Common params (typed for IDE/docs; all optional)
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    n: Optional[int] = None
    stream: Optional[bool] = None
    stream_options: Optional[Dict[str, Any]] = None
    stop: Optional[Union[str, List[str]]] = None
    max_tokens: Optional[int] = None  # deprecated; still accepted
    max_completion_tokens: Optional[int] = None  # replaces max_tokens
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    seed: Optional[int] = None
    logprobs: Optional[bool] = None
    top_logprobs: Optional[int] = None
    response_format: Optional[Dict[str, Any]] = None
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None
    parallel_tool_calls: Optional[bool] = None
    reasoning_effort: Optional[str] = None  # o-series / reasoning models
    modalities: Optional[List[str]] = None
    audio: Optional[Dict[str, Any]] = None
    prediction: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    store: Optional[bool] = None
    service_tier: Optional[str] = None
    # A union rather than plain float, so Pydantic's smart-mode union
    # validation keeps integer biases as ints instead of coercing to float.
    logit_bias: Optional[Dict[str, Union[int, float]]] = None
    # `user` is ignored — the authenticated user is derived from the API key.
    user: Optional[str] = None