from __future__ import annotations import csv from dataclasses import dataclass, field from datetime import date, datetime, timezone from decimal import Decimal, InvalidOperation, ROUND_HALF_UP from enum import Enum from io import BytesIO, StringIO from typing import Any, Callable, Iterable, Iterator, Mapping, Sequence from openpyxl import Workbook CSVValueFormatter = Callable[[Any], str] Accessor = Callable[[Any], Any] __all__ = [ "CSVExportColumn", "CSVExporter", "default_project_columns", "default_scenario_columns", "stream_projects_to_csv", "stream_scenarios_to_csv", "ExcelExporter", "export_projects_to_excel", "export_scenarios_to_excel", "default_formatter", "format_datetime_utc", "format_date_iso", "format_decimal", ] @dataclass(slots=True) class CSVExportColumn: """Declarative description of a CSV export column.""" header: str accessor: Accessor | str formatter: CSVValueFormatter | None = None required: bool = False _accessor: Accessor = field(init=False, repr=False) def __post_init__(self) -> None: object.__setattr__(self, "_accessor", _coerce_accessor(self.accessor)) def value_for(self, entity: Any) -> Any: accessor = object.__getattribute__(self, "_accessor") try: return accessor(entity) except Exception: # pragma: no cover - defensive safeguard return None class CSVExporter: """Stream Python objects as UTF-8 encoded CSV rows.""" def __init__( self, columns: Sequence[CSVExportColumn], *, include_header: bool = True, line_terminator: str = "\n", ) -> None: if not columns: raise ValueError("At least one column is required for CSV export.") self._columns: tuple[CSVExportColumn, ...] = tuple(columns) self._include_header = include_header self._line_terminator = line_terminator @property def columns(self) -> tuple[CSVExportColumn, ...]: return self._columns def headers(self) -> tuple[str, ...]: return tuple(column.header for column in self._columns) def iter_bytes(self, records: Iterable[Any]) -> Iterator[bytes]: buffer = StringIO() writer = csv.writer(buffer, lineterminator=self._line_terminator) if self._include_header: writer.writerow(self.headers()) yield _drain_buffer(buffer) for record in records: writer.writerow(self._format_row(record)) yield _drain_buffer(buffer) def _format_row(self, record: Any) -> list[str]: formatted: list[str] = [] for column in self._columns: raw_value = column.value_for(record) formatter = column.formatter or default_formatter formatted.append(formatter(raw_value)) return formatted def default_project_columns( *, include_description: bool = True, include_timestamps: bool = True, ) -> tuple[CSVExportColumn, ...]: columns: list[CSVExportColumn] = [ CSVExportColumn("name", "name", required=True), CSVExportColumn("location", "location"), CSVExportColumn("operation_type", "operation_type"), ] if include_description: columns.append(CSVExportColumn("description", "description")) if include_timestamps: columns.extend( ( CSVExportColumn("created_at", "created_at", formatter=format_datetime_utc), CSVExportColumn("updated_at", "updated_at", formatter=format_datetime_utc), ) ) return tuple(columns) def default_scenario_columns( *, include_description: bool = True, include_timestamps: bool = True, ) -> tuple[CSVExportColumn, ...]: columns: list[CSVExportColumn] = [ CSVExportColumn( "project_name", lambda scenario: getattr( getattr(scenario, "project", None), "name", None), required=True, ), CSVExportColumn("name", "name", required=True), CSVExportColumn("status", "status"), CSVExportColumn("start_date", "start_date", formatter=format_date_iso), CSVExportColumn("end_date", "end_date", formatter=format_date_iso), CSVExportColumn("discount_rate", "discount_rate", formatter=format_decimal), CSVExportColumn("currency", "currency"), CSVExportColumn("primary_resource", "primary_resource"), ] if include_description: columns.append(CSVExportColumn("description", "description")) if include_timestamps: columns.extend( ( CSVExportColumn("created_at", "created_at", formatter=format_datetime_utc), CSVExportColumn("updated_at", "updated_at", formatter=format_datetime_utc), ) ) return tuple(columns) def stream_projects_to_csv( projects: Iterable[Any], *, columns: Sequence[CSVExportColumn] | None = None, ) -> Iterator[bytes]: resolved_columns = tuple(columns or default_project_columns()) exporter = CSVExporter(resolved_columns) yield from exporter.iter_bytes(projects) def stream_scenarios_to_csv( scenarios: Iterable[Any], *, columns: Sequence[CSVExportColumn] | None = None, ) -> Iterator[bytes]: resolved_columns = tuple(columns or default_scenario_columns()) exporter = CSVExporter(resolved_columns) yield from exporter.iter_bytes(scenarios) def default_formatter(value: Any) -> str: if value is None: return "" if isinstance(value, Enum): return str(value.value) if isinstance(value, Decimal): return format_decimal(value) if isinstance(value, datetime): return format_datetime_utc(value) if isinstance(value, date): return format_date_iso(value) if isinstance(value, bool): return "true" if value else "false" return str(value) def format_datetime_utc(value: Any) -> str: if not isinstance(value, datetime): return "" if value.tzinfo is None: value = value.replace(tzinfo=timezone.utc) value = value.astimezone(timezone.utc) return value.isoformat().replace("+00:00", "Z") def format_date_iso(value: Any) -> str: if not isinstance(value, date): return "" return value.isoformat() def format_decimal(value: Any) -> str: if value is None: return "" if isinstance(value, Decimal): try: quantised = value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) except InvalidOperation: # pragma: no cover - unexpected precision issues quantised = value return format(quantised, "f") if isinstance(value, (int, float)): return f"{value:.2f}" return default_formatter(value) class ExcelExporter: """Produce Excel workbooks via write-only streaming.""" def __init__( self, columns: Sequence[CSVExportColumn], *, sheet_name: str = "Export", workbook_title: str | None = None, include_header: bool = True, metadata: Mapping[str, Any] | None = None, metadata_sheet_name: str = "Metadata", ) -> None: if not columns: raise ValueError( "At least one column is required for Excel export.") self._columns: tuple[CSVExportColumn, ...] = tuple(columns) self._sheet_name = sheet_name or "Export" self._include_header = include_header self._metadata = dict(metadata) if metadata else None self._metadata_sheet_name = metadata_sheet_name or "Metadata" self._workbook = Workbook(write_only=True) if workbook_title: self._workbook.properties.title = workbook_title def export(self, records: Iterable[Any]) -> bytes: sheet = self._workbook.create_sheet(title=self._sheet_name) if self._include_header: sheet.append([column.header for column in self._columns]) for record in records: sheet.append(self._format_row(record)) self._append_metadata_sheet() return self._finalize() def _format_row(self, record: Any) -> list[Any]: row: list[Any] = [] for column in self._columns: raw_value = column.value_for(record) formatter = column.formatter or default_formatter row.append(formatter(raw_value)) return row def _append_metadata_sheet(self) -> None: if not self._metadata: return sheet_name = self._metadata_sheet_name existing = set(self._workbook.sheetnames) if sheet_name in existing: index = 1 while True: candidate = f"{sheet_name}_{index}" if candidate not in existing: sheet_name = candidate break index += 1 meta_ws = self._workbook.create_sheet(title=sheet_name) meta_ws.append(["Key", "Value"]) for key, value in self._metadata.items(): meta_ws.append([ str(key), "" if value is None else str(value), ]) def _finalize(self) -> bytes: buffer = BytesIO() self._workbook.save(buffer) buffer.seek(0) return buffer.getvalue() def export_projects_to_excel( projects: Iterable[Any], *, columns: Sequence[CSVExportColumn] | None = None, sheet_name: str = "Projects", workbook_title: str | None = None, metadata: Mapping[str, Any] | None = None, ) -> bytes: exporter = ExcelExporter( columns or default_project_columns(), sheet_name=sheet_name, workbook_title=workbook_title, metadata=metadata, ) return exporter.export(projects) def export_scenarios_to_excel( scenarios: Iterable[Any], *, columns: Sequence[CSVExportColumn] | None = None, sheet_name: str = "Scenarios", workbook_title: str | None = None, metadata: Mapping[str, Any] | None = None, ) -> bytes: exporter = ExcelExporter( columns or default_scenario_columns(), sheet_name=sheet_name, workbook_title=workbook_title, metadata=metadata, ) return exporter.export(scenarios) def _coerce_accessor(accessor: Accessor | str) -> Accessor: if callable(accessor): return accessor path = [segment for segment in accessor.split(".") if segment] def _resolve(entity: Any) -> Any: current: Any = entity for segment in path: if current is None: return None current = getattr(current, segment, None) return current return _resolve def _drain_buffer(buffer: StringIO) -> bytes: data = buffer.getvalue() buffer.seek(0) buffer.truncate(0) return data.encode("utf-8")