Part 1: strip leading newlines from chat_with_memory's LLM response (minimax-m2 reasoning output leaks blank lines; .lstrip() at the source covers /chat, /v1/chat/completions, and the MCP chat tool). Part 2: replace the /v1/chat/completions handler with an httpx-based pass-through proxy that preserves every upstream field (tool_calls, reasoning_tokens, system_fingerprint, finish_reason, etc.) and supports end-to-end MCP-style tool calling. What changed: - models.py: OpenAIChatCompletionRequest is now permissive — typed for the common fields (tools, tool_choice, parallel_tool_calls, response_format, max_completion_tokens, seed, stream_options, reasoning_effort, modalities, etc.) and extra='allow' for forward-compat. The typed response models (OpenAIChatCompletionResponse and friends) are deleted — the handler returns upstream's JSON dict directly so unknown fields aren't silently dropped. - mem0_manager.py: adds httpx.AsyncClient + an openai_proxy_completion() method that injects a "Relevant memories" system message only when the last role is 'user' AND no tool flow is in progress, then forwards to the upstream LLM. Non-stream returns upstream JSON; stream returns an async iterator that yields raw upstream SSE bytes verbatim while side-channel-parsing for the post-stream mem0.add. Codifies the Memori #434 lessons: never mutates existing messages (only prepends system), never touches tool_call_id, runs post-add even on mid-stream error via try/finally. - main.py: handler is now ~50 lines — model_dump(exclude_unset) the request, hand off to openai_proxy_completion, return dict OR wrap in StreamingResponse. response_model=None so FastAPI doesn't validate. Deleted stream_openai_response (post-hoc word-chunking is gone). Lifespan shutdown closes mem0_manager.async_http. Research confirmed mem0 itself does not ship an HTTP /v1/chat/completions endpoint (only the in-process mem0.proxy.main.Mem0 SDK pattern), so we replicate the pattern without adding a litellm dependency. 
SSE/tool_calls patterns are modeled after microsoft/agent-lightning's llm_proxy. Verified locally: ast.parse OK on all three files. End-to-end smoke tests will run on beast.
285 lines
10 KiB
Python
285 lines
10 KiB
Python
"""Ultra-minimal Pydantic models for pure Mem0 API."""
|
|
|
|
from typing import List, Optional, Dict, Any, Union
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
import re
|
|
|
|
|
|
# Input-validation limits shared by the request models below.
MAX_MESSAGE_LENGTH = 50_000  # per-message cap (~12k tokens)
MAX_QUERY_LENGTH = 10_000  # per-query cap (~2.5k tokens)
MAX_USER_ID_LENGTH = 100  # reasonable upper bound for a user ID
MAX_MEMORY_ID_LENGTH = 100  # memory IDs are typically UUIDs
MAX_CONTEXT_MESSAGES = 100  # conversation-context messages per request

# Accepted user-ID alphabet: ASCII letters, digits, "_", "-", "." and "@".
USER_ID_PATTERN = r"^[a-zA-Z0-9_\-\.@]+$"
|
|
|
|
|
|
# Request Models
|
|
class ChatMessage(BaseModel):
    """Chat message structure."""

    # Both fields are required. `role` is only length-limited (20 chars);
    # it is not restricted to a fixed set of values.
    role: str = Field(
        description="Message role (user, assistant, system)",
        max_length=20,
    )
    # Message text, bounded by MAX_MESSAGE_LENGTH.
    content: str = Field(
        description="Message content",
        max_length=MAX_MESSAGE_LENGTH,
    )
