📖 Guides

📖 Complete Guide to AgentVectorDB (AVDB)

🛠️ Installation & Setup

System Requirements

Before installing AgentVectorDB, ensure your system meets these requirements:

  • Python 3.8 or higher
  • pip (Python package installer)
  • 100MB free disk space (minimum)
  • 512MB RAM (minimum)
  • For production: 1GB+ RAM recommended

Installation Options

# Basic installation
pip install agentvectordb
 
# Development installation
git clone https://github.com/superagenticai/agentvectordb.git
cd agentvectordb
pip install -e .
 
# With all extras (recommended for development)
pip install "agentvectordb[all]"

🚀 Basic Usage Guide

1. Initialize Store

from agentvectordb import AgentVectorDBStore
from agentvectordb.embeddings import DefaultTextEmbeddingFunction
 
# Create embedding function
ef = DefaultTextEmbeddingFunction(dimension=64)
 
# Initialize store with path
store = AgentVectorDBStore(db_path="./my_agent_db")

2. Create Collection

# Create or get a collection with embedding function
collection = store.get_or_create_collection(
    name="agent_memories",
    embedding_function=ef,
    recreate=False  # Set to True only if you want to delete existing collection
)

3. Add Initial Memories

Important: Collections require at least 8 entries for proper KMeans vector index creation. Adding fewer entries will result in the warning "Skipping vector index creation: not enough rows for KMeans" and reduced search performance.

# Initialize with required minimum entries (8 diverse memories required for KMeans)
initial_memories = [
    {
        "content": "System started processing batch job",
        "type": "system_log",
        "metadata": {"operation": "batch_start"}
    },
    {
        "content": "Memory usage peaked at 85%",
        "type": "system_metric",
        "metadata": {"metric": "memory"}
    },
    {
        "content": "Database backup initiated",
        "type": "maintenance",
        "metadata": {"operation": "backup"}
    },
    {
        "content": "API response time improved by 20%",
        "type": "performance",
        "metadata": {"metric": "latency"}
    },
    {
        "content": "New user registration spike detected",
        "type": "analytics",
        "metadata": {"event": "registration"}
    },
    {
        "content": "Cache hit ratio at 95%",
        "type": "performance",
        "metadata": {"metric": "cache"}
    },
    {
        "content": "Security scan completed successfully",
        "type": "security",
        "metadata": {"event": "scan"}
    },
    {
        "content": "Load balancer configuration updated",
        "type": "system_config",
        "metadata": {"component": "lb"}
    }
]
 
# Add initial batch (this will create the KMeans index)
collection.add_batch(initial_memories)
 
# Verify the index creation
print(f"Collection size: {collection.count()}")  # Should show 8 or more entries

4. Query Memories

# Simple semantic search
results = collection.query(
    query_text="system performance",
    k=2  # Number of results to return
)
 
# Process results
for result in results:
    print(f"Content: {result['content']}")
    # `_distance` is a distance, not a similarity score; label it as such.
    # NOTE(review): presumably smaller values mean closer matches - confirm
    # against the configured distance metric.
    print(f"Distance: {result['_distance']}")
    print("---")

🔄 Async Usage

import asyncio
from agentvectordb import AsyncAgentVectorDBStore
from agentvectordb.embeddings import DefaultTextEmbeddingFunction
 
async def main():
    """Seed the "async_memories" collection, then append a single entry.

    Exceptions raised while writing to the collection are caught and
    printed instead of being re-raised.
    """
    embedder = DefaultTextEmbeddingFunction(dimension=64)
    db = AsyncAgentVectorDBStore(db_path="./async_db")

    memories = await db.get_or_create_collection(
        name="async_memories",
        embedding_function=embedder,
    )

    # (content, type, metadata) triples for the initial eight-entry batch.
    seed_rows = [
        ("Processing async operation 1", "system_log", {"timestamp": "2024-05-19T10:00:00"}),
        ("Async task queue status", "system_metric", {"queue_size": 10}),
        ("Background job completed", "task_status", {"duration": "5m"}),
        ("WebSocket connection established", "connection", {"client_id": "client1"}),
        ("Cache invalidation triggered", "cache", {"reason": "update"}),
        ("Rate limiter status check", "security", {"limits": "ok"}),
        ("Async worker health check", "health", {"status": "healthy"}),
        ("Event stream processing status", "stream", {"events": 100}),
    ]
    # Expand the triples into the dict shape passed to add_batch.
    seed_batch = [
        {"content": text, "type": kind, "metadata": meta}
        for text, kind, meta in seed_rows
    ]

    try:
        await memories.add_batch(seed_batch)
        print("Successfully initialized with required minimum entries")

        # With the batch in place, individual entries can be appended.
        await memories.add(
            content="New async operation",
            type="system_log",
            metadata={"timestamp": "2024-05-19T11:00:00"},
        )
    except Exception as err:
        print(f"Error: {err}")
 
if __name__ == "__main__":
    asyncio.run(main())

🧠 Embedding Functions Guide

Built-in Embeddings

from agentvectordb.embeddings import DefaultTextEmbeddingFunction
 
# Default embedding
ef = DefaultTextEmbeddingFunction(dimension=384)

OpenAI Integration

from agentvectordb.embeddings import BaseEmbeddingFunction
import openai
 
