RAG Engine Overview¶
The RAG (Retrieval-Augmented Generation) Engine is the core component responsible for document processing, knowledge retrieval, and context-aware response generation in RAGents.
Architecture Overview¶
The RAG Engine consists of several integrated components:
- Document Processing - Multimodal document ingestion and chunking
- Vector Storage - Pluggable vector database backends
- Retrieval Engine - Semantic search and context retrieval
- Reranking - Advanced result reranking and filtering
- Response Generation - Context-aware answer synthesis
Key Features¶
- Multimodal Processing - Handle text, images, PDFs, tables, and more
- Pluggable Vector Stores - ChromaDB, Weaviate, pgvector, Elasticsearch
- Hybrid Search - Combine semantic and keyword search
- Intelligent Chunking - Context-aware document segmentation
- Autocut Filtering - Smart result filtering based on relevance
- Batch Processing - Efficient bulk document handling
- Caching - Intelligent result and embedding caching
Basic Usage¶
Simple RAG Setup¶
import asyncio
from ragents import RAGEngine, RAGConfig
from ragents.llm.client import LLMClient
from ragents.config.environment import get_llm_config_from_env
async def basic_rag_example():
# Initialize configuration
rag_config = RAGConfig.from_env()
llm_config = get_llm_config_from_env()
llm_client = LLMClient(llm_config)
# Create RAG engine
rag_engine = RAGEngine(rag_config, llm_client)
# Add documents
await rag_engine.add_document("path/to/document.pdf")
await rag_engine.add_text("Additional context information")
# Query the knowledge base
response = await rag_engine.query("What are the main findings?")
print(response.answer)
print(f"Sources: {response.sources}")
asyncio.run(basic_rag_example())
RAG with Custom Vector Store¶
from ragents.vector_stores import create_vector_store, VectorStoreConfig, VectorStoreType
async def custom_vector_store_rag():
# Configure vector store
vector_config = VectorStoreConfig(
store_type=VectorStoreType.WEAVIATE,
url="http://localhost:8080",
collection_name="my_knowledge_base"
)
vector_store = create_vector_store(vector_config)
# Configure RAG with custom vector store
rag_config = RAGConfig(
chunk_size=1000,
chunk_overlap=200,
top_k=5,
vector_store=vector_store
)
rag_engine = RAGEngine(rag_config, llm_client)
# Process documents
documents = [
"research_paper.pdf",
"technical_manual.docx",
"presentation.pptx"
]
for doc in documents:
result = await rag_engine.add_document(doc)
print(f"Processed {doc}: {result.chunks_created} chunks")
# Query with enhanced retrieval
response = await rag_engine.query(
"Compare the methodologies across the documents",
retrieval_strategy="hybrid",
rerank_results=True
)
print(response.answer)
Document Processing¶
Supported Formats¶
The RAG Engine supports various document formats:
- Text Files - .txt, .md, .csv
- Office Documents - .pdf, .docx, .pptx, .xlsx
- Images - .png, .jpg, .jpeg (with OCR)
- Web Content - HTML, URLs
- Code Files - .py, .js, .java, .cpp, etc.
- Structured Data - JSON, XML, YAML
Document Ingestion¶
# Single document
result = await rag_engine.add_document(
"document.pdf",
metadata={"author": "John Doe", "category": "research"},
chunk_strategy="semantic" # or "fixed", "paragraph", "sentence"
)
# Batch processing
documents = [
{"path": "doc1.pdf", "metadata": {"type": "manual"}},
{"path": "doc2.docx", "metadata": {"type": "report"}},
]
results = await rag_engine.add_documents_batch(
documents,
batch_size=5,
show_progress=True
)
# URL processing
web_result = await rag_engine.add_url(
"https://example.com/article",
extract_links=True,
max_depth=2
)
# Direct text
text_result = await rag_engine.add_text(
"Important information to remember",
metadata={"source": "user_input", "timestamp": "2024-01-01"}
)
Chunking Strategies¶
from ragents.rag.chunking import ChunkingStrategy
# Semantic chunking (recommended)
rag_config = RAGConfig(
chunking_strategy=ChunkingStrategy.SEMANTIC,
chunk_size=1000,
chunk_overlap=200,
semantic_similarity_threshold=0.8
)
# Fixed-size chunking
rag_config = RAGConfig(
chunking_strategy=ChunkingStrategy.FIXED,
chunk_size=512,
chunk_overlap=50
)
# Paragraph-based chunking
rag_config = RAGConfig(
chunking_strategy=ChunkingStrategy.PARAGRAPH,
min_chunk_size=100,
max_chunk_size=2000
)
# Custom chunking
from ragents.rag.chunking import CustomChunker
class CustomChunker:
async def chunk_text(self, text: str, metadata: dict) -> list:
# Your custom chunking logic
chunks = []
# ... implementation
return chunks
rag_config = RAGConfig(custom_chunker=CustomChunker())
Retrieval Strategies¶
Basic Retrieval¶
# Simple similarity search
results = await rag_engine.retrieve(
query="machine learning algorithms",
top_k=5
)
for result in results:
print(f"Score: {result.score}")
print(f"Content: {result.content[:200]}...")
print(f"Metadata: {result.metadata}")
Advanced Retrieval¶
# Hybrid search (semantic + keyword)
results = await rag_engine.retrieve(
query="deep learning optimization techniques",
strategy="hybrid",
top_k=10,
keyword_weight=0.3,
semantic_weight=0.7
)
# MMR (Maximal Marginal Relevance) for diversity
results = await rag_engine.retrieve(
query="neural network architectures",
strategy="mmr",
top_k=8,
diversity_lambda=0.5
)
# Filtered retrieval
results = await rag_engine.retrieve(
query="computer vision",
top_k=5,
filters={
"metadata.category": "research",
"metadata.year": {"$gte": 2020}
}
)
Query Expansion¶
# Enable query expansion
rag_config = RAGConfig(
enable_query_expansion=True,
expansion_strategy="synonym", # or "related_terms", "llm_expansion"
max_expanded_terms=5
)
# Custom query expansion
from ragents.query_rewriting import QueryExpander
class CustomQueryExpander(QueryExpander):
async def expand_query(self, query: str) -> str:
# Your custom expansion logic
expanded = f"{query} OR related_terms OR synonyms"
return expanded
rag_config = RAGConfig(query_expander=CustomQueryExpander())
Response Generation¶
Basic Generation¶
# Generate response with retrieved context
response = await rag_engine.query(
"What are the benefits of renewable energy?",
include_sources=True,
max_tokens=500
)
print(f"Answer: {response.answer}")
print(f"Sources: {len(response.sources)} documents used")
print(f"Confidence: {response.confidence}")
Advanced Generation¶
# Custom generation with templates
response = await rag_engine.query(
"Summarize the main points",
response_template="""
Based on the retrieved information:
**Main Points:**
{main_points}
**Supporting Evidence:**
{evidence}
**Conclusion:**
{conclusion}
**Sources:** {sources}
""",
structured_output=True
)
# Streaming responses
async for chunk in rag_engine.query_stream(
"Explain quantum computing",
chunk_size=50
):
print(chunk, end="", flush=True)
Citation and Attribution¶
# Automatic citation generation
rag_config = RAGConfig(
enable_citations=True,
citation_style="apa", # or "mla", "chicago", "numbered"
min_citation_confidence=0.7
)
response = await rag_engine.query(
"What is the impact of climate change?",
include_citations=True
)
print(response.answer)
print("\n**Citations:**")
for citation in response.citations:
print(f"[{citation.id}] {citation.formatted_citation}")
Multimodal Processing¶
Image Processing¶
# Enable vision processing
rag_config = RAGConfig(
enable_vision=True,
vision_model="gpt-4-vision", # or "claude-3-vision"
ocr_enabled=True
)
# Process images
image_result = await rag_engine.add_document(
"chart.png",
document_type="image",
extract_text=True,
describe_image=True
)
# Query with image context
response = await rag_engine.query(
"What trends are shown in the chart?",
include_visual_context=True
)
Table Processing¶
# Process tables and spreadsheets
table_result = await rag_engine.add_document(
"data.xlsx",
table_processing=True,
extract_column_headers=True,
create_table_summaries=True
)
# Query tabular data
response = await rag_engine.query(
"What is the average sales figure?",
table_aware=True
)
Performance Optimization¶
Caching¶
# Enable intelligent caching
rag_config = RAGConfig(
enable_caching=True,
cache_embeddings=True,
cache_responses=True,
cache_ttl=3600, # 1 hour
cache_strategy="lru" # or "lfu", "ttl"
)
# Precompute embeddings
await rag_engine.precompute_embeddings(
documents=["doc1.pdf", "doc2.pdf"],
batch_size=10
)
Batch Processing¶
# Efficient batch processing
documents = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
results = await rag_engine.add_documents_batch(
documents,
batch_size=5,
parallel_processing=True,
max_workers=4
)
# Batch queries
queries = [
"What is machine learning?",
"Explain neural networks",
"Define artificial intelligence"
]
responses = await rag_engine.query_batch(
queries,
batch_size=3,
deduplicate_context=True
)
Memory Management¶
# Configure memory usage
rag_config = RAGConfig(
max_memory_usage="2GB",
chunk_batch_size=100,
embedding_batch_size=50,
cleanup_interval=3600 # seconds
)
# Monitor memory usage
stats = await rag_engine.get_memory_stats()
print(f"Memory usage: {stats.memory_usage_mb}MB")
print(f"Cached embeddings: {stats.cached_embeddings}")
print(f"Active chunks: {stats.active_chunks}")
Monitoring and Analytics¶
Performance Metrics¶
from ragents.observability import RAGMetrics
metrics = RAGMetrics(rag_engine)
# Get retrieval statistics
retrieval_stats = metrics.get_retrieval_stats()
print(f"Average retrieval time: {retrieval_stats.avg_time}ms")
print(f"Cache hit rate: {retrieval_stats.cache_hit_rate}")
print(f"Average relevance score: {retrieval_stats.avg_relevance}")
# Get document statistics
doc_stats = metrics.get_document_stats()
print(f"Total documents: {doc_stats.total_documents}")
print(f"Total chunks: {doc_stats.total_chunks}")
print(f"Average chunk size: {doc_stats.avg_chunk_size}")
Query Analytics¶
# Track query patterns
query_analytics = metrics.get_query_analytics()
print(f"Most common queries: {query_analytics.popular_queries}")
print(f"Query performance: {query_analytics.performance_trends}")
# Custom metrics
@metrics.track_metric("custom_retrieval_quality")
async def track_retrieval_quality(query: str, results: list):
# Your custom quality assessment
quality_score = assess_result_quality(results)
return {"quality": quality_score, "query_length": len(query)}
Error Handling¶
Graceful Degradation¶
try:
response = await rag_engine.query("Complex query")
except DocumentProcessingError as e:
print(f"Document processing failed: {e}")
# Fallback to basic LLM response
fallback_response = await llm_client.complete(query)
except RetrievalError as e:
print(f"Retrieval failed: {e}")
# Try with different strategy
response = await rag_engine.query(
query,
strategy="keyword_only",
fallback_to_llm=True
)
Robust Configuration¶
rag_config = RAGConfig(
# Retry configuration
max_retries=3,
retry_delay=1.0,
exponential_backoff=True,
# Fallback options
fallback_to_llm=True,
fallback_vector_store="memory",
fallback_embedding_model="sentence-transformers",
# Validation
validate_documents=True,
check_document_size=True,
max_document_size="10MB"
)
Best Practices¶
Document Organization¶
- Consistent Metadata - Use structured metadata across documents
- Logical Grouping - Organize documents by topic or source
- Regular Updates - Keep knowledge base current
- Quality Control - Validate document quality before ingestion
Retrieval Optimization¶
- Right Chunk Size - Balance context and relevance
- Appropriate Top-K - Don't retrieve too many irrelevant results
- Use Filters - Narrow down search when possible
- Monitor Performance - Track retrieval quality metrics
Response Quality¶
- Context Validation - Ensure retrieved context is relevant
- Citation Accuracy - Verify source attribution
- Response Consistency - Test with similar queries
- User Feedback - Collect and incorporate user feedback
Next Steps¶
- Document Processing - Deep dive into document handling
- Vector Stores - Explore vector database options
- Retrieval Strategies - Advanced retrieval techniques
- Reranking - Improve result relevance