Custom Transport Mechanisms¶
This document describes how to implement and use custom transport mechanisms in MCP Server Fuzzer, addressing GitHub Issue #9.
Overview¶
MCP Server Fuzzer now supports custom transport mechanisms that can be registered and used alongside built-in transports (HTTP, SSE, STDIO, StreamableHTTP). This allows users to implement support for proprietary protocols, specialized communication patterns, or integration with specific MCP server implementations.
Architecture¶
The custom transport system consists of:
- Transport Registry: A centralized registry for managing custom transport implementations.
 - Factory Integration: Automatic discovery and instantiation of custom transports.
 - Configuration Support: Declarative configuration of custom transports via YAML.
 - Type Safety: Full type checking and validation for custom transport implementations.
 
Implementing a Custom Transport¶
1. Create a Transport Class¶
Your custom transport must inherit from TransportProtocol and implement all required abstract methods:
from mcp_fuzzer.transport.base import TransportProtocol
from typing import Any, Dict, Optional, AsyncIterator
import asyncio
class MyCustomTransport(TransportProtocol):
    def __init__(self, endpoint: str, **kwargs):
        self.endpoint = endpoint
        # Initialize your transport-specific configuration
    async def connect(self) -> None:
        """Establish connection to the transport."""
        # Implement connection logic
        pass
    async def disconnect(self) -> None:
        """Close connection to the transport."""
        # Implement disconnection logic
        pass
    async def send_request(
        self, method: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send a JSON-RPC request and return the result object."""
        # Implement request sending logic
        # Must return the JSON-RPC result (not the full envelope)
        pass
    async def send_raw(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Send a raw payload and return the response."""
        # Implement raw payload sending logic
        pass
    async def send_notification(
        self, method: str, params: Optional[Dict[str, Any]] = None
    ) -> None:
        """Send a JSON-RPC notification (fire-and-forget)."""
        # Implement notification sending logic
        pass
    async def _stream_request(
        self, payload: Dict[str, Any]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Implement streaming request functionality."""
        # Implement streaming logic
        # Yield response chunks as they arrive
        if False:  # Replace with actual streaming logic
            yield {}
2. Register the Transport¶
Option A: Programmatic Registration¶
from mcp_fuzzer.transport import register_custom_transport
# Register your custom transport
register_custom_transport(
    name="mytransport",
    transport_class=MyCustomTransport,
    description="My custom MCP transport implementation",
    config_schema={
        "type": "object",
        "properties": {
            "timeout": {"type": "number"},
            "retries": {"type": "integer"}
        }
    }
)
Option B: Configuration File Registration¶
Add to your mcp-fuzzer.yaml configuration file:
custom_transports:
  mytransport:
    module: "my_package.transports"
    class: "MyCustomTransport"
    description: "My custom MCP transport implementation"
    config:
      timeout: 30
      retries: 3
3. Use the Custom Transport¶
Once registered, your custom transport can be used in several ways:
Via URL Scheme¶
from mcp_fuzzer.transport import create_transport
# Use custom transport with URL scheme
transport = create_transport("mytransport://my-endpoint")
Via Factory Function¶
from mcp_fuzzer.transport import create_custom_transport
# Create custom transport instance directly
transport = create_custom_transport("mytransport", "my-endpoint")
Via Configuration¶
Example: WebSocket Transport¶
Here's a complete example of a WebSocket-based transport implementation:
import asyncio
import json
import websockets
from typing import Any, Dict, Optional, AsyncIterator
from mcp_fuzzer.transport.base import TransportProtocol
class WebSocketTransport(TransportProtocol):
    def __init__(self, url: str, timeout: float = 30.0):
        self.url = url
        self.timeout = timeout
        self.websocket = None
        self._request_id = 1
    async def connect(self) -> None:
        """Establish WebSocket connection."""
        try:
            self.websocket = await websockets.connect(
                self.url,
                extra_headers={"Content-Type": "application/json"}
            )
        except Exception as e:
            raise ConnectionError(f"Failed to connect to {self.url}: {e}")
    async def disconnect(self) -> None:
        """Close WebSocket connection."""
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
    async def send_request(
        self, method: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send JSON-RPC request over WebSocket."""
        if not self.websocket:
            await self.connect()
        request_id = self._request_id
        self._request_id += 1
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": request_id
        }
        await self.websocket.send(json.dumps(payload))
        # Wait for response
        response_text = await asyncio.wait_for(
            self.websocket.recv(), timeout=self.timeout
        )
        response = json.loads(response_text)
        if "error" in response:
            raise Exception(f"Server error: {response['error']}")
        return response.get("result", {})
    async def send_raw(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Send raw payload over WebSocket."""
        if not self.websocket:
            await self.connect()
        await self.websocket.send(json.dumps(payload))
        response_text = await asyncio.wait_for(
            self.websocket.recv(), timeout=self.timeout
        )
        return json.loads(response_text)
    async def send_notification(
        self, method: str, params: Optional[Dict[str, Any]] = None
    ) -> None:
        """Send JSON-RPC notification over WebSocket."""
        if not self.websocket:
            await self.connect()
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {}
        }
        await self.websocket.send(json.dumps(payload))
    async def _stream_request(
        self, payload: Dict[str, Any]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Stream responses over WebSocket."""
        if not self.websocket:
            await self.connect()
        await self.websocket.send(json.dumps(payload))
        try:
            while True:
                response_text = await asyncio.wait_for(
                    self.websocket.recv(), timeout=self.timeout
                )
                response = json.loads(response_text)
                yield response
        except asyncio.TimeoutError:
            return
Configuration Schema¶
Custom transports can define their own configuration schemas for validation:
config_schema = {
    "type": "object",
    "properties": {
        "url": {"type": "string"},
        "timeout": {"type": "number", "default": 30.0},
        "retries": {"type": "integer", "default": 3},
        "headers": {
            "type": "object",
            "additionalProperties": {"type": "string"}
        }
    },
    "required": ["url"]
}
Error Handling¶
Custom transports should follow these error handling patterns:
- Connection Errors: Raise 
ConnectionErrorfor connectivity issues - Protocol Errors: Raise 
ValueErrorfor invalid protocol usage - Server Errors: Include server error details in raised exceptions
 - Timeouts: Use appropriate timeout handling for operations
 
Testing Custom Transports¶
Create unit tests for your custom transport:
import pytest
from mcp_fuzzer.transport import register_custom_transport
def test_custom_transport_registration():
    """Test that custom transport can be registered and used."""
    register_custom_transport(
        name="test_transport",
        transport_class=MyCustomTransport,
        description="Test transport"
    )
    from mcp_fuzzer.transport import create_transport
    transport = create_transport("test_transport://test-endpoint")
    assert isinstance(transport, MyCustomTransport)
Integration with MCP Fuzzer¶
Custom transports integrate seamlessly with the fuzzing framework:
from mcp_fuzzer.transport import create_transport
from mcp_fuzzer.client import MCPClient
# Create custom transport
transport = create_transport("mytransport://server-endpoint")
# Use with MCP client
client = MCPClient(transport)
tools = await client.list_tools()
Best Practices¶
- Inherit from TransportProtocol: Ensure your transport implements all required methods
 - Handle Connections Properly: Implement connect/disconnect methods for resource management
 - Use Timeouts: Always implement appropriate timeouts for operations
 - Validate Input: Validate method names, parameters, and payloads
 - Log Operations: Use logging to track transport operations for debugging
 - Handle Errors Gracefully: Provide meaningful error messages and proper exception handling
 - Document Configuration: Clearly document any configuration options your transport accepts
 
Troubleshooting¶
Common Issues¶
- Import Errors: Ensure your transport module is on the Python path.
 - Registration Failures: Check that your transport class inherits from 
TransportProtocol. - Connection Issues: Verify endpoint URLs and network connectivity.
 - Configuration Errors: Validate your YAML configuration against the schema
 
Debug Logging¶
Enable debug logging to troubleshoot transport issues:
This will show detailed logs of transport operations, including custom transport registration and usage.