knowledge-base/test_integration.py
Pratik Narola 7689409950 Initial commit: Production-ready Mem0 interface with monitoring
- Complete Mem0 OSS integration with hybrid datastore
- PostgreSQL + pgvector for vector storage
- Neo4j 5.18 for graph relationships
- Google Gemini embeddings integration
- Comprehensive monitoring with correlation IDs
- Real-time statistics and performance tracking
- Production-grade observability features
- Clean repository with no exposed secrets
2025-08-10 17:34:41 +05:30

543 lines
No EOL
21 KiB
Python

#!/usr/bin/env python3
"""
Integration tests for Mem0 Interface - Zero Mocking, Real API calls
Tests against running Docker Compose stack (PostgreSQL + Neo4j + FastAPI)
Usage:
python test_integration.py # Run all tests (quiet)
python test_integration.py -v # Run with verbose output
python test_integration.py --help # Show help
"""
import requests
import json
import sys
import argparse
from datetime import datetime
import time
# Root URL of the API under test; every request in this file is built from it.
BASE_URL = "http://localhost:8000"
# User id for this run, suffixed with the current Unix timestamp in whole
# seconds (evaluated once at import time) so separate runs use separate ids.
TEST_USER = f"test_user_{int(datetime.now().timestamp())}"
def main():
    """Parse CLI flags, run every integration test in order, and exit.

    The process exits with status 0 when all tests pass and 1 otherwise.
    Tests run in a fixed sequence because later tests rely on data created
    by earlier ones.
    """
    cli = argparse.ArgumentParser(
        description="Mem0 Integration Tests - Real API Testing (Zero Mocking)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Show detailed output and API responses",
    )
    verbose = cli.parse_args().verbose

    separator = "=" * 50
    print("🧪 Mem0 Integration Tests - Real API Testing")
    print(f"🎯 Target: {BASE_URL}")
    print(f"👤 Test User: {TEST_USER}")
    print(f"⏰ Started: {datetime.now().strftime('%H:%M:%S')}")
    print(separator)

    # Test sequence - order matters for data dependencies
    suite = [
        test_health_check,
        test_empty_search_protection,
        test_add_memories_with_hierarchy,
        test_search_memories_basic,
        test_search_memories_hierarchy_filters,
        test_get_user_memories_with_hierarchy,
        test_memory_history,
        test_update_memory,
        test_chat_with_memory,
        test_graph_relationships_creation,
        test_graph_relationships,
        test_delete_specific_memory,
        test_delete_all_user_memories,
        test_cleanup_verification,
    ]

    outcomes = []
    began = time.time()
    for case in suite:
        outcomes.append(run_test(case.__name__, case, verbose))
        # Small delay between tests for API stability
        time.sleep(0.5)
    elapsed = time.time() - began

    # Summary
    passed = sum(1 for ok in outcomes if ok)
    total = len(outcomes)
    print(separator)
    print(f"📊 Test Results: {passed}/{total} tests passed")
    print(f"⏱️ Duration: {elapsed:.2f} seconds")

    everything_passed = passed == total
    if everything_passed:
        print("✅ All tests passed! System is working correctly.")
    else:
        print("❌ Some tests failed! Check the output above.")
    sys.exit(0 if everything_passed else 1)
def run_test(name, test_func, verbose):
    """Execute one test callable and report whether it passed.

    Args:
        name: Label printed for this test.
        test_func: Callable invoked as ``test_func(verbose)``.
        verbose: When true, announce the test before running it.

    Returns:
        True if ``test_func`` returned without raising, otherwise False.
        Failures are printed, never re-raised.
    """
    succeeded = False
    try:
        if verbose:
            print(f"\n🔍 Running {name}...")
        test_func(verbose)
        print(f"{name}")
        succeeded = True
    except AssertionError as failure:
        print(f"{name}: Assertion failed - {failure}")
    except requests.exceptions.ConnectionError:
        print(f"{name}: Cannot connect to {BASE_URL} - Is the server running?")
    except Exception as problem:
        print(f"{name}: {problem}")
    return succeeded