|
|
|
|
|
|
class ChatRequest(BaseModel):
    """Ultra-minimal chat request."""

    # The only required field: the user's message text.
    message: str = Field(
        description="User message",
        max_length=MAX_MESSAGE_LENGTH,
    )

    # Hierarchy identifiers. Only user_id is pattern-checked; agent_id and
    # run_id reuse the user-ID length cap.
    user_id: Optional[str] = Field(
        default="default",
        description="User identifier (alphanumeric, _, -, ., @)",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    agent_id: Optional[str] = Field(
        default=None,
        description="Agent identifier",
        max_length=MAX_USER_ID_LENGTH,
    )
    run_id: Optional[str] = Field(
        default=None,
        description="Run identifier",
        max_length=MAX_USER_ID_LENGTH,
    )

    # Optional prior turns; the list length is capped at MAX_CONTEXT_MESSAGES.
    context: Optional[List[ChatMessage]] = Field(
        default=None,
        description="Previous conversation context (max 100 messages)",
        max_length=MAX_CONTEXT_MESSAGES,
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Additional metadata",
    )
|
|
|
|
|
|
class MemoryAddRequest(BaseModel):
    """Request to add memories with hierarchy support - open-source compatible."""

    # Required payload; the list length is capped at MAX_CONTEXT_MESSAGES.
    messages: List[ChatMessage] = Field(
        description="Messages to process (max 100 messages)",
        max_length=MAX_CONTEXT_MESSAGES,
    )

    # Hierarchy identifiers. Only user_id is pattern-checked; agent_id and
    # run_id reuse the user-ID length cap.
    user_id: Optional[str] = Field(
        default="default",
        description="User identifier",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    agent_id: Optional[str] = Field(
        default=None,
        description="Agent identifier",
        max_length=MAX_USER_ID_LENGTH,
    )
    run_id: Optional[str] = Field(
        default=None,
        description="Run identifier",
        max_length=MAX_USER_ID_LENGTH,
    )

    metadata: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Additional metadata",
    )
|
|
|
|
|
|
class MemorySearchRequest(BaseModel):
    """Request to search memories with hierarchy filtering."""

    # Required search text, bounded by MAX_QUERY_LENGTH.
    query: str = Field(
        description="Search query",
        max_length=MAX_QUERY_LENGTH,
    )

    # Hierarchy identifiers. Only user_id is pattern-checked; agent_id and
    # run_id reuse the user-ID length cap.
    user_id: Optional[str] = Field(
        default="default",
        description="User identifier",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    agent_id: Optional[str] = Field(
        default=None,
        description="Agent identifier",
        max_length=MAX_USER_ID_LENGTH,
    )
    run_id: Optional[str] = Field(
        default=None,
        description="Run identifier",
        max_length=MAX_USER_ID_LENGTH,
    )

    # Result shaping: limit is validated to 1..100 (default 5); threshold,
    # when given, is validated to 0.0..1.0.
    limit: int = Field(
        default=5,
        description="Maximum number of results (1-100)",
        ge=1,
        le=100,
    )
    threshold: Optional[float] = Field(
        default=None,
        description="Minimum relevance score (0-1)",
        ge=0.0,
        le=1.0,
    )
    filters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Additional filters",
    )
|
|
|
|
|
|
class MemoryUpdateRequest(BaseModel):
    """Request to update a memory."""

    # memory_id, user_id and content are all required; unlike the other
    # request models, user_id has no default here.
    memory_id: str = Field(
        description="Memory ID to update",
        max_length=MAX_MEMORY_ID_LENGTH,
    )
    user_id: str = Field(
        description="User identifier for ownership verification",
        max_length=MAX_USER_ID_LENGTH,
        pattern=USER_ID_PATTERN,
    )
    content: str = Field(
        description="New memory content",
        max_length=MAX_MESSAGE_LENGTH,
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Updated metadata",
    )
|
|
|
|
|
|
# Response Models - Ultra-minimal
|
|
|
|
|
|
class MemoryItem(BaseModel):
    """Individual memory item."""

    # Required core: identifier and memory text.
    id: str = Field(description="Memory unique identifier")
    memory: str = Field(description="Memory content")

    # Optional hierarchy identifiers.
    user_id: Optional[str] = Field(default=None, description="Associated user ID")
    agent_id: Optional[str] = Field(default=None, description="Associated agent ID")
    run_id: Optional[str] = Field(default=None, description="Associated run ID")

    # Optional extras; all default to None.
    metadata: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Memory metadata",
    )
    score: Optional[float] = Field(
        default=None,
        description="Relevance score (for search results)",
    )

    # Timestamps are carried as plain strings; no format is enforced here.
    created_at: Optional[str] = Field(default=None, description="Creation timestamp")
    updated_at: Optional[str] = Field(
        default=None,
        description="Last update timestamp",
    )
|
|
|
|
|
|
class MemorySearchResponse(BaseModel):
    """Memory search results - pure Mem0 structure."""

    # All three fields are required.
    memories: List[MemoryItem] = Field(description="Found memories")
    total_count: int = Field(description="Total number of memories found")
    query: str = Field(description="Original search query")
|
|
|
|
|
|
class MemoryAddResponse(BaseModel):
    """Response from adding memories - pure Mem0 structure."""

    # Added memories are kept as untyped dicts rather than MemoryItem objects.
    added_memories: List[Dict[str, Any]] = Field(
        description="Memories that were added",
    )
    message: str = Field(description="Success message")
|
|
|
|
|
|
class GraphRelationship(BaseModel):
    """Graph relationship structure."""

    # A (source, relationship, target) triple; all three parts are required.
    source: str = Field(description="Source entity")
    relationship: str = Field(description="Relationship type")
    target: str = Field(description="Target entity")

    # Optional free-form attributes attached to the relationship.
    properties: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Relationship properties",
    )
