import json
import logging
from typing import Any, Dict, List, Optional, Union

import requests

from utils.cache import read_cache, write_cache

_logger = logging.getLogger(__name__)


class ProxmoxClient:
    """Small read-only client for the Proxmox VE HTTP API.

    Authentication is either an API token (sent as an ``Authorization``
    header on every request) or a username/password pair that is exchanged
    for a ticket lazily, on the first request that needs it.  GET responses
    are stored in and served from the ``utils.cache`` helpers.
    """

    # Per-request timeout in seconds used when the caller does not supply one.
    DEFAULT_TIMEOUT = 10

    def __init__(self, base_url: str, user: Optional[str] = None,
                 password: Optional[str] = None,
                 api_token: Optional[str] = None,
                 verify: bool = True, ca_bundle: Optional[str] = None,
                 timeout: float = DEFAULT_TIMEOUT):
        """Store connection settings; performs no network I/O.

        Args:
            base_url: Root URL that API paths are appended to; a trailing
                slash is stripped.  (Presumably includes the ``/api2/json``
                prefix -- confirm against callers.)
            user: Login name for ticket authentication.
            password: Password for ticket authentication.
            api_token: API token; when given it takes precedence over
                ``user``/``password`` and no ticket login is attempted.
            verify: Whether to verify the server's TLS certificate.
            ca_bundle: Path to a CA bundle; when non-empty it is passed to
                ``requests`` in place of ``verify``.
            timeout: Timeout in seconds applied to every HTTP request.
        """
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.user = user
        self.password = password
        self.api_token = api_token
        self.csrf_token = None
        self.verify = verify
        self.ca_bundle = ca_bundle or None
        self.timeout = timeout
        if api_token:
            # The token is sent verbatim after "PVEAPIToken=".  Proxmox
            # documents the expected value as USER@REALM!TOKENID=SECRET.
            self.session.headers.update(
                {'Authorization': f'PVEAPIToken={api_token}'})
        # Login is deferred to the first request so that constructing a
        # client never triggers network calls (e.g. at import time).
        self._logged_in = False

    def _verify_option(self) -> Union[bool, str]:
        """Return the ``verify`` value for requests: CA bundle path if set, else the flag."""
        return self.ca_bundle if self.ca_bundle else self.verify

    def _login(self) -> None:
        """Exchange username/password for a ticket and install it on the session.

        Raises:
            requests.HTTPError: If the ticket endpoint answers with an error
                status (via ``raise_for_status``).
        """
        url = f"{self.base_url}/access/ticket"
        resp = self.session.post(
            url,
            data={'username': self.user, 'password': self.password},
            verify=self._verify_option(),
            timeout=self.timeout,
        )
        resp.raise_for_status()
        # `or {}` also covers an explicit {"data": null} body, which
        # `.get('data', {})` would pass through as None.
        data = resp.json().get('data') or {}
        ticket = data.get('ticket')
        csrf = data.get('CSRFPreventionToken') or data.get('csrf_token')
        if ticket:
            try:
                self.session.cookies.set('PVEAuthCookie', ticket)
            except Exception:
                # Deliberately broad: session stand-ins (e.g. test doubles)
                # may have no usable cookie jar.  Fall back to a raw Cookie
                # header, preserving any cookies that are already there.
                existing = self.session.headers.get('Cookie', '')
                cookie_val = f'PVEAuthCookie={ticket}'
                if existing:
                    cookie_val = str(existing) + '; ' + cookie_val
                self.session.headers['Cookie'] = cookie_val
        if csrf:
            self.csrf_token = csrf
            self.session.headers.update({'CSRFPreventionToken': csrf})
        # Mark as logged in so later requests do not log in again.
        self._logged_in = True

    def _ensure_logged_in(self) -> None:
        """Log in with username/password unless authentication is already in place."""
        if self.api_token or self._logged_in:
            return
        # A PVEAuthCookie already present in the headers counts as logged in.
        # Coerce to str so a missing/None header cannot break the `in` test.
        cookie_header = str(self.session.headers.get('Cookie') or '')
        if 'PVEAuthCookie' in cookie_header:
            self._logged_in = True
            return
        if self.user and self.password:
            self._login()

    def _get(self, path: str) -> Dict[str, Any]:
        """GET ``path`` relative to ``base_url``, going through the cache.

        Args:
            path: API path; a leading slash is optional.

        Returns:
            The cached value if one exists for this URL; otherwise the
            decoded JSON body, or ``{'raw': <response text>}`` when the body
            is not valid JSON.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        url = f"{self.base_url}/{path.lstrip('/')}"
        # Authentication is prepared before the cache lookup, so a cache hit
        # can still trigger the one-time login.
        self._ensure_logged_in()
        cache_key = json.dumps(
            {'method': 'GET', 'url': url, 'params': None}, sort_keys=True)
        cached = read_cache(cache_key)
        if cached is not None:
            return cached
        resp = self.session.get(
            url, verify=self._verify_option(), timeout=self.timeout)
        resp.raise_for_status()
        try:
            data = resp.json()
        except ValueError:
            data = {'raw': resp.text}
        # Caching is best-effort: a failed write must not fail the request,
        # but it is logged rather than dropped silently.
        try:
            write_cache(cache_key, data)
        except Exception:
            _logger.debug("Could not write cache entry for %s", url,
                          exc_info=True)
        return data

    def _list_guests(self, node_name: Optional[str],
                     kind: str) -> List[Any]:
        """Return the ``kind`` ('qemu' or 'lxc') guest list of a node, or [] on any failure."""
        try:
            payload = self._get(f'/nodes/{node_name}/{kind}')
            # `or []` keeps the result a list when the API sends data: null.
            return payload.get('data') or []
        except Exception as exc:
            # Best-effort by design: one unreachable node must not break the
            # whole overview.  Log so the failure stays visible.
            _logger.warning("Could not list %s guests on node %r: %s",
                            kind, node_name, exc)
            return []

    def get_cluster(self) -> Dict[str, Any]:
        """Return the cluster's nodes together with their VMs and containers.

        Reads ``/cluster/resources``, keeps the entries whose ``type`` is
        ``'node'`` and, for each one, fetches its qemu and lxc guest lists.

        Returns:
            ``{'nodes': [...]}`` where each node is a dict with the keys
            ``name``, ``status``, ``memory``, ``cpu``, ``qemu`` and ``lxc``.
            A guest list that cannot be fetched is returned as ``[]``.

        Raises:
            requests.HTTPError: If the ``/cluster/resources`` request itself
                fails.
        """
        data = self._get('/cluster/resources')
        nodes = []
        # `or []` tolerates a {"data": null} body as well as a missing key.
        for item in data.get('data') or []:
            if item.get('type') != 'node':
                continue
            node_name = item.get('node')
            nodes.append({
                'name': node_name,
                'status': item.get('status'),
                'memory': item.get('maxmem'),
                'cpu': item.get('maxcpu'),
                'qemu': self._list_guests(node_name, 'qemu'),
                'lxc': self._list_guests(node_name, 'lxc'),
            })
        return {'nodes': nodes}