def log_response(response, verbose, context=""):
    """Print a short summary of an API response when verbose output is on.

    Args:
        response: Object exposing ``status_code``, ``json()`` and ``text``.
        verbose: When false, nothing is printed and the response is not read.
        context: Label prefixed to each printed line.

    The status code is always printed. The body is printed in full when it
    is a dict with fewer than five keys; for larger dicts only the keys are
    listed, and for any other JSON value only its type name is shown. If the
    body cannot be decoded as JSON, the first 100 characters of the raw text
    are printed instead.
    """
    if not verbose:
        return

    print(f" {context} Status: {response.status_code}")
    try:
        data = response.json()
    except ValueError:
        # JSON decode errors are ValueError subclasses; catching only those
        # lets KeyboardInterrupt/SystemExit propagate instead of being hidden.
        print(f" {context} Response: {response.text[:100]}...")
        return

    if isinstance(data, dict):
        if len(data) < 5:
            print(f" {context} Response: {data}")
        else:
            print(f" {context} Response keys: {list(data.keys())}")
    else:
        # Report the real type rather than assuming every non-dict is a list.
        print(f" {context} Response keys: {type(data).__name__}")
# ================== TEST FUNCTIONS ==================
def test_health_check(verbose):
    """Check that GET /health answers 200 with a status and a services map."""
    resp = requests.get(f"{BASE_URL}/health", timeout=10)
    log_response(resp, verbose, "Health")
    assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"

    body = resp.json()
    assert "status" in body, "Health response missing 'status' field"
    assert body["status"] in ["healthy", "degraded"], f"Invalid status: {body['status']}"
    # Check individual services
    assert "services" in body, "Health response missing 'services' field"

    if not verbose:
        return
    print(f" Overall status: {body['status']}")
    for service_name, service_state in body["services"].items():
        print(f" {service_name}: {service_state}")
def test_empty_search_protection(verbose):
    """Check that searching with an empty query succeeds with no results.

    The endpoint must answer 200 (not an error), return an empty memories
    list, echo the empty query, and include a 'note' field.
    """
    search_request = {"query": "", "user_id": TEST_USER, "limit": 5}
    resp = requests.post(f"{BASE_URL}/memories/search", json=search_request, timeout=10)
    log_response(resp, verbose, "Empty Search")
    assert resp.status_code == 200, f"Empty query failed with {resp.status_code}"

    body = resp.json()
    assert body["memories"] == [], "Empty query should return empty memories list"
    assert "note" in body, "Empty query response should include explanatory note"
    assert body["query"] == "", "Query should be echoed back"

    if not verbose:
        return
    print(f" Empty search note: {body['note']}")
    print(f" Total count: {body.get('total_count', 0)}")
def test_add_memories_with_hierarchy(verbose):
    """Add three work-related messages tagged with agent/run/session ids.

    Passes when POST /memories answers 200 and reports at least one added
    memory. Graph relation details are only printed, never asserted.
    """
    conversation = [
        {"role": "user", "content": "I work at TechCorp as a Senior Software Engineer"},
        {"role": "user", "content": "My colleague Sarah from Marketing team helped with Q3 presentation"},
        {"role": "user", "content": "Meeting with John the Product Manager tomorrow about new feature development"},
    ]
    request_body = {
        "messages": conversation,
        "user_id": TEST_USER,
        "agent_id": "test_agent",
        "run_id": "test_run_001",
        "session_id": "test_session_001",
        "metadata": {"test": "integration", "scenario": "work_context"},
    }
    resp = requests.post(f"{BASE_URL}/memories", json=request_body, timeout=60)
    log_response(resp, verbose, "Add Memories")
    assert resp.status_code == 200, f"Add memories failed with {resp.status_code}"

    body = resp.json()
    assert "added_memories" in body, "Response missing 'added_memories'"
    assert "message" in body, "Response missing success message"
    assert len(body["added_memories"]) > 0, "No memories were added"

    # Verify graph extraction (if available)
    added = body["added_memories"]
    if isinstance(added, list) and len(added) > 0:
        head = added[0]
        if "relations" in head:
            relation_info = head["relations"]
            has_entities = "added_entities" in relation_info and relation_info["added_entities"]
            if has_entities and verbose:
                print(f" Graph extracted: {len(relation_info['added_entities'])} relationships")
                print(f" Sample relations: {relation_info['added_entities'][:3]}")

    if verbose:
        print(f" Added {len(added)} memory blocks")
        print(f" Hierarchy - Agent: test_agent, Run: test_run_001, Session: test_session_001")
