Why Multi-Agent Systems Are a Pain in the Ass (But Worth It)

MCP lets agents share tools without writing custom APIs for everything. That's it. Anthropic released it in November 2024 and it actually solves the problem of getting AI agents to work together without constant crashes.

MCP uses JSON-RPC 2.0 for agent communication. I tried other frameworks like AutoGen - they're overly complex. MCP just focuses on tool sharing, which actually works.

What You're Getting Into

Here's what building a multi-agent system actually means:

Debugging Hell: When something breaks, you need to figure out which of your 4+ services is the culprit. Was it the coordinator timing out? The research agent hitting rate limits? Network issues between containers? Good fucking luck.

JSON-RPC 2.0 Everywhere: Every message between agents uses this protocol. It's fine until you get cryptic error codes like -32601 (method not found) at 2 AM when you forgot to register a tool. You'll memorize these error codes, but the messages are still useless for debugging distributed systems failures.

Memory Leaks Galore: Each agent runs its own event loop. If you don't handle cleanup properly, your system will slowly eat all available RAM. I learned this the hard way after production crashed twice. Long-running asyncio services leak memory when HTTP clients and background tasks never get closed, and heavy concurrent request loads make it worse.

The Three Types of MCP Components (That Will Confuse You)

  1. MCP Hosts - Where your LLM actually runs (Claude Desktop, your custom app)
  2. MCP Clients - The thing that translates between your host and servers (most confusing part)
  3. MCP Servers - Your actual agents that do the work

Every agent is both a client AND a server. This dual role is why the architecture diagram looks like spaghetti and why debugging takes forever.

Each agent is its own process that talks to the others. When one crashes, the rest keep running. In theory.

Why Bother With This Complexity?

After building this system and watching it crash for three weeks, I can tell you the real advantages:

  • When one agent dies, others keep working (if you code the error handling right)
  • You can scale individual agents instead of the whole monolith
  • Adding new capabilities doesn't require rewriting everything
  • Different agents can run on different hardware (GPU for ML, CPU for coordination)

The official examples show perfect systems that never fail. Reality is messier. Agents timeout, Docker containers refuse to start, and networking issues will drive you insane. But when it works, having specialized agents that coordinate through MCP beats managing one giant monolithic AI application.

Setup: Where Everything Goes Wrong

The installation process looks simple in the docs. It's not. Here's what actually happens when you try to set this up.

What You Need (And What Will Break)

Software you need:

  • Python 3.8+ (3.7 will throw asyncio import errors)
  • Docker and Docker Compose for running the agents as containers
  • pip and venv for the Python dependencies

Hardware reality check:

  • 16GB RAM minimum. The docs say 8GB but that's bullshit when you're running 4 agents
  • Fast internet. The initial pip installs will download 500MB+ of dependencies
  • Patience for when Docker decides to eat all your disk space

The Concepts That Matter (Skip the Marketing)

JSON-RPC 2.0: Every agent talks using this protocol. When you get -32602 errors, it means your parameters are fucked. When you get -32601, the method doesn't exist. You'll memorize these error codes.
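
For reference, a failed call comes back as a standard JSON-RPC 2.0 error object. Here's what a -32601 looks like, written out as a Python dict (the "data" contents are made up for illustration):

error_response = {
    "jsonrpc": "2.0",
    "id": 42,                      # matches the id of the request that failed
    "error": {
        "code": -32601,            # method not found
        "message": "Method not found",
        "data": {"method": "serach_web"}  # optional extra context - here, a typo'd tool name
    }
}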

Schema validation: Your inputs need to match JSON schemas or everything breaks silently. The error messages are useless. Use Pydantic for better validation errors.
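
A minimal sketch of the fix: validate inputs with Pydantic at the edge of each tool so you get a readable error instead of a silent schema failure (SearchParams is just an illustrative model):

from pydantic import BaseModel, ValidationError

class SearchParams(BaseModel):
    query: str
    max_results: int = 5

try:
    SearchParams(query=None, max_results="lots")  # bad input on purpose
except ValidationError as e:
    # Pydantic says exactly which fields broke and why, unlike a bare -32602
    print(e)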

Agent discovery: Agents find each other using listCapabilities. This works great until network issues make agents disappear randomly. You'll need health checks and service discovery patterns.
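
Here's a hedged sketch of the health-check side: a background task that pings each registered agent's /health endpoint (assumed to exist) and drops whatever stops answering:

import asyncio
import httpx

async def watch_agents(agents: dict, interval: float = 30.0):
    """agents maps name -> base URL; unreachable ones get removed."""
    while True:
        async with httpx.AsyncClient(timeout=5.0) as client:
            for name, endpoint in list(agents.items()):
                try:
                    healthy = (await client.get(f"{endpoint}/health")).status_code == 200
                except httpx.HTTPError:
                    healthy = False
                if not healthy:
                    print(f"Dropping unreachable agent: {name}")
                    agents.pop(name, None)
        await asyncio.sleep(interval)

# Usage inside an already-running event loop:
# asyncio.create_task(watch_agents({"researcher": "http://localhost:8001"}))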

Installation That Actually Works

Create your project directory and virtual environment:

mkdir mcp-agents
cd mcp-agents
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

Now install packages (this will take forever):

pip install fastmcp httpx aiofiles
pip install openai anthropic pydantic jsonschema

These packages handle the heavy lifting: FastMCP for MCP protocol handling, httpx for async HTTP, Pydantic for data validation, and aiofiles for async file I/O. Check the FastMCP documentation and httpx async guide for implementation details.

Common installation failures:

  • pip install fastmcp fails on Windows → Use WSL2 or give up
  • httpx timeout errors → Your internet sucks, try again
  • Import errors with asyncio → You're on Python 3.7, upgrade

Project Structure (Keep It Simple)

mcp-agents/
├── coordinator.py     # The boss agent
├── researcher.py      # Web scraping agent
├── analyzer.py        # Number crunching agent
├── reporter.py        # Markdown generator
├── shared_utils.py    # Common code
└── docker-compose.yml # For when local testing fails

Forget the fancy directory structure. You'll spend more time organizing files than fixing bugs.

Test Your Setup (This Will Fail)

Create test_setup.py:

## test_setup.py
from fastmcp import FastMCP
import asyncio

mcp = FastMCP("Test Server")

@mcp.tool("ping")
async def ping() -> str:
    return "pong"

if __name__ == "__main__":
    try:
        print("Testing MCP setup...")
        # This will crash if dependencies are fucked
        result = asyncio.run(ping())
        print(f"Success: {result}")
    except Exception as e:
        print(f"Setup broken: {e}")
        print("Try: pip install --upgrade fastmcp")

Run it: python test_setup.py

If it fails:

  1. Check Python version: python --version (needs 3.8+)
  2. Reinstall fastmcp: pip uninstall fastmcp && pip install fastmcp
  3. Check virtual environment is active (prompt should show (venv))
  4. On Windows, try WSL2 instead

If it works: Great, you're one of the lucky ones. Time to build the actual agents and watch them crash in new and exciting ways.

The Coordinator Agent: Your Single Point of Failure

The coordinator is the boss agent that tells other agents what to do. When it works, your system scales nicely. When it crashes (often), everything stops working. Here's how to build one that crashes less frequently.

Why This Architecture Sucks But Works

I tried the orchestrator-worker pattern because it looked elegant in the diagrams. Reality check: it creates a single point of failure and debugging nightmares when agents can't reach the coordinator.

What the coordinator actually does:

  • Breaks down tasks into smaller pieces
  • Figures out which agent should handle what
  • Waits for agents to respond (or timeout)
  • Tries to make sense of the results

What goes wrong:

The single point of failure is a classic distributed-systems anti-pattern, but a coordinator is a necessary evil in multi-agent architectures: when it goes down, every in-flight task goes with it. Soften the blast radius with circuit breaker patterns and retry logic with exponential backoff.
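
Here's a minimal retry-with-backoff sketch around a single agent call (httpx with exponential delays; the endpoint and payload are whatever your coordinator actually sends):

import asyncio
import httpx

async def call_agent(url: str, payload: dict, retries: int = 3) -> dict:
    """POST to an agent, backing off 1s, 2s, 4s between failed attempts."""
    for attempt in range(retries):
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.post(url, json=payload)
                resp.raise_for_status()
                return resp.json()
        except httpx.HTTPError as e:
            if attempt == retries - 1:
                return {"error": f"agent unreachable after {retries} attempts: {e}"}
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s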

Coordinator Code That Actually Handles Failures

Create coordinator.py (forget the nested directory structure):

## coordinator.py
from fastmcp import FastMCP
import asyncio
import httpx
import time
import logging
from typing import Dict, List, Any, Optional
from pydantic import BaseModel

## Set up logging because you'll need it
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TaskRequest(BaseModel):
    task_description: str
    priority: int = 1
    timeout_seconds: int = 60  # Be realistic about timeouts

class RegisteredAgent(BaseModel):
    name: str
    endpoint: str
    capabilities: List[str]
    last_seen: float
    healthy: bool = True

class CoordinatorAgent:
    def __init__(self):
        self.mcp = FastMCP("Coordinator")
        self.agents: Dict[str, RegisteredAgent] = {}
        self.setup_tools()
    
    def setup_tools(self):
        @self.mcp.tool("register_agent")
        async def register_agent(name: str, endpoint: str, capabilities: List[str]) -> str:
            """Register an agent (they'll disappear randomly)"""
            self.agents[name] = RegisteredAgent(
                name=name,
                endpoint=endpoint,
                capabilities=capabilities,
                last_seen=time.time()
            )
            logger.info(f"Agent {name} registered with {len(capabilities)} capabilities")
            return f"Registered {name}. Expect it to disappear soon."
        
        @self.mcp.tool("execute_task")
        async def execute_task(request: TaskRequest) -> Dict[str, Any]:
            """Execute a task using available agents"""
            
            # Clean up dead agents first
            await self._cleanup_dead_agents()
            
            if not self.agents:
                return {"error": "No agents available", "status": "failed"}
            
            try:
                # Simple task assignment - no fancy LLM decomposition that breaks
                assignments = self._assign_simple_tasks(request.task_description)
                
                # Execute with timeout handling
                results = await self._execute_with_retries(assignments, request.timeout_seconds)
                
                return {
                    "status": "completed" if results else "failed",
                    "results": results,
                    "agents_used": list(assignments.keys()),
                    "execution_time": time.time()
                }
                
            except Exception as e:
                logger.error(f"Task execution failed: {e}")
                return {"error": str(e), "status": "failed"}
    
    async def _cleanup_dead_agents(self):
        """Remove agents that haven't been seen in 60 seconds"""
        cutoff = time.time() - 60
        dead_agents = [name for name, agent in self.agents.items() 
                      if agent.last_seen < cutoff]
        
        for name in dead_agents:
            logger.warning(f"Removing dead agent: {name}")
            del self.agents[name]
    
    def _assign_simple_tasks(self, description: str) -> Dict[str, str]:
        """Simple task assignment - no LLM bullshit that breaks"""
        assignments = {}
        
        # Hardcoded task routing because LLM decomposition fails randomly
        if "research" in description.lower() or "search" in description.lower():
            researcher = self._find_agent_with_capability("search")
            if researcher:
                assignments[researcher] = f"Search for: {description}"
        
        if "analyze" in description.lower() or "data" in description.lower():
            analyzer = self._find_agent_with_capability("analyze")
            if analyzer:
                assignments[analyzer] = f"Analyze: {description}"
        
        # Always try to generate a report
        reporter = self._find_agent_with_capability("report")
        if reporter:
            assignments[reporter] = f"Generate report for: {description}"
        
        return assignments
    
    def _find_agent_with_capability(self, capability: str) -> Optional[str]:
        """Find any agent that claims to have this capability"""
        for name, agent in self.agents.items():
            if agent.healthy and any(capability in cap for cap in agent.capabilities):
                return name
        return None
    
    async def _execute_with_retries(self, assignments: Dict[str, str], 
                                  timeout: int) -> Dict[str, Any]:
        """Execute tasks with retries because everything fails"""
        results = {}
        
        for agent_name, task in assignments.items():
            agent = self.agents.get(agent_name)
            if not agent:
                results[agent_name] = {"error": "Agent disappeared"}
                continue
            
            # Try the task, retry once if it fails
            for attempt in range(2):
                try:
                    async with httpx.AsyncClient(timeout=timeout) as client:
                        response = await client.post(
                            f"{agent.endpoint}/execute",
                            json={"task": task},
                            timeout=timeout
                        )
                        
                        if response.status_code == 200:
                            result = response.json()
                            results[agent_name] = result
                            agent.last_seen = time.time()
                            break
                        else:
                            logger.warning(f"Agent {agent_name} returned {response.status_code}")
                            
                except (httpx.TimeoutException, httpx.ConnectError) as e:
                    logger.warning(f"Agent {agent_name} timeout/connection error (attempt {attempt + 1})")
                    if attempt == 1:  # Last attempt
                        results[agent_name] = {"error": f"Agent unreachable: {str(e)}"}
                        agent.healthy = False
                        
                except Exception as e:
                    logger.error(f"Agent {agent_name} unexpected error: {e}")
                    results[agent_name] = {"error": str(e)}
                    break
        
        return results

if __name__ == "__main__":
    coordinator = CoordinatorAgent()
    print("Coordinator started. Agents will crash soon.")
    coordinator.mcp.run()  # actually serve the MCP tools (blocks; transport/port wiring is up to you)

Key differences from the perfect example:

  • Actually handles timeouts and connection errors
  • Cleans up dead agents automatically
  • Simple task assignment instead of LLM decomposition that breaks
  • Retries failed requests (once, not forever)
  • Logs everything because debugging is hell otherwise

Common failure modes:

  • Agents register then immediately become unreachable → Check Docker networking
  • Tasks timeout after 60 seconds → Some APIs are slow, increase timeout
  • JSON parsing errors → Agents return malformed responses, add validation
  • Memory leaks → Old httpx clients not getting closed, use context managers

This coordinator will crash less than the "elegant" examples, but it will still crash. The key is failing gracefully and logging everything so you can figure out what went wrong at 3 AM.

Worker Agents: The Ones That Actually Do the Work

These three agents handle the real work while the coordinator pretends to be important. Each one will break in unique and frustrating ways. Here's how to build them so they fail less catastrophically.

The Research Agent: A Glorified Web Scraper

The research agent is basically a web scraper with delusions of grandeur. It searches the web, extracts content, and tries not to get rate-limited. Modern scraping requires rotating user agents, respecting robots.txt, and handling anti-bot measures. Create researcher.py:

## researcher.py
from fastmcp import FastMCP
import httpx
import asyncio
import time
import logging
from typing import List, Dict, Any
from pydantic import BaseModel

logger = logging.getLogger(__name__)

class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str
    score: float

class ResearchAgent:
    def __init__(self):
        self.mcp = FastMCP("Researcher")
        self.last_request_time = 0
        self.request_count = 0
        self.setup_tools()
    
    def setup_tools(self):
        @self.mcp.tool("search")
        async def search(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
            """Search the web (until we get rate limited)"""
            
            # Rate limiting because APIs hate us
            await self._respect_rate_limits()
            
            try:
                # This would use a real search API in production
                # For now, simulate search with delays and failures
                await asyncio.sleep(1)  # Simulate API call
                
                self.request_count += 1

                # Fail every 7th request because real APIs do
                if self.request_count % 7 == 0:
                    raise Exception("API quota exceeded")
                
                # Mock results that look realistic
                results = []
                for i in range(min(max_results, 3)):  # Never trust max_results
                    results.append({
                        "title": f"Search result {i+1} for: {query}",
                        "url": f"https://example.com/result-{i+1}",
                        "snippet": f"This is relevant information about {query}. " * 3,
                        "score": 0.9 - (i * 0.1)
                    })
                
                logger.info(f"Search completed: {query} -> {len(results)} results")
                return results
                
            except Exception as e:
                logger.error(f"Search failed: {e}")
                return [{"error": f"Search failed: {str(e)}"}]
        
        @self.mcp.tool("extract_content")
        async def extract_content(urls: List[str]) -> Dict[str, Any]:
            """Extract content from URLs (half will timeout)"""
            
            results = {}
            
            for url in urls[:3]:  # Limit to 3 URLs because more will crash
                try:
                    async with httpx.AsyncClient(timeout=10.0) as client:
                        await asyncio.sleep(0.5)  # Rate limiting
                        
                        # Simulate random failures
                        if "example.com" in url:
                            raise httpx.TimeoutException("Fake timeout")
                        
                        # Mock content extraction
                        results[url] = f"Extracted content from {url}. " + "Lorem ipsum. " * 50
                        
                except httpx.TimeoutException:
                    results[url] = {"error": "Timeout - site too slow"}
                    logger.warning(f"Timeout extracting: {url}")
                    
                except Exception as e:
                    results[url] = {"error": str(e)}
                    logger.error(f"Failed to extract {url}: {e}")
            
            return results
    
    async def _respect_rate_limits(self):
        """Rate limiting because getting banned sucks"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        
        if time_since_last < 1.0:  # Minimum 1 second between requests
            wait_time = 1.0 - time_since_last
            logger.info(f"Rate limiting: waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
        
        self.last_request_time = time.time()

if __name__ == "__main__":
    agent = ResearchAgent()
    print("Research agent started. Will get rate limited soon.")
    agent.mcp.run()  # actually serve the MCP tools (blocks)

The Analysis Agent: Number Cruncher That Crashes on Large Data

The analysis agent processes data until it runs out of memory. It's great for small datasets, useless for anything realistic. Pandas routinely needs several times the on-disk size of your data in RAM, and Python's memory management doesn't help. Consider Dask for out-of-core processing or chunking strategies for large files. Create analyzer.py:

## analyzer.py
from fastmcp import FastMCP
import pandas as pd
import logging
from typing import List, Dict, Any
from pydantic import BaseModel

logger = logging.getLogger(__name__)

class AnalysisAgent:
    def __init__(self):
        self.mcp = FastMCP("Analyzer")
        self.setup_tools()
    
    def setup_tools(self):
        @self.mcp.tool("analyze")
        async def analyze(data: List[Dict[str, Any]]) -> Dict[str, Any]:
            """Analyze data (will OOM on datasets > 100MB)"""
            
            if not data:
                return {"error": "No data provided", "suggestion": "Send some data"}
            
            if len(data) > 10000:
                return {"error": "Dataset too large", "size": len(data), 
                       "suggestion": "Use a real database, not pandas"}
            
            try:
                # Convert to DataFrame (this will crash on complex nested data)
                df = pd.DataFrame(data)
                logger.info(f"Analyzing {len(df)} rows, {len(df.columns)} columns")
                
                # Basic stats that sometimes work
                numeric_columns = df.select_dtypes(include=['number']).columns
                
                if len(numeric_columns) == 0:
                    return {"error": "No numeric data found", 
                           "columns": list(df.columns),
                           "suggestion": "Send data with actual numbers"}
                
                # Simple analysis that won't crash
                stats = {}
                insights = []
                
                for col in numeric_columns:
                    try:
                        col_stats = {
                            "mean": float(df[col].mean()),
                            "median": float(df[col].median()),
                            "std": float(df[col].std()),
                            "min": float(df[col].min()),
                            "max": float(df[col].max())
                        }
                        stats[col] = col_stats
                        
                        # Generate obvious insights
                        if col_stats["std"] > col_stats["mean"]:
                            insights.append(f"{col} has high variability")
                        
                        if col_stats["min"] < 0 and col_stats["max"] > 0:
                            insights.append(f"{col} spans positive and negative values")
                            
                    except Exception as e:
                        logger.warning(f"Failed to analyze column {col}: {e}")
                        stats[col] = {"error": str(e)}
                
                # Look for correlations (this often crashes)
                correlations = []
                if len(numeric_columns) > 1:
                    try:
                        corr_matrix = df[numeric_columns].corr()
                        for i, col1 in enumerate(numeric_columns):
                            for col2 in numeric_columns[i+1:]:
                                corr_val = corr_matrix.loc[col1, col2]
                                if abs(corr_val) > 0.7:
                                    correlations.append({
                                        "variables": [col1, col2],
                                        "correlation": float(corr_val)
                                    })
                    except Exception as e:
                        logger.warning(f"Correlation analysis failed: {e}")
                
                return {
                    "status": "completed",
                    "rows_analyzed": len(df),
                    "numeric_columns": len(numeric_columns),
                    "statistics": stats,
                    "insights": insights,
                    "correlations": correlations,
                    "warning": "Analysis limited to basic stats to prevent crashes"
                }
                
            except MemoryError:
                return {"error": "Out of memory", "suggestion": "Try smaller dataset or buy more RAM"}
                
            except pd.errors.EmptyDataError:
                return {"error": "Empty dataset", "suggestion": "Send actual data"}
                
            except Exception as e:
                logger.error(f"Analysis failed: {e}")
                return {"error": f"Analysis crashed: {str(e)}"}

if __name__ == "__main__":
    agent = AnalysisAgent()
    print("Analysis agent started. Will crash on large datasets.")
    agent.mcp.run()  # actually serve the MCP tools (blocks)

The Reporter Agent: Markdown Generator

The reporter agent takes data and spits out markdown. It's the simplest agent and somehow still manages to break. Create reporter.py:

## reporter.py
from fastmcp import FastMCP
import json
import logging
from datetime import datetime
from typing import List, Dict, Any
from pydantic import BaseModel

logger = logging.getLogger(__name__)

class ReportAgent:
    def __init__(self):
        self.mcp = FastMCP("Reporter")
        self.setup_tools()
    
    def setup_tools(self):
        @self.mcp.tool("report")
        async def generate_report(title: str, data: List[Dict[str, Any]]) -> Dict[str, Any]:
            """Generate a report (basically markdown formatting)"""
            
            try:
                report = []
                report.append(f"# {title}")
                report.append(f"
*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M')}*
")
                
                if not data:
                    report.append("## Error\n\nNo data provided for report.\n")
                    return {"report": "\n".join(report), "word_count": 8}
                
                # Executive summary (if we have data)
                report.append("## Summary
")
                report.append(f"Processed {len(data)} data sources.
")
                
                # Process each data source
                report.append("## Findings
")
                
                findings_count = 0
                for i, item in enumerate(data[:5]):  # Limit to 5 items
                    try:
                        if isinstance(item, dict):
                            if "error" in item:
                                report.append(f"- **Source {i+1}**: Error - {item['error']}")
                            elif "results" in item:
                                results = item["results"]
                                if isinstance(results, list):
                                    report.append(f"- **Source {i+1}**: Found {len(results)} results")
                                    findings_count += len(results) if isinstance(results, list) else 1
                                else:
                                    report.append(f"- **Source {i+1}**: {str(results)[:100]}...")
                            else:
                                # Generic handling for other data structures
                                content = str(item)[:200]
                                report.append(f"- **Source {i+1}**: {content}...")
                        else:
                            report.append(f"- **Source {i+1}**: {str(item)[:100]}...")
                            
                        findings_count += 1
                        
                    except Exception as e:
                        report.append(f"- **Source {i+1}**: Failed to process - {str(e)}")
                        logger.warning(f"Failed to process item {i}: {e}")
                
                # Recommendations (always generic)
                report.append("
## Recommendations
")
                if findings_count > 0:
                    report.append("- Review the findings above")
                    report.append("- Verify data quality for next analysis")
                    report.append("- Consider additional data sources")
                else:
                    report.append("- Fix data collection issues")
                    report.append("- Check agent connectivity")
                
                # Footer
                report.append(f"
---
*Report contains {findings_count} findings from multi-agent analysis*")
                
                full_report = "
".join(report)
                word_count = len(full_report.split())
                
                return {
                    "report": full_report,
                    "word_count": word_count,
                    "findings_count": findings_count,
                    "status": "completed"
                }
                
            except Exception as e:
                logger.error(f"Report generation failed: {e}")
                return {
                    "error": f"Report generation failed: {str(e)}",
                    "report": f"# Error Report

Failed to generate report: {str(e)}",
                    "word_count": 0
                }

if __name__ == "__main__":
    agent = ReportAgent()
    print("Reporter agent started. Ready to generate markdown.")
    agent.mcp.run()  # actually serve the MCP tools (blocks)

Key differences from the tutorial examples:

  • Research agent: Actually handles rate limiting and API failures instead of pretending everything works
  • Analysis agent: Admits it will crash on large datasets and includes memory safeguards
  • Reporter agent: Simple markdown generation that doesn't try to be clever and break

Common failure modes you'll encounter:

  • Research agent gets rate-limited after 20 requests → Add exponential backoff
  • Analysis agent runs out of memory on > 10K rows → Use chunking or a real database (see the chunking sketch after this list)
  • Reporter agent crashes on complex nested data → Flatten your data structures first
  • All agents become unreachable when Docker containers restart → Implement health checks
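
If you do need to go past the analyzer's 10,000-row cap, a chunked read keeps pandas from loading the whole file at once. This is a sketch (the function name and column-sum stats are illustrative, not part of the agents above):

import pandas as pd

def analyze_in_chunks(csv_path: str, chunk_size: int = 5000) -> dict:
    """Stream a large CSV through pandas instead of loading it in one go."""
    totals: dict = {}
    rows = 0
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        rows += len(chunk)
        for col in chunk.select_dtypes(include=["number"]).columns:
            totals[col] = totals.get(col, 0.0) + float(chunk[col].sum())
    return {"rows": rows, "column_sums": totals}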

These agents work for demos and small tasks. For production, you'll need to add proper error handling, retries, monitoring, and probably rewrite them in Go because Python's GIL will drive you insane.

Frequently Asked Questions (The Real Ones)

Q

Why does my coordinator keep losing track of agents?

A

Agents disappear when their containers restart or crash. I spent two weeks debugging this. The fix: implement health checks that ping agents every 30 seconds and remove dead ones from your registry. Also, set restart: unless-stopped in your docker-compose.yml or they'll stay dead after crashes.

Q

Why do I keep getting error code -32601?

A

Method not found. You called a tool that doesn't exist or isn't registered yet. Check:

  1. the agent is actually running
  2. the tool name matches exactly (case sensitive)
  3. the agent has finished loading all its tools.

Add a /health endpoint to check agent status.

Q

My research agent gets rate-limited constantly. How do I fix this?

A

Google's free tier allows ~100 searches/day. That's nothing. Either pay for an API key or implement aggressive caching. I cache search results for 24 hours and use multiple search APIs (Bing, DuckDuckGo) with fallback logic. Also, add delays between requests - 1 second minimum.
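
A minimal 24-hour cache sketch, in-memory only. It wraps whatever function actually hits your search API - search_api_call here is a placeholder, not something from the code above:

import time

_cache: dict = {}          # query -> (timestamp, results)
CACHE_TTL = 24 * 60 * 60   # 24 hours

async def cached_search(query: str) -> list:
    hit = _cache.get(query)
    if hit and time.time() - hit[0] < CACHE_TTL:
        return hit[1]                        # cache hit: no API call, no rate limit
    results = await search_api_call(query)   # your real search function goes here
    _cache[query] = (time.time(), results)
    return results
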
Q

Docker containers keep running out of memory. What's the issue?

A

The analysis agent loads entire datasets into pandas DataFrames. This crashes on anything > 50MB. Solutions:

  1. Set memory limits in docker-compose.yml
  2. Process data in chunks
  3. Use a real database instead of keeping everything in memory.

I learned this when my 16GB machine couldn't analyze a 200MB CSV.

Q

How do I debug when everything times out?

A

Set short timeouts initially (5-10 seconds) and gradually increase them. Most issues are network-related. Check:

  1. agents can reach each other (docker exec -it container ping other_container)
  2. firewalls aren't blocking ports
  3. agents aren't stuck in infinite loops.

Use docker logs container_name to see what's actually happening.

Q

Can I run this without Docker?

A

Yes, but you'll hate yourself. Run each agent in a separate terminal with different ports: python coordinator.py --port 8000, python researcher.py --port 8001, etc. Docker handles networking, restarts, and cleanup. Without it, you're manually managing 4+ Python processes.

Q

The coordinator assigns tasks to dead agents. How do I fix this?

A

Add agent health tracking. When an agent fails a request, mark it as unhealthy and stop assigning tasks to it. I do a simple ping every 60 seconds and remove agents that don't respond. Here's the check: GET /health should return {"status": "healthy"}.

Q

My analysis agent crashes on real data. Why?

A

Pandas is not designed for production workloads. It loads everything into memory and crashes on large datasets. For anything serious: use PostgreSQL with chunked processing, not pandas. Or add data size limits: reject requests > 10,000 rows and tell users to use a real database.

Q

Why does the reporter agent generate garbage reports?

A

Because the data it receives is garbage. Add data validation before report generation. Check that dictionaries have expected keys, lists aren't empty, and strings aren't just error messages. I validate every data source and skip broken ones rather than crashing the whole report.

Q

What's the point of this vs just using API calls?

A

MCP provides standardized tool discovery and error handling. Without it, you're writing custom API clients for every agent interaction. MCP handles the JSON-RPC messaging, capability discovery, and error codes. It's less code and more reliable than rolling your own protocol.

Q

How do I scale this beyond 4 agents?

A

Don't. This architecture doesn't scale. For > 10 agents, use a proper message queue (Redis, RabbitMQ) instead of direct HTTP calls. Or switch to a microservices framework designed for scale. MCP is great for small systems, terrible for large distributed architectures.

Q

Everything works locally but fails in production. Why?

A

Production has real network latency, slower disks, less memory, and actual users. Increase all timeouts by 5x, reduce batch sizes by 10x, and add retries everywhere. Local development is a lie. Test with realistic delays: await asyncio.sleep(random.uniform(0.1, 2.0)) in your local setup.

Testing and Deployment: Where Good Code Goes to Die

Testing multi-agent systems is a special kind of hell. Nothing works the way you expect, everything times out randomly, and Docker containers develop a personal vendetta against you. I learned this when our test environment worked perfectly but production died from network issues, timing problems, and cascade failures. Here's how to make it suck less.

Local Testing: The Sweet Lies

Local testing gives you false confidence. Everything works perfectly until you deploy to production and watch it burn. But you still need to test locally to catch the obvious bugs.

Create test_basic.py:

## test_basic.py
import pytest
import asyncio
import httpx
import time

## Test individual agents first (they'll all be broken differently)

@pytest.mark.asyncio
async def test_coordinator_doesnt_crash():
    """Test that coordinator starts without dying immediately"""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get("http://localhost:8000/health")
            assert response.status_code == 200
            assert "healthy" in response.json().get("status", "")
    except httpx.ConnectError:
        pytest.skip("Coordinator not running. Start it first.")

@pytest.mark.asyncio 
async def test_agents_can_register():
    """Test that agents don't crash when registering"""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                "http://localhost:8000/register_agent",
                json={
                    "name": "test_agent",
                    "endpoint": "http://localhost:9999", 
                    "capabilities": ["test"]
                }
            )
            # Any response is good - at least it didn't crash
            assert response.status_code in [200, 201, 202]
    except Exception as e:
        pytest.fail(f"Registration crashed: {e}")

@pytest.mark.asyncio
async def test_simple_task_doesnt_hang():
    """Test that a simple task completes (or fails gracefully)"""
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            start_time = time.time()
            response = await client.post(
                "http://localhost:8000/execute_task",
                json={"task_description": "test task", "timeout_seconds": 20}
            )
            duration = time.time() - start_time
            
            # We don't care if it succeeds, just that it returns something
            assert duration < 25.0  # Should not hang forever
            assert response.status_code in [200, 400, 500]  # Any response is better than timeout
            
    except httpx.TimeoutException:
        pytest.fail("Task hung - this is bad")
    except Exception as e:
        # Other errors are fine, at least it didn't hang
        print(f"Task failed but didn't hang: {e}")

Run tests: pytest test_basic.py -v (you'll also need pip install pytest pytest-asyncio for the async test markers)

What actually happens:

  • Tests pass locally, fail in CI
  • Agents start in wrong order, everything breaks
  • Network timeouts everywhere
  • Tests are flaky and fail randomly

Integration Testing: Mock Everything

Don't test against real APIs in integration tests. They'll rate-limit you, go down during your demos, and generally ruin your day. Mock everything. I use pytest-mock and responses libraries for HTTP mocking.

## test_integration.py
import pytest
import asyncio
import httpx
from unittest.mock import patch, MagicMock

@pytest.mark.asyncio
async def test_end_to_end_workflow_mocked():
    """Test the whole workflow with mocked external calls"""
    
    # Mock all the things that break
    with patch('httpx.AsyncClient.post') as mock_post:
        # Mock successful agent responses
        mock_response = AsyncMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "results": ["mock result 1", "mock result 2"],
            "status": "completed"
        }
        mock_post.return_value = mock_response
        
        # Mock search API that doesn't rate limit
        with patch('researcher.search_api') as mock_search:
            mock_search.return_value = [{"title": "Mock result", "url": "http://test.com"}]
            
            # Now test the workflow
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "http://localhost:8000/execute_task",
                    json={"task_description": "research AI safety", "timeout_seconds": 10}
                )
                
                # Test that mocked system works
                assert response.status_code == 200
                result = response.json()
                assert "status" in result

Load Testing: How to Break Your System

Run multiple tasks simultaneously to find where it breaks:

## load_test.py
import asyncio
import httpx
import time

async def spam_coordinator():
    """Send many requests to find the breaking point"""
    
    async def send_request(task_id):
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                start = time.time()
                response = await client.post(
                    "http://localhost:8000/execute_task",
                    json={"task_description": f"task {task_id}", "timeout_seconds": 15}
                )
                duration = time.time() - start
                return {"id": task_id, "status": response.status_code, "duration": duration}
        except Exception as e:
            return {"id": task_id, "error": str(e), "duration": 999}
    
    # Start with 5 concurrent requests, increase until system breaks
    for num_concurrent in [5, 10, 20]:
        print(f"
Testing {num_concurrent} concurrent requests...")
        
        tasks = [send_request(i) for i in range(num_concurrent)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        successful = sum(1 for r in results if isinstance(r, dict) and r.get("status") == 200)
        avg_duration = sum(r.get("duration", 0) for r in results if isinstance(r, dict)) / len(results)
        
        print(f"Success rate: {successful}/{num_concurrent} ({successful/num_concurrent*100:.1f}%)")
        print(f"Average duration: {avg_duration:.2f}s")
        
        if successful < num_concurrent * 0.8:  # Less than 80% success
            print(f"System breaks at {num_concurrent} concurrent requests")
            break

if __name__ == "__main__":
    asyncio.run(spam_coordinator())

Docker Deployment: Pain and Suffering

Here's a docker-compose.yml that actually works (sometimes):

## docker-compose.yml
version: '3.8'

services:
  coordinator:
    build: .
    command: python coordinator.py --port 8000
    ports:
      - "8000:8000"
    environment:
      - PYTHONUNBUFFERED=1  # See logs immediately
      - LOG_LEVEL=DEBUG     # See everything that breaks
    restart: unless-stopped  # Auto-restart when it crashes
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 30s      # Give it time to start
    depends_on:
      - researcher
      - analyzer
    networks:
      - agent-network

  researcher:
    build: .
    command: python researcher.py --port 8001
    ports:
      - "8001:8001"
    environment:
      - PYTHONUNBUFFERED=1
      - COORDINATOR_URL=http://coordinator:8000
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - agent-network

  analyzer:
    build: .
    command: python analyzer.py --port 8002
    ports:
      - "8002:8002"
    environment:
      - PYTHONUNBUFFERED=1
    restart: unless-stopped
    mem_limit: 2g  # Prevent memory bombs
    networks:
      - agent-network

  reporter:
    build: .
    command: python reporter.py --port 8003
    ports:
      - "8003:8003"
    environment:
      - PYTHONUNBUFFERED=1
    restart: unless-stopped
    networks:
      - agent-network

networks:
  agent-network:
    driver: bridge

## Don't forget volumes for persistent data if needed

Dockerfile that doesn't suck:

FROM python:3.9-slim

## Install curl for health checks
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*

WORKDIR /app

## Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

## Copy source code
COPY *.py ./

## Create non-root user (security best practice)
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

## Default command (override in docker-compose)
CMD ["python", "coordinator.py"]

Monitoring: Know When Things Break

Add basic monitoring because you need to know when (not if) things break:

## monitoring.py
import time
import logging
from collections import defaultdict

class SimpleMetrics:
    def __init__(self):
        self.request_count = 0
        self.error_count = 0
        self.total_time = 0
        self.last_error = None
        self.start_time = time.time()
    
    def record_request(self, duration, success=True):
        self.request_count += 1
        self.total_time += duration
        if not success:
            self.error_count += 1
    
    def record_error(self, error_msg):
        self.error_count += 1
        self.last_error = error_msg
        logging.error(f"Error recorded: {error_msg}")
    
    def get_stats(self):
        uptime = time.time() - self.start_time
        avg_time = self.total_time / self.request_count if self.request_count > 0 else 0
        error_rate = self.error_count / self.request_count if self.request_count > 0 else 0
        
        return {
            "uptime_seconds": uptime,
            "total_requests": self.request_count,
            "error_count": self.error_count,
            "error_rate": f"{error_rate:.2%}",
            "avg_response_time": f"{avg_time:.2f}s",
            "last_error": self.last_error
        }

## Use in your agents:
metrics = SimpleMetrics()

## In tool functions:
start_time = time.time()
try:
    # do work
    result = do_something()
    metrics.record_request(time.time() - start_time, success=True)
    return result
except Exception as e:
    metrics.record_request(time.time() - start_time, success=False)
    metrics.record_error(str(e))
    raise

Environment Variables: The Sane Way

## .env.production
## Keep it simple - exotic configs break in production

LOG_LEVEL=INFO
MAX_CONCURRENT_TASKS=10  # Start low, increase carefully
REQUEST_TIMEOUT=30       # 30 seconds is enough
HEALTH_CHECK_INTERVAL=60 # Check agents every minute

## Database/Cache (if you add them later)
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql://user:pass@localhost/agents

## API Keys (set these as environment variables, not in files)
OPENAI_API_KEY=${OPENAI_API_KEY}
SEARCH_API_KEY=${SEARCH_API_KEY}
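
On the Python side, read these with safe fallbacks so a missing variable doesn't crash an agent at startup (a sketch; the names mirror the file above):

import os

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "10"))
REQUEST_TIMEOUT = float(os.getenv("REQUEST_TIMEOUT", "30"))
HEALTH_CHECK_INTERVAL = float(os.getenv("HEALTH_CHECK_INTERVAL", "60"))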

Deployment Checklist (From Hard Experience)

Before deploying:

  1. Test locally with Docker - docker-compose up and test everything
  2. Check health endpoints - Every service needs /health
  3. Test restart behavior - Kill containers, see if they recover
  4. Monitor memory usage - docker stats to see which agents are memory hogs
  5. Test network failure - Disconnect containers, see what breaks
  6. Check logs - docker logs container_name should show useful info

Production deployment commands:

## Deploy
docker-compose -f docker-compose.prod.yml up -d

## Check status
docker-compose ps

## Watch logs
docker-compose logs -f

## Restart broken service
docker-compose restart coordinator

## Nuclear option (when everything is broken)
docker-compose down && docker-compose up -d

Common production failures:

  • Agents can't reach each other → Check Docker networking
  • Memory leaks crash containers → Add memory limits
  • Logs fill up disk → Configure log rotation
  • Health checks fail → Increase timeout values
  • Everything works then stops → Check resource limits

Testing and deployment will humble you. The code that works perfectly on your laptop will find new ways to break in production. Plan accordingly.
