Plugin API Reference¶
This document provides detailed API reference for the plugin component.
Core Classes¶
PrometheusMixin¶
The core mixin class that provides Prometheus functionality to all worker types.
class PrometheusMixin:
"""Mixin class that adds Prometheus metrics functionality to any worker type."""
def __init__(self, *args, **kwargs):
# Call the parent class's __init__ first
super().__init__(*args, **kwargs)
# Setup logging when worker is initialized
_setup_logging()
self.start_time = time.time()
# Create a unique worker ID using worker age and timestamp
# Format: worker_<age>_<timestamp>
self.worker_id = f"worker_{self.age}_{int(self.start_time)}"
self.process = psutil.Process()
# Initialize request counter
self._request_count = 0
Properties:
worker_id
- Unique worker identifierstart_time
- Worker start timestampprocess
- psutil Process instance_request_count
- Internal request counter
Methods:
update_worker_metrics()
- Update worker-specific metrics_handle_request_metrics(start_time=None)
- Handle request metrics tracking_handle_request_error_metrics(req, e, start_time=None)
- Handle request error metrics_extract_request_from_args(args)
- Extract request object from method arguments_extract_request_info(req)
- Extract method and endpoint information_generic_handle_request(parent_method, *args, **kwargs)
- Generic request handler wrapper_generic_handle_error(req, client, addr, e)
- Generic error handler wrapper_generic_handle_quit(sig, frame)
- Generic quit handler wrapper_generic_handle_abort(sig, frame)
- Generic abort handler wrapper_clear_old_metrics()
- Clear old PID-based worker samples
Worker Classes¶
PrometheusWorker¶
Sync worker with Prometheus metrics.
class PrometheusWorker(PrometheusMixin, SyncWorker):
"""Sync worker with Prometheus metrics."""
def handle_request(self, listener, req, client, addr):
"""Handle a request and update metrics."""
return self._generic_handle_request(
super().handle_request, listener, req, client, addr
)
def handle_error(self, req, client, addr, e):
"""Handle request errors and update error metrics."""
return self._generic_handle_error(req, client, addr, e)
def handle_quit(self, sig, frame):
"""Handle quit signal and update worker state."""
self._generic_handle_quit(sig, frame)
def handle_abort(self, sig, frame):
"""Handle abort signal and update worker state."""
self._generic_handle_abort(sig, frame)
PrometheusThreadWorker¶
Thread worker with Prometheus metrics.
class PrometheusThreadWorker(PrometheusMixin, ThreadWorker):
"""Thread worker with Prometheus metrics."""
def handle_request(self, req, conn):
"""Handle a request and update metrics."""
return self._generic_handle_request(super().handle_request, req, conn)
def handle_error(self, req, client, addr, e):
"""Handle request errors and update error metrics."""
return self._generic_handle_error(req, client, addr, e)
def handle_quit(self, sig, frame):
"""Handle quit signal and update worker state."""
self._generic_handle_quit(sig, frame)
def handle_abort(self, sig, frame):
"""Handle abort signal and update worker state."""
self._generic_handle_abort(sig, frame)
PrometheusEventletWorker¶
Eventlet worker with Prometheus metrics.
class PrometheusEventletWorker(PrometheusMixin, EventletWorker):
"""Eventlet worker with Prometheus metrics."""
def handle_request(self, listener_name, req, sock, addr):
"""Handle a request and update metrics."""
return self._generic_handle_request(
super().handle_request, listener_name, req, sock, addr
)
def handle_error(self, req, client, addr, e):
"""Handle request errors and update error metrics."""
return self._generic_handle_error(req, client, addr, e)
def handle_quit(self, sig, frame):
"""Handle quit signal and update worker state."""
self._generic_handle_quit(sig, frame)
def handle_abort(self, sig, frame):
"""Handle abort signal and update worker state."""
self._generic_handle_abort(sig, frame)
PrometheusGeventWorker¶
Gevent worker with Prometheus metrics.
class PrometheusGeventWorker(PrometheusMixin, GeventWorker):
"""Gevent worker with Prometheus metrics."""
def handle_request(self, listener_name, req, sock, addr):
"""Handle a request and update metrics."""
return self._generic_handle_request(
super().handle_request, listener_name, req, sock, addr
)
def handle_error(self, req, client, addr, e):
"""Handle request errors and update error metrics."""
return self._generic_handle_error(req, client, addr, e)
def handle_quit(self, sig, frame):
"""Handle quit signal and update worker state."""
self._generic_handle_quit(sig, frame)
def handle_abort(self, sig, frame):
"""Handle abort signal and update worker state."""
self._generic_handle_abort(sig, frame)
Metrics Collection¶
Worker Metrics¶
The plugin collects comprehensive worker metrics:
Memory Usage¶
def update_worker_metrics(self):
"""Update worker metrics."""
memory_info = self.process.memory_info()
WORKER_MEMORY.labels(worker_id=self.worker_id).set(memory_info.rss)
CPU Usage¶
def update_worker_metrics(self):
"""Update worker metrics."""
cpu_percent = self.process.cpu_percent()
WORKER_CPU.labels(worker_id=self.worker_id).set(cpu_percent)
Uptime¶
def update_worker_metrics(self):
"""Update worker metrics."""
uptime = time.time() - self.start_time
WORKER_UPTIME.labels(worker_id=self.worker_id).set(uptime)
Worker State¶
def update_worker_metrics(self):
"""Update worker metrics."""
timestamp = int(time.time())
WORKER_STATE.labels(
worker_id=self.worker_id, state="running", timestamp=timestamp
).set(1)
Request Metrics¶
Request Counting¶
def _handle_request_metrics(self, start_time=None):
"""Handle request metrics tracking."""
self._request_count += 1
WORKER_REQUESTS.labels(worker_id=self.worker_id).inc()
Request Duration¶
def _handle_request_metrics(self, start_time=None):
"""Handle request metrics tracking."""
duration = time.time() - start_time
WORKER_REQUEST_DURATION.labels(worker_id=self.worker_id).observe(duration)
Error Metrics¶
Failed Requests¶
def _handle_request_error_metrics(self, req, e, start_time=None):
"""Handle request error metrics tracking."""
method, endpoint = self._extract_request_info(req)
error_type = type(e).__name__
WORKER_FAILED_REQUESTS.labels(
worker_id=self.worker_id,
method=method,
endpoint=endpoint,
error_type=error_type,
).inc()
Error Handling¶
def _generic_handle_error(self, req, client, addr, e):
"""Generic error handler wrapper."""
method, endpoint = self._extract_request_info(req)
error_type = type(e).__name__
WORKER_ERROR_HANDLING.labels(
worker_id=self.worker_id,
method=method,
endpoint=endpoint,
error_type=error_type,
).inc()
Worker State Management¶
Quit Signal Handling¶
def _generic_handle_quit(self, sig, frame):
"""Generic quit handler wrapper."""
timestamp = int(time.time())
WORKER_STATE.labels(
worker_id=self.worker_id, state="quitting", timestamp=timestamp
).set(1)
WORKER_RESTART_REASON.labels(worker_id=self.worker_id, reason="quit").inc()
WORKER_RESTART_COUNT.labels(
worker_id=self.worker_id, restart_type="graceful", reason="quit"
).inc()
Abort Signal Handling¶
def _generic_handle_abort(self, sig, frame):
"""Generic abort handler wrapper."""
timestamp = int(time.time())
WORKER_STATE.labels(
worker_id=self.worker_id, state="aborting", timestamp=timestamp
).set(1)
WORKER_RESTART_REASON.labels(worker_id=self.worker_id, reason="abort").inc()
WORKER_RESTART_COUNT.labels(
worker_id=self.worker_id, restart_type="forced", reason="abort"
).inc()
Request Processing¶
Generic Request Handler¶
def _generic_handle_request(self, parent_method, *args, **kwargs):
"""Generic request handler wrapper."""
start_time = time.time()
try:
# Update worker metrics on each request
self.update_worker_metrics()
# Call parent handle_request
result = parent_method(*args, **kwargs)
# Update request metrics after successful request
self._handle_request_metrics(start_time)
return result
except Exception as e:
# Handle request error metrics
req = self._extract_request_from_args(args)
self._handle_request_error_metrics(req, e, start_time)
raise
Request Information Extraction¶
def _extract_request_from_args(self, args):
"""Extract request object from method arguments."""
for arg in args:
# Check if this looks like a request object
if hasattr(arg, "method") and hasattr(arg, "path"):
return arg
return None
def _extract_request_info(self, req):
"""Extract method and endpoint information from request object."""
method = req.method if req and hasattr(req, "method") else "UNKNOWN"
endpoint = req.path if req and hasattr(req, "path") else "UNKNOWN"
return method, endpoint
Metrics Cleanup¶
Old Metrics Cleanup¶
def _clear_old_metrics(self):
"""Clear only the old PID-based worker samples."""
for MetricClass in [
WORKER_REQUESTS,
WORKER_REQUEST_DURATION,
WORKER_MEMORY,
WORKER_CPU,
WORKER_UPTIME,
WORKER_FAILED_REQUESTS,
WORKER_ERROR_HANDLING,
WORKER_STATE,
WORKER_RESTART_REASON,
WORKER_RESTART_COUNT,
]:
metric = MetricClass._metric
labelnames = list(metric._labelnames)
# Collect the old label-tuples to delete
to_delete = []
for label_values in list(metric._metrics.keys()):
try:
wid = label_values[labelnames.index("worker_id")]
except ValueError:
continue
# Check if this is an old worker ID (different from current)
if wid != self.worker_id:
to_delete.append(label_values)
# Delete the old samples
for label_values in to_delete:
metric.remove(*label_values)
Async Worker Support¶
Eventlet Worker Creation¶
def _create_eventlet_worker():
"""Create PrometheusEventletWorker class if available."""
global PrometheusEventletWorker
if EVENTLET_AVAILABLE:
try:
from gunicorn.workers.geventlet import EventletWorker
class PrometheusEventletWorker(PrometheusMixin, EventletWorker):
# ... implementation
except (ImportError, RuntimeError):
PrometheusEventletWorker = None
Gevent Worker Creation¶
def _create_gevent_worker():
"""Create PrometheusGeventWorker class if available."""
global PrometheusGeventWorker
if GEVENT_AVAILABLE:
try:
from gunicorn.workers.ggevent import GeventWorker
class PrometheusGeventWorker(PrometheusMixin, GeventWorker):
# ... implementation
except (ImportError, RuntimeError):
PrometheusGeventWorker = None
Utility Functions¶
Worker Class Getters¶
def get_prometheus_eventlet_worker():
"""Get PrometheusEventletWorker class if available."""
return PrometheusEventletWorker
def get_prometheus_gevent_worker():
"""Get PrometheusGeventWorker class if available."""
return PrometheusGeventWorker
Configuration¶
Logging Setup¶
The plugin component uses the singleton configuration pattern for logging setup:
from gunicorn_prometheus_exporter.config import config
def _setup_logging():
"""Setup logging with configuration."""
try:
log_level = config.get_gunicorn_config().get("loglevel", "INFO").upper()
logging.basicConfig(level=getattr(logging, log_level))
except Exception as e:
# Fallback to INFO level if config is not available
logging.basicConfig(level=logging.INFO)
logging.getLogger(__name__).warning(
"Could not setup logging from config: %s", e
)
Configuration Access¶
The plugin component accesses configuration through the global singleton:
# Import the global config instance
from gunicorn_prometheus_exporter.config import config
# Access configuration values
log_level = config.get_gunicorn_config().get("loglevel", "INFO").upper()
Error Handling¶
Exception Handling¶
All metrics operations are wrapped in try-catch blocks to ensure worker stability:
try:
# Metric operation
WORKER_REQUESTS.labels(worker_id=self.worker_id).inc()
except Exception as e:
logger.error("Failed to update metric: %s", e)
Graceful Degradation¶
If metrics collection fails, the worker continues to function normally:
def update_worker_metrics(self):
"""Update worker metrics."""
try:
# Update metrics
memory_info = self.process.memory_info()
WORKER_MEMORY.labels(worker_id=self.worker_id).set(memory_info.rss)
except Exception as e:
logger.error("Failed to update worker metrics: %s", e)
Related Documentation¶
- Plugin Overview - Plugin component overview
- Metrics Component - Metrics collection and monitoring
- Configuration Guide - Configuration options