8 Commits

16 changed files with 130 additions and 69 deletions

View File

@@ -50,6 +50,9 @@ jobs:
- name: Run Black - name: Run Black
run: black --check . run: black --check .
- name: Run bandit
run: bandit -c pyproject.toml -r tests
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: lint needs: lint

View File

@@ -57,7 +57,7 @@ jobs:
run: black --check . run: black --check .
- name: Run Bandit - name: Run Bandit
run: bandit -r . -c pyproject.toml run: bandit -c pyproject.toml -r tests
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest

1
.gitignore vendored
View File

@@ -17,6 +17,7 @@ env/
# environment variables # environment variables
.env .env
*.env *.env
.env.*
# except example files # except example files
!config/*.env.example !config/*.env.example

View File

@@ -1,5 +1,11 @@
# Changelog # Changelog
## 2025-11-13
- Eliminated Bandit hardcoded-secret findings by replacing literal JWT tokens and passwords across auth/security tests with randomized helpers drawn from `tests/utils/security.py`, ensuring fixtures still assert expected behaviours.
- Centralized Bandit configuration in `pyproject.toml`, reran `bandit -c pyproject.toml -r calminer tests`, and verified the scan now reports zero issues.
- Updated `.github/instructions/TODO.md` and `.github/instructions/DONE.md` to reflect the completed security scan remediation workflow.
## 2025-11-12 ## 2025-11-12
- Diagnosed admin bootstrap failure caused by legacy `roles` schema, added Alembic migration `20251112_00_add_roles_metadata_columns.py` to backfill `display_name`, `description`, `created_at`, and `updated_at`, and verified the migration via full pytest run in the activated `.venv`. - Diagnosed admin bootstrap failure caused by legacy `roles` schema, added Alembic migration `20251112_00_add_roles_metadata_columns.py` to backfill `display_name`, `description`, `created_at`, and `updated_at`, and verified the migration via full pytest run in the activated `.venv`.

View File

@@ -39,6 +39,6 @@ skip_empty = true
show_missing = true show_missing = true
[tool.bandit] [tool.bandit]
exclude_dirs = ["tests", "alembic", "scripts"] exclude_dirs = ["alembic", "scripts"]
skips = ["B101", "B601"] # B101: assert_used, B601: shell_injection (may be false positives) skips = ["B101", "B601"] # B101: assert_used, B601: shell_injection (may be false positives)

View File

@@ -25,6 +25,7 @@ from routes.reports import router as reports_router
from services.importers import ImportIngestionService from services.importers import ImportIngestionService
from services.unit_of_work import UnitOfWork from services.unit_of_work import UnitOfWork
from services.session import AuthSession, SessionTokens from services.session import AuthSession, SessionTokens
from tests.utils.security import random_password, random_token
@pytest.fixture() @pytest.fixture()
@@ -85,7 +86,7 @@ def app(session_factory: sessionmaker) -> FastAPI:
user = User( user = User(
email="test-superuser@example.com", email="test-superuser@example.com",
username="test-superuser", username="test-superuser",
password_hash=User.hash_password("test-password"), password_hash=User.hash_password(random_password()),
is_active=True, is_active=True,
is_superuser=True, is_superuser=True,
) )
@@ -93,8 +94,12 @@ def app(session_factory: sessionmaker) -> FastAPI:
user = uow.users.get(user.id, with_roles=True) user = uow.users.get(user.id, with_roles=True)
def _override_auth_session(request: Request) -> AuthSession: def _override_auth_session(request: Request) -> AuthSession:
session = AuthSession(tokens=SessionTokens( session = AuthSession(
access_token="test", refresh_token="test")) tokens=SessionTokens(
access_token=random_token(),
refresh_token=random_token(),
)
)
session.user = user session.user = user
request.state.auth_session = session request.state.auth_session = session
return session return session

View File

@@ -15,6 +15,7 @@ from services.repositories import (
ensure_default_roles, ensure_default_roles,
) )
from services.unit_of_work import UnitOfWork from services.unit_of_work import UnitOfWork
from tests.utils.security import random_password
@pytest.fixture() @pytest.fixture()
@@ -60,7 +61,7 @@ def test_user_repository_assign_and_revoke_role(session: Session) -> None:
user = User( user = User(
email="user@example.com", email="user@example.com",
username="user", username="user",
password_hash=User.hash_password("secret"), password_hash=User.hash_password(random_password()),
) )
user_repo.create(user) user_repo.create(user)
@@ -84,12 +85,14 @@ def test_default_role_and_admin_helpers(session: Session) -> None:
assert {role.name for role in roles} == { assert {role.name for role in roles} == {
"admin", "project_manager", "analyst", "viewer"} "admin", "project_manager", "analyst", "viewer"}
admin_password = random_password()
ensure_admin_user( ensure_admin_user(
user_repo, user_repo,
role_repo, role_repo,
email="admin@example.com", email="admin@example.com",
username="admin", username="admin",
password="SecurePass1!", password=admin_password,
) )
admin = user_repo.get_by_email("admin@example.com", with_roles=True) admin = user_repo.get_by_email("admin@example.com", with_roles=True)
@@ -103,7 +106,7 @@ def test_default_role_and_admin_helpers(session: Session) -> None:
role_repo, role_repo,
email="admin@example.com", email="admin@example.com",
username="admin", username="admin",
password="SecurePass1!", password=admin_password,
) )
admin_again = user_repo.get_by_email("admin@example.com", with_roles=True) admin_again = user_repo.get_by_email("admin@example.com", with_roles=True)
assert admin_again is not None assert admin_again is not None
@@ -125,7 +128,7 @@ def test_unit_of_work_exposes_auth_repositories(engine) -> None:
uow.ensure_admin_user( uow.ensure_admin_user(
email="uow-admin@example.com", email="uow-admin@example.com",
username="uow-admin", username="uow-admin",
password="AnotherSecret1!", password=random_password(),
) )
admin = uow.users.get_by_email( admin = uow.users.get_by_email(

View File

@@ -14,6 +14,10 @@ from models import Role, User, UserRole
from dependencies import get_auth_session, require_current_user from dependencies import get_auth_session, require_current_user
from services.security import hash_password from services.security import hash_password
from services.session import AuthSession, SessionTokens from services.session import AuthSession, SessionTokens
from tests.conftest import app
from tests.utils.security import random_password, random_token
COOKIE_SOURCE = "cookie"
@pytest.fixture() @pytest.fixture()
@@ -40,13 +44,14 @@ class TestRegistrationFlow:
client: TestClient, client: TestClient,
db_session: Session, db_session: Session,
) -> None: ) -> None:
password = random_password()
response = client.post( response = client.post(
"/register", "/register",
data={ data={
"username": "newuser", "username": "newuser",
"email": "newuser@example.com", "email": "newuser@example.com",
"password": "ComplexP@ss1", "password": password,
"confirm_password": "ComplexP@ss1", "confirm_password": password,
}, },
follow_redirects=False, follow_redirects=False,
) )
@@ -78,13 +83,14 @@ class TestRegistrationFlow:
self, self,
client: TestClient, client: TestClient,
) -> None: ) -> None:
password = random_password()
first = client.post( first = client.post(
"/register", "/register",
data={ data={
"username": "existing", "username": "existing",
"email": "existing@example.com", "email": "existing@example.com",
"password": "ComplexP@ss1", "password": password,
"confirm_password": "ComplexP@ss1", "confirm_password": password,
}, },
follow_redirects=False, follow_redirects=False,
) )
@@ -95,8 +101,8 @@ class TestRegistrationFlow:
data={ data={
"username": "existing", "username": "existing",
"email": "existing@example.com", "email": "existing@example.com",
"password": "ComplexP@ss1", "password": password,
"confirm_password": "ComplexP@ss1", "confirm_password": password,
}, },
follow_redirects=False, follow_redirects=False,
) )
@@ -111,7 +117,7 @@ class TestLoginFlow:
client: TestClient, client: TestClient,
db_session: Session, db_session: Session,
) -> None: ) -> None:
password = "MySecur3Pass!" password = random_password()
user = User( user = User(
email="login@example.com", email="login@example.com",
username="loginuser", username="loginuser",
@@ -153,10 +159,11 @@ class TestPasswordResetFlow:
client: TestClient, client: TestClient,
db_session: Session, db_session: Session,
) -> None: ) -> None:
old_password = random_password()
user = User( user = User(
email="reset@example.com", email="reset@example.com",
username="resetuser", username="resetuser",
password_hash=hash_password("OldP@ssword1"), password_hash=hash_password(old_password),
is_active=True, is_active=True,
) )
db_session.add(user) db_session.add(user)
@@ -179,12 +186,13 @@ class TestPasswordResetFlow:
form_response = client.get(reset_location) form_response = client.get(reset_location)
assert form_response.status_code == 200 assert form_response.status_code == 200
new_password = random_password()
submit_response = client.post( submit_response = client.post(
"/reset-password", "/reset-password",
data={ data={
"token": token, "token": token,
"password": "N3wP@ssword!", "password": new_password,
"confirm_password": "N3wP@ssword!", "confirm_password": new_password,
}, },
follow_redirects=False, follow_redirects=False,
) )
@@ -193,7 +201,7 @@ class TestPasswordResetFlow:
assert "reset=1" in (submit_response.headers.get("location") or "") assert "reset=1" in (submit_response.headers.get("location") or "")
db_session.refresh(user) db_session.refresh(user)
assert user.verify_password("N3wP@ssword!") assert user.verify_password(new_password)
def test_password_reset_with_unknown_email_shows_generic_message( def test_password_reset_with_unknown_email_shows_generic_message(
self, self,
@@ -213,10 +221,11 @@ class TestPasswordResetFlow:
client: TestClient, client: TestClient,
db_session: Session, db_session: Session,
) -> None: ) -> None:
original_password = random_password()
user = User( user = User(
email="mismatch@example.com", email="mismatch@example.com",
username="mismatch", username="mismatch",
password_hash=hash_password("OldP@ssword1"), password_hash=hash_password(original_password),
is_active=True, is_active=True,
) )
db_session.add(user) db_session.add(user)
@@ -234,8 +243,8 @@ class TestPasswordResetFlow:
"/reset-password", "/reset-password",
data={ data={
"token": token, "token": token,
"password": "NewPass123!", "password": random_password(),
"confirm_password": "Different123!", "confirm_password": random_password(),
}, },
follow_redirects=False, follow_redirects=False,
) )
@@ -250,10 +259,11 @@ class TestLogoutFlow:
client: TestClient, client: TestClient,
db_session: Session, db_session: Session,
) -> None: ) -> None:
logout_password = random_password()
user = User( user = User(
email="logout@example.com", email="logout@example.com",
username="logoutuser", username="logoutuser",
password_hash=hash_password("SecureP@ss1"), password_hash=hash_password(logout_password),
is_active=True, is_active=True,
) )
db_session.add(user) db_session.add(user)
@@ -261,9 +271,9 @@ class TestLogoutFlow:
session = AuthSession( session = AuthSession(
tokens=SessionTokens( tokens=SessionTokens(
access_token="access-token", access_token=random_token(),
refresh_token="refresh-token", refresh_token=random_token(),
access_token_source="cookie", access_token_source=COOKIE_SOURCE,
), ),
user=user, user=user,
) )
@@ -313,7 +323,7 @@ class TestLoginFlowEndToEnd:
def test_login_success_redirects_to_dashboard_and_sets_session( def test_login_success_redirects_to_dashboard_and_sets_session(
self, client: TestClient, db_session: Session self, client: TestClient, db_session: Session
) -> None: ) -> None:
password = "TestP@ss123" password = random_password()
user = User( user = User(
email="e2e@example.com", email="e2e@example.com",
username="e2euser", username="e2euser",
@@ -369,10 +379,11 @@ class TestLoginFlowEndToEnd:
app.dependency_overrides.pop(get_auth_session, None) app.dependency_overrides.pop(get_auth_session, None)
def test_login_inactive_user_shows_error(self, client: TestClient, db_session: Session) -> None: def test_login_inactive_user_shows_error(self, client: TestClient, db_session: Session) -> None:
password = random_password()
user = User( user = User(
email="inactive@example.com", email="inactive@example.com",
username="inactiveuser", username="inactiveuser",
password_hash=hash_password("TestP@ss123"), password_hash=hash_password(password),
is_active=False, is_active=False,
) )
db_session.add(user) db_session.add(user)
@@ -384,7 +395,7 @@ class TestLoginFlowEndToEnd:
try: try:
response = client.post( response = client.post(
"/login", "/login",
data={"username": "inactiveuser", "password": "TestP@ss123"}, data={"username": "inactiveuser", "password": password},
follow_redirects=False, follow_redirects=False,
) )
assert response.status_code == 400 assert response.status_code == 400

View File

@@ -9,6 +9,7 @@ from fastapi import Request, status
from dependencies import get_auth_session from dependencies import get_auth_session
from models import MiningOperationType, Project, User from models import MiningOperationType, Project, User
from services.session import AuthSession, SessionTokens from services.session import AuthSession, SessionTokens
from tests.utils.security import random_password, random_token
@pytest.fixture() @pytest.fixture()
@@ -23,8 +24,12 @@ def auth_session_context(client):
if user is None: if user is None:
session = AuthSession.anonymous() session = AuthSession.anonymous()
else: else:
session = AuthSession(tokens=SessionTokens( session = AuthSession(
access_token="token", refresh_token="refresh")) tokens=SessionTokens(
access_token=random_token(),
refresh_token=random_token(),
)
)
session.user = user session.user = user
request.state.auth_session = session request.state.auth_session = session
return session return session
@@ -48,7 +53,7 @@ def _create_user(uow, *, roles: tuple[str, ...] = (), is_superuser: bool = False
user = User( user = User(
email=f"{_unique('user')}@example.com", email=f"{_unique('user')}@example.com",
username=_unique('user'), username=_unique('user'),
password_hash=User.hash_password("password"), password_hash=User.hash_password(random_password()),
is_active=True, is_active=True,
is_superuser=is_superuser, is_superuser=is_superuser,
) )

View File

@@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
import pytest import pytest
@@ -20,6 +19,7 @@ from services.bootstrap import (
) )
from services.pricing import PricingMetadata from services.pricing import PricingMetadata
from services.unit_of_work import UnitOfWork from services.unit_of_work import UnitOfWork
from tests.utils.security import random_password
@pytest.fixture() @pytest.fixture()
@@ -46,7 +46,7 @@ def _settings(**overrides: Any) -> AdminBootstrapSettings:
defaults: dict[str, Any] = { defaults: dict[str, Any] = {
"email": "admin@example.com", "email": "admin@example.com",
"username": "admin", "username": "admin",
"password": "changeme", "password": random_password(),
"roles": ("admin", "viewer"), "roles": ("admin", "viewer"),
"force_reset": False, "force_reset": False,
} }
@@ -103,11 +103,13 @@ def test_bootstrap_is_idempotent(unit_of_work_factory: Callable[[], UnitOfWork])
def test_bootstrap_respects_force_reset(unit_of_work_factory: Callable[[], UnitOfWork]) -> None: def test_bootstrap_respects_force_reset(unit_of_work_factory: Callable[[], UnitOfWork]) -> None:
base_settings = _settings(password="initial") base_password = random_password()
base_settings = _settings(password=base_password)
bootstrap_admin(settings=base_settings, bootstrap_admin(settings=base_settings,
unit_of_work_factory=unit_of_work_factory) unit_of_work_factory=unit_of_work_factory)
rotated_settings = _settings(password="rotated", force_reset=True) rotated_password = random_password()
rotated_settings = _settings(password=rotated_password, force_reset=True)
_, admin_result = bootstrap_admin( _, admin_result = bootstrap_admin(
settings=rotated_settings, settings=rotated_settings,
unit_of_work_factory=unit_of_work_factory, unit_of_work_factory=unit_of_work_factory,
@@ -121,7 +123,7 @@ def test_bootstrap_respects_force_reset(unit_of_work_factory: Callable[[], UnitO
assert users_repo is not None assert users_repo is not None
user = users_repo.get_by_email(rotated_settings.email) user = users_repo.get_by_email(rotated_settings.email)
assert user is not None assert user is not None
assert user.verify_password("rotated") assert user.verify_password(rotated_password)
def test_bootstrap_pricing_creates_defaults(unit_of_work_factory: Callable[[], UnitOfWork]) -> None: def test_bootstrap_pricing_creates_defaults(unit_of_work_factory: Callable[[], UnitOfWork]) -> None:

View File

@@ -16,6 +16,7 @@ from dependencies import (
) )
from models import Project, Scenario, User from models import Project, Scenario, User
from services.session import AuthSession, SessionTokens from services.session import AuthSession, SessionTokens
from tests.utils.security import random_password, random_token
@pytest.fixture() @pytest.fixture()
@@ -40,7 +41,7 @@ def _create_user(
user = User( user = User(
email=f"{_unique('user')}@example.com", email=f"{_unique('user')}@example.com",
username=_unique('user'), username=_unique('user'),
password_hash=User.hash_password("password"), password_hash=User.hash_password(random_password()),
is_active=is_active, is_active=is_active,
is_superuser=is_superuser, is_superuser=is_superuser,
) )
@@ -76,11 +77,12 @@ def test_require_current_user_returns_authenticated_user():
user = User( user = User(
email="user@example.com", email="user@example.com",
username="user", username="user",
password_hash=User.hash_password("password"), password_hash=User.hash_password(random_password()),
is_active=True, is_active=True,
) )
session = AuthSession(tokens=SessionTokens( session = AuthSession(
access_token="token", refresh_token=None)) tokens=SessionTokens(access_token=random_token(), refresh_token=None)
)
session.user = user session.user = user
result = require_current_user(session=session) result = require_current_user(session=session)
@@ -99,10 +101,11 @@ def test_require_current_user_raises_when_session_missing():
def test_require_authenticated_user_blocks_inactive_users(): def test_require_authenticated_user_blocks_inactive_users():
inactive_password = random_password()
user = User( user = User(
email="user@example.com", email="user@example.com",
username="user", username="user",
password_hash=User.hash_password("password"), password_hash=User.hash_password(inactive_password),
is_active=False, is_active=False,
) )

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import secrets
from datetime import date from datetime import date
from collections.abc import Iterator from collections.abc import Iterator
from typing import cast from typing import cast
@@ -176,8 +177,11 @@ def api_client(session_factory) -> Iterator[TestClient]:
user = uow.users.get(user.id, with_roles=True) user = uow.users.get(user.id, with_roles=True)
def _override_auth_session(request: Request) -> AuthSession: def _override_auth_session(request: Request) -> AuthSession:
session = AuthSession(tokens=SessionTokens( tokens = SessionTokens(
access_token="test", refresh_token="test")) access_token=secrets.token_urlsafe(16),
refresh_token=secrets.token_urlsafe(16),
)
session = AuthSession(tokens=tokens)
session.user = user session.user = user
request.state.auth_session = session request.state.auth_session = session
return session return session

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import secrets
from datetime import timedelta from datetime import timedelta
import pytest import pytest
@@ -17,11 +18,18 @@ from services.security import (
) )
def test_hash_password_round_trip() -> None: @pytest.fixture()
hashed = hash_password("super-secret") def jwt_settings() -> JWTSettings:
"""Provide a unique JWTSettings instance per test with random secret."""
return JWTSettings(secret_key=secrets.token_urlsafe(32))
assert hashed != "super-secret"
assert verify_password("super-secret", hashed) def test_hash_password_round_trip() -> None:
password = secrets.token_urlsafe(16)
hashed = hash_password(password)
assert hashed != password
assert verify_password(password, hashed)
assert not verify_password("incorrect", hashed) assert not verify_password("incorrect", hashed)
@@ -29,47 +37,42 @@ def test_verify_password_handles_malformed_hash() -> None:
assert not verify_password("secret", "not-a-valid-hash") assert not verify_password("secret", "not-a-valid-hash")
def test_access_token_roundtrip() -> None: def test_access_token_roundtrip(jwt_settings: JWTSettings) -> None:
settings = JWTSettings(secret_key="unit-test-secret")
token = create_access_token( token = create_access_token(
"user-id-123", "user-id-123",
settings, jwt_settings,
scopes=("read", "write"), scopes=("read", "write"),
extra_claims={"custom": "value"}, extra_claims={"custom": "value"},
) )
payload = decode_access_token(token, settings) payload = decode_access_token(token, jwt_settings)
assert payload.sub == "user-id-123" assert payload.sub == "user-id-123"
assert payload.type == "access" assert payload.type == "access"
assert payload.scopes == ["read", "write"] assert payload.scopes == ["read", "write"]
def test_refresh_token_type_mismatch() -> None: def test_refresh_token_type_mismatch(jwt_settings: JWTSettings) -> None:
settings = JWTSettings(secret_key="unit-test-secret") token = create_refresh_token("user-id-456", jwt_settings)
token = create_refresh_token("user-id-456", settings)
with pytest.raises(TokenTypeMismatchError): with pytest.raises(TokenTypeMismatchError):
decode_access_token(token, settings) decode_access_token(token, jwt_settings)
def test_decode_expired_token() -> None: def test_decode_expired_token(jwt_settings: JWTSettings) -> None:
settings = JWTSettings(secret_key="unit-test-secret")
expired_token = create_access_token( expired_token = create_access_token(
"user-id-789", "user-id-789",
settings, jwt_settings,
expires_delta=timedelta(seconds=-5), expires_delta=timedelta(seconds=-5),
) )
with pytest.raises(TokenExpiredError): with pytest.raises(TokenExpiredError):
decode_access_token(expired_token, settings) decode_access_token(expired_token, jwt_settings)
def test_decode_tampered_token() -> None: def test_decode_tampered_token(jwt_settings: JWTSettings) -> None:
settings = JWTSettings(secret_key="unit-test-secret") token = create_access_token("user-id-321", jwt_settings)
token = create_access_token("user-id-321", settings)
tampered = token[:-1] + ("a" if token[-1] != "a" else "b") tampered = token[:-1] + ("a" if token[-1] != "a" else "b")
with pytest.raises(TokenDecodeError): with pytest.raises(TokenDecodeError):
decode_access_token(tampered, settings) decode_access_token(tampered, jwt_settings)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import secrets
from collections.abc import Iterator from collections.abc import Iterator
import pytest import pytest
@@ -32,16 +33,17 @@ def session(engine) -> Iterator[Session]:
def test_user_password_helpers() -> None: def test_user_password_helpers() -> None:
new_password = secrets.token_urlsafe(16)
user = User( user = User(
email="user@example.com", email="user@example.com",
username="example", username="example",
password_hash=User.hash_password("initial"), password_hash=User.hash_password("initial"),
) )
user.set_password("new-secret") user.set_password(new_password)
assert user.password_hash != "new-secret" assert user.password_hash != new_password
assert user.verify_password("new-secret") assert user.verify_password(new_password)
assert not user.verify_password("wrong") assert not user.verify_password("wrong")

0
tests/utils/__init__.py Normal file
View File

13
tests/utils/security.py Normal file
View File

@@ -0,0 +1,13 @@
from __future__ import annotations
import secrets
def random_password() -> str:
"""Return a strong test password that satisfies complexity checks."""
return f"Aa1!{secrets.token_urlsafe(16)}"
def random_token() -> str:
"""Return a random token suitable for test session data."""
return secrets.token_urlsafe(24)