"""
.. role:: python(code)
:language: python
"""
from __future__ import annotations
import collections
import functools
import importlib
import io
import logging
import numbers
import os
import re
import sys
import traceback
import typing
from importlib import metadata
from logging import StreamHandler
import typing_extensions
# Version string of the ``mplugin`` distribution, looked up in the installed
# package metadata when this module is imported.
__version__: str = metadata.version("mplugin")
[docs]
class CheckError(RuntimeError):
    """Signal that the check has to be aborted.

    Raise this exception when a plugin is unable to determine the system
    status. The plugin then displays the exception’s argument and exits
    with an ``UNKNOWN`` (``3``) status.
    """
[docs]
class Timeout(RuntimeError):
    """Signal that the maximum check run time was exceeded.

    ``mplugin`` raises this exception internally when a check runs longer
    than allowed. The check is aborted and the plugin exits with an
    ``UNKNOWN`` (``3``) status.
    """
[docs]
class ServiceState:
    """Base type for the states a check can end in.

    Every state carries two constant attributes:

    - :attr:`code` is the exit code reported to the monitoring system.
    - :attr:`text` is the short label that is printed, for example, at the
      beginning of the summary line.

    States are ordered by :attr:`code` and compare equal only if both
    :attr:`code` and :attr:`text` match.

    :param code: The Plugin API compliant exit code. Must be ``0``, ``1``,
        ``2`` or ``3``.
    :param text: The short text representation that is printed, for example,
        at the beginning of the summary line.
    """

    code: int
    """The Plugin API compliant exit code. Must be ``0``, ``1``, ``2`` or ``3``."""

    text: str
    """The short text representation that is printed, for example, at
    the beginning of the summary line."""

    def __init__(self, code: int, text: str) -> None:
        self.code, self.text = code, text

    def __str__(self) -> str:
        """Return the short text label of this state."""
        return self.text

    def __int__(self) -> int:
        """Return the exit code of this state."""
        return self.code

    def __gt__(self, other: typing.Any) -> bool:
        """Tell whether this state is more severe than *other*.

        Objects without an integer ``code`` attribute are never exceeded.
        """
        other_code = getattr(other, "code", None)
        if not isinstance(other_code, int):
            return False
        return self.code > other_code

    def __eq__(self, other: typing.Any) -> bool:
        """Compare by ``code`` and ``text``; both must be present and match."""
        other_code = getattr(other, "code", None)
        if not isinstance(other_code, int) or not self.code == other_code:
            return False
        other_text = getattr(other, "text", None)
        if not isinstance(other_text, str):
            return False
        return self.text == other_text

    def __hash__(self) -> int:
        """Hash consistently with :meth:`__eq__`."""
        return hash((self.code, self.text))

    @staticmethod
    def worst(states: list["ServiceState"]) -> "ServiceState":
        """Reduce list of *states* to the most significant state.

        The module-level ``ok`` state is the starting point, so an empty
        list yields ``ok``.
        """
        worst_so_far = ok
        for candidate in states:
            # Keep the accumulated state only if it is strictly worse.
            if not worst_so_far > candidate:
                worst_so_far = candidate
        return worst_so_far

    @staticmethod
    def state(exit_code: int) -> ServiceState:
        """Convert an exit code to a ServiceState.

        :param exit_code: The exit code to convert. Must be ``0``, ``1``,
            ``2`` or ``3``.
        :return: The corresponding module-level state (``ok``, ``warning``,
            ``critical`` or ``unknown``).
        :raises CheckError: If *exit_code* is none of the four known codes.
        """
        # The position in this tuple is the exit code of the state.
        for code, candidate in enumerate((ok, warning, critical, unknown)):
            if exit_code == code:
                return candidate
        raise CheckError(f"Exit code {exit_code} is > 3")
