scripts: Neo4j → mem0 v2 graph relationship import
Two helpers built during the beast deployment to migrate the legacy
Neo4j knowledge graph (decommissioned in the v3 cutover) into mem0 v2
as natural-language memories.
scripts/import_neo4j_to_mem0.py
- Connects to Neo4j via Bolt, iterates per-user relationships,
POSTs each as a /memories request.
- Two modes:
raw: "humanize(src) verb humanize(dest)." (snake_case → spaces)
--llm-rewrite: minimax-m2 via OpenAI-compat proxy rewrites each
tuple into a grammatical English sentence; the LLM
may also output SKIP for non-meaningful tuples
(postal codes, timezone offsets, self-refs).
- Tags every imported memory with metadata.source="neo4j_legacy_import"
plus neo4j_rel_type + import_timestamp for traceability/cleanup.
- Caches LLM rewrites by (source, rel, dest, user_id).
scripts/cleanup_neo4j_imports.py
- Finds and DELETEs all memories with source="neo4j_legacy_import"
for given users, via the /memories DELETE endpoint (per-user API
key, so the deletes go through mem0's normal auth + cleanup path).
Run on beast (2026-05-23): 2007 Neo4j edges → 615 net new memories in
mem0_v3 (30.6% yield after LLM SKIPs + mem0 fact-extraction dedup).
mem0 v3's fact extractor correctly deduplicated edges that restated
facts already in vector memory (e.g., manju's 9 existing memories
absorbed all 17 of her Neo4j edges).
This commit is contained in:
parent
3385b8397b
commit
3a10b72051
2 changed files with 373 additions and 0 deletions
100
scripts/cleanup_neo4j_imports.py
Normal file
100
scripts/cleanup_neo4j_imports.py
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Delete memories that were imported from Neo4j (tagged metadata.source='neo4j_legacy_import').
|
||||
|
||||
Usage:
|
||||
docker compose exec backend python /tmp/cleanup.py --user alice
|
||||
docker compose exec backend python /tmp/cleanup.py --users alice,hetashree,manju
|
||||
docker compose exec backend python /tmp/cleanup.py --dry-run --users alice
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
import httpx
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
|
||||
QDRANT_HOST = os.environ.get("QDRANT_HOST", "qdrant")
|
||||
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
|
||||
QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION_NAME", "mem0_v3")
|
||||
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")
|
||||
|
||||
|
||||
def load_api_keys() -> dict:
|
||||
raw = os.environ.get("API_KEYS", "{}")
|
||||
keys = json.loads(raw)
|
||||
inv: dict = {}
|
||||
for k, u in keys.items():
|
||||
inv.setdefault(u, k)
|
||||
return inv
|
||||
|
||||
|
||||
def scroll_legacy_for(client: QdrantClient, user_id: str) -> list:
|
||||
"""Return all memory IDs in mem0_v3 with source='neo4j_legacy_import' for the user."""
|
||||
ids = []
|
||||
offset = None
|
||||
while True:
|
||||
points, offset = client.scroll(
|
||||
collection_name=QDRANT_COLLECTION,
|
||||
scroll_filter=models.Filter(
|
||||
must=[
|
||||
models.FieldCondition(key="user_id", match=models.MatchValue(value=user_id)),
|
||||
models.FieldCondition(key="source", match=models.MatchValue(value="neo4j_legacy_import")),
|
||||
]
|
||||
),
|
||||
limit=128,
|
||||
with_payload=False,
|
||||
with_vectors=False,
|
||||
offset=offset,
|
||||
)
|
||||
ids.extend(p.id for p in points)
|
||||
if offset is None:
|
||||
break
|
||||
return ids
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--user", help="Single user to clean")
|
||||
ap.add_argument("--users", help="Comma-separated user list")
|
||||
ap.add_argument("--dry-run", action="store_true")
|
||||
args = ap.parse_args()
|
||||
|
||||
targets = []
|
||||
if args.user:
|
||||
targets.append(args.user)
|
||||
if args.users:
|
||||
targets.extend(u.strip() for u in args.users.split(","))
|
||||
if not targets:
|
||||
print("Usage: --user X or --users a,b,c", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
api_keys = load_api_keys()
|
||||
client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
|
||||
http = httpx.Client(timeout=60.0)
|
||||
|
||||
for user_id in targets:
|
||||
ids = scroll_legacy_for(client, user_id)
|
||||
print(f"{user_id}: found {len(ids)} legacy-import memories")
|
||||
if args.dry_run:
|
||||
continue
|
||||
key = api_keys.get(user_id)
|
||||
if not key:
|
||||
print(f" no API key for {user_id} — skipping")
|
||||
continue
|
||||
deleted = errors = 0
|
||||
for mid in ids:
|
||||
r = http.delete(f"{MEM0_URL}/memories/{mid}", headers={"X-API-Key": key})
|
||||
if r.status_code == 200:
|
||||
deleted += 1
|
||||
else:
|
||||
errors += 1
|
||||
print(f" failed {mid}: HTTP {r.status_code} {r.text[:120]}")
|
||||
print(f" deleted={deleted} errors={errors}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
273
scripts/import_neo4j_to_mem0.py
Normal file
273
scripts/import_neo4j_to_mem0.py
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Import Neo4j graph relationships into mem0 v2 as natural-language memories.
|
||||
|
||||
For each `(source)-[rel_type]->(destination)` edge in the Neo4j graph, POST a
|
||||
short sentence ("source verb destination") to mem0's `/memories` endpoint.
|
||||
mem0's v3 fact-extraction + entity-linking pipeline handles the rest.
|
||||
|
||||
Tagged with metadata.source="neo4j_legacy_import" so the imports can be
|
||||
identified (or bulk-deleted) later via the standard /memories endpoints.
|
||||
|
||||
Usage (inside the mem0-backend container, which is on the same docker network
|
||||
as neo4j-temp and the mem0 backend itself):
|
||||
|
||||
# Dry-run — show sample sentences without posting:
|
||||
docker compose exec backend python /tmp/import.py --dry-run --limit 30
|
||||
|
||||
# Single-user POC:
|
||||
docker compose exec backend python /tmp/import.py --user akshat
|
||||
|
||||
# Full sweep:
|
||||
docker compose exec backend python /tmp/import.py
|
||||
|
||||
Environment variables (all have sensible defaults for the beast deployment):
|
||||
NEO4J_URI bolt://neo4j-temp:7687
|
||||
NEO4J_USER neo4j
|
||||
NEO4J_PASS mem0_neo4j_password
|
||||
MEM0_URL http://localhost:8000
|
||||
API_KEYS JSON mapping {api_key: user_id} (already in container env)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
from neo4j import GraphDatabase
|
||||
|
||||
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j-temp:7687")
|
||||
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
|
||||
NEO4J_PASS = os.environ.get("NEO4J_PASS", "mem0_neo4j_password")
|
||||
MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000")
|
||||
|
||||
# LiteLLM proxy for sentence rewriting (--llm-rewrite mode)
|
||||
LLM_URL = os.environ.get("LLM_URL", os.environ.get("OPENAI_BASE_URL", "https://veronica.pratikn.com"))
|
||||
LLM_KEY = os.environ.get("LLM_KEY", os.environ.get("OPENAI_API_KEY", ""))
|
||||
LLM_MODEL = os.environ.get("LLM_MODEL", "minimax-m2")
|
||||
|
||||
REWRITE_PROMPT = """You are converting a knowledge-graph relationship into a natural English sentence for a personal memory system.
|
||||
|
||||
The user we're building memories for is identified by user_id="{user_id}". The graph may reference them by variant names (full name, "user_id:_{user_id}", nicknames). Treat all of those as the same person — in your output sentence, use the name "{user_id}" (capitalized appropriately) so the memory layer can link it consistently.
|
||||
|
||||
Convert this tuple into ONE natural English sentence:
|
||||
source: {source}
|
||||
relationship: {rel}
|
||||
destination: {dest}
|
||||
|
||||
Rules:
|
||||
- Output ONLY the sentence. No quotes, no preamble, no explanation, no markdown.
|
||||
- Use proper capitalization, grammar, and convert snake_case names to natural words ("custom_headers" → "custom headers").
|
||||
- If the source IS the user, write a third-person sentence starting with "{user_id_cap}" (e.g., "{user_id_cap} works at TechCorp.").
|
||||
- If the source is a non-user entity, write a third-person sentence about that entity (e.g., "Dr Seta works at Chitra.").
|
||||
- If the tuple doesn't translate to a meaningful, retrievable fact (pure identifier metadata like a postal code, generic world knowledge like a timezone offset, or self-references), respond with exactly: SKIP"""
|
||||
|
||||
|
||||
def load_api_keys() -> dict:
|
||||
raw = os.environ.get("API_KEYS", "{}")
|
||||
keys = json.loads(raw)
|
||||
inv: dict = {}
|
||||
for k, u in keys.items():
|
||||
inv.setdefault(u, k)
|
||||
return inv
|
||||
|
||||
|
||||
def humanize(name: str, user_id: Optional[str] = None) -> str:
|
||||
"""snake_case node name → human-readable. __User__ nodes collapse to the user_id."""
|
||||
if not name:
|
||||
return "<unknown>"
|
||||
# The __User__ label uses name = "user_id:_<username>"
|
||||
if name.startswith("user_id:_") and user_id:
|
||||
return user_id
|
||||
return name.replace("_", " ")
|
||||
|
||||
|
||||
def humanize_rel(rel_type: str) -> str:
|
||||
return rel_type.replace("_", " ")
|
||||
|
||||
|
||||
def build_sentence(source: str, rel_type: str, dest: str, user_id: str) -> str:
|
||||
return f"{humanize(source, user_id)} {humanize_rel(rel_type)} {humanize(dest, user_id)}."
|
||||
|
||||
|
||||
_llm_cache: dict = {}
|
||||
|
||||
|
||||
def llm_rewrite(
|
||||
source: str,
|
||||
rel_type: str,
|
||||
dest: str,
|
||||
user_id: str,
|
||||
client: httpx.Client,
|
||||
) -> Optional[str]:
|
||||
"""Ask minimax-m2 to convert the tuple to a natural sentence. Returns None for SKIP."""
|
||||
key = (source, rel_type, dest, user_id)
|
||||
if key in _llm_cache:
|
||||
return _llm_cache[key]
|
||||
|
||||
prompt = REWRITE_PROMPT.format(
|
||||
user_id=user_id,
|
||||
user_id_cap=user_id[:1].upper() + user_id[1:],
|
||||
source=source,
|
||||
rel=rel_type,
|
||||
dest=dest,
|
||||
)
|
||||
resp = client.post(
|
||||
f"{LLM_URL}/v1/chat/completions",
|
||||
headers={"Authorization": f"Bearer {LLM_KEY}"},
|
||||
json={
|
||||
"model": LLM_MODEL,
|
||||
"messages": [{"role": "user", "content": prompt}],
|
||||
"max_tokens": 400,
|
||||
"temperature": 0.2,
|
||||
},
|
||||
timeout=60.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
content = (data["choices"][0]["message"].get("content") or "").strip()
|
||||
# Strip wrapping quotes/whitespace
|
||||
content = content.strip(" \"'\n\r\t")
|
||||
if not content or content.upper().strip(" .!") == "SKIP":
|
||||
_llm_cache[key] = None
|
||||
return None
|
||||
_llm_cache[key] = content
|
||||
return content
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--user", help="Single user_id to migrate (default: all)")
|
||||
ap.add_argument("--limit", type=int, help="Max relationships to process")
|
||||
ap.add_argument("--dry-run", action="store_true", help="Print sentences only, no POSTs")
|
||||
ap.add_argument("--neo4j-uri", default=NEO4J_URI)
|
||||
ap.add_argument("--mem0-url", default=MEM0_URL)
|
||||
ap.add_argument(
|
||||
"--llm-rewrite",
|
||||
action="store_true",
|
||||
help="Use minimax-m2 (via veronica.pratikn.com) to rewrite tuples into natural English sentences",
|
||||
)
|
||||
ap.add_argument("--llm-url", default=LLM_URL)
|
||||
ap.add_argument("--llm-model", default=LLM_MODEL)
|
||||
args = ap.parse_args()
|
||||
|
||||
api_keys = load_api_keys()
|
||||
|
||||
# Build the cypher query
|
||||
where = "WHERE n.user_id IS NOT NULL AND m.user_id IS NOT NULL"
|
||||
params: dict = {}
|
||||
if args.user:
|
||||
where += " AND n.user_id = $uid"
|
||||
params["uid"] = args.user
|
||||
limit_clause = f"LIMIT {args.limit}" if args.limit else ""
|
||||
cypher = (
|
||||
f"MATCH (n)-[r]->(m) {where} "
|
||||
"RETURN n.name AS source, type(r) AS rel, m.name AS dest, n.user_id AS user_id "
|
||||
f"{limit_clause}"
|
||||
)
|
||||
|
||||
driver = GraphDatabase.driver(args.neo4j_uri, auth=(NEO4J_USER, NEO4J_PASS))
|
||||
with driver.session() as session:
|
||||
edges = list(session.run(cypher, **params))
|
||||
driver.close()
|
||||
|
||||
print(f"Found {len(edges)} relationships")
|
||||
|
||||
client = httpx.Client(timeout=180.0)
|
||||
|
||||
def make_sentence(e) -> Optional[str]:
|
||||
if args.llm_rewrite:
|
||||
return llm_rewrite(e["source"], e["rel"], e["dest"], e["user_id"], client)
|
||||
return build_sentence(e["source"], e["rel"], e["dest"], e["user_id"])
|
||||
|
||||
if args.dry_run:
|
||||
n_show = min(30, len(edges))
|
||||
print(f"\nSample sentences (first {n_show})"
|
||||
+ (" [LLM-rewritten]" if args.llm_rewrite else "") + ":")
|
||||
for e in edges[:n_show]:
|
||||
s = make_sentence(e)
|
||||
tup = f"({e['source']}, {e['rel']}, {e['dest']})"
|
||||
marker = "SKIP" if s is None else s
|
||||
print(f" [{e['user_id']}] {tup} -> {marker}")
|
||||
if len(edges) > n_show:
|
||||
print(f" ... and {len(edges) - n_show} more")
|
||||
# Show per-user breakdown
|
||||
by_user: dict = {}
|
||||
for e in edges:
|
||||
by_user[e["user_id"]] = by_user.get(e["user_id"], 0) + 1
|
||||
print(f"\nBy user: {by_user}")
|
||||
return 0
|
||||
|
||||
timestamp = int(time.time())
|
||||
stats = {
|
||||
"posted": 0,
|
||||
"skipped_no_key": 0,
|
||||
"skipped_llm": 0,
|
||||
"errors": 0,
|
||||
"extracted_total": 0,
|
||||
"no_facts": 0,
|
||||
}
|
||||
|
||||
for i, edge in enumerate(edges, 1):
|
||||
user_id = edge["user_id"]
|
||||
api_key = api_keys.get(user_id)
|
||||
if not api_key:
|
||||
stats["skipped_no_key"] += 1
|
||||
continue
|
||||
try:
|
||||
sentence = make_sentence(edge)
|
||||
except Exception as exc:
|
||||
stats["errors"] += 1
|
||||
print(f" [{i}/{len(edges)}] LLM REWRITE EXCEPTION: {exc}")
|
||||
continue
|
||||
if sentence is None:
|
||||
stats["skipped_llm"] += 1
|
||||
continue
|
||||
body = {
|
||||
"user_id": user_id,
|
||||
"messages": [{"role": "user", "content": sentence}],
|
||||
"metadata": {
|
||||
"source": "neo4j_legacy_import",
|
||||
"neo4j_rel_type": edge["rel"],
|
||||
"import_timestamp": timestamp,
|
||||
},
|
||||
}
|
||||
try:
|
||||
r = client.post(
|
||||
f"{args.mem0_url}/memories",
|
||||
headers={"X-API-Key": api_key},
|
||||
json=body,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
stats["errors"] += 1
|
||||
print(
|
||||
f" [{i}/{len(edges)}] {user_id} {sentence!r} HTTP {r.status_code} "
|
||||
f"{r.text[:120]}"
|
||||
)
|
||||
continue
|
||||
data = r.json()
|
||||
results = (data.get("added_memories") or [{}])[0].get("results", [])
|
||||
stats["extracted_total"] += len(results)
|
||||
if not results:
|
||||
stats["no_facts"] += 1
|
||||
stats["posted"] += 1
|
||||
if i % 5 == 0 or i == len(edges):
|
||||
print(
|
||||
f" [{i}/{len(edges)}] posted={stats['posted']} "
|
||||
f"extracted={stats['extracted_total']} "
|
||||
f"no_facts={stats['no_facts']} "
|
||||
f"llm_skipped={stats['skipped_llm']} "
|
||||
f"no_key={stats['skipped_no_key']} errors={stats['errors']}"
|
||||
)
|
||||
except Exception as exc:
|
||||
stats["errors"] += 1
|
||||
print(f" [{i}/{len(edges)}] EXCEPTION: {exc}")
|
||||
|
||||
print(f"\nDONE: {stats}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Loading…
Reference in a new issue