initial commit
This commit is contained in:
84
tests/test_tls_and_auth.py
Normal file
84
tests/test_tls_and_auth.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import pytest
|
||||
|
||||
from utils.proxmox_client import ProxmoxClient
|
||||
from utils.check_mk_client import CheckMKClient
|
||||
|
||||
|
||||
class DummyResponse:
|
||||
def __init__(self, json_data=None, text=''):
|
||||
self._json = json_data or {}
|
||||
self.text = text
|
||||
|
||||
def raise_for_status(self):
|
||||
return None
|
||||
|
||||
def json(self):
|
||||
return self._json
|
||||
|
||||
|
||||
class DummySession:
|
||||
def __init__(self):
|
||||
self.headers = {}
|
||||
self.auth = None
|
||||
self.called = {}
|
||||
|
||||
def post(self, url, data=None, verify=True, timeout=None):
|
||||
# record verify
|
||||
self.called['post'] = {'url': url, 'verify': verify, 'data': data}
|
||||
return DummyResponse({'data': {'ticket': 'TICKET', 'CSRFPreventionToken': 'CSRF'}})
|
||||
|
||||
def get(self, url, params=None, verify=True, timeout=None):
|
||||
self.called['get'] = {'url': url, 'verify': verify, 'params': params}
|
||||
# Return cluster resource like structure for proxmox
|
||||
if 'cluster/resources' in url:
|
||||
return DummyResponse({'data': [{'type': 'node', 'node': 'node1', 'status': 'online', 'maxmem': 1024, 'maxcpu': 2}]})
|
||||
if 'qemu' in url:
|
||||
return DummyResponse({'data': []})
|
||||
# Check_MK endpoints (accept singular host endpoint too)
|
||||
if '/api/1.0/objects/hosts' in url or '/api/1.0/objects/host' in url:
|
||||
return DummyResponse({'result': [{'host_name': 'node1', 'state': 'ok'}]})
|
||||
if '/api/1.0/objects/services' in url:
|
||||
return DummyResponse({'result': [{'service_description': 'ping', 'state': 'ok', 'output': 'OK'}]})
|
||||
return DummyResponse()
|
||||
|
||||
|
||||
def test_proxmox_verify_and_token(monkeypatch):
|
||||
dummy = DummySession()
|
||||
# ensure the client's requests.Session() returns our dummy session
|
||||
monkeypatch.setattr('utils.proxmox_client.requests.Session', lambda: dummy)
|
||||
client = ProxmoxClient('https://pve.example/api2/json',
|
||||
api_token='user!token=secret', verify=False, ca_bundle=None)
|
||||
|
||||
cluster = client.get_cluster()
|
||||
assert 'nodes' in cluster
|
||||
# ensure token set as header
|
||||
assert 'Authorization' in dummy.headers
|
||||
assert dummy.headers['Authorization'].startswith('PVEAPIToken=')
|
||||
# ensure GET used verify=False
|
||||
assert dummy.called['get']['verify'] == False
|
||||
|
||||
|
||||
def test_proxmox_login_verify(monkeypatch):
|
||||
dummy = DummySession()
|
||||
# monkeypatch requests.Session so login uses DummySession
|
||||
monkeypatch.setattr('utils.proxmox_client.requests.Session', lambda: dummy)
|
||||
client = ProxmoxClient('https://pve.example/api2/json',
|
||||
user='root@pam', password='pw', verify=True)
|
||||
# login is performed lazily on first request; call get_cluster to trigger it
|
||||
_ = client.get_cluster()
|
||||
# ensure post verify True recorded
|
||||
assert dummy.called['post']['verify'] == True
|
||||
|
||||
|
||||
def test_checkmk_verify_and_auth(monkeypatch):
|
||||
dummy = DummySession()
|
||||
monkeypatch.setattr(
|
||||
'utils.check_mk_client.requests.Session', lambda: dummy)
|
||||
client = CheckMKClient('https://cmk.example',
|
||||
api_token='secrettoken', verify=False)
|
||||
# call get_host_status which will call GET on DummySession
|
||||
status = client.get_host_status('node1')
|
||||
assert status.get('host_name') == 'node1'
|
||||
# verify header set
|
||||
assert 'Authorization' in dummy.headers
|
||||
assert dummy.called['get']['verify'] == False
|
||||
Reference in New Issue
Block a user