def test_search_memories_basic(verbose):
    """Search for "TechCorp" and expect at least one well-formed result.

    Relies on memories having been added for TEST_USER beforehand.
    """
    # Test meaningful search
    search_request = {"query": "TechCorp", "user_id": TEST_USER, "limit": 10}
    resp = requests.post(f"{BASE_URL}/memories/search", json=search_request, timeout=15)
    log_response(resp, verbose, "Search")
    assert resp.status_code == 200, f"Search failed with {resp.status_code}"

    body = resp.json()
    assert "memories" in body, "Search response missing 'memories'"
    assert "total_count" in body, "Search response missing 'total_count'"
    assert "query" in body, "Search response missing 'query'"
    assert body["query"] == "TechCorp", "Query not echoed correctly"

    # Should find memories since we just added some
    assert body["total_count"] > 0, "Search should find previously added memories"
    assert len(body["memories"]) > 0, "Search should return memory results"

    # Verify memory structure
    top_hit = body["memories"][0]
    assert "id" in top_hit, "Memory missing 'id'"
    assert "memory" in top_hit, "Memory missing 'memory' content"
    assert "user_id" in top_hit, "Memory missing 'user_id'"

    if not verbose:
        return
    print(f" Found {body['total_count']} memories")
    print(f" First memory: {top_hit['memory'][:50]}...")
def test_search_memories_hierarchy_filters(verbose):
    """Search with agent_id, run_id and session_id filters applied.

    Expects at least one hit, since the filter values match those used when
    the memories were added.
    """
    # Test with hierarchy filters
    search_request = {
        "query": "TechCorp",
        "user_id": TEST_USER,
        "agent_id": "test_agent",
        "run_id": "test_run_001",
        "session_id": "test_session_001",
        "limit": 10,
    }
    resp = requests.post(f"{BASE_URL}/memories/search", json=search_request, timeout=15)
    log_response(resp, verbose, "Hierarchy Search")
    assert resp.status_code == 200, f"Hierarchy search failed with {resp.status_code}"

    body = resp.json()
    assert "memories" in body, "Hierarchy search response missing 'memories'"
    # Should find memories since we added with these exact hierarchy values
    assert len(body["memories"]) > 0, "Should find memories with matching hierarchy"

    if not verbose:
        return
    print(f" Found {len(body['memories'])} memories with hierarchy filters")
    print(f" Filters: agent_id=test_agent, run_id=test_run_001, session_id=test_session_001")
