"""DuckDB singleton connection with asyncio write lock and schema migrations.""" import asyncio import os import duckdb _conn: duckdb.DuckDBPyConnection | None = None _write_lock = asyncio.Lock() def get_db_path() -> str: return os.getenv("DB_PATH", "data/app.db") def init_db(path: str | None = None) -> duckdb.DuckDBPyConnection: """Open (or reuse) the DuckDB connection and run schema migrations.""" global _conn if _conn is not None: return _conn db_path = path or get_db_path() if db_path != ":memory:": os.makedirs(os.path.dirname(db_path), exist_ok=True) _conn = duckdb.connect(db_path) _run_migrations(_conn) return _conn def get_conn() -> duckdb.DuckDBPyConnection: """Return the active connection; raises if not yet initialised.""" if _conn is None: raise RuntimeError("Database not initialised. Call init_db() first.") return _conn def close_db() -> None: """Close the connection (called on app shutdown).""" global _conn if _conn is not None: _conn.close() _conn = None def get_write_lock() -> asyncio.Lock: """Return the asyncio lock that serialises write operations.""" return _write_lock def _run_migrations(conn: duckdb.DuckDBPyConnection) -> None: conn.execute(""" CREATE TABLE IF NOT EXISTS users ( id UUID DEFAULT uuid() PRIMARY KEY, email VARCHAR NOT NULL UNIQUE, password_hash VARCHAR NOT NULL, role VARCHAR DEFAULT 'user', created_at TIMESTAMP DEFAULT now(), updated_at TIMESTAMP DEFAULT now() ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS refresh_tokens ( jti UUID DEFAULT uuid() PRIMARY KEY, user_id UUID NOT NULL, issued_at TIMESTAMP DEFAULT now(), expires_at TIMESTAMP NOT NULL, revoked BOOLEAN DEFAULT false ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS uploaded_images ( id UUID DEFAULT uuid() PRIMARY KEY, user_id UUID NOT NULL, filename VARCHAR NOT NULL, content_type VARCHAR NOT NULL, file_path VARCHAR NOT NULL, size_bytes BIGINT NOT NULL, created_at TIMESTAMP DEFAULT now() ) """)