Files
incidentops/app/db.py
minhtrannhat 46ede7757d feat: add observability stack and background task infrastructure
Add OpenTelemetry instrumentation with distributed tracing and metrics:
- Structured JSON logging with trace context correlation
- Auto-instrumentation for FastAPI, asyncpg, httpx, redis
- OTLP exporter for traces and Prometheus metrics endpoint

Implement Celery worker and notification task system:
- Celery app with Redis/SQS broker support and configurable queues
- Notification tasks for incident fan-out, webhooks, and escalations
- Pluggable TaskQueue abstraction with in-memory driver for testing

Add Grafana observability stack (Loki, Tempo, Prometheus, Grafana):
- OpenTelemetry Collector for receiving OTLP traces and logs
- Tempo for distributed tracing backend
- Loki for log aggregation with Promtail DaemonSet
- Prometheus for metrics scraping with RBAC configuration
- Grafana with pre-provisioned datasources and API overview dashboard
- Helm templates for all observability components

Enhance application infrastructure:
- Global exception handlers with structured ErrorResponse schema
- Request logging middleware with timing metrics
- Health check updated to verify task queue connectivity
- Non-root user in Dockerfile for security
- Init containers in Helm deployments for dependency ordering
- Production Helm values with autoscaling and retention policies
2026-01-07 20:51:13 -05:00

75 lines
2.1 KiB
Python

"""Database connection management using asyncpg."""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from contextvars import ContextVar
import asyncpg
from asyncpg.pool import PoolConnectionProxy
class Database:
    """Manage the lifecycle of a shared asyncpg connection pool.

    ``connect`` creates the pool and ``disconnect`` closes and forgets it.
    ``connection`` and ``transaction`` are async context managers that hand
    out a pooled connection and give it back to the pool on exit.
    """

    # None until connect() has run, and None again after disconnect().
    pool: asyncpg.Pool | None = None

    async def connect(self, dsn: str) -> None:
        """Create the connection pool.

        Args:
            dsn: Connection string forwarded to ``asyncpg.create_pool``.
        """
        # NOTE(review): calling connect() while a pool is already open replaces
        # the reference without closing the previous pool. Call disconnect()
        # first if a reconnect is intended.
        self.pool = await asyncpg.create_pool(
            dsn,
            min_size=5,
            max_size=20,
            command_timeout=60,
        )

    async def disconnect(self) -> None:
        """Close the connection pool, if any, and mark the database disconnected.

        Calling this when no pool is open (never connected, or already
        disconnected) is a no-op.
        """
        # Drop the reference *before* closing so that connection() and
        # transaction() report "Database not connected" from this point on
        # instead of trying to acquire from a closing/closed pool.
        pool, self.pool = self.pool, None
        if pool is not None:
            await pool.close()

    def _require_pool(self) -> asyncpg.Pool:
        """Return the open pool or raise RuntimeError if there is none."""
        if self.pool is None:
            raise RuntimeError("Database not connected")
        return self.pool

    @asynccontextmanager
    async def connection(self) -> AsyncGenerator[asyncpg.Connection | PoolConnectionProxy, None]:
        """Acquire a connection from the pool for the duration of the block.

        Yields:
            A pooled connection; it is released back to the pool on exit.

        Raises:
            RuntimeError: If the pool has not been created or was disconnected.
        """
        async with self._require_pool().acquire() as conn:
            yield conn

    @asynccontextmanager
    async def transaction(self) -> AsyncGenerator[asyncpg.Connection | PoolConnectionProxy, None]:
        """Acquire a connection and run the block inside a transaction.

        The transaction is managed by ``conn.transaction()`` (asyncpg commits
        on a clean exit and rolls back if the block raises).

        Yields:
            A pooled connection with an open transaction.

        Raises:
            RuntimeError: If the pool has not been created or was disconnected.
        """
        async with self._require_pool().acquire() as conn:
            async with conn.transaction():
                yield conn
# Module-level Database instance; get_conn() below reads its pool.
db = Database()

# Connection currently published by get_conn() for the active context.
# Defaults to None, meaning no connection has been published yet.
_connection_ctx: ContextVar[
    asyncpg.Connection | PoolConnectionProxy | None
] = ContextVar("db_connection", default=None)
async def get_conn() -> AsyncGenerator[asyncpg.Connection | PoolConnectionProxy, None]:
    """Yield a DB connection, sharing a single one per context.

    If ``_connection_ctx`` already holds a connection, that connection is
    yielded as-is and nothing is cleaned up here. Otherwise a connection is
    acquired from ``db.pool``, stored in ``_connection_ctx`` while the caller
    uses it, and afterwards the context variable is reset and the connection
    is released back to the pool.

    Raises:
        RuntimeError: If no connection is stored and ``db.pool`` is not set.
    """
    if (shared := _connection_ctx.get()) is not None:
        # Nested use: the outer get_conn() call owns release and reset.
        yield shared
    elif not db.pool:
        raise RuntimeError("Database not connected")
    else:
        async with db.pool.acquire() as acquired:
            reset_token = _connection_ctx.set(acquired)
            try:
                yield acquired
            finally:
                # Restore the previous value even if the consumer raised.
                _connection_ctx.reset(reset_token)