|
|
|
|
|
|
class GraphResponse(BaseModel):
    """Graph relationships - pure Mem0 structure."""

    # All three fields are required.
    relationships: List[GraphRelationship] = Field(
        description="Found relationships",
    )
    entities: List[str] = Field(description="Unique entities")
    user_id: str = Field(description="User identifier")
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Health check response."""

    # All fields are required; `services` maps a service name to its status
    # string and `timestamp` is carried as a plain string.
    status: str = Field(description="Service status")
    services: Dict[str, str] = Field(description="Individual service statuses")
    timestamp: str = Field(description="Health check timestamp")
|
|
|
|
|
|
class ErrorResponse(BaseModel):
    """Error response structure."""

    # `error` and `status_code` are required; `detail` is optional.
    error: str = Field(description="Error message")
    detail: Optional[str] = Field(
        default=None,
        description="Detailed error information",
    )
    status_code: int = Field(description="HTTP status code")
|
|
|
|
|
|
# Statistics and Monitoring Models
|
|
|
|
|
|
class MemoryOperationStats(BaseModel):
    """Memory operation statistics."""

    # One required counter per operation type.
    add: int = Field(description="Number of add operations")
    search: int = Field(description="Number of search operations")
    update: int = Field(description="Number of update operations")
    delete: int = Field(description="Number of delete operations")
|
|
|
|
|
|
class GlobalStatsResponse(BaseModel):
    """Global application statistics."""

    # Every field is required.
    total_memories: int = Field(description="Total memories across all users")
    total_users: int = Field(description="Total number of users")
    api_calls_today: int = Field(description="Total API calls today")
    avg_response_time_ms: float = Field(
        description="Average response time in milliseconds",
    )
    # Per-operation counters, nested as a MemoryOperationStats model.
    memory_operations: MemoryOperationStats = Field(
        description="Memory operation breakdown",
    )
    uptime_seconds: float = Field(description="Application uptime in seconds")
|
|
|
|
|
|
class UserStatsResponse(BaseModel):
    """User-specific statistics."""

    # Every field except `last_activity` is required.
    user_id: str = Field(description="User identifier")
    memory_count: int = Field(description="Number of memories for this user")
    relationship_count: int = Field(
        description="Number of graph relationships for this user",
    )
    # Carried as a plain string; defaults to None when not supplied.
    last_activity: Optional[str] = Field(
        default=None,
        description="Last activity timestamp",
    )
    api_calls_today: int = Field(description="API calls made by this user today")
    avg_response_time_ms: float = Field(
        description="Average response time for this user's requests",
    )
|
|
|
|
|
|
# OpenAI-Compatible API Models
|
|
#
|
|
# The /v1/chat/completions handler is a pass-through proxy: requests are
|
|
# forwarded to the upstream LLM and the upstream response is relayed verbatim
|
|
# (dict for non-stream, raw SSE bytes for stream). Hence only the REQUEST
|
|
# model lives here; the response is never re-typed (typed Pydantic response
|
|
# models silently drop unknown fields like tool_calls / refusal /
|
|
# reasoning_tokens — see the audit in the plan file).
|
|
|
|
|
|
class OpenAIChatCompletionRequest(BaseModel):
    """OpenAI chat completion request — permissive schema, forwarded as-is.

    Only `model` and `messages` are required. All other standard OpenAI
    parameters are typed (for client IDE/docs benefit) but optional. Unknown
    fields are accepted via `extra="allow"` and forwarded to upstream, so
    new OpenAI parameters don't require a code change here.
    """

    # Accept fields that are not declared below instead of rejecting them.
    model_config = ConfigDict(extra="allow")

    # --- Required -----------------------------------------------------------
    model: str = Field(description="Model to use (forwarded to upstream)")
    # Messages stay untyped dicts, so their inner structure is not validated.
    messages: List[Dict[str, Any]] = Field(
        description="Messages array (multi-part content supported)",
    )

    # --- Optional, typed for IDE/docs; every one defaults to None -----------
    # Sampling and streaming.
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    n: Optional[int] = None
    stream: Optional[bool] = None
    stream_options: Optional[Dict[str, Any]] = None
    stop: Optional[Union[str, List[str]]] = None

    # Output length: max_tokens is deprecated but still accepted;
    # max_completion_tokens is its replacement.
    max_tokens: Optional[int] = None
    max_completion_tokens: Optional[int] = None

    # Penalties, determinism and log-probabilities.
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    seed: Optional[int] = None
    logprobs: Optional[bool] = None
    top_logprobs: Optional[int] = None

    # Structured output and tool calling.
    response_format: Optional[Dict[str, Any]] = None
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None
    parallel_tool_calls: Optional[bool] = None

    # Reasoning (o-series / reasoning models) and multimodal options.
    reasoning_effort: Optional[str] = None
    modalities: Optional[List[str]] = None
    audio: Optional[Dict[str, Any]] = None
    prediction: Optional[Dict[str, Any]] = None

    # Request bookkeeping.
    metadata: Optional[Dict[str, Any]] = None
    store: Optional[bool] = None
    service_tier: Optional[str] = None
    # NOTE(review): values are typed as float, so integer biases are likely
    # re-serialized as floats (5 -> 5.0) before forwarding — confirm the
    # upstream accepts that.
    logit_bias: Optional[Dict[str, float]] = None

    # `user` is ignored — the authenticated user is derived from the API key.
    user: Optional[str] = None
|