feat: add audit events and runtime state snapshots to database
- Introduced new tables for audit events and runtime state snapshots in the database schema. - Created data classes for AuditRecord and RuntimeStateRecord to represent the new entities. - Implemented AuditRepository and RuntimeStateRepository for inserting and retrieving records. - Enhanced the dashboard to include an audit trail section, displaying recent audit events. - Added tests for the new audit repository and runtime lifecycle functionalities. - Updated settings validation to ensure proper configuration for alerting features. - Integrated alert notifications across various components, including execution sequencer and loss limits.
This commit is contained in:
@@ -0,0 +1,101 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
WORKSPACE = Path(__file__).resolve().parents[1]
|
||||
|
||||
EXCLUDED_DIRS = {
|
||||
".git",
|
||||
".venv",
|
||||
"__pycache__",
|
||||
".mypy_cache",
|
||||
".pytest_cache",
|
||||
"data",
|
||||
"node_modules",
|
||||
}
|
||||
|
||||
PATTERNS: list[tuple[str, re.Pattern[str]]] = [
|
||||
("private_key", re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----")),
|
||||
("aws_access_key", re.compile(r"AKIA[0-9A-Z]{16}")),
|
||||
(
|
||||
"generic_token",
|
||||
re.compile(
|
||||
r"(?i)(token|secret|password)\s*[:=]\s*['\"]?"
|
||||
r"(?=[A-Za-z0-9_\-+/=.]{20,})(?=.*[A-Za-z])(?=.*\d)[A-Za-z0-9_\-+/=.]{20,}"
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def _is_probably_text(path: Path) -> bool:
|
||||
try:
|
||||
with path.open("rb") as handle:
|
||||
sample = handle.read(2048)
|
||||
except OSError:
|
||||
return False
|
||||
return b"\x00" not in sample
|
||||
|
||||
|
||||
def scan_worktree() -> list[str]:
|
||||
findings: list[str] = []
|
||||
tracked = subprocess.run(
|
||||
["git", "-C", str(WORKSPACE), "ls-files"],
|
||||
check=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if tracked.returncode != 0:
|
||||
return ["worktree_scan_failed"]
|
||||
|
||||
for rel_path in tracked.stdout.splitlines():
|
||||
path = WORKSPACE / rel_path
|
||||
if not path.is_file() or any(part in EXCLUDED_DIRS for part in path.parts):
|
||||
continue
|
||||
if not _is_probably_text(path):
|
||||
continue
|
||||
|
||||
try:
|
||||
content = path.read_text(encoding="utf-8", errors="ignore")
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
for rule_name, pattern in PATTERNS:
|
||||
if pattern.search(content):
|
||||
findings.append(
|
||||
f"worktree:{path.relative_to(WORKSPACE)}:{rule_name}")
|
||||
return findings
|
||||
|
||||
|
||||
def scan_git_history() -> list[str]:
|
||||
cmd = ["git", "-C", str(WORKSPACE), "log", "--all",
|
||||
"-p", "--pretty=format:%H"]
|
||||
completed = subprocess.run(
|
||||
cmd, check=False, capture_output=True, text=True)
|
||||
if completed.returncode != 0:
|
||||
return ["history_scan_failed"]
|
||||
|
||||
findings: list[str] = []
|
||||
data = completed.stdout
|
||||
for rule_name, pattern in PATTERNS:
|
||||
if pattern.search(data):
|
||||
findings.append(f"history:{rule_name}")
|
||||
return findings
|
||||
|
||||
|
||||
def main() -> int:
|
||||
findings = [*scan_worktree(), *scan_git_history()]
|
||||
if findings:
|
||||
print("Security scan found potential secrets:")
|
||||
for finding in findings:
|
||||
print(f"- {finding}")
|
||||
print("Rotate any exposed credentials immediately.")
|
||||
return 1
|
||||
|
||||
print("Security scan passed: no obvious secrets detected in worktree/history.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user