def test_get_user_memories_with_hierarchy(verbose):
    """List TEST_USER's memories using hierarchy query parameters.

    The response must be a list. When it is non-empty, the first entry is
    checked for 'id', 'memory' and a matching 'user_id'; an empty list is
    accepted.
    """
    # Test with hierarchy parameters
    query_params = {
        "limit": 20,
        "agent_id": "test_agent",
        "run_id": "test_run_001",
        "session_id": "test_session_001",
    }
    resp = requests.get(f"{BASE_URL}/memories/{TEST_USER}", params=query_params, timeout=15)
    log_response(resp, verbose, "Get User Memories with Hierarchy")
    assert resp.status_code == 200, f"Get user memories with hierarchy failed with {resp.status_code}"

    found = resp.json()
    assert isinstance(found, list), "User memories should return a list"

    if not found:
        if verbose:
            print(" No memories found with hierarchy filters (may be expected)")
        return

    first = found[0]
    assert "id" in first, "Memory missing 'id'"
    assert "memory" in first, "Memory missing 'memory' content"
    assert first["user_id"] == TEST_USER, f"Wrong user_id: {first['user_id']}"
    if verbose:
        print(f" Retrieved {len(found)} memories with hierarchy filters")
        print(f" First memory: {first['memory'][:40]}...")
def test_memory_history(verbose):
    """Fetch one memory for TEST_USER and request its history.

    Returns early (counted as a pass) when the user has no memories.
    """
    # First get a memory to check history for
    listing = requests.get(f"{BASE_URL}/memories/{TEST_USER}?limit=1", timeout=10)
    assert listing.status_code == 200, "Failed to get memory for history test"

    available = listing.json()
    if len(available) == 0:
        if verbose:
            print(" No memories available for history test (skipping)")
        return
    memory_id = available[0]["id"]

    # Test memory history endpoint
    resp = requests.get(f"{BASE_URL}/memories/{memory_id}/history", timeout=15)
    log_response(resp, verbose, "Memory History")
    assert resp.status_code == 200, f"Memory history failed with {resp.status_code}"

    body = resp.json()
    assert "memory_id" in body, "History response missing 'memory_id'"
    assert "history" in body, "History response missing 'history'"
    assert "message" in body, "History response missing success message"
    assert body["memory_id"] == memory_id, f"Wrong memory_id in response: {body['memory_id']}"

    if not verbose:
        return
    print(f" Retrieved history for memory {memory_id}")
    print(f" History entries: {len(body['history']) if isinstance(body['history'], list) else 'N/A'}")
def test_update_memory(verbose):
    """Prefix an existing memory's content with "UPDATED: " via PUT /memories.

    Fails when TEST_USER has no memory to update.
    """
    # First get a memory to update
    listing = requests.get(f"{BASE_URL}/memories/{TEST_USER}?limit=1", timeout=10)
    assert listing.status_code == 200, "Failed to get memory for update test"

    available = listing.json()
    assert len(available) > 0, "No memories available to update"
    target = available[0]
    memory_id = target["id"]
    original_content = target["memory"]

    # Update the memory
    update_request = {
        "memory_id": memory_id,
        "content": f"UPDATED: {original_content}",
    }
    resp = requests.put(f"{BASE_URL}/memories", json=update_request, timeout=10)
    log_response(resp, verbose, "Update")
    assert resp.status_code == 200, f"Update failed with {resp.status_code}"

    body = resp.json()
    assert "message" in body, "Update response missing success message"

    if not verbose:
        return
    print(f" Updated memory {memory_id}")
    print(f" Original: {original_content[:30]}...")
def test_chat_with_memory(verbose):
    """Send a chat message and check the shape of the reply.

    A read timeout on the main request is tolerated: the endpoint is then
    probed once more with a short timeout, and a second read timeout is
    also accepted. The probe's response, if one arrives, is not inspected.
    """
    chat_url = f"{BASE_URL}/chat"
    chat_request = {
        "message": "What company do I work for?",
        "user_id": TEST_USER,
    }
    try:
        resp = requests.post(chat_url, json=chat_request, timeout=90)
        log_response(resp, verbose, "Chat")
        assert resp.status_code == 200, f"Chat failed with {resp.status_code}"

        body = resp.json()
        assert "response" in body, "Chat response missing 'response'"
        assert "memories_used" in body, "Chat response missing 'memories_used'"
        assert "model_used" in body, "Chat response missing 'model_used'"
        # Should use some memories for context
        assert body["memories_used"] >= 0, "Memories used should be non-negative"

        if verbose:
            print(f" Chat response: {body['response'][:60]}...")
            print(f" Memories used: {body['memories_used']}")
            print(f" Model: {body['model_used']}")
    except requests.exceptions.ReadTimeout:
        if verbose:
            print(" Chat endpoint timed out (LLM API may be slow)")
        # Still test that the endpoint exists and accepts requests
        try:
            requests.post(chat_url, json=chat_request, timeout=5)
        except requests.exceptions.ReadTimeout:
            # This is expected - endpoint exists but processing is slow
            if verbose:
                print(" Chat endpoint confirmed active (processing timeout expected)")
