File: //proc/self/root/opt/saltstack/salt/lib/python3.10/site-packages/salt/client/mixins.py
"""
A collection of mixins useful for the various *Client interfaces
"""
import copy
import fnmatch
import logging
import os
import signal
import traceback
import weakref
from collections.abc import Mapping, MutableMapping
import salt._logging
import salt.channel.client
import salt.exceptions
import salt.ext.tornado.stack_context
import salt.minion
import salt.output
import salt.utils.args
import salt.utils.doc
import salt.utils.error
import salt.utils.event
import salt.utils.jid
import salt.utils.job
import salt.utils.lazy
import salt.utils.platform
import salt.utils.process
import salt.utils.state
import salt.utils.user
import salt.utils.versions
log = logging.getLogger(__name__)
CLIENT_INTERNAL_KEYWORDS = frozenset(
[
"client",
"cmd",
"eauth",
"fun",
"kwarg",
"match",
"token",
"__jid__",
"__tag__",
"__user__",
"username",
"password",
"full_return",
"print_event",
]
)
class ClientFuncsDict(MutableMapping):
"""
Class to make a read-only dict for accessing runner funcs "directly"
"""
def __init__(self, client):
self.client = client
def __getattr__(self, attr):
"""
Provide access eg. to 'pack'
"""
return getattr(self.client.functions, attr)
def __setitem__(self, key, val):
raise NotImplementedError()
def __delitem__(self, key):
raise NotImplementedError()
def __getitem__(self, key):
"""
Return a function that you can call with regular func params, but
will do all the _proc_function magic
"""
if key not in self.client.functions:
raise KeyError
def wrapper(*args, **kwargs):
low = {
"fun": key,
"args": args,
"kwargs": kwargs,
}
pub_data = {}
# Copy kwargs keys so we can iterate over and pop the pub data
kwargs_keys = list(kwargs)
# pull out pub_data if you have it
for kwargs_key in kwargs_keys:
if kwargs_key.startswith("__pub_"):
pub_data[kwargs_key] = kwargs.pop(kwargs_key)
async_pub = self.client._gen_async_pub(pub_data.get("__pub_jid"))
user = salt.utils.user.get_specific_user()
return self.client._proc_function(
instance=self.client,
opts=self.client.opts,
fun=key,
low=low,
user=user,
tag=async_pub["tag"],
jid=async_pub["jid"],
daemonize=False,
)
return wrapper
def __len__(self):
return len(self.client.functions)
def __iter__(self):
return iter(self.client.functions)
class ClientStateMixin:
def __init__(self, opts, context=None):
self.opts = opts
if context is None:
context = {}
self.context = context
# __setstate__ and __getstate__ are only used on spawning platforms.
def __getstate__(self):
return {
"opts": self.opts,
"context": self.context or None,
}
def __setstate__(self, state):
# If __setstate__ is getting called it means this is running on a new process.
self.__init__(state["opts"], context=state["context"])
class SyncClientMixin(ClientStateMixin):
"""
A mixin for *Client interfaces to abstract common function execution
"""
functions = ()
def functions_dict(self):
"""
Return a dict that will mimic the "functions" dict used all over salt.
It creates a wrapper around the function allowing **kwargs, and if pub_data
is passed in as kwargs, will re-use the JID passed in
"""
return ClientFuncsDict(self)
def master_call(self, **kwargs):
"""
Execute a function through the master network interface.
"""
load = kwargs
load["cmd"] = self.client
with salt.channel.client.ReqChannel.factory(
self.opts, crypt="clear", usage="master_call"
) as channel:
ret = channel.send(load)
if isinstance(ret, Mapping):
if "error" in ret:
salt.utils.error.raise_error(**ret["error"])
return ret
def cmd_sync(self, low, timeout=None, full_return=False):
"""
Execute a runner function synchronously; eauth is respected
This function requires that :conf_master:`external_auth` is configured
and the user is authorized to execute runner functions: (``@runner``).
.. code-block:: python
runner.eauth_sync({
'fun': 'jobs.list_jobs',
'username': 'saltdev',
'password': 'saltdev',
'eauth': 'pam',
})
"""
with salt.utils.event.get_master_event(
self.opts, self.opts["sock_dir"], listen=True
) as event:
job = self.master_call(**low)
ret_tag = salt.utils.event.tagify("ret", base=job["tag"])
if timeout is None:
timeout = self.opts.get("rest_timeout", 300)
ret = event.get_event(
tag=ret_tag, full=True, wait=timeout, auto_reconnect=True
)
if ret is None:
raise salt.exceptions.SaltClientTimeout(
"RunnerClient job '{}' timed out".format(job["jid"]),
jid=job["jid"],
)
return ret if full_return else ret["data"]["return"]
def cmd(
self,
fun,
arg=None,
pub_data=None,
kwarg=None,
print_event=True,
full_return=False,
):
"""
Execute a function
"""
if arg is None:
arg = tuple()
if not isinstance(arg, list) and not isinstance(arg, tuple):
raise salt.exceptions.SaltInvocationError(
"arg must be formatted as a list/tuple"
)
if pub_data is None:
pub_data = {}
if not isinstance(pub_data, dict):
raise salt.exceptions.SaltInvocationError(
"pub_data must be formatted as a dictionary"
)
if kwarg is None:
kwarg = {}
if not isinstance(kwarg, dict):
raise salt.exceptions.SaltInvocationError(
"kwarg must be formatted as a dictionary"
)
arglist = salt.utils.args.parse_input(
arg, no_parse=self.opts.get("no_parse", [])
)
# if you were passed kwarg, add it to arglist
if kwarg:
kwarg["__kwarg__"] = True
arglist.append(kwarg)
args, kwargs = salt.minion.load_args_and_kwargs(
self.functions[fun], arglist, pub_data
)
low = {"fun": fun, "arg": args, "kwarg": kwargs}
if "user" in pub_data:
low["__user__"] = pub_data["user"]
return self.low(fun, low, print_event=print_event, full_return=full_return)
@property
def mminion(self):
if not hasattr(self, "_mminion"):
self._mminion = salt.minion.MasterMinion(
self.opts, states=False, rend=False
)
return self._mminion
@property
def store_job(self):
"""
Helper that allows us to turn off storing jobs for different classes
that may incorporate this mixin.
"""
try:
class_name = self.__class__.__name__.lower()
except AttributeError:
log.warning(
"Unable to determine class name", exc_info_on_loglevel=logging.DEBUG
)
return True
try:
return self.opts[f"{class_name}_returns"]
except KeyError:
# No such option, assume this isn't one we care about gating and
# just return True.
return True
def low(self, fun, low, print_event=True, full_return=False):
"""
Execute a function from low data
Low data includes:
required:
- fun: the name of the function to run
optional:
- arg: a list of args to pass to fun
- kwarg: kwargs for fun
- __user__: user who is running the command
- __jid__: jid to run under
- __tag__: tag to run under
"""
# fire the mminion loading (if not already done) here
# this is not to clutter the output with the module loading
# if we have a high debug level.
self.mminion # pylint: disable=W0104
jid = low.get("__jid__", salt.utils.jid.gen_jid(self.opts))
tag = low.get("__tag__", salt.utils.event.tagify(jid, prefix=self.tag_prefix))
data = {
"fun": f"{self.client}.{fun}",
"jid": jid,
"user": low.get("__user__", "UNKNOWN"),
}
if print_event:
print_func = (
self.print_async_event if hasattr(self, "print_async_event") else None
)
else:
# Suppress printing of return event (this keeps us from printing
# runner/wheel output during orchestration).
print_func = None
with salt.utils.event.NamespacedEvent(
salt.utils.event.get_event(
"master",
self.opts["sock_dir"],
opts=self.opts,
listen=False,
),
tag,
print_func=print_func,
) as namespaced_event:
# TODO: test that they exist
# TODO: Other things to inject??
func_globals = {
"__jid__": jid,
"__user__": data["user"],
"__tag__": tag,
# weak ref to avoid the Exception in interpreter
# teardown of event
"__jid_event__": weakref.proxy(namespaced_event),
}
try:
self_functions = copy.copy(self.functions)
salt.utils.lazy.verify_fun(self_functions, fun)
# Inject some useful globals to *all* the function's global
# namespace only once per module-- not per func
completed_funcs = []
for mod_name in self_functions.keys():
if "." not in mod_name:
continue
mod, _ = mod_name.split(".", 1)
if mod in completed_funcs:
continue
completed_funcs.append(mod)
for global_key, value in func_globals.items():
self.functions[mod_name].__globals__[global_key] = value
# There are some discrepancies of what a "low" structure is in the
# publisher world it is a dict including stuff such as jid, fun,
# arg (a list of args, with kwargs packed in). Historically this
# particular one has had no "arg" and just has had all the kwargs
# packed into the top level object. The plan is to move away from
# that since the caller knows what is an arg vs a kwarg, but while
# we make the transition we will load "kwargs" using format_call if
# there are no kwargs in the low object passed in.
if "arg" in low and "kwarg" in low:
args = low["arg"]
kwargs = low["kwarg"]
else:
f_call = salt.utils.args.format_call(
self.functions[fun],
low,
expected_extra_kws=CLIENT_INTERNAL_KEYWORDS,
)
args = f_call.get("args", ())
kwargs = f_call.get("kwargs", {})
# Update the event data with loaded args and kwargs
data["fun_args"] = list(args) + ([kwargs] if kwargs else [])
func_globals["__jid_event__"].fire_event(data, "new")
# Initialize a context for executing the method.
with salt.ext.tornado.stack_context.StackContext(
self.functions.context_dict.clone
):
func = self.functions[fun]
try:
data["return"] = func(*args, **kwargs)
except TypeError as exc:
data["return"] = (
"\nPassed invalid arguments: {}\n\nUsage:\n{}".format(
exc, func.__doc__
)
)
try:
data["success"] = self.context.get("retcode", 0) == 0
except AttributeError:
# Assume a True result if no context attribute
data["success"] = True
if isinstance(data["return"], dict) and "data" in data["return"]:
# some functions can return boolean values
data["success"] = salt.utils.state.check_result(
data["return"]["data"]
)
except (Exception, SystemExit) as ex: # pylint: disable=broad-except
if isinstance(ex, salt.exceptions.NotImplemented):
data["return"] = str(ex)
else:
data["return"] = "Exception occurred in {} {}: {}".format(
self.client,
fun,
traceback.format_exc(),
)
data["success"] = False
data["retcode"] = 1
if self.store_job:
try:
salt.utils.job.store_job(
self.opts,
{
"id": self.opts["id"],
"tgt": self.opts["id"],
"jid": data["jid"],
"return": data,
},
event=None,
mminion=self.mminion,
)
except salt.exceptions.SaltCacheError:
log.error(
"Could not store job cache info. "
"Job details for this run may be unavailable."
)
# Outputters _can_ mutate data so write to the job cache first!
namespaced_event.fire_event(data, "ret")
# if we fired an event, make sure to delete the event object.
# This will ensure that we call destroy, which will do the 0MQ linger
log.info("Runner completed: %s", data["jid"])
return data if full_return else data["return"]
def get_docs(self, arg=None):
"""
Return a dictionary of functions and the inline documentation for each
"""
if arg:
if "*" in arg:
target_mod = arg
_use_fnmatch = True
else:
target_mod = arg + "." if not arg.endswith(".") else arg
_use_fnmatch = False
if _use_fnmatch:
docs = [
(fun, self.functions[fun].__doc__)
for fun in fnmatch.filter(self.functions, target_mod)
]
else:
docs = [
(fun, self.functions[fun].__doc__)
for fun in sorted(self.functions)
if fun == arg or fun.startswith(target_mod)
]
else:
docs = [
(fun, self.functions[fun].__doc__) for fun in sorted(self.functions)
]
docs = dict(docs)
return salt.utils.doc.strip_rst(docs)
class AsyncClientMixin(ClientStateMixin):
"""
A mixin for *Client interfaces to enable easy asynchronous function execution
"""
client = None
tag_prefix = None
@classmethod
def _proc_function_remote(
cls, *, instance, opts, fun, low, user, tag, jid, daemonize=True
):
"""
Run this method in a multiprocess target to execute the function on the
master and fire the return data on the event bus
"""
if daemonize and not salt.utils.platform.spawning_platform():
# Shutdown logging before daemonizing
salt._logging.shutdown_logging()
salt.utils.process.daemonize()
# Because we have daemonized, salt._logging.in_mainprocess() will
# return False. We'll just force it to return True for this
# particular case so that proper logging can be set up.
salt._logging.in_mainprocess.__pid__ = os.getpid()
# Configure logging once daemonized
salt._logging.setup_logging()
# pack a few things into low
low["__jid__"] = jid
low["__user__"] = user
low["__tag__"] = tag
if instance is None:
instance = cls(opts)
try:
return instance.cmd_sync(low)
except salt.exceptions.EauthAuthenticationError as exc:
log.error(exc)
@classmethod
def _proc_function(
cls,
*,
instance,
opts,
fun,
low,
user,
tag,
jid,
daemonize=True,
full_return=False,
):
"""
Run this method in a multiprocess target to execute the function
locally and fire the return data on the event bus
"""
if daemonize and not salt.utils.platform.spawning_platform():
# Shutdown logging before daemonizing
salt._logging.shutdown_logging()
salt.utils.process.daemonize()
# Because we have daemonized, salt._logging.in_mainprocess() will
# return False. We'll just force it to return True for this
# particular case so that proper logging can be set up.
salt._logging.in_mainprocess.__pid__ = os.getpid()
# Configure logging once daemonized
salt._logging.setup_logging()
if instance is None:
instance = cls(opts)
# pack a few things into low
low["__jid__"] = jid
low["__user__"] = user
low["__tag__"] = tag
return instance.low(fun, low, full_return=full_return)
def cmd_async(self, low):
"""
Execute a function asynchronously; eauth is respected
This function requires that :conf_master:`external_auth` is configured
and the user is authorized
.. code-block:: python
>>> wheel.cmd_async({
'fun': 'key.finger',
'match': 'jerry',
'eauth': 'auto',
'username': 'saltdev',
'password': 'saltdev',
})
{'jid': '20131219224744416681', 'tag': 'salt/wheel/20131219224744416681'}
"""
return self.master_call(**low)
def _gen_async_pub(self, jid=None):
if jid is None:
jid = salt.utils.jid.gen_jid(self.opts)
tag = salt.utils.event.tagify(jid, prefix=self.tag_prefix)
return {"tag": tag, "jid": jid}
def asynchronous(self, fun, low, user="UNKNOWN", pub=None, local=True):
"""
Execute the function in a multiprocess and return the event tag to use
to watch for the return
"""
if local:
proc_func = self._proc_function
else:
proc_func = self._proc_function_remote
async_pub = pub if pub is not None else self._gen_async_pub()
if salt.utils.platform.spawning_platform():
instance = None
else:
instance = self
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
proc = salt.utils.process.SignalHandlingProcess(
target=proc_func,
name="ProcessFunc({}, fun={} jid={})".format(
proc_func.__qualname__, fun, async_pub["jid"]
),
kwargs=dict(
instance=instance,
opts=self.opts,
fun=fun,
low=low,
user=user,
tag=async_pub["tag"],
jid=async_pub["jid"],
),
)
proc.start()
proc.join() # MUST join, otherwise we leave zombies all over
return async_pub
def print_async_event(self, suffix, event):
"""
Print all of the events with the prefix 'tag'
"""
if not isinstance(event, dict):
return
# if we are "quiet", don't print
if self.opts.get("quiet", False):
return
# some suffixes we don't want to print
if suffix in ("new",):
return
try:
outputter = self.opts.get(
"output",
event.get("outputter", None) or event.get("return").get("outputter"),
)
except AttributeError:
outputter = None
# if this is a ret, we have our own set of rules
if suffix == "ret":
# Check if outputter was passed in the return data. If this is the case,
# then the return data will be a dict two keys: 'data' and 'outputter'
if isinstance(event.get("return"), dict) and set(event["return"]) == {
"data",
"outputter",
}:
event_data = event["return"]["data"]
outputter = event["return"]["outputter"]
else:
event_data = event["return"]
else:
event_data = {"suffix": suffix, "event": event}
salt.output.display_output(event_data, outputter, self.opts)