knowledge-base/backend/mem0_manager.py
Pratik Narola 06875473b2 feat: enable reranker, automate backups, tune extraction prompt
Three small-effort (S-sized) wins from the post-migration audit:

#1 Enable Cohere reranker on both Memory.search call sites
   (rerank=True), over-fetch top_k=max(limit*3, 30) to give the
   reranker a candidate pool of at least 30 (30-50 at typical limits),
   then truncate to the caller's
   limit. Bump reranker config to rerank-v3.5 (4096 ctx, multilingual
   — matters for Hindi/Hinglish traffic) and top_n 10 → 50 so the
   output cap doesn't truncate below typical over-fetch sizes. Cohere
   was configured but never invoked; this is the single biggest
   quality lift the audit surfaced.

#2 Add scripts/backup_qdrant.sh and scripts/restore_test.sh. Daily
   snapshot of both collections back-to-back, docker cp to local
   YYYY-MM-DD dir, optional rclone off-host, prune local >14d, emit
   Prometheus textfile metric. Weekly restore_test.sh restores into a
   transient collection and asserts point count parity. Closes the
   zero-automated-backup gap.

#3 Add CUSTOM_FACT_EXTRACTION_INSTRUCTIONS, wired via MemoryConfig's
   custom_instructions field. mem0 appends this as its own
   '## Custom Instructions' section in the additive-extraction user
   prompt (verified against generate_additive_extraction_prompt) —
   does not replace mem0's role/format guidance. Re-prioritizes the
   default consumer-organizer few-shots toward work/projects/
   relationships/recurring context, the actual usage pattern here.
2026-05-23 19:53:59 +05:30