def test_graph_relationships_creation(verbose):
    """Add entity-rich memories for a throwaway user and inspect the graph.

    Steps:
      1. POST five messages naming people, companies and places under a
         dedicated, timestamp-suffixed user id.
      2. Wait two seconds, then GET /graph/relationships/<user>.
      3. Assert the response carries 'relationships', 'entities' and the
         expected 'user_id', and that every relationship exposes a
         source/from, a target/to and a relationship/type key.
      4. Delete the throwaway user's memories.

    The test does not require any relationships to be returned; it passes
    as long as the endpoint works and the structure is valid. Cleanup runs
    in a ``finally`` block so a failure in steps 1-3 does not leave the
    throwaway user's data behind.
    """
    # Create a separate test user for graph relationship testing
    graph_test_user = f"graph_test_user_{int(datetime.now().timestamp())}"

    # Add memories with clear entity relationships
    payload = {
        "messages": [
            {"role": "user", "content": "John Smith works at Microsoft as a Senior Software Engineer"},
            {"role": "user", "content": "John Smith is friends with Sarah Johnson who works at Google"},
            {"role": "user", "content": "Sarah Johnson lives in Seattle and loves hiking"},
            {"role": "user", "content": "Microsoft is located in Redmond, Washington"},
            {"role": "user", "content": "John Smith and Sarah Johnson both graduated from Stanford University"},
        ],
        "user_id": graph_test_user,
        "metadata": {"test": "graph_relationships", "scenario": "entity_creation"},
    }

    try:
        response = requests.post(f"{BASE_URL}/memories", json=payload, timeout=60)
        log_response(response, verbose, "Add Graph Memories")
        assert response.status_code == 200, f"Add graph memories failed with {response.status_code}"
        data = response.json()
        assert "added_memories" in data, "Response missing 'added_memories'"
        if verbose:
            print(f" Added {len(data['added_memories'])} memories for graph relationship testing")

        # Wait a moment for graph processing (Mem0 graph extraction can be async)
        time.sleep(2)

        # Test graph relationships endpoint
        response = requests.get(f"{BASE_URL}/graph/relationships/{graph_test_user}", timeout=15)
        log_response(response, verbose, "Graph Relationships")
        assert response.status_code == 200, f"Graph relationships failed with {response.status_code}"
        graph_data = response.json()
        assert "relationships" in graph_data, "Graph response missing 'relationships'"
        assert "entities" in graph_data, "Graph response missing 'entities'"
        assert "user_id" in graph_data, "Graph response missing 'user_id'"
        assert graph_data["user_id"] == graph_test_user, f"Wrong user_id in graph: {graph_data['user_id']}"

        relationships = graph_data["relationships"]
        entities = graph_data["entities"]
        if verbose:
            print(f" Found {len(relationships)} relationships")
            print(f" Found {len(entities)} entities")
            # Print sample relationships if they exist
            if relationships:
                print(f" Sample relationships:")
                for i, rel in enumerate(relationships[:3]):  # Show first 3
                    source = rel.get("source", "unknown")
                    target = rel.get("target", "unknown")
                    relationship = rel.get("relationship", "unknown")
                    print(f" {i+1}. {source} --{relationship}--> {target}")
            # Print sample entities if they exist
            if entities:
                print(f" Sample entities: {[e.get('name', str(e)) for e in entities[:5]]}")

        # Verify relationship structure (if relationships exist)
        for rel in relationships:
            assert "source" in rel or "from" in rel, f"Relationship missing source/from: {rel}"
            assert "target" in rel or "to" in rel, f"Relationship missing target/to: {rel}"
            assert "relationship" in rel or "type" in rel, f"Relationship missing type: {rel}"
    finally:
        # Clean up graph test user memories even when a check above failed.
        # The status is asserted only after the try block, so a cleanup
        # failure never masks the original assertion error.
        cleanup_response = requests.delete(f"{BASE_URL}/memories/user/{graph_test_user}", timeout=15)

    assert cleanup_response.status_code == 200, "Failed to cleanup graph test memories"
    if verbose:
        print(f" Cleaned up graph test user: {graph_test_user}")
    # Note: We expect some relationships even if graph extraction is basic
    # The test passes if the endpoint works and returns proper structure
