knowledge-base/scripts/import_neo4j_to_mem0.py
Pratik Narola 3a10b72051 scripts: Neo4j → mem0 v2 graph relationship import
Two helpers built during the beast deployment to migrate the legacy
Neo4j knowledge graph (decommissioned in the v3 cutover) into mem0 v2
as natural-language memories.

scripts/import_neo4j_to_mem0.py
  - Connects to Neo4j via Bolt, iterates per-user relationships,
    POSTs each as a /memories request.
  - Two modes:
      raw:           "humanize(src) verb humanize(dest)." (snake_case → spaces)
      --llm-rewrite: minimax-m2 via OpenAI-compat proxy rewrites each
                     tuple into a grammatical English sentence; the LLM
                     may also output SKIP for non-meaningful tuples
                     (postal codes, timezone offsets, self-refs).
  - Tags every imported memory with metadata.source="neo4j_legacy_import"
    plus neo4j_rel_type + import_timestamp for traceability/cleanup.
  - Caches LLM rewrites by (source, rel, dest, user_id).
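The SKIP handling and rewrite cache described above can be sketched in isolation. This is a minimal sketch, not the script's own code: `normalize_reply`, `cached_rewrite`, and the injected `ask_llm` callable are hypothetical stand-ins for the HTTP call to the proxy, and the `HAS_POSTAL_CODE` relation is made up for the demo. It shows one subtle point: a SKIP is cached as `None`, so the cache must be checked with `in` rather than `.get()`, otherwise every cached SKIP would trigger a fresh LLM call.

```python
from typing import Callable, Dict, Optional, Tuple

# Cache key mirrors the script: (source, rel, dest, user_id).
Key = Tuple[str, str, str, str]


def normalize_reply(reply: str) -> Optional[str]:
    """Strip wrapping quotes/whitespace; map an empty reply or SKIP to None."""
    text = reply.strip().strip(" \"'\n\r\t")
    if not text or text.upper().strip(" .!") == "SKIP":
        return None
    return text


def cached_rewrite(
    key: Key,
    ask_llm: Callable[[Key], str],  # hypothetical stand-in for the proxy call
    cache: Dict[Key, Optional[str]],
) -> Optional[str]:
    # Membership test, not cache.get(): a cached SKIP is stored as None.
    if key in cache:
        return cache[key]
    result = normalize_reply(ask_llm(key))
    cache[key] = result
    return result


if __name__ == "__main__":
    calls = []

    def fake_llm(key: Key) -> str:
        calls.append(key)
        return "SKIP." if key[1] == "HAS_POSTAL_CODE" else '"Akshat works at TechCorp."'

    cache: Dict[Key, Optional[str]] = {}
    k1 = ("user_id:_akshat", "WORKS_AT", "techcorp", "akshat")
    k2 = ("user_id:_akshat", "HAS_POSTAL_CODE", "560001", "akshat")
    for k in (k1, k2, k1, k2):
        print(cached_rewrite(k, fake_llm, cache))
    print(len(calls))  # 2: each distinct tuple reaches the LLM once
```

Caching SKIPs matters for dry-runs followed by real runs in the same process, and for graphs where the same tuple recurs across users' edges with identical keys.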

scripts/cleanup_neo4j_imports.py
  - Finds and DELETEs all memories with source="neo4j_legacy_import"
    for given users, via the /memories DELETE endpoint (per-user API
    key, so the deletes go through mem0's normal auth + cleanup path).
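The selection step of the cleanup can be sketched as a pure function. This is a hypothetical sketch, not cleanup_neo4j_imports.py itself (whose source is not shown here): it assumes each memory returned by mem0 is a dict with an `id` and a `metadata` dict, which is an assumption about the response shape, and it deliberately leaves out the HTTP listing and per-user DELETE calls.

```python
from typing import Any, Dict, Iterable, List

IMPORT_SOURCE = "neo4j_legacy_import"  # tag written by import_neo4j_to_mem0.py


def ids_to_delete(memories: Iterable[Dict[str, Any]], source: str = IMPORT_SOURCE) -> List[str]:
    """Return IDs of memories whose metadata.source matches the import tag.

    Assumes (hypothetically) each memory looks like {"id": ..., "metadata": {...}};
    memories with missing or null metadata are never selected.
    """
    selected = []
    for mem in memories:
        metadata = mem.get("metadata") or {}
        if metadata.get("source") == source:
            selected.append(mem["id"])
    return selected


if __name__ == "__main__":
    sample = [
        {"id": "m1", "metadata": {"source": "neo4j_legacy_import", "neo4j_rel_type": "WORKS_AT"}},
        {"id": "m2", "metadata": None},
        {"id": "m3", "metadata": {"source": "chat"}},
    ]
    print(ids_to_delete(sample))  # ['m1']
```

Matching on the exact tag (rather than, say, `neo4j_rel_type` being present) keeps memories from other sources safe even if they happen to share metadata keys.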

Run on beast (2026-05-23): 2007 Neo4j edges → 615 net new memories in
mem0_v3 (30.6% yield after LLM SKIPs + mem0 fact-extraction dedup).
mem0 v3's fact extractor correctly deduplicated edges that restated
facts already in vector memory (e.g., manju's 9 existing memories
absorbed all 17 of her Neo4j edges).
2026-05-23 18:27:00 +05:30
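The 30.6% figure is net new memories divided by source edges. A small helper (hypothetical, not part of either script) makes that arithmetic explicit for comparing later runs; note that it cannot separate LLM SKIPs from mem0 dedup, since the run reports only the combined figure.

```python
def yield_pct(net_new_memories: int, source_edges: int) -> float:
    """Net new memories per source edge, as a percentage."""
    if source_edges <= 0:
        raise ValueError("source_edges must be positive")
    return 100.0 * net_new_memories / source_edges


if __name__ == "__main__":
    # Figures from the beast run on 2026-05-23.
    print(f"{yield_pct(615, 2007):.1f}%")  # 30.6%
```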

273 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""Import Neo4j graph relationships into mem0 v2 as natural-language memories.

For each `(source)-[rel_type]->(destination)` edge in the Neo4j graph, POST a
short sentence ("source verb destination") to mem0's `/memories` endpoint.
mem0's v3 fact-extraction + entity-linking pipeline handles the rest.

Every import is tagged with metadata.source="neo4j_legacy_import" so it can be
identified (or bulk-deleted) later via the standard /memories endpoints.

Usage (inside the mem0-backend container, which shares a docker network with
neo4j-temp and serves the mem0 API itself; the commands assume this script has
been copied into the container as /tmp/import.py):

    # Dry-run: show sample sentences without posting
    docker compose exec backend python /tmp/import.py --dry-run --limit 30

    # Single-user POC
    docker compose exec backend python /tmp/import.py --user akshat

    # Full sweep (add --llm-rewrite for LLM-rewritten sentences)
    docker compose exec backend python /tmp/import.py

Environment variables (defaults target the beast deployment):

    NEO4J_URI    bolt://neo4j-temp:7687
    NEO4J_USER   neo4j
    NEO4J_PASS   mem0_neo4j_password
    MEM0_URL     http://localhost:8000
    API_KEYS     JSON mapping {api_key: user_id} (already in container env)
    LLM_URL      falls back to OPENAI_BASE_URL, then https://veronica.pratikn.com
    LLM_KEY      falls back to OPENAI_API_KEY (used only with --llm-rewrite)
    LLM_MODEL    minimax-m2
"""
import argparse
import json
import os
import sys
import time
from typing import Optional
import httpx
from neo4j import GraphDatabase
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j-temp:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASS = os.environ.get("NEO4J_PASS", "mem0_neo4j_password")
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")
# LiteLLM proxy for sentence rewriting (--llm-rewrite mode)
LLM_URL = os.environ.get("LLM_URL", os.environ.get("OPENAI_BASE_URL", "https://veronica.pratikn.com"))
LLM_KEY = os.environ.get("LLM_KEY", os.environ.get("OPENAI_API_KEY", ""))
LLM_MODEL = os.environ.get("LLM_MODEL", "minimax-m2")
REWRITE_PROMPT = """You are converting a knowledge-graph relationship into a natural English sentence for a personal memory system.
The user we're building memories for is identified by user_id="{user_id}". The graph may reference them by variant names (full name, "user_id:_{user_id}", nicknames). Treat all of those as the same person — in your output sentence, use the name "{user_id}" (capitalized appropriately) so the memory layer can link it consistently.
Convert this tuple into ONE natural English sentence:
source: {source}
relationship: {rel}
destination: {dest}
Rules:
- Output ONLY the sentence. No quotes, no preamble, no explanation, no markdown.
- Use proper capitalization, grammar, and convert snake_case names to natural words ("custom_headers" → "custom headers").
- If the source IS the user, write a third-person sentence starting with "{user_id_cap}" (e.g., "{user_id_cap} works at TechCorp.").
- If the source is a non-user entity, write a third-person sentence about that entity (e.g., "Dr Seta works at Chitra.").
- If the tuple doesn't translate to a meaningful, retrievable fact (pure identifier metadata like a postal code, generic world knowledge like a timezone offset, or self-references), respond with exactly: SKIP"""
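def load_api_keys() -> dict:
    """Invert API_KEYS ({api_key: user_id}) into {user_id: api_key}.

    If a user has several keys, the first one encountered wins.
    """
    raw = os.environ.get("API_KEYS", "{}")
    keys = json.loads(raw)
    inv: dict = {}
    for k, u in keys.items():
        inv.setdefault(u, k)
    return inv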
def humanize(name: str, user_id: Optional[str] = None) -> str:
    """snake_case node name → human-readable. __User__ nodes collapse to the user_id."""
    if not name:
        return "<unknown>"
    # The __User__ label uses name = "user_id:_<username>"
    if name.startswith("user_id:_") and user_id:
        return user_id
    return name.replace("_", " ")
def humanize_rel(rel_type: str) -> str:
    return rel_type.replace("_", " ")


def build_sentence(source: str, rel_type: str, dest: str, user_id: str) -> str:
    return f"{humanize(source, user_id)} {humanize_rel(rel_type)} {humanize(dest, user_id)}."
_llm_cache: dict = {}
def llm_rewrite(
    source: str,
    rel_type: str,
    dest: str,
    user_id: str,
    client: httpx.Client,
    llm_url: str = LLM_URL,
    llm_model: str = LLM_MODEL,
) -> Optional[str]:
    """Ask the LLM (minimax-m2 by default) to convert the tuple to a natural sentence.

    Returns None for SKIP. Results, SKIPs included, are cached per
    (source, rel_type, dest, user_id).
    """
    key = (source, rel_type, dest, user_id)
    if key in _llm_cache:
        return _llm_cache[key]
    prompt = REWRITE_PROMPT.format(
        user_id=user_id,
        user_id_cap=user_id[:1].upper() + user_id[1:],
        source=source,
        rel=rel_type,
        dest=dest,
    )
    resp = client.post(
        f"{llm_url}/v1/chat/completions",
        headers={"Authorization": f"Bearer {LLM_KEY}"},
        json={
            "model": llm_model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 400,
            "temperature": 0.2,
        },
        timeout=60.0,
    )
    resp.raise_for_status()
    data = resp.json()
    content = (data["choices"][0]["message"].get("content") or "").strip()
    # Strip wrapping quotes/whitespace
    content = content.strip(" \"'\n\r\t")
    if not content or content.upper().strip(" .!") == "SKIP":
        _llm_cache[key] = None
        return None
    _llm_cache[key] = content
    return content


def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--user", help="Single user_id to migrate (default: all)")
    ap.add_argument("--limit", type=int, help="Max relationships to process")
    ap.add_argument("--dry-run", action="store_true", help="Print sentences only, no POSTs")
    ap.add_argument("--neo4j-uri", default=NEO4J_URI)
    ap.add_argument("--mem0-url", default=MEM0_URL)
    ap.add_argument(
        "--llm-rewrite",
        action="store_true",
        help=(
            "Rewrite tuples into natural English sentences via an LLM "
            "(minimax-m2 via veronica.pratikn.com unless overridden by "
            "--llm-url/--llm-model or the environment)"
        ),
    )
    ap.add_argument("--llm-url", default=LLM_URL)
    ap.add_argument("--llm-model", default=LLM_MODEL)
    args = ap.parse_args()
    api_keys = load_api_keys()

    # Build the cypher query
    where = "WHERE n.user_id IS NOT NULL AND m.user_id IS NOT NULL"
    params: dict = {}
    if args.user:
        where += " AND n.user_id = $uid"
        params["uid"] = args.user
    limit_clause = f"LIMIT {args.limit}" if args.limit else ""
    cypher = (
        f"MATCH (n)-[r]->(m) {where} "
        "RETURN n.name AS source, type(r) AS rel, m.name AS dest, n.user_id AS user_id "
        f"{limit_clause}"
    )
    driver = GraphDatabase.driver(args.neo4j_uri, auth=(NEO4J_USER, NEO4J_PASS))
    with driver.session() as session:
        edges = list(session.run(cypher, **params))
    driver.close()
    print(f"Found {len(edges)} relationships")

    client = httpx.Client(timeout=180.0)

    def make_sentence(e) -> Optional[str]:
        if args.llm_rewrite:
            # Pass the CLI values so --llm-url / --llm-model actually take effect.
            return llm_rewrite(
                e["source"], e["rel"], e["dest"], e["user_id"], client,
                llm_url=args.llm_url, llm_model=args.llm_model,
            )
        return build_sentence(e["source"], e["rel"], e["dest"], e["user_id"])
    if args.dry_run:
        n_show = min(30, len(edges))
        print(f"\nSample sentences (first {n_show})"
              + (" [LLM-rewritten]" if args.llm_rewrite else "") + ":")
        for e in edges[:n_show]:
            s = make_sentence(e)
            tup = f"({e['source']}, {e['rel']}, {e['dest']})"
            marker = "SKIP" if s is None else s
            print(f" [{e['user_id']}] {tup} -> {marker}")
        if len(edges) > n_show:
            print(f" ... and {len(edges) - n_show} more")
        # Show per-user breakdown
        by_user: dict = {}
        for e in edges:
            by_user[e["user_id"]] = by_user.get(e["user_id"], 0) + 1
        print(f"\nBy user: {by_user}")
        return 0
    timestamp = int(time.time())
    stats = {
        "posted": 0,
        "skipped_no_key": 0,
        "skipped_llm": 0,
        "errors": 0,
        "extracted_total": 0,
        "no_facts": 0,
    }
    for i, edge in enumerate(edges, 1):
        user_id = edge["user_id"]
        api_key = api_keys.get(user_id)
        if not api_key:
            stats["skipped_no_key"] += 1
            continue
        try:
            sentence = make_sentence(edge)
        except Exception as exc:
            stats["errors"] += 1
            print(f" [{i}/{len(edges)}] LLM REWRITE EXCEPTION: {exc}")
            continue
        if sentence is None:
            stats["skipped_llm"] += 1
            continue
        body = {
            "user_id": user_id,
            "messages": [{"role": "user", "content": sentence}],
            "metadata": {
                "source": "neo4j_legacy_import",
                "neo4j_rel_type": edge["rel"],
                "import_timestamp": timestamp,
            },
        }
        try:
            r = client.post(
                f"{args.mem0_url}/memories",
                headers={"X-API-Key": api_key},
                json=body,
            )
            if r.status_code != 200:
                stats["errors"] += 1
                print(
                    f" [{i}/{len(edges)}] {user_id} {sentence!r} HTTP {r.status_code} "
                    f"{r.text[:120]}"
                )
                continue
            data = r.json()
            results = (data.get("added_memories") or [{}])[0].get("results", [])
            stats["extracted_total"] += len(results)
            if not results:
                stats["no_facts"] += 1
            stats["posted"] += 1
            if i % 5 == 0 or i == len(edges):
                print(
                    f" [{i}/{len(edges)}] posted={stats['posted']} "
                    f"extracted={stats['extracted_total']} "
                    f"no_facts={stats['no_facts']} "
                    f"llm_skipped={stats['skipped_llm']} "
                    f"no_key={stats['skipped_no_key']} errors={stats['errors']}"
                )
        except Exception as exc:
            stats["errors"] += 1
            print(f" [{i}/{len(edges)}] EXCEPTION: {exc}")
    print(f"\nDONE: {stats}")
    return 0
if __name__ == "__main__":
    sys.exit(main())