class OpenAIEmbedding(BaseEmbeddingFunction):
    """Embedding function that delegates to the OpenAI embeddings endpoint.

    Uses a per-instance client (openai>=1.0 interface) instead of the
    module-level ``openai.Embedding.create`` call, which was removed in
    openai 1.0, and instead of mutating the global ``openai.api_key``.

    Args:
        api_key: OpenAI API key used by this instance's client.
        model: Embedding model name sent with every request.
    """

    def __init__(self, api_key, model="text-embedding-3-small"):
        # 1536 is the default output size of text-embedding-3-small.
        # NOTE(review): if a different model is passed, this value must be
        # adjusted to that model's output size.
        super().__init__(dimension=1536)
        # Retained so code reading the private attribute keeps working.
        self._dimension = 1536
        self.model = model
        self._client = openai.OpenAI(api_key=api_key)

    def embed(self, texts):
        """Request embeddings for ``texts`` and return them as a list.

        Args:
            texts: Input accepted by the embeddings endpoint (a string or a
                list of strings).

        Returns:
            A list with the ``embedding`` of each item in the response data.
        """
        response = self._client.embeddings.create(
            input=texts,
            model=self.model,
        )
        return [item.embedding for item in response.data]

Custom Embeddings

from agentvectordb.embeddings import BaseEmbeddingFunction
import numpy as np
 
class CustomEmbedding(BaseEmbeddingFunction):
    """Template for a user-defined embedding function.

    The ``embed`` body is a placeholder that returns random values; replace
    it with real embedding logic.
    """

    def __init__(self, dimension=64):
        super().__init__(dimension=dimension)

    def embed(self, texts):
        """Return a random array with one row per text and ``self.dimension`` columns."""
        # Your embedding logic here
        row_count = len(texts)
        column_count = self.dimension
        return np.random.rand(row_count, column_count)

🎯 Advanced Features

Complex Queries

# Query with filters
results = collection.query(
    query_text="performance issues",
    k=5,
    filter_sql="type = 'performance' AND importance_score > 0.7",
    include_vectors=True
)

Batch Operations

def process_large_dataset(collection, data, batch_size=100):
    """Add ``data`` to ``collection`` in consecutive slices of ``batch_size``.

    A batch whose ``add_batch`` call raises is reported on stdout and
    skipped; the remaining batches are still attempted.

    Args:
        collection: Object exposing ``add_batch(batch)``.
        data: Sliceable sequence of entries that supports ``len``.
        batch_size: Maximum number of entries per ``add_batch`` call.

    Raises:
        ValueError: If ``batch_size`` is zero or negative. Without this
            check a negative size would silently process nothing.
    """
    if batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    for start in range(0, len(data), batch_size):
        chunk = data[start:start + batch_size]
        try:
            collection.add_batch(chunk)
        except Exception as e:
            # Best-effort: report the failure and move on to the next batch.
            print(f"Error processing batch: {e}")

Memory Management

def cleanup_old_memories(collection, days_threshold=30):
    """Delete entries whose ``created_at`` is older than ``days_threshold`` days.

    The cutoff is the current naive local time minus ``days_threshold``
    days, rendered in ISO 8601 form and embedded in the SQL filter passed to
    ``collection.delete``.
    """
    from datetime import datetime, timedelta

    now = datetime.now()
    max_age = timedelta(days=days_threshold)
    cutoff_iso = (now - max_age).isoformat()
    collection.delete(filter_sql=f"created_at < '{cutoff_iso}'")

🔧 Best Practices

Error Handling

from agentvectordb.exceptions import AgentVectorDBException
 
def safe_operation(func):
    """Decorate ``func`` so AgentVectorDBException is reported, not raised.

    The returned wrapper forwards all arguments to ``func``. If ``func``
    raises ``AgentVectorDBException`` the error is printed and ``None`` is
    returned; any other exception propagates unchanged.

    Args:
        func: Callable to protect.

    Returns:
        A wrapper that keeps ``func``'s name, docstring and other metadata.
    """
    import functools

    # wraps() preserves __name__/__doc__ so logs and introspection still
    # show the decorated function rather than "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except AgentVectorDBException as e:
            print(f"Operation failed: {e}")
            return None
    return wrapper
 
@safe_operation
def add_memory(collection, content, **kwargs):
    """Add one entry through ``collection.add`` and return its result.

    ``content`` is passed by keyword and any extra keyword arguments are
    forwarded unchanged.
    """
    outcome = collection.add(content=content, **kwargs)
    return outcome

Logging Setup

import logging
 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('agent_memory.log'),
        logging.StreamHandler()
    ]
)
 
logger = logging.getLogger('agentvectordb')

πŸ” Troubleshooting

Common Issues

  1. KMeans Index Creation
# Always add at least 8 diverse memories initially
collection.add_batch([...])  # Minimum 8 items
  2. Memory Management
# Monitor collection size
print(f"Collection size: {collection.count()}")
  3. Query Performance
# Use appropriate batch sizes
BATCH_SIZE = 100  # Adjust based on your needs

🚀 Production Deployment

Configuration

PRODUCTION_CONFIG = {
    "db_path": "/path/to/prod/db",
    "backup_path": "/path/to/backups",
    "log_level": "INFO",
    "batch_size": 100,
    "cleanup_days": 30
}

Monitoring

def monitor_collection_stats(collection):
    """Gather a small statistics snapshot for ``collection``.

    Returns:
        A dict with three keys: "size" (result of ``collection.count()``),
        "types" and "latest" (results of the two ``collection.query`` calls
        below).
    """
    # NOTE(review): the other examples in this guide call ``query`` with
    # ``query_text`` for semantic search; confirm that it also accepts raw
    # SQL strings like the two used here.
    size = collection.count()
    distinct_types = collection.query("SELECT DISTINCT type FROM collection")
    newest = collection.query("SELECT MAX(created_at) FROM collection")
    return {"size": size, "types": distinct_types, "latest": newest}

🤝 Contributing

We welcome contributions! See our Contributing Guide for details.