Compare commits

..

No commits in common. "chore/mem0-v3-migration" and "main" have entirely different histories.

19 changed files with 362 additions and 1926 deletions


@ -1,84 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project overview
FastAPI backend that wraps the `mem0ai` SDK (pinned to `mem0ai[nlp]==2.0.2` — the "V3 memory pipeline") to expose memory operations (add/search/update/delete, plus memory-aware chat) over a REST API, an OpenAI-compatible `/v1/chat/completions` endpoint, and an MCP server. Memory is stored in Qdrant (vectors + BM25 sparse), with a sister `{collection}_entities` Qdrant collection auto-created by mem0 for entity linking. Embeddings come from a local Ollama instance; the LLM is a custom OpenAI-compatible endpoint. The frontend is two standalone HTML files (`index.html`, `graph.html`) that call the API directly — no build step.
## Common commands
```bash
# First-time setup (creates volumes, builds, brings up stack). Prompts to reset volumes if they already exist.
./setup.sh
# Day-to-day
docker compose up -d --build # rebuild + start
docker compose down # stop (keeps volumes)
docker compose down -v # stop + delete data
docker compose logs -f backend # tail backend logs (structlog JSON)
docker compose restart backend # pick up code changes (no --reload; see "Volumes" gotcha)
# Sanity check — assumes a host route to backend:8000 exists (see "Networking" gotcha).
curl http://localhost:8000/health
# Integration tests — hit the running stack, no mocks. See test_integration.py for the test list.
MEM0_API_KEY=<key-from-API_KEYS> python test_integration.py
MEM0_API_KEY=<key-from-API_KEYS> python test_integration.py -v
```
There are no unit tests and no separate lint/format/type-check setup — `test_integration.py` is the only test entry point and it requires a fully-running Docker stack. The script generates a fresh `TEST_USER = f"test_user_{int(datetime.now().timestamp())}"` per run, so for tests to pass auth checks the supplied `MEM0_API_KEY` must map to that exact user in `API_KEYS` (either set `TEST_USER` to a statically mapped user, or add a mapping for the run).
## Architecture
### Request flow
1. Client hits FastAPI (`backend/main.py`) with `X-API-Key` (or `Authorization: Bearer` for `/v1/chat/completions`).
2. `auth.py` resolves the key → `user_id` via `settings.api_key_mapping` (parsed from the `API_KEYS` env JSON). Every protected endpoint then verifies the caller's `user_id` matches the path/body `user_id` — there is no admin or cross-user access.
3. The endpoint calls `mem0_manager.mem0_manager` (singleton in `backend/mem0_manager.py`), which delegates to the `mem0ai` SDK. The SDK in turn calls Qdrant, Ollama, Cohere (reranker), and the custom OpenAI endpoint.
4. The `@timed("operation_name")` decorator from `backend/monitoring.py` wraps memory operations to log structured timings and feed the in-memory `stats` singleton that powers `/stats` and `/stats/{user_id}`.
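The key-to-user resolution and the ownership rule from step 2 can be sketched as below. This is a minimal illustration under stated assumptions, not the contents of `auth.py`: the helper names `parse_api_keys`, `resolve_user`, and `is_owner` are hypothetical, and the real dependencies raise HTTP errors rather than returning `None` or `False`.

```python
import json
from typing import Dict, Optional


def parse_api_keys(raw: str) -> Dict[str, str]:
    """Parse an API_KEYS-style JSON object mapping API key -> user_id."""
    mapping = json.loads(raw)
    if not isinstance(mapping, dict):
        raise ValueError("API_KEYS must be a JSON object")
    return {str(key): str(user) for key, user in mapping.items()}


def resolve_user(api_key: str, mapping: Dict[str, str]) -> Optional[str]:
    """Return the user_id mapped to this key, or None if the key is unknown."""
    return mapping.get(api_key)


def is_owner(authenticated_user: str, requested_user: str) -> bool:
    """No admin or cross-user access: the caller may only act on itself."""
    return authenticated_user == requested_user


# Hypothetical example values, for illustration only.
mapping = parse_api_keys('{"key-1": "alice", "key-2": "bob"}')
caller = resolve_user("key-1", mapping)
print(caller, is_owner(caller, "alice"), is_owner(caller, "bob"))
```

An endpoint would resolve the caller first and reject the request before touching memory whenever `is_owner` is false for the `user_id` in the path or body.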
### Three parallel API surfaces
All three live in the same FastAPI process and share auth + rate limiting:
- **Native REST** (`/chat`, `/memories*`, `/graph/relationships/{user_id}` *(deprecated — returns empty payload)*, `/stats*`, `/models`, `/users`) — authenticates via `X-API-Key`.
- **OpenAI-compatible** (`/v1/chat/completions`, also `/chat/completions`) — authenticates via `Authorization: Bearer <key>` or `X-API-Key`; supports streaming SSE. Implemented in `main.py:openai_chat_completions` and `stream_openai_response`.
- **MCP** mounted at `/mcp` (see `backend/mcp_server.py`) — uses a Starlette `MCPAuthMiddleware` that stuffs the resolved `user_id` into a `ContextVar`, which the FastMCP tools (`add_memory`, `search_memory`, `remove_memory`, `chat`) read. The MCP session manager is started inside the main FastAPI `lifespan` in `main.py` — mounted-app lifespans don't run automatically, so don't move that startup logic.
### Storage layout
- **Qdrant** — collection name from `QDRANT_COLLECTION_NAME` (default `mem0`). Embedding dim must match the embedder; see "Embedding dimensions" gotcha below. Collections created by mem0 v2 carry a `bm25` sparse-vector slot for hybrid search (semantic + keyword + entity-boost). The slot is added automatically at collection creation; existing pre-v2 collections silently degrade to semantic-only with a logged warning — they must be recreated to gain BM25.
- **`{collection}_entities`** — sister Qdrant collection lazy-created by mem0 v2 on first `add()`, same dimension as the main collection. Stores entity vectors used for ranking boost. No code touches it directly.
- **No graph store** — Neo4j and the OSS graph memory feature were removed in `mem0ai` 2.0.0 (PR #4805). The `/graph/relationships/{user_id}` endpoint is kept for client compatibility but returns `deprecated: true` with empty arrays.
- **No SQL store** — older docs mention PostgreSQL/pgvector; that's no longer used. Qdrant only.
## Important conventions / gotchas
### Networking: backend is not published to the host
`docker-compose.yml` defines the backend on the **external** `npm_network` (Nginx Proxy Manager) and only `expose`s port 8000 inside Docker. There is no `ports:` mapping. To hit it from the host you need either: (a) the NPM proxy in front of it, (b) `docker compose exec backend curl ...`, or (c) add a temporary `ports:` mapping. The `npm_network` must exist before bringing the stack up (`docker network create npm_network` if you don't run NPM).
### Claude/OpenAI-compatible monkey-patch
`mem0_manager.py` patches `mem0.llms.openai.OpenAILLM.generate_response` at import time to clear `store` and `top_p` from the config. In `mem0ai>=2.0.0`, the `store` half is redundant (upstream made `store` opt-in) but kept as a harmless safety net. The `top_p` clearing is still load-bearing: Claude (reached via the custom OpenAI-compatible endpoint) rejects `top_p` whenever `temperature` is set, and `OpenAILLM` sends both unconditionally. If you upgrade `mem0ai` and chat starts 400-ing on the custom endpoint, this patch is the first place to look.
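The shape of that patch can be demonstrated without `mem0ai` installed. In the sketch below, `FakeLLM` is a hypothetical stand-in for `mem0.llms.openai.OpenAILLM`, and the way it builds request parameters is an assumption made only so the effect of the patch is visible.

```python
from types import SimpleNamespace


class FakeLLM:
    """Hypothetical stand-in for OpenAILLM; not the real class."""

    def __init__(self):
        self.config = SimpleNamespace(temperature=0.1, top_p=0.1, store=False)

    def generate_response(self, messages, **kwargs):
        # Assumed behaviour: forward every sampling parameter that is set.
        params = {"temperature": self.config.temperature}
        if self.config.top_p is not None:
            params["top_p"] = self.config.top_p
        return params


# Keep a reference to the original method before replacing it.
_original_generate_response = FakeLLM.generate_response


def patched_generate_response(self, messages, **kwargs):
    # Clear the fields the upstream endpoint rejects, then delegate.
    if hasattr(self.config, "store"):
        self.config.store = None
    if hasattr(self.config, "top_p"):
        self.config.top_p = None
    return _original_generate_response(self, messages, **kwargs)


FakeLLM.generate_response = patched_generate_response

print(FakeLLM().generate_response([{"role": "user", "content": "hi"}]))
```

Patching the class attribute rather than an instance means every instance the SDK creates later picks up the change, which is why the patch has to run at import time, before any `Memory` object is built.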
### Embedding model and dimensions are coupled
`EMBEDDING_MODEL` and `EMBEDDING_DIMS` in `.env` / `docker-compose.yml` must agree, and they must match the dim the Qdrant collection was created with. Defaults are `qwen3-embedding:4b-q8_0` / `2560`. Switching the model requires either matching dims or recreating the Qdrant collection (`./setup.sh` → option 2 wipes volumes).
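A fail-fast check for this coupling might look like the sketch below. The function name `dims_mismatch_error` is hypothetical, and how the collection's actual dimension is read from Qdrant is deliberately left out; the caller is assumed to have obtained it already.

```python
from typing import Optional


def dims_mismatch_error(configured_dims: int, collection_dims: int) -> Optional[str]:
    """Return an error message if the two dimensions disagree, else None."""
    if configured_dims == collection_dims:
        return None
    return (
        f"EMBEDDING_DIMS={configured_dims} does not match the existing "
        f"collection ({collection_dims}); recreate the collection or "
        "switch back to a model with matching dimensions"
    )


# 2560 is the documented default; 1024 is an arbitrary example value.
print(dims_mismatch_error(2560, 2560))
print(dims_mismatch_error(1024, 2560))
```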
### Single-model architecture
Despite what `README.md`, `TESTING.md`, and `MEM0.md` say about intelligent routing across `o4-mini` / `gemini-2.5-pro` / `claude-sonnet-4` / `o3`, the code uses **one** model — `settings.default_model` (`claude-sonnet-4` by default). `/models` returns only that. Don't reintroduce routing without first checking with the user.
### mem0 v2 API: filters dict + top_k
mem0 v2 rejects `user_id`/`agent_id`/`run_id` as top-level kwargs on `Memory.search` and `Memory.get_all` (raises `ValueError`) — they must live inside a `filters={...}` dict. The `limit` kwarg is renamed `top_k` (default reduced 100 → 20 — pass it explicitly when you need more). `Memory.add` and `Memory.delete_all` still accept these IDs as top-level kwargs. Use the `_build_filters()` helper at the top of `mem0_manager.py` to construct the dict. Search `score` is now a fused multi-signal value (semantic + BM25 + entity boost), not raw cosine — don't compare against thresholds calibrated for the old scoring.
### ADD-only memory algorithm
`Memory.add` in mem0 v2 only emits `ADD` events; the engine no longer issues `UPDATE`/`DELETE` events based on LLM judgment. Per-user memory count grows monotonically. Explicit `Memory.update` / `Memory.delete` still work and are how the project mutates memories.
### Auth & rate limiting
- All endpoints except `/health` require a valid `X-API-Key` (or Bearer for the OpenAI-compatible routes). `API_KEYS` is a JSON object mapping keys → user IDs. Note this contradicts `AUTH_SETUP.md`, which lists `/stats` and `/models` as public — the code is authoritative.
- Rate limits via `slowapi` are set per endpoint in `main.py` decorators: chat 30/min, writes 60/min, reads 120/min, bulk user-delete 10/min. Keyed by API key (fallback to remote IP).
- Memory ownership is checked via `mem0_manager.verify_memory_ownership` (O(1) `Memory.get(memory_id)`) — use this rather than fetching all user memories.
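The "keyed by API key, fallback to remote IP" rule can be sketched as below. The body of the real `get_rate_limit_key` in `main.py` is not shown in this document, so everything here is an assumption: `FakeRequest` is a hypothetical stand-in for the framework request object, and the `key:` / `ip:` prefixes are illustrative.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FakeRequest:
    """Hypothetical stand-in for a request; header names are lower-case here."""

    headers: Dict[str, str] = field(default_factory=dict)
    client_host: str = "127.0.0.1"


def get_rate_limit_key(request: FakeRequest) -> str:
    """Prefer the caller's API key; fall back to the remote address."""
    api_key = request.headers.get("x-api-key")
    if not api_key:
        # OpenAI-style clients send the key as a Bearer token instead.
        scheme, _, rest = request.headers.get("authorization", "").partition(" ")
        if scheme.lower() == "bearer" and rest:
            api_key = rest.strip()
    if api_key:
        return f"key:{api_key}"
    return f"ip:{request.client_host}"


print(get_rate_limit_key(FakeRequest(headers={"x-api-key": "abc"})))
print(get_rate_limit_key(FakeRequest(client_host="10.0.0.5")))
```

Keying on the API key rather than the IP keeps limits per user even when all traffic arrives through a single reverse proxy address.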
### Config field aliases
`backend/config.py` uses Pydantic `AliasChoices` so both `OPENAI_API_KEY` and `OPENAI_COMPAT_API_KEY` (and `OPENAI_BASE_URL` / `OPENAI_COMPAT_BASE_URL`) populate the same field. `docker-compose.yml` passes `OPENAI_API_KEY`; `.env.example` documents `OPENAI_COMPAT_API_KEY`. Both work.
### Volumes mount the source in, but no hot reload
`docker-compose.yml` bind-mounts `./backend:/app` and `./frontend:/app/frontend`, and `uvicorn` runs with `--workers 4` and no `--reload`. Code edits become live only on container restart (`docker compose restart backend`).
### Logging
Use `structlog.get_logger(__name__)` with **keyword arguments** (e.g. `logger.info("msg", user_id=x)`). The last few commits explicitly fixed places that mixed stdlib `logging` (which silently drops kwargs) with structlog. Don't reintroduce `logging.getLogger` in this codebase.


@ -1,4 +1,4 @@
FROM python:3.12-slim
FROM python:3.13-slim
# Set working directory
WORKDIR /app


@ -130,39 +130,6 @@ async def get_current_user_openai(
return auth_service.verify_api_key(api_key)
async def get_current_user_platform(
authorization: Optional[str] = Header(None),
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> str:
"""FastAPI dependency for the mem0 platform SDK (``MemoryClient``).
``MemoryClient`` authenticates with ``Authorization: Token <key>``. We also
accept ``Bearer <key>`` and ``X-API-Key`` so the same routes work from curl
and OpenAI-style clients. Resolves to the mapped user_id (raises 401 if the
key is missing or invalid).
"""
api_key = None
if authorization:
scheme, _, rest = authorization.partition(" ")
if rest and scheme.lower() in ("token", "bearer"):
api_key = rest.strip()
else:
# Bare value with no recognised scheme — treat the whole header as the key.
api_key = authorization.strip()
if not api_key and x_api_key:
api_key = x_api_key
if not api_key:
logger.warning("No API key provided in Authorization or X-API-Key headers")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing API key. Provide 'Authorization: Token <key>' (mem0 SDK), 'Bearer <key>', or 'X-API-Key'.",
)
return auth_service.verify_api_key(api_key)
async def verify_user_access(
api_key: str = Security(api_key_header), user_id: Optional[str] = None
) -> str:


@ -44,6 +44,20 @@ class Settings(BaseSettings):
),
)
# Neo4j Configuration
neo4j_uri: str = Field(
default="bolt://localhost:7687",
validation_alias=AliasChoices("NEO4J_URI", "neo4j_uri"),
)
neo4j_username: str = Field(
default="neo4j",
validation_alias=AliasChoices("NEO4J_USERNAME", "neo4j_username"),
)
neo4j_password: str = Field(
default="mem0_neo4j_password",
validation_alias=AliasChoices("NEO4J_PASSWORD", "neo4j_password"),
)
# Application Configuration
log_level: str = Field(
default="INFO", validation_alias=AliasChoices("LOG_LEVEL", "log_level")


@ -11,6 +11,7 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, Security,
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import structlog
import asyncio
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
@ -28,8 +29,6 @@ def get_rate_limit_key(request: Request) -> str:
limiter = Limiter(key_func=get_rate_limit_key)
import httpx
from models import (
ChatRequest,
MemoryAddRequest,
@ -44,10 +43,13 @@ from models import (
GlobalStatsResponse,
UserStatsResponse,
OpenAIChatCompletionRequest,
OpenAIChatCompletionResponse,
OpenAIChoice,
OpenAIChoiceMessage,
OpenAIUsage,
)
from mem0_manager import mem0_manager
from auth import get_current_user, get_current_user_openai, auth_service
from platform_compat import router as platform_router
# Configure structured logging
structlog.configure(
@ -107,20 +109,14 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.error(f"Error stopping MCP session manager: {e}")
# Close the async HTTP client used by the /v1/chat/completions proxy.
try:
await mem0_manager.aclose()
except Exception as e:
logger.warning(f"mem0_manager aclose failed: {e}")
logger.info("Shutting down Mem0 Interface POC")
# Initialize FastAPI app
app = FastAPI(
title="Mem0 Interface POC",
description="Minimal Mem0 interface backed by Qdrant (mem0ai v2 hybrid-search pipeline)",
version="2.0.0",
description="Minimal but fully functional Mem0 interface with PostgreSQL and Neo4j integration",
version="1.0.0",
lifespan=lifespan,
)
@ -340,59 +336,135 @@ async def chat_with_memory(
)
@app.post("/v1/chat/completions", response_model=None)
@app.post("/chat/completions", response_model=None)
async def stream_openai_response(
completion_id: str, model: str, content: str, created: int
):
"""Generate SSE stream for OpenAI-compatible streaming by chunking the response."""
import uuid
# First chunk with role
chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": ""},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(chunk)}\n\n"
# Stream content in chunks (3 words at a time for smooth effect)
words = content.split()
chunk_size = 3
for i in range(0, len(words), chunk_size):
word_chunk = " ".join(words[i : i + chunk_size])
if i + chunk_size < len(words):
word_chunk += " "
chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [
{"index": 0, "delta": {"content": word_chunk}, "finish_reason": None}
],
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.05)
# Final chunk
chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(chunk)}\n\n"
yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
@app.post("/chat/completions")
@limiter.limit("30/minute")
async def openai_chat_completions(
request: Request,
completion_request: OpenAIChatCompletionRequest,
authenticated_user: str = Depends(get_current_user_openai),
):
"""OpenAI-compatible chat completions — pass-through proxy with memory injection.
Forwards the request to the upstream LLM verbatim (preserving tool_calls,
reasoning_tokens, system_fingerprint, finish_reason, etc.) and optionally
prepends a system message with relevant memories when the last role is
`user` and no tool flow is in progress. See
`mem0_manager.openai_proxy_completion` for the full injection rule and
tool-call safety contract.
"""
"""OpenAI-compatible chat completions endpoint with mem0 memory integration."""
try:
request_kwargs = completion_request.model_dump(exclude_unset=True)
# `user` is the OpenAI client-supplied identifier; we always derive the
# real user from the API key. Strip it before forwarding so it can't
# confuse upstream user-tracking.
request_kwargs.pop("user", None)
import uuid
user_id = authenticated_user
logger.info(
"openai_chat_completions",
user_id=authenticated_user,
stream=bool(request_kwargs.get("stream")),
has_tools=bool(request_kwargs.get("tools")),
model=request_kwargs.get("model"),
f"OpenAI chat completion for user: {user_id} (streaming={completion_request.stream})"
)
result = await mem0_manager.openai_proxy_completion(
request_kwargs=request_kwargs,
user_id=authenticated_user,
# Extract last user message
user_messages = [
m for m in completion_request.messages if m.get("role") == "user"
]
if not user_messages:
raise HTTPException(
status_code=400,
detail="No user messages provided. Include at least one message with role='user'.",
)
if request_kwargs.get("stream"):
last_message = user_messages[-1].get("content", "")
context = (
completion_request.messages[:-1]
if len(completion_request.messages) > 1
else None
)
# Call chat_with_memory
result = await mem0_manager.chat_with_memory(
message=last_message,
user_id=user_id,
context=context,
)
completion_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
created_time = int(time.time())
assistant_content = result.get("response", "")
if completion_request.stream:
return StreamingResponse(
result,
stream_openai_response(
completion_id=completion_id,
model=settings.default_model,
content=assistant_content,
created=created_time,
),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
return result
except httpx.HTTPStatusError as e:
# Forward upstream error body to the client with its original status
try:
detail = e.response.json()
except Exception:
detail = {"error": {"message": e.response.text[:500]}}
raise HTTPException(status_code=e.response.status_code, detail=detail)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
else:
return OpenAIChatCompletionResponse(
id=completion_id,
object="chat.completion",
created=created_time,
model=settings.default_model,
choices=[
OpenAIChoice(
index=0,
message=OpenAIChoiceMessage(
role="assistant", content=assistant_content
),
finish_reason="stop",
)
],
usage=OpenAIUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
)
except HTTPException:
raise
except Exception as e:
@ -463,7 +535,7 @@ async def search_memories(
query=search_request.query,
user_id=search_request.user_id,
limit=search_request.limit,
threshold=search_request.threshold or 0.1,
threshold=search_request.threshold or 0.2,
filters=search_request.filters,
agent_id=search_request.agent_id,
run_id=search_request.run_id,
@ -480,6 +552,7 @@ async def search_memories(
detail="An internal error occurred. Please try again later.",
)
@app.get("/memories/{user_id}")
@limiter.limit("120/minute")
async def get_user_memories(
@ -562,6 +635,7 @@ async def update_memory(
detail="An internal error occurred. Please try again later.",
)
@app.delete("/memories/{memory_id}")
@limiter.limit("60/minute")
async def delete_memory(
@ -626,15 +700,13 @@ async def delete_user_memories(
)
# Graph relationships endpoint - DEPRECATED in mem0 v2 (OSS graph memory removed).
# Returns an empty payload with `deprecated: true` so the frontend can render a
# clear "Graph view unavailable" state. Kept for client compatibility.
@app.get("/graph/relationships/{user_id}", deprecated=True)
# Graph relationships endpoint - pure Mem0 passthrough
@app.get("/graph/relationships/{user_id}")
@limiter.limit("60/minute")
async def get_graph_relationships(
request: Request, user_id: str, authenticated_user: str = Depends(get_current_user)
):
"""Get graph relationships - DEPRECATED: mem0 v2 removed OSS graph memory."""
"""Get graph relationships - pure Mem0 passthrough."""
try:
# Verify user can only access their own graph relationships
if authenticated_user != user_id:
@ -831,12 +903,6 @@ async def get_active_users(
}
# mem0 platform SDK (MemoryClient) compatibility routes:
# /v1/ping/, /v3/memories/{add,search}/, /v3/memories/, /v1/memories/*, /v1/entities/
app.include_router(platform_router)
# Mount MCP server at /mcp endpoint
try:
from mcp_server import create_mcp_app


@ -151,8 +151,15 @@ async def remove_memory(
user_id = get_authenticated_user()
logger.info(f"MCP remove_memory: user={user_id}, memory_id={memory_id}")
# O(1) ownership check via Memory.get(); avoids get_all's v2 top_k=20 cap.
if not await mem0_manager.verify_memory_ownership(memory_id, user_id):
# Verify ownership: get user's memories and check if memory_id exists
user_memories = await mem0_manager.get_user_memories(
user_id=user_id,
limit=10000 # Get all to check ownership
)
memory_ids = {m.get("id") for m in user_memories if m.get("id")}
if memory_id not in memory_ids:
raise ValueError(f"Memory '{memory_id}' not found or access denied")
result = await mem0_manager.delete_memory(memory_id=memory_id)


@ -1,12 +1,8 @@
"""Ultra-minimal Mem0 Manager - Pure Mem0 + Custom OpenAI Endpoint Only."""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any, AsyncIterator, Union
from typing import Dict, List, Optional, Any
from datetime import datetime
import httpx
from mem0 import Memory
from openai import OpenAI
from tenacity import (
@ -23,7 +19,7 @@ from monitoring import timed
logger = structlog.get_logger(__name__)
# Retry decorator for database operations (Qdrant)
# Retry decorator for database operations (Qdrant, Neo4j)
db_retry = retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
@ -32,11 +28,7 @@ db_retry = retry(
reraise=True,
)
# Monkey-patch Mem0's OpenAI LLM to clear top_p when the configured LLM
# is Claude reached via an OpenAI-compatible endpoint: Claude rejects top_p
# whenever temperature is set, and OpenAILLM sends both unconditionally.
# (The 'store' branch is now redundant in mem0ai>=2.0.0 — upstream made it
# opt-in — but harmless; kept for safety.)
# Monkey-patch Mem0's OpenAI LLM to remove the 'store' parameter for LiteLLM compatibility
from mem0.llms.openai import OpenAILLM
_original_generate_response = OpenAILLM.generate_response
@ -45,8 +37,10 @@ _original_generate_response = OpenAILLM.generate_response
def patched_generate_response(
self, messages, response_format=None, tools=None, tool_choice="auto", **kwargs
):
# Remove 'store' parameter as LiteLLM doesn't support it
if hasattr(self.config, "store"):
self.config.store = None
# Remove 'top_p' to avoid conflict with temperature for Claude models
if hasattr(self.config, "top_p"):
self.config.top_p = None
return _original_generate_response(
@ -55,77 +49,7 @@ def patched_generate_response(
OpenAILLM.generate_response = patched_generate_response
logger.info("Applied Claude/OpenAI-compatible patch: cleared top_p (and store)")
def _build_filters(
user_id: Optional[str],
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Build the filters dict required by mem0 v2 search/get_all.
In mem0 v2.x, user_id/agent_id/run_id are rejected as top-level kwargs
on Memory.search and Memory.get_all; they must live inside `filters`.
"""
merged: Dict[str, Any] = dict(extra) if extra else {}
if user_id is not None:
merged["user_id"] = user_id
if agent_id is not None:
merged["agent_id"] = agent_id
if run_id is not None:
merged["run_id"] = run_id
return merged
def _extract_text(content: Any) -> str:
"""Extract text from an OpenAI message `content` field.
`content` can be a plain string OR a list of multi-part objects
(e.g., [{"type": "text", "text": "..."}, {"type": "image_url", ...}]).
Returns concatenated text parts, or "" if no text is present.
"""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, dict) and p.get("type") == "text":
t = p.get("text")
if isinstance(t, str):
parts.append(t)
return "\n".join(parts)
return ""
# Appended as the "## Custom Instructions" section of the additive-extraction
# prompt (mem0/configs/prompts.py::generate_additive_extraction_prompt). The
# default few-shot bias is consumer-organizer ("favourite movies", "SF restaurants"),
# which under-extracts on the work/project/relationship traffic this deployment
# actually sees. This re-prioritizes without replacing mem0's structural guidance.
CUSTOM_FACT_EXTRACTION_INSTRUCTIONS = """
This memory store serves a working assistant: engineering, product, and operational contexts, plus the user's people and recurring life context. Prioritize accordingly:
HIGH-VALUE facts to capture:
- Work context: company, team, role; ongoing projects with goals/status/blockers; product or domain knowledge being built; tools/frameworks/languages in active use; technical decisions and the reasoning; recurring meetings or rituals.
- People in the user's orbit: colleagues, family, friends, mentors — names, relationships, roles, what they do, the current state of the relationship or shared context.
- Recurring personal context: home/work locations, regular schedule, standing commitments, durable preferences (food restrictions, working hours, communication style), planned events with dates.
- Acquired knowledge: concepts being studied or built, specific problems being solved, prior solutions tried and their outcomes.
LOWER-PRIORITY (extract only if they reveal a pattern or future relevance):
- Single transient states ("running 5 minutes late", "didn't sleep well"): capture only if they recur or signal a habit.
- Movies, music, restaurants, hobbies: only when noted as durable preferences or part of a recurring activity, not when mentioned in passing.
SKIP entirely:
- Generic world knowledge (timezones, capital cities, definitions): the assistant already knows these.
- Greetings, acknowledgments, meta-conversation ("Thanks!", "Got it").
- Restatements or paraphrases of facts already in Existing Memories or Recently Extracted Memories.
Prefer specificity. "Pratik uses FastAPI for backend services" beats "Pratik does backend development." When a person is mentioned by a short name or nickname, capture the relationship if known ("Anushree is Pratik's wife") so future references resolve correctly.
""".strip()
logger.info("Applied LiteLLM compatibility patch: disabled 'store' parameter")
class Mem0Manager:
@ -135,16 +59,18 @@ class Mem0Manager:
"""
def __init__(self):
# Custom endpoint configuration with graph memory enabled
logger.info(
"Initializing Mem0Manager with custom endpoint",
model=settings.default_model,
embedding_model=settings.embedding_model,
embedding_dims=settings.embedding_dims,
qdrant_host=settings.qdrant_host,
neo4j_uri=settings.neo4j_uri,
)
config = {
"version": "v1.1",
"custom_instructions": CUSTOM_FACT_EXTRACTION_INSTRUCTIONS,
"enable_graph": True,
"llm": {
"provider": "openai",
"config": {
@ -156,16 +82,10 @@ class Mem0Manager:
},
},
"embedder": {
# Route embeddings through the OpenAI-compatible LiteLLM proxy
# rather than Ollama directly — the proxy is reachable from the
# container in all deployments, Ollama may not be. The model
# name is the same (qwen3-embedding:4b-q8_0); existing vectors
# generated via this path stay compatible.
"provider": "openai",
"provider": "ollama",
"config": {
"model": settings.embedding_model,
"api_key": settings.openai_api_key,
"openai_base_url": settings.openai_base_url,
"ollama_base_url": settings.ollama_base_url,
"embedding_dims": settings.embedding_dims,
},
},
@ -179,18 +99,20 @@ class Mem0Manager:
"on_disk": True,
},
},
"graph_store": {
"provider": "neo4j",
"config": {
"url": settings.neo4j_uri,
"username": settings.neo4j_username,
"password": settings.neo4j_password,
},
},
"reranker": {
"provider": "cohere",
"config": {
"api_key": settings.cohere_api_key,
# v3.5 supersedes v3.0: 4096-token context, multilingual
# (our users include Hindi/Hinglish content that the
# English-only v3 silently underperforms on).
"model": "rerank-v3.5",
# Raised from 10 → 50 so the rerank output cap does not
# truncate below typical over-fetch sizes (see search calls
# below, which request top_k up to ~3× the user's limit).
"top_n": 50,
"model": "rerank-english-v3.0",
"top_n": 10,
},
},
}
@ -202,23 +124,8 @@ class Mem0Manager:
timeout=60.0, # 60 second timeout for LLM calls
max_retries=2, # Retry failed requests up to 2 times
)
# Async HTTP client used by openai_proxy_completion to forward
# /v1/chat/completions to the upstream endpoint verbatim (preserves
# tool_calls, reasoning_tokens, system_fingerprint, etc., which the
# openai-python SDK can drop). Closed in the FastAPI lifespan
# shutdown via aclose().
self.async_http = httpx.AsyncClient(
timeout=httpx.Timeout(120.0, connect=10.0)
)
logger.info("Initialized ultra-minimal Mem0Manager with custom endpoint")
async def aclose(self) -> None:
"""Release async resources. Called from the FastAPI lifespan shutdown."""
try:
await self.async_http.aclose()
except Exception as e:
logger.warning("async_http aclose failed", error=str(e))
# Pure passthrough methods - no custom logic
@db_retry
@timed("add_memories")
@ -301,21 +208,19 @@ class Mem0Manager:
"query": query,
"note": "Empty query provided, no results returned. Use a specific query to search memories.",
}
# mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
# Over-fetch a 30-50 candidate pool so the Cohere reranker (rerank=True)
# has room to reorder; then truncate to the caller's requested limit.
overfetch = max(limit * 3, 30)
# Direct Mem0 search - trust native handling
result = self.memory.search(
query=query,
filters=_build_filters(user_id, agent_id, run_id, extra=filters),
top_k=overfetch,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
limit=limit,
threshold=threshold,
rerank=True,
filters=filters,
)
memories = result.get("results", [])[:limit]
return {
"memories": memories,
"total_count": len(memories),
"memories": result.get("results", []),
"total_count": len(result.get("results", [])),
"query": query,
}
except Exception as e:
@ -333,10 +238,13 @@ class Mem0Manager:
) -> List[Dict[str, Any]]:
"""Get all memories for a user - native Mem0 pattern."""
try:
# mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
# Direct Mem0 get_all call - trust native parameter handling
result = self.memory.get_all(
filters=_build_filters(user_id, agent_id, run_id, extra=filters),
top_k=limit,
user_id=user_id,
limit=limit,
agent_id=agent_id,
run_id=run_id,
filters=filters,
)
return result.get("results", [])
except Exception as e:
@ -415,14 +323,44 @@ class Mem0Manager:
run_id: Optional[str],
limit: int = 50,
) -> Dict[str, Any]:
"""Graph relationships — deprecated in mem0 v2 (OSS graph memory removed).
"""Get graph relationships - using correct Mem0 get_all() method."""
try:
# Use get_all() to retrieve memories with graph relationships
result = self.memory.get_all(
user_id=user_id, agent_id=agent_id, run_id=run_id, limit=limit
)
mem0 v2.0.0 deleted the OSS graph store (Neo4j/Memgraph/Kuzu/AGE drivers).
Entity relationships now influence ranking via a parallel `{collection}_entities`
Qdrant collection rather than being directly traversable. We return an empty
graph payload plus a `deprecated` marker so clients (frontend graph.html) can
render a clear "Graph view unavailable" state instead of erroring.
"""
# Extract relationships from Mem0's response structure
relationships = result.get("relations", [])
# For entities, we can derive them from memory results or relations
entities = []
if "results" in result:
# Extract unique entities from memories and relationships
entity_set = set()
# Add entities from relationships
for rel in relationships:
if "source" in rel:
entity_set.add(rel["source"])
if "target" in rel:
entity_set.add(rel["target"])
entities = [{"name": entity} for entity in entity_set]
return {
"relationships": relationships,
"entities": entities,
"user_id": user_id,
"agent_id": agent_id,
"run_id": run_id,
"total_memories": len(result.get("results", [])),
"total_relationships": len(relationships),
}
except Exception as e:
logger.error(f"Error getting graph relationships: {e}")
# Return empty but structured response on error
return {
"relationships": [],
"entities": [],
@ -431,11 +369,7 @@ class Mem0Manager:
"run_id": run_id,
"total_memories": 0,
"total_relationships": 0,
"deprecated": True,
"deprecation_note": (
"OSS graph memory was removed in mem0 v2.0.0. Use search/get_all for "
"memory retrieval; entity links now affect ranking only."
),
"error": str(e),
}
@timed("chat_with_memory")
@ -456,16 +390,15 @@ class Mem0Manager:
logger.info("Starting chat request", user_id=user_id)
search_start_time = time.time()
# Over-fetch for the Cohere reranker (rerank=True), then keep the
# top 10 reranked memories for the system prompt.
search_result = self.memory.search(
query=message,
filters=_build_filters(user_id, agent_id, run_id),
top_k=30,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
limit=10,
threshold=0.3,
rerank=True,
)
relevant_memories = search_result.get("results", [])[:10]
relevant_memories = search_result.get("results", [])
memories_str = "\n".join(
f"- {entry['memory']}" for entry in relevant_memories
)
@ -491,11 +424,7 @@ class Mem0Manager:
response = self.openai_client.chat.completions.create(
model=settings.default_model, messages=messages
)
# Strip leading whitespace — reasoning models (minimax-m2) leak
# blank lines from their reasoning output into visible content.
# lstrip(), not strip(), preserves intentional trailing whitespace
# inside content (e.g., inside a code block).
assistant_response = (response.choices[0].message.content or "").lstrip()
assistant_response = response.choices[0].message.content
llm_time = time.time() - llm_start_time
logger.debug(
"LLM call completed",
@ -549,261 +478,6 @@ class Mem0Manager:
"model_used": None,
}
# ------------------------------------------------------------------
# OpenAI-compatible /v1/chat/completions proxy
# ------------------------------------------------------------------
# Design: forward client requests to the upstream endpoint verbatim,
# injecting a "Relevant memories" system message ONLY when the last
# message is from the user AND no tool flow is in progress. The
# opinionated chat_with_memory above is NOT used on this path because:
# (a) clients pick their own model, tools, response_format etc.,
# (b) we must preserve every upstream field (tool_calls, reasoning_tokens,
# system_fingerprint, finish_reason), which the openai-python SDK and
# typed Pydantic responses strip silently,
# (c) streaming must be real (SSE pass-through), not post-hoc word-chunking.
#
# Tool-call safety contract (Memori issue #434 — codified here):
# - We only PREPEND a system message; never mutate, reorder, or delete
# existing messages.
# - We never touch tool messages or tool_call_id values.
# - We skip injection on tool-result follow-ups (last role != "user").
# - We always run the post-stream mem0.add even if upstream errors
# mid-flight (via try/finally).
async def openai_proxy_completion(
self,
request_kwargs: Dict[str, Any],
user_id: str,
) -> Union[Dict[str, Any], AsyncIterator[bytes]]:
"""Proxy a chat-completions request to the upstream LLM.
Returns:
- dict (upstream JSON) when stream is falsy
- async iterator of SSE bytes when stream is truthy
"""
messages = request_kwargs.get("messages") or []
if not messages:
raise ValueError("messages array is empty")
# Injection rule: last role == 'user' AND no prior tool message.
last_msg = messages[-1] if isinstance(messages[-1], dict) else {}
last_role = last_msg.get("role")
has_tool_message = any(
isinstance(m, dict) and m.get("role") == "tool" for m in messages
)
inject_memory = last_role == "user" and not has_tool_message
if inject_memory:
last_user_text = _extract_text(last_msg.get("content"))
if last_user_text:
try:
search_result = self.memory.search(
query=last_user_text,
filters=_build_filters(user_id),
top_k=30,
threshold=0.3,
rerank=True,
)
relevant = (search_result.get("results") or [])[:10]
except Exception as e:
logger.warning(
"memory search failed; proceeding without injection",
error=str(e),
user_id=user_id,
)
relevant = []
if relevant:
memories_str = "\n".join(
f"- {m.get('memory', '')}" for m in relevant
)
mem_block = (
"Relevant memories about the user "
"(use when helpful, ignore otherwise):\n" + memories_str
)
# Merge into existing leading system message if present;
# otherwise prepend a new one. Either way we never mutate
# any other message in the list.
if (
messages
and isinstance(messages[0], dict)
and messages[0].get("role") == "system"
):
existing = _extract_text(messages[0].get("content")) or ""
merged = (existing + "\n\n" + mem_block) if existing else mem_block
messages = [
{"role": "system", "content": merged},
*messages[1:],
]
else:
messages = [
{"role": "system", "content": mem_block},
*messages,
]
request_kwargs["messages"] = messages
logger.info(
"memory injected",
user_id=user_id,
memory_count=len(relevant),
)
url = settings.openai_base_url.rstrip("/") + "/v1/chat/completions"
headers = {
"Authorization": f"Bearer {settings.openai_api_key}",
"Content-Type": "application/json",
}
is_stream = bool(request_kwargs.get("stream"))
if not is_stream:
resp = await self.async_http.post(
url, json=request_kwargs, headers=headers
)
if resp.status_code >= 400:
# Raise with the upstream response attached; the caller can read the
# upstream status and body from the exception's `response`.
raise httpx.HTTPStatusError(
f"Upstream {resp.status_code}", request=resp.request, response=resp
)
data = resp.json()
# Fire-and-forget mem0.add in the background. Skips tool-only
# responses inside _post_completion_add.
asyncio.create_task(self._post_completion_add(messages, data, user_id))
return data
return self._stream_proxy(url, request_kwargs, headers, messages, user_id)
async def _stream_proxy(
self,
url: str,
request_kwargs: Dict[str, Any],
headers: Dict[str, str],
messages: List[Dict[str, Any]],
user_id: str,
) -> AsyncIterator[bytes]:
"""Stream upstream SSE bytes verbatim; accumulate content for post-stream add."""
accumulated_content: List[str] = []
saw_tool_calls = False
buffer = b""
try:
async with self.async_http.stream(
"POST", url, json=request_kwargs, headers=headers
) as upstream:
if upstream.status_code >= 400:
body_bytes = await upstream.aread()
err = {
"error": {
"message": (
f"Upstream {upstream.status_code}: "
f"{body_bytes.decode('utf-8', errors='replace')[:500]}"
)
}
}
yield f"data: {json.dumps(err)}\n\n".encode("utf-8")
yield b"data: [DONE]\n\n"
return
async for chunk in upstream.aiter_bytes():
if not chunk:
continue
# Forward bytes verbatim — preserves the exact SSE wire format
yield chunk
# Side-channel parse for content/tool_calls accumulation.
# Events are \n\n-separated; data lines start with "data: ".
buffer += chunk
while b"\n\n" in buffer:
event_bytes, buffer = buffer.split(b"\n\n", 1)
for raw_line in event_bytes.split(b"\n"):
if not raw_line.startswith(b"data: "):
continue
payload = raw_line[6:].strip()
if not payload or payload == b"[DONE]":
continue
try:
obj = json.loads(payload)
delta = (
(obj.get("choices") or [{}])[0].get("delta") or {}
)
content_piece = delta.get("content")
if isinstance(content_piece, str):
accumulated_content.append(content_piece)
if delta.get("tool_calls"):
saw_tool_calls = True
except (json.JSONDecodeError, KeyError, IndexError, TypeError):
pass
except httpx.HTTPError as e:
logger.warning("upstream stream error", error=str(e), user_id=user_id)
err = {"error": {"message": f"Upstream stream error: {e}"}}
yield f"data: {json.dumps(err)}\n\n".encode("utf-8")
yield b"data: [DONE]\n\n"
finally:
# Per Memori #434: post-stream mem0.add must run even on mid-stream
# error so partial content is captured. Skipped for tool-only or
# empty responses.
full = "".join(accumulated_content).lstrip()
if full and not saw_tool_calls:
last_user_text = (
_extract_text(messages[-1].get("content"))
if messages
and isinstance(messages[-1], dict)
and messages[-1].get("role") == "user"
else None
)
if last_user_text:
try:
# Synchronous mem0.add in a thread to avoid blocking
# the response loop after StreamingResponse closes.
await asyncio.to_thread(
self.memory.add,
[
{"role": "user", "content": last_user_text},
{"role": "assistant", "content": full},
],
user_id=user_id,
)
except Exception as e:
logger.warning(
"post-stream mem0.add failed",
error=str(e),
user_id=user_id,
)
async def _post_completion_add(
self,
messages: List[Dict[str, Any]],
response_data: Dict[str, Any],
user_id: str,
) -> None:
"""Background mem0.add after a non-stream completion."""
try:
choice = (response_data.get("choices") or [{}])[0]
msg = choice.get("message") or {}
content = msg.get("content")
if not content or not isinstance(content, str):
return # tool-only or non-text completion — skip
content = content.lstrip()
last_user_text = (
_extract_text(messages[-1].get("content"))
if messages
and isinstance(messages[-1], dict)
and messages[-1].get("role") == "user"
else None
)
if not last_user_text:
return
await asyncio.to_thread(
self.memory.add,
[
{"role": "user", "content": last_user_text},
{"role": "assistant", "content": content},
],
user_id=user_id,
)
except Exception as e:
logger.warning(
"post-completion mem0.add failed", error=str(e), user_id=user_id
)
async def health_check(self) -> Dict[str, str]:
"""Basic health check - just connectivity."""
status = {}
@ -817,9 +491,7 @@ class Mem0Manager:
# Check Mem0 memory
try:
self.memory.search(
query="test", filters={"user_id": "health_check"}, top_k=1
)
self.memory.search(query="test", user_id="health_check", limit=1)
status["mem0_memory"] = "healthy"
except Exception as e:
status["mem0_memory"] = f"unhealthy: {str(e)}"


@ -1,7 +1,7 @@
"""Ultra-minimal Pydantic models for pure Mem0 API."""
from typing import List, Optional, Dict, Any, Union
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
import re
@ -230,56 +230,82 @@ class UserStatsResponse(BaseModel):
# OpenAI-Compatible API Models
#
# The /v1/chat/completions handler is a pass-through proxy: requests are
# forwarded to the upstream LLM and the upstream response is relayed verbatim
# (dict for non-stream, raw SSE bytes for stream). Hence only the REQUEST
# model lives here; the response is never re-typed (typed Pydantic response
# models silently drop unknown fields like tool_calls / refusal /
# reasoning_tokens — see the audit in the plan file).
class OpenAIMessage(BaseModel):
"""OpenAI message format."""
role: str = Field(..., description="Message role (system, user, assistant)")
content: str = Field(..., description="Message content")
class OpenAIChatCompletionRequest(BaseModel):
"""OpenAI chat completion request — permissive schema, forwarded as-is.
"""OpenAI chat completion request format."""
Only `model` and `messages` are required. All other standard OpenAI
parameters are typed (for client IDE/docs benefit) but optional. Unknown
fields are accepted via `extra="allow"` and forwarded to upstream, so
new OpenAI parameters don't require a code change here.
"""
model_config = ConfigDict(extra="allow")
model: str = Field(..., description="Model to use (forwarded to upstream)")
messages: List[Dict[str, Any]] = Field(
..., description="Messages array (multi-part content supported)"
model: str = Field(..., description="Model to use (will use configured default)")
messages: List[Dict[str, str]] = Field(..., description="List of messages")
temperature: Optional[float] = Field(0.7, description="Sampling temperature")
max_tokens: Optional[int] = Field(None, description="Maximum tokens to generate")
stream: Optional[bool] = Field(False, description="Whether to stream responses")
top_p: Optional[float] = Field(1.0, description="Nucleus sampling parameter")
n: Optional[int] = Field(1, description="Number of completions to generate")
stop: Optional[List[str]] = Field(None, description="Stop sequences")
presence_penalty: Optional[float] = Field(0, description="Presence penalty")
frequency_penalty: Optional[float] = Field(0, description="Frequency penalty")
user: Optional[str] = Field(
None, description="User identifier (ignored, uses API key)"
)
# Common params (typed for IDE/docs; all optional)
temperature: Optional[float] = None
top_p: Optional[float] = None
n: Optional[int] = None
stream: Optional[bool] = None
stream_options: Optional[Dict[str, Any]] = None
stop: Optional[Union[str, List[str]]] = None
max_tokens: Optional[int] = None # deprecated; still accepted
max_completion_tokens: Optional[int] = None # replaces max_tokens
presence_penalty: Optional[float] = None
frequency_penalty: Optional[float] = None
seed: Optional[int] = None
logprobs: Optional[bool] = None
top_logprobs: Optional[int] = None
response_format: Optional[Dict[str, Any]] = None
tools: Optional[List[Dict[str, Any]]] = None
tool_choice: Optional[Union[str, Dict[str, Any]]] = None
parallel_tool_calls: Optional[bool] = None
reasoning_effort: Optional[str] = None # o-series / reasoning models
modalities: Optional[List[str]] = None
audio: Optional[Dict[str, Any]] = None
prediction: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
store: Optional[bool] = None
service_tier: Optional[str] = None
logit_bias: Optional[Dict[str, float]] = None
# `user` is ignored — the authenticated user is derived from the API key.
user: Optional[str] = None
class OpenAIUsage(BaseModel):
"""Token usage information."""
prompt_tokens: int = Field(..., description="Tokens in the prompt")
completion_tokens: int = Field(..., description="Tokens in the completion")
total_tokens: int = Field(..., description="Total tokens used")
class OpenAIChoiceMessage(BaseModel):
"""Message in a choice."""
role: str = Field(..., description="Role of the message")
content: str = Field(..., description="Content of the message")
class OpenAIChoice(BaseModel):
"""Individual completion choice."""
index: int = Field(..., description="Choice index")
message: OpenAIChoiceMessage = Field(..., description="Message content")
finish_reason: str = Field(..., description="Reason for completion finish")
class OpenAIChatCompletionResponse(BaseModel):
"""OpenAI chat completion response format."""
id: str = Field(..., description="Unique completion ID")
object: str = Field(default="chat.completion", description="Object type")
created: int = Field(..., description="Unix timestamp of creation")
model: str = Field(..., description="Model used for completion")
choices: List[OpenAIChoice] = Field(..., description="List of completion choices")
usage: Optional[OpenAIUsage] = Field(None, description="Token usage information")
# Streaming-specific models
class OpenAIStreamDelta(BaseModel):
"""Delta content in a streaming chunk."""
role: Optional[str] = Field(None, description="Role (only in first chunk)")
content: Optional[str] = Field(None, description="Incremental content")
class OpenAIStreamChoice(BaseModel):
"""Individual streaming choice."""
index: int = Field(..., description="Choice index")
delta: OpenAIStreamDelta = Field(..., description="Delta content")
finish_reason: Optional[str] = Field(
None, description="Reason for completion finish"
)


@ -1,224 +0,0 @@
"""mem0 platform API compatibility layer.
Implements the subset of the hosted mem0 platform API that the ``mem0ai``
``MemoryClient`` (pinned ``mem0ai==2.0.2``) actually calls, so
``MemoryClient(host="https://memory.pratikn.com", api_key=...)`` works against
this self-hosted server. Each platform route maps onto the existing
``mem0_manager`` singleton.
Contract notes (verified against the installed SDK source):
- Auth header is ``Authorization: Token <key>`` (handled by
``get_current_user_platform``, which also accepts Bearer / X-API-Key).
- Core ops use ``/v3/memories/*``; item ops use ``/v1/memories/*``; all paths
carry a trailing slash and are registered here at the exact path the SDK calls
(so FastAPI matches exactly and never issues a slash redirect).
- ``GET /v1/ping/`` runs at client construction and MUST return non-empty
``org_id`` and ``project_id`` or the SDK's ``Project(...)`` raises.
- Scoping (user_id/agent_id/run_id) is carried in ``filters`` (search/get_all),
the top-level body (add), or the query string (delete_all). It defaults to the
authenticated user; a mismatch is rejected with 403 (same model as the native
endpoints).
"""
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Request
import structlog
from auth import get_current_user_platform
from mem0_manager import mem0_manager
logger = structlog.get_logger(__name__)
router = APIRouter(tags=["mem0-platform-compat"])
def _require_self(requested: Optional[str], authed: str) -> str:
"""Return the user_id to operate on: default to the authenticated user,
reject a mismatch with 403 (consistent with the native endpoints)."""
if not requested:
return authed
if requested != authed:
raise HTTPException(
status_code=403,
detail=f"Access denied: you can only access your own memories (authenticated as '{authed}')",
)
return authed
async def _json_body(request: Request) -> Dict[str, Any]:
"""Parse the JSON body defensively (the SDK sends varied shapes)."""
try:
body = await request.json()
except Exception:
body = None
return body if isinstance(body, dict) else {}
def _split_filters(body: Dict[str, Any], authed: str):
"""Pull the scoping IDs out of the SDK's ``filters`` object and return
(target_user, agent_id, run_id, remaining_filters)."""
filters = dict(body.get("filters") or {})
target = _require_self(filters.pop("user_id", None), authed)
agent_id = filters.pop("agent_id", None)
run_id = filters.pop("run_id", None)
filters.pop("app_id", None) # accepted by the SDK; unused here
return target, agent_id, run_id, (filters or None)
async def _owned_or_404(memory_id: str, user: str) -> None:
if not await mem0_manager.verify_memory_ownership(memory_id, user):
raise HTTPException(status_code=404, detail=f"Memory '{memory_id}' not found")
@router.get("/v1/ping/")
async def ping(user: str = Depends(get_current_user_platform)) -> Dict[str, Any]:
"""Client construction validation. ``org_id``/``project_id`` MUST be
non-empty or the SDK's ``Project(...)`` raises 'org_id and project_id must
be set'."""
return {
"status": "ok",
"user_email": user,
"org_id": "default-org",
"project_id": "default-project",
}
@router.post("/v3/memories/add/")
async def add_memories(
request: Request, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
body = await _json_body(request)
messages = body.get("messages")
if not messages:
raise HTTPException(status_code=422, detail="`messages` is required")
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
target = _require_self(body.get("user_id"), user)
raw = await mem0_manager.add_memories(
messages=messages,
user_id=target,
agent_id=body.get("agent_id"),
run_id=body.get("run_id"),
metadata=body.get("metadata"),
)
# mem0_manager wraps the mem0 result as {"added_memories": [<mem0 dict>], ...};
# the mem0 dict is already {"results": [...]} (the platform shape).
added = raw.get("added_memories") or []
if added and isinstance(added[0], dict) and "results" in added[0]:
return added[0]
return {"results": added}
@router.post("/v3/memories/search/")
async def search_memories(
request: Request, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
body = await _json_body(request)
query = body.get("query")
if not query:
raise HTTPException(status_code=422, detail="`query` is required")
target, agent_id, run_id, extra = _split_filters(body, user)
top_k = body.get("top_k") or body.get("limit") or 10
result = await mem0_manager.search_memories(
query=query,
user_id=target,
limit=int(top_k),
filters=extra,
agent_id=agent_id,
run_id=run_id,
)
return {"results": result.get("memories", [])}
@router.post("/v3/memories/")
async def get_all_memories(
request: Request, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
body = await _json_body(request)
target, agent_id, run_id, extra = _split_filters(body, user)
# The SDK sends page/page_size as query params. mem0's get_all has no offset,
# so we fetch up to page*page_size and slice the requested page.
try:
page = max(int(request.query_params.get("page", 1)), 1)
page_size = min(max(int(request.query_params.get("page_size", 100)), 1), 1000)
except ValueError:
page, page_size = 1, 100
items = await mem0_manager.get_user_memories(
user_id=target,
limit=page * page_size,
agent_id=agent_id,
run_id=run_id,
filters=extra,
)
total = len(items)
start = (page - 1) * page_size
return {
"count": total,
"next": page + 1 if start + page_size < total else None,
"previous": page - 1 if page > 1 else None,
"results": items[start : start + page_size],
}
@router.get("/v1/memories/{memory_id}/")
async def get_memory(
memory_id: str, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
await _owned_or_404(memory_id, user)
mem = await mem0_manager.get_memory(memory_id)
if mem is None:
raise HTTPException(status_code=404, detail=f"Memory '{memory_id}' not found")
return mem
@router.put("/v1/memories/{memory_id}/")
async def update_memory(
memory_id: str, request: Request, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
await _owned_or_404(memory_id, user)
body = await _json_body(request)
text = body.get("text") or body.get("memory") or body.get("data")
if not text:
raise HTTPException(
status_code=422, detail="`text` is required to update a memory"
)
return await mem0_manager.update_memory(memory_id=memory_id, content=text)
@router.delete("/v1/memories/{memory_id}/")
async def delete_memory(
memory_id: str, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
await _owned_or_404(memory_id, user)
return await mem0_manager.delete_memory(memory_id=memory_id)
@router.get("/v1/memories/{memory_id}/history/")
async def memory_history(
memory_id: str, user: str = Depends(get_current_user_platform)
) -> List[Dict[str, Any]]:
await _owned_or_404(memory_id, user)
result = await mem0_manager.get_memory_history(memory_id)
return result.get("history", [])
@router.delete("/v1/memories/")
async def delete_all_memories(
request: Request, user: str = Depends(get_current_user_platform)
) -> Dict[str, Any]:
target = _require_self(request.query_params.get("user_id"), user)
return await mem0_manager.delete_user_memories(user_id=target)
@router.get("/v1/entities/")
async def list_entities(
user: str = Depends(get_current_user_platform),
) -> Dict[str, Any]:
# This server is single-user-per-key; report the authenticated user as the
# only entity (the platform returns all users/agents/runs with memories).
return {"results": [{"id": user, "name": user, "type": "user"}], "count": 1}


@ -4,14 +4,16 @@ uvicorn[standard]
python-multipart
# Mem0 and AI
mem0ai[nlp]==2.0.2
fastembed>=0.3.1
mem0ai
openai
google-genai
cohere
# Database
qdrant-client>=1.12.0
qdrant-client
neo4j
langchain-neo4j
rank-bm25
ollama
# Utilities


@ -1,7 +1,7 @@
services:
# Qdrant vector database for vector + sparse (BM25) storage
# Qdrant vector database for vector storage
qdrant:
image: qdrant/qdrant:v1.18.1
image: qdrant/qdrant:latest
container_name: mem0-qdrant
expose:
- "6333"
@ -18,6 +18,36 @@ services:
retries: 5
restart: unless-stopped
# Neo4j with APOC for graph relationships
neo4j:
image: neo4j:5.26.4
container_name: mem0-neo4j
environment:
NEO4J_AUTH: ${NEO4J_AUTH:-neo4j/mem0_neo4j_password}
NEO4J_PLUGINS: '["apoc"]'
NEO4J_apoc_export_file_enabled: true
NEO4J_apoc_import_file_enabled: true
NEO4J_apoc_import_file_use__neo4j__config: true
NEO4J_ACCEPT_LICENSE_AGREEMENT: yes
NEO4J_dbms_security_procedures_unrestricted: apoc.*
NEO4J_dbms_security_procedures_allowlist: apoc.*
expose:
- "7474" # HTTP - Internal only
- "7687" # Bolt - Internal only
networks:
- mem0_network
volumes:
- neo4j_data:/data
- neo4j_logs:/logs
- neo4j_import:/var/lib/neo4j/import
- neo4j_plugins:/plugins
healthcheck:
test: ["CMD", "cypher-shell", "-u", "neo4j", "-p", "${NEO4J_PASSWORD:-mem0_neo4j_password}", "RETURN 1"]
interval: 10s
timeout: 10s
retries: 5
restart: unless-stopped
# Backend API service
backend:
build:
@ -32,6 +62,9 @@ services:
QDRANT_HOST: qdrant
QDRANT_PORT: 6333
QDRANT_COLLECTION_NAME: ${QDRANT_COLLECTION_NAME:-mem0}
NEO4J_URI: bolt://neo4j:7687
NEO4J_USERNAME: ${NEO4J_USERNAME:-neo4j}
NEO4J_PASSWORD: ${NEO4J_PASSWORD:-mem0_neo4j_password}
LOG_LEVEL: ${LOG_LEVEL:-INFO}
CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000}
DEFAULT_MODEL: ${DEFAULT_MODEL:-claude-sonnet-4}
@ -47,24 +80,20 @@ services:
depends_on:
qdrant:
condition: service_healthy
# Backend startup loads spaCy (NLP extra), initializes mem0 v2, mounts MCP,
# and warms 4 workers — needs ~30-60s. start_period below keeps healthcheck
# failures from counting (and from triggering willfarrell/autoheal) during
# boot.
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 90s
neo4j:
condition: service_healthy
restart: unless-stopped
volumes:
- ./backend:/app
- ./frontend:/app/frontend
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4", "--proxy-headers", "--forwarded-allow-ips", "*"]
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
volumes:
qdrant_data:
neo4j_data:
neo4j_logs:
neo4j_import:
neo4j_plugins:
networks:
mem0_network:


@ -1,197 +0,0 @@
# Migration Runbook: mem0 v0.1.x/v1.x → v2.0.2 (the "V3 pipeline")
This runbook covers the **operational** half of the migration — backups, the
Qdrant collection rebuild for BM25, the Neo4j dump, cutover and rollback. The
**code-level** half (Dockerfile, requirements, mem0_manager rewrites, etc.) is
already committed on this branch; this document is what to follow when taking
those code changes to a stack that has live data.
## TL;DR
1. Snapshot Qdrant + dump Neo4j (Phase 2).
2. Deploy v2 backend to a scratch stack (Phase 3).
3. Rebuild the Qdrant collection with BM25 by warm-up-add → scroll/upsert →
swap (Phase 4).
4. Run integration tests (Phase 5).
5. Cutover production with the same steps inside a maintenance window (Phase 6).
## Phase 1 — Pre-flight
```bash
# 1. Capture the exact running mem0ai version (record for the ticket)
docker compose exec backend pip show mem0ai
# 2. Tag the pre-migration commit
git tag pre-mem0-v3-migration && git push --tags
# 3. Verify free disk on the volumes (snapshots can be sizeable)
docker system df -v | grep -E "qdrant_data|neo4j_data"
```
## Phase 2 — Backups (read-only on production)
### Qdrant snapshot
```bash
# Create snapshot of the live mem0 collection (qdrant runs inside the network as 'qdrant')
docker compose exec backend curl -X POST \
"http://qdrant:6333/collections/mem0/snapshots?wait=true"
# Returns JSON with the snapshot filename, e.g. mem0-XXXXXXXX.snapshot.
# Copy it off the qdrant container's volume:
docker compose exec qdrant ls /qdrant/storage/collections/mem0/snapshots/
mkdir -p ./backups/qdrant
docker cp mem0-qdrant:/qdrant/storage/collections/mem0/snapshots/<snapshot-file> ./backups/qdrant/
```
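If the snapshot step is scripted, the filename can be read from the create-snapshot response instead of being copied by hand. A minimal Python sketch, assuming the response carries the filename under `result.name`; the helper name `snapshot_name` and the sample payload are illustrative, not part of this repository:

```python
import json


def snapshot_name(response_text: str) -> str:
    """Return the snapshot filename from a create-snapshot response body."""
    body = json.loads(response_text)
    result = body.get("result") or {}
    name = result.get("name")
    if not name:
        # Fail loudly so a later `docker cp` never runs with an empty filename.
        raise ValueError("response has no result.name; snapshot may have failed")
    return name


# Illustrative payload only; the real filename and fields come from Qdrant.
sample = '{"result": {"name": "mem0-XXXXXXXX.snapshot"}, "status": "ok"}'
print(snapshot_name(sample))
```

Raising on a missing name keeps a failed snapshot from going unnoticed before the copy step.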
### Neo4j offline dump (decommission path)
Neo4j 5.x requires the database to be stopped to dump it.
```bash
mkdir -p ./backups/neo4j
docker compose stop neo4j
docker run --rm \
--volumes-from mem0-neo4j \
-v "$(pwd)/backups/neo4j:/dumps" \
neo4j:5.26.4 \
neo4j-admin database dump neo4j --to-path=/dumps
# (No need to restart neo4j — it is being decommissioned.)
```
Keep both backups for **at least 30 days** after cutover, and set a calendar reminder for the retention deadline.
### Pre-cutover per-user memory counts
```bash
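# Iterate over the users in API_KEYS, hit /stats/{user_id}, and save each count. Adjust for your auth setup.
# Assumes API_KEYS is a JSON object mapping API key -> user ID; `[.[]]` collects its values.
for user in $(jq -r '[.[]] | unique[]' <<< "$API_KEYS"); do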
echo -n "$user: "
docker compose exec backend curl -s -H "X-API-Key: <admin-or-user-key>" \
"http://localhost:8000/stats/$user" | jq -r '.memory_count // 0'
done > pre-cutover-counts.txt
```
## Phase 3 — Deploy v2 backend (scratch stack first)
Use a developer or staging machine with a **restored copy** of the prod
snapshot, not prod itself.
```bash
# Restore the prod snapshot onto the scratch Qdrant
docker compose exec backend curl -X POST \
"http://qdrant:6333/collections/mem0_legacy/snapshots/upload?priority=snapshot" \
-H "Content-Type: multipart/form-data" \
-F "snapshot=@/backups/qdrant/<snapshot-file>"
# (or restore as 'mem0' if you want to start with the legacy name)
# Build + start the v2 backend
docker compose build --no-cache backend
docker compose up -d
docker compose logs -f backend
```
Watch for:
- `Applied Claude/OpenAI-compatible patch: cleared top_p (and store)` — patch loaded.
- `Initialized ultra-minimal Mem0Manager with custom endpoint` — startup OK.
- No errors mentioning `graph_store` or `enable_graph` (we removed them).
- On any search/get_all: no `ValueError` from filters.
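The graph-related item in this checklist can be partly automated by scanning captured backend logs. A rough Python sketch, assuming the logs have been saved as plain text lines; the function name `graph_config_mentions` and the sample lines are hypothetical, and the check flags any line that mentions the removed settings, not only errors:

```python
from typing import Iterable, List

# Settings removed from the mem0 config during this migration.
GRAPH_TERMS = ("graph_store", "enable_graph")


def graph_config_mentions(lines: Iterable[str]) -> List[str]:
    """Return every log line that mentions a removed graph setting."""
    return [
        line.rstrip("\n")
        for line in lines
        if any(term in line for term in GRAPH_TERMS)
    ]


# Illustrative log lines only; real backend output will look different.
sample_logs = [
    '{"event": "startup complete", "level": "info"}\n',
    '{"event": "unexpected key graph_store in config", "level": "error"}\n',
]
for hit in graph_config_mentions(sample_logs):
    print(hit)
```

An empty result is only a hint that the graph configuration is gone; the startup messages in the list still need a manual look.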
## Phase 4 — Rebuild the Qdrant collection for BM25
Pre-v2 collections lack the `bm25` sparse-vector slot. mem0 v2 silently
downgrades to semantic-only on them — to get full hybrid search you must
recreate the collection.
```bash
# 1. Set the env var so the v2 backend creates a NEW collection with the right schema
docker compose exec backend sh -c 'QDRANT_COLLECTION_NAME=mem0_v3 \
python -c "from mem0_manager import mem0_manager; \
mem0_manager.memory.add([{\"role\":\"user\",\"content\":\"warm-up\"}], user_id=\"__warmup__\")"'
# This lazily creates mem0_v3 and mem0_v3_entities with the bm25 slot.
# (Optionally delete the warm-up memory afterwards.)
# 2. Run the migration script — preserves id + vector + payload, no re-embed
docker compose exec backend python /app/../scripts/migrate_qdrant_to_v3.py \
--source mem0 --target mem0_v3 \
--qdrant-host qdrant --qdrant-port 6333 --dry-run
# Inspect the per-user counts. If OK, run for real:
docker compose exec backend python /app/../scripts/migrate_qdrant_to_v3.py \
--source mem0 --target mem0_v3 \
--qdrant-host qdrant --qdrant-port 6333
# 3. Swap names. Qdrant has no in-place rename — use snapshot+upload.
# Snapshot mem0_v3, upload as mem0_swap, then snapshot mem0 as mem0_legacy, then
# upload mem0_swap as mem0. Or simply point QDRANT_COLLECTION_NAME at mem0_v3 in
# docker-compose.yml and keep `mem0` around as the legacy backup.
```
Easiest path: **leave the legacy collection alone** and update
`QDRANT_COLLECTION_NAME` to `mem0_v3` in `.env` / `docker-compose.yml`. The
legacy `mem0` collection sits there as an extra backup until you delete it.
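Before pointing the backend at the new collection, it can help to confirm that the sparse slot actually exists. A minimal Python sketch, assuming the JSON returned by Qdrant's `GET /collections/{name}` lists sparse vectors under `result.config.params.sparse_vectors`; the helper name `has_sparse_slot` and both sample payloads are illustrative:

```python
from typing import Any, Dict


def has_sparse_slot(collection_info: Dict[str, Any], slot: str = "bm25") -> bool:
    """Check whether a collection-info payload declares the named sparse slot."""
    result = collection_info.get("result") or {}
    config = result.get("config") or {}
    params = config.get("params") or {}
    sparse = params.get("sparse_vectors") or {}
    return slot in sparse


# Illustrative payloads only; the vector size and distance are placeholders.
legacy_info = {
    "result": {"config": {"params": {"vectors": {"size": 768, "distance": "Cosine"}}}}
}
rebuilt_info = {
    "result": {
        "config": {
            "params": {
                "vectors": {"size": 768, "distance": "Cosine"},
                "sparse_vectors": {"bm25": {}},
            }
        }
    }
}
print(has_sparse_slot(legacy_info), has_sparse_slot(rebuilt_info))
```

Running the check against both `mem0` and `mem0_v3` gives a quick signal of which collection supports hybrid search.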
## Phase 5 — Integration tests
```bash
MEM0_API_KEY=<dev-key-mapped-to-test-user> python test_integration.py -v
```
The test script generates a fresh `TEST_USER` per run — make sure the supplied
API key maps to that user (see CLAUDE.md "There are no unit tests..." note).
Expected: all pass. The `/graph/relationships/{user_id}` test should accept the
new `deprecated: true` payload.
## Phase 6 — Production cutover
Plan a maintenance window of about 30 minutes.
1. Communicate the window.
2. Re-snapshot Qdrant immediately before the deploy (so the rollback snapshot
is the freshest possible).
3. `git pull` the migration branch (or merge to main first).
4. `docker compose build --no-cache backend && docker compose up -d backend`.
5. Run the Phase 4 collection rebuild on prod.
6. Smoke test: `/health`, one `/chat` round-trip, one `/memories` write, one
`/memories/search` read.
7. Verify per-user counts match `pre-cutover-counts.txt` (use the same loop).
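
The count comparison in step 7 can be done mechanically. A small Python sketch, assuming both files contain one `user: count` line per user as written by the Phase 2 loop; the function names and sample data are hypothetical:

```python
from typing import Dict, Optional, Tuple


def parse_counts(text: str) -> Dict[str, int]:
    """Parse 'user: count' lines into a dict; blank lines are skipped."""
    counts: Dict[str, int] = {}
    for line in text.splitlines():
        if not line.strip():
            continue
        # Split on the last ': ' so user IDs containing ': ' still parse.
        user, _, value = line.rpartition(": ")
        counts[user] = int(value)  # raises ValueError on a malformed line
    return counts


def count_mismatches(
    pre: Dict[str, int], post: Dict[str, int]
) -> Dict[str, Tuple[Optional[int], Optional[int]]]:
    """Return {user: (pre, post)} for every user whose counts differ."""
    users = sorted(set(pre) | set(post))
    return {u: (pre.get(u), post.get(u)) for u in users if pre.get(u) != post.get(u)}


# Illustrative data only.
pre = parse_counts("alice: 12\nbob: 3\n")
post = parse_counts("alice: 12\nbob: 2\n")
print(count_mismatches(pre, post))
```

A user missing from one file shows up with `None` on that side, which makes dropped users as visible as changed counts.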
## Rollback
### Before the first v2 write hits prod (fully safe)
```bash
git revert <migration-commit-sha>
docker compose build --no-cache backend
docker compose up -d backend
```
### After cutover but snapshot still on disk (loses post-cutover writes)
```bash
# Stop the backend so no more writes land on the v2 collection
docker compose stop backend
# Restore the pre-cutover Qdrant snapshot to a fresh name, then swap
docker compose exec qdrant curl -X POST \
"http://qdrant:6333/collections/mem0_rollback/snapshots/upload?priority=snapshot" \
-H "Content-Type: multipart/form-data" \
-F "snapshot=@/qdrant/snapshots/<pre-cutover-snapshot>"
# Update QDRANT_COLLECTION_NAME=mem0_rollback or rename via snapshot+upload.
# Restore Neo4j if needed
docker run --rm \
--volumes-from mem0-neo4j \
-v "$(pwd)/backups/neo4j:/dumps" \
neo4j:5.26.4 \
neo4j-admin database load neo4j --from-path=/dumps --overwrite-destination=true
# Revert code and restart
git revert <migration-commit-sha>
docker compose build --no-cache backend
docker compose up -d backend
```
### After snapshot retention expires
**Irreversible.** Keep the pre-cutover snapshot and Neo4j dump for ≥30 days.


@ -1,97 +0,0 @@
#!/usr/bin/env bash
#
# Qdrant snapshot + off-host rotation.
#
# Snapshots both collections (mem0_v3 + mem0_v3_entities) back-to-back via the
# Qdrant REST API, downloads them to a date-stamped local directory, uploads to
# the configured rclone remote, prunes local copies older than 14 days, and
# emits a Prometheus textfile metric for future scrape.
#
# Env vars (override defaults):
#   QDRANT_CONTAINER      container name (default: mem0-qdrant)
#   COLLECTIONS           space-separated collection names
#                         (default: "mem0_v3 mem0_v3_entities")
#   BACKUP_DIR            local backup root
#                         (default: ~/aistuff/mem0/backups/qdrant)
#   RCLONE_REMOTE         rclone remote path (e.g. b2:mem0-backups/qdrant).
#                         If unset, off-host upload is skipped.
#   LOCAL_RETENTION_DAYS  how long to keep local copies (default: 14)
#   TEXTFILE_DIR          Prometheus node_exporter textfile collector dir
#                         (default: /var/lib/node_exporter/textfile_collector,
#                         skipped if the dir does not exist)
#
# Suggested cron (daily at 03:00 UTC):
# 0 3 * * * RCLONE_REMOTE=b2:mem0-backups/qdrant /home/ubuntu/aistuff/mem0/scripts/backup_qdrant.sh >> /home/ubuntu/aistuff/mem0/backups/backup.log 2>&1
#
# Exit codes:
#   0      success
#   2      rclone failure (after local download succeeded)
#   other  snapshot/download failure (`set -e` exits with the failing
#          command's status, which is not necessarily 1)
set -euo pipefail
QDRANT_CONTAINER="${QDRANT_CONTAINER:-mem0-qdrant}"
COLLECTIONS="${COLLECTIONS:-mem0_v3 mem0_v3_entities}"
BACKUP_DIR="${BACKUP_DIR:-$HOME/aistuff/mem0/backups/qdrant}"
RCLONE_REMOTE="${RCLONE_REMOTE:-}"
LOCAL_RETENTION_DAYS="${LOCAL_RETENTION_DAYS:-14}"
TEXTFILE_DIR="${TEXTFILE_DIR:-/var/lib/node_exporter/textfile_collector}"
TS="$(date -u +%Y%m%dT%H%M%SZ)"
DAY="$(date -u +%Y-%m-%d)"
TARGET_DIR="$BACKUP_DIR/$DAY"
mkdir -p "$TARGET_DIR"
log() { printf '[%s] %s\n' "$(date -u +%FT%TZ)" "$*"; }
log "starting backup ts=$TS dir=$TARGET_DIR collections=$COLLECTIONS"
total_bytes=0
for col in $COLLECTIONS; do
    log "snapshot create: $col"
    resp=$(docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
        "http://localhost:6333/collections/$col/snapshots?wait=true")
    snap_name=$(printf '%s' "$resp" \
        | python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["name"])')
    out_file="$TARGET_DIR/${col}_${TS}_${snap_name}"

    log "snapshot download: $col/$snap_name -> $out_file"
    docker cp "$QDRANT_CONTAINER:/qdrant/snapshots/$col/$snap_name" "$out_file"

    # Remove the in-container snapshot to avoid disk bloat on the volume.
    docker exec "$QDRANT_CONTAINER" curl -fsS -X DELETE \
        "http://localhost:6333/collections/$col/snapshots/$snap_name" >/dev/null

    # GNU stat first, BSD/macOS stat as the fallback.
    size=$(stat -c %s "$out_file" 2>/dev/null || stat -f %z "$out_file")
    total_bytes=$((total_bytes + size))
    log "downloaded: $out_file ($size bytes)"
done
if [ -n "$RCLONE_REMOTE" ]; then
    log "rclone copy: $TARGET_DIR -> $RCLONE_REMOTE/$DAY"
    if ! rclone copy "$TARGET_DIR" "$RCLONE_REMOTE/$DAY"; then
        log "rclone failed (local copies retained)"
        exit 2
    fi
else
    log "RCLONE_REMOTE unset; skipping off-host upload"
fi

log "pruning local copies older than $LOCAL_RETENTION_DAYS days"
find "$BACKUP_DIR" -mindepth 1 -maxdepth 1 -type d -mtime "+$LOCAL_RETENTION_DAYS" -exec rm -rf {} +

if [ -d "$TEXTFILE_DIR" ]; then
    # Write to a temp file and rename it so a scrape never sees a partial file.
    tmp="$(mktemp)"
    {
        echo "# HELP qdrant_last_backup_timestamp_seconds Unix timestamp of last successful Qdrant backup."
        echo "# TYPE qdrant_last_backup_timestamp_seconds gauge"
        echo "qdrant_last_backup_timestamp_seconds $(date -u +%s)"
        echo "# HELP qdrant_last_backup_bytes Total bytes of last successful Qdrant backup."
        echo "# TYPE qdrant_last_backup_bytes gauge"
        echo "qdrant_last_backup_bytes $total_bytes"
    } > "$tmp"
    mv "$tmp" "$TEXTFILE_DIR/qdrant_backup.prom"
    log "textfile metric written: $TEXTFILE_DIR/qdrant_backup.prom"
fi
log "backup complete: $total_bytes bytes across $(echo "$COLLECTIONS" | wc -w) collection(s)"
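
The textfile step above writes to a temporary file and then renames it, so a scrape never reads a half-written metric file. A minimal Python sketch of the same idea follows. `render_backup_metrics` and `write_atomically` are hypothetical names, and the sketch creates the temporary file inside the target directory, which is an assumption that differs from the script's plain `mktemp`, so that the rename stays on one filesystem.

```python
import os
import tempfile


def render_backup_metrics(timestamp: int, total_bytes: int) -> str:
    """Render the two gauges in the Prometheus text exposition format."""
    return (
        "# HELP qdrant_last_backup_timestamp_seconds Unix timestamp of last successful Qdrant backup.\n"
        "# TYPE qdrant_last_backup_timestamp_seconds gauge\n"
        f"qdrant_last_backup_timestamp_seconds {timestamp}\n"
        "# HELP qdrant_last_backup_bytes Total bytes of last successful Qdrant backup.\n"
        "# TYPE qdrant_last_backup_bytes gauge\n"
        f"qdrant_last_backup_bytes {total_bytes}\n"
    )


def write_atomically(directory: str, name: str, text: str) -> str:
    """Write text to directory/name via a temp file in the same directory."""
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        handle.write(text)
    final_path = os.path.join(directory, name)
    os.replace(tmp_path, final_path)  # atomic on POSIX within one filesystem
    return final_path


with tempfile.TemporaryDirectory() as demo_dir:
    path = write_atomically(demo_dir, "qdrant_backup.prom", render_backup_metrics(1700000000, 4096))
    with open(path) as handle:
        print(handle.read())
```

The `.tmp` suffix keeps the in-progress file out of the collector's view, since the textfile collector only reads `*.prom` files.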


@ -1,100 +0,0 @@
#!/usr/bin/env python3
"""Delete memories that were imported from Neo4j (tagged metadata.source='neo4j_legacy_import').
Usage:
docker compose exec backend python /tmp/cleanup.py --user alice
docker compose exec backend python /tmp/cleanup.py --users alice,hetashree,manju
docker compose exec backend python /tmp/cleanup.py --dry-run --users alice
"""
import argparse
import json
import os
import sys

import httpx
from qdrant_client import QdrantClient
from qdrant_client.http import models

QDRANT_HOST = os.environ.get("QDRANT_HOST", "qdrant")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION_NAME", "mem0_v3")
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")


def load_api_keys() -> dict:
    """Invert API_KEYS ({api_key: user_id}) into {user_id: api_key}; the first key per user wins."""
    raw = os.environ.get("API_KEYS", "{}")
    keys = json.loads(raw)
    inv: dict = {}
    for k, u in keys.items():
        inv.setdefault(u, k)
    return inv


def scroll_legacy_for(client: QdrantClient, user_id: str) -> list:
    """Return all memory IDs in mem0_v3 with source='neo4j_legacy_import' for the user."""
    ids = []
    offset = None
    while True:
        points, offset = client.scroll(
            collection_name=QDRANT_COLLECTION,
            scroll_filter=models.Filter(
                must=[
                    models.FieldCondition(key="user_id", match=models.MatchValue(value=user_id)),
                    models.FieldCondition(key="source", match=models.MatchValue(value="neo4j_legacy_import")),
                ]
            ),
            limit=128,
            with_payload=False,
            with_vectors=False,
            offset=offset,
        )
        ids.extend(p.id for p in points)
        # scroll() returns None as the next offset once the last page is read.
        if offset is None:
            break
    return ids


def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--user", help="Single user to clean")
    ap.add_argument("--users", help="Comma-separated user list")
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()

    targets = []
    if args.user:
        targets.append(args.user)
    if args.users:
        # Ignore empty entries such as the one left by a trailing comma.
        targets.extend(u.strip() for u in args.users.split(",") if u.strip())
    if not targets:
        print("Usage: --user X or --users a,b,c", file=sys.stderr)
        return 2

    api_keys = load_api_keys()
    client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
    http = httpx.Client(timeout=60.0)

    for user_id in targets:
        ids = scroll_legacy_for(client, user_id)
        print(f"{user_id}: found {len(ids)} legacy-import memories")
        if args.dry_run:
            continue
        key = api_keys.get(user_id)
        if not key:
            print(f"  no API key for {user_id}; skipping")
            continue
        deleted = errors = 0
        for mid in ids:
            # Delete through the backend API rather than directly in Qdrant.
            r = http.delete(f"{MEM0_URL}/memories/{mid}", headers={"X-API-Key": key})
            if r.status_code == 200:
                deleted += 1
            else:
                errors += 1
                print(f"  failed {mid}: HTTP {r.status_code} {r.text[:120]}")
        print(f"  deleted={deleted} errors={errors}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
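
The `load_api_keys` helper above inverts the `{api_key: user_id}` mapping so the script can look up a key by user. The sketch below illustrates that inversion in isolation; `invert_api_keys` is a hypothetical name and the key values are made up for the example.

```python
import json
from typing import Dict


def invert_api_keys(raw: str) -> Dict[str, str]:
    """Turn a JSON {api_key: user_id} mapping into {user_id: api_key}."""
    inverted: Dict[str, str] = {}
    for api_key, user_id in json.loads(raw).items():
        # setdefault keeps the first key seen for a user and ignores later ones.
        inverted.setdefault(user_id, api_key)
    return inverted


raw = '{"key-1": "alice", "key-2": "alice", "key-3": "bob"}'
print(invert_api_keys(raw))
```

When a user has several keys, only the first one in the JSON document is used, which is enough here because any valid key for the user authorizes the delete.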


@ -1,273 +0,0 @@
#!/usr/bin/env python3
"""Import Neo4j graph relationships into mem0 v2 as natural-language memories.
For each `(source)-[rel_type]->(destination)` edge in the Neo4j graph, POST a
short sentence ("source verb destination") to mem0's `/memories` endpoint.
mem0's v3 fact-extraction + entity-linking pipeline handles the rest.
Tagged with metadata.source="neo4j_legacy_import" so the imports can be
identified (or bulk-deleted) later via the standard /memories endpoints.
Usage (inside the mem0-backend container, which is on the same docker network
as neo4j-temp and the mem0 backend itself):
# Dry-run — show sample sentences without posting:
docker compose exec backend python /tmp/import.py --dry-run --limit 30
# Single-user POC:
docker compose exec backend python /tmp/import.py --user akshat
# Full sweep:
docker compose exec backend python /tmp/import.py
Environment variables (all have sensible defaults for the beast deployment):
NEO4J_URI bolt://neo4j-temp:7687
NEO4J_USER neo4j
NEO4J_PASS mem0_neo4j_password
MEM0_URL http://localhost:8000
API_KEYS JSON mapping {api_key: user_id} (already in container env)
"""
import argparse
import json
import os
import sys
import time
from typing import Optional
import httpx
from neo4j import GraphDatabase
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j-temp:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASS = os.environ.get("NEO4J_PASS", "mem0_neo4j_password")
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")
# LiteLLM proxy for sentence rewriting (--llm-rewrite mode)
LLM_URL = os.environ.get("LLM_URL", os.environ.get("OPENAI_BASE_URL", "https://veronica.pratikn.com"))
LLM_KEY = os.environ.get("LLM_KEY", os.environ.get("OPENAI_API_KEY", ""))
LLM_MODEL = os.environ.get("LLM_MODEL", "minimax-m2")
REWRITE_PROMPT = """You are converting a knowledge-graph relationship into a natural English sentence for a personal memory system.
The user we're building memories for is identified by user_id="{user_id}". The graph may reference them by variant names (full name, "user_id:_{user_id}", nicknames). Treat all of those as the same person — in your output sentence, use the name "{user_id}" (capitalized appropriately) so the memory layer can link it consistently.
Convert this tuple into ONE natural English sentence:
source: {source}
relationship: {rel}
destination: {dest}
Rules:
- Output ONLY the sentence. No quotes, no preamble, no explanation, no markdown.
- Use proper capitalization, grammar, and convert snake_case names to natural words ("custom_headers" -> "custom headers").
- If the source IS the user, write a third-person sentence starting with "{user_id_cap}" (e.g., "{user_id_cap} works at TechCorp.").
- If the source is a non-user entity, write a third-person sentence about that entity (e.g., "Dr Seta works at Chitra.").
- If the tuple doesn't translate to a meaningful, retrievable fact (pure identifier metadata like a postal code, generic world knowledge like a timezone offset, or self-references), respond with exactly: SKIP"""


def load_api_keys() -> dict:
    """Invert API_KEYS ({api_key: user_id}) into {user_id: api_key}; the first key per user wins."""
    raw = os.environ.get("API_KEYS", "{}")
    keys = json.loads(raw)
    inv: dict = {}
    for k, u in keys.items():
        inv.setdefault(u, k)
    return inv


def humanize(name: str, user_id: Optional[str] = None) -> str:
    """snake_case node name → human-readable. __User__ nodes collapse to the user_id."""
    if not name:
        return "<unknown>"
    # The __User__ label uses name = "user_id:_<username>"
    if name.startswith("user_id:_") and user_id:
        return user_id
    return name.replace("_", " ")


def humanize_rel(rel_type: str) -> str:
    return rel_type.replace("_", " ")


def build_sentence(source: str, rel_type: str, dest: str, user_id: str) -> str:
    return f"{humanize(source, user_id)} {humanize_rel(rel_type)} {humanize(dest, user_id)}."


# Memoizes rewrites per (source, rel_type, dest, user_id), including SKIP results.
_llm_cache: dict = {}


def llm_rewrite(
    source: str,
    rel_type: str,
    dest: str,
    user_id: str,
    client: httpx.Client,
) -> Optional[str]:
    """Ask minimax-m2 to convert the tuple to a natural sentence. Returns None for SKIP."""
    key = (source, rel_type, dest, user_id)
    if key in _llm_cache:
        return _llm_cache[key]
    prompt = REWRITE_PROMPT.format(
        user_id=user_id,
        user_id_cap=user_id[:1].upper() + user_id[1:],
        source=source,
        rel=rel_type,
        dest=dest,
    )
    resp = client.post(
        f"{LLM_URL}/v1/chat/completions",
        headers={"Authorization": f"Bearer {LLM_KEY}"},
        json={
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 400,
            "temperature": 0.2,
        },
        timeout=60.0,
    )
    resp.raise_for_status()
    data = resp.json()
    content = (data["choices"][0]["message"].get("content") or "").strip()
    # Strip wrapping quotes/whitespace
    content = content.strip(" \"'\n\r\t")
    if not content or content.upper().strip(" .!") == "SKIP":
        _llm_cache[key] = None
        return None
    _llm_cache[key] = content
    return content


def main() -> int:
    # Declared up front so --llm-url / --llm-model can override the module-level
    # defaults that llm_rewrite() reads; without this the two flags had no effect.
    global LLM_URL, LLM_MODEL

    ap = argparse.ArgumentParser()
    ap.add_argument("--user", help="Single user_id to migrate (default: all)")
    ap.add_argument("--limit", type=int, help="Max relationships to process")
    ap.add_argument("--dry-run", action="store_true", help="Print sentences only, no POSTs")
    ap.add_argument("--neo4j-uri", default=NEO4J_URI)
    ap.add_argument("--mem0-url", default=MEM0_URL)
    ap.add_argument(
        "--llm-rewrite",
        action="store_true",
        help="Use minimax-m2 (via veronica.pratikn.com) to rewrite tuples into natural English sentences",
    )
    ap.add_argument("--llm-url", default=LLM_URL)
    ap.add_argument("--llm-model", default=LLM_MODEL)
    args = ap.parse_args()
    LLM_URL, LLM_MODEL = args.llm_url, args.llm_model
    api_keys = load_api_keys()

    # Build the cypher query
    where = "WHERE n.user_id IS NOT NULL AND m.user_id IS NOT NULL"
    params: dict = {}
    if args.user:
        where += " AND n.user_id = $uid"
        params["uid"] = args.user
    limit_clause = f"LIMIT {args.limit}" if args.limit else ""
    cypher = (
        f"MATCH (n)-[r]->(m) {where} "
        "RETURN n.name AS source, type(r) AS rel, m.name AS dest, n.user_id AS user_id "
        f"{limit_clause}"
    )

    driver = GraphDatabase.driver(args.neo4j_uri, auth=(NEO4J_USER, NEO4J_PASS))
    with driver.session() as session:
        edges = list(session.run(cypher, **params))
    driver.close()
    print(f"Found {len(edges)} relationships")

    client = httpx.Client(timeout=180.0)

    def make_sentence(e) -> Optional[str]:
        if args.llm_rewrite:
            return llm_rewrite(e["source"], e["rel"], e["dest"], e["user_id"], client)
        return build_sentence(e["source"], e["rel"], e["dest"], e["user_id"])

    if args.dry_run:
        n_show = min(30, len(edges))
        print(f"\nSample sentences (first {n_show})"
              + (" [LLM-rewritten]" if args.llm_rewrite else "") + ":")
        for e in edges[:n_show]:
            s = make_sentence(e)
            tup = f"({e['source']}, {e['rel']}, {e['dest']})"
            marker = "SKIP" if s is None else s
            print(f"  [{e['user_id']}] {tup} -> {marker}")
        if len(edges) > n_show:
            print(f"  ... and {len(edges) - n_show} more")
        # Show per-user breakdown
        by_user: dict = {}
        for e in edges:
            by_user[e["user_id"]] = by_user.get(e["user_id"], 0) + 1
        print(f"\nBy user: {by_user}")
        return 0

    timestamp = int(time.time())
    stats = {
        "posted": 0,
        "skipped_no_key": 0,
        "skipped_llm": 0,
        "errors": 0,
        "extracted_total": 0,
        "no_facts": 0,
    }
    for i, edge in enumerate(edges, 1):
        user_id = edge["user_id"]
        api_key = api_keys.get(user_id)
        if not api_key:
            stats["skipped_no_key"] += 1
            continue
        try:
            sentence = make_sentence(edge)
        except Exception as exc:
            stats["errors"] += 1
            print(f"  [{i}/{len(edges)}] LLM REWRITE EXCEPTION: {exc}")
            continue
        if sentence is None:
            stats["skipped_llm"] += 1
            continue
        body = {
            "user_id": user_id,
            "messages": [{"role": "user", "content": sentence}],
            "metadata": {
                "source": "neo4j_legacy_import",
                "neo4j_rel_type": edge["rel"],
                "import_timestamp": timestamp,
            },
        }
        try:
            r = client.post(
                f"{args.mem0_url}/memories",
                headers={"X-API-Key": api_key},
                json=body,
            )
            if r.status_code != 200:
                stats["errors"] += 1
                print(
                    f"  [{i}/{len(edges)}] {user_id} {sentence!r} HTTP {r.status_code} "
                    f"{r.text[:120]}"
                )
                continue
            data = r.json()
            results = (data.get("added_memories") or [{}])[0].get("results", [])
            stats["extracted_total"] += len(results)
            if not results:
                stats["no_facts"] += 1
            stats["posted"] += 1
            # Progress line every 5 edges and on the last one.
            if i % 5 == 0 or i == len(edges):
                print(
                    f"  [{i}/{len(edges)}] posted={stats['posted']} "
                    f"extracted={stats['extracted_total']} "
                    f"no_facts={stats['no_facts']} "
                    f"llm_skipped={stats['skipped_llm']} "
                    f"no_key={stats['skipped_no_key']} errors={stats['errors']}"
                )
        except Exception as exc:
            stats["errors"] += 1
            print(f"  [{i}/{len(edges)}] EXCEPTION: {exc}")

    print(f"\nDONE: {stats}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
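
The `--llm-rewrite` path above treats a reply as SKIP after stripping wrapping quotes, whitespace, and trailing punctuation. The sketch below isolates that normalization so its edge cases are easier to see. `normalize_reply` is a hypothetical helper name, and the sample replies are invented for illustration.

```python
from typing import Optional


def normalize_reply(raw: Optional[str]) -> Optional[str]:
    """Return the cleaned sentence, or None when the model answered SKIP."""
    content = (raw or "").strip()
    content = content.strip(" \"'\n\r\t")  # drop wrapping quotes and whitespace
    if not content or content.upper().strip(" .!") == "SKIP":
        return None
    return content


for reply in ['"Akshat works at TechCorp."', " skip. ", "Skip!", None, "Skipper sails boats."]:
    print(repr(reply), "->", repr(normalize_reply(reply)))
```

Because the comparison is against the whole normalized reply, a sentence that merely starts with "Skip" is kept, while an empty or missing reply is treated the same as SKIP.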


@ -1,189 +0,0 @@
#!/usr/bin/env python3
"""Migrate a legacy mem0 v0.1.x/v1.x Qdrant collection to a v2-compatible one.
Why this script exists
----------------------
mem0 v2 stores a `bm25` sparse vector alongside each dense vector to enable
hybrid search. Pre-v2 collections lack that slot, so mem0's Qdrant adapter
silently downgrades to semantic-only writes on them. To unlock BM25 you must
recreate the collection with the sparse slot AND copy the existing points
over (preserving id, vector, payload; no re-embed needed).
How it works
------------
1. Connect to Qdrant.
2. Scroll all points from the source collection (with vectors + payload).
3. Upsert them into the target collection in batches.
4. Verify counts match per `user_id`.
The target collection MUST already exist with the BM25 slot. The recommended
way to create it is to boot the v2 backend pointed at `QDRANT_COLLECTION_NAME=<target>`
and trigger one `add()` call; mem0 lazy-creates the collection (and the sister
`<target>_entities` collection) with the right schema.
Usage
-----
# Dry run (no writes):
python scripts/migrate_qdrant_to_v3.py \\
--source mem0 --target mem0_v3 \\
--qdrant-host localhost --qdrant-port 6333 \\
--dry-run
# Real migration:
python scripts/migrate_qdrant_to_v3.py \\
--source mem0 --target mem0_v3 \\
--qdrant-host localhost --qdrant-port 6333
# From inside the backend container (where Qdrant resolves as `qdrant`):
docker compose exec backend python /app/../scripts/migrate_qdrant_to_v3.py \\
--source mem0 --target mem0_v3 --qdrant-host qdrant --qdrant-port 6333
Prereqs
-------
- qdrant-client>=1.12.0 installed
- A fresh Qdrant snapshot of the source collection (see docs/MIGRATION_RUNBOOK.md)
- The target collection created via a v2 backend warm-up add()
"""
import argparse
import sys
from collections import Counter
from typing import Optional
from qdrant_client import QdrantClient
from qdrant_client.http import models


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--source", required=True, help="Source (legacy) collection name")
    p.add_argument("--target", required=True, help="Target (v2-created) collection name")
    p.add_argument("--qdrant-host", default="localhost")
    p.add_argument("--qdrant-port", type=int, default=6333)
    p.add_argument("--batch-size", type=int, default=256, help="Scroll/upsert batch size")
    p.add_argument("--dry-run", action="store_true", help="Read-only: show counts, no writes")
    return p.parse_args()


def collection_must_exist(client: QdrantClient, name: str) -> models.CollectionInfo:
    if not client.collection_exists(name):
        print(f"ERROR: collection {name!r} does not exist on Qdrant.", file=sys.stderr)
        sys.exit(2)
    return client.get_collection(name)


def verify_target_has_bm25(target_info: models.CollectionInfo) -> None:
    sparse = getattr(target_info.config.params, "sparse_vectors", None)
    if not sparse or "bm25" not in sparse:
        print(
            "ERROR: target collection has no `bm25` sparse-vector slot. Did you create "
            "it via a v2 backend warm-up add()? See docs/MIGRATION_RUNBOOK.md.",
            file=sys.stderr,
        )
        sys.exit(2)


def count_per_user(client: QdrantClient, collection: str) -> Counter:
    counts: Counter = Counter()
    offset: Optional[models.PointId] = None
    while True:
        points, offset = client.scroll(
            collection_name=collection,
            limit=1024,
            with_payload=["user_id"],
            with_vectors=False,
            offset=offset,
        )
        for p in points:
            uid = (p.payload or {}).get("user_id", "<none>")
            counts[uid] += 1
        if offset is None:
            break
    return counts


def migrate(
    client: QdrantClient, source: str, target: str, batch_size: int, dry_run: bool
) -> int:
    transferred = 0
    offset: Optional[models.PointId] = None
    while True:
        points, offset = client.scroll(
            collection_name=source,
            limit=batch_size,
            with_payload=True,
            with_vectors=True,
            offset=offset,
        )
        if not points:
            break
        if not dry_run:
            # Copy each point as-is: same id, dense vector, and payload.
            client.upsert(
                collection_name=target,
                points=[
                    models.PointStruct(id=p.id, vector=p.vector, payload=p.payload)
                    for p in points
                ],
                wait=True,
            )
        transferred += len(points)
        print(f"  ... transferred {transferred} points")
        if offset is None:
            break
    return transferred


def main() -> None:
    args = parse_args()
    client = QdrantClient(host=args.qdrant_host, port=args.qdrant_port)
    src_info = collection_must_exist(client, args.source)
    tgt_info = collection_must_exist(client, args.target)
    verify_target_has_bm25(tgt_info)

    src_count = client.count(args.source, exact=True).count
    tgt_count_before = client.count(args.target, exact=True).count
    print(f"Source {args.source!r}: {src_count} points")
    print(f"Target {args.target!r}: {tgt_count_before} points (before)")
    # One point is expected from the warm-up add(); more than that is suspicious.
    if tgt_count_before > 1:
        print(
            "WARNING: target collection is non-empty (>1 point). Migration will "
            "upsert into it; ids collide → existing points overwritten."
        )

    print("\nPer-user count (source):")
    src_per_user = count_per_user(client, args.source)
    for uid, c in src_per_user.most_common():
        print(f"  {uid}: {c}")

    if args.dry_run:
        print("\nDRY RUN: no writes performed.")
        sys.exit(0)

    print("\nMigrating points (preserving id + vector + payload, no re-embed)...")
    transferred = migrate(client, args.source, args.target, args.batch_size, dry_run=False)
    tgt_count_after = client.count(args.target, exact=True).count
    print(f"\nDone. Transferred {transferred} points.")
    print(f"Target {args.target!r}: {tgt_count_after} points (after)")

    print("\nPer-user count (target, after):")
    tgt_per_user = count_per_user(client, args.target)
    mismatches = 0
    for uid, src_c in src_per_user.most_common():
        tgt_c = tgt_per_user.get(uid, 0)
        marker = "OK" if tgt_c == src_c else f"MISMATCH ({tgt_c})"
        print(f"  {uid}: src={src_c} tgt={tgt_c} [{marker}]")
        if tgt_c != src_c:
            mismatches += 1
    if mismatches:
        print(f"\nERROR: {mismatches} user(s) have count mismatches. Investigate before swap.", file=sys.stderr)
        sys.exit(3)
    print("\nAll per-user counts match. Safe to proceed with collection swap.")


if __name__ == "__main__":
    main()
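
Both `count_per_user` and `migrate` above follow the same cursor pattern: request a page, process it, and stop once the returned offset is `None`. The sketch below shows that loop against an in-memory stand-in so it runs without Qdrant. `iterate_pages` and `make_fake_scroll` are hypothetical names, and the fake only imitates the `(points, next_offset)` return shape of `client.scroll`.

```python
from collections import Counter
from typing import Callable, Dict, Iterator, List, Optional, Tuple

Point = Dict[str, str]
Page = Tuple[List[Point], Optional[int]]


def iterate_pages(fetch: Callable[[Optional[int]], Page]) -> Iterator[Point]:
    """Yield every point by following the cursor until it comes back as None."""
    offset: Optional[int] = None
    while True:
        points, offset = fetch(offset)
        yield from points
        if offset is None:
            break


def make_fake_scroll(points: List[Point], limit: int) -> Callable[[Optional[int]], Page]:
    """Stand-in for client.scroll: returns (page, next_offset) over a list."""
    def fetch(offset: Optional[int]) -> Page:
        start = offset or 0
        end = start + limit
        next_offset = end if end < len(points) else None
        return points[start:end], next_offset
    return fetch


data = [{"user_id": "alice"}, {"user_id": "bob"}, {"user_id": "alice"}]
counts = Counter(p.get("user_id", "<none>") for p in iterate_pages(make_fake_scroll(data, limit=2)))
print(counts)
```

Checking the offset after processing the page, rather than before, is what guarantees the final partial page is not dropped.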


@ -1,81 +0,0 @@
#!/usr/bin/env bash
#
# Weekly Qdrant restore sanity check.
#
# Finds the most recent backup tarball for SOURCE_COLLECTION, restores it into
# a transient collection, asserts the restored point count >= production, and
# cleans up. Exits non-zero on any failure so cron alerting catches it.
#
# Env vars:
#   QDRANT_CONTAINER   container name (default: mem0-qdrant)
#   BACKUP_DIR         local backup root
#                      (default: ~/aistuff/mem0/backups/qdrant)
#   SOURCE_COLLECTION  collection to verify (default: mem0_v3)
#   TEST_COLLECTION    transient collection name
#                      (default: mem0_v3_restore_test)
#
# Suggested cron (weekly Sunday 04:00 UTC):
# 0 4 * * 0 /home/ubuntu/aistuff/mem0/scripts/restore_test.sh >> /home/ubuntu/aistuff/mem0/backups/restore_test.log 2>&1
set -euo pipefail
QDRANT_CONTAINER="${QDRANT_CONTAINER:-mem0-qdrant}"
BACKUP_DIR="${BACKUP_DIR:-$HOME/aistuff/mem0/backups/qdrant}"
SOURCE_COLLECTION="${SOURCE_COLLECTION:-mem0_v3}"
TEST_COLLECTION="${TEST_COLLECTION:-mem0_v3_restore_test}"
log() { printf '[%s] %s\n' "$(date -u +%FT%TZ)" "$*"; }
# Pick the most recent backup for SOURCE_COLLECTION. The "_[0-9]*" suffix
# anchors on the timestamp digit so a collection named "mem0_v3" does NOT
# also match "mem0_v3_entities_*" files in the same directory.
latest="$(find "$BACKUP_DIR" -type f -name "${SOURCE_COLLECTION}_[0-9]*" -printf '%T@ %p\n' 2>/dev/null \
    | sort -nr | head -1 | cut -d' ' -f2-)"
if [ -z "$latest" ]; then
    log "ERROR: no backup found under $BACKUP_DIR matching ${SOURCE_COLLECTION}_[0-9]*"
    exit 1
fi
log "latest backup: $latest"
prod_count=$(docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
    "http://localhost:6333/collections/$SOURCE_COLLECTION/points/count" \
    -H "Content-Type: application/json" \
    -d '{"exact":true}' \
    | python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["count"])')
log "production count ($SOURCE_COLLECTION): $prod_count"

# Drop any leftover test collection from a previous failed run.
docker exec "$QDRANT_CONTAINER" curl -fsS -X DELETE \
    "http://localhost:6333/collections/$TEST_COLLECTION" >/dev/null 2>&1 || true
snap_basename="$(basename "$latest")"
log "copying snapshot into container: /tmp/$snap_basename"
docker cp "$latest" "$QDRANT_CONTAINER:/tmp/$snap_basename"
# Use the upload endpoint (multipart) rather than recover-from-URL:
# file:// recovery is disabled by default in recent Qdrant (returns 403),
# and upload doesn't need an allowlist.
log "restoring into $TEST_COLLECTION via /snapshots/upload"
docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
    "http://localhost:6333/collections/$TEST_COLLECTION/snapshots/upload?priority=snapshot" \
    -H "Content-Type: multipart/form-data" \
    -F "snapshot=@/tmp/$snap_basename" \
    >/dev/null

restored_count=$(docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
    "http://localhost:6333/collections/$TEST_COLLECTION/points/count" \
    -H "Content-Type: application/json" \
    -d '{"exact":true}' \
    | python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["count"])')
log "restored count: $restored_count"
# Cleanup whether or not the assertion passes.
docker exec "$QDRANT_CONTAINER" curl -fsS -X DELETE \
"http://localhost:6333/collections/$TEST_COLLECTION" >/dev/null
docker exec "$QDRANT_CONTAINER" rm -f "/tmp/$snap_basename"
if [ "$restored_count" -lt "$prod_count" ]; then
    log "FAIL: restored=$restored_count < production=$prod_count"
    exit 1
fi
log "OK: restored=$restored_count >= production=$prod_count"
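
The `_[0-9]*` suffix in the `find` pattern above is what keeps `mem0_v3` from also matching `mem0_v3_entities_*` backups. The sketch below reproduces that glob with Python's `fnmatch` to make the behavior easy to check. `backups_for` is a hypothetical helper, and the file names are invented examples that follow the `<collection>_<timestamp>_<snapshot>` naming used by the backup script.

```python
from fnmatch import fnmatchcase
from typing import List


def backups_for(collection: str, filenames: List[str]) -> List[str]:
    """Keep only files named '<collection>_<digit>...' (same glob as the script)."""
    pattern = f"{collection}_[0-9]*"
    return [name for name in filenames if fnmatchcase(name, pattern)]


files = [
    "mem0_v3_20250101T030000Z_snapshot-a.snapshot",
    "mem0_v3_entities_20250101T030001Z_snapshot-b.snapshot",
]
print(backups_for("mem0_v3", files))
print(backups_for("mem0_v3_entities", files))
```

Each call returns only the file for its own collection, because the character after the collection name must be a digit.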


@ -3,7 +3,7 @@
set -e
COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-mem0}"
-VOLUMES=("qdrant_data")
+VOLUMES=("qdrant_data" "neo4j_data" "neo4j_logs" "neo4j_import" "neo4j_plugins")
echo "=========================================="
echo " Mem0 Setup Script"


@ -1,102 +0,0 @@
"""End-to-end compatibility test: drive the real mem0ai MemoryClient against
this server. Run INSIDE the backend container (which has mem0ai==2.0.2):
ssh beast 'cd ~/aistuff/mem0 && docker compose exec -T backend python' < test_sdk_compat.py
Requires MEM0_API_KEY in the environment (mapped to user 'pratik'). Exercises the
full SDK surface and cleans up the memories it creates (scoped to a throwaway
agent_id so it never touches real memories).
"""
import os
import sys
import time
from mem0 import MemoryClient
KEY = os.environ.get("MEM0_API_KEY")
if not KEY:
sys.exit("set MEM0_API_KEY (mapped to user 'pratik')")
HOST = os.environ.get("MEM0_HOST", "https://memory.pratikn.com")
USER = os.environ.get("MEM0_TEST_USER", "pratik")
AGENT = "sdk_compat_test" # isolates test data from real memories
results = []


def check(name, cond, info=""):
    """Record a pass/fail result and print a one-line summary."""
    results.append(bool(cond))
    print(("PASS " if cond else "FAIL "), name, "--", str(info)[:240])


# --- construct (hits GET /v1/ping/, validates Token auth + org/project) ---
try:
    c = MemoryClient(host=HOST, api_key=KEY)
    check("construct", True, f"user_email={c.user_email} org={c.org_id} proj={c.project_id}")
except Exception as e:
    check("construct", False, repr(e))
    print("\nRESULT: RED (cannot construct client)")
    sys.exit(1)

# --- add (POST /v3/memories/add/) ---
probe = f"SDK compat probe {int(time.time())}: Pratik is validating the mem0 SDK compatibility layer and uses FastAPI."
try:
    r = c.add(probe, user_id=USER, agent_id=AGENT)
    check("add", isinstance(r, dict), r)
except Exception as e:
    check("add", False, repr(e))
time.sleep(3)  # allow async extraction/indexing to settle

# --- search (POST /v3/memories/search/) ---
try:
    s = c.search("mem0 SDK compatibility layer", filters={"user_id": USER, "agent_id": AGENT})
    check("search.shape", isinstance(s, dict) and "results" in s, s)
except Exception as e:
    check("search.shape", False, repr(e))

# --- get_all (POST /v3/memories/) ---
ids = []
try:
    g = c.get_all(filters={"user_id": USER, "agent_id": AGENT})
    ok = isinstance(g, dict) and "results" in g and "count" in g
    check("get_all.shape", ok, g)
    ids = [m.get("id") for m in (g.get("results") or []) if m.get("id")]
except Exception as e:
    check("get_all.shape", False, repr(e))
mid = ids[0] if ids else None
print(f"  (created {len(ids)} memory id(s) under agent={AGENT})")

# --- item ops (best-effort; depend on extraction producing >=1 fact) ---
if mid:
    try:
        one = c.get(mid)
        check("get", isinstance(one, dict) and one.get("id") == mid, one)
    except Exception as e:
        check("get", False, repr(e))
    try:
        h = c.history(mid)
        check("history.is_list", isinstance(h, list), h)
    except Exception as e:
        check("history.is_list", False, repr(e))
    try:
        u = c.update(mid, text="SDK compat probe (updated)")
        check("update", isinstance(u, dict), u)
    except Exception as e:
        check("update", False, repr(e))

# --- cleanup: delete only the ids we created ---
deleted = 0
for i in ids:
    try:
        c.delete(i)
        deleted += 1
    except Exception as e:
        print("  delete error", i, repr(e))
print(f"  cleanup: deleted {deleted}/{len(ids)}")

# GREEN requires at least one recorded check and no failures.
green = bool(results) and all(results)
print("\nRESULT:", "GREEN" if green else "RED", f"({sum(results)}/{len(results)} checks passed)")
sys.exit(0 if green else 1)