# Module-level state objects with codes 0-3; ``ServiceState.state()`` and
# ``ServiceState.worst()`` return these instances.
ok: ServiceState = ServiceState(0, "ok")
"""The plugin was able to check the service and it appeared to be functioning
properly."""
warning: ServiceState = ServiceState(1, "warning")
"""
The plugin was able to check the service, but it appeared to be above some
``warning`` threshold or did not appear to be working properly."""
critical: ServiceState = ServiceState(2, "critical")
"""The plugin detected that either the service was not running or it was above
some ``critical`` threshold."""
unknown: ServiceState = ServiceState(3, "unknown")
"""Invalid command line arguments were supplied to the plugin or low-level
failures internal to the plugin (such as unable to fork, or open a tcp socket)
that prevent it from performing the specified operation. Higher-level errors
(such as name resolution errors, socket timeouts, etc) are outside of the control
of plugins and should generally NOT be reported as ``unknown`` states.
The ``--help`` or ``--version`` output should also result in ``unknown`` state."""
# Types accepted as the ``spec`` argument of ``Range``: a range expression
# string, a plain number, or another ``Range`` instance.
RangeSpec = typing.Union[str, int, float, "Range"]
[docs]
class Range:
    """Represents a threshold range.

    The general format is ``[@][start:][end]``. ``start:`` may be omitted if
    ``start==0``. ``~:`` means that start is negative infinity. If ``end`` is
    omitted, infinity is assumed. To invert the match condition, prefix
    the range expression with ``@``.

    .. seealso::

        See the
        `Monitoring Plugin Guidelines Repository <https://github.com/monitoring-plugins/monitoring-plugin-guidelines/blob/main/definitions/01.range_expressions.md>`__
        or the
        `Monitoring Plugins Development Guidelines <https://www.monitoring-plugins.org/doc/guidelines.html#THRESHOLDFORMAT>`__
        for details.

    :param spec: may be either a string, a number, or another
        Range object. Mutually exclusive with *invert*, *start* and *end*.
    :param invert: If true, the value exceeds the threshold if it is
        INSIDE the range between start and end (including the endpoints).
    :param start: The (inclusive) start point on a numeric scale (possibly
        negative or negative infinity).
    :param end: The (inclusive) end point on a numeric scale (possibly
        negative or positive infinity).
    :raises ValueError: if *spec* is combined with *invert*, *start* or
        *end*, if the expression cannot be parsed, or if ``start > end``.
    """

    invert: bool
    """If true, the value exceeds the threshold if it is INSIDE the range
    between start and end (including the endpoints)."""

    start: float
    """The (inclusive) start point on a numeric scale (possibly negative or
    negative infinity)."""

    end: float
    """The (inclusive) end point on a numeric scale (possibly negative or
    positive infinity)."""

    def __init__(
        self,
        spec: typing.Optional[RangeSpec] = None,
        invert: typing.Optional[bool] = None,
        start: typing.Optional[float] = None,
        end: typing.Optional[float] = None,
    ) -> None:
        """Creates a Range object according to `spec`."""
        explicit = not (invert is None and start is None and end is None)
        if spec is not None and explicit:
            raise ValueError("Specify spec OR invert, start, end! not both")
        if isinstance(spec, Range):
            # Copy constructor.
            self.invert = spec.invert
            self.start = spec.start
            self.end = spec.end
        elif isinstance(spec, (int, float)):
            # A bare number N is shorthand for the range ``0:N``.
            self.invert = False
            self.start = 0
            self.end = spec
        elif explicit:
            # spec is None here; fill the missing pieces with the defaults.
            self.invert = False if invert is None else invert
            self.start = 0 if start is None else start
            self.end = float("inf") if end is None else end
        else:
            text = "" if spec is None else str(spec)
            self.start, self.end, self.invert = Range._parse(text)
        Range._verify(self.start, self.end)

    @classmethod
    def _parse(cls, spec: str) -> tuple[float, float, bool]:
        """Split a range expression into ``(start, end, invert)``."""
        invert = False
        start: float
        start_str: str
        end: float
        end_str: str
        if spec.startswith("@"):
            invert = True
            spec = spec[1:]
        if ":" in spec:
            # More than one colon makes the unpacking fail with ValueError.
            start_str, end_str = spec.split(":")
        else:
            start_str, end_str = "", spec
        if start_str == "~":
            start = float("-inf")
        else:
            start = cls._parse_atom(start_str, 0)
        end = cls._parse_atom(end_str, float("inf"))
        return start, end, invert

    @staticmethod
    def _parse_atom(atom: str, default: float) -> float:
        """Convert one endpoint; an empty string yields *default*."""
        if atom == "":
            return default
        if "." in atom:
            return float(atom)
        return int(atom)

    @staticmethod
    def _verify(start: float, end: float) -> None:
        """Throws ValueError if the range is not consistent."""
        if start > end:
            raise ValueError("start %s must not be greater than end %s" % (start, end))

    def match(self, value: float) -> bool:
        """Decides if `value` is inside/outside the threshold.

        :returns: `True` if value is inside the bounds for non-inverted
            Ranges; the result is flipped for inverted ranges.

        Also available as `in` operator.
        """
        inside = not (value < self.start or value > self.end)
        return inside ^ self.invert

    def __contains__(self, value: float) -> bool:
        return self.match(value)

    def _format(self, omit_zero_start: bool = True) -> str:
        """Render the range expression, optionally keeping a ``0:`` start."""
        result: list[str] = []
        if self.invert:
            result.append("@")
        if self.start == float("-inf"):
            result.append("~:")
        elif not omit_zero_start or self.start != 0:
            result.append(f"{self.start}:")
        if self.end != float("inf"):
            result.append(f"{self.end}")
        return "".join(result)

    def __str__(self) -> str:
        """Human-readable range specification."""
        return self._format()

    def __repr__(self) -> str:
        """Parseable range specification."""
        return "Range(%r)" % str(self)

    def __eq__(self, value: object) -> bool:
        """Two ranges are equal if invert flag, start and end all match."""
        if not isinstance(value, Range):
            return False
        return (
            self.invert == value.invert
            and self.start == value.start
            and self.end == value.end
        )

    @property
    def violation(self) -> str:
        """Human-readable description why a value does not match."""
        return "outside range {0}".format(self._format(False))
class _Output:
    """Collects status line, long output, warnings and perfdata of a run.

    ``str()`` of an instance renders all non-empty parts, one per line,
    followed by the captured logging output. Characters listed in
    :attr:`ILLEGAL` are stripped from every part and reported as a warning.
    """

    ILLEGAL = "|"
    logchan: StreamHandler[io.StringIO]
    verbose: int
    status: str
    out: list[str]
    warnings: list[str]
    longperfdata: list[str]

    def __init__(self, logchan: StreamHandler[io.StringIO], verbose: int = 0) -> None:
        self.logchan = logchan
        self.verbose = verbose
        self.status = ""
        self.out = []
        self.warnings = []
        self.longperfdata = []

    def add(self, check: "Check") -> None:
        """Take over status, long output and perfdata of *check*.

        With verbosity ``0`` the perfdata is appended to the status line,
        otherwise long output and perfdata are kept as separate lines.
        """
        self.status = self.format_status(check)
        if self.verbose != 0:
            self.add_longoutput(check.verbose)
            self.longperfdata.append(self.format_perfdata(check))
            return
        perfdata = self.format_perfdata(check)
        if perfdata:
            self.status += " " + perfdata

    def format_status(self, check: "Check") -> str:
        """Build ``[NAME ]STATE[ - summary]`` from *check*."""
        name_prefix = check.name.upper() + " " if check.name else ""
        summary = check.summary.strip()
        state_label = str(check.state).upper()
        suffix = " - " + summary if summary else ""
        return self._screen_chars(name_prefix + state_label + suffix, "status line")

    def format_perfdata(self, check: "Check") -> str:
        """Join the perfdata of *check* behind a leading ``| `` marker."""
        if check.perfdata:
            joined = " ".join(check.perfdata)
            return "| " + self._screen_chars(joined, "perfdata")
        return ""

    def add_longoutput(self, text: str | list[str] | tuple[str, ...]) -> None:
        """Append *text* to the long output; lists and tuples are flattened."""
        if not isinstance(text, (list, tuple)):
            self.out.append(self._screen_chars(text, "long output"))
            return
        for line in text:
            self.add_longoutput(line)

    def __str__(self) -> str:
        # Screen the log first: it may add a warning that must be printed too.
        log_text = self._screen_chars(self.logchan.stream.getvalue(), "logging output")
        parts = [
            self.status,
            *self.out,
            log_text,
            *self.warnings,
            *self.longperfdata,
        ]
        return "\n".join(filter(None, parts)) + "\n"

    @staticmethod
    def _filter_output(output: str, filtered: str) -> str:
        """Filters out characters from output"""
        return output.translate(str.maketrans("", "", filtered))

    def _screen_chars(self, text: str, where: str) -> str:
        """Strip trailing newlines and illegal characters from *text*.

        A warning naming the removed characters and *where* they were found
        is recorded whenever something had to be removed.
        """
        text = text.rstrip("\n")
        screened = _Output._filter_output(text, self.ILLEGAL)
        if screened == text:
            return screened
        removed = set(text) - set(screened)
        self.warnings.append(self._illegal_chars_warning(where, removed))
        return screened

    @staticmethod
    def _illegal_chars_warning(where: str, removed_chars: set[str]) -> str:
        """Format the warning line for characters removed from *where*."""
        hex_chars = ", ".join(hex(ord(char)) for char in removed_chars)
        return f"warning: removed illegal characters ({hex_chars}) from {where}"
