Compare commits
No commits in common. "chore/mem0-v3-migration" and "main" have entirely different histories.
chore/mem0
...
main
19 changed files with 362 additions and 1926 deletions
84
CLAUDE.md
84
CLAUDE.md
|
|
@ -1,84 +0,0 @@
|
|||
# CLAUDE.md
|
||||
|
||||
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
|
||||
|
||||
## Project overview
|
||||
|
||||
FastAPI backend that wraps the `mem0ai` SDK (pinned to `mem0ai[nlp]==2.0.2` — the "V3 memory pipeline") to expose memory operations (add/search/update/delete, plus memory-aware chat) over a REST API, an OpenAI-compatible `/v1/chat/completions` endpoint, and an MCP server. Memory is stored in Qdrant (vectors + BM25 sparse), with a sister `{collection}_entities` Qdrant collection auto-created by mem0 for entity linking. Embeddings come from a local Ollama instance; the LLM is a custom OpenAI-compatible endpoint. The frontend is two standalone HTML files (`index.html`, `graph.html`) that call the API directly — no build step.
|
||||
|
||||
## Common commands
|
||||
|
||||
```bash
|
||||
# First-time setup (creates volumes, builds, brings up stack). Prompts to reset volumes if they already exist.
|
||||
./setup.sh
|
||||
|
||||
# Day-to-day
|
||||
docker compose up -d --build # rebuild + start
|
||||
docker compose down # stop (keeps volumes)
|
||||
docker compose down -v # stop + delete data
|
||||
docker compose logs -f backend # tail backend logs (structlog JSON)
|
||||
docker compose restart backend # pick up code changes (no --reload; see "Volumes" gotcha)
|
||||
|
||||
# Sanity check — assumes a host route to backend:8000 exists (see "Networking" gotcha).
|
||||
curl http://localhost:8000/health
|
||||
|
||||
# Integration tests — hit the running stack, no mocks. See test_integration.py for the test list.
|
||||
MEM0_API_KEY=<key-from-API_KEYS> python test_integration.py
|
||||
MEM0_API_KEY=<key-from-API_KEYS> python test_integration.py -v
|
||||
```
|
||||
|
||||
There are no unit tests and no separate lint/format/type-check setup — `test_integration.py` is the only test entry point and it requires a fully-running Docker stack. The script generates a fresh `TEST_USER = f"test_user_{int(datetime.now().timestamp())}"` per run, so for tests to pass auth checks the supplied `MEM0_API_KEY` must map to that exact user in `API_KEYS` (either set `TEST_USER` to a statically mapped user, or add a mapping for the run).
|
||||
|
||||
## Architecture
|
||||
|
||||
### Request flow
|
||||
1. Client hits FastAPI (`backend/main.py`) with `X-API-Key` (or `Authorization: Bearer` for `/v1/chat/completions`).
|
||||
2. `auth.py` resolves the key → `user_id` via `settings.api_key_mapping` (parsed from the `API_KEYS` env JSON). Every protected endpoint then verifies the caller's `user_id` matches the path/body `user_id` — there is no admin or cross-user access.
|
||||
3. The endpoint calls `mem0_manager.mem0_manager` (singleton in `backend/mem0_manager.py`), which delegates to the `mem0ai` SDK. The SDK in turn calls Qdrant, Ollama, Cohere (reranker), and the custom OpenAI endpoint.
|
||||
4. The `@timed("operation_name")` decorator from `backend/monitoring.py` wraps memory operations to log structured timings and feed the in-memory `stats` singleton that powers `/stats` and `/stats/{user_id}`.
|
||||
|
||||
### Three parallel API surfaces
|
||||
All three live in the same FastAPI process and share auth + rate limiting:
|
||||
- **Native REST** (`/chat`, `/memories*`, `/graph/relationships/{user_id}` *(deprecated — returns empty payload)*, `/stats*`, `/models`, `/users`) — authenticates via `X-API-Key`.
|
||||
- **OpenAI-compatible** (`/v1/chat/completions`, also `/chat/completions`) — authenticates via `Authorization: Bearer <key>` or `X-API-Key`; supports streaming SSE. Implemented in `main.py:openai_chat_completions` and `stream_openai_response`.
|
||||
- **MCP** mounted at `/mcp` (see `backend/mcp_server.py`) — uses a Starlette `MCPAuthMiddleware` that stuffs the resolved `user_id` into a `ContextVar`, which the FastMCP tools (`add_memory`, `search_memory`, `remove_memory`, `chat`) read. The MCP session manager is started inside the main FastAPI `lifespan` in `main.py` — mounted-app lifespans don't run automatically, so don't move that startup logic.
|
||||
|
||||
### Storage layout
|
||||
- **Qdrant** — collection name from `QDRANT_COLLECTION_NAME` (default `mem0`). Embedding dim must match the embedder; see "Embedding dimensions" gotcha below. Collections created by mem0 v2 carry a `bm25` sparse-vector slot for hybrid search (semantic + keyword + entity-boost). The slot is added automatically at collection creation; existing pre-v2 collections silently degrade to semantic-only with a logged warning — they must be recreated to gain BM25.
|
||||
- **`{collection}_entities`** — sister Qdrant collection lazy-created by mem0 v2 on first `add()`, same dimension as the main collection. Stores entity vectors used for ranking boost. No code touches it directly.
|
||||
- **No graph store** — Neo4j and the OSS graph memory feature were removed in `mem0ai` 2.0.0 (PR #4805). The `/graph/relationships/{user_id}` endpoint is kept for client compatibility but returns `deprecated: true` with empty arrays.
|
||||
- **No SQL store** — older docs mention PostgreSQL/pgvector; that's no longer used. Qdrant only.
|
||||
|
||||
## Important conventions / gotchas
|
||||
|
||||
### Networking: backend is not published to the host
|
||||
`docker-compose.yml` defines the backend on the **external** `npm_network` (Nginx Proxy Manager) and only `expose`s port 8000 inside Docker. There is no `ports:` mapping. To hit it from the host you need either: (a) the NPM proxy in front of it, (b) `docker compose exec backend curl ...`, or (c) add a temporary `ports:` mapping. The `npm_network` must exist before bringing the stack up (`docker network create npm_network` if you don't run NPM).
|
||||
|
||||
### Claude/OpenAI-compatible monkey-patch
|
||||
`mem0_manager.py` patches `mem0.llms.openai.OpenAILLM.generate_response` at import time to clear `store` and `top_p` from the config. In `mem0ai>=2.0.0`, the `store` half is redundant (upstream made `store` opt-in) but kept as a harmless safety net. The `top_p` clearing is still load-bearing: Claude (reached via the custom OpenAI-compatible endpoint) rejects `top_p` whenever `temperature` is set, and `OpenAILLM` sends both unconditionally. If you upgrade `mem0ai` and chat starts 400-ing on the custom endpoint, this patch is the first place to look.
|
||||
|
||||
### Embedding model and dimensions are coupled
|
||||
`EMBEDDING_MODEL` and `EMBEDDING_DIMS` in `.env` / `docker-compose.yml` must agree, and they must match the dim the Qdrant collection was created with. Defaults are `qwen3-embedding:4b-q8_0` / `2560`. Switching the model requires either matching dims or recreating the Qdrant collection (`./setup.sh` → option 2 wipes volumes).
|
||||
|
||||
### Single-model architecture
|
||||
Despite what `README.md`, `TESTING.md`, and `MEM0.md` say about intelligent routing across `o4-mini` / `gemini-2.5-pro` / `claude-sonnet-4` / `o3`, the code uses **one** model — `settings.default_model` (`claude-sonnet-4` by default). `/models` returns only that. Don't reintroduce routing without first checking with the user.
|
||||
|
||||
### mem0 v2 API: filters dict + top_k
|
||||
mem0 v2 rejects `user_id`/`agent_id`/`run_id` as top-level kwargs on `Memory.search` and `Memory.get_all` (raises `ValueError`) — they must live inside a `filters={...}` dict. The `limit` kwarg is renamed `top_k` (default reduced 100 → 20 — pass it explicitly when you need more). `Memory.add` and `Memory.delete_all` still accept these IDs as top-level kwargs. Use the `_build_filters()` helper at the top of `mem0_manager.py` to construct the dict. Search `score` is now a fused multi-signal value (semantic + BM25 + entity boost), not raw cosine — don't compare against thresholds calibrated for the old scoring.
|
||||
|
||||
### ADD-only memory algorithm
|
||||
`Memory.add` in mem0 v2 only emits `ADD` events; the engine no longer issues `UPDATE`/`DELETE` events based on LLM judgment. Per-user memory count grows monotonically. Explicit `Memory.update` / `Memory.delete` still work and are how the project mutates memories.
|
||||
|
||||
### Auth & rate limiting
|
||||
- All endpoints except `/health` require a valid `X-API-Key` (or Bearer for the OpenAI-compatible routes). `API_KEYS` is a JSON object mapping keys → user IDs. Note this contradicts `AUTH_SETUP.md`, which lists `/stats` and `/models` as public — the code is authoritative.
|
||||
- Rate limits via `slowapi` are set per endpoint in `main.py` decorators: chat 30/min, writes 60/min, reads 120/min, bulk user-delete 10/min. Keyed by API key (fallback to remote IP).
|
||||
- Memory ownership is checked via `mem0_manager.verify_memory_ownership` (O(1) `Memory.get(memory_id)`) — use this rather than fetching all user memories.
|
||||
|
||||
### Config field aliases
|
||||
`backend/config.py` uses Pydantic `AliasChoices` so both `OPENAI_API_KEY` and `OPENAI_COMPAT_API_KEY` (and `OPENAI_BASE_URL` / `OPENAI_COMPAT_BASE_URL`) populate the same field. `docker-compose.yml` passes `OPENAI_API_KEY`; `.env.example` documents `OPENAI_COMPAT_API_KEY`. Both work.
|
||||
|
||||
### Volumes mount the source in, but no hot reload
|
||||
`docker-compose.yml` bind-mounts `./backend:/app` and `./frontend:/app/frontend`, and `uvicorn` runs with `--workers 4` and no `--reload`. Code edits become live only on container restart (`docker compose restart backend`).
|
||||
|
||||
### Logging
|
||||
Use `structlog.get_logger(__name__)` with **keyword arguments** (e.g. `logger.info("msg", user_id=x)`). The last few commits explicitly fixed places that mixed stdlib `logging` (which silently drops kwargs) with structlog. Don't reintroduce `logging.getLogger` in this codebase.
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
FROM python:3.12-slim
|
||||
FROM python:3.13-slim
|
||||
|
||||
# Set working directory
|
||||
WORKDIR /app
|
||||
|
|
|
|||
|
|
@ -130,39 +130,6 @@ async def get_current_user_openai(
|
|||
return auth_service.verify_api_key(api_key)
|
||||
|
||||
|
||||
async def get_current_user_platform(
|
||||
authorization: Optional[str] = Header(None),
|
||||
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
||||
) -> str:
|
||||
"""FastAPI dependency for the mem0 platform SDK (``MemoryClient``).
|
||||
|
||||
``MemoryClient`` authenticates with ``Authorization: Token <key>``. We also
|
||||
accept ``Bearer <key>`` and ``X-API-Key`` so the same routes work from curl
|
||||
and OpenAI-style clients. Resolves to the mapped user_id (raises 401 if the
|
||||
key is missing or invalid).
|
||||
"""
|
||||
api_key = None
|
||||
|
||||
if authorization:
|
||||
scheme, _, rest = authorization.partition(" ")
|
||||
if rest and scheme.lower() in ("token", "bearer"):
|
||||
api_key = rest.strip()
|
||||
else:
|
||||
# Bare value with no recognised scheme — treat the whole header as the key.
|
||||
api_key = authorization.strip()
|
||||
if not api_key and x_api_key:
|
||||
api_key = x_api_key
|
||||
|
||||
if not api_key:
|
||||
logger.warning("No API key provided in Authorization or X-API-Key headers")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Missing API key. Provide 'Authorization: Token <key>' (mem0 SDK), 'Bearer <key>', or 'X-API-Key'.",
|
||||
)
|
||||
|
||||
return auth_service.verify_api_key(api_key)
|
||||
|
||||
|
||||
async def verify_user_access(
|
||||
api_key: str = Security(api_key_header), user_id: Optional[str] = None
|
||||
) -> str:
|
||||
|
|
|
|||
|
|
@ -44,6 +44,20 @@ class Settings(BaseSettings):
|
|||
),
|
||||
)
|
||||
|
||||
# Neo4j Configuration
|
||||
neo4j_uri: str = Field(
|
||||
default="bolt://localhost:7687",
|
||||
validation_alias=AliasChoices("NEO4J_URI", "neo4j_uri"),
|
||||
)
|
||||
neo4j_username: str = Field(
|
||||
default="neo4j",
|
||||
validation_alias=AliasChoices("NEO4J_USERNAME", "neo4j_username"),
|
||||
)
|
||||
neo4j_password: str = Field(
|
||||
default="mem0_neo4j_password",
|
||||
validation_alias=AliasChoices("NEO4J_PASSWORD", "neo4j_password"),
|
||||
)
|
||||
|
||||
# Application Configuration
|
||||
log_level: str = Field(
|
||||
default="INFO", validation_alias=AliasChoices("LOG_LEVEL", "log_level")
|
||||
|
|
|
|||
184
backend/main.py
184
backend/main.py
|
|
@ -11,6 +11,7 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, Security,
|
|||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
import structlog
|
||||
import asyncio
|
||||
from slowapi import Limiter, _rate_limit_exceeded_handler
|
||||
from slowapi.util import get_remote_address
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
|
|
@ -28,8 +29,6 @@ def get_rate_limit_key(request: Request) -> str:
|
|||
|
||||
|
||||
limiter = Limiter(key_func=get_rate_limit_key)
|
||||
import httpx
|
||||
|
||||
from models import (
|
||||
ChatRequest,
|
||||
MemoryAddRequest,
|
||||
|
|
@ -44,10 +43,13 @@ from models import (
|
|||
GlobalStatsResponse,
|
||||
UserStatsResponse,
|
||||
OpenAIChatCompletionRequest,
|
||||
OpenAIChatCompletionResponse,
|
||||
OpenAIChoice,
|
||||
OpenAIChoiceMessage,
|
||||
OpenAIUsage,
|
||||
)
|
||||
from mem0_manager import mem0_manager
|
||||
from auth import get_current_user, get_current_user_openai, auth_service
|
||||
from platform_compat import router as platform_router
|
||||
|
||||
# Configure structured logging
|
||||
structlog.configure(
|
||||
|
|
@ -107,20 +109,14 @@ async def lifespan(app: FastAPI):
|
|||
except Exception as e:
|
||||
logger.error(f"Error stopping MCP session manager: {e}")
|
||||
|
||||
# Close the async HTTP client used by the /v1/chat/completions proxy.
|
||||
try:
|
||||
await mem0_manager.aclose()
|
||||
except Exception as e:
|
||||
logger.warning(f"mem0_manager aclose failed: {e}")
|
||||
|
||||
logger.info("Shutting down Mem0 Interface POC")
|
||||
|
||||
|
||||
# Initialize FastAPI app
|
||||
app = FastAPI(
|
||||
title="Mem0 Interface POC",
|
||||
description="Minimal Mem0 interface backed by Qdrant (mem0ai v2 hybrid-search pipeline)",
|
||||
version="2.0.0",
|
||||
description="Minimal but fully functional Mem0 interface with PostgreSQL and Neo4j integration",
|
||||
version="1.0.0",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
|
|
@ -340,59 +336,135 @@ async def chat_with_memory(
|
|||
)
|
||||
|
||||
|
||||
@app.post("/v1/chat/completions", response_model=None)
|
||||
@app.post("/chat/completions", response_model=None)
|
||||
async def stream_openai_response(
|
||||
completion_id: str, model: str, content: str, created: int
|
||||
):
|
||||
"""Generate SSE stream for OpenAI-compatible streaming by chunking the response."""
|
||||
import uuid
|
||||
|
||||
# First chunk with role
|
||||
chunk = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion.chunk",
|
||||
"created": created,
|
||||
"model": model,
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {"role": "assistant", "content": ""},
|
||||
"finish_reason": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
yield f"data: {json.dumps(chunk)}\n\n"
|
||||
|
||||
# Stream content in chunks (3 words at a time for smooth effect)
|
||||
words = content.split()
|
||||
chunk_size = 3
|
||||
|
||||
for i in range(0, len(words), chunk_size):
|
||||
word_chunk = " ".join(words[i : i + chunk_size])
|
||||
if i + chunk_size < len(words):
|
||||
word_chunk += " "
|
||||
|
||||
chunk = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion.chunk",
|
||||
"created": created,
|
||||
"model": model,
|
||||
"choices": [
|
||||
{"index": 0, "delta": {"content": word_chunk}, "finish_reason": None}
|
||||
],
|
||||
}
|
||||
yield f"data: {json.dumps(chunk)}\n\n"
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
# Final chunk
|
||||
chunk = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion.chunk",
|
||||
"created": created,
|
||||
"model": model,
|
||||
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
|
||||
}
|
||||
yield f"data: {json.dumps(chunk)}\n\n"
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
|
||||
@app.post("/v1/chat/completions")
|
||||
@app.post("/chat/completions")
|
||||
@limiter.limit("30/minute")
|
||||
async def openai_chat_completions(
|
||||
request: Request,
|
||||
completion_request: OpenAIChatCompletionRequest,
|
||||
authenticated_user: str = Depends(get_current_user_openai),
|
||||
):
|
||||
"""OpenAI-compatible chat completions — pass-through proxy with memory injection.
|
||||
|
||||
Forwards the request to the upstream LLM verbatim (preserving tool_calls,
|
||||
reasoning_tokens, system_fingerprint, finish_reason, etc.) and optionally
|
||||
prepends a system message with relevant memories when the last role is
|
||||
`user` and no tool flow is in progress. See
|
||||
`mem0_manager.openai_proxy_completion` for the full injection rule and
|
||||
tool-call safety contract.
|
||||
"""
|
||||
"""OpenAI-compatible chat completions endpoint with mem0 memory integration."""
|
||||
try:
|
||||
request_kwargs = completion_request.model_dump(exclude_unset=True)
|
||||
# `user` is the OpenAI client-supplied identifier; we always derive the
|
||||
# real user from the API key. Strip it before forwarding so it can't
|
||||
# confuse upstream user-tracking.
|
||||
request_kwargs.pop("user", None)
|
||||
import uuid
|
||||
|
||||
user_id = authenticated_user
|
||||
logger.info(
|
||||
"openai_chat_completions",
|
||||
user_id=authenticated_user,
|
||||
stream=bool(request_kwargs.get("stream")),
|
||||
has_tools=bool(request_kwargs.get("tools")),
|
||||
model=request_kwargs.get("model"),
|
||||
f"OpenAI chat completion for user: {user_id} (streaming={completion_request.stream})"
|
||||
)
|
||||
|
||||
result = await mem0_manager.openai_proxy_completion(
|
||||
request_kwargs=request_kwargs,
|
||||
user_id=authenticated_user,
|
||||
# Extract last user message
|
||||
user_messages = [
|
||||
m for m in completion_request.messages if m.get("role") == "user"
|
||||
]
|
||||
if not user_messages:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="No user messages provided. Include at least one message with role='user'.",
|
||||
)
|
||||
|
||||
last_message = user_messages[-1].get("content", "")
|
||||
context = (
|
||||
completion_request.messages[:-1]
|
||||
if len(completion_request.messages) > 1
|
||||
else None
|
||||
)
|
||||
|
||||
if request_kwargs.get("stream"):
|
||||
# Call chat_with_memory
|
||||
result = await mem0_manager.chat_with_memory(
|
||||
message=last_message,
|
||||
user_id=user_id,
|
||||
context=context,
|
||||
)
|
||||
|
||||
completion_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
|
||||
created_time = int(time.time())
|
||||
assistant_content = result.get("response", "")
|
||||
|
||||
if completion_request.stream:
|
||||
return StreamingResponse(
|
||||
result,
|
||||
stream_openai_response(
|
||||
completion_id=completion_id,
|
||||
model=settings.default_model,
|
||||
content=assistant_content,
|
||||
created=created_time,
|
||||
),
|
||||
media_type="text/event-stream",
|
||||
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
|
||||
)
|
||||
return result
|
||||
except httpx.HTTPStatusError as e:
|
||||
# Forward upstream error body to the client with its original status
|
||||
try:
|
||||
detail = e.response.json()
|
||||
except Exception:
|
||||
detail = {"error": {"message": e.response.text[:500]}}
|
||||
raise HTTPException(status_code=e.response.status_code, detail=detail)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
else:
|
||||
return OpenAIChatCompletionResponse(
|
||||
id=completion_id,
|
||||
object="chat.completion",
|
||||
created=created_time,
|
||||
model=settings.default_model,
|
||||
choices=[
|
||||
OpenAIChoice(
|
||||
index=0,
|
||||
message=OpenAIChoiceMessage(
|
||||
role="assistant", content=assistant_content
|
||||
),
|
||||
finish_reason="stop",
|
||||
)
|
||||
],
|
||||
usage=OpenAIUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
|
|
@ -463,7 +535,7 @@ async def search_memories(
|
|||
query=search_request.query,
|
||||
user_id=search_request.user_id,
|
||||
limit=search_request.limit,
|
||||
threshold=search_request.threshold or 0.1,
|
||||
threshold=search_request.threshold or 0.2,
|
||||
filters=search_request.filters,
|
||||
agent_id=search_request.agent_id,
|
||||
run_id=search_request.run_id,
|
||||
|
|
@ -480,6 +552,7 @@ async def search_memories(
|
|||
detail="An internal error occurred. Please try again later.",
|
||||
)
|
||||
|
||||
|
||||
@app.get("/memories/{user_id}")
|
||||
@limiter.limit("120/minute")
|
||||
async def get_user_memories(
|
||||
|
|
@ -562,6 +635,7 @@ async def update_memory(
|
|||
detail="An internal error occurred. Please try again later.",
|
||||
)
|
||||
|
||||
|
||||
@app.delete("/memories/{memory_id}")
|
||||
@limiter.limit("60/minute")
|
||||
async def delete_memory(
|
||||
|
|
@ -626,15 +700,13 @@ async def delete_user_memories(
|
|||
)
|
||||
|
||||
|
||||
# Graph relationships endpoint - DEPRECATED in mem0 v2 (OSS graph memory removed).
|
||||
# Returns an empty payload with `deprecated: true` so the frontend can render a
|
||||
# clear "Graph view unavailable" state. Kept for client compatibility.
|
||||
@app.get("/graph/relationships/{user_id}", deprecated=True)
|
||||
# Graph relationships endpoint - pure Mem0 passthrough
|
||||
@app.get("/graph/relationships/{user_id}")
|
||||
@limiter.limit("60/minute")
|
||||
async def get_graph_relationships(
|
||||
request: Request, user_id: str, authenticated_user: str = Depends(get_current_user)
|
||||
):
|
||||
"""Get graph relationships - DEPRECATED: mem0 v2 removed OSS graph memory."""
|
||||
"""Get graph relationships - pure Mem0 passthrough."""
|
||||
try:
|
||||
# Verify user can only access their own graph relationships
|
||||
if authenticated_user != user_id:
|
||||
|
|
@ -831,12 +903,6 @@ async def get_active_users(
|
|||
}
|
||||
|
||||
|
||||
|
||||
# mem0 platform SDK (MemoryClient) compatibility routes:
|
||||
# /v1/ping/, /v3/memories/{add,search}/, /v3/memories/, /v1/memories/*, /v1/entities/
|
||||
app.include_router(platform_router)
|
||||
|
||||
|
||||
# Mount MCP server at /mcp endpoint
|
||||
try:
|
||||
from mcp_server import create_mcp_app
|
||||
|
|
|
|||
|
|
@ -151,8 +151,15 @@ async def remove_memory(
|
|||
user_id = get_authenticated_user()
|
||||
logger.info(f"MCP remove_memory: user={user_id}, memory_id={memory_id}")
|
||||
|
||||
# O(1) ownership check via Memory.get(); avoids get_all's v2 top_k=20 cap.
|
||||
if not await mem0_manager.verify_memory_ownership(memory_id, user_id):
|
||||
# Verify ownership: get user's memories and check if memory_id exists
|
||||
user_memories = await mem0_manager.get_user_memories(
|
||||
user_id=user_id,
|
||||
limit=10000 # Get all to check ownership
|
||||
)
|
||||
|
||||
memory_ids = {m.get("id") for m in user_memories if m.get("id")}
|
||||
|
||||
if memory_id not in memory_ids:
|
||||
raise ValueError(f"Memory '{memory_id}' not found or access denied")
|
||||
|
||||
result = await mem0_manager.delete_memory(memory_id=memory_id)
|
||||
|
|
|
|||
|
|
@ -1,12 +1,8 @@
|
|||
"""Ultra-minimal Mem0 Manager - Pure Mem0 + Custom OpenAI Endpoint Only."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Any, AsyncIterator, Union
|
||||
from typing import Dict, List, Optional, Any
|
||||
from datetime import datetime
|
||||
|
||||
import httpx
|
||||
from mem0 import Memory
|
||||
from openai import OpenAI
|
||||
from tenacity import (
|
||||
|
|
@ -23,7 +19,7 @@ from monitoring import timed
|
|||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# Retry decorator for database operations (Qdrant)
|
||||
# Retry decorator for database operations (Qdrant, Neo4j)
|
||||
db_retry = retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=1, max=10),
|
||||
|
|
@ -32,11 +28,7 @@ db_retry = retry(
|
|||
reraise=True,
|
||||
)
|
||||
|
||||
# Monkey-patch Mem0's OpenAI LLM to clear top_p when the configured LLM
|
||||
# is Claude reached via an OpenAI-compatible endpoint: Claude rejects top_p
|
||||
# whenever temperature is set, and OpenAILLM sends both unconditionally.
|
||||
# (The 'store' branch is now redundant in mem0ai>=2.0.0 — upstream made it
|
||||
# opt-in — but harmless; kept for safety.)
|
||||
# Monkey-patch Mem0's OpenAI LLM to remove the 'store' parameter for LiteLLM compatibility
|
||||
from mem0.llms.openai import OpenAILLM
|
||||
|
||||
_original_generate_response = OpenAILLM.generate_response
|
||||
|
|
@ -45,8 +37,10 @@ _original_generate_response = OpenAILLM.generate_response
|
|||
def patched_generate_response(
|
||||
self, messages, response_format=None, tools=None, tool_choice="auto", **kwargs
|
||||
):
|
||||
# Remove 'store' parameter as LiteLLM doesn't support it
|
||||
if hasattr(self.config, "store"):
|
||||
self.config.store = None
|
||||
# Remove 'top_p' to avoid conflict with temperature for Claude models
|
||||
if hasattr(self.config, "top_p"):
|
||||
self.config.top_p = None
|
||||
return _original_generate_response(
|
||||
|
|
@ -55,77 +49,7 @@ def patched_generate_response(
|
|||
|
||||
|
||||
OpenAILLM.generate_response = patched_generate_response
|
||||
logger.info("Applied Claude/OpenAI-compatible patch: cleared top_p (and store)")
|
||||
|
||||
|
||||
def _build_filters(
|
||||
user_id: Optional[str],
|
||||
agent_id: Optional[str] = None,
|
||||
run_id: Optional[str] = None,
|
||||
extra: Optional[Dict[str, Any]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Build the filters dict required by mem0 v2 search/get_all.
|
||||
|
||||
In mem0 v2.x, user_id/agent_id/run_id are rejected as top-level kwargs
|
||||
on Memory.search and Memory.get_all — they must live inside `filters`.
|
||||
"""
|
||||
merged: Dict[str, Any] = dict(extra) if extra else {}
|
||||
if user_id is not None:
|
||||
merged["user_id"] = user_id
|
||||
if agent_id is not None:
|
||||
merged["agent_id"] = agent_id
|
||||
if run_id is not None:
|
||||
merged["run_id"] = run_id
|
||||
return merged
|
||||
|
||||
|
||||
def _extract_text(content: Any) -> str:
|
||||
"""Extract text from an OpenAI message `content` field.
|
||||
|
||||
`content` can be a plain string OR a list of multi-part objects
|
||||
(e.g., [{"type": "text", "text": "..."}, {"type": "image_url", ...}]).
|
||||
Returns concatenated text parts, or "" if no text is present.
|
||||
"""
|
||||
if content is None:
|
||||
return ""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts = []
|
||||
for p in content:
|
||||
if isinstance(p, dict) and p.get("type") == "text":
|
||||
t = p.get("text")
|
||||
if isinstance(t, str):
|
||||
parts.append(t)
|
||||
return "\n".join(parts)
|
||||
return ""
|
||||
|
||||
|
||||
# Appended as the "## Custom Instructions" section of the additive-extraction
|
||||
# prompt (mem0/configs/prompts.py::generate_additive_extraction_prompt). The
|
||||
# default few-shot bias is consumer-organizer ("favourite movies", "SF restaurants"),
|
||||
# which under-extracts on the work/project/relationship traffic this deployment
|
||||
# actually sees. This re-prioritizes without replacing mem0's structural guidance.
|
||||
CUSTOM_FACT_EXTRACTION_INSTRUCTIONS = """
|
||||
This memory store serves a working assistant — engineering, product, and operational contexts plus the user's people and recurring life context. Prioritize accordingly:
|
||||
|
||||
HIGH-VALUE facts to capture:
|
||||
- Work context: company, team, role; ongoing projects with goals/status/blockers; product or domain knowledge being built; tools/frameworks/languages in active use; technical decisions and the reasoning; recurring meetings or rituals.
|
||||
- People in the user's orbit: colleagues, family, friends, mentors — names, relationships, roles, what they do, the current state of the relationship or shared context.
|
||||
- Recurring personal context: home/work locations, regular schedule, standing commitments, durable preferences (food restrictions, working hours, communication style), planned events with dates.
|
||||
- Acquired knowledge: concepts being studied or built, specific problems being solved, prior solutions tried and their outcomes.
|
||||
|
||||
LOWER-PRIORITY (extract only if they reveal a pattern or future relevance):
|
||||
- Single transient states ("running 5 minutes late", "didn't sleep well") — capture only if they recur or signal a habit.
|
||||
- Movies, music, restaurants, hobbies — only when noted as durable preferences or part of a recurring activity, not when mentioned in passing.
|
||||
|
||||
SKIP entirely:
|
||||
- Generic world knowledge (timezones, capital cities, definitions) — the assistant already knows these.
|
||||
- Greetings, acknowledgments, meta-conversation ("Thanks!", "Got it").
|
||||
- Restatements or paraphrases of facts already in Existing Memories or Recently Extracted Memories.
|
||||
|
||||
Prefer specificity. "Pratik uses FastAPI for backend services" beats "Pratik does backend development." When a person is mentioned by a short name or nickname, capture the relationship if known ("Anushree is Pratik's wife") so future references resolve correctly.
|
||||
""".strip()
|
||||
logger.info("Applied LiteLLM compatibility patch: disabled 'store' parameter")
|
||||
|
||||
|
||||
class Mem0Manager:
|
||||
|
|
@ -135,16 +59,18 @@ class Mem0Manager:
|
|||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Custom endpoint configuration with graph memory enabled
|
||||
logger.info(
|
||||
"Initializing Mem0Manager with custom endpoint",
|
||||
model=settings.default_model,
|
||||
embedding_model=settings.embedding_model,
|
||||
embedding_dims=settings.embedding_dims,
|
||||
qdrant_host=settings.qdrant_host,
|
||||
neo4j_uri=settings.neo4j_uri,
|
||||
)
|
||||
config = {
|
||||
"version": "v1.1",
|
||||
"custom_instructions": CUSTOM_FACT_EXTRACTION_INSTRUCTIONS,
|
||||
"enable_graph": True,
|
||||
"llm": {
|
||||
"provider": "openai",
|
||||
"config": {
|
||||
|
|
@ -156,16 +82,10 @@ class Mem0Manager:
|
|||
},
|
||||
},
|
||||
"embedder": {
|
||||
# Route embeddings through the OpenAI-compatible LiteLLM proxy
|
||||
# rather than Ollama directly — the proxy is reachable from the
|
||||
# container in all deployments, Ollama may not be. The model
|
||||
# name is the same (qwen3-embedding:4b-q8_0); existing vectors
|
||||
# generated via this path stay compatible.
|
||||
"provider": "openai",
|
||||
"provider": "ollama",
|
||||
"config": {
|
||||
"model": settings.embedding_model,
|
||||
"api_key": settings.openai_api_key,
|
||||
"openai_base_url": settings.openai_base_url,
|
||||
"ollama_base_url": settings.ollama_base_url,
|
||||
"embedding_dims": settings.embedding_dims,
|
||||
},
|
||||
},
|
||||
|
|
@ -179,18 +99,20 @@ class Mem0Manager:
|
|||
"on_disk": True,
|
||||
},
|
||||
},
|
||||
"graph_store": {
|
||||
"provider": "neo4j",
|
||||
"config": {
|
||||
"url": settings.neo4j_uri,
|
||||
"username": settings.neo4j_username,
|
||||
"password": settings.neo4j_password,
|
||||
},
|
||||
},
|
||||
"reranker": {
|
||||
"provider": "cohere",
|
||||
"config": {
|
||||
"api_key": settings.cohere_api_key,
|
||||
# v3.5 supersedes v3.0: 4096-token context, multilingual
|
||||
# (our users include Hindi/Hinglish content that the
|
||||
# English-only v3 silently underperforms on).
|
||||
"model": "rerank-v3.5",
|
||||
# Raised from 10 → 50 so the rerank output cap does not
|
||||
# truncate below typical over-fetch sizes (see search calls
|
||||
# below, which request top_k up to ~3× the user's limit).
|
||||
"top_n": 50,
|
||||
"model": "rerank-english-v3.0",
|
||||
"top_n": 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -202,23 +124,8 @@ class Mem0Manager:
|
|||
timeout=60.0, # 60 second timeout for LLM calls
|
||||
max_retries=2, # Retry failed requests up to 2 times
|
||||
)
|
||||
# Async HTTP client used by openai_proxy_completion to forward
|
||||
# /v1/chat/completions to the upstream endpoint verbatim (preserves
|
||||
# tool_calls, reasoning_tokens, system_fingerprint, etc., which the
|
||||
# openai-python SDK can drop). Closed in the FastAPI lifespan
|
||||
# shutdown via aclose().
|
||||
self.async_http = httpx.AsyncClient(
|
||||
timeout=httpx.Timeout(120.0, connect=10.0)
|
||||
)
|
||||
logger.info("Initialized ultra-minimal Mem0Manager with custom endpoint")
|
||||
|
||||
async def aclose(self) -> None:
|
||||
"""Release async resources. Called from the FastAPI lifespan shutdown."""
|
||||
try:
|
||||
await self.async_http.aclose()
|
||||
except Exception as e:
|
||||
logger.warning("async_http aclose failed", error=str(e))
|
||||
|
||||
# Pure passthrough methods - no custom logic
|
||||
@db_retry
|
||||
@timed("add_memories")
|
||||
|
|
@ -301,21 +208,19 @@ class Mem0Manager:
|
|||
"query": query,
|
||||
"note": "Empty query provided, no results returned. Use a specific query to search memories.",
|
||||
}
|
||||
# mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
|
||||
# Over-fetch a 30–50-candidate pool so the Cohere reranker (rerank=True)
|
||||
# has room to reorder; then truncate to the caller's requested limit.
|
||||
overfetch = max(limit * 3, 30)
|
||||
# Direct Mem0 search - trust native handling
|
||||
result = self.memory.search(
|
||||
query=query,
|
||||
filters=_build_filters(user_id, agent_id, run_id, extra=filters),
|
||||
top_k=overfetch,
|
||||
user_id=user_id,
|
||||
agent_id=agent_id,
|
||||
run_id=run_id,
|
||||
limit=limit,
|
||||
threshold=threshold,
|
||||
rerank=True,
|
||||
filters=filters,
|
||||
)
|
||||
memories = result.get("results", [])[:limit]
|
||||
return {
|
||||
"memories": memories,
|
||||
"total_count": len(memories),
|
||||
"memories": result.get("results", []),
|
||||
"total_count": len(result.get("results", [])),
|
||||
"query": query,
|
||||
}
|
||||
except Exception as e:
|
||||
|
|
@ -333,10 +238,13 @@ class Mem0Manager:
|
|||
) -> List[Dict[str, Any]]:
|
||||
"""Get all memories for a user - native Mem0 pattern."""
|
||||
try:
|
||||
# mem0 v2: entity IDs must live inside the `filters` dict; `limit` is now `top_k`.
|
||||
# Direct Mem0 get_all call - trust native parameter handling
|
||||
result = self.memory.get_all(
|
||||
filters=_build_filters(user_id, agent_id, run_id, extra=filters),
|
||||
top_k=limit,
|
||||
user_id=user_id,
|
||||
limit=limit,
|
||||
agent_id=agent_id,
|
||||
run_id=run_id,
|
||||
filters=filters,
|
||||
)
|
||||
return result.get("results", [])
|
||||
except Exception as e:
|
||||
|
|
@ -415,28 +323,54 @@ class Mem0Manager:
|
|||
run_id: Optional[str],
|
||||
limit: int = 50,
|
||||
) -> Dict[str, Any]:
|
||||
"""Graph relationships — deprecated in mem0 v2 (OSS graph memory removed).
|
||||
"""Get graph relationships - using correct Mem0 get_all() method."""
|
||||
try:
|
||||
# Use get_all() to retrieve memories with graph relationships
|
||||
result = self.memory.get_all(
|
||||
user_id=user_id, agent_id=agent_id, run_id=run_id, limit=limit
|
||||
)
|
||||
|
||||
mem0 v2.0.0 deleted the OSS graph store (Neo4j/Memgraph/Kuzu/AGE drivers).
|
||||
Entity relationships now influence ranking via a parallel `{collection}_entities`
|
||||
Qdrant collection rather than being directly traversable. We return an empty
|
||||
graph payload plus a `deprecated` marker so clients (frontend graph.html) can
|
||||
render a clear "Graph view unavailable" state instead of erroring.
|
||||
"""
|
||||
return {
|
||||
"relationships": [],
|
||||
"entities": [],
|
||||
"user_id": user_id,
|
||||
"agent_id": agent_id,
|
||||
"run_id": run_id,
|
||||
"total_memories": 0,
|
||||
"total_relationships": 0,
|
||||
"deprecated": True,
|
||||
"deprecation_note": (
|
||||
"OSS graph memory was removed in mem0 v2.0.0. Use search/get_all for "
|
||||
"memory retrieval; entity links now affect ranking only."
|
||||
),
|
||||
}
|
||||
# Extract relationships from Mem0's response structure
|
||||
relationships = result.get("relations", [])
|
||||
|
||||
# For entities, we can derive them from memory results or relations
|
||||
entities = []
|
||||
if "results" in result:
|
||||
# Extract unique entities from memories and relationships
|
||||
entity_set = set()
|
||||
|
||||
# Add entities from relationships
|
||||
for rel in relationships:
|
||||
if "source" in rel:
|
||||
entity_set.add(rel["source"])
|
||||
if "target" in rel:
|
||||
entity_set.add(rel["target"])
|
||||
|
||||
entities = [{"name": entity} for entity in entity_set]
|
||||
|
||||
return {
|
||||
"relationships": relationships,
|
||||
"entities": entities,
|
||||
"user_id": user_id,
|
||||
"agent_id": agent_id,
|
||||
"run_id": run_id,
|
||||
"total_memories": len(result.get("results", [])),
|
||||
"total_relationships": len(relationships),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting graph relationships: {e}")
|
||||
# Return empty but structured response on error
|
||||
return {
|
||||
"relationships": [],
|
||||
"entities": [],
|
||||
"user_id": user_id,
|
||||
"agent_id": agent_id,
|
||||
"run_id": run_id,
|
||||
"total_memories": 0,
|
||||
"total_relationships": 0,
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
@timed("chat_with_memory")
|
||||
async def chat_with_memory(
|
||||
|
|
@ -456,16 +390,15 @@ class Mem0Manager:
|
|||
logger.info("Starting chat request", user_id=user_id)
|
||||
|
||||
search_start_time = time.time()
|
||||
# Over-fetch for the Cohere reranker (rerank=True), then keep the
|
||||
# top 10 reranked memories for the system prompt.
|
||||
search_result = self.memory.search(
|
||||
query=message,
|
||||
filters=_build_filters(user_id, agent_id, run_id),
|
||||
top_k=30,
|
||||
user_id=user_id,
|
||||
agent_id=agent_id,
|
||||
run_id=run_id,
|
||||
limit=10,
|
||||
threshold=0.3,
|
||||
rerank=True,
|
||||
)
|
||||
relevant_memories = search_result.get("results", [])[:10]
|
||||
relevant_memories = search_result.get("results", [])
|
||||
memories_str = "\n".join(
|
||||
f"- {entry['memory']}" for entry in relevant_memories
|
||||
)
|
||||
|
|
@ -491,11 +424,7 @@ class Mem0Manager:
|
|||
response = self.openai_client.chat.completions.create(
|
||||
model=settings.default_model, messages=messages
|
||||
)
|
||||
# Strip leading whitespace — reasoning models (minimax-m2) leak
|
||||
# blank lines from their reasoning output into visible content.
|
||||
# lstrip(), not strip(), preserves intentional trailing whitespace
|
||||
# inside content (e.g., inside a code block).
|
||||
assistant_response = (response.choices[0].message.content or "").lstrip()
|
||||
assistant_response = response.choices[0].message.content
|
||||
llm_time = time.time() - llm_start_time
|
||||
logger.debug(
|
||||
"LLM call completed",
|
||||
|
|
@ -549,261 +478,6 @@ class Mem0Manager:
|
|||
"model_used": None,
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# OpenAI-compatible /v1/chat/completions proxy
|
||||
# ------------------------------------------------------------------
|
||||
# Design: forward client requests to the upstream endpoint verbatim,
|
||||
# injecting a "Relevant memories" system message ONLY when the last
|
||||
# message is from the user AND no tool flow is in progress. The
|
||||
# opinionated chat_with_memory above is NOT used on this path because:
|
||||
# (a) clients pick their own model, tools, response_format etc.,
|
||||
# (b) we must preserve every upstream field (tool_calls, reasoning_tokens,
|
||||
# system_fingerprint, finish_reason), which the openai-python SDK and
|
||||
# typed Pydantic responses strip silently,
|
||||
# (c) streaming must be real (SSE pass-through), not post-hoc word-chunking.
|
||||
#
|
||||
# Tool-call safety contract (Memori issue #434 — codified here):
|
||||
# - We only PREPEND a system message; never mutate, reorder, or delete
|
||||
# existing messages.
|
||||
# - We never touch tool messages or tool_call_id values.
|
||||
# - We skip injection on tool-result follow-ups (last role != "user").
|
||||
# - We always run the post-stream mem0.add even if upstream errors
|
||||
# mid-flight (via try/finally).
|
||||
async def openai_proxy_completion(
|
||||
self,
|
||||
request_kwargs: Dict[str, Any],
|
||||
user_id: str,
|
||||
) -> Union[Dict[str, Any], AsyncIterator[bytes]]:
|
||||
"""Proxy a chat-completions request to the upstream LLM.
|
||||
|
||||
Returns:
|
||||
- dict (upstream JSON) when stream is falsy
|
||||
- async iterator of SSE bytes when stream is truthy
|
||||
"""
|
||||
messages = request_kwargs.get("messages") or []
|
||||
if not messages:
|
||||
raise ValueError("messages array is empty")
|
||||
|
||||
# Injection rule: last role == 'user' AND no prior tool message.
|
||||
last_msg = messages[-1] if isinstance(messages[-1], dict) else {}
|
||||
last_role = last_msg.get("role")
|
||||
has_tool_message = any(
|
||||
isinstance(m, dict) and m.get("role") == "tool" for m in messages
|
||||
)
|
||||
inject_memory = last_role == "user" and not has_tool_message
|
||||
|
||||
if inject_memory:
|
||||
last_user_text = _extract_text(last_msg.get("content"))
|
||||
if last_user_text:
|
||||
try:
|
||||
search_result = self.memory.search(
|
||||
query=last_user_text,
|
||||
filters=_build_filters(user_id),
|
||||
top_k=30,
|
||||
threshold=0.3,
|
||||
rerank=True,
|
||||
)
|
||||
relevant = (search_result.get("results") or [])[:10]
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"memory search failed; proceeding without injection",
|
||||
error=str(e),
|
||||
user_id=user_id,
|
||||
)
|
||||
relevant = []
|
||||
if relevant:
|
||||
memories_str = "\n".join(
|
||||
f"- {m.get('memory', '')}" for m in relevant
|
||||
)
|
||||
mem_block = (
|
||||
"Relevant memories about the user "
|
||||
"(use when helpful, ignore otherwise):\n" + memories_str
|
||||
)
|
||||
# Merge into existing leading system message if present;
|
||||
# otherwise prepend a new one. Either way we never mutate
|
||||
# any other message in the list.
|
||||
if (
|
||||
messages
|
||||
and isinstance(messages[0], dict)
|
||||
and messages[0].get("role") == "system"
|
||||
):
|
||||
existing = _extract_text(messages[0].get("content")) or ""
|
||||
merged = (existing + "\n\n" + mem_block) if existing else mem_block
|
||||
messages = [
|
||||
{"role": "system", "content": merged},
|
||||
*messages[1:],
|
||||
]
|
||||
else:
|
||||
messages = [
|
||||
{"role": "system", "content": mem_block},
|
||||
*messages,
|
||||
]
|
||||
request_kwargs["messages"] = messages
|
||||
logger.info(
|
||||
"memory injected",
|
||||
user_id=user_id,
|
||||
memory_count=len(relevant),
|
||||
)
|
||||
|
||||
url = settings.openai_base_url.rstrip("/") + "/v1/chat/completions"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {settings.openai_api_key}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
is_stream = bool(request_kwargs.get("stream"))
|
||||
|
||||
if not is_stream:
|
||||
resp = await self.async_http.post(
|
||||
url, json=request_kwargs, headers=headers
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
# Surface upstream error body to the client unchanged
|
||||
try:
|
||||
err_body = resp.json()
|
||||
except Exception:
|
||||
err_body = {"error": {"message": resp.text[:500]}}
|
||||
raise httpx.HTTPStatusError(
|
||||
f"Upstream {resp.status_code}", request=resp.request, response=resp
|
||||
)
|
||||
data = resp.json()
|
||||
# Fire-and-forget mem0.add in the background. Skips tool-only
|
||||
# responses inside _post_completion_add.
|
||||
asyncio.create_task(self._post_completion_add(messages, data, user_id))
|
||||
return data
|
||||
|
||||
return self._stream_proxy(url, request_kwargs, headers, messages, user_id)
|
||||
|
||||
async def _stream_proxy(
|
||||
self,
|
||||
url: str,
|
||||
request_kwargs: Dict[str, Any],
|
||||
headers: Dict[str, str],
|
||||
messages: List[Dict[str, Any]],
|
||||
user_id: str,
|
||||
) -> AsyncIterator[bytes]:
|
||||
"""Stream upstream SSE bytes verbatim; accumulate content for post-stream add."""
|
||||
accumulated_content: List[str] = []
|
||||
saw_tool_calls = False
|
||||
buffer = b""
|
||||
try:
|
||||
async with self.async_http.stream(
|
||||
"POST", url, json=request_kwargs, headers=headers
|
||||
) as upstream:
|
||||
if upstream.status_code >= 400:
|
||||
body_bytes = await upstream.aread()
|
||||
err = {
|
||||
"error": {
|
||||
"message": (
|
||||
f"Upstream {upstream.status_code}: "
|
||||
f"{body_bytes.decode('utf-8', errors='replace')[:500]}"
|
||||
)
|
||||
}
|
||||
}
|
||||
yield f"data: {json.dumps(err)}\n\n".encode("utf-8")
|
||||
yield b"data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
async for chunk in upstream.aiter_bytes():
|
||||
if not chunk:
|
||||
continue
|
||||
# Forward bytes verbatim — preserves the exact SSE wire format
|
||||
yield chunk
|
||||
# Side-channel parse for content/tool_calls accumulation.
|
||||
# Events are \n\n-separated; data lines start with "data: ".
|
||||
buffer += chunk
|
||||
while b"\n\n" in buffer:
|
||||
event_bytes, buffer = buffer.split(b"\n\n", 1)
|
||||
for raw_line in event_bytes.split(b"\n"):
|
||||
if not raw_line.startswith(b"data: "):
|
||||
continue
|
||||
payload = raw_line[6:].strip()
|
||||
if not payload or payload == b"[DONE]":
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(payload)
|
||||
delta = (
|
||||
(obj.get("choices") or [{}])[0].get("delta") or {}
|
||||
)
|
||||
content_piece = delta.get("content")
|
||||
if isinstance(content_piece, str):
|
||||
accumulated_content.append(content_piece)
|
||||
if delta.get("tool_calls"):
|
||||
saw_tool_calls = True
|
||||
except (json.JSONDecodeError, KeyError, IndexError, TypeError):
|
||||
pass
|
||||
except httpx.HTTPError as e:
|
||||
logger.warning("upstream stream error", error=str(e), user_id=user_id)
|
||||
err = {"error": {"message": f"Upstream stream error: {e}"}}
|
||||
yield f"data: {json.dumps(err)}\n\n".encode("utf-8")
|
||||
yield b"data: [DONE]\n\n"
|
||||
finally:
|
||||
# Per Memori #434: post-stream mem0.add must run even on mid-stream
|
||||
# error so partial content is captured. Skipped for tool-only or
|
||||
# empty responses.
|
||||
full = "".join(accumulated_content).lstrip()
|
||||
if full and not saw_tool_calls:
|
||||
last_user_text = (
|
||||
_extract_text(messages[-1].get("content"))
|
||||
if messages
|
||||
and isinstance(messages[-1], dict)
|
||||
and messages[-1].get("role") == "user"
|
||||
else None
|
||||
)
|
||||
if last_user_text:
|
||||
try:
|
||||
# Synchronous mem0.add in a thread to avoid blocking
|
||||
# the response loop after StreamingResponse closes.
|
||||
await asyncio.to_thread(
|
||||
self.memory.add,
|
||||
[
|
||||
{"role": "user", "content": last_user_text},
|
||||
{"role": "assistant", "content": full},
|
||||
],
|
||||
user_id=user_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"post-stream mem0.add failed",
|
||||
error=str(e),
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
async def _post_completion_add(
|
||||
self,
|
||||
messages: List[Dict[str, Any]],
|
||||
response_data: Dict[str, Any],
|
||||
user_id: str,
|
||||
) -> None:
|
||||
"""Background mem0.add after a non-stream completion."""
|
||||
try:
|
||||
choice = (response_data.get("choices") or [{}])[0]
|
||||
msg = choice.get("message") or {}
|
||||
content = msg.get("content")
|
||||
if not content or not isinstance(content, str):
|
||||
return # tool-only or non-text completion — skip
|
||||
content = content.lstrip()
|
||||
last_user_text = (
|
||||
_extract_text(messages[-1].get("content"))
|
||||
if messages
|
||||
and isinstance(messages[-1], dict)
|
||||
and messages[-1].get("role") == "user"
|
||||
else None
|
||||
)
|
||||
if not last_user_text:
|
||||
return
|
||||
await asyncio.to_thread(
|
||||
self.memory.add,
|
||||
[
|
||||
{"role": "user", "content": last_user_text},
|
||||
{"role": "assistant", "content": content},
|
||||
],
|
||||
user_id=user_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"post-completion mem0.add failed", error=str(e), user_id=user_id
|
||||
)
|
||||
|
||||
async def health_check(self) -> Dict[str, str]:
|
||||
"""Basic health check - just connectivity."""
|
||||
status = {}
|
||||
|
|
@ -817,9 +491,7 @@ class Mem0Manager:
|
|||
|
||||
# Check Mem0 memory
|
||||
try:
|
||||
self.memory.search(
|
||||
query="test", filters={"user_id": "health_check"}, top_k=1
|
||||
)
|
||||
self.memory.search(query="test", user_id="health_check", limit=1)
|
||||
status["mem0_memory"] = "healthy"
|
||||
except Exception as e:
|
||||
status["mem0_memory"] = f"unhealthy: {str(e)}"
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
"""Ultra-minimal Pydantic models for pure Mem0 API."""
|
||||
|
||||
from typing import List, Optional, Dict, Any, Union
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from typing import List, Optional, Dict, Any
|
||||
from pydantic import BaseModel, Field
|
||||
import re
|
||||
|
||||
|
||||
|
|
@ -230,56 +230,82 @@ class UserStatsResponse(BaseModel):
|
|||
|
||||
|
||||
# OpenAI-Compatible API Models
|
||||
#
|
||||
# The /v1/chat/completions handler is a pass-through proxy: requests are
|
||||
# forwarded to the upstream LLM and the upstream response is relayed verbatim
|
||||
# (dict for non-stream, raw SSE bytes for stream). Hence only the REQUEST
|
||||
# model lives here; the response is never re-typed (typed Pydantic response
|
||||
# models silently drop unknown fields like tool_calls / refusal /
|
||||
# reasoning_tokens — see the audit in the plan file).
|
||||
|
||||
|
||||
class OpenAIMessage(BaseModel):
|
||||
"""OpenAI message format."""
|
||||
|
||||
role: str = Field(..., description="Message role (system, user, assistant)")
|
||||
content: str = Field(..., description="Message content")
|
||||
|
||||
|
||||
class OpenAIChatCompletionRequest(BaseModel):
|
||||
"""OpenAI chat completion request — permissive schema, forwarded as-is.
|
||||
"""OpenAI chat completion request format."""
|
||||
|
||||
Only `model` and `messages` are required. All other standard OpenAI
|
||||
parameters are typed (for client IDE/docs benefit) but optional. Unknown
|
||||
fields are accepted via `extra="allow"` and forwarded to upstream, so
|
||||
new OpenAI parameters don't require a code change here.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="allow")
|
||||
|
||||
model: str = Field(..., description="Model to use (forwarded to upstream)")
|
||||
messages: List[Dict[str, Any]] = Field(
|
||||
..., description="Messages array (multi-part content supported)"
|
||||
model: str = Field(..., description="Model to use (will use configured default)")
|
||||
messages: List[Dict[str, str]] = Field(..., description="List of messages")
|
||||
temperature: Optional[float] = Field(0.7, description="Sampling temperature")
|
||||
max_tokens: Optional[int] = Field(None, description="Maximum tokens to generate")
|
||||
stream: Optional[bool] = Field(False, description="Whether to stream responses")
|
||||
top_p: Optional[float] = Field(1.0, description="Nucleus sampling parameter")
|
||||
n: Optional[int] = Field(1, description="Number of completions to generate")
|
||||
stop: Optional[List[str]] = Field(None, description="Stop sequences")
|
||||
presence_penalty: Optional[float] = Field(0, description="Presence penalty")
|
||||
frequency_penalty: Optional[float] = Field(0, description="Frequency penalty")
|
||||
user: Optional[str] = Field(
|
||||
None, description="User identifier (ignored, uses API key)"
|
||||
)
|
||||
|
||||
# Common params (typed for IDE/docs; all optional)
|
||||
temperature: Optional[float] = None
|
||||
top_p: Optional[float] = None
|
||||
n: Optional[int] = None
|
||||
stream: Optional[bool] = None
|
||||
stream_options: Optional[Dict[str, Any]] = None
|
||||
stop: Optional[Union[str, List[str]]] = None
|
||||
max_tokens: Optional[int] = None # deprecated; still accepted
|
||||
max_completion_tokens: Optional[int] = None # replaces max_tokens
|
||||
presence_penalty: Optional[float] = None
|
||||
frequency_penalty: Optional[float] = None
|
||||
seed: Optional[int] = None
|
||||
logprobs: Optional[bool] = None
|
||||
top_logprobs: Optional[int] = None
|
||||
response_format: Optional[Dict[str, Any]] = None
|
||||
tools: Optional[List[Dict[str, Any]]] = None
|
||||
tool_choice: Optional[Union[str, Dict[str, Any]]] = None
|
||||
parallel_tool_calls: Optional[bool] = None
|
||||
reasoning_effort: Optional[str] = None # o-series / reasoning models
|
||||
modalities: Optional[List[str]] = None
|
||||
audio: Optional[Dict[str, Any]] = None
|
||||
prediction: Optional[Dict[str, Any]] = None
|
||||
metadata: Optional[Dict[str, Any]] = None
|
||||
store: Optional[bool] = None
|
||||
service_tier: Optional[str] = None
|
||||
logit_bias: Optional[Dict[str, float]] = None
|
||||
# `user` is ignored — the authenticated user is derived from the API key.
|
||||
user: Optional[str] = None
|
||||
|
||||
class OpenAIUsage(BaseModel):
|
||||
"""Token usage information."""
|
||||
|
||||
prompt_tokens: int = Field(..., description="Tokens in the prompt")
|
||||
completion_tokens: int = Field(..., description="Tokens in the completion")
|
||||
total_tokens: int = Field(..., description="Total tokens used")
|
||||
|
||||
|
||||
class OpenAIChoiceMessage(BaseModel):
|
||||
"""Message in a choice."""
|
||||
|
||||
role: str = Field(..., description="Role of the message")
|
||||
content: str = Field(..., description="Content of the message")
|
||||
|
||||
|
||||
class OpenAIChoice(BaseModel):
|
||||
"""Individual completion choice."""
|
||||
|
||||
index: int = Field(..., description="Choice index")
|
||||
message: OpenAIChoiceMessage = Field(..., description="Message content")
|
||||
finish_reason: str = Field(..., description="Reason for completion finish")
|
||||
|
||||
|
||||
class OpenAIChatCompletionResponse(BaseModel):
|
||||
"""OpenAI chat completion response format."""
|
||||
|
||||
id: str = Field(..., description="Unique completion ID")
|
||||
object: str = Field(default="chat.completion", description="Object type")
|
||||
created: int = Field(..., description="Unix timestamp of creation")
|
||||
model: str = Field(..., description="Model used for completion")
|
||||
choices: List[OpenAIChoice] = Field(..., description="List of completion choices")
|
||||
usage: Optional[OpenAIUsage] = Field(None, description="Token usage information")
|
||||
|
||||
|
||||
# Streaming-specific models
|
||||
|
||||
|
||||
class OpenAIStreamDelta(BaseModel):
|
||||
"""Delta content in a streaming chunk."""
|
||||
|
||||
role: Optional[str] = Field(None, description="Role (only in first chunk)")
|
||||
content: Optional[str] = Field(None, description="Incremental content")
|
||||
|
||||
|
||||
class OpenAIStreamChoice(BaseModel):
|
||||
"""Individual streaming choice."""
|
||||
|
||||
index: int = Field(..., description="Choice index")
|
||||
delta: OpenAIStreamDelta = Field(..., description="Delta content")
|
||||
finish_reason: Optional[str] = Field(
|
||||
None, description="Reason for completion finish"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,224 +0,0 @@
|
|||
"""mem0 platform API compatibility layer.
|
||||
|
||||
Implements the subset of the hosted mem0 platform API that the ``mem0ai``
|
||||
``MemoryClient`` (pinned ``mem0ai==2.0.2``) actually calls, so
|
||||
``MemoryClient(host="https://memory.pratikn.com", api_key=...)`` works against
|
||||
this self-hosted server. Each platform route maps onto the existing
|
||||
``mem0_manager`` singleton.
|
||||
|
||||
Contract notes (verified against the installed SDK source):
|
||||
- Auth header is ``Authorization: Token <key>`` (handled by
|
||||
``get_current_user_platform``, which also accepts Bearer / X-API-Key).
|
||||
- Core ops use ``/v3/memories/*``; item ops use ``/v1/memories/*``; all paths
|
||||
carry a trailing slash and are registered here at the exact path the SDK calls
|
||||
(so FastAPI matches exactly and never issues a slash redirect).
|
||||
- ``GET /v1/ping/`` runs at client construction and MUST return non-empty
|
||||
``org_id`` and ``project_id`` or the SDK's ``Project(...)`` raises.
|
||||
- Scoping (user_id/agent_id/run_id) is carried in ``filters`` (search/get_all),
|
||||
the top-level body (add), or the query string (delete_all). It defaults to the
|
||||
authenticated user; a mismatch is rejected with 403 (same model as the native
|
||||
endpoints).
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
import structlog
|
||||
|
||||
from auth import get_current_user_platform
|
||||
from mem0_manager import mem0_manager
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
router = APIRouter(tags=["mem0-platform-compat"])
|
||||
|
||||
|
||||
def _require_self(requested: Optional[str], authed: str) -> str:
|
||||
"""Return the user_id to operate on: default to the authenticated user,
|
||||
reject a mismatch with 403 (consistent with the native endpoints)."""
|
||||
if not requested:
|
||||
return authed
|
||||
if requested != authed:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=f"Access denied: you can only access your own memories (authenticated as '{authed}')",
|
||||
)
|
||||
return authed
|
||||
|
||||
|
||||
async def _json_body(request: Request) -> Dict[str, Any]:
|
||||
"""Parse the JSON body defensively (the SDK sends varied shapes)."""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
body = None
|
||||
return body if isinstance(body, dict) else {}
|
||||
|
||||
|
||||
def _split_filters(body: Dict[str, Any], authed: str):
|
||||
"""Pull the scoping IDs out of the SDK's ``filters`` object and return
|
||||
(target_user, agent_id, run_id, remaining_filters)."""
|
||||
filters = dict(body.get("filters") or {})
|
||||
target = _require_self(filters.pop("user_id", None), authed)
|
||||
agent_id = filters.pop("agent_id", None)
|
||||
run_id = filters.pop("run_id", None)
|
||||
filters.pop("app_id", None) # accepted by the SDK; unused here
|
||||
return target, agent_id, run_id, (filters or None)
|
||||
|
||||
|
||||
async def _owned_or_404(memory_id: str, user: str) -> None:
|
||||
if not await mem0_manager.verify_memory_ownership(memory_id, user):
|
||||
raise HTTPException(status_code=404, detail=f"Memory '{memory_id}' not found")
|
||||
|
||||
|
||||
@router.get("/v1/ping/")
|
||||
async def ping(user: str = Depends(get_current_user_platform)) -> Dict[str, Any]:
|
||||
"""Client construction validation. ``org_id``/``project_id`` MUST be
|
||||
non-empty or the SDK's ``Project(...)`` raises 'org_id and project_id must
|
||||
be set'."""
|
||||
return {
|
||||
"status": "ok",
|
||||
"user_email": user,
|
||||
"org_id": "default-org",
|
||||
"project_id": "default-project",
|
||||
}
|
||||
|
||||
|
||||
@router.post("/v3/memories/add/")
|
||||
async def add_memories(
|
||||
request: Request, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
body = await _json_body(request)
|
||||
messages = body.get("messages")
|
||||
if not messages:
|
||||
raise HTTPException(status_code=422, detail="`messages` is required")
|
||||
if isinstance(messages, str):
|
||||
messages = [{"role": "user", "content": messages}]
|
||||
target = _require_self(body.get("user_id"), user)
|
||||
|
||||
raw = await mem0_manager.add_memories(
|
||||
messages=messages,
|
||||
user_id=target,
|
||||
agent_id=body.get("agent_id"),
|
||||
run_id=body.get("run_id"),
|
||||
metadata=body.get("metadata"),
|
||||
)
|
||||
# mem0_manager wraps the mem0 result as {"added_memories": [<mem0 dict>], ...};
|
||||
# the mem0 dict is already {"results": [...]} (the platform shape).
|
||||
added = raw.get("added_memories") or []
|
||||
if added and isinstance(added[0], dict) and "results" in added[0]:
|
||||
return added[0]
|
||||
return {"results": added}
|
||||
|
||||
|
||||
@router.post("/v3/memories/search/")
|
||||
async def search_memories(
|
||||
request: Request, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
body = await _json_body(request)
|
||||
query = body.get("query")
|
||||
if not query:
|
||||
raise HTTPException(status_code=422, detail="`query` is required")
|
||||
target, agent_id, run_id, extra = _split_filters(body, user)
|
||||
top_k = body.get("top_k") or body.get("limit") or 10
|
||||
|
||||
result = await mem0_manager.search_memories(
|
||||
query=query,
|
||||
user_id=target,
|
||||
limit=int(top_k),
|
||||
filters=extra,
|
||||
agent_id=agent_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
return {"results": result.get("memories", [])}
|
||||
|
||||
|
||||
@router.post("/v3/memories/")
|
||||
async def get_all_memories(
|
||||
request: Request, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
body = await _json_body(request)
|
||||
target, agent_id, run_id, extra = _split_filters(body, user)
|
||||
|
||||
# The SDK sends page/page_size as query params. mem0's get_all has no offset,
|
||||
# so we fetch up to page*page_size and slice the requested page.
|
||||
try:
|
||||
page = max(int(request.query_params.get("page", 1)), 1)
|
||||
page_size = min(max(int(request.query_params.get("page_size", 100)), 1), 1000)
|
||||
except ValueError:
|
||||
page, page_size = 1, 100
|
||||
|
||||
items = await mem0_manager.get_user_memories(
|
||||
user_id=target,
|
||||
limit=page * page_size,
|
||||
agent_id=agent_id,
|
||||
run_id=run_id,
|
||||
filters=extra,
|
||||
)
|
||||
total = len(items)
|
||||
start = (page - 1) * page_size
|
||||
return {
|
||||
"count": total,
|
||||
"next": page + 1 if start + page_size < total else None,
|
||||
"previous": page - 1 if page > 1 else None,
|
||||
"results": items[start : start + page_size],
|
||||
}
|
||||
|
||||
|
||||
@router.get("/v1/memories/{memory_id}/")
|
||||
async def get_memory(
|
||||
memory_id: str, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
await _owned_or_404(memory_id, user)
|
||||
mem = await mem0_manager.get_memory(memory_id)
|
||||
if mem is None:
|
||||
raise HTTPException(status_code=404, detail=f"Memory '{memory_id}' not found")
|
||||
return mem
|
||||
|
||||
|
||||
@router.put("/v1/memories/{memory_id}/")
|
||||
async def update_memory(
|
||||
memory_id: str, request: Request, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
await _owned_or_404(memory_id, user)
|
||||
body = await _json_body(request)
|
||||
text = body.get("text") or body.get("memory") or body.get("data")
|
||||
if not text:
|
||||
raise HTTPException(
|
||||
status_code=422, detail="`text` is required to update a memory"
|
||||
)
|
||||
return await mem0_manager.update_memory(memory_id=memory_id, content=text)
|
||||
|
||||
|
||||
@router.delete("/v1/memories/{memory_id}/")
|
||||
async def delete_memory(
|
||||
memory_id: str, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
await _owned_or_404(memory_id, user)
|
||||
return await mem0_manager.delete_memory(memory_id=memory_id)
|
||||
|
||||
|
||||
@router.get("/v1/memories/{memory_id}/history/")
|
||||
async def memory_history(
|
||||
memory_id: str, user: str = Depends(get_current_user_platform)
|
||||
) -> List[Dict[str, Any]]:
|
||||
await _owned_or_404(memory_id, user)
|
||||
result = await mem0_manager.get_memory_history(memory_id)
|
||||
return result.get("history", [])
|
||||
|
||||
|
||||
@router.delete("/v1/memories/")
|
||||
async def delete_all_memories(
|
||||
request: Request, user: str = Depends(get_current_user_platform)
|
||||
) -> Dict[str, Any]:
|
||||
target = _require_self(request.query_params.get("user_id"), user)
|
||||
return await mem0_manager.delete_user_memories(user_id=target)
|
||||
|
||||
|
||||
@router.get("/v1/entities/")
|
||||
async def list_entities(
|
||||
user: str = Depends(get_current_user_platform),
|
||||
) -> Dict[str, Any]:
|
||||
# This server is single-user-per-key; report the authenticated user as the
|
||||
# only entity (the platform returns all users/agents/runs with memories).
|
||||
return {"results": [{"id": user, "name": user, "type": "user"}], "count": 1}
|
||||
|
|
@ -4,14 +4,16 @@ uvicorn[standard]
|
|||
python-multipart
|
||||
|
||||
# Mem0 and AI
|
||||
mem0ai[nlp]==2.0.2
|
||||
fastembed>=0.3.1
|
||||
mem0ai
|
||||
openai
|
||||
google-genai
|
||||
cohere
|
||||
|
||||
# Database
|
||||
qdrant-client>=1.12.0
|
||||
qdrant-client
|
||||
neo4j
|
||||
langchain-neo4j
|
||||
rank-bm25
|
||||
ollama
|
||||
|
||||
# Utilities
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
services:
|
||||
# Qdrant vector database for vector + sparse (BM25) storage
|
||||
# Qdrant vector database for vector storage
|
||||
qdrant:
|
||||
image: qdrant/qdrant:v1.18.1
|
||||
image: qdrant/qdrant:latest
|
||||
container_name: mem0-qdrant
|
||||
expose:
|
||||
- "6333"
|
||||
|
|
@ -18,9 +18,39 @@ services:
|
|||
retries: 5
|
||||
restart: unless-stopped
|
||||
|
||||
# Neo4j with APOC for graph relationships
|
||||
neo4j:
|
||||
image: neo4j:5.26.4
|
||||
container_name: mem0-neo4j
|
||||
environment:
|
||||
NEO4J_AUTH: ${NEO4J_AUTH:-neo4j/mem0_neo4j_password}
|
||||
NEO4J_PLUGINS: '["apoc"]'
|
||||
NEO4J_apoc_export_file_enabled: true
|
||||
NEO4J_apoc_import_file_enabled: true
|
||||
NEO4J_apoc_import_file_use__neo4j__config: true
|
||||
NEO4J_ACCEPT_LICENSE_AGREEMENT: yes
|
||||
NEO4J_dbms_security_procedures_unrestricted: apoc.*
|
||||
NEO4J_dbms_security_procedures_allowlist: apoc.*
|
||||
expose:
|
||||
- "7474" # HTTP - Internal only
|
||||
- "7687" # Bolt - Internal only
|
||||
networks:
|
||||
- mem0_network
|
||||
volumes:
|
||||
- neo4j_data:/data
|
||||
- neo4j_logs:/logs
|
||||
- neo4j_import:/var/lib/neo4j/import
|
||||
- neo4j_plugins:/plugins
|
||||
healthcheck:
|
||||
test: ["CMD", "cypher-shell", "-u", "neo4j", "-p", "${NEO4J_PASSWORD:-mem0_neo4j_password}", "RETURN 1"]
|
||||
interval: 10s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
restart: unless-stopped
|
||||
|
||||
# Backend API service
|
||||
backend:
|
||||
build:
|
||||
build:
|
||||
context: ./backend
|
||||
dockerfile: Dockerfile
|
||||
container_name: mem0-backend
|
||||
|
|
@ -32,6 +62,9 @@ services:
|
|||
QDRANT_HOST: qdrant
|
||||
QDRANT_PORT: 6333
|
||||
QDRANT_COLLECTION_NAME: ${QDRANT_COLLECTION_NAME:-mem0}
|
||||
NEO4J_URI: bolt://neo4j:7687
|
||||
NEO4J_USERNAME: ${NEO4J_USERNAME:-neo4j}
|
||||
NEO4J_PASSWORD: ${NEO4J_PASSWORD:-mem0_neo4j_password}
|
||||
LOG_LEVEL: ${LOG_LEVEL:-INFO}
|
||||
CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:3000}
|
||||
DEFAULT_MODEL: ${DEFAULT_MODEL:-claude-sonnet-4}
|
||||
|
|
@ -47,24 +80,20 @@ services:
|
|||
depends_on:
|
||||
qdrant:
|
||||
condition: service_healthy
|
||||
# Backend startup loads spaCy (NLP extra), initializes mem0 v2, mounts MCP,
|
||||
# and warms 4 workers — needs ~30-60s. start_period below keeps healthcheck
|
||||
# failures from counting (and from triggering willfarrell/autoheal) during
|
||||
# boot.
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 5
|
||||
start_period: 90s
|
||||
neo4j:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
volumes:
|
||||
- ./backend:/app
|
||||
- ./frontend:/app/frontend
|
||||
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4", "--proxy-headers", "--forwarded-allow-ips", "*"]
|
||||
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
|
||||
|
||||
volumes:
|
||||
qdrant_data:
|
||||
neo4j_data:
|
||||
neo4j_logs:
|
||||
neo4j_import:
|
||||
neo4j_plugins:
|
||||
|
||||
networks:
|
||||
mem0_network:
|
||||
|
|
|
|||
|
|
@ -1,197 +0,0 @@
|
|||
# Migration Runbook: mem0 v0.1.x/v1.x → v2.0.2 (the "V3 pipeline")
|
||||
|
||||
This runbook covers the **operational** half of the migration — backups, the
|
||||
Qdrant collection rebuild for BM25, the Neo4j dump, cutover and rollback. The
|
||||
**code-level** half (Dockerfile, requirements, mem0_manager rewrites, etc.) is
|
||||
already committed on this branch; this document is what to follow when taking
|
||||
those code changes to a stack that has live data.
|
||||
|
||||
## TL;DR
|
||||
|
||||
1. Snapshot Qdrant + dump Neo4j (Phase 2).
|
||||
2. Deploy v2 backend to a scratch stack (Phase 3).
|
||||
3. Rebuild the Qdrant collection with BM25 by warm-up-add → scroll/upsert →
|
||||
swap (Phase 4).
|
||||
4. Run integration tests (Phase 5).
|
||||
5. Cutover production with the same steps inside a maintenance window (Phase 6).
|
||||
|
||||
## Phase 1 — Pre-flight
|
||||
|
||||
```bash
|
||||
# 1. Capture the exact running mem0ai version (record for the ticket)
|
||||
docker compose exec backend pip show mem0ai
|
||||
|
||||
# 2. Tag the pre-migration commit
|
||||
git tag pre-mem0-v3-migration && git push --tags
|
||||
|
||||
# 3. Verify free disk on the volumes (snapshots can be sizeable)
|
||||
docker system df -v | grep -E "qdrant_data|neo4j_data"
|
||||
```
|
||||
|
||||
## Phase 2 — Backups (read-only on production)
|
||||
|
||||
### Qdrant snapshot
|
||||
|
||||
```bash
|
||||
# Create snapshot of the live mem0 collection (qdrant runs inside the network as 'qdrant')
|
||||
docker compose exec backend curl -X POST \
|
||||
"http://qdrant:6333/collections/mem0/snapshots?wait=true"
|
||||
|
||||
# Returns JSON with the snapshot filename, e.g. mem0-XXXXXXXX.snapshot.
|
||||
# Copy it off the qdrant container's volume:
|
||||
docker compose exec qdrant ls /qdrant/storage/collections/mem0/snapshots/
|
||||
mkdir -p ./backups/qdrant
|
||||
docker cp mem0-qdrant:/qdrant/storage/collections/mem0/snapshots/<snapshot-file> ./backups/qdrant/
|
||||
```
|
||||
|
||||
### Neo4j offline dump (decommission path)
|
||||
|
||||
Neo4j 5.x requires the database to be stopped to dump it.
|
||||
|
||||
```bash
|
||||
mkdir -p ./backups/neo4j
|
||||
docker compose stop neo4j
|
||||
docker run --rm \
|
||||
--volumes-from mem0-neo4j \
|
||||
-v "$(pwd)/backups/neo4j:/dumps" \
|
||||
neo4j:5.26.4 \
|
||||
neo4j-admin database dump neo4j --to-path=/dumps
|
||||
# (No need to restart neo4j — it is being decommissioned.)
|
||||
```
|
||||
|
||||
Keep both backups for **at least 30 days** post-cutover. Calendar a reminder.
|
||||
|
||||
### Pre-cutover per-user memory counts
|
||||
|
||||
```bash
|
||||
# Iterate API_KEYS users, hit /stats/{user_id}, save the count. Adjust per your auth.
|
||||
for user in $(jq -r 'values | unique[]' <<< "$API_KEYS"); do
|
||||
echo -n "$user: "
|
||||
docker compose exec backend curl -s -H "X-API-Key: <admin-or-user-key>" \
|
||||
"http://localhost:8000/stats/$user" | jq -r '.memory_count // 0'
|
||||
done > pre-cutover-counts.txt
|
||||
```
|
||||
|
||||
## Phase 3 — Deploy v2 backend (scratch stack first)
|
||||
|
||||
Use a developer or staging machine with a **restored copy** of the prod
|
||||
snapshot, not prod itself.
|
||||
|
||||
```bash
|
||||
# Restore the prod snapshot onto the scratch Qdrant
|
||||
docker compose exec backend curl -X POST \
|
||||
"http://qdrant:6333/collections/mem0_legacy/snapshots/upload?priority=snapshot" \
|
||||
-H "Content-Type: multipart/form-data" \
|
||||
-F "snapshot=@/backups/qdrant/<snapshot-file>"
|
||||
# (or restore as 'mem0' if you want to start with the legacy name)
|
||||
|
||||
# Build + start the v2 backend
|
||||
docker compose build --no-cache backend
|
||||
docker compose up -d
|
||||
docker compose logs -f backend
|
||||
```
|
||||
|
||||
Watch for:
|
||||
- `Applied Claude/OpenAI-compatible patch: cleared top_p (and store)` — patch loaded.
|
||||
- `Initialized ultra-minimal Mem0Manager with custom endpoint` — startup OK.
|
||||
- No errors mentioning `graph_store` or `enable_graph` (we removed them).
|
||||
- On any search/get_all: no `ValueError` from filters.
|
||||
|
||||
## Phase 4 — Rebuild the Qdrant collection for BM25
|
||||
|
||||
Pre-v2 collections lack the `bm25` sparse-vector slot. mem0 v2 silently
|
||||
downgrades to semantic-only on them — to get full hybrid search you must
|
||||
recreate the collection.
|
||||
|
||||
```bash
|
||||
# 1. Set the env var so the v2 backend creates a NEW collection with the right schema
|
||||
docker compose exec backend sh -c 'QDRANT_COLLECTION_NAME=mem0_v3 \
|
||||
python -c "from mem0_manager import mem0_manager; \
|
||||
mem0_manager.memory.add([{\"role\":\"user\",\"content\":\"warm-up\"}], user_id=\"__warmup__\")"'
|
||||
# This lazy-creates mem0_v3 + mem0_v3_entities with the bm25 slot.
|
||||
# (Delete the warm-up memory after if you care.)
|
||||
|
||||
# 2. Run the migration script — preserves id + vector + payload, no re-embed
|
||||
docker compose exec backend python /app/../scripts/migrate_qdrant_to_v3.py \
|
||||
--source mem0 --target mem0_v3 \
|
||||
--qdrant-host qdrant --qdrant-port 6333 --dry-run
|
||||
# Inspect the per-user counts. If OK, run for real:
|
||||
docker compose exec backend python /app/../scripts/migrate_qdrant_to_v3.py \
|
||||
--source mem0 --target mem0_v3 \
|
||||
--qdrant-host qdrant --qdrant-port 6333
|
||||
|
||||
# 3. Swap names. Qdrant has no in-place rename — use snapshot+upload.
|
||||
# Snapshot mem0_v3, upload as mem0_swap, then snapshot mem0 as mem0_legacy, then
|
||||
# upload mem0_swap as mem0. Or simply point QDRANT_COLLECTION_NAME at mem0_v3 in
|
||||
# docker-compose.yml and keep `mem0` around as the legacy backup.
|
||||
```
|
||||
|
||||
Easiest path: **leave the legacy collection alone** and update
|
||||
`QDRANT_COLLECTION_NAME` to `mem0_v3` in `.env` / `docker-compose.yml`. The
|
||||
legacy `mem0` collection sits there as an extra backup until you delete it.
|
||||
|
||||
## Phase 5 — Integration tests
|
||||
|
||||
```bash
|
||||
MEM0_API_KEY=<dev-key-mapped-to-test-user> python test_integration.py -v
|
||||
```
|
||||
|
||||
The test script generates a fresh `TEST_USER` per run — make sure the supplied
|
||||
API key maps to that user (see CLAUDE.md "There are no unit tests..." note).
|
||||
|
||||
Expected: all pass. The `/graph/relationships/{user_id}` test should accept the
|
||||
new `deprecated: true` payload.
|
||||
|
||||
## Phase 6 — Production cutover
|
||||
|
||||
Maintenance window ~30 min.
|
||||
|
||||
1. Communicate the window.
|
||||
2. Re-snapshot Qdrant immediately before the deploy (so the rollback snapshot
|
||||
is the freshest possible).
|
||||
3. `git pull` the migration branch (or merge to main first).
|
||||
4. `docker compose build --no-cache backend && docker compose up -d backend`.
|
||||
5. Run the Phase 4 collection rebuild on prod.
|
||||
6. Smoke test: `/health`, one `/chat` round-trip, one `/memories` write, one
|
||||
`/memories/search` read.
|
||||
7. Verify per-user counts match `pre-cutover-counts.txt` (use the same loop).
|
||||
|
||||
## Rollback
|
||||
|
||||
### Before the first v2 write hits prod (fully safe)
|
||||
|
||||
```bash
|
||||
git revert <migration-commit-sha>
|
||||
docker compose build --no-cache backend
|
||||
docker compose up -d backend
|
||||
```
|
||||
|
||||
### After cutover but snapshot still on disk (loses post-cutover writes)
|
||||
|
||||
```bash
|
||||
# Stop the backend so no more writes land on the v2 collection
|
||||
docker compose stop backend
|
||||
|
||||
# Restore the pre-cutover Qdrant snapshot to a fresh name, then swap
|
||||
docker compose exec qdrant curl -X POST \
|
||||
"http://qdrant:6333/collections/mem0_rollback/snapshots/upload?priority=snapshot" \
|
||||
-H "Content-Type: multipart/form-data" \
|
||||
-F "snapshot=@/qdrant/snapshots/<pre-cutover-snapshot>"
|
||||
# Update QDRANT_COLLECTION_NAME=mem0_rollback or rename via snapshot+upload.
|
||||
|
||||
# Restore Neo4j if needed
|
||||
docker run --rm \
|
||||
--volumes-from mem0-neo4j \
|
||||
-v "$(pwd)/backups/neo4j:/dumps" \
|
||||
neo4j:5.26.4 \
|
||||
neo4j-admin database load neo4j --from-path=/dumps --overwrite-destination=true
|
||||
|
||||
# Revert code and restart
|
||||
git revert <migration-commit-sha>
|
||||
docker compose build --no-cache backend
|
||||
docker compose up -d backend
|
||||
```
|
||||
|
||||
### After snapshot retention expires
|
||||
|
||||
**Irreversible.** Keep the pre-cutover snapshot and Neo4j dump for ≥30 days.
|
||||
|
|
@ -1,97 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
#
|
||||
# Qdrant snapshot + off-host rotation.
|
||||
#
|
||||
# Snapshots both collections (mem0_v3 + mem0_v3_entities) back-to-back via the
|
||||
# Qdrant REST API, downloads them to a date-stamped local directory, uploads to
|
||||
# the configured rclone remote, prunes local copies older than 14 days, and
|
||||
# emits a Prometheus textfile metric for future scrape.
|
||||
#
|
||||
# Env vars (override defaults):
|
||||
# QDRANT_CONTAINER container name (default: mem0-qdrant)
|
||||
# COLLECTIONS space-separated collection names
|
||||
# (default: "mem0_v3 mem0_v3_entities")
|
||||
# BACKUP_DIR local backup root
|
||||
# (default: ~/aistuff/mem0/backups/qdrant)
|
||||
# RCLONE_REMOTE rclone remote path (e.g. b2:mem0-backups/qdrant).
|
||||
# If unset, off-host upload is skipped.
|
||||
# LOCAL_RETENTION_DAYS how long to keep local copies (default: 14)
|
||||
# TEXTFILE_DIR Prometheus node_exporter textfile collector dir
|
||||
# (default: /var/lib/node_exporter/textfile_collector,
|
||||
# skipped if the dir does not exist)
|
||||
#
|
||||
# Suggested cron (daily at 03:00 UTC):
|
||||
# 0 3 * * * RCLONE_REMOTE=b2:mem0-backups/qdrant /home/ubuntu/aistuff/mem0/scripts/backup_qdrant.sh >> /home/ubuntu/aistuff/mem0/backups/backup.log 2>&1
|
||||
#
|
||||
# Exit codes:
|
||||
# 0 success
|
||||
# 1 snapshot/download failure
|
||||
# 2 rclone failure (after local download succeeded)
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
QDRANT_CONTAINER="${QDRANT_CONTAINER:-mem0-qdrant}"
|
||||
COLLECTIONS="${COLLECTIONS:-mem0_v3 mem0_v3_entities}"
|
||||
BACKUP_DIR="${BACKUP_DIR:-$HOME/aistuff/mem0/backups/qdrant}"
|
||||
RCLONE_REMOTE="${RCLONE_REMOTE:-}"
|
||||
LOCAL_RETENTION_DAYS="${LOCAL_RETENTION_DAYS:-14}"
|
||||
TEXTFILE_DIR="${TEXTFILE_DIR:-/var/lib/node_exporter/textfile_collector}"
|
||||
|
||||
TS="$(date -u +%Y%m%dT%H%M%SZ)"
|
||||
DAY="$(date -u +%Y-%m-%d)"
|
||||
TARGET_DIR="$BACKUP_DIR/$DAY"
|
||||
mkdir -p "$TARGET_DIR"
|
||||
|
||||
log() { printf '[%s] %s\n' "$(date -u +%FT%TZ)" "$*"; }
|
||||
|
||||
log "starting backup ts=$TS dir=$TARGET_DIR collections=$COLLECTIONS"
|
||||
|
||||
total_bytes=0
|
||||
for col in $COLLECTIONS; do
|
||||
log "snapshot create: $col"
|
||||
resp=$(docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
|
||||
"http://localhost:6333/collections/$col/snapshots?wait=true")
|
||||
snap_name=$(printf '%s' "$resp" \
|
||||
| python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["name"])')
|
||||
|
||||
out_file="$TARGET_DIR/${col}_${TS}_${snap_name}"
|
||||
log "snapshot download: $col/$snap_name -> $out_file"
|
||||
docker cp "$QDRANT_CONTAINER:/qdrant/snapshots/$col/$snap_name" "$out_file"
|
||||
|
||||
# Remove the in-container snapshot to avoid disk bloat on the volume.
|
||||
docker exec "$QDRANT_CONTAINER" curl -fsS -X DELETE \
|
||||
"http://localhost:6333/collections/$col/snapshots/$snap_name" >/dev/null
|
||||
|
||||
size=$(stat -c %s "$out_file" 2>/dev/null || stat -f %z "$out_file")
|
||||
total_bytes=$((total_bytes + size))
|
||||
log "downloaded: $out_file ($size bytes)"
|
||||
done
|
||||
|
||||
if [ -n "$RCLONE_REMOTE" ]; then
|
||||
log "rclone copy: $TARGET_DIR -> $RCLONE_REMOTE/$DAY"
|
||||
if ! rclone copy "$TARGET_DIR" "$RCLONE_REMOTE/$DAY"; then
|
||||
log "rclone failed (local copies retained)"
|
||||
exit 2
|
||||
fi
|
||||
else
|
||||
log "RCLONE_REMOTE unset; skipping off-host upload"
|
||||
fi
|
||||
|
||||
log "pruning local copies older than $LOCAL_RETENTION_DAYS days"
|
||||
find "$BACKUP_DIR" -mindepth 1 -maxdepth 1 -type d -mtime "+$LOCAL_RETENTION_DAYS" -exec rm -rf {} +
|
||||
|
||||
if [ -d "$TEXTFILE_DIR" ]; then
|
||||
tmp="$(mktemp)"
|
||||
{
|
||||
echo "# HELP qdrant_last_backup_timestamp_seconds Unix timestamp of last successful Qdrant backup."
|
||||
echo "# TYPE qdrant_last_backup_timestamp_seconds gauge"
|
||||
echo "qdrant_last_backup_timestamp_seconds $(date -u +%s)"
|
||||
echo "# HELP qdrant_last_backup_bytes Total bytes of last successful Qdrant backup."
|
||||
echo "# TYPE qdrant_last_backup_bytes gauge"
|
||||
echo "qdrant_last_backup_bytes $total_bytes"
|
||||
} > "$tmp"
|
||||
mv "$tmp" "$TEXTFILE_DIR/qdrant_backup.prom"
|
||||
log "textfile metric written: $TEXTFILE_DIR/qdrant_backup.prom"
|
||||
fi
|
||||
|
||||
log "backup complete: $total_bytes bytes across $(echo "$COLLECTIONS" | wc -w) collection(s)"
|
||||
|
|
@ -1,100 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Delete memories that were imported from Neo4j (tagged metadata.source='neo4j_legacy_import').
|
||||
|
||||
Usage:
|
||||
docker compose exec backend python /tmp/cleanup.py --user alice
|
||||
docker compose exec backend python /tmp/cleanup.py --users alice,hetashree,manju
|
||||
docker compose exec backend python /tmp/cleanup.py --dry-run --users alice
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
import httpx
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
|
||||
QDRANT_HOST = os.environ.get("QDRANT_HOST", "qdrant")
|
||||
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
|
||||
QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION_NAME", "mem0_v3")
|
||||
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")
|
||||
|
||||
|
||||
def load_api_keys() -> dict:
|
||||
raw = os.environ.get("API_KEYS", "{}")
|
||||
keys = json.loads(raw)
|
||||
inv: dict = {}
|
||||
for k, u in keys.items():
|
||||
inv.setdefault(u, k)
|
||||
return inv
|
||||
|
||||
|
||||
def scroll_legacy_for(client: QdrantClient, user_id: str) -> list:
|
||||
"""Return all memory IDs in mem0_v3 with source='neo4j_legacy_import' for the user."""
|
||||
ids = []
|
||||
offset = None
|
||||
while True:
|
||||
points, offset = client.scroll(
|
||||
collection_name=QDRANT_COLLECTION,
|
||||
scroll_filter=models.Filter(
|
||||
must=[
|
||||
models.FieldCondition(key="user_id", match=models.MatchValue(value=user_id)),
|
||||
models.FieldCondition(key="source", match=models.MatchValue(value="neo4j_legacy_import")),
|
||||
]
|
||||
),
|
||||
limit=128,
|
||||
with_payload=False,
|
||||
with_vectors=False,
|
||||
offset=offset,
|
||||
)
|
||||
ids.extend(p.id for p in points)
|
||||
if offset is None:
|
||||
break
|
||||
return ids
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--user", help="Single user to clean")
|
||||
ap.add_argument("--users", help="Comma-separated user list")
|
||||
ap.add_argument("--dry-run", action="store_true")
|
||||
args = ap.parse_args()
|
||||
|
||||
targets = []
|
||||
if args.user:
|
||||
targets.append(args.user)
|
||||
if args.users:
|
||||
targets.extend(u.strip() for u in args.users.split(","))
|
||||
if not targets:
|
||||
print("Usage: --user X or --users a,b,c", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
api_keys = load_api_keys()
|
||||
client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
|
||||
http = httpx.Client(timeout=60.0)
|
||||
|
||||
for user_id in targets:
|
||||
ids = scroll_legacy_for(client, user_id)
|
||||
print(f"{user_id}: found {len(ids)} legacy-import memories")
|
||||
if args.dry_run:
|
||||
continue
|
||||
key = api_keys.get(user_id)
|
||||
if not key:
|
||||
print(f" no API key for {user_id} — skipping")
|
||||
continue
|
||||
deleted = errors = 0
|
||||
for mid in ids:
|
||||
r = http.delete(f"{MEM0_URL}/memories/{mid}", headers={"X-API-Key": key})
|
||||
if r.status_code == 200:
|
||||
deleted += 1
|
||||
else:
|
||||
errors += 1
|
||||
print(f" failed {mid}: HTTP {r.status_code} {r.text[:120]}")
|
||||
print(f" deleted={deleted} errors={errors}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
@ -1,273 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Import Neo4j graph relationships into mem0 v2 as natural-language memories.
|
||||
|
||||
For each `(source)-[rel_type]->(destination)` edge in the Neo4j graph, POST a
|
||||
short sentence ("source verb destination") to mem0's `/memories` endpoint.
|
||||
mem0's v3 fact-extraction + entity-linking pipeline handles the rest.
|
||||
|
||||
Tagged with metadata.source="neo4j_legacy_import" so the imports can be
|
||||
identified (or bulk-deleted) later via the standard /memories endpoints.
|
||||
|
||||
Usage (inside the mem0-backend container, which is on the same docker network
|
||||
as neo4j-temp and the mem0 backend itself):
|
||||
|
||||
# Dry-run — show sample sentences without posting:
|
||||
docker compose exec backend python /tmp/import.py --dry-run --limit 30
|
||||
|
||||
# Single-user POC:
|
||||
docker compose exec backend python /tmp/import.py --user akshat
|
||||
|
||||
# Full sweep:
|
||||
docker compose exec backend python /tmp/import.py
|
||||
|
||||
Environment variables (all have sensible defaults for the beast deployment):
|
||||
NEO4J_URI bolt://neo4j-temp:7687
|
||||
NEO4J_USER neo4j
|
||||
NEO4J_PASS mem0_neo4j_password
|
||||
MEM0_URL http://localhost:8000
|
||||
API_KEYS JSON mapping {api_key: user_id} (already in container env)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
from neo4j import GraphDatabase
|
||||
|
||||
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j-temp:7687")
|
||||
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
|
||||
NEO4J_PASS = os.environ.get("NEO4J_PASS", "mem0_neo4j_password")
|
||||
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")
|
||||
|
||||
# LiteLLM proxy for sentence rewriting (--llm-rewrite mode)
|
||||
LLM_URL = os.environ.get("LLM_URL", os.environ.get("OPENAI_BASE_URL", "https://veronica.pratikn.com"))
|
||||
LLM_KEY = os.environ.get("LLM_KEY", os.environ.get("OPENAI_API_KEY", ""))
|
||||
LLM_MODEL = os.environ.get("LLM_MODEL", "minimax-m2")
|
||||
|
||||
REWRITE_PROMPT = """You are converting a knowledge-graph relationship into a natural English sentence for a personal memory system.
|
||||
|
||||
The user we're building memories for is identified by user_id="{user_id}". The graph may reference them by variant names (full name, "user_id:_{user_id}", nicknames). Treat all of those as the same person — in your output sentence, use the name "{user_id}" (capitalized appropriately) so the memory layer can link it consistently.
|
||||
|
||||
Convert this tuple into ONE natural English sentence:
|
||||
source: {source}
|
||||
relationship: {rel}
|
||||
destination: {dest}
|
||||
|
||||
Rules:
|
||||
- Output ONLY the sentence. No quotes, no preamble, no explanation, no markdown.
|
||||
- Use proper capitalization, grammar, and convert snake_case names to natural words ("custom_headers" → "custom headers").
|
||||
- If the source IS the user, write a third-person sentence starting with "{user_id_cap}" (e.g., "{user_id_cap} works at TechCorp.").
|
||||
- If the source is a non-user entity, write a third-person sentence about that entity (e.g., "Dr Seta works at Chitra.").
|
||||
- If the tuple doesn't translate to a meaningful, retrievable fact (pure identifier metadata like a postal code, generic world knowledge like a timezone offset, or self-references), respond with exactly: SKIP"""
|
||||
|
||||
|
||||
def load_api_keys() -> dict:
|
||||
raw = os.environ.get("API_KEYS", "{}")
|
||||
keys = json.loads(raw)
|
||||
inv: dict = {}
|
||||
for k, u in keys.items():
|
||||
inv.setdefault(u, k)
|
||||
return inv
|
||||
|
||||
|
||||
def humanize(name: str, user_id: Optional[str] = None) -> str:
|
||||
"""snake_case node name → human-readable. __User__ nodes collapse to the user_id."""
|
||||
if not name:
|
||||
return "<unknown>"
|
||||
# The __User__ label uses name = "user_id:_<username>"
|
||||
if name.startswith("user_id:_") and user_id:
|
||||
return user_id
|
||||
return name.replace("_", " ")
|
||||
|
||||
|
||||
def humanize_rel(rel_type: str) -> str:
|
||||
return rel_type.replace("_", " ")
|
||||
|
||||
|
||||
def build_sentence(source: str, rel_type: str, dest: str, user_id: str) -> str:
|
||||
return f"{humanize(source, user_id)} {humanize_rel(rel_type)} {humanize(dest, user_id)}."
|
||||
|
||||
|
||||
_llm_cache: dict = {}
|
||||
|
||||
|
||||
def llm_rewrite(
|
||||
source: str,
|
||||
rel_type: str,
|
||||
dest: str,
|
||||
user_id: str,
|
||||
client: httpx.Client,
|
||||
) -> Optional[str]:
|
||||
"""Ask minimax-m2 to convert the tuple to a natural sentence. Returns None for SKIP."""
|
||||
key = (source, rel_type, dest, user_id)
|
||||
if key in _llm_cache:
|
||||
return _llm_cache[key]
|
||||
|
||||
prompt = REWRITE_PROMPT.format(
|
||||
user_id=user_id,
|
||||
user_id_cap=user_id[:1].upper() + user_id[1:],
|
||||
source=source,
|
||||
rel=rel_type,
|
||||
dest=dest,
|
||||
)
|
||||
resp = client.post(
|
||||
f"{LLM_URL}/v1/chat/completions",
|
||||
headers={"Authorization": f"Bearer {LLM_KEY}"},
|
||||
json={
|
||||
"model": LLM_MODEL,
|
||||
"messages": [{"role": "user", "content": prompt}],
|
||||
"max_tokens": 400,
|
||||
"temperature": 0.2,
|
||||
},
|
||||
timeout=60.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
content = (data["choices"][0]["message"].get("content") or "").strip()
|
||||
# Strip wrapping quotes/whitespace
|
||||
content = content.strip(" \"'\n\r\t")
|
||||
if not content or content.upper().strip(" .!") == "SKIP":
|
||||
_llm_cache[key] = None
|
||||
return None
|
||||
_llm_cache[key] = content
|
||||
return content
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--user", help="Single user_id to migrate (default: all)")
|
||||
ap.add_argument("--limit", type=int, help="Max relationships to process")
|
||||
ap.add_argument("--dry-run", action="store_true", help="Print sentences only, no POSTs")
|
||||
ap.add_argument("--neo4j-uri", default=NEO4J_URI)
|
||||
ap.add_argument("--mem0-url", default=MEM0_URL)
|
||||
ap.add_argument(
|
||||
"--llm-rewrite",
|
||||
action="store_true",
|
||||
help="Use minimax-m2 (via veronica.pratikn.com) to rewrite tuples into natural English sentences",
|
||||
)
|
||||
ap.add_argument("--llm-url", default=LLM_URL)
|
||||
ap.add_argument("--llm-model", default=LLM_MODEL)
|
||||
args = ap.parse_args()
|
||||
|
||||
api_keys = load_api_keys()
|
||||
|
||||
# Build the cypher query
|
||||
where = "WHERE n.user_id IS NOT NULL AND m.user_id IS NOT NULL"
|
||||
params: dict = {}
|
||||
if args.user:
|
||||
where += " AND n.user_id = $uid"
|
||||
params["uid"] = args.user
|
||||
limit_clause = f"LIMIT {args.limit}" if args.limit else ""
|
||||
cypher = (
|
||||
f"MATCH (n)-[r]->(m) {where} "
|
||||
"RETURN n.name AS source, type(r) AS rel, m.name AS dest, n.user_id AS user_id "
|
||||
f"{limit_clause}"
|
||||
)
|
||||
|
||||
driver = GraphDatabase.driver(args.neo4j_uri, auth=(NEO4J_USER, NEO4J_PASS))
|
||||
with driver.session() as session:
|
||||
edges = list(session.run(cypher, **params))
|
||||
driver.close()
|
||||
|
||||
print(f"Found {len(edges)} relationships")
|
||||
|
||||
client = httpx.Client(timeout=180.0)
|
||||
|
||||
def make_sentence(e) -> Optional[str]:
|
||||
if args.llm_rewrite:
|
||||
return llm_rewrite(e["source"], e["rel"], e["dest"], e["user_id"], client)
|
||||
return build_sentence(e["source"], e["rel"], e["dest"], e["user_id"])
|
||||
|
||||
if args.dry_run:
|
||||
n_show = min(30, len(edges))
|
||||
print(f"\nSample sentences (first {n_show})"
|
||||
+ (" [LLM-rewritten]" if args.llm_rewrite else "") + ":")
|
||||
for e in edges[:n_show]:
|
||||
s = make_sentence(e)
|
||||
tup = f"({e['source']}, {e['rel']}, {e['dest']})"
|
||||
marker = "SKIP" if s is None else s
|
||||
print(f" [{e['user_id']}] {tup} -> {marker}")
|
||||
if len(edges) > n_show:
|
||||
print(f" ... and {len(edges) - n_show} more")
|
||||
# Show per-user breakdown
|
||||
by_user: dict = {}
|
||||
for e in edges:
|
||||
by_user[e["user_id"]] = by_user.get(e["user_id"], 0) + 1
|
||||
print(f"\nBy user: {by_user}")
|
||||
return 0
|
||||
|
||||
timestamp = int(time.time())
|
||||
stats = {
|
||||
"posted": 0,
|
||||
"skipped_no_key": 0,
|
||||
"skipped_llm": 0,
|
||||
"errors": 0,
|
||||
"extracted_total": 0,
|
||||
"no_facts": 0,
|
||||
}
|
||||
|
||||
for i, edge in enumerate(edges, 1):
|
||||
user_id = edge["user_id"]
|
||||
api_key = api_keys.get(user_id)
|
||||
if not api_key:
|
||||
stats["skipped_no_key"] += 1
|
||||
continue
|
||||
try:
|
||||
sentence = make_sentence(edge)
|
||||
except Exception as exc:
|
||||
stats["errors"] += 1
|
||||
print(f" [{i}/{len(edges)}] LLM REWRITE EXCEPTION: {exc}")
|
||||
continue
|
||||
if sentence is None:
|
||||
stats["skipped_llm"] += 1
|
||||
continue
|
||||
body = {
|
||||
"user_id": user_id,
|
||||
"messages": [{"role": "user", "content": sentence}],
|
||||
"metadata": {
|
||||
"source": "neo4j_legacy_import",
|
||||
"neo4j_rel_type": edge["rel"],
|
||||
"import_timestamp": timestamp,
|
||||
},
|
||||
}
|
||||
try:
|
||||
r = client.post(
|
||||
f"{args.mem0_url}/memories",
|
||||
headers={"X-API-Key": api_key},
|
||||
json=body,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
stats["errors"] += 1
|
||||
print(
|
||||
f" [{i}/{len(edges)}] {user_id} {sentence!r} HTTP {r.status_code} "
|
||||
f"{r.text[:120]}"
|
||||
)
|
||||
continue
|
||||
data = r.json()
|
||||
results = (data.get("added_memories") or [{}])[0].get("results", [])
|
||||
stats["extracted_total"] += len(results)
|
||||
if not results:
|
||||
stats["no_facts"] += 1
|
||||
stats["posted"] += 1
|
||||
if i % 5 == 0 or i == len(edges):
|
||||
print(
|
||||
f" [{i}/{len(edges)}] posted={stats['posted']} "
|
||||
f"extracted={stats['extracted_total']} "
|
||||
f"no_facts={stats['no_facts']} "
|
||||
f"llm_skipped={stats['skipped_llm']} "
|
||||
f"no_key={stats['skipped_no_key']} errors={stats['errors']}"
|
||||
)
|
||||
except Exception as exc:
|
||||
stats["errors"] += 1
|
||||
print(f" [{i}/{len(edges)}] EXCEPTION: {exc}")
|
||||
|
||||
print(f"\nDONE: {stats}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
@ -1,189 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Migrate a legacy mem0 v0.1.x/v1.x Qdrant collection to a v2-compatible one.
|
||||
|
||||
Why this script exists
|
||||
----------------------
|
||||
mem0 v2 stores a `bm25` sparse vector alongside each dense vector to enable
|
||||
hybrid search. Pre-v2 collections lack that slot — mem0's Qdrant adapter
|
||||
silently downgrades to semantic-only writes on them. To unlock BM25 you must
|
||||
recreate the collection with the sparse slot AND copy the existing points
|
||||
over (preserving id, vector, payload — no re-embed needed).
|
||||
|
||||
How it works
|
||||
------------
|
||||
1. Connect to Qdrant.
|
||||
2. Scroll all points from the source collection (with vectors + payload).
|
||||
3. Upsert them into the target collection in batches.
|
||||
4. Verify counts match per `user_id`.
|
||||
|
||||
The target collection MUST already exist with the BM25 slot. The recommended
|
||||
way to create it is to boot the v2 backend pointed at `QDRANT_COLLECTION_NAME=<target>`
|
||||
and trigger one `add()` call — mem0 lazy-creates the collection (and the sister
|
||||
`<target>_entities` collection) with the right schema.
|
||||
|
||||
Usage
|
||||
-----
|
||||
# Dry run (no writes):
|
||||
python scripts/migrate_qdrant_to_v3.py \\
|
||||
--source mem0 --target mem0_v3 \\
|
||||
--qdrant-host localhost --qdrant-port 6333 \\
|
||||
--dry-run
|
||||
|
||||
# Real migration:
|
||||
python scripts/migrate_qdrant_to_v3.py \\
|
||||
--source mem0 --target mem0_v3 \\
|
||||
--qdrant-host localhost --qdrant-port 6333
|
||||
|
||||
# From inside the backend container (where Qdrant resolves as `qdrant`):
|
||||
docker compose exec backend python /app/../scripts/migrate_qdrant_to_v3.py \\
|
||||
--source mem0 --target mem0_v3 --qdrant-host qdrant --qdrant-port 6333
|
||||
|
||||
Prereqs
|
||||
-------
|
||||
- qdrant-client>=1.12.0 installed
|
||||
- A fresh Qdrant snapshot of the source collection (see docs/MIGRATION_RUNBOOK.md)
|
||||
- The target collection created via a v2 backend warm-up add()
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from collections import Counter
|
||||
from typing import Optional
|
||||
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
|
||||
p.add_argument("--source", required=True, help="Source (legacy) collection name")
|
||||
p.add_argument("--target", required=True, help="Target (v2-created) collection name")
|
||||
p.add_argument("--qdrant-host", default="localhost")
|
||||
p.add_argument("--qdrant-port", type=int, default=6333)
|
||||
p.add_argument("--batch-size", type=int, default=256, help="Scroll/upsert batch size")
|
||||
p.add_argument("--dry-run", action="store_true", help="Read-only — show counts, no writes")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def collection_must_exist(client: QdrantClient, name: str) -> models.CollectionInfo:
|
||||
if not client.collection_exists(name):
|
||||
print(f"ERROR: collection {name!r} does not exist on Qdrant.", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
return client.get_collection(name)
|
||||
|
||||
|
||||
def verify_target_has_bm25(target_info: models.CollectionInfo) -> None:
|
||||
sparse = getattr(target_info.config.params, "sparse_vectors", None)
|
||||
if not sparse or "bm25" not in sparse:
|
||||
print(
|
||||
"ERROR: target collection has no `bm25` sparse-vector slot. Did you create "
|
||||
"it via a v2 backend warm-up add()? See docs/MIGRATION_RUNBOOK.md.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
def count_per_user(client: QdrantClient, collection: str) -> Counter:
|
||||
counts: Counter = Counter()
|
||||
offset: Optional[models.PointId] = None
|
||||
while True:
|
||||
points, offset = client.scroll(
|
||||
collection_name=collection,
|
||||
limit=1024,
|
||||
with_payload=["user_id"],
|
||||
with_vectors=False,
|
||||
offset=offset,
|
||||
)
|
||||
for p in points:
|
||||
uid = (p.payload or {}).get("user_id", "<none>")
|
||||
counts[uid] += 1
|
||||
if offset is None:
|
||||
break
|
||||
return counts
|
||||
|
||||
|
||||
def migrate(
|
||||
client: QdrantClient, source: str, target: str, batch_size: int, dry_run: bool
|
||||
) -> int:
|
||||
transferred = 0
|
||||
offset: Optional[models.PointId] = None
|
||||
while True:
|
||||
points, offset = client.scroll(
|
||||
collection_name=source,
|
||||
limit=batch_size,
|
||||
with_payload=True,
|
||||
with_vectors=True,
|
||||
offset=offset,
|
||||
)
|
||||
if not points:
|
||||
break
|
||||
|
||||
if not dry_run:
|
||||
client.upsert(
|
||||
collection_name=target,
|
||||
points=[
|
||||
models.PointStruct(id=p.id, vector=p.vector, payload=p.payload)
|
||||
for p in points
|
||||
],
|
||||
wait=True,
|
||||
)
|
||||
transferred += len(points)
|
||||
print(f" ... transferred {transferred} points")
|
||||
|
||||
if offset is None:
|
||||
break
|
||||
return transferred
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
client = QdrantClient(host=args.qdrant_host, port=args.qdrant_port)
|
||||
|
||||
src_info = collection_must_exist(client, args.source)
|
||||
tgt_info = collection_must_exist(client, args.target)
|
||||
verify_target_has_bm25(tgt_info)
|
||||
|
||||
src_count = client.count(args.source, exact=True).count
|
||||
tgt_count_before = client.count(args.target, exact=True).count
|
||||
print(f"Source {args.source!r}: {src_count} points")
|
||||
print(f"Target {args.target!r}: {tgt_count_before} points (before)")
|
||||
if tgt_count_before > 1:
|
||||
print(
|
||||
"WARNING: target collection is non-empty (>1 point). Migration will "
|
||||
"upsert into it; ids collide → existing points overwritten."
|
||||
)
|
||||
|
||||
print("\nPer-user count (source):")
|
||||
src_per_user = count_per_user(client, args.source)
|
||||
for uid, c in src_per_user.most_common():
|
||||
print(f" {uid}: {c}")
|
||||
|
||||
if args.dry_run:
|
||||
print("\nDRY RUN — no writes performed.")
|
||||
sys.exit(0)
|
||||
|
||||
print("\nMigrating points (preserving id + vector + payload, no re-embed)...")
|
||||
transferred = migrate(client, args.source, args.target, args.batch_size, dry_run=False)
|
||||
|
||||
tgt_count_after = client.count(args.target, exact=True).count
|
||||
print(f"\nDone. Transferred {transferred} points.")
|
||||
print(f"Target {args.target!r}: {tgt_count_after} points (after)")
|
||||
|
||||
print("\nPer-user count (target, after):")
|
||||
tgt_per_user = count_per_user(client, args.target)
|
||||
mismatches = 0
|
||||
for uid, src_c in src_per_user.most_common():
|
||||
tgt_c = tgt_per_user.get(uid, 0)
|
||||
marker = "OK" if tgt_c == src_c else f"MISMATCH ({tgt_c})"
|
||||
print(f" {uid}: src={src_c} tgt={tgt_c} [{marker}]")
|
||||
if tgt_c != src_c:
|
||||
mismatches += 1
|
||||
|
||||
if mismatches:
|
||||
print(f"\nERROR: {mismatches} user(s) have count mismatches. Investigate before swap.", file=sys.stderr)
|
||||
sys.exit(3)
|
||||
print("\nAll per-user counts match. Safe to proceed with collection swap.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,81 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
#
|
||||
# Weekly Qdrant restore sanity check.
|
||||
#
|
||||
# Finds the most recent backup tarball for SOURCE_COLLECTION, restores it into
|
||||
# a transient collection, asserts the restored point count >= production, and
|
||||
# cleans up. Exits non-zero on any failure so cron alerting catches it.
|
||||
#
|
||||
# Env vars:
|
||||
# QDRANT_CONTAINER container name (default: mem0-qdrant)
|
||||
# BACKUP_DIR local backup root
|
||||
# (default: ~/aistuff/mem0/backups/qdrant)
|
||||
# SOURCE_COLLECTION collection to verify (default: mem0_v3)
|
||||
# TEST_COLLECTION transient collection name
|
||||
# (default: mem0_v3_restore_test)
|
||||
#
|
||||
# Suggested cron (weekly Sunday 04:00 UTC):
|
||||
# 0 4 * * 0 /home/ubuntu/aistuff/mem0/scripts/restore_test.sh >> /home/ubuntu/aistuff/mem0/backups/restore_test.log 2>&1
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
QDRANT_CONTAINER="${QDRANT_CONTAINER:-mem0-qdrant}"
|
||||
BACKUP_DIR="${BACKUP_DIR:-$HOME/aistuff/mem0/backups/qdrant}"
|
||||
SOURCE_COLLECTION="${SOURCE_COLLECTION:-mem0_v3}"
|
||||
TEST_COLLECTION="${TEST_COLLECTION:-mem0_v3_restore_test}"
|
||||
|
||||
log() { printf '[%s] %s\n' "$(date -u +%FT%TZ)" "$*"; }
|
||||
|
||||
# Pick the most recent backup for SOURCE_COLLECTION. The "_[0-9]*" suffix
|
||||
# anchors on the timestamp digit so a collection named "mem0_v3" does NOT
|
||||
# also match "mem0_v3_entities_*" files in the same directory.
|
||||
latest="$(find "$BACKUP_DIR" -type f -name "${SOURCE_COLLECTION}_[0-9]*" -printf '%T@ %p\n' 2>/dev/null \
|
||||
| sort -nr | head -1 | cut -d' ' -f2-)"
|
||||
if [ -z "$latest" ]; then
|
||||
log "ERROR: no backup found under $BACKUP_DIR matching ${SOURCE_COLLECTION}_[0-9]*"
|
||||
exit 1
|
||||
fi
|
||||
log "latest backup: $latest"
|
||||
|
||||
prod_count=$(docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
|
||||
"http://localhost:6333/collections/$SOURCE_COLLECTION/points/count" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"exact":true}' \
|
||||
| python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["count"])')
|
||||
log "production count ($SOURCE_COLLECTION): $prod_count"
|
||||
|
||||
# Drop any leftover test collection from a previous failed run.
|
||||
docker exec "$QDRANT_CONTAINER" curl -fsS -X DELETE \
|
||||
"http://localhost:6333/collections/$TEST_COLLECTION" >/dev/null 2>&1 || true
|
||||
|
||||
snap_basename="$(basename "$latest")"
|
||||
log "copying snapshot into container: /tmp/$snap_basename"
|
||||
docker cp "$latest" "$QDRANT_CONTAINER:/tmp/$snap_basename"
|
||||
|
||||
# Use the upload endpoint (multipart) rather than recover-from-URL —
|
||||
# file:// recovery is disabled by default in recent Qdrant (returns 403),
|
||||
# and upload doesn't need an allowlist.
|
||||
log "restoring into $TEST_COLLECTION via /snapshots/upload"
|
||||
docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
|
||||
"http://localhost:6333/collections/$TEST_COLLECTION/snapshots/upload?priority=snapshot" \
|
||||
-H "Content-Type: multipart/form-data" \
|
||||
-F "snapshot=@/tmp/$snap_basename" \
|
||||
>/dev/null
|
||||
|
||||
restored_count=$(docker exec "$QDRANT_CONTAINER" curl -fsS -X POST \
|
||||
"http://localhost:6333/collections/$TEST_COLLECTION/points/count" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"exact":true}' \
|
||||
| python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["count"])')
|
||||
log "restored count: $restored_count"
|
||||
|
||||
# Cleanup whether or not the assertion passes.
|
||||
docker exec "$QDRANT_CONTAINER" curl -fsS -X DELETE \
|
||||
"http://localhost:6333/collections/$TEST_COLLECTION" >/dev/null
|
||||
docker exec "$QDRANT_CONTAINER" rm -f "/tmp/$snap_basename"
|
||||
|
||||
if [ "$restored_count" -lt "$prod_count" ]; then
|
||||
log "FAIL: restored=$restored_count < production=$prod_count"
|
||||
exit 1
|
||||
fi
|
||||
log "OK: restored=$restored_count >= production=$prod_count"
|
||||
2
setup.sh
2
setup.sh
|
|
@ -3,7 +3,7 @@
|
|||
set -e
|
||||
|
||||
COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-mem0}"
|
||||
VOLUMES=("qdrant_data")
|
||||
VOLUMES=("qdrant_data" "neo4j_data" "neo4j_logs" "neo4j_import" "neo4j_plugins")
|
||||
|
||||
echo "=========================================="
|
||||
echo " Mem0 Setup Script"
|
||||
|
|
|
|||
|
|
@ -1,102 +0,0 @@
|
|||
"""End-to-end compatibility test: drive the real mem0ai MemoryClient against
|
||||
this server. Run INSIDE the backend container (which has mem0ai==2.0.2):
|
||||
|
||||
ssh beast 'cd ~/aistuff/mem0 && docker compose exec -T backend python' < test_sdk_compat.py
|
||||
|
||||
Requires MEM0_API_KEY in the environment (mapped to user 'pratik'). Exercises the
|
||||
full SDK surface and cleans up the memories it creates (scoped to a throwaway
|
||||
agent_id so it never touches real memories).
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
from mem0 import MemoryClient
|
||||
|
||||
KEY = os.environ.get("MEM0_API_KEY")
|
||||
if not KEY:
|
||||
sys.exit("set MEM0_API_KEY (mapped to user 'pratik')")
|
||||
|
||||
HOST = os.environ.get("MEM0_HOST", "https://memory.pratikn.com")
|
||||
USER = os.environ.get("MEM0_TEST_USER", "pratik")
|
||||
AGENT = "sdk_compat_test" # isolates test data from real memories
|
||||
|
||||
results = []
|
||||
|
||||
|
||||
def check(name, cond, info=""):
|
||||
results.append(bool(cond))
|
||||
print(("PASS " if cond else "FAIL "), name, "--", str(info)[:240])
|
||||
|
||||
|
||||
# --- construct (hits GET /v1/ping/, validates Token auth + org/project) ---
|
||||
try:
|
||||
c = MemoryClient(host=HOST, api_key=KEY)
|
||||
check("construct", True, f"user_email={c.user_email} org={c.org_id} proj={c.project_id}")
|
||||
except Exception as e:
|
||||
check("construct", False, repr(e))
|
||||
print("\nRESULT: RED (cannot construct client)")
|
||||
sys.exit(1)
|
||||
|
||||
# --- add (POST /v3/memories/add/) ---
|
||||
probe = f"SDK compat probe {int(time.time())}: Pratik is validating the mem0 SDK compatibility layer and uses FastAPI."
|
||||
try:
|
||||
r = c.add(probe, user_id=USER, agent_id=AGENT)
|
||||
check("add", isinstance(r, dict), r)
|
||||
except Exception as e:
|
||||
check("add", False, repr(e))
|
||||
|
||||
time.sleep(3) # allow async extraction/indexing to settle
|
||||
|
||||
# --- search (POST /v3/memories/search/) ---
|
||||
try:
|
||||
s = c.search("mem0 SDK compatibility layer", filters={"user_id": USER, "agent_id": AGENT})
|
||||
check("search.shape", isinstance(s, dict) and "results" in s, s)
|
||||
except Exception as e:
|
||||
check("search.shape", False, repr(e))
|
||||
|
||||
# --- get_all (POST /v3/memories/) ---
|
||||
ids = []
|
||||
try:
|
||||
g = c.get_all(filters={"user_id": USER, "agent_id": AGENT})
|
||||
ok = isinstance(g, dict) and "results" in g and "count" in g
|
||||
check("get_all.shape", ok, g)
|
||||
ids = [m.get("id") for m in (g.get("results") or []) if m.get("id")]
|
||||
except Exception as e:
|
||||
check("get_all.shape", False, repr(e))
|
||||
|
||||
mid = ids[0] if ids else None
|
||||
print(f" (created {len(ids)} memory id(s) under agent={AGENT})")
|
||||
|
||||
# --- item ops (best-effort; depend on extraction producing >=1 fact) ---
|
||||
if mid:
|
||||
try:
|
||||
one = c.get(mid)
|
||||
check("get", isinstance(one, dict) and one.get("id") == mid, one)
|
||||
except Exception as e:
|
||||
check("get", False, repr(e))
|
||||
try:
|
||||
h = c.history(mid)
|
||||
check("history.is_list", isinstance(h, list), h)
|
||||
except Exception as e:
|
||||
check("history.is_list", False, repr(e))
|
||||
try:
|
||||
u = c.update(mid, text="SDK compat probe (updated)")
|
||||
check("update", isinstance(u, dict), u)
|
||||
except Exception as e:
|
||||
check("update", False, repr(e))
|
||||
|
||||
# --- cleanup: delete only the ids we created ---
|
||||
deleted = 0
|
||||
for i in ids:
|
||||
try:
|
||||
c.delete(i)
|
||||
deleted += 1
|
||||
except Exception as e:
|
||||
print(" delete error", i, repr(e))
|
||||
print(f" cleanup: deleted {deleted}/{len(ids)}")
|
||||
|
||||
green = results and all(results)
|
||||
print("\nRESULT:", "GREEN" if green else "RED", f"({sum(results)}/{len(results)} checks passed)")
|
||||
sys.exit(0 if green else 1)
|
||||
Loading…
Reference in a new issue