def test_graph_relationships(verbose):
    """Check the graph endpoint's response shape for TEST_USER."""
    resp = requests.get(f"{BASE_URL}/graph/relationships/{TEST_USER}", timeout=15)
    log_response(resp, verbose, "Graph")
    assert resp.status_code == 200, f"Graph endpoint failed with {resp.status_code}"

    graph = resp.json()
    assert "relationships" in graph, "Graph response missing 'relationships'"
    assert "entities" in graph, "Graph response missing 'entities'"
    assert "user_id" in graph, "Graph response missing 'user_id'"
    assert graph["user_id"] == TEST_USER, f"Wrong user_id in graph: {graph['user_id']}"

    if not verbose:
        return
    print(f" Relationships: {len(graph['relationships'])}")
    print(f" Entities: {len(graph['entities'])}")
def test_delete_specific_memory(verbose):
    """Delete one of TEST_USER's memories by id.

    Fails when the user has no memory left to delete.
    """
    # Get a memory to delete
    listing = requests.get(f"{BASE_URL}/memories/{TEST_USER}?limit=1", timeout=10)
    assert listing.status_code == 200, "Failed to get memory for deletion test"

    available = listing.json()
    assert len(available) > 0, "No memories available to delete"
    memory_id = available[0]["id"]

    # Delete the memory
    resp = requests.delete(f"{BASE_URL}/memories/{memory_id}", timeout=10)
    log_response(resp, verbose, "Delete")
    assert resp.status_code == 200, f"Delete failed with {resp.status_code}"

    body = resp.json()
    assert "message" in body, "Delete response missing success message"

    if verbose:
        print(f" Deleted memory {memory_id}")
def test_delete_all_user_memories(verbose):
    """Delete every memory belonging to TEST_USER in a single request."""
    resp = requests.delete(f"{BASE_URL}/memories/user/{TEST_USER}", timeout=15)
    log_response(resp, verbose, "Delete All")
    assert resp.status_code == 200, f"Delete all failed with {resp.status_code}"

    body = resp.json()
    assert "message" in body, "Delete all response missing success message"

    if verbose:
        print(f" Deleted all memories for {TEST_USER}")
def test_cleanup_verification(verbose):
    """List TEST_USER's memories after deletion and report what remains.

    Leftover memories produce a printed warning only; they do not fail the
    test. The test fails solely on a non-200 status or a non-list body.
    """
    resp = requests.get(f"{BASE_URL}/memories/{TEST_USER}?limit=10", timeout=10)
    log_response(resp, verbose, "Cleanup Check")
    assert resp.status_code == 200, f"Cleanup verification failed with {resp.status_code}"

    leftovers = resp.json()
    assert isinstance(leftovers, list), "Should return list even if empty"

    # Should be empty after deletion
    if leftovers:
        print(f" Warning: {len(leftovers)} memories still exist after cleanup")
    elif verbose:
        print(" Cleanup successful - no memories remain")
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()