Custom Agents¶
Learn how to build your own specialized agent types by extending RAGents' base classes and implementing custom behavior patterns.
Overview¶
RAGents provides a flexible foundation for creating custom agents. You can:
- Extend the base Agent class for completely custom behavior
- Inherit from existing agent types and customize specific aspects
- Create mixins for reusable agent capabilities
- Implement domain-specific reasoning patterns
Base Agent Architecture¶
Agent Base Class¶
All agents inherit from the Agent base class:
from ragents.agents.base import Agent
from ragents import AgentConfig
from ragents.llm.client import LLMClient
class CustomAgent(Agent):
def __init__(
self,
config: AgentConfig,
llm_client: LLMClient,
**kwargs
):
super().__init__(config, llm_client, **kwargs)
# Custom initialization
self.custom_state = {}
self.specialized_tools = []
async def process_message(self, message: str, **kwargs) -> str:
"""Main message processing method to override."""
# Your custom processing logic
return await self._custom_processing_pipeline(message, **kwargs)
async def _custom_processing_pipeline(self, message: str, **kwargs) -> str:
"""Implement your custom processing pipeline."""
# 1. Preprocess message
processed_message = await self._preprocess_message(message)
# 2. Custom reasoning
reasoning_result = await self._custom_reasoning(processed_message)
# 3. Generate response
response = await self._generate_response(reasoning_result)
return response
Creating Specialized Agents¶
Domain-Specific Agent¶
Create an agent specialized for a specific domain:
from ragents.agents.base import Agent
from ragents.tools import tool
class MedicalAssistantAgent(Agent):
"""Specialized agent for medical information assistance."""
def __init__(self, config: AgentConfig, llm_client: LLMClient):
super().__init__(config, llm_client)
# Medical-specific configuration
self.medical_databases = ["pubmed", "mayo_clinic", "medline"]
self.safety_filters = ["medication_checker", "contraindication_detector"]
async def process_message(self, message: str, **kwargs) -> str:
# Add medical safety checks
if await self._requires_medical_disclaimer(message):
disclaimer = "⚠️ This is for informational purposes only. Consult a healthcare professional."
# Check for drug interactions
if await self._mentions_medications(message):
interaction_check = await self._check_drug_interactions(message)
# Process with medical context
return await self._medical_response_pipeline(message, **kwargs)
async def _medical_response_pipeline(self, message: str, **kwargs) -> str:
"""Medical-specific processing pipeline."""
# 1. Extract medical entities
entities = await self._extract_medical_entities(message)
# 2. Search medical knowledge bases
medical_context = await self._search_medical_databases(entities)
# 3. Apply safety filters
filtered_context = await self._apply_safety_filters(medical_context)
# 4. Generate response with medical guidelines
response = await self._generate_medical_response(
message, filtered_context, entities
)
return response
@tool(name="medical_entity_extractor")
async def _extract_medical_entities(self, text: str) -> dict:
"""Extract medical entities from text."""
# Your medical NLP logic
return {
"symptoms": ["headache", "fever"],
"medications": ["aspirin"],
"conditions": ["migraine"]
}
Workflow-Specialized Agent¶
Create an agent for specific workflows:
class CodeReviewAgent(Agent):
"""Agent specialized for code review tasks."""
def __init__(self, config: AgentConfig, llm_client: LLMClient):
super().__init__(config, llm_client)
# Code review specific setup
self.supported_languages = ["python", "javascript", "java", "go"]
self.review_criteria = [
"code_quality", "security", "performance",
"maintainability", "documentation"
]
async def process_message(self, message: str, **kwargs) -> str:
# Check if this is a code review request
if "review" in message.lower() and "code" in message.lower():
return await self._conduct_code_review(message, **kwargs)
else:
return await super().process_message(message, **kwargs)
async def _conduct_code_review(self, message: str, **kwargs) -> str:
"""Conduct comprehensive code review."""
# 1. Extract code from message or attachments
code_content = await self._extract_code(message, kwargs.get("attachments"))
# 2. Analyze code structure
analysis = await self._analyze_code_structure(code_content)
# 3. Run automated checks
automated_issues = await self._run_automated_checks(code_content)
# 4. Perform manual review
manual_review = await self._manual_code_review(code_content, analysis)
# 5. Generate comprehensive report
review_report = await self._generate_review_report(
analysis, automated_issues, manual_review
)
return review_report
async def _analyze_code_structure(self, code: str) -> dict:
"""Analyze code structure and patterns."""
return {
"complexity": "medium",
"patterns": ["singleton", "factory"],
"architecture": "mvc",
"test_coverage": 85
}
Agent Mixins¶
Create reusable capabilities with mixins:
Memory Mixin¶
class AdvancedMemoryMixin:
"""Mixin for advanced memory capabilities."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.episodic_memory = {}
self.semantic_memory = {}
self.working_memory = {}
async def store_episodic_memory(self, event: dict):
"""Store episodic memory (specific events)."""
timestamp = event.get("timestamp", time.time())
self.episodic_memory[timestamp] = event
async def store_semantic_memory(self, concept: str, knowledge: dict):
"""Store semantic memory (general knowledge)."""
self.semantic_memory[concept] = knowledge
async def recall_similar_episodes(self, current_situation: dict) -> list:
"""Recall similar past episodes."""
# Your similarity matching logic
similar_episodes = []
for timestamp, episode in self.episodic_memory.items():
similarity = self._calculate_similarity(current_situation, episode)
if similarity > 0.7:
similar_episodes.append(episode)
return similar_episodes
class LearningMixin:
"""Mixin for learning capabilities."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.experience_buffer = []
self.performance_metrics = {}
async def learn_from_interaction(self, interaction: dict):
"""Learn from user interactions."""
self.experience_buffer.append(interaction)
# Update performance metrics
success = interaction.get("success", False)
task_type = interaction.get("task_type", "general")
if task_type not in self.performance_metrics:
self.performance_metrics[task_type] = {"successes": 0, "total": 0}
self.performance_metrics[task_type]["total"] += 1
if success:
self.performance_metrics[task_type]["successes"] += 1
async def adapt_behavior(self):
"""Adapt behavior based on learning."""
for task_type, metrics in self.performance_metrics.items():
success_rate = metrics["successes"] / metrics["total"]
if success_rate < 0.5:
# Poor performance, adjust strategy
await self._adjust_strategy_for_task(task_type)
# Combine mixins in custom agent
class LearningMedicalAgent(AdvancedMemoryMixin, LearningMixin, MedicalAssistantAgent):
"""Medical agent with advanced memory and learning."""
def __init__(self, config: AgentConfig, llm_client: LLMClient):
super().__init__(config, llm_client)
async def process_message(self, message: str, **kwargs) -> str:
# Process message with parent classes
response = await super().process_message(message, **kwargs)
# Learn from this interaction
interaction = {
"message": message,
"response": response,
"task_type": "medical_query",
"success": True # Determine success criteria
}
await self.learn_from_interaction(interaction)
return response
Custom Reasoning Patterns¶
Chain of Thought Reasoning¶
class ChainOfThoughtAgent(Agent):
"""Agent with explicit chain of thought reasoning."""
async def process_message(self, message: str, **kwargs) -> str:
# Generate step-by-step reasoning
thought_chain = await self._generate_thought_chain(message)
# Execute each step
results = []
for step in thought_chain.steps:
step_result = await self._execute_reasoning_step(step)
results.append(step_result)
# Synthesize final answer
final_answer = await self._synthesize_answer(thought_chain, results)
# Return with reasoning trace
return {
"answer": final_answer,
"reasoning": thought_chain,
"step_results": results
}
async def _generate_thought_chain(self, message: str) -> dict:
"""Generate explicit chain of thought."""
prompt = f"""
Break down this problem into clear reasoning steps:
Problem: {message}
Think step by step:
1. What information do I need?
2. What steps should I take?
3. How will I verify my answer?
"""
thought_response = await self.llm_client.complete(prompt)
return self._parse_thought_chain(thought_response)
Multi-Modal Reasoning¶
class MultiModalAgent(Agent):
"""Agent capable of processing multiple modalities."""
def __init__(self, config: AgentConfig, llm_client: LLMClient):
super().__init__(config, llm_client)
self.vision_processor = VisionProcessor()
self.audio_processor = AudioProcessor()
async def process_message(self, message: str, **kwargs) -> str:
attachments = kwargs.get("attachments", [])
# Process different modalities
processed_inputs = {
"text": message,
"images": [],
"audio": [],
"documents": []
}
for attachment in attachments:
modality = self._detect_modality(attachment)
processed_data = await self._process_modality(attachment, modality)
processed_inputs[modality].append(processed_data)
# Multi-modal reasoning
unified_representation = await self._create_unified_representation(processed_inputs)
response = await self._multi_modal_reasoning(unified_representation)
return response
async def _create_unified_representation(self, inputs: dict) -> dict:
"""Create unified representation from multi-modal inputs."""
# Your multi-modal fusion logic
return {
"text_features": inputs["text"],
"visual_features": self._extract_visual_features(inputs["images"]),
"audio_features": self._extract_audio_features(inputs["audio"]),
"semantic_alignment": self._align_modalities(inputs)
}
Agent Coordination¶
Multi-Agent Systems¶
class CoordinatorAgent(Agent):
"""Agent that coordinates multiple specialized agents."""
def __init__(self, config: AgentConfig, llm_client: LLMClient):
super().__init__(config, llm_client)
self.specialist_agents = {}
def register_specialist(self, domain: str, agent: Agent):
"""Register a specialist agent for a domain."""
self.specialist_agents[domain] = agent
async def process_message(self, message: str, **kwargs) -> str:
# Analyze which specialists are needed
required_specialists = await self._analyze_required_expertise(message)
if len(required_specialists) == 1:
# Single specialist needed
specialist = self.specialist_agents[required_specialists[0]]
return await specialist.process_message(message, **kwargs)
elif len(required_specialists) > 1:
# Multiple specialists needed - coordinate
return await self._coordinate_specialists(message, required_specialists, **kwargs)
else:
# Handle with general capabilities
return await super().process_message(message, **kwargs)
async def _coordinate_specialists(self, message: str, specialists: list, **kwargs) -> str:
"""Coordinate multiple specialists."""
specialist_responses = {}
# Get responses from each specialist
for specialist_name in specialists:
specialist = self.specialist_agents[specialist_name]
response = await specialist.process_message(message, **kwargs)
specialist_responses[specialist_name] = response
# Synthesize responses
final_response = await self._synthesize_specialist_responses(
message, specialist_responses
)
return final_response
# Usage example
coordinator = CoordinatorAgent(config, llm_client)
coordinator.register_specialist("medical", MedicalAssistantAgent(config, llm_client))
coordinator.register_specialist("code_review", CodeReviewAgent(config, llm_client))
coordinator.register_specialist("legal", LegalAssistantAgent(config, llm_client))
Testing Custom Agents¶
Unit Testing¶
import pytest
from unittest.mock import AsyncMock
class TestCustomAgent:
@pytest.fixture
async def agent(self):
config = AgentConfig(name="Test Agent")
llm_client = AsyncMock()
return CustomAgent(config, llm_client)
async def test_custom_processing(self, agent):
"""Test custom processing pipeline."""
message = "Test message"
response = await agent.process_message(message)
assert response is not None
assert isinstance(response, str)
async def test_specialized_behavior(self, agent):
"""Test domain-specific behavior."""
medical_query = "What are the symptoms of flu?"
response = await agent.process_message(medical_query)
# Verify medical-specific processing
assert "medical" in response.lower() or "symptom" in response.lower()
async def test_error_handling(self, agent):
"""Test error handling in custom agent."""
# Test with malformed input
with pytest.raises(ValueError):
await agent.process_message("")
Integration Testing¶
async def test_agent_integration():
"""Test custom agent with real LLM."""
llm_config = get_llm_config_from_env()
llm_client = LLMClient(llm_config)
config = AgentConfig(
name="Integration Test Agent",
enable_tools=True,
enable_memory=True
)
agent = CustomAgent(config, llm_client)
# Test actual interaction
response = await agent.process_message("Hello, how are you?")
assert len(response) > 0
# Test memory
response2 = await agent.process_message("What did I just say?")
assert "hello" in response2.lower()
Best Practices¶
Design Principles¶
- Single Responsibility - Each agent should have a clear, focused purpose
- Composition over Inheritance - Use mixins for reusable capabilities
- Async First - Design all methods to be async-compatible
- Error Handling - Implement robust error handling and recovery
- Testing - Write comprehensive tests for custom behavior
Performance Considerations¶
- Lazy Loading - Load expensive resources only when needed
- Caching - Cache expensive operations and computations
- Resource Management - Properly manage memory and connections
- Batching - Batch operations when possible for efficiency
Maintenance¶
- Documentation - Document custom behavior and configuration
- Versioning - Version your custom agents for reproducibility
- Monitoring - Add observability to custom components
- Backwards Compatibility - Maintain compatibility when updating
Example: Scientific Research Agent¶
class ScientificResearchAgent(AdvancedMemoryMixin, Agent):
"""Agent specialized for scientific research tasks."""
def __init__(self, config: AgentConfig, llm_client: LLMClient):
super().__init__(config, llm_client)
# Scientific tools and databases
self.scientific_databases = ["arxiv", "pubmed", "google_scholar"]
self.analysis_tools = ["statistical_analyzer", "citation_tracker"]
async def process_message(self, message: str, **kwargs) -> str:
# Detect research intent
research_type = await self._classify_research_intent(message)
if research_type == "literature_review":
return await self._conduct_literature_review(message)
elif research_type == "data_analysis":
return await self._analyze_research_data(message, **kwargs)
elif research_type == "hypothesis_generation":
return await self._generate_hypotheses(message)
else:
return await super().process_message(message, **kwargs)
async def _conduct_literature_review(self, query: str) -> str:
"""Conduct systematic literature review."""
# 1. Search scientific databases
papers = await self._search_scientific_literature(query)
# 2. Analyze and categorize papers
categorized_papers = await self._categorize_papers(papers)
# 3. Extract key findings
key_findings = await self._extract_key_findings(categorized_papers)
# 4. Identify research gaps
research_gaps = await self._identify_research_gaps(key_findings)
# 5. Generate review report
review_report = await self._generate_literature_review(
key_findings, research_gaps, categorized_papers
)
return review_report
Next Steps¶
- RAG Engine - Enhance agents with knowledge retrieval
- Tools Development - Create custom tools for your agents
- Advanced Features - Add advanced reasoning capabilities
- Deployment - Deploy custom agents to production