Files
calminer/monitoring/__init__.py
zwitschi 4cfc5d9ffa
Some checks failed
CI / lint (push) Successful in 15s
CI / test (push) Failing after 27s
CI / build (push) Has been skipped
fix: Resolve Ruff E402 warnings and clean up imports across multiple modules
2025-11-12 11:10:50 +01:00

118 lines
3.8 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, Query, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from sqlalchemy.orm import Session
from config.database import get_db
from services.metrics import MetricsService
router = APIRouter(prefix="/metrics", tags=["monitoring"])
@router.get("", summary="Prometheus metrics endpoint", include_in_schema=False)
async def metrics_endpoint() -> Response:
payload = generate_latest()
return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
@router.get("/performance", summary="Get performance metrics")
async def get_performance_metrics(
metric_name: Optional[str] = Query(
None, description="Filter by metric name"),
hours: int = Query(24, description="Hours back to look"),
db: Session = Depends(get_db),
) -> dict:
"""Get aggregated performance metrics."""
service = MetricsService(db)
start_time = datetime.utcnow() - timedelta(hours=hours)
if metric_name:
metrics = service.get_metrics(
metric_name=metric_name, start_time=start_time)
aggregated = service.get_aggregated_metrics(
metric_name, start_time=start_time)
return {
"metric_name": metric_name,
"period_hours": hours,
"aggregated": aggregated,
"recent_samples": [
{
"timestamp": m.timestamp.isoformat(),
"value": m.value,
"labels": m.labels,
"endpoint": m.endpoint,
"method": m.method,
"status_code": m.status_code,
"duration_seconds": m.duration_seconds,
}
for m in metrics[:50] # Last 50 samples
],
}
# Return summary for all metrics
all_metrics = service.get_metrics(start_time=start_time, limit=1000)
metric_types = {}
for m in all_metrics:
if m.metric_name not in metric_types:
metric_types[m.metric_name] = []
metric_types[m.metric_name].append(m.value)
summary = {}
for name, values in metric_types.items():
summary[name] = {
"count": len(values),
"avg": sum(values) / len(values) if values else 0,
"min": min(values) if values else 0,
"max": max(values) if values else 0,
}
return {
"period_hours": hours,
"summary": summary,
}
@router.get("/health", summary="Detailed health check with metrics")
async def detailed_health(db: Session = Depends(get_db)) -> dict:
"""Get detailed health status with recent metrics."""
service = MetricsService(db)
last_hour = datetime.utcnow() - timedelta(hours=1)
# Get request metrics from last hour
request_metrics = service.get_metrics(
metric_name="http_request", start_time=last_hour
)
if request_metrics:
durations = []
error_count = 0
for m in request_metrics:
if m.duration_seconds is not None:
durations.append(m.duration_seconds)
if m.status_code is not None:
if m.status_code >= 400:
error_count += 1
total_requests = len(request_metrics)
avg_duration = sum(durations) / len(durations) if durations else 0
error_rate = error_count / total_requests if total_requests > 0 else 0
else:
avg_duration = 0
error_rate = 0
total_requests = 0
return {
"status": "ok",
"timestamp": datetime.utcnow().isoformat(),
"metrics": {
"requests_last_hour": total_requests,
"avg_response_time_seconds": avg_duration,
"error_rate": error_rate,
},
}