# Generic placeholders for the parameters (P) and the return type (R) of
# callables that are wrapped or invoked by this module.
P = typing.ParamSpec("P")
R = typing.TypeVar("R")
[docs]
def guarded(
    original_function: typing.Any = None, verbose: typing.Any = None
) -> typing.Any:
    """Runs a function in mplugin's runtime environment.

    `guarded` makes the decorated function behave correctly with respect
    to the monitoring plugin API if it aborts with an uncaught exception or
    a timeout: the runtime's exception handler prints the failure and exits
    with an *unknown* exit code.

    This function should be used as a decorator for the script's `main`
    function, either bare (``@guarded``) or with arguments
    (``@guarded(verbose=0)``).

    :param original_function: The function to wrap when used as a bare
        decorator; leave unset when passing keyword arguments.
    :param verbose: Optional keyword parameter to control verbosity
        level during early execution (before
        :meth:`~mplugin.main` has been called). For example,
        use `@guarded(verbose=0)` to turn tracebacks in that phase off.
    """

    def _decorate(func: typing.Callable[P, R]):
        @functools.wraps(func)
        def wrapper(*args: typing.Any, **kwds: typing.Any):
            runtime = _Runtime()
            if verbose is not None:
                runtime.verbose = verbose
            try:
                return func(*args, **kwds)
            except Timeout as exc:
                message = f"Timeout: check execution aborted after {exc}"
                runtime._handle_exception(message)  # type: ignore
            except Exception:
                runtime._handle_exception()  # type: ignore

        return wrapper

    if original_function is None:
        # Called as ``@guarded(...)``: hand back the actual decorator.
        return _decorate  # type: ignore
    assert callable(original_function), (
        f'Function {original_function!r} not callable. Forgot to add "verbose=" keyword?'
    )
    return _decorate(original_function)