531 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Ultra-minimal Mem0 Manager - Pure Mem0 + Custom OpenAI Endpoint Only."""
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
from mem0 import Memory
from openai import OpenAI
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
)
import structlog
from config import settings
from monitoring import timed
logger = structlog.get_logger(__name__)

# Exception types treated as transient for database (Qdrant) calls.
# ConnectionError and TimeoutError are both OSError subclasses, so this tuple
# matches exactly what OSError alone would; they are spelled out for
# readability. NOTE(review): client libraries that wrap transport failures in
# their own exception classes will NOT match — confirm what the Qdrant client
# actually raises on connection problems.
_RETRYABLE_DB_ERRORS = (ConnectionError, TimeoutError, OSError)

# Retry decorator for database operations (Qdrant): at most 3 attempts,
# exponential back-off clamped to the 1-10 s range, a WARNING log line before
# each sleep, and the last exception re-raised unchanged once attempts run out.
db_retry = retry(
    retry=retry_if_exception_type(_RETRYABLE_DB_ERRORS),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
# Monkey-patch Mem0's OpenAI LLM so every request goes out with top_p (and
# store) cleared. Motivation: Claude reached via an OpenAI-compatible endpoint
# rejects top_p whenever temperature is set, and OpenAILLM sends both
# unconditionally. Note the patch is applied globally, whatever model is
# configured.
# (The 'store' branch is now redundant in mem0ai>=2.0.0 — upstream made it
# opt-in — but harmless; kept for safety.)
from mem0.llms.openai import OpenAILLM

_original_generate_response = OpenAILLM.generate_response


def patched_generate_response(
    self, messages, response_format=None, tools=None, tool_choice="auto", **kwargs
):
    """Null out the offending config fields, then delegate to mem0's original.

    ``self.config`` is mutated in place on every call; assigning ``None`` is
    idempotent, so repeated calls are harmless.
    """
    for field_name in ("store", "top_p"):
        if hasattr(self.config, field_name):
            setattr(self.config, field_name, None)
    return _original_generate_response(
        self, messages, response_format, tools, tool_choice, **kwargs
    )


OpenAILLM.generate_response = patched_generate_response
logger.info("Applied Claude/OpenAI-compatible patch: cleared top_p (and store)")
def _build_filters(
user_id: Optional[str],
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Build the filters dict required by mem0 v2 search/get_all.
In mem0 v2.x, user_id/agent_id/run_id are rejected as top-level kwargs
on Memory.search and Memory.get_all — they must live inside `filters`.
"""
merged: Dict[str, Any] = dict(extra) if extra else {}
if user_id is not None:
merged["user_id"] = user_id
if agent_id is not None:
merged["agent_id"] = agent_id
if run_id is not None:
merged["run_id"] = run_id
return merged
# Appended as the "## Custom Instructions" section of the additive-extraction
# prompt (mem0/configs/prompts.py::generate_additive_extraction_prompt). The
# default few-shot bias is consumer-organizer ("favourite movies", "SF restaurants"),
# which under-extracts on the work/project/relationship traffic this deployment
# actually sees. This re-prioritizes without replacing mem0's structural guidance.
#
# Wired in through the "custom_instructions" key of the MemoryConfig dict built
# in Mem0Manager.__init__. The trailing .strip() removes the newline right after
# the opening quotes and the one before the closing quotes, so the section
# starts and ends on text.
# NOTE(review): the specificity examples name real people ("Pratik",
# "Anushree"); this text is sent to the extraction LLM as-is — confirm that is
# intended for every user of this deployment.
CUSTOM_FACT_EXTRACTION_INSTRUCTIONS = """
This memory store serves a working assistant — engineering, product, and operational contexts plus the user's people and recurring life context. Prioritize accordingly:
HIGH-VALUE facts to capture:
- Work context: company, team, role; ongoing projects with goals/status/blockers; product or domain knowledge being built; tools/frameworks/languages in active use; technical decisions and the reasoning; recurring meetings or rituals.
- People in the user's orbit: colleagues, family, friends, mentors — names, relationships, roles, what they do, the current state of the relationship or shared context.
- Recurring personal context: home/work locations, regular schedule, standing commitments, durable preferences (food restrictions, working hours, communication style), planned events with dates.
- Acquired knowledge: concepts being studied or built, specific problems being solved, prior solutions tried and their outcomes.
LOWER-PRIORITY (extract only if they reveal a pattern or future relevance):
- Single transient states ("running 5 minutes late", "didn't sleep well") — capture only if they recur or signal a habit.
- Movies, music, restaurants, hobbies — only when noted as durable preferences or part of a recurring activity, not when mentioned in passing.
SKIP entirely:
- Generic world knowledge (timezones, capital cities, definitions) — the assistant already knows these.
- Greetings, acknowledgments, meta-conversation ("Thanks!", "Got it").
- Restatements or paraphrases of facts already in Existing Memories or Recently Extracted Memories.
Prefer specificity. "Pratik uses FastAPI for backend services" beats "Pratik does backend development." When a person is mentioned by a short name or nickname, capture the relationship if known ("Anushree is Pratik's wife") so future references resolve correctly.
""".strip()
class Mem0Manager:
    """
    Ultra-minimal manager that bridges custom OpenAI endpoint with pure Mem0.
    No custom logic - let Mem0 handle all memory intelligence.

    Holds one ``mem0.Memory`` (LLM, embedder, Qdrant vector store and Cohere
    reranker, all configured from ``settings``) plus a raw ``OpenAI`` client
    used for the chat completion in ``chat_with_memory`` and the endpoint
    probe in ``health_check``.

    NOTE(review): the ``async`` methods call the synchronous ``self.memory``
    and ``self.openai_client`` APIs directly, so each call blocks the event
    loop for its full duration. If concurrent throughput matters, consider
    off-loading them (e.g. ``asyncio.to_thread``) after confirming mem0's
    thread-safety.
    """

    def __init__(self):
        """Build the mem0 ``Memory`` and the raw OpenAI client from ``settings``."""
        logger.info(
            "Initializing Mem0Manager with custom endpoint",
            model=settings.default_model,
            embedding_model=settings.embedding_model,
            embedding_dims=settings.embedding_dims,
            qdrant_host=settings.qdrant_host,
        )
        config = {
            "version": "v1.1",
            "custom_instructions": CUSTOM_FACT_EXTRACTION_INSTRUCTIONS,
            "llm": {
                "provider": "openai",
                "config": {
                    "model": settings.default_model,
                    "api_key": settings.openai_api_key,
                    "openai_base_url": settings.openai_base_url,
                    "temperature": 0.1,
                    # Explicitly unset: Claude behind the OpenAI-compatible
                    # endpoint rejects top_p whenever temperature is set.
                    "top_p": None,
                },
            },
            "embedder": {
                # Route embeddings through the OpenAI-compatible LiteLLM proxy
                # rather than Ollama directly — the proxy is reachable from the
                # container in all deployments, Ollama may not be. The model
                # name is the same (qwen3-embedding:4b-q8_0); existing vectors
                # generated via this path stay compatible.
                "provider": "openai",
                "config": {
                    "model": settings.embedding_model,
                    "api_key": settings.openai_api_key,
                    "openai_base_url": settings.openai_base_url,
                    "embedding_dims": settings.embedding_dims,
                },
            },
            "vector_store": {
                "provider": "qdrant",
                "config": {
                    "collection_name": settings.qdrant_collection_name,
                    "host": settings.qdrant_host,
                    "port": settings.qdrant_port,
                    "embedding_model_dims": settings.embedding_dims,
                    "on_disk": True,
                },
            },
            "reranker": {
                "provider": "cohere",
                "config": {
                    "api_key": settings.cohere_api_key,
                    # v3.5 supersedes v3.0: 4096-token context, multilingual
                    # (our users include Hindi/Hinglish content that the
                    # English-only v3 silently underperforms on).
                    "model": "rerank-v3.5",
                    # Raised from 10 → 50 so the rerank output cap does not
                    # truncate below typical over-fetch sizes: search_memories
                    # requests top_k=max(limit * 3, 30) (at least 3× the
                    # caller's limit) and chat_with_memory a fixed 30.
                    # NOTE(review): confirm whether mem0 forwards the search
                    # top_k to the reranker (making top_n only a fallback) or
                    # caps output at top_n — in the latter case limits above
                    # 50 return at most 50 memories.
                    "top_n": 50,
                },
            },
        }
        self.memory = Memory.from_config(config)
        self.openai_client = OpenAI(
            api_key=settings.openai_api_key,
            base_url=settings.openai_base_url,
            timeout=60.0,  # 60 second timeout for LLM calls
            max_retries=2,  # Retry failed requests up to 2 times
        )
        logger.info("Initialized ultra-minimal Mem0Manager with custom endpoint")

    # Pure passthrough methods - no custom logic

    @db_retry
    @timed("add_memories")
    async def add_memories(
        self,
        messages: List[Dict[str, str]],
        user_id: Optional[str] = "default",
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Add memories - simplified native Mem0 pattern (10 lines vs 45).

        Args:
            messages: Chat messages as plain dicts or pydantic models (e.g.
                ChatMessage); models are converted to dicts first.
            user_id, agent_id, run_id: mem0 entity scope for the new memories.
            metadata: Caller metadata, merged over the auto-generated
                ``timestamp``/``source``/``message_count``/``auto_generated``
                keys; caller values win on conflict.

        Returns:
            Dict with ``added_memories`` (mem0's raw ``add`` result, wrapped in
            a list when it is not already one), a status ``message`` and the
            ``hierarchy`` of entity IDs used.

        Raises:
            Whatever ``Memory.add`` raises; it is logged and re-raised so the
            ``db_retry`` decorator can retry retryable errors.
        """
        try:
            # Convert ChatMessage objects to dict if needed. Pydantic v2 models
            # expose model_dump() (dict() is deprecated there); fall back to the
            # v1-style dict() and pass plain dicts through untouched.
            formatted_messages = []
            for msg in messages:
                if hasattr(msg, "model_dump"):
                    formatted_messages.append(msg.model_dump())
                elif hasattr(msg, "dict"):
                    formatted_messages.append(msg.dict())
                else:
                    formatted_messages.append(msg)
            # Auto-enhance metadata for better memory quality
            combined_metadata = metadata or {}
            # NOTE: naive local-time timestamp (no tz offset) — kept for
            # compatibility with metadata already stored.
            auto_metadata = {
                "timestamp": datetime.now().isoformat(),
                "source": "chat_conversation",
                "message_count": len(formatted_messages),
                "auto_generated": True,
            }
            # Merge user metadata with auto metadata (user metadata takes precedence)
            enhanced_metadata = {**auto_metadata, **combined_metadata}
            # Direct Mem0 add with enhanced metadata
            result = self.memory.add(
                formatted_messages,
                user_id=user_id,
                agent_id=agent_id,
                run_id=run_id,
                metadata=enhanced_metadata,
            )
            return {
                "added_memories": result if isinstance(result, list) else [result],
                "message": "Memories added successfully",
                "hierarchy": {
                    "user_id": user_id,
                    "agent_id": agent_id,
                    "run_id": run_id,
                },
            }
        except Exception as e:
            logger.error(f"Error adding memories: {e}")
            raise

    @db_retry
    @timed("search_memories")
    async def search_memories(
        self,
        query: str,
        user_id: Optional[str] = "default",
        limit: int = 5,
        threshold: Optional[float] = None,
        filters: Optional[Dict[str, Any]] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search memories - native Mem0 pattern

        Args:
            query: Free-text query. Blank or whitespace-only queries short-circuit
                to an empty result carrying an explanatory ``note``.
            user_id, agent_id, run_id: Entity scope, merged into ``filters``.
            limit: Maximum number of memories returned; non-positive values
                return no memories.
            threshold: Forwarded unchanged to ``Memory.search``.
            filters: Extra mem0 filters; non-None entity IDs override
                same-named keys.

        Returns:
            Dict with ``memories`` (at most ``limit`` reranked hits),
            ``total_count`` (== len(memories)) and the echoed ``query``.
        """
        try:
            # Minimal empty query protection for API compatibility
            if not query or not query.strip():
                return {
                    "memories": [],
                    "total_count": 0,
                    "query": query,
                    "note": "Empty query provided, no results returned. Use a specific query to search memories.",
                }
            # A negative slice bound (results[:-1]) would silently return most
            # of the over-fetched pool instead of nothing, so short-circuit.
            if limit <= 0:
                return {"memories": [], "total_count": 0, "query": query}
            # mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
            # Over-fetch at least 30 candidates (3x the requested limit when that
            # is larger) so the Cohere reranker (rerank=True) has room to
            # reorder; then truncate to the caller's requested limit.
            overfetch = max(limit * 3, 30)
            result = self.memory.search(
                query=query,
                filters=_build_filters(user_id, agent_id, run_id, extra=filters),
                top_k=overfetch,
                threshold=threshold,
                rerank=True,
            )
            memories = result.get("results", [])[:limit]
            return {
                "memories": memories,
                "total_count": len(memories),
                "query": query,
            }
        except Exception as e:
            logger.error(f"Error searching memories: {e}")
            raise

    @db_retry
    async def get_user_memories(
        self,
        user_id: str,
        limit: int = 10,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Get all memories for a user - native Mem0 pattern.

        Returns the ``results`` list from ``Memory.get_all`` (empty when that
        key is absent), capped at ``limit`` through mem0's ``top_k``.
        """
        try:
            # mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
            result = self.memory.get_all(
                filters=_build_filters(user_id, agent_id, run_id, extra=filters),
                top_k=limit,
            )
            return result.get("results", [])
        except Exception as e:
            logger.error(f"Error getting user memories: {e}")
            raise

    @db_retry
    async def get_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Get a single memory by ID. Returns None if not found.

        Any exception from ``Memory.get`` is logged at debug level and also
        mapped to ``None``, so a lookup failure is indistinguishable from a
        missing memory. Because nothing propagates, ``db_retry`` never
        actually retries this method.
        """
        try:
            result = self.memory.get(memory_id=memory_id)
            return result
        except Exception as e:
            logger.debug(f"Memory {memory_id} not found or error: {e}")
            return None

    async def verify_memory_ownership(self, memory_id: str, user_id: str) -> bool:
        """Check if a memory belongs to a user. O(1) instead of O(n).

        Does a single lookup by ID rather than scanning the user's memories.
        Returns False when the memory is missing or the lookup failed.
        """
        memory = await self.get_memory(memory_id)
        if memory is None:
            return False
        return memory.get("user_id") == user_id

    @db_retry
    @timed("update_memory")
    async def update_memory(
        self,
        memory_id: str,
        content: str,
    ) -> Dict[str, Any]:
        """Update memory - pure Mem0 passthrough.

        Replaces the memory text with ``content`` via ``Memory.update`` and
        returns mem0's raw result under ``result``.
        """
        try:
            result = self.memory.update(memory_id=memory_id, data=content)
            return {"message": "Memory updated successfully", "result": result}
        except Exception as e:
            logger.error(f"Error updating memory: {e}")
            raise

    @db_retry
    @timed("delete_memory")
    async def delete_memory(self, memory_id: str) -> Dict[str, Any]:
        """Delete memory - pure Mem0 passthrough."""
        try:
            self.memory.delete(memory_id=memory_id)
            return {"message": "Memory deleted successfully"}
        except Exception as e:
            logger.error(f"Error deleting memory: {e}")
            raise

    async def delete_user_memories(self, user_id: Optional[str]) -> Dict[str, Any]:
        """Delete all user memories - pure Mem0 passthrough.

        NOTE(review): with ``user_id=None`` the outcome depends entirely on
        how mem0's ``delete_all`` treats a missing filter — confirm callers
        always pass a real ID.
        """
        try:
            self.memory.delete_all(user_id=user_id)
            return {"message": "All user memories deleted successfully"}
        except Exception as e:
            logger.error(f"Error deleting user memories: {e}")
            raise

    async def get_memory_history(self, memory_id: str) -> Dict[str, Any]:
        """Get memory change history - pure Mem0 passthrough."""
        try:
            history = self.memory.history(memory_id=memory_id)
            return {
                "memory_id": memory_id,
                "history": history,
                "message": "Memory history retrieved successfully",
            }
        except Exception as e:
            logger.error(f"Error getting memory history: {e}")
            raise

    async def get_graph_relationships(
        self,
        user_id: Optional[str],
        agent_id: Optional[str],
        run_id: Optional[str],
        limit: int = 50,
    ) -> Dict[str, Any]:
        """Graph relationships — deprecated in mem0 v2 (OSS graph memory removed).

        mem0 v2.0.0 deleted the OSS graph store (Neo4j/Memgraph/Kuzu/AGE drivers).
        Entity relationships now influence ranking via a parallel `{collection}_entities`
        Qdrant collection rather than being directly traversable. We return an empty
        graph payload plus a `deprecated` marker so clients (frontend graph.html) can
        render a clear "Graph view unavailable" state instead of erroring.

        ``limit`` is unused; presumably kept so existing callers still match
        the signature.
        """
        return {
            "relationships": [],
            "entities": [],
            "user_id": user_id,
            "agent_id": agent_id,
            "run_id": run_id,
            "total_memories": 0,
            "total_relationships": 0,
            "deprecated": True,
            "deprecation_note": (
                "OSS graph memory was removed in mem0 v2.0.0. Use search/get_all for "
                "memory retrieval; entity links now affect ranking only."
            ),
        }

    @timed("chat_with_memory")
    async def chat_with_memory(
        self,
        message: str,
        user_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        run_id: Optional[str] = None,
        context: Optional[List[Dict[str, str]]] = None,
    ) -> Dict[str, Any]:
        """Chat with memory - native Mem0 pattern with detailed timing.

        Steps:
            1. Search memories scoped to the given entity IDs.
            2. Put the top reranked hits into the system prompt.
            3. Call the chat model, including any prior ``context`` turns.
            4. Store the new user/assistant exchange in mem0 under the same
               scope.

        Returns:
            On success: ``response``, ``memories_used``, ``model_used`` and a
            ``timing`` breakdown in seconds. On any exception the error is
            logged and a fallback dict with an ``error`` key is returned
            instead of raising.
        """
        import time

        try:
            # perf_counter is monotonic: durations are unaffected by wall-clock
            # adjustments, unlike time.time().
            total_start_time = time.perf_counter()
            logger.info("Starting chat request", user_id=user_id)
            search_start_time = time.perf_counter()
            # Over-fetch for the Cohere reranker (rerank=True), then keep the
            # top 10 reranked memories for the system prompt.
            search_result = self.memory.search(
                query=message,
                filters=_build_filters(user_id, agent_id, run_id),
                top_k=30,
                threshold=0.3,
                rerank=True,
            )
            relevant_memories = search_result.get("results", [])[:10]
            memories_str = "\n".join(
                f"- {entry['memory']}" for entry in relevant_memories
            )
            search_time = time.perf_counter() - search_start_time
            logger.debug(
                "Memory search completed",
                search_time_s=round(search_time, 2),
                memories_found=len(relevant_memories),
            )
            system_prompt = f"You are a helpful AI. Answer the question based on query and memories.\nUser Memories:\n{memories_str}"
            messages = [{"role": "system", "content": system_prompt}]
            if context:
                messages.extend(context)
                logger.debug("Added context messages", context_count=len(context))
            messages.append({"role": "user", "content": message})
            llm_start_time = time.perf_counter()
            response = self.openai_client.chat.completions.create(
                model=settings.default_model, messages=messages
            )
            assistant_response = response.choices[0].message.content
            llm_time = time.perf_counter() - llm_start_time
            logger.debug(
                "LLM call completed",
                llm_time_s=round(llm_time, 2),
                model=settings.default_model,
            )
            add_start_time = time.perf_counter()
            memory_messages = [
                {"role": "user", "content": message},
                {"role": "assistant", "content": assistant_response},
            ]
            # Store under the same agent/run scope that was searched above, so
            # later scoped searches can find this exchange.
            self.memory.add(
                memory_messages, user_id=user_id, agent_id=agent_id, run_id=run_id
            )
            add_time = time.perf_counter() - add_start_time
            total_time = time.perf_counter() - total_start_time
            logger.info(
                "Chat request completed",
                user_id=user_id,
                total_time_s=round(total_time, 2),
                search_time_s=round(search_time, 2),
                llm_time_s=round(llm_time, 2),
                add_time_s=round(add_time, 2),
                memories_used=len(relevant_memories),
                model=settings.default_model,
            )
            return {
                "response": assistant_response,
                "memories_used": len(relevant_memories),
                "model_used": settings.default_model,
                "timing": {
                    "total": round(total_time, 2),
                    "search": round(search_time, 2),
                    "llm": round(llm_time, 2),
                    "add": round(add_time, 2),
                },
            }
        except Exception as e:
            logger.error(
                "Error in chat_with_memory",
                error=str(e),
                user_id=user_id,
                exc_info=True,
            )
            return {
                "error": str(e),
                "response": "I apologize, but I encountered an error processing your request.",
                "memories_used": 0,
                "model_used": None,
            }

    async def health_check(self) -> Dict[str, str]:
        """Basic health check - just connectivity.

        Returns a dict with ``openai_endpoint`` and ``mem0_memory`` keys, each
        either ``"healthy"`` or ``"unhealthy: <error>"``. Probe failures are
        caught and reported, never raised.
        """
        status = {}
        # Check custom OpenAI endpoint (the listing itself is discarded)
        try:
            self.openai_client.models.list()
            status["openai_endpoint"] = "healthy"
        except Exception as e:
            status["openai_endpoint"] = f"unhealthy: {e}"
        # Check Mem0 memory with a minimal top_k=1 search
        try:
            self.memory.search(
                query="test", filters={"user_id": "health_check"}, top_k=1
            )
            status["mem0_memory"] = "healthy"
        except Exception as e:
            status["mem0_memory"] = f"unhealthy: {e}"
        return status
# Global instance, created once at import time. Importing this module therefore
# runs Mem0Manager.__init__, which calls Memory.from_config(...) and constructs
# the OpenAI client. NOTE(review): any connection work mem0 performs inside
# from_config (e.g. reaching Qdrant) happens during import — confirm that is
# acceptable for tests/tools that import this module.
mem0_manager: Mem0Manager = Mem0Manager()