import json import requests from typing import Dict, Any, List, Optional from utils.cache import read_cache, write_cache SITE_NAME = 'monitoring' PROTO = 'http' # or 'https' INSTANCES = { "pve": "192.168.88.91", "naspve": "192.168.88.92" } PATHS = { "host_status": "domain-types/host_config/collections/all", "host_status_single": "objects/host/{hostname}", "host_services": "domain-types/service/collections/all" } def get_api_url(hostname: str) -> str: return f"{PROTO}://{INSTANCES[hostname]}/{SITE_NAME}/check_mk/api/1.0" def get_api_endpoint(hostname: str, type: str) -> str: base = f"{PROTO}://{INSTANCES[hostname]}/{SITE_NAME}/check_mk/api/1.0" url = PATHS.get(type, '') return f"{base}/{url}" class CheckMKClient: def __init__(self, base_url: str, user: Optional[str] = None, password: Optional[str] = None, api_token: Optional[str] = None, verify: bool = True, ca_bundle: Optional[str] = None): self.base_url = base_url.rstrip('/') self.session = requests.Session() self.user = user self.password = password self.api_token = api_token self.verify = verify self.ca_bundle = ca_bundle or None # Use API token if provided (Check_MK uses 'Authorization: ' or 'OMD-LOGIN' depending on setup) if api_token: self.session.headers.update({'Authorization': api_token}) elif user and password: self.session.auth = (user, password) def _get(self, path: Optional[str], params: Optional[Dict[str, Any]] = None, url: Optional[str] = None) -> Dict[str, Any]: url = url or f"{self.base_url}/{path.lstrip('/')}" # Try cache first cache_key = json.dumps({'method': 'GET', 'url': url, 'params': params, 'verify': ( self.ca_bundle if self.ca_bundle else self.verify)}, sort_keys=True) cached = read_cache(cache_key) if cached is not None: # if session records last_get (used by tests), try to populate it try: self.session.last_get = dict(url=url, params=params, verify=(self.ca_bundle if self.ca_bundle else self.verify), timeout=10) except Exception: pass return cached resp = self.session.get( url, params=params, verify=(self.ca_bundle if self.ca_bundle else self.verify), timeout=10, ) resp.raise_for_status() # Try to parse JSON; some Check_MK endpoints (e.g., Livestatus) may return plain text try: data = resp.json() except ValueError: data = {'raw': resp.text} # write cache try: write_cache(cache_key, data) except Exception: pass return data def get_host_status(self, hostname: str) -> Dict[str, Any]: # Query the host_config collection endpoint to retrieve all hosts, # cache the full collection, and return the matching host from the cache. try: # Use the collection endpoint and these query params (as requested) url_path = PATHS.get("host_status", "") params = { 'effective_attributes': 'false', 'include_links': 'false', 'fields': '!(links)', 'site': 'monitoring', } # Build the collection URL from the configured base_url url = f"{self.base_url.rstrip('/')}/{url_path.lstrip('/')}" cache_key = json.dumps({'method': 'GET', 'url': url, 'params': params, 'verify': ( self.ca_bundle if self.ca_bundle else self.verify)}, sort_keys=True) # Try cached collection first cached = read_cache(cache_key) if cached is not None: data = cached else: resp = self.session.get( url, params=params, verify=(self.ca_bundle if self.ca_bundle else self.verify), timeout=20, ) resp.raise_for_status() try: data = resp.json() except ValueError: data = None # write full collection to cache (best-effort) try: write_cache(cache_key, data) except Exception: pass except Exception: return {} # Normalize the collection into an iterable list of host objects hosts = [] if data is None: data = {} if isinstance(data, dict): if 'result' in data: res = data.get('result') if isinstance(res, list): hosts = res elif isinstance(res, dict): hosts = list(res.values()) elif isinstance(data.get('hosts'), list): hosts = data.get('hosts') else: vals = [v for v in data.values() if isinstance(v, dict) or isinstance(v, list)] for v in vals: if isinstance(v, list): hosts.extend(v) elif isinstance(data, list): hosts = data # Find host by common keys in the collection for h in hosts: if not isinstance(h, dict): continue for key in ('id', 'name', 'host_name'): if key in h and h.get(key) == hostname: return h # If collection didn't yield a match, fall back to the host-specific endpoint try: params = {'columns': ['name', 'host_name', 'state'], '_pretty': 1} from urllib.parse import quote safe_name = quote(hostname, safe='') data2 = self._get( path=f'/api/1.0/objects/host/{safe_name}', params=params) except Exception: data2 = None if isinstance(data2, dict): if 'result' in data2: res = data2.get('result') if isinstance(res, list): return res[0] if res else {} if isinstance(res, dict): return res if any(k in data2 for k in ('name', 'host_name', 'state')): return data2 return {} def get_host_services(self, hostname: str) -> List[Dict[str, Any]]: # Use the collection POST endpoint to query services by host try: headers = {"Content-Type": "application/json"} payload = { "sites": ["monitoring"], "columns": ["host_name", "description", "state"], "query": {"op": "=", "left": "host_name", "right": hostname}, "host_name": hostname, } verify = self.ca_bundle if self.ca_bundle else self.verify # perform POST with JSON payload; build URL from configured base_url url = get_api_endpoint(hostname, 'host_services') cache_key = json.dumps( {'method': 'POST', 'url': url, 'json': payload, 'verify': verify}, sort_keys=True) cached = read_cache(cache_key) if cached is not None: # try to populate session.last_post for test introspection try: self.session.last_post = dict( url=url, headers=headers, json=payload, verify=verify, timeout=20) except Exception: pass # normalize cached response to a list of services (tests expect a list) if isinstance(cached, dict): for key in ('result', 'value', 'services'): res = cached.get(key) if isinstance(res, list): return res if isinstance(res, dict): return list(res.values()) if isinstance(cached, list): return cached return [] resp = self.session.post( url, headers={"Content-Type": "application/json"}, json=payload, verify=(self.ca_bundle if self.ca_bundle else self.verify), timeout=20, ) resp.raise_for_status() try: data = resp.json() except ValueError: data = None return [] # write cache try: write_cache(cache_key, data) except Exception: pass except Exception as e: return [] # data usually contains 'result' with a list of services if isinstance(data, dict): for key in ('result', 'value', 'services'): res = data.get(key) if isinstance(res, list): return res if isinstance(res, dict): return list(res.values()) if isinstance(data, list): return data return [] def get_service_detail(self, service_url: str) -> Dict[str, Any]: # Use the provided service URL to get detailed information about a specific service base_url = service_url.split('/api/1.0')[0] path = service_url.split('/api/1.0')[-1] try: data = self._get(url=service_url, path=path) except Exception: data = {} if isinstance(data, dict): return data if isinstance(data, list): if data: return data[0] return {} return {}