diff --git a/scripts/cleanup_neo4j_imports.py b/scripts/cleanup_neo4j_imports.py new file mode 100644 index 0000000..535e2ea --- /dev/null +++ b/scripts/cleanup_neo4j_imports.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +"""Delete memories that were imported from Neo4j (tagged metadata.source='neo4j_legacy_import'). + +Usage: + docker compose exec backend python /tmp/cleanup.py --user alice + docker compose exec backend python /tmp/cleanup.py --users alice,hetashree,manju + docker compose exec backend python /tmp/cleanup.py --dry-run --users alice +""" +import argparse +import json +import os +import re +import sys + +import httpx +from qdrant_client import QdrantClient +from qdrant_client.http import models + +QDRANT_HOST = os.environ.get("QDRANT_HOST", "qdrant") +QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333")) +QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION_NAME", "mem0_v3") +MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000") + + +def load_api_keys() -> dict: + raw = os.environ.get("API_KEYS", "{}") + keys = json.loads(raw) + inv: dict = {} + for k, u in keys.items(): + inv.setdefault(u, k) + return inv + + +def scroll_legacy_for(client: QdrantClient, user_id: str) -> list: + """Return all memory IDs in mem0_v3 with source='neo4j_legacy_import' for the user.""" + ids = [] + offset = None + while True: + points, offset = client.scroll( + collection_name=QDRANT_COLLECTION, + scroll_filter=models.Filter( + must=[ + models.FieldCondition(key="user_id", match=models.MatchValue(value=user_id)), + models.FieldCondition(key="source", match=models.MatchValue(value="neo4j_legacy_import")), + ] + ), + limit=128, + with_payload=False, + with_vectors=False, + offset=offset, + ) + ids.extend(p.id for p in points) + if offset is None: + break + return ids + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--user", help="Single user to clean") + ap.add_argument("--users", help="Comma-separated user list") + ap.add_argument("--dry-run", action="store_true") + args = ap.parse_args() + + targets = [] + if args.user: + targets.append(args.user) + if args.users: + targets.extend(u.strip() for u in args.users.split(",")) + if not targets: + print("Usage: --user X or --users a,b,c", file=sys.stderr) + return 2 + + api_keys = load_api_keys() + client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT) + http = httpx.Client(timeout=60.0) + + for user_id in targets: + ids = scroll_legacy_for(client, user_id) + print(f"{user_id}: found {len(ids)} legacy-import memories") + if args.dry_run: + continue + key = api_keys.get(user_id) + if not key: + print(f" no API key for {user_id} — skipping") + continue + deleted = errors = 0 + for mid in ids: + r = http.delete(f"{MEM0_URL}/memories/{mid}", headers={"X-API-Key": key}) + if r.status_code == 200: + deleted += 1 + else: + errors += 1 + print(f" failed {mid}: HTTP {r.status_code} {r.text[:120]}") + print(f" deleted={deleted} errors={errors}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/import_neo4j_to_mem0.py b/scripts/import_neo4j_to_mem0.py new file mode 100644 index 0000000..a7f48a6 --- /dev/null +++ b/scripts/import_neo4j_to_mem0.py @@ -0,0 +1,273 @@ +#!/usr/bin/env python3 +"""Import Neo4j graph relationships into mem0 v2 as natural-language memories. + +For each `(source)-[rel_type]->(destination)` edge in the Neo4j graph, POST a +short sentence ("source verb destination") to mem0's `/memories` endpoint. +mem0's v3 fact-extraction + entity-linking pipeline handles the rest. + +Tagged with metadata.source="neo4j_legacy_import" so the imports can be +identified (or bulk-deleted) later via the standard /memories endpoints. + +Usage (inside the mem0-backend container, which is on the same docker network +as neo4j-temp and the mem0 backend itself): + + # Dry-run — show sample sentences without posting: + docker compose exec backend python /tmp/import.py --dry-run --limit 30 + + # Single-user POC: + docker compose exec backend python /tmp/import.py --user akshat + + # Full sweep: + docker compose exec backend python /tmp/import.py + +Environment variables (all have sensible defaults for the beast deployment): + NEO4J_URI bolt://neo4j-temp:7687 + NEO4J_USER neo4j + NEO4J_PASS mem0_neo4j_password + MEM0_URL http://localhost:8000 + API_KEYS JSON mapping {api_key: user_id} (already in container env) +""" + +import argparse +import json +import os +import sys +import time +from typing import Optional + +import httpx +from neo4j import GraphDatabase + +NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j-temp:7687") +NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j") +NEO4J_PASS = os.environ.get("NEO4J_PASS", "mem0_neo4j_password") +MEM0_URL = os.environ.get("MEM0_URL", "http://localhost:8000") + +# LiteLLM proxy for sentence rewriting (--llm-rewrite mode) +LLM_URL = os.environ.get("LLM_URL", os.environ.get("OPENAI_BASE_URL", "https://veronica.pratikn.com")) +LLM_KEY = os.environ.get("LLM_KEY", os.environ.get("OPENAI_API_KEY", "")) +LLM_MODEL = os.environ.get("LLM_MODEL", "minimax-m2") + +REWRITE_PROMPT = """You are converting a knowledge-graph relationship into a natural English sentence for a personal memory system. + +The user we're building memories for is identified by user_id="{user_id}". The graph may reference them by variant names (full name, "user_id:_{user_id}", nicknames). Treat all of those as the same person — in your output sentence, use the name "{user_id}" (capitalized appropriately) so the memory layer can link it consistently. + +Convert this tuple into ONE natural English sentence: + source: {source} + relationship: {rel} + destination: {dest} + +Rules: +- Output ONLY the sentence. No quotes, no preamble, no explanation, no markdown. +- Use proper capitalization, grammar, and convert snake_case names to natural words ("custom_headers" → "custom headers"). +- If the source IS the user, write a third-person sentence starting with "{user_id_cap}" (e.g., "{user_id_cap} works at TechCorp."). +- If the source is a non-user entity, write a third-person sentence about that entity (e.g., "Dr Seta works at Chitra."). +- If the tuple doesn't translate to a meaningful, retrievable fact (pure identifier metadata like a postal code, generic world knowledge like a timezone offset, or self-references), respond with exactly: SKIP""" + + +def load_api_keys() -> dict: + raw = os.environ.get("API_KEYS", "{}") + keys = json.loads(raw) + inv: dict = {} + for k, u in keys.items(): + inv.setdefault(u, k) + return inv + + +def humanize(name: str, user_id: Optional[str] = None) -> str: + """snake_case node name → human-readable. __User__ nodes collapse to the user_id.""" + if not name: + return "" + # The __User__ label uses name = "user_id:_" + if name.startswith("user_id:_") and user_id: + return user_id + return name.replace("_", " ") + + +def humanize_rel(rel_type: str) -> str: + return rel_type.replace("_", " ") + + +def build_sentence(source: str, rel_type: str, dest: str, user_id: str) -> str: + return f"{humanize(source, user_id)} {humanize_rel(rel_type)} {humanize(dest, user_id)}." + + +_llm_cache: dict = {} + + +def llm_rewrite( + source: str, + rel_type: str, + dest: str, + user_id: str, + client: httpx.Client, +) -> Optional[str]: + """Ask minimax-m2 to convert the tuple to a natural sentence. Returns None for SKIP.""" + key = (source, rel_type, dest, user_id) + if key in _llm_cache: + return _llm_cache[key] + + prompt = REWRITE_PROMPT.format( + user_id=user_id, + user_id_cap=user_id[:1].upper() + user_id[1:], + source=source, + rel=rel_type, + dest=dest, + ) + resp = client.post( + f"{LLM_URL}/v1/chat/completions", + headers={"Authorization": f"Bearer {LLM_KEY}"}, + json={ + "model": LLM_MODEL, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 400, + "temperature": 0.2, + }, + timeout=60.0, + ) + resp.raise_for_status() + data = resp.json() + content = (data["choices"][0]["message"].get("content") or "").strip() + # Strip wrapping quotes/whitespace + content = content.strip(" \"'\n\r\t") + if not content or content.upper().strip(" .!") == "SKIP": + _llm_cache[key] = None + return None + _llm_cache[key] = content + return content + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--user", help="Single user_id to migrate (default: all)") + ap.add_argument("--limit", type=int, help="Max relationships to process") + ap.add_argument("--dry-run", action="store_true", help="Print sentences only, no POSTs") + ap.add_argument("--neo4j-uri", default=NEO4J_URI) + ap.add_argument("--mem0-url", default=MEM0_URL) + ap.add_argument( + "--llm-rewrite", + action="store_true", + help="Use minimax-m2 (via veronica.pratikn.com) to rewrite tuples into natural English sentences", + ) + ap.add_argument("--llm-url", default=LLM_URL) + ap.add_argument("--llm-model", default=LLM_MODEL) + args = ap.parse_args() + + api_keys = load_api_keys() + + # Build the cypher query + where = "WHERE n.user_id IS NOT NULL AND m.user_id IS NOT NULL" + params: dict = {} + if args.user: + where += " AND n.user_id = $uid" + params["uid"] = args.user + limit_clause = f"LIMIT {args.limit}" if args.limit else "" + cypher = ( + f"MATCH (n)-[r]->(m) {where} " + "RETURN n.name AS source, type(r) AS rel, m.name AS dest, n.user_id AS user_id " + f"{limit_clause}" + ) + + driver = GraphDatabase.driver(args.neo4j_uri, auth=(NEO4J_USER, NEO4J_PASS)) + with driver.session() as session: + edges = list(session.run(cypher, **params)) + driver.close() + + print(f"Found {len(edges)} relationships") + + client = httpx.Client(timeout=180.0) + + def make_sentence(e) -> Optional[str]: + if args.llm_rewrite: + return llm_rewrite(e["source"], e["rel"], e["dest"], e["user_id"], client) + return build_sentence(e["source"], e["rel"], e["dest"], e["user_id"]) + + if args.dry_run: + n_show = min(30, len(edges)) + print(f"\nSample sentences (first {n_show})" + + (" [LLM-rewritten]" if args.llm_rewrite else "") + ":") + for e in edges[:n_show]: + s = make_sentence(e) + tup = f"({e['source']}, {e['rel']}, {e['dest']})" + marker = "SKIP" if s is None else s + print(f" [{e['user_id']}] {tup} -> {marker}") + if len(edges) > n_show: + print(f" ... and {len(edges) - n_show} more") + # Show per-user breakdown + by_user: dict = {} + for e in edges: + by_user[e["user_id"]] = by_user.get(e["user_id"], 0) + 1 + print(f"\nBy user: {by_user}") + return 0 + + timestamp = int(time.time()) + stats = { + "posted": 0, + "skipped_no_key": 0, + "skipped_llm": 0, + "errors": 0, + "extracted_total": 0, + "no_facts": 0, + } + + for i, edge in enumerate(edges, 1): + user_id = edge["user_id"] + api_key = api_keys.get(user_id) + if not api_key: + stats["skipped_no_key"] += 1 + continue + try: + sentence = make_sentence(edge) + except Exception as exc: + stats["errors"] += 1 + print(f" [{i}/{len(edges)}] LLM REWRITE EXCEPTION: {exc}") + continue + if sentence is None: + stats["skipped_llm"] += 1 + continue + body = { + "user_id": user_id, + "messages": [{"role": "user", "content": sentence}], + "metadata": { + "source": "neo4j_legacy_import", + "neo4j_rel_type": edge["rel"], + "import_timestamp": timestamp, + }, + } + try: + r = client.post( + f"{args.mem0_url}/memories", + headers={"X-API-Key": api_key}, + json=body, + ) + if r.status_code != 200: + stats["errors"] += 1 + print( + f" [{i}/{len(edges)}] {user_id} {sentence!r} HTTP {r.status_code} " + f"{r.text[:120]}" + ) + continue + data = r.json() + results = (data.get("added_memories") or [{}])[0].get("results", []) + stats["extracted_total"] += len(results) + if not results: + stats["no_facts"] += 1 + stats["posted"] += 1 + if i % 5 == 0 or i == len(edges): + print( + f" [{i}/{len(edges)}] posted={stats['posted']} " + f"extracted={stats['extracted_total']} " + f"no_facts={stats['no_facts']} " + f"llm_skipped={stats['skipped_llm']} " + f"no_key={stats['skipped_no_key']} errors={stats['errors']}" + ) + except Exception as exc: + stats["errors"] += 1 + print(f" [{i}/{len(edges)}] EXCEPTION: {exc}") + + print(f"\nDONE: {stats}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())