class _Runtime:
    """Process-wide singleton that interfaces a check with the system.

    The runtime handles uncaught exceptions, timeouts and logging. Plugin
    authors should not use it directly, but decorate the plugin's main
    function with :func:`guarded`.
    """

    instance: typing.Optional[typing_extensions.Self] = None  # type: ignore
    check: typing.Optional["Check"] = None
    _verbose = 1
    _colorize: bool = False
    """Use ANSI colors to colorize the logging output"""
    timeout: typing.Optional[int] = None
    logchan: logging.StreamHandler[io.StringIO]
    output: _Output
    stdout: typing.Optional[io.StringIO] = None
    exitcode: int = 70  # EX_SOFTWARE

    def __new__(cls) -> typing_extensions.Self:
        """Return the one shared instance, creating it on first use."""
        if not cls.instance:
            cls.instance: typing_extensions.Self = super(_Runtime, cls).__new__(cls)
        return cls.instance

    def __init__(self) -> None:
        """(Re)create the logging channel and the output collector.

        ``__init__`` runs on every ``_Runtime()`` call, also when
        ``__new__`` hands back the already existing singleton.
        """
        rootlogger = logging.getLogger("mplugin")
        rootlogger.setLevel(logging.DEBUG)
        # Detach the channel of a previous initialisation; otherwise every
        # call would leave one more stale handler on the logger.
        stale = self.__dict__.get("logchan")
        if stale is not None:
            rootlogger.removeHandler(stale)
        self.logchan = logging.StreamHandler(io.StringIO())
        self.logchan.setFormatter(logging.Formatter("%(message)s"))
        rootlogger.addHandler(self.logchan)
        self.output = _Output(self.logchan)

    def _handle_exception(
        self, statusline: typing.Optional[str] = None
    ) -> typing.NoReturn:
        """Report the exception being handled and exit with code ``3``.

        :param statusline: Text for the status line; defaults to the
            formatted exception currently being handled.
        """
        exc_type, value = sys.exc_info()[0:2]
        # Same rule as _Output.format_status: no prefix without a name.
        if self.check and self.check.name:
            name = self.check.name.upper() + " "
        else:
            name = ""
        self.output.status = "{0}UNKNOWN: {1}".format(
            name,
            statusline or traceback.format_exception_only(exc_type, value)[0].strip(),
        )
        if self.verbose > 0:
            self.output.add_longoutput(traceback.format_exc())
        print("{0}".format(self.output), end="", file=self.stdout)
        self.exitcode = 3
        self.sysexit()

    @property
    def verbose(self) -> int:
        """Verbosity level, capped at ``3`` by the setter."""
        return self._verbose

    @verbose.setter
    def verbose(self, verbose: typing.Any) -> None:
        # Accept a number or anything with a length (e.g. a list of flags).
        if isinstance(verbose, int):
            self._verbose = verbose
        elif isinstance(verbose, float):
            self._verbose = int(verbose)
        else:
            self._verbose = len(verbose or [])
        if self._verbose >= 3:
            self.logchan.setLevel(logging.DEBUG)
            self._verbose = 3
        elif self._verbose == 2:
            self.logchan.setLevel(logging.INFO)
        else:
            self.logchan.setLevel(logging.WARNING)
        self.output.verbose = self._verbose

    @property
    def colorize(self) -> bool:
        """Whether the logging output is wrapped in ANSI color codes."""
        return self._colorize

    @colorize.setter
    def colorize(self, colorize: bool) -> None:
        self._colorize = colorize
        if colorize:
            self.logchan.setFormatter(self._AnsiColorFormatter("%(message)s"))
        else:
            self.logchan.setFormatter(logging.Formatter("%(message)s"))

    def run(self, check: "Check") -> None:
        """Call *check*, collect its output and take over its exit code."""
        check()
        self.output.add(check)
        self.exitcode = check.exitcode

    @staticmethod
    def _with_timeout(
        time: int, func: typing.Callable[P, R], *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        """Call `func` but abort with :class:`Timeout` after `time` seconds."""
        if os.name == "posix":
            signal = importlib.import_module("signal")

            def timeout_handler(signum: int, frame: typing.Any) -> typing.NoReturn:
                raise Timeout("{0}s".format(time))

            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(time)
            try:
                func(*args, **kwargs)
            finally:
                # Cancel the pending alarm whether func finished or raised.
                signal.alarm(0)
        elif os.name == "nt":
            # We use a thread here since NT systems don't have POSIX signals.
            threading = importlib.import_module("threading")
            func_thread = threading.Thread(target=func, args=args, kwargs=kwargs)
            func_thread.daemon = True  # quit interpreter even if still running
            func_thread.start()
            func_thread.join(time)
            if func_thread.is_alive():
                raise Timeout("{0}s".format(time))

    def execute(
        self,
        check: "Check",
        verbose: typing.Any = None,
        timeout: typing.Any = None,
        colorize: bool = False,
    ) -> typing.NoReturn:
        """Run *check*, print the collected output and exit the process.

        :param check: The check to run.
        :param verbose: Verbosity to apply; ``None`` keeps the current one.
        :param timeout: Maximum run time in seconds; ``None`` keeps the
            current setting, a falsy value disables the timeout.
        :param colorize: Colorize the logging output with ANSI codes.
        """
        self.check = check
        if verbose is not None:
            self.verbose = verbose
        if timeout is not None:
            self.timeout = int(timeout)
        if colorize:
            self.colorize = True
        if self.timeout:
            _Runtime._with_timeout(self.timeout, self.run, check)
        else:
            self.run(check)
        print("{0}".format(self.output), end="", file=self.stdout)
        self.sysexit()

    def sysexit(self) -> typing.NoReturn:
        """Terminate the process with :attr:`exitcode`."""
        sys.exit(self.exitcode)

    class _AnsiColorFormatter(logging.Formatter):
        """https://medium.com/@kamilmatejuk/inside-python-colorful-logging-ad3a74442cc6"""

        def format(self, record: logging.LogRecord) -> str:
            """Wrap the formatted record in the ANSI style of its level."""
            no_style = "\033[0m"
            bold = "\033[1m"  # SGR 1 is bold; 91 is bright red
            grey = "\033[90m"
            yellow = "\033[93m"
            red = "\033[31m"
            red_light = "\033[91m"
            blue = "\033[34m"
            start_style = {
                "DEBUG": grey,
                "INFO": blue,
                "WARNING": yellow,
                "ERROR": red,
                "CRITICAL": red_light + bold,
            }.get(record.levelname, no_style)
            end_style = no_style
            return f"{start_style}{super().format(record)}{end_style}"
[docs]
class Metric:
    """Single measured value. Structured representation for data points.

    Instances of this class are passed as value objects between most of
    mplugin's core classes. Typically, :class:`~.Resource` objects
    emit a list of metrics as result of their :meth:`~.Resource.probe`
    methods.

    The value should be expressed in terms of base units, so
    :python:`Metric('swap', 10240, 'B')`
    is better than
    :python:`Metric('swap', 10, 'kiB')`.

    :param name: A short internal identifier for the value -- appears
        also in the performance data.
    :param value: A data point. This value usually has a boolean or numeric
        type, but other types are also possible.
    :param uom: :term:`unit of measure`, preferably as ISO
        abbreviation like ``s``.
    :param min: The minimum value or ``None`` if there is no known minimum.
    :param max: The maximum value or ``None`` if there is no known maximum.
    :param context: The name of the associated :class:`~.Context` (defaults to the
        metric’s name if left out), or the context object itself.
    :param resource: The resource that produced this metric, if already known.
    """

    name: str
    """A short internal identifier for the value -- appears also in the
    performance data."""

    value: typing.Any
    """A data point. This value usually has a boolean or numeric type,
    but other types are also possible."""

    uom: typing.Optional[str] = None
    """:term:`unit of measure`, preferably as ISO
    abbreviation like ``s``."""

    min: typing.Optional[float] = None
    """The minimum value or ``None`` if there is no known minimum."""

    max: typing.Optional[float] = None
    """The maximum value or ``None`` if there is no known maximum."""

    context_name: str
    """The name of the associated :class:`~.Context` (defaults to the
    metric’s name if left out)."""

    __context: typing.Optional["Context"] = None
    __resource: typing.Optional["Resource"] = None

    # Changing these now would be API-breaking, so we'll ignore these
    # shadowed built-ins
    # pylint: disable-next=redefined-builtin
    def __init__(
        self,
        name: str,
        value: typing.Any,
        uom: typing.Optional[str] = None,
        min: typing.Optional[float] = None,
        max: typing.Optional[float] = None,
        context: typing.Optional[typing.Union[str, Context]] = None,
        resource: typing.Optional[Resource] = None,
    ) -> None:
        """Creates new Metric instance."""
        self.name = name
        self.value = value
        self.uom = uom
        self.min = min
        self.max = max
        if context is None:
            self.context_name = name
        elif isinstance(context, str):
            self.context_name = context
        elif isinstance(context, Context):
            self.context_name = context.name
            self.__context = context
        # NOTE: any other type leaves ``context_name`` unset, as before.
        if resource is not None:
            self.__resource = resource

    def __str__(self) -> str:
        """Same as :attr:`valueunit`."""
        return self.valueunit

    @property
    def resource(self) -> Resource:
        """The resource that produced this metric.

        :raises RuntimeError: if no resource has been set
        """
        # Compare against None: a resource may legitimately be falsy
        # (e.g. define ``__len__``) and must still count as "set".
        if self.__resource is None:
            raise RuntimeError("no resource set for metric", self.name)
        return self.__resource

    @resource.setter
    def resource(self, resource: Resource) -> None:
        self.__resource = resource

    @property
    def context(self) -> Context:
        """The context object associated with this metric.

        :raises RuntimeError: if no context has been set
        """
        if self.__context is None:
            raise RuntimeError("no context set for metric", self.name)
        return self.__context

    @context.setter
    def context(self, context: Context) -> None:
        self.__context = context

    @property
    def description(self) -> typing.Optional[str]:
        """Human-readable, detailed string representation.

        Delegates to the :class:`~.context.Context` to format the value.

        :returns: :meth:`~.context.Context.describe` output or
            :attr:`valueunit` if no context has been associated yet
        """
        if self.__context is not None:
            return self.__context.describe(self)
        return str(self)

    @property
    def valueunit(self) -> str:
        """Compact string representation.

        This is just the value and the unit. If the value is a real
        number, express the value with a limited number of digits to
        improve readability.
        """
        return "%s%s" % (self._human_readable_value, self.uom or "")

    @property
    def _human_readable_value(self) -> str:
        """Limit number of digits for floats."""
        if isinstance(self.value, numbers.Real) and not isinstance(
            self.value, numbers.Integral
        ):
            return "%.4g" % self.value
        return str(self.value)

    def evaluate(self) -> typing.Union["Result", "ServiceState"]:
        """Evaluates this instance according to the context.

        :return: :class:`~mplugin.Result` object
        :raise RuntimeError: if no context or no resource has been
            associated yet
        """
        return self.context.evaluate(self, self.resource)
[docs]
class Resource:
    """Abstract base class for custom domain models.

    Domain model for data :term:`acquisition`.

    :class:`Resource` is the base class for the plugin's :term:`domain
    model`. It should model the relevant details of reality that a plugin is
    supposed to check. The :class:`~.check.Check` controller calls
    :meth:`Resource.probe` on all passed resource objects to acquire data.

    Plugin authors should subclass :class:`Resource` and write
    whatever methods are needed to get the interesting bits of information.
    The most important resource subclass should be named after the plugin
    itself.

    Subclasses may add arguments to the constructor to parametrize
    information retrieval.
    """

    @property
    def name(self) -> str:
        """Name of the concrete resource class."""
        concrete_class = self.__class__
        return concrete_class.__name__

    def probe(
        self,
    ) -> typing.Union[list["Metric"], "Metric", typing.Generator["Metric", None, None]]:
        """Query system state and return metrics.

        This is the only method called by the check controller.
        It should trigger all necessary actions and create metrics.
        A plugin can perform several measurements at once. This base
        implementation measures nothing and returns an empty list.

        .. code-block:: Python

            def probe(self):
                self.users = self.list_users()
                self.unique_users = set(self.users)
                return [
                    Metric("total", len(self.users), min=0, context="users"),
                    Metric("unique", len(self.unique_users), min=0, context="users"),
                ]

        Alternatively, the probe() method can act as generator and yield metrics:

        .. code-block:: Python

            def probe(self):
                self.users = self.list_users()
                self.unique_users = set(self.users)
                yield Metric('total', len(self.users), min=0,
                             context='users')
                yield Metric('unique', len(self.unique_users), min=0,
                             context='users')

        :return: list of :class:`~mplugin.Metric` objects,
            or generator that emits :class:`~mplugin.Metric`
            objects, or single :class:`~mplugin.Metric`
            object
        """
        no_metrics: list["Metric"] = []
        return no_metrics
[docs]
class Result:
    """Evaluation outcome consisting of state and explanation.

    A Result object is typically emitted by a
    :class:`~mplugin.Context` object and represents the
    outcome of an evaluation. It contains a
    :class:`~mplugin.state.ServiceState` as well as an explanation.

    The :class:`Results` class (plural form) provides a result container
    with access functions and iterators.

    Plugin authors may create their own :class:`Result` subclass to
    accommodate special needs.

    :param state: The state this result stands for.
    :param hint: Optional free-text explanation.
    :param metric: Optional metric the result was derived from.
    """

    state: "ServiceState"
    hint: typing.Optional[str]
    metric: typing.Optional["Metric"]

    def __init__(
        self,
        state: "ServiceState",
        hint: typing.Optional[str] = None,
        metric: typing.Optional["Metric"] = None,
    ) -> None:
        self.state, self.hint, self.metric = state, hint, metric

    def __str__(self) -> str:
        """Textual result explanation.

        The explanation is the metric's ``description`` (if a metric has
        been passed to the constructor), optionally followed by the value
        of :attr:`hint` in parentheses. The output covers only the reason,
        not the result's state, which is rendered independently.

        :returns: result explanation or empty string
        """
        metric = self.metric
        explanation = metric.description if metric and metric.description else None
        if explanation and self.hint:
            return f"{explanation} ({self.hint})"
        return self.hint or explanation or ""

    @property
    def resource(self) -> typing.Optional["Resource"]:
        """Reference to the resource used to generate this result."""
        return self.metric.resource if self.metric else None

    @property
    def context(self) -> typing.Optional["Context"]:
        """Reference to the context used to generate this result."""
        return self.metric.context if self.metric else None

    def __eq__(self, value: object) -> bool:
        """Results are equal if state, hint and metric are all equal."""
        if isinstance(value, Result):
            return (
                self.state == value.state
                and self.hint == value.hint
                and self.metric == value.metric
            )
        return False
[docs]
class Results:
    """Container for result sets.

    This class keeps a collection of results and offers access by index,
    by metric name and by result state. It is meant to make queries in
    :class:`~.summary.Summary` implementations compact and readable.

    The constructor accepts an arbitrary number of result objects and
    adds them to the container.
    """

    results: list[Result]
    by_state: dict["ServiceState", list[Result]]
    by_name: dict[str, Result]

    def __init__(self, *results: Result) -> None:
        self.results = []
        self.by_state = collections.defaultdict(list)
        self.by_name = {}
        if results:
            self.add(*results)

    def add(self, *results: Result) -> typing_extensions.Self:
        """Adds more results to the container.

        Besides passing :class:`Result` objects in the constructor,
        additional results may be added after creating the container.

        :returns: the container itself
        :raises ValueError: if `result` is not a :class:`Result` object
        """
        for entry in results:
            if not isinstance(entry, Result):  # type: ignore
                raise ValueError(
                    "trying to add non-Result to Results container", entry
                )
            self.results.append(entry)
            self.by_state[entry.state].append(entry)
            try:
                # Results without a (named) metric are not indexed by name.
                self.by_name[entry.metric.name] = entry  # type: ignore
            except AttributeError:
                pass
        return self

    def __iter__(self) -> typing.Generator[Result, typing.Any, None]:
        """Iterates over all results.

        The iterator is sorted in order of decreasing state
        significance (unknown > critical > warning > ok).

        :returns: result object iterator
        """
        for state in reversed(sorted(self.by_state)):
            yield from self.by_state[state]

    def __len__(self) -> int:
        """Number of results in this container."""
        return len(self.results)

    def __getitem__(self, item: typing.Union[int, str]) -> Result:
        """Access result by index or name.

        If *item* is an integer, the itemth element in the
        container is returned. If *item* is a string, it is used to
        look up a result with the given name.

        :raises IndexError: if an integer index is out of range
        :raises KeyError: if no result with the given name is found
        """
        return self.results[item] if isinstance(item, int) else self.by_name[item]

    def __contains__(self, name: str) -> bool:
        """Tests if a result with given name is present.

        :returns: boolean
        """
        return name in self.by_name

    @property
    def most_significant_state(self) -> "ServiceState":
        """The "worst" state found in all results.

        :returns: :obj:`~mplugin.state.ServiceState` object
        :raises ValueError: if no results are present
        """
        return max(self.by_state)

    @property
    def most_significant(self) -> list[Result]:
        """Returns list of results with most significant state.

        From all results present, a subset with the "worst" state is
        selected.

        :returns: list of :class:`Result` objects or empty list if no
            results are present
        """
        try:
            worst_state = self.most_significant_state
        except ValueError:
            return []
        return self.by_state[worst_state]

    @property
    def first_significant(self) -> Result:
        """Selects one of the results with most significant state.

        :returns: :class:`Result` object
        :raises IndexError: if no results are present
        """
        return self.most_significant[0]
[docs]
class Summary:
    """Creates a summary formatter object.

    A Summary turns the check's :class:`~.result.Results` into a status
    line. It is used by :class:`~.check.Check` to obtain a suitable data
    :term:`presentation` depending on the check's overall state.

    Plugin authors may either stick to the default implementation or
    subclass it to adapt it to the check's domain. The status line is
    probably the most important piece of text returned from a check: it must
    lead directly to the problem in the most concise way.

    This base class takes no parameters in its constructor, but subclasses
    may provide more elaborate constructors that accept parameters to
    influence output creation.
    """

    def ok(self, results: "Results") -> str:
        """Formats status line when overall state is ok.

        The default implementation returns a string representation of
        the first result.

        :param results: :class:`~mplugin.Results` container
        :returns: status line
        """
        return f"{results[0]}"

    def problem(self, results: "Results") -> str:
        """Formats status line when overall state is not ok.

        The default implementation returns a string representation of the
        first significant result, i.e. the result with the "worst"
        state.

        :param results: :class:`~.result.Results` container
        :returns: status line
        """
        return f"{results.first_significant}"

    def verbose(
        self, results: "Results"
    ) -> typing.Union[str, list[str], tuple[str, ...]]:
        """Provides extra lines if verbose plugin execution is requested.

        The default implementation returns one ``state: result`` line for
        every result whose state is not ok.

        :param results: :class:`~.result.Results` container
        :returns: list of strings
        """
        return [
            f"{result.state}: {result}"
            for result in results
            if not result.state == ok
        ]

    def empty(self) -> typing.Literal["no check results"]:
        """Formats status line when the result set is empty.

        :returns: status line
        """
        return "no check results"
# A metric formatter is either a string or a callable that receives the
# metric and its context and returns the formatted text.
FmtMetric = str | typing.Callable[["Metric", "Context"], str]
[docs]
class Context:
"""Creates generic context identified by `name`.
Metadata about metrics to perform data :term:`evaluation`.
This module contains the :class:`Context` class, which is the base for
all contexts. :class:`ScalarContext` is an important specialization to
cover numeric contexts with warning and critical thresholds. The
:class:`~.check.Check` controller selects a context for each
:class:`~.metric.Metric` by matching the metric's `context` attribute with the
context's `name`. The same context may be used for several metrics.
Plugin authors may just use :class:`ScalarContext` in the majority of cases.
Sometimes it is better to subclass :class:`Context` instead to implement custom
evaluation or performance data logic.
Generic contexts just format associated metrics and evaluate
always to :obj:`~mplugin.ok`. Metric formatting is
controlled with the :attr:`fmt_metric` attribute. It can either
be a string or a callable. See the :meth:`describe` method for
how formatting is done.
:param name: A context name that is matched by the context
attribute of :class:`~mplugin.Metric`
:param fmt_metric: string or callable to convert
context and associated metric to a human readable string
"""
name: str
fmt_metric: typing.Optional[FmtMetric]
def __init__(
self,
name: str,
fmt_metric: typing.Optional[FmtMetric] = None,
) -> None:
self.name = name
self.fmt_metric = fmt_metric
[docs]
def evaluate(
    self, metric: "Metric", resource: "Resource"
) -> typing.Union[Result, ServiceState]:
    """Determines state of a given metric.

    This base implementation ignores *resource* and always reports the
    ``ok`` state for *metric*. Plugin authors may override this method in
    subclasses to specialize behaviour.

    :param metric: associated metric that is to be evaluated
    :param resource: resource that produced the associated metric
        (may optionally be consulted)
    :returns: :class:`~.result.Result` or
        :class:`~.state.ServiceState` object
    """
    outcome = self.result(ok, metric=metric)
    return outcome
[docs]
def result(
self,
state: ServiceState,
hint: typing.Optional[str] = None,
metric: typing.Optional["Metric"] = None,
) -> Result:
"""
Create a Result object with the given state, hint, and metric.
:param state: The service state for the result.
:param hint: An optional hint message providing additional context.
:param metric: An optional Metric object associated with the result.
:return: A Result object containing the provided state, hint, and metric.
"""
return Result(state=state, hint=hint, metric=metric)
[docs]
def ok(
self,
hint: typing.Optional[str] = None,
metric: typing.Optional["Metric"] = None,
) -> Result:
"""
Create a successful Result.
:param hint: Optional hint message providing additional context about the successful operation.
:param metric: Optional Metric object associated with this result.
:return: A Result object representing a successful operation.
"""
return self.result(ok, hint=hint, metric=metric)
[docs]
def warning(
self,
hint: typing.Optional[str] = None,
metric: typing.Optional["Metric"] = None,
) -> Result:
"""
Create a warning result.
:param hint: Optional hint message to provide additional context for the warning.
:param metric: Optional metric associated with the warning.
:return: A Result object representing a warning.
"""
return self.result(warning, hint=hint, metric=metric)
[docs]
def critical(
self,
hint: typing.Optional[str] = None,
metric: typing.Optional["Metric"] = None,
) -> Result:
"""
Create a critical result.
:param hint: Optional hint message providing additional context about the critical result.
:param metric: Optional metric object associated with this critical result.
:return: A Result object representing a critical state.
"""
return self.result(critical, hint=hint, metric=metric)
[docs]
def unknown(
self,
hint: typing.Optional[str] = None,
metric: typing.Optional["Metric"] = None,
) -> Result:
"""
Create a Result object with an unknown status.
:param hint: Optional hint message providing additional context about why the result is unknown
:param metric: Optional Metric object associated with this result
:return: A Result object with unknown status
"""
return self.result(unknown, hint=hint, metric=metric)
[docs]
def performance(
self, metric: "Metric", resource: "Resource"
) -> typing.Optional[
typing.Union[
Performance,
typing.Sequence[Performance],
typing.Generator[Performance, typing.Any, None],
]
]:
"""Derives performance data from a given metric.
This base implementation just returns none. Plugin authors may
override this method in subclass to specialize behaviour.
.. code-block:: python
def performance(self, metric: Metric, resource: Resource) -> Performance:
return Performance(label=metric.name, value=metric.value)
.. code-block:: python
def performance(
self, metric: Metric, resource: Resource
) -> Performance | None:
if not opts.performance_data:
return None
return Performance(
metric.name,
metric.value,
metric.uom,
self.warning,
self.critical,
metric.min,
metric.max,
)
:param metric: associated metric from which performance data are
derived
:param resource: resource that produced the associated metric
(may optionally be consulted)
:returns: :class:`~.performance.Performance` object or `None`
"""
return None
[docs]
def describe(self, metric: "Metric") -> typing.Optional[str]:
"""Provides human-readable metric description.
Formats the metric according to the :attr:`fmt_metric`
attribute. If :attr:`fmt_metric` is a string, it is evaluated as
format string with all metric attributes in the root namespace.
If :attr:`fmt_metric` is callable, it is called with the metric
and this context as arguments. If :attr:`fmt_metric` is not set,
this default implementation does not return a description.
Plugin authors may override this method in subclasses to control
text output more tightly.
:param metric: associated metric
:returns: description string or None
"""
if not self.fmt_metric:
return None
if isinstance(self.fmt_metric, str):
return self.fmt_metric.format(
name=metric.name,
value=metric.value,
uom=metric.uom,
valueunit=metric.valueunit,
min=metric.min,
max=metric.max,
)
return self.fmt_metric(metric, self)
[docs]
class ScalarContext(Context):
    #: Values not matched by this range evaluate to a warning result.
    warn_range: Range
    #: Values not matched by this range evaluate to a critical result.
    critical_range: Range

    def __init__(
        self,
        name: str,
        warning: typing.Optional[RangeSpec] = None,
        critical: typing.Optional[RangeSpec] = None,
        fmt_metric: FmtMetric = "{name} is {valueunit}",
    ) -> None:
        """Ready-to-use :class:`Context` subclass for scalar values.

        ScalarContext models the common case where a single scalar is
        evaluated against a pair of warning and critical thresholds.

        :attr:`name` and :attr:`fmt_metric` are described in the
        :class:`Context` base class.

        :param warning: Warning threshold as
            :class:`~mplugin.Range` object or range string.
        :param critical: Critical threshold as
            :class:`~mplugin.Range` object or range string.
        """
        super().__init__(name, fmt_metric)
        self.warn_range = Range(warning)
        self.critical_range = Range(critical)

    def evaluate(self, metric: "Metric", resource: "Resource") -> Result:
        """Compare a metric with the ranges and determine the result state.

        The metric's value is checked against :attr:`critical_range`
        first and :attr:`warn_range` second. The first range that does
        not match decides the state, and its violation text becomes the
        hint. If both ranges match, the result is ok. Subclasses may
        override this method to provide custom evaluation logic.

        :param metric: metric that is to be evaluated
        :param resource: not used
        :returns: :class:`~mplugin.Result` object
        """
        within_critical = self.critical_range.match(metric.value)
        if not within_critical:
            return self.critical(self.critical_range.violation, metric)
        within_warning = self.warn_range.match(metric.value)
        if not within_warning:
            return self.warning(self.warn_range.violation, metric)
        return self.ok(None, metric)

    def performance(self, metric: "Metric", resource: "Resource") -> Performance:
        """Derive performance data.

        The metric's name, value, unit, minimum and maximum are combined
        with :attr:`warn_range` and :attr:`critical_range` into one
        :class:`~mplugin.performance.Performance` object.

        :param metric: metric from which performance data are derived
        :param resource: not used
        :returns: :class:`~mplugin.performance.Performance` object
        """
        perf_args = (
            metric.name,
            metric.value,
            metric.uom,
            self.warn_range,
            self.critical_range,
            metric.min,
            metric.max,
        )
        return Performance(*perf_args)
class _Contexts:
"""Container for collecting all generated contexts."""
by_name: dict[str, Context]
def __init__(self) -> None:
self.by_name = dict(
default=ScalarContext("default", "", ""), null=Context("null")
)
def add(self, context: Context) -> None:
self.by_name[context.name] = context
def __getitem__(self, context_name: str) -> Context:
try:
return self.by_name[context_name]
except KeyError:
raise KeyError(
"cannot find context",
context_name,
"known contexts: {0}".format(", ".join(self.by_name.keys())),
)
def __contains__(self, context_name: str) -> bool:
return context_name in self.by_name
def __iter__(self) -> typing.Iterator[str]:
return iter(self.by_name)
# Logger registered under the name "mplugin"; code in this module (for example
# the check controller) reports warnings through it.
log: logging.Logger = logging.getLogger("mplugin")
"""
**mplugin** integrates with the logging module from Python's standard
library. If the main function is decorated with :meth:`guarded` (which is highly
recommended), the logging module is configured automatically before the
execution of the `main()` function starts. Messages logged to the *mplugin*
logger (or any sublogger) are processed with mplugin's integrated logging.
The verbosity level is set in the :meth:`check.main()` invocation depending on
the number of ``-v`` flags.
When called with *verbose=0,* both the summary and the performance data are
printed on one line and the warning message is displayed. Messages logged with
*warning* or *error* level are always printed.
Setting *verbose* to 1 does not change the logging level but enables multi-line
output. Additionally, full tracebacks are printed in the case of an
uncaught exception.
Verbosity levels of ``2`` and ``3`` enable logging with *info* or *debug* levels.
"""
[docs]
class Check:
    """Controller logic for check execution.

    The class :class:`Check` orchestrates the various stages of check
    execution. Interfacing with the outside system is done via a
    separate :class:`Runtime` object.

    When a check is called (using :meth:`Check.main` or
    :meth:`Check.__call__`), it probes all resources and evaluates the
    returned metrics to results and performance data. A typical usage
    pattern is to populate a check with domain objects and then
    delegate control to it.
    """

    #: Resources that are probed when the check runs.
    resources: list[Resource]
    #: Contexts available for metric evaluation, looked up by name.
    contexts: _Contexts
    #: Summary object used to render the textual output.
    _summary: Summary
    #: Results collected while evaluating metrics.
    results: Results
    #: Performance data strings collected while evaluating metrics.
    perfdata: list[str]
    #: Name of the check; see :meth:`__init__` and :meth:`add`.
    name: str

    def __init__(
        self,
        *objects: Resource | Context | Summary | Results,
        name: typing.Optional[str] = None,
    ) -> None:
        """Creates and configures a check.

        Specialized *objects* representing resources, contexts,
        summary, or results are passed to the :meth:`add` method.
        Alternatively, objects can be added later manually.

        If *name* is None (the default), the check name starts out
        empty. :meth:`add` replaces an empty name with the name of the
        first resource.
        """
        self.resources = []
        self.contexts = _Contexts()
        self._summary = Summary()
        self.results = Results()
        self.perfdata = []
        self.name = name if name is not None else ""
        self.add(*objects)

    def add(self, *objects: Resource | Context | Summary | Results):
        """Adds domain objects to a check.

        Resources are appended, contexts are registered by name, and a
        summary or results object replaces the current one. When a
        resource is added while the check name is empty, the name of
        the first resource becomes the check name.

        :param objects: one or more objects that are descendants from
            :class:`~mplugin.Resource`,
            :class:`~mplugin.Context`,
            :class:`~mplugin.Summary`, or
            :class:`~mplugin.Results`.
        :returns: the check itself
        :raises TypeError: if an object is of none of these types
        """
        for item in objects:
            if isinstance(item, Resource):
                self.resources.append(item)
                # ``name`` may have been reset to None from outside.
                if self.name is None:  # type: ignore
                    self.name = ""
                elif self.name == "":
                    self.name = self.resources[0].name
                continue
            if isinstance(item, Context):
                self.contexts.add(item)
                continue
            if isinstance(item, Summary):
                self._summary = item
                continue
            if isinstance(item, Results):  # type: ignore
                self.results = item
                continue
            raise TypeError("cannot add type {0} to check".format(type(item)), item)
        return self

    def _evaluate_resource(self, resource: Resource) -> None:
        """Probe one resource and record results and performance data.

        Every metric is bound to its context and to *resource* before it
        is evaluated. A :exc:`CheckError` raised along the way is turned
        into an ``unknown`` result that refers to the metric being
        processed at that moment (or to no metric at all).
        """
        current = None
        try:
            produced = resource.probe()
            if not produced:
                log.warning("resource %s did not produce any metric", resource.name)
            if isinstance(produced, Metric):
                # resource returned a bare metric instead of list/generator
                produced = [produced]
            for current in produced:
                current.context = self.contexts[current.context_name]
                current.resource = resource
                outcome = current.evaluate()
                if isinstance(outcome, Result):
                    self.results.add(outcome)
                elif isinstance(outcome, ServiceState):  # type: ignore
                    self.results.add(Result(outcome, metric=current))
                else:
                    raise ValueError(
                        "evaluate() returned neither Result nor ServiceState object",
                        current.name,
                        outcome,
                    )
                for perf in current.performance():
                    self.perfdata.append(str(perf))
        except CheckError as err:
            self.results.add(Result(unknown, str(err), current))

    def __call__(self) -> None:
        """Actually run the check.

        After a check has been called, the :attr:`results` and
        :attr:`perfdata` attributes are populated with the outcomes;
        empty performance data strings are dropped and the rest is
        sorted. In most cases, you should not use __call__ directly but
        invoke :meth:`main`, which delegates check execution to the
        :class:`Runtime` environment.
        """
        for resource in self.resources:
            self._evaluate_resource(resource)
        self.perfdata = sorted(filter(None, self.perfdata))

    def main(
        self,
        verbose: typing.Any = None,
        timeout: typing.Any = None,
        colorize: bool = False,
    ) -> typing.NoReturn:
        """All-in-one control delegation to the runtime environment.

        Get a :class:`_Runtime` instance and perform all phases: run
        the check (via :meth:`__call__`), print results and exit the
        program with an appropriate status code.

        :param verbose: output verbosity level between 0 and 3
        :param timeout: abort check execution with a :exc:`Timeout`
            exception after so many seconds (use 0 for no timeout)
        :param colorize: Use ANSI colors to colorize the logging output
        """
        _Runtime().execute(self, verbose=verbose, timeout=timeout, colorize=colorize)

    @property
    def state(self) -> ServiceState:
        """Overall check state.

        The most significant (=worst) state seen in :attr:`results` so
        far. :obj:`unknown` if no results have been collected yet.
        Corresponds with :attr:`~Check.exitcode`. Read-only property.
        """
        try:
            worst = self.results.most_significant_state
        except ValueError:
            worst = unknown
        return worst

    @property
    def summary(self) -> str:
        """Status line summary string.

        The first line of output that summarizes the situation as
        perceived by the check. The string is usually queried from a
        :class:`Summary` object. Read-only property.
        """
        if not self.results:
            text = self._summary.empty()
        elif self.state == ok:
            text = self._summary.ok(self.results)
        else:
            text = self._summary.problem(self.results)
        return text or ""

    @property
    def verbose(self) -> typing.Union[str, list[str], tuple[str, ...]]:
        """Additional lines of output.

        Long text output if check runs in verbose mode. Also queried
        from :class:`Summary`. Read-only property.
        """
        extra = self._summary.verbose(self.results)
        return extra or ""

    @property
    def exitcode(self) -> int:
        """Overall check exit code according to the monitoring API.

        Corresponds with :py:attr:`~Check.state`; ``3`` if no state can
        be determined. Read-only property.
        """
        try:
            code = int(self.results.most_significant_state)
        except ValueError:
            code = 3
        return code