Backend Architecture¶
Overview¶
The Gunicorn Prometheus Exporter implements a complete Redis-based storage backend that extends the Prometheus Python client to support distributed metrics storage. This implementation follows Prometheus multiprocess specifications while providing enhanced scalability and separation of concerns.
System Architecture Flow¶
```mermaid
flowchart TD
    A[Gunicorn Application] --> B[Prometheus Worker]
    B --> C[Redis Storage Backend]
    C --> D[Redis Server]

    E[Prometheus Scraper] --> F[Metrics Endpoint]
    F --> G[RedisMultiProcessCollector]
    G --> H[Redis Storage Backend]
    H --> D

    I[Configuration] --> J[Redis Enabled?]
    J -->|Yes| C
    J -->|No| K[File Storage Fallback]
    K --> L[MultiProcessCollector]
    L --> M[File System]

    style A fill:#e1f5fe
    style C fill:#f3e5f5
    style D fill:#ffebee
    style G fill:#e8f5e8
    style K fill:#fff3e0
```
Prometheus Specification Implementation¶
Multiprocess Protocol Compliance¶
Our implementation fully complies with the Prometheus multiprocess specification:
```mermaid
flowchart LR
    A[Prometheus Client] --> B[Value Class]
    B --> C[Storage Backend]
    C --> D[Redis Storage]

    E[Collector] --> F[Read All Values]
    F --> G[Aggregate by Mode]
    G --> H[Return Metrics]

    I[Multiprocess Modes] --> J[all]
    I --> K[liveall]
    I --> L[max]
    I --> M[min]
    I --> N[sum]
    I --> O[mostrecent]

    style A fill:#e1f5fe
    style D fill:#f3e5f5
    style I fill:#e8f5e8
```
Value Class Replacement Flow¶
```mermaid
sequenceDiagram
    participant PC as Prometheus Client
    participant RV as RedisValue
    participant RSD as RedisStorageDict
    participant R as Redis Server

    PC->>RV: Create metric value
    RV->>RSD: Store value
    RSD->>R: Write to Redis
    R-->>RSD: Confirm write
    RSD-->>RV: Return success
    RV-->>PC: Value created

    Note over PC,R: RedisValue replaces MmapedValue

    PC->>RV: Increment value
    RV->>RSD: Update value
    RSD->>R: Atomic increment
    R-->>RSD: New value
    RSD-->>RV: Updated value
    RV-->>PC: Increment complete
```
Architecture Components¶
Core Components¶
Our backend consists of several key components that work together to provide seamless Redis integration:
1. RedisStorageClient (`backend/core/client.py`)¶
The main client for Redis operations, providing:
- Connection Management: Handles Redis connections with error handling
- Key Generation: Creates structured Redis keys with embedded process information
- Value Operations: Implements read/write operations for metric data
- Metadata Management: Handles metric metadata storage and retrieval
- TTL Support: Automatic key expiration for cleanup
```python
class RedisStorageClient:
    def __init__(self, redis_client: RedisClientProtocol, key_prefix: str = "gunicorn"):
        self._redis_client = redis_client
        self._key_prefix = key_prefix
        self._redis_dict = RedisStorageDict(redis_client, key_prefix)
```
2. RedisStorageDict (`backend/core/client.py`)¶
Storage abstraction layer implementing Prometheus multiprocess protocols:
- Protocol Compliance: Implements `StorageDictProtocol` for Prometheus compatibility
- Multiprocess Mode Support: Handles all Prometheus multiprocess modes
- Atomic Operations: Thread-safe metric updates using Redis transactions
- Error Handling: Graceful fallback mechanisms
```python
class RedisStorageDict:
    def read_value(
        self, key: str, metric_type: str = "counter", multiprocess_mode: str = ""
    ) -> Tuple[float, float]:
        # Returns a (value, timestamp) tuple
        ...

    def write_value(
        self, key: str, value: float, timestamp: float,
        metric_type: str = "counter", multiprocess_mode: str = ""
    ) -> None:
        # Writes a metric value with its timestamp
        ...
```
3. RedisMultiProcessCollector (`backend/core/collector.py`)¶
Collector that aggregates metrics from Redis across multiple processes:
- Metric Aggregation: Implements Prometheus multiprocess aggregation logic
- Process Discovery: Scans Redis for metric keys from all processes
- Mode Handling: Correctly processes different multiprocess modes
- Label Preservation: Maintains all metric labels and metadata
```python
class RedisMultiProcessCollector:
    def collect(self) -> Generator[Metric, None, None]:
        # Yields aggregated metrics from Redis
        ...

    def _read_metrics_from_redis(self) -> Dict[str, Any]:
        # Reads all metrics from Redis storage
        ...
```
4. RedisValue (`backend/core/values.py`)¶
Redis-backed value implementation for individual metrics:
- Value Storage: Stores individual metric values in Redis
- Timestamp Management: Tracks metric timestamps
- Exemplar Support: Handles exemplar data for tracing
- Process Cleanup: Supports process-specific cleanup operations
```python
class RedisValue:
    def inc(self, amount: float = 1.0) -> None:
        # Increment counter value
        ...

    def set(self, value: float) -> None:
        # Set gauge value
        ...

    def get(self) -> float:
        # Get current value
        ...
```
5. RedisStorageManager (`backend/service/manager.py`)¶
Service layer managing Redis connections and lifecycle:
- Connection Management: Creates and manages Redis connections
- Value Class Factory: Creates Redis-backed value classes
- Collector Management: Provides Redis-based collectors
- Lifecycle Management: Handles setup and teardown operations
```python
class RedisStorageManager:
    def setup(self) -> None:
        # Initialize Redis storage backend
        ...

    def get_collector(self) -> RedisMultiProcessCollector:
        # Returns a Redis-based collector
        ...

    def teardown(self) -> None:
        # Clean up Redis resources
        ...
```
Redis Key Architecture¶
Key Structure¶
We use a structured key format of the form `{prefix}:{metric_type}[_{mode}]:{pid}:{data_type}:{hash}`, embedding process information and the multiprocess mode directly in each key:
Key Generation Flow¶
```mermaid
flowchart TD
    A[Metric Creation] --> B[Extract Components]
    B --> C[Metric Name]
    B --> D[Labels]
    B --> E[Help Text]

    C --> F[Generate Hash]
    D --> F
    E --> F

    F --> G[Create Redis Key]
    G --> H[gunicorn:type_mode:pid:data:hash]

    I[Process ID] --> G
    J[Metric Type] --> G
    K[Multiprocess Mode] --> G

    style A fill:#e1f5fe
    style F fill:#f3e5f5
    style H fill:#e8f5e8
```
Key Components¶
- `gunicorn`: Fixed prefix for all keys
- `{metric_type}_{mode}`: Metric type and multiprocess mode (e.g., `gauge_all`, `counter`)
- `{pid}`: Process ID for process isolation
- `{data_type}`: Either `metric` or `meta` for data vs metadata
- `{hash}`: MD5 hash of the original metric key for stability
Examples¶
```
gunicorn:gauge_all:12345:metric:abc123def456
gunicorn:counter:12345:meta:def456ghi789
gunicorn:histogram:12345:metric:ghi789jkl012
```
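Given that layout, a key can be split back into its parts with plain string operations. The helper below is an illustrative sketch, not part of the library:

```python
# Hypothetical helper: splits a structured key of the documented form
# <prefix>:<metric_type>[_<mode>]:<pid>:<data_type>:<hash> into its parts.
def parse_metric_key(key: str) -> dict:
    prefix, type_with_mode, pid, data_type, key_hash = key.split(":")
    # Only gauges embed a mode; partitioning on the first "_" separates it.
    metric_type, _, mode = type_with_mode.partition("_")
    return {
        "prefix": prefix,
        "metric_type": metric_type,
        "multiprocess_mode": mode,  # empty for counters/histograms
        "pid": int(pid),
        "data_type": data_type,  # "metric" or "meta"
        "hash": key_hash,
    }

parse_metric_key("gunicorn:gauge_all:12345:metric:abc123def456")
# -> {"prefix": "gunicorn", "metric_type": "gauge", "multiprocess_mode": "all",
#     "pid": 12345, "data_type": "metric", "hash": "abc123def456"}
```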
Key Generation¶
Keys are generated using deterministic hashing:
```python
def _get_metric_key(self, key: str, metric_type: str = "counter", multiprocess_mode: str = "") -> str:
    import os
    import hashlib

    pid = os.getpid()
    key_hash = hashlib.md5(key.encode("utf-8"), usedforsecurity=False).hexdigest()
    # Include multiprocess mode in key structure for gauge metrics
    if metric_type == "gauge" and multiprocess_mode:
        type_with_mode = f"{metric_type}_{multiprocess_mode}"
    else:
        type_with_mode = metric_type
    return f"{self._key_prefix}:{type_with_mode}:{pid}:metric:{key_hash}"
```
Data Storage Format¶
Metric Data Storage¶
Metrics are stored as Redis hashes with the following structure:
```
HSET gunicorn:gauge_all:12345:metric:abc123def456
  value "42.5"
  timestamp "1640995200.123"
  updated_at "1640995200.123"
```
Metadata Storage¶
Metadata is stored separately for efficient querying:
```
HSET gunicorn:gauge_all:12345:meta:abc123def456
  multiprocess_mode "all"
  metric_name "gunicorn_worker_memory_bytes"
  labelnames "worker_id,pid"
  help_text "Memory usage per worker"
  original_key "[\"gunicorn_worker_memory_bytes\",\"memory\",{\"worker_id\":\"worker_1_1640995200\",\"pid\":\"12345\"},\"Memory usage per worker\"]"
```
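During collection, the two hashes are recombined into a sample. The snippet below sketches that reconstruction with the hashes represented as plain dicts (the Redis round trip is omitted); the field names follow the layouts above:

```python
import json

# The metric hash holds the numeric sample; the meta hash holds everything
# needed to rebuild the series (name, labels, help text).
metric_hash = {"value": "42.5", "timestamp": "1640995200.123"}
meta_hash = {
    "metric_name": "gunicorn_worker_memory_bytes",
    "help_text": "Memory usage per worker",
    "original_key": json.dumps([
        "gunicorn_worker_memory_bytes",
        "memory",
        {"worker_id": "worker_1_1640995200", "pid": "12345"},
        "Memory usage per worker",
    ]),
}

# original_key stores the full (name, suffix, labels, help) tuple as JSON,
# so labels come back as a dict without any string parsing.
name, _suffix, labels, help_text = json.loads(meta_hash["original_key"])
sample = (name, labels, float(metric_hash["value"]), float(metric_hash["timestamp"]))
# sample == ("gunicorn_worker_memory_bytes",
#            {"worker_id": "worker_1_1640995200", "pid": "12345"},
#            42.5, 1640995200.123)
```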
Prometheus Specification Compliance¶
Multiprocess Mode Support¶
Our implementation correctly handles all Prometheus multiprocess modes:
```mermaid
flowchart TD
    A[Multiprocess Mode] --> B[all]
    A --> C[liveall]
    A --> D[live]
    A --> E[max]
    A --> F[min]
    A --> G[sum]
    A --> H[mostrecent]

    B --> I[All processes including dead]
    C --> J[Filter live processes]
    D --> K[Default live processes]
    E --> L[Redis MAX operation]
    F --> M[Redis MIN operation]
    G --> N[Redis SUM operation]
    H --> O[Timestamp-based selection]

    style A fill:#e1f5fe
    style I fill:#f3e5f5
    style J fill:#e8f5e8
    style K fill:#e8f5e8
    style L fill:#fff3e0
    style M fill:#fff3e0
    style N fill:#fff3e0
    style O fill:#fff3e0
```
| Mode | Description | Implementation |
|---|---|---|
| `all` | All processes (including dead ones) | Stores all metric instances with PID labels |
| `liveall` | All live processes | Filters out dead processes during collection |
| `live` | Only live processes (default) | Same as `liveall` for our use case |
| `max` | Maximum value across processes | Aggregates using Redis MAX operations |
| `min` | Minimum value across processes | Aggregates using Redis MIN operations |
| `sum` | Sum of values across processes | Aggregates using Redis SUM operations |
| `mostrecent` | Most recent value | Uses timestamp-based selection |
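The per-mode behavior in the table reduces to a small amount of logic at collection time. This is a minimal sketch of the aggregation step, not the collector's actual code:

```python
# Merge per-process samples for one metric according to its multiprocess mode.
# Each sample is a (pid, value, timestamp) tuple.
def aggregate(samples, mode):
    values = [value for _, value, _ in samples]
    if mode == "max":
        return max(values)
    if mode == "min":
        return min(values)
    if mode == "sum":
        return sum(values)
    if mode == "mostrecent":
        # Pick the value carried by the newest timestamp
        return max(samples, key=lambda s: s[2])[1]
    # "all" / "liveall": keep one labelled sample per process
    return {pid: value for pid, value, _ in samples}

samples = [(101, 2.0, 10.0), (102, 5.0, 12.0), (103, 3.0, 11.0)]
aggregate(samples, "max")         # 5.0
aggregate(samples, "sum")         # 10.0
aggregate(samples, "mostrecent")  # 5.0 (timestamp 12.0 is newest)
```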
Metric Type Handling¶
Full support for all Prometheus metric types:
- Counters: Monotonic increasing values
- Gauges: Values that can go up and down
- Histograms: Distribution of values in buckets
- Summaries: Quantile-based aggregations
Label Preservation¶
All metric labels and metadata are preserved:
- Label Names: Stored in metadata for reconstruction
- Label Values: Embedded in Redis keys and metadata
- Help Text: Preserved for metric documentation
- Metric Names: Maintained for proper identification
Performance Optimizations¶
Performance Optimization Flow¶
```mermaid
flowchart TD
    A[Metrics Collection] --> B[Batch Operations]
    B --> C[Redis Pipeline]
    C --> D[Execute Batch]
    D --> E[Process Results]

    F[Key Iteration] --> G[Streaming Collection]
    G --> H[Scan Iter in Batches]
    H --> I[Process 100 Keys at Once]
    I --> J[Memory Efficient]

    K[Read Operations] --> L[Lock-Free Reads]
    L --> M[Non-blocking Scan]
    M --> N[Per-Key Locking]
    N --> O[High Concurrency]

    style A fill:#e1f5fe
    style C fill:#f3e5f5
    style G fill:#e8f5e8
    style L fill:#fff3e0
```
Batch Operations¶
Groups Redis operations for efficiency:
```python
def read_all_values(self) -> Dict[str, Tuple[float, float]]:
    # Uses a pipeline to batch HGETALL calls into one round trip
    with self._redis.pipeline() as pipe:
        for metric_key in self._redis.scan_iter(match=pattern, count=100):
            pipe.hgetall(metric_key)
        results = pipe.execute()
```
Streaming Collection¶
Processes metrics in batches to avoid memory overload:
```python
def _read_metrics_from_redis(self) -> Dict[str, Any]:
    # Processes keys in batches of 100
    for metric_key in redis_client.scan_iter(match=pattern, count=100):
        # Process each key individually
        ...
```
Lock-Free Reads¶
Uses Redis `scan_iter` for non-blocking key iteration:
```python
# Non-blocking key iteration
for metric_key in self._redis.scan_iter(match=pattern, count=100):
    with self._lock:  # Only lock per key, not the entire scan
        # Process individual key
        ...
```
Metadata Caching¶
Reduces Redis lookups for frequently accessed metadata:
```python
def _get_multiprocess_mode_from_metadata(self, key: str, metric_type: str) -> str:
    # Caches metadata lookups to reduce Redis calls
    ...
```
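Because metadata for a given key never changes once written, a simple per-process cache is enough to cut repeated HGETALL round trips. The class below is an illustrative sketch rather than the library's implementation:

```python
# Illustrative metadata cache: hit Redis only on the first lookup per key.
class MetadataCache:
    def __init__(self, fetch):
        self._fetch = fetch  # callable that performs the real Redis lookup
        self._cache = {}

    def get_mode(self, metadata_key):
        if metadata_key not in self._cache:
            self._cache[metadata_key] = self._fetch(metadata_key)
        return self._cache[metadata_key]

calls = []
def fetch_from_redis(key):  # stand-in for an HGETALL against Redis
    calls.append(key)
    return "all"

cache = MetadataCache(fetch_from_redis)
cache.get_mode("gunicorn:gauge_all:12345:meta:abc123def456")
cache.get_mode("gunicorn:gauge_all:12345:meta:abc123def456")
len(calls)  # 1 -- the second lookup never touched Redis
```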
Error Handling and Fallback¶
Graceful Degradation¶
The system gracefully handles Redis unavailability:
```python
def _get_default_redis_client(self) -> Optional[RedisClientProtocol]:
    try:
        return redis.Redis.from_url(redis_url)
    except redis.ConnectionError:
        logger.warning("Redis connection failed, falling back to file storage")
        return None
```
Connection Retry Logic¶
Transient failures are retried (see Timeout and Retry Logic below), and auxiliary reads degrade gracefully; for example, timestamps fall back to the local clock when Redis server time is unavailable:
```python
def _redis_now(self) -> float:
    try:
        # Try Redis server time first
        return self._redis.time()[0]
    except Exception:
        # Fall back to local time
        return time.time()
```
Comprehensive Error Handling¶
The backend implements multiple layers of error handling:
1. Import-Level Error Handling¶
```python
# Conditional Redis import - only import when needed
try:
    import redis

    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False
    redis = None
```
2. Connection-Level Error Handling¶
```python
def setup(self) -> bool:
    """Set up Redis-based metrics storage."""
    if not config.redis_enabled:
        logger.debug("Redis is not enabled, skipping Redis metrics setup")
        return False

    try:
        # Create Redis client
        self._redis_client = self._redis_client_factory()
        # Test connection
        self._redis_client.ping()
        logger.debug("Connected to Redis successfully")
        return True
    except Exception as e:
        logger.error("Failed to setup Redis metrics: %s", e)
        self._cleanup()
        return False
```
3. Operation-Level Error Handling¶
```python
def _safe_parse_float(data: Union[bytes, bytearray, str, None], default: float = 0.0) -> float:
    """Safely parse bytes/string to float with error handling."""
    if data is None:
        return default
    try:
        if isinstance(data, (bytes, bytearray)):
            return float(data.decode("utf-8"))
        return float(data)
    except (ValueError, UnicodeDecodeError):
        logger.debug("Failed to parse float value: %s, using default: %s", data, default)
        return default
```
4. Timeout and Retry Logic¶
```python
def _redis_operation_with_retry(self, operation, max_retries=3, delay=0.1):
    """Execute Redis operation with retry logic."""
    for attempt in range(max_retries):
        try:
            return operation()
        except (redis.ConnectionError, redis.TimeoutError) as e:
            if attempt == max_retries - 1:
                logger.error("Redis operation failed after %d attempts: %s", max_retries, e)
                raise
            logger.warning("Redis operation failed (attempt %d/%d): %s", attempt + 1, max_retries, e)
            time.sleep(delay * (2 ** attempt))  # exponential backoff
```
5. Fallback Mechanisms¶
```python
def _get_multiprocess_mode_from_metadata(self, key: str, metric_type: str) -> str:
    """Get multiprocess mode from metadata with fallback."""
    try:
        metadata_key = self._get_metadata_key(key, metric_type)
        metadata = self._redis.hgetall(metadata_key)
        if metadata:
            # _safe_decode_bytes already yields a str, so no second decode is needed
            return _safe_decode_bytes(metadata.get(b"multiprocess_mode", b""))
    except Exception as e:
        logger.debug("Failed to get multiprocess mode from metadata: %s", e)
    # Fallback to default mode
    return "all" if metric_type == "gauge" else ""
```
Error Recovery Strategies¶
Error Recovery Flow¶
```mermaid
flowchart TD
    A[Redis Operation] --> B{Success?}
    B -->|Yes| C[Return Result]
    B -->|No| D[Error Handling]

    D --> E[Retry Logic]
    E --> F{Max Retries?}
    F -->|No| G[Exponential Backoff]
    G --> A
    F -->|Yes| H[Fallback Strategy]

    H --> I[File Storage]
    H --> J[Graceful Degradation]
    H --> K[Resource Cleanup]

    I --> L[MultiProcessCollector]
    J --> M[Disable Redis Features]
    K --> N[Close Connections]

    style A fill:#e1f5fe
    style D fill:#ffebee
    style H fill:#fff3e0
    style L fill:#e8f5e8
```
1. Automatic Fallback to File Storage¶
When Redis setup fails, the system continues with file-based storage:
```python
def setup(self) -> bool:
    """Set up Redis-based metrics storage."""
    if not config.redis_enabled:
        logger.debug("Redis is not enabled, skipping Redis metrics setup")
        return False

    try:
        # Create Redis client and test connection
        self._redis_client = self._redis_client_factory()
        self._redis_client.ping()
        # ... setup Redis storage ...
        return True
    except Exception as e:
        logger.error("Failed to setup Redis metrics: %s", e)
        self._cleanup()
        return False
```
**Note**: When Redis setup fails, the Prometheus client library continues using its default file-based multiprocess storage mechanism. No additional fallback code is needed.
2. Graceful Degradation of Features¶
```python
def is_redis_enabled(self) -> bool:
    """Check if Redis is enabled and available."""
    return (
        config.redis_enabled
        and REDIS_AVAILABLE
        and self._redis_client is not None
    )
```
3. Resource Cleanup on Failure¶
```python
def _cleanup(self) -> None:
    """Clean up Redis resources."""
    try:
        if self._redis_client:
            self._redis_client.close()
    except Exception as e:
        logger.debug("Error during Redis cleanup: %s", e)
    finally:
        self._redis_client = None
        self._redis_value_class = None
```
Integration Points¶
Integration Flow¶
```mermaid
flowchart TD
    A[Gunicorn Application] --> B[Prometheus Worker]
    B --> C[Redis Storage Manager]
    C --> D[Redis Value Class]
    D --> E[Redis Storage Dict]
    E --> F[Redis Server]

    G[Prometheus Scraper] --> H[Metrics Endpoint]
    H --> I[Redis MultiProcess Collector]
    I --> J[Read from Redis]
    J --> F

    K[Configuration] --> L[Redis Enabled?]
    L -->|Yes| C
    L -->|No| M[File Storage]
    M --> N[MultiProcess Collector]
    N --> O[File System]

    style A fill:#e1f5fe
    style C fill:#f3e5f5
    style F fill:#ffebee
    style I fill:#e8f5e8
    style M fill:#fff3e0
```
Prometheus Client Integration¶
The backend integrates seamlessly with the Prometheus Python client:
```python
# Replace the default multiprocess collector
from gunicorn_prometheus_exporter.backend.service import get_redis_storage_manager

manager = get_redis_storage_manager()
collector = manager.get_collector()
registry.register(collector)
```
Gunicorn Integration¶
Hooks into Gunicorn's lifecycle for automatic setup:
```python
def redis_when_ready(server):
    """Setup Redis storage when Gunicorn is ready."""
    from gunicorn_prometheus_exporter.backend.service import setup_redis_metrics

    setup_redis_metrics()
```
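In practice this hook is wired up from a `gunicorn.conf.py` using Gunicorn's standard `when_ready` server hook; the bind address and worker count below are placeholders, and only `setup_redis_metrics` comes from the package itself:

```python
# gunicorn.conf.py (sketch) -- bind/workers values are illustrative
bind = "0.0.0.0:8000"
workers = 2

def when_ready(server):
    # Initialize the Redis storage backend once the master process is ready
    from gunicorn_prometheus_exporter.backend.service import setup_redis_metrics
    setup_redis_metrics()
```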
Configuration¶
Environment Variables¶
| Variable | Default | Description |
|---|---|---|
| `REDIS_ENABLED` | `false` | Enable Redis storage backend |
| `REDIS_HOST` | `127.0.0.1` | Redis server hostname |
| `REDIS_PORT` | `6379` | Redis server port |
| `REDIS_DB` | `0` | Redis database number |
| `REDIS_PASSWORD` | (none) | Redis password (optional) |
| `REDIS_KEY_PREFIX` | `gunicorn` | Prefix for Redis keys |
| `REDIS_TTL_SECONDS` | `300` | Key expiration time (seconds) |
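A loader for these variables can be sketched as follows; the helper name is illustrative rather than the library's actual configuration code, but the variable names and defaults match the table above:

```python
import os

# Read Redis settings from the environment, applying the documented defaults.
def redis_settings_from_env(env=None):
    env = os.environ if env is None else env
    return {
        "enabled": env.get("REDIS_ENABLED", "false").lower() == "true",
        "host": env.get("REDIS_HOST", "127.0.0.1"),
        "port": int(env.get("REDIS_PORT", "6379")),
        "db": int(env.get("REDIS_DB", "0")),
        "password": env.get("REDIS_PASSWORD") or None,  # optional
        "key_prefix": env.get("REDIS_KEY_PREFIX", "gunicorn"),
        "ttl_seconds": int(env.get("REDIS_TTL_SECONDS", "300")),
    }

redis_settings_from_env({"REDIS_ENABLED": "true", "REDIS_PORT": "6380"})
# -> enabled=True, port=6380; everything else falls back to the defaults
```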
Programmatic Configuration¶
```python
from gunicorn_prometheus_exporter.backend.service import RedisStorageManager

manager = RedisStorageManager(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    key_prefix="myapp",
    ttl_seconds=600,
)
```
Testing and Validation¶
Unit Tests¶
Comprehensive unit tests cover all components:
- Storage Operations: Read/write operations
- Key Generation: Proper key structure and hashing
- Multiprocess Modes: All mode implementations
- Error Handling: Graceful failure scenarios
- Performance: Batch operations and streaming
Integration Tests¶
System tests validate complete functionality:
- Redis Integration: End-to-end Redis storage
- Multi-Process: Multiple worker processes
- Metric Collection: All metric types
- Prometheus Scraping: Metrics endpoint functionality
Performance Benchmarks¶
Validates performance characteristics:
- Throughput: Metrics per second
- Latency: Read/write operation times
- Memory Usage: Memory consumption patterns
- Scalability: Multi-instance performance
Future Enhancements¶
Planned Features¶
- Redis Cluster Support: Distributed Redis deployment
- Compression: Metric data compression for storage efficiency
- Encryption: Encrypted metric storage
- Advanced Aggregation: Custom aggregation functions
- Metrics Export: Export to other monitoring systems
Extensibility¶
The architecture is designed for extensibility:
- Custom Storage Backends: Pluggable storage implementations
- Custom Aggregators: User-defined aggregation logic
- Custom Collectors: Specialized metric collectors
- Custom Protocols: Alternative storage protocols