# Complete Guide to AgentVectorDB (AVDB)

## Installation & Setup

### System Requirements
Before installing AgentVectorDB, ensure your system meets these requirements:
- Python 3.8 or higher
- pip (Python package installer)
- 100MB free disk space (minimum)
- 512MB RAM (minimum)
- For production: 1GB+ RAM recommended
### Installation Options

```bash
# Basic installation
pip install agentvectordb

# Development installation
git clone https://github.com/superagenticai/agentvectordb.git
cd agentvectordb
pip install -e .

# With all extras (recommended for development)
pip install "agentvectordb[all]"
```
## Basic Usage Guide

### 1. Initialize Store

```python
from agentvectordb import AgentVectorDBStore
from agentvectordb.embeddings import DefaultTextEmbeddingFunction

# Create the embedding function used by collections below
ef = DefaultTextEmbeddingFunction(dimension=64)

# Initialize the store with a local path
store = AgentVectorDBStore(db_path="./my_agent_db")
```
### 2. Create Collection

```python
# Create or get a collection backed by the embedding function
collection = store.get_or_create_collection(
    name="agent_memories",
    embedding_function=ef,
    recreate=False  # Set to True only if you want to delete an existing collection
)
```
### 3. Add Initial Memories

**Important:** Collections require at least 8 entries for KMeans vector index creation. Adding fewer produces the warning "Skipping vector index creation: not enough rows for KMeans" and reduces search performance.
```python
# Seed with the required minimum of 8 diverse memories (needed for KMeans)
initial_memories = [
    {
        "content": "System started processing batch job",
        "type": "system_log",
        "metadata": {"operation": "batch_start"}
    },
    {
        "content": "Memory usage peaked at 85%",
        "type": "system_metric",
        "metadata": {"metric": "memory"}
    },
    {
        "content": "Database backup initiated",
        "type": "maintenance",
        "metadata": {"operation": "backup"}
    },
    {
        "content": "API response time improved by 20%",
        "type": "performance",
        "metadata": {"metric": "latency"}
    },
    {
        "content": "New user registration spike detected",
        "type": "analytics",
        "metadata": {"event": "registration"}
    },
    {
        "content": "Cache hit ratio at 95%",
        "type": "performance",
        "metadata": {"metric": "cache"}
    },
    {
        "content": "Security scan completed successfully",
        "type": "security",
        "metadata": {"event": "scan"}
    },
    {
        "content": "Load balancer configuration updated",
        "type": "system_config",
        "metadata": {"component": "lb"}
    }
]

# Add the initial batch (this triggers KMeans index creation)
collection.add_batch(initial_memories)

# Verify ingestion
print(f"Collection size: {collection.count()}")  # Should show 8 or more entries
```
### 4. Query Memories

```python
# Simple semantic search
results = collection.query(
    query_text="system performance",
    k=2  # Number of results to return
)

# Process results
for result in results:
    print(f"Content: {result['content']}")
    print(f"Score: {result['_distance']}")
    print("---")
```
## Async Usage

```python
import asyncio

from agentvectordb import AsyncAgentVectorDBStore
from agentvectordb.embeddings import DefaultTextEmbeddingFunction


async def main():
    # Create embedding function
    ef = DefaultTextEmbeddingFunction(dimension=64)

    # Initialize async store
    store = AsyncAgentVectorDBStore(db_path="./async_db")

    # Create collection
    collection = await store.get_or_create_collection(
        name="async_memories",
        embedding_function=ef
    )

    # Add initial batch with the required minimum of 8 entries
    initial_memories = [
        {
            "content": "Processing async operation 1",
            "type": "system_log",
            "metadata": {"timestamp": "2024-05-19T10:00:00"}
        },
        {
            "content": "Async task queue status",
            "type": "system_metric",
            "metadata": {"queue_size": 10}
        },
        {
            "content": "Background job completed",
            "type": "task_status",
            "metadata": {"duration": "5m"}
        },
        {
            "content": "WebSocket connection established",
            "type": "connection",
            "metadata": {"client_id": "client1"}
        },
        {
            "content": "Cache invalidation triggered",
            "type": "cache",
            "metadata": {"reason": "update"}
        },
        {
            "content": "Rate limiter status check",
            "type": "security",
            "metadata": {"limits": "ok"}
        },
        {
            "content": "Async worker health check",
            "type": "health",
            "metadata": {"status": "healthy"}
        },
        {
            "content": "Event stream processing status",
            "type": "stream",
            "metadata": {"events": 100}
        }
    ]

    try:
        await collection.add_batch(initial_memories)
        print("Successfully initialized with required minimum entries")

        # Now single entries can be added
        await collection.add(
            content="New async operation",
            type="system_log",
            metadata={"timestamp": "2024-05-19T11:00:00"}
        )
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(main())
```
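Querying from async code follows the same shape as the synchronous API. A minimal sketch, assuming `query` is awaitable on async collections:

```python
# Inside main(), after add_batch succeeds:
results = await collection.query(
    query_text="worker health",
    k=2
)
for r in results:
    print(r["content"], r["_distance"])
```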
## Embedding Functions Guide

### Built-in Embeddings

```python
from agentvectordb.embeddings import DefaultTextEmbeddingFunction

# Default embedding with a configurable output dimension
ef = DefaultTextEmbeddingFunction(dimension=384)
```
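The embedding function can be exercised on its own. A sketch, assuming `embed` is part of the public interface as the custom examples below suggest:

```python
# Embed two texts and inspect the output shape
vectors = ef.embed(["hello agent", "vector memory"])
print(len(vectors), len(vectors[0]))  # 2 vectors, 384 dimensions each
```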
### OpenAI Integration

```python
from agentvectordb.embeddings import BaseEmbeddingFunction
from openai import OpenAI


class OpenAIEmbedding(BaseEmbeddingFunction):
    def __init__(self, api_key, model="text-embedding-3-small"):
        # text-embedding-3-small returns 1536-dimensional vectors
        super().__init__(dimension=1536)
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def embed(self, texts):
        # Uses the current OpenAI SDK (openai>=1.0) embeddings endpoint
        response = self.client.embeddings.create(
            input=texts,
            model=self.model
        )
        return [item.embedding for item in response.data]
```
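A hypothetical usage sketch, wiring the embedder into a collection (the names here are illustrative, not part of the library):

```python
# Illustrative only: plug the OpenAI embedder into a new collection
openai_ef = OpenAIEmbedding(api_key="sk-...")
openai_collection = store.get_or_create_collection(
    name="openai_memories",
    embedding_function=openai_ef
)
```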
### Custom Embeddings

```python
from agentvectordb.embeddings import BaseEmbeddingFunction
import numpy as np


class CustomEmbedding(BaseEmbeddingFunction):
    def __init__(self, dimension=64):
        super().__init__(dimension=dimension)

    def embed(self, texts):
        # Your embedding logic here; random vectors are a placeholder
        # and will not produce meaningful similarity scores
        return np.random.rand(len(texts), self.dimension).tolist()
```
## Advanced Features

### Complex Queries

```python
# Query with SQL-style filters
results = collection.query(
    query_text="performance issues",
    k=5,
    filter_sql="type = 'performance' AND importance_score > 0.7",
    include_vectors=True
)
```
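For the `importance_score` filter above to match anything, entries need that field at insert time. A sketch, assuming `add` accepts it as a top-level field the way `type` is accepted:

```python
# Assumption: importance_score is accepted as a top-level field on add()
collection.add(
    content="P99 latency regression on checkout service",
    type="performance",
    importance_score=0.9,
    metadata={"service": "checkout"}
)
```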
### Batch Operations

```python
def process_large_dataset(collection, data, batch_size=100):
    """Process large datasets in fixed-size batches."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        try:
            collection.add_batch(batch)
        except Exception as e:
            print(f"Error processing batch starting at index {i}: {e}")
```
### Memory Management

```python
import datetime


def cleanup_old_memories(collection, days_threshold=30):
    """Remove memories older than the threshold."""
    cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days_threshold)
    collection.delete(
        filter_sql=f"created_at < '{cutoff_date.isoformat()}'"
    )
```
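A usage sketch; the `created_at` timestamp field is assumed to be maintained by the library, as the filter above implies:

```python
# Drop anything older than 90 days
cleanup_old_memories(collection, days_threshold=90)
```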
## Best Practices

### Error Handling

```python
import functools

from agentvectordb.exceptions import AgentVectorDBException


def safe_operation(func):
    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except AgentVectorDBException as e:
            print(f"Operation failed: {e}")
            return None
    return wrapper


@safe_operation
def add_memory(collection, content, **kwargs):
    return collection.add(content=content, **kwargs)
```
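Usage sketch (keyword arguments follow the earlier `add` examples):

```python
# Returns None instead of raising when the library reports an error
memory = add_memory(collection, "Disk usage at 70%", type="system_metric")
```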
### Logging Setup

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('agent_memory.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('agentvectordb')
```
## Troubleshooting

### Common Issues

- **KMeans index creation:** always seed a new collection with at least 8 diverse memories via `collection.add_batch([...])`; fewer rows skip vector index creation and reduce search performance.
- **Memory management:** monitor collection size with `collection.count()`.
- **Query performance:** use batch sizes appropriate to your workload (e.g. `BATCH_SIZE = 100`).
## Production Deployment

### Configuration

```python
PRODUCTION_CONFIG = {
    "db_path": "/path/to/prod/db",
    "backup_path": "/path/to/backups",
    "log_level": "INFO",
    "batch_size": 100,
    "cleanup_days": 30
}
```
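A sketch of wiring the config into the pieces defined earlier in this guide:

```python
# Reuse the store, embedding function, and cleanup helper from above
store = AgentVectorDBStore(db_path=PRODUCTION_CONFIG["db_path"])
collection = store.get_or_create_collection(
    name="agent_memories",
    embedding_function=ef
)
cleanup_old_memories(collection, days_threshold=PRODUCTION_CONFIG["cleanup_days"])
```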
### Monitoring

```python
def monitor_collection_stats(collection):
    """Report basic collection statistics.

    Size comes from the documented count() call. Distinct types and the
    most recent created_at would require a raw-SQL interface; the query()
    shown in this guide (query_text + filter_sql) does not expose one.
    """
    stats = {
        "size": collection.count(),
    }
    return stats
```
## Contributing

We welcome contributions! See our Contributing Guide for details.