Pin mem0ai[nlp]==2.0.2 and fastembed for the new hybrid-search pipeline. Drop OSS graph memory (removed upstream in 2.0.0, PR #4805): remove Neo4j service, env vars, volumes, and driver deps; mark /graph/relationships deprecated. Rewrite Memory.search/get_all/chat/health call sites to use the v2 filters={} + top_k API (entity IDs at top level now raise ValueError). Tighten MCP remove_memory ownership check to O(1) verify_memory_ownership so it doesn't silently truncate at the new top_k=20 default. Downgrade base image to python:3.12-slim for spaCy. Adds scripts/migrate_qdrant_to_v3.py (scroll+upsert with per-user count parity check) and docs/MIGRATION_RUNBOOK.md covering snapshot, dump, collection rebuild, cutover, and rollback procedures.
483 lines
17 KiB
Python
483 lines
17 KiB
Python
"""Ultra-minimal Mem0 Manager - Pure Mem0 + Custom OpenAI Endpoint Only."""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional, Any
|
|
from datetime import datetime
|
|
from mem0 import Memory
|
|
from openai import OpenAI
|
|
from tenacity import (
|
|
retry,
|
|
stop_after_attempt,
|
|
wait_exponential,
|
|
retry_if_exception_type,
|
|
before_sleep_log,
|
|
)
|
|
import structlog
|
|
|
|
from config import settings
|
|
from monitoring import timed
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
# Retry decorator for database operations (Qdrant)
|
|
db_retry = retry(
|
|
stop=stop_after_attempt(3),
|
|
wait=wait_exponential(multiplier=1, min=1, max=10),
|
|
retry=retry_if_exception_type((ConnectionError, TimeoutError, OSError)),
|
|
before_sleep=before_sleep_log(logger, logging.WARNING),
|
|
reraise=True,
|
|
)
|
|
|
|
# Monkey-patch Mem0's OpenAI LLM to clear top_p when the configured LLM
|
|
# is Claude reached via an OpenAI-compatible endpoint: Claude rejects top_p
|
|
# whenever temperature is set, and OpenAILLM sends both unconditionally.
|
|
# (The 'store' branch is now redundant in mem0ai>=2.0.0 — upstream made it
|
|
# opt-in — but harmless; kept for safety.)
|
|
from mem0.llms.openai import OpenAILLM
|
|
|
|
_original_generate_response = OpenAILLM.generate_response
|
|
|
|
|
|
def patched_generate_response(
|
|
self, messages, response_format=None, tools=None, tool_choice="auto", **kwargs
|
|
):
|
|
if hasattr(self.config, "store"):
|
|
self.config.store = None
|
|
if hasattr(self.config, "top_p"):
|
|
self.config.top_p = None
|
|
return _original_generate_response(
|
|
self, messages, response_format, tools, tool_choice, **kwargs
|
|
)
|
|
|
|
|
|
OpenAILLM.generate_response = patched_generate_response
|
|
logger.info("Applied Claude/OpenAI-compatible patch: cleared top_p (and store)")
|
|
|
|
|
|
def _build_filters(
|
|
user_id: Optional[str],
|
|
agent_id: Optional[str] = None,
|
|
run_id: Optional[str] = None,
|
|
extra: Optional[Dict[str, Any]] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Build the filters dict required by mem0 v2 search/get_all.
|
|
|
|
In mem0 v2.x, user_id/agent_id/run_id are rejected as top-level kwargs
|
|
on Memory.search and Memory.get_all — they must live inside `filters`.
|
|
"""
|
|
merged: Dict[str, Any] = dict(extra) if extra else {}
|
|
if user_id is not None:
|
|
merged["user_id"] = user_id
|
|
if agent_id is not None:
|
|
merged["agent_id"] = agent_id
|
|
if run_id is not None:
|
|
merged["run_id"] = run_id
|
|
return merged
|
|
|
|
|
|
class Mem0Manager:
|
|
"""
|
|
Ultra-minimal manager that bridges custom OpenAI endpoint with pure Mem0.
|
|
No custom logic - let Mem0 handle all memory intelligence.
|
|
"""
|
|
|
|
def __init__(self):
|
|
logger.info(
|
|
"Initializing Mem0Manager with custom endpoint",
|
|
model=settings.default_model,
|
|
embedding_model=settings.embedding_model,
|
|
embedding_dims=settings.embedding_dims,
|
|
qdrant_host=settings.qdrant_host,
|
|
)
|
|
config = {
|
|
"version": "v1.1",
|
|
"llm": {
|
|
"provider": "openai",
|
|
"config": {
|
|
"model": settings.default_model,
|
|
"api_key": settings.openai_api_key,
|
|
"openai_base_url": settings.openai_base_url,
|
|
"temperature": 0.1,
|
|
"top_p": None,
|
|
},
|
|
},
|
|
"embedder": {
|
|
"provider": "ollama",
|
|
"config": {
|
|
"model": settings.embedding_model,
|
|
"ollama_base_url": settings.ollama_base_url,
|
|
"embedding_dims": settings.embedding_dims,
|
|
},
|
|
},
|
|
"vector_store": {
|
|
"provider": "qdrant",
|
|
"config": {
|
|
"collection_name": settings.qdrant_collection_name,
|
|
"host": settings.qdrant_host,
|
|
"port": settings.qdrant_port,
|
|
"embedding_model_dims": settings.embedding_dims,
|
|
"on_disk": True,
|
|
},
|
|
},
|
|
"reranker": {
|
|
"provider": "cohere",
|
|
"config": {
|
|
"api_key": settings.cohere_api_key,
|
|
"model": "rerank-english-v3.0",
|
|
"top_n": 10,
|
|
},
|
|
},
|
|
}
|
|
|
|
self.memory = Memory.from_config(config)
|
|
self.openai_client = OpenAI(
|
|
api_key=settings.openai_api_key,
|
|
base_url=settings.openai_base_url,
|
|
timeout=60.0, # 60 second timeout for LLM calls
|
|
max_retries=2, # Retry failed requests up to 2 times
|
|
)
|
|
logger.info("Initialized ultra-minimal Mem0Manager with custom endpoint")
|
|
|
|
# Pure passthrough methods - no custom logic
|
|
@db_retry
|
|
@timed("add_memories")
|
|
async def add_memories(
|
|
self,
|
|
messages: List[Dict[str, str]],
|
|
user_id: Optional[str] = "default",
|
|
agent_id: Optional[str] = None,
|
|
run_id: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Add memories - simplified native Mem0 pattern (10 lines vs 45)."""
|
|
try:
|
|
# Convert ChatMessage objects to dict if needed
|
|
formatted_messages = []
|
|
for msg in messages:
|
|
if hasattr(msg, "dict"):
|
|
formatted_messages.append(msg.dict())
|
|
else:
|
|
formatted_messages.append(msg)
|
|
|
|
# Auto-enhance metadata for better memory quality
|
|
combined_metadata = metadata or {}
|
|
|
|
# Add automatic metadata enhancement
|
|
auto_metadata = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"source": "chat_conversation",
|
|
"message_count": len(formatted_messages),
|
|
"auto_generated": True,
|
|
}
|
|
|
|
# Merge user metadata with auto metadata (user metadata takes precedence)
|
|
enhanced_metadata = {**auto_metadata, **combined_metadata}
|
|
|
|
# Direct Mem0 add with enhanced metadata
|
|
result = self.memory.add(
|
|
formatted_messages,
|
|
user_id=user_id,
|
|
agent_id=agent_id,
|
|
run_id=run_id,
|
|
metadata=enhanced_metadata,
|
|
)
|
|
|
|
return {
|
|
"added_memories": result if isinstance(result, list) else [result],
|
|
"message": "Memories added successfully",
|
|
"hierarchy": {
|
|
"user_id": user_id,
|
|
"agent_id": agent_id,
|
|
"run_id": run_id,
|
|
},
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error adding memories: {e}")
|
|
raise
|
|
|
|
@db_retry
|
|
@timed("search_memories")
|
|
async def search_memories(
|
|
self,
|
|
query: str,
|
|
user_id: Optional[str] = "default",
|
|
limit: int = 5,
|
|
threshold: Optional[float] = None,
|
|
filters: Optional[Dict[str, Any]] = None,
|
|
# keyword_search: bool = False,
|
|
# rerank: bool = False,
|
|
# filter_memories: bool = False,
|
|
agent_id: Optional[str] = None,
|
|
run_id: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Search memories - native Mem0 pattern"""
|
|
try:
|
|
# Minimal empty query protection for API compatibility
|
|
if not query or query.strip() == "":
|
|
return {
|
|
"memories": [],
|
|
"total_count": 0,
|
|
"query": query,
|
|
"note": "Empty query provided, no results returned. Use a specific query to search memories.",
|
|
}
|
|
# mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
|
|
result = self.memory.search(
|
|
query=query,
|
|
filters=_build_filters(user_id, agent_id, run_id, extra=filters),
|
|
top_k=limit,
|
|
threshold=threshold,
|
|
)
|
|
return {
|
|
"memories": result.get("results", []),
|
|
"total_count": len(result.get("results", [])),
|
|
"query": query,
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error searching memories: {e}")
|
|
raise
|
|
|
|
@db_retry
|
|
async def get_user_memories(
|
|
self,
|
|
user_id: str,
|
|
limit: int = 10,
|
|
agent_id: Optional[str] = None,
|
|
run_id: Optional[str] = None,
|
|
filters: Optional[Dict[str, Any]] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Get all memories for a user - native Mem0 pattern."""
|
|
try:
|
|
# mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
|
|
result = self.memory.get_all(
|
|
filters=_build_filters(user_id, agent_id, run_id, extra=filters),
|
|
top_k=limit,
|
|
)
|
|
return result.get("results", [])
|
|
except Exception as e:
|
|
logger.error(f"Error getting user memories: {e}")
|
|
raise
|
|
|
|
@db_retry
|
|
async def get_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get a single memory by ID. Returns None if not found."""
|
|
try:
|
|
result = self.memory.get(memory_id=memory_id)
|
|
return result
|
|
except Exception as e:
|
|
logger.debug(f"Memory {memory_id} not found or error: {e}")
|
|
return None
|
|
|
|
async def verify_memory_ownership(self, memory_id: str, user_id: str) -> bool:
|
|
"""Check if a memory belongs to a user. O(1) instead of O(n)."""
|
|
memory = await self.get_memory(memory_id)
|
|
if memory is None:
|
|
return False
|
|
return memory.get("user_id") == user_id
|
|
|
|
@db_retry
|
|
@timed("update_memory")
|
|
async def update_memory(
|
|
self,
|
|
memory_id: str,
|
|
content: str,
|
|
) -> Dict[str, Any]:
|
|
"""Update memory - pure Mem0 passthrough."""
|
|
try:
|
|
result = self.memory.update(memory_id=memory_id, data=content)
|
|
return {"message": "Memory updated successfully", "result": result}
|
|
except Exception as e:
|
|
logger.error(f"Error updating memory: {e}")
|
|
raise
|
|
|
|
@db_retry
|
|
@timed("delete_memory")
|
|
async def delete_memory(self, memory_id: str) -> Dict[str, Any]:
|
|
"""Delete memory - pure Mem0 passthrough."""
|
|
try:
|
|
self.memory.delete(memory_id=memory_id)
|
|
return {"message": "Memory deleted successfully"}
|
|
except Exception as e:
|
|
logger.error(f"Error deleting memory: {e}")
|
|
raise
|
|
|
|
async def delete_user_memories(self, user_id: Optional[str]) -> Dict[str, Any]:
|
|
"""Delete all user memories - pure Mem0 passthrough."""
|
|
try:
|
|
self.memory.delete_all(user_id=user_id)
|
|
return {"message": "All user memories deleted successfully"}
|
|
except Exception as e:
|
|
logger.error(f"Error deleting user memories: {e}")
|
|
raise
|
|
|
|
async def get_memory_history(self, memory_id: str) -> Dict[str, Any]:
|
|
"""Get memory change history - pure Mem0 passthrough."""
|
|
try:
|
|
history = self.memory.history(memory_id=memory_id)
|
|
return {
|
|
"memory_id": memory_id,
|
|
"history": history,
|
|
"message": "Memory history retrieved successfully",
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting memory history: {e}")
|
|
raise
|
|
|
|
async def get_graph_relationships(
|
|
self,
|
|
user_id: Optional[str],
|
|
agent_id: Optional[str],
|
|
run_id: Optional[str],
|
|
limit: int = 50,
|
|
) -> Dict[str, Any]:
|
|
"""Graph relationships — deprecated in mem0 v2 (OSS graph memory removed).
|
|
|
|
mem0 v2.0.0 deleted the OSS graph store (Neo4j/Memgraph/Kuzu/AGE drivers).
|
|
Entity relationships now influence ranking via a parallel `{collection}_entities`
|
|
Qdrant collection rather than being directly traversable. We return an empty
|
|
graph payload plus a `deprecated` marker so clients (frontend graph.html) can
|
|
render a clear "Graph view unavailable" state instead of erroring.
|
|
"""
|
|
return {
|
|
"relationships": [],
|
|
"entities": [],
|
|
"user_id": user_id,
|
|
"agent_id": agent_id,
|
|
"run_id": run_id,
|
|
"total_memories": 0,
|
|
"total_relationships": 0,
|
|
"deprecated": True,
|
|
"deprecation_note": (
|
|
"OSS graph memory was removed in mem0 v2.0.0. Use search/get_all for "
|
|
"memory retrieval; entity links now affect ranking only."
|
|
),
|
|
}
|
|
|
|
@timed("chat_with_memory")
|
|
async def chat_with_memory(
|
|
self,
|
|
message: str,
|
|
user_id: Optional[str] = None,
|
|
agent_id: Optional[str] = None,
|
|
run_id: Optional[str] = None,
|
|
context: Optional[List[Dict[str, str]]] = None,
|
|
# metadata: Optional[Dict[str, Any]] = None
|
|
) -> Dict[str, Any]:
|
|
"""Chat with memory - native Mem0 pattern with detailed timing."""
|
|
import time
|
|
|
|
try:
|
|
total_start_time = time.time()
|
|
logger.info("Starting chat request", user_id=user_id)
|
|
|
|
search_start_time = time.time()
|
|
search_result = self.memory.search(
|
|
query=message,
|
|
filters=_build_filters(user_id, agent_id, run_id),
|
|
top_k=10,
|
|
threshold=0.3,
|
|
)
|
|
relevant_memories = search_result.get("results", [])
|
|
memories_str = "\n".join(
|
|
f"- {entry['memory']}" for entry in relevant_memories
|
|
)
|
|
search_time = time.time() - search_start_time
|
|
logger.debug(
|
|
"Memory search completed",
|
|
search_time_s=round(search_time, 2),
|
|
memories_found=len(relevant_memories),
|
|
)
|
|
|
|
prep_start_time = time.time()
|
|
system_prompt = f"You are a helpful AI. Answer the question based on query and memories.\nUser Memories:\n{memories_str}"
|
|
messages = [{"role": "system", "content": system_prompt}]
|
|
|
|
if context:
|
|
messages.extend(context)
|
|
logger.debug("Added context messages", context_count=len(context))
|
|
|
|
messages.append({"role": "user", "content": message})
|
|
prep_time = time.time() - prep_start_time
|
|
|
|
llm_start_time = time.time()
|
|
response = self.openai_client.chat.completions.create(
|
|
model=settings.default_model, messages=messages
|
|
)
|
|
assistant_response = response.choices[0].message.content
|
|
llm_time = time.time() - llm_start_time
|
|
logger.debug(
|
|
"LLM call completed",
|
|
llm_time_s=round(llm_time, 2),
|
|
model=settings.default_model,
|
|
)
|
|
|
|
add_start_time = time.time()
|
|
memory_messages = [
|
|
{"role": "user", "content": message},
|
|
{"role": "assistant", "content": assistant_response},
|
|
]
|
|
self.memory.add(memory_messages, user_id=user_id)
|
|
add_time = time.time() - add_start_time
|
|
|
|
total_time = time.time() - total_start_time
|
|
logger.info(
|
|
"Chat request completed",
|
|
user_id=user_id,
|
|
total_time_s=round(total_time, 2),
|
|
search_time_s=round(search_time, 2),
|
|
llm_time_s=round(llm_time, 2),
|
|
add_time_s=round(add_time, 2),
|
|
memories_used=len(relevant_memories),
|
|
model=settings.default_model,
|
|
)
|
|
|
|
return {
|
|
"response": assistant_response,
|
|
"memories_used": len(relevant_memories),
|
|
"model_used": settings.default_model,
|
|
"timing": {
|
|
"total": round(total_time, 2),
|
|
"search": round(search_time, 2),
|
|
"llm": round(llm_time, 2),
|
|
"add": round(add_time, 2),
|
|
},
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error in chat_with_memory",
|
|
error=str(e),
|
|
user_id=user_id,
|
|
exc_info=True,
|
|
)
|
|
return {
|
|
"error": str(e),
|
|
"response": "I apologize, but I encountered an error processing your request.",
|
|
"memories_used": 0,
|
|
"model_used": None,
|
|
}
|
|
|
|
async def health_check(self) -> Dict[str, str]:
|
|
"""Basic health check - just connectivity."""
|
|
status = {}
|
|
|
|
# Check custom OpenAI endpoint
|
|
try:
|
|
models = self.openai_client.models.list()
|
|
status["openai_endpoint"] = "healthy"
|
|
except Exception as e:
|
|
status["openai_endpoint"] = f"unhealthy: {str(e)}"
|
|
|
|
# Check Mem0 memory
|
|
try:
|
|
self.memory.search(
|
|
query="test", filters={"user_id": "health_check"}, top_k=1
|
|
)
|
|
status["mem0_memory"] = "healthy"
|
|
except Exception as e:
|
|
status["mem0_memory"] = f"unhealthy: {str(e)}"
|
|
|
|
return status
|
|
|
|
|
|
# Global instance
|
|
mem0_manager = Mem0Manager()
|