HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.109.1.lve.el8.x86_64 #1 SMP Thu Mar 5 20:23:46 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //proc/self/root/opt/saltstack/salt/lib/python3.10/site-packages/salt/fileclient.py
"""
Classes that manage file clients
"""

import contextlib
import errno
import ftplib  # nosec
import http.server
import logging
import os
import shutil
import string
import time
import urllib.error
import urllib.parse

import salt.channel.client
import salt.client
import salt.crypt
import salt.fileserver
import salt.loader
import salt.payload
import salt.utils.atomicfile
import salt.utils.data
import salt.utils.files
import salt.utils.gzip_util
import salt.utils.hashutils
import salt.utils.http
import salt.utils.path
import salt.utils.platform
import salt.utils.stringutils
import salt.utils.templates
import salt.utils.url
import salt.utils.verify
import salt.utils.versions
from salt.config import DEFAULT_HASH_TYPE
from salt.exceptions import CommandExecutionError, MinionError, SaltClientError
from salt.ext.tornado.httputil import (
    HTTPHeaders,
    HTTPInputError,
    parse_response_start_line,
)
from salt.utils.openstack.swift import SaltSwift

log = logging.getLogger(__name__)
MAX_FILENAME_LENGTH = 255


def get_file_client(opts, pillar=False):
    """
    Read in the ``file_client`` option and return the correct type of file
    server
    """
    client = opts.get("file_client", "remote")

    if pillar and client == "local":
        client = "pillar"
    return {"remote": RemoteClient, "local": FSClient, "pillar": PillarClient}.get(
        client, RemoteClient
    )(opts)


def decode_dict_keys_to_str(src):
    """
    Convert top level keys from bytes to strings if possible.
    This is necessary because Python 3 makes a distinction
    between these types.
    """
    if not isinstance(src, dict):
        return src

    output = {}
    for key, val in src.items():
        if isinstance(key, bytes):
            try:
                key = key.decode()
            except UnicodeError:
                pass
        output[key] = val
    return output


class Client:
    """
    Base class for Salt file interactions
    """

    def __init__(self, opts):
        self.opts = opts
        self.utils = salt.loader.utils(self.opts)

    # Add __setstate__ and __getstate__ so that the object may be
    # deep copied. It normally can't be deep copied because its
    # constructor requires an 'opts' parameter.
    # The TCP transport needs to be able to deep copy this class
    # due to 'salt.utils.context.ContextDict.clone'.
    def __setstate__(self, state):
        # This will polymorphically call __init__
        # in the derived class.
        self.__init__(state["opts"])

    def __getstate__(self):
        return {"opts": self.opts}

    def _check_proto(self, path):
        """
        Make sure that this path is intended for the salt master and trim it
        """
        if not path.startswith("salt://"):
            raise MinionError(f"Unsupported path: {path}")
        file_path, saltenv = salt.utils.url.parse(path)
        return file_path

    def _file_local_list(self, dest):
        """
        Helper util to return a list of files in a directory
        """
        if os.path.isdir(dest):
            destdir = dest
        else:
            destdir = os.path.dirname(dest)

        filelist = set()

        for root, dirs, files in salt.utils.path.os_walk(destdir, followlinks=True):
            for name in files:
                path = os.path.join(root, name)
                filelist.add(path)

        return filelist

    @contextlib.contextmanager
    def _cache_loc(self, path, saltenv="base", cachedir=None):
        """
        Return the local location to cache the file, cache dirs will be made
        """
        cachedir = self.get_cachedir(cachedir)
        dest = salt.utils.path.join(cachedir, "files", saltenv, path)
        destdir = os.path.dirname(dest)
        with salt.utils.files.set_umask(0o077):
            # remove destdir if it is a regular file to avoid an OSError when
            # running os.makedirs below
            if os.path.isfile(destdir):
                os.remove(destdir)

            # ensure destdir exists
            try:
                os.makedirs(destdir)
            except OSError as exc:
                if exc.errno != errno.EEXIST:  # ignore if it was there already
                    raise

            yield dest

    def get_cachedir(self, cachedir=None):
        if cachedir is None:
            cachedir = self.opts["cachedir"]
        elif not os.path.isabs(cachedir):
            cachedir = os.path.join(self.opts["cachedir"], cachedir)
        return cachedir

    def get_file(
        self, path, dest="", makedirs=False, saltenv="base", gzip=None, cachedir=None
    ):
        """
        Copies a file from the local files or master depending on
        implementation
        """
        raise NotImplementedError

    def file_list_emptydirs(self, saltenv="base", prefix=""):
        """
        List the empty dirs
        """
        raise NotImplementedError

    def cache_file(
        self,
        path,
        saltenv="base",
        cachedir=None,
        source_hash=None,
        verify_ssl=True,
        use_etag=False,
    ):
        """
        Pull a file down from the file server and store it in the minion
        file cache
        """
        return self.get_url(
            path,
            "",
            True,
            saltenv,
            cachedir=cachedir,
            source_hash=source_hash,
            verify_ssl=verify_ssl,
            use_etag=use_etag,
        )

    def cache_files(self, paths, saltenv="base", cachedir=None):
        """
        Download a list of files stored on the master and put them in the
        minion file cache
        """
        ret = []
        if isinstance(paths, str):
            paths = paths.split(",")
        for path in paths:
            ret.append(self.cache_file(path, saltenv, cachedir=cachedir))
        return ret

    def cache_master(self, saltenv="base", cachedir=None):
        """
        Download and cache all files on a master in a specified environment
        """
        ret = []
        for path in self.file_list(saltenv):
            ret.append(
                self.cache_file(salt.utils.url.create(path), saltenv, cachedir=cachedir)
            )
        return ret

    def cache_dir(
        self,
        path,
        saltenv="base",
        include_empty=False,
        include_pat=None,
        exclude_pat=None,
        cachedir=None,
    ):
        """
        Download all of the files in a subdir of the master
        """
        ret = []

        path = self._check_proto(salt.utils.data.decode(path))
        # We want to make sure files start with this *directory*, use
        # '/' explicitly because the master (that's generating the
        # list of files) only runs on POSIX
        if not path.endswith("/"):
            path = path + "/"

        log.info("Caching directory '%s' for environment '%s'", path, saltenv)
        # go through the list of all files finding ones that are in
        # the target directory and caching them
        for fn_ in self.file_list(saltenv):
            fn_ = salt.utils.data.decode(fn_)
            if fn_.strip() and fn_.startswith(path):
                if salt.utils.stringutils.check_include_exclude(
                    fn_, include_pat, exclude_pat
                ):
                    fn_ = self.cache_file(
                        salt.utils.url.create(fn_), saltenv, cachedir=cachedir
                    )
                    if fn_:
                        ret.append(fn_)

        if include_empty:
            # Break up the path into a list containing the bottom-level
            # directory (the one being recursively copied) and the directories
            # preceding it
            # separated = string.rsplit(path, '/', 1)
            # if len(separated) != 2:
            #     # No slashes in path. (So all files in saltenv will be copied)
            #     prefix = ''
            # else:
            #     prefix = separated[0]
            cachedir = self.get_cachedir(cachedir)

            dest = salt.utils.path.join(cachedir, "files", saltenv)
            for fn_ in self.file_list_emptydirs(saltenv):
                fn_ = salt.utils.data.decode(fn_)
                if fn_.startswith(path):
                    minion_dir = f"{dest}/{fn_}"
                    if not os.path.isdir(minion_dir):
                        os.makedirs(minion_dir)
                    ret.append(minion_dir)
        return ret

    def cache_local_file(self, path, **kwargs):
        """
        Cache a local file on the minion in the localfiles cache
        """
        dest = os.path.join(self.opts["cachedir"], "localfiles", path.lstrip("/"))
        destdir = os.path.dirname(dest)

        if not os.path.isdir(destdir):
            os.makedirs(destdir)

        shutil.copyfile(path, dest)
        return dest

    def file_local_list(self, saltenv="base"):
        """
        List files in the local minion files and localfiles caches
        """
        filesdest = os.path.join(self.opts["cachedir"], "files", saltenv)
        localfilesdest = os.path.join(self.opts["cachedir"], "localfiles")

        fdest = self._file_local_list(filesdest)
        ldest = self._file_local_list(localfilesdest)
        return sorted(fdest.union(ldest))

    def file_list(self, saltenv="base", prefix=""):
        """
        This function must be overwritten
        """
        return []

    def dir_list(self, saltenv="base", prefix=""):
        """
        This function must be overwritten
        """
        return []

    def symlink_list(self, saltenv="base", prefix=""):
        """
        This function must be overwritten
        """
        return {}

    def is_cached(self, path, saltenv="base", cachedir=None):
        """
        Returns the full path to a file if it is cached locally on the minion
        otherwise returns a blank string
        """
        if path.startswith("salt://"):
            path, senv = salt.utils.url.parse(path)
            if senv:
                saltenv = senv

        escaped = True if salt.utils.url.is_escaped(path) else False

        # also strip escape character '|'
        localsfilesdest = os.path.join(
            self.opts["cachedir"], "localfiles", path.lstrip("|/")
        )
        filesdest = os.path.join(
            self.opts["cachedir"], "files", saltenv, path.lstrip("|/")
        )
        extrndest = self._extrn_path(path, saltenv, cachedir=cachedir)

        if os.path.exists(filesdest):
            return salt.utils.url.escape(filesdest) if escaped else filesdest
        elif os.path.exists(localsfilesdest):
            return (
                salt.utils.url.escape(localsfilesdest) if escaped else localsfilesdest
            )
        elif os.path.exists(extrndest):
            return extrndest

        return ""

    def cache_dest(self, url, saltenv="base", cachedir=None):
        """
        Return the expected cache location for the specified URL and
        environment.
        """
        proto = urllib.parse.urlparse(url).scheme

        if proto == "":
            # Local file path
            return url

        if proto == "salt":
            url, senv = salt.utils.url.parse(url)
            if senv:
                saltenv = senv
            return salt.utils.path.join(
                self.opts["cachedir"], "files", saltenv, url.lstrip("|/")
            )

        return self._extrn_path(url, saltenv, cachedir=cachedir)

    def list_states(self, saltenv):
        """
        Return a list of all available sls modules on the master for a given
        environment
        """
        states = set()
        for path in self.file_list(saltenv):
            if salt.utils.platform.is_windows():
                path = path.replace("\\", "/")
            if path.endswith(".sls"):
                # is an sls module!
                if path.endswith("/init.sls"):
                    states.add(path.replace("/", ".")[:-9])
                else:
                    states.add(path.replace("/", ".")[:-4])
        return sorted(states)

    def get_state(self, sls, saltenv, cachedir=None):
        """
        Get a state file from the master and store it in the local minion
        cache; return the location of the file
        """
        if "." in sls:
            sls = sls.replace(".", "/")
        sls_url = salt.utils.url.create(sls + ".sls")
        init_url = salt.utils.url.create(sls + "/init.sls")
        for path in [sls_url, init_url]:
            dest = self.cache_file(path, saltenv, cachedir=cachedir)
            if dest:
                return {"source": path, "dest": dest}
        return {}

    def get_dir(self, path, dest="", saltenv="base", gzip=None, cachedir=None):
        """
        Get a directory recursively from the salt-master
        """
        ret = []
        # Strip trailing slash
        path = self._check_proto(path).rstrip("/")
        # Break up the path into a list containing the bottom-level directory
        # (the one being recursively copied) and the directories preceding it
        separated = path.rsplit("/", 1)
        if len(separated) != 2:
            # No slashes in path. (This means all files in saltenv will be
            # copied)
            prefix = ""
        else:
            prefix = separated[0]

        # Copy files from master
        for fn_ in self.file_list(saltenv, prefix=path):
            # Prevent files in "salt://foobar/" (or salt://foo.sh) from
            # matching a path of "salt://foo"
            try:
                if fn_[len(path)] != "/":
                    continue
            except IndexError:
                continue
            # Remove the leading directories from path to derive
            # the relative path on the minion.
            minion_relpath = fn_[len(prefix) :].lstrip("/")
            ret.append(
                self.get_file(
                    salt.utils.url.create(fn_),
                    f"{dest}/{minion_relpath}",
                    True,
                    saltenv,
                    gzip,
                )
            )
        # Replicate empty dirs from master
        try:
            for fn_ in self.file_list_emptydirs(saltenv, prefix=path):
                # Prevent an empty dir "salt://foobar/" from matching a path of
                # "salt://foo"
                try:
                    if fn_[len(path)] != "/":
                        continue
                except IndexError:
                    continue
                # Remove the leading directories from path to derive
                # the relative path on the minion.
                minion_relpath = fn_[len(prefix) :].lstrip("/")
                minion_mkdir = f"{dest}/{minion_relpath}"
                if not os.path.isdir(minion_mkdir):
                    os.makedirs(minion_mkdir)
                ret.append(minion_mkdir)
        except TypeError:
            pass
        ret.sort()
        return ret

    def get_url(
        self,
        url,
        dest,
        makedirs=False,
        saltenv="base",
        no_cache=False,
        cachedir=None,
        source_hash=None,
        verify_ssl=True,
        use_etag=False,
    ):
        """
        Get a single file from a URL.
        """
        url_data = urllib.parse.urlparse(url, allow_fragments=False)
        url_scheme = url_data.scheme
        url_path = os.path.join(url_data.netloc, url_data.path).rstrip(os.sep)

        # If dest is a directory, rewrite dest with filename
        if dest is not None and (os.path.isdir(dest) or dest.endswith(("/", "\\"))):
            if (
                url_data.query
                or len(url_data.path) > 1
                and not url_data.path.endswith("/")
            ):
                strpath = url.split("/")[-1]
            else:
                strpath = "index.html"

            if salt.utils.platform.is_windows():
                strpath = salt.utils.path.sanitize_win_path(strpath)

            dest = os.path.join(dest, strpath)

        if url_scheme and url_scheme.lower() in string.ascii_lowercase:
            url_path = ":".join((url_scheme, url_path))
            url_scheme = "file"

        if url_scheme in ("file", ""):
            # Local filesystem
            if not os.path.isabs(url_path):
                raise CommandExecutionError(f"Path '{url_path}' is not absolute")
            if dest is None:
                with salt.utils.files.fopen(url_path, "rb") as fp_:
                    data = fp_.read()
                return data
            return url_path

        if url_scheme == "salt":
            result = self.get_file(url, dest, makedirs, saltenv, cachedir=cachedir)
            if result and dest is None:
                with salt.utils.files.fopen(result, "rb") as fp_:
                    data = fp_.read()
                return data
            return result

        if dest:
            destdir = os.path.dirname(dest)
            if not os.path.isdir(destdir):
                if makedirs:
                    os.makedirs(destdir)
                else:
                    return ""
        elif not no_cache:
            dest = self._extrn_path(url, saltenv, cachedir=cachedir)
            if source_hash is not None:
                try:
                    source_hash = source_hash.split("=")[-1]
                    form = salt.utils.files.HASHES_REVMAP[len(source_hash)]
                    if salt.utils.hashutils.get_hash(dest, form) == source_hash:
                        log.debug(
                            "Cached copy of %s (%s) matches source_hash %s, "
                            "skipping download",
                            url,
                            dest,
                            source_hash,
                        )
                        return dest
                except (AttributeError, KeyError, OSError):
                    pass
            destdir = os.path.dirname(dest)
            if not os.path.isdir(destdir):
                os.makedirs(destdir)

        if url_data.scheme == "s3":
            try:

                def s3_opt(key, default=None):
                    """
                    Get value of s3.<key> from Minion config or from Pillar
                    """
                    if "s3." + key in self.opts:
                        return self.opts["s3." + key]
                    try:
                        return self.opts["pillar"]["s3"][key]
                    except (KeyError, TypeError):
                        return default

                self.utils["s3.query"](
                    method="GET",
                    bucket=url_data.netloc,
                    path=url_data.path[1:],
                    return_bin=False,
                    local_file=dest,
                    action=None,
                    key=s3_opt("key"),
                    keyid=s3_opt("keyid"),
                    service_url=s3_opt("service_url"),
                    verify_ssl=s3_opt("verify_ssl", True),
                    location=s3_opt("location"),
                    path_style=s3_opt("path_style", False),
                    https_enable=s3_opt("https_enable", True),
                )
                return dest
            except Exception as exc:  # pylint: disable=broad-except
                raise MinionError(f"Could not fetch from {url}. Exception: {exc}")
        if url_data.scheme == "ftp":
            try:
                ftp = ftplib.FTP()  # nosec
                ftp_port = url_data.port
                if not ftp_port:
                    ftp_port = 21
                ftp.connect(url_data.hostname, ftp_port)
                ftp.login(url_data.username, url_data.password)
                remote_file_path = url_data.path.lstrip("/")
                with salt.utils.files.fopen(dest, "wb") as fp_:
                    ftp.retrbinary(f"RETR {remote_file_path}", fp_.write)
                ftp.quit()
                return dest
            except Exception as exc:  # pylint: disable=broad-except
                raise MinionError(
                    "Could not retrieve {} from FTP server. Exception: {}".format(
                        url, exc
                    )
                )

        if url_data.scheme == "swift":
            try:

                def swift_opt(key, default):
                    """
                    Get value of <key> from Minion config or from Pillar
                    """
                    if key in self.opts:
                        return self.opts[key]
                    try:
                        return self.opts["pillar"][key]
                    except (KeyError, TypeError):
                        return default

                swift_conn = SaltSwift(
                    swift_opt("keystone.user", None),
                    swift_opt("keystone.tenant", None),
                    swift_opt("keystone.auth_url", None),
                    swift_opt("keystone.password", None),
                )

                swift_conn.get_object(url_data.netloc, url_data.path[1:], dest)
                return dest
            except Exception:  # pylint: disable=broad-except
                raise MinionError(f"Could not fetch from {url}")

        get_kwargs = {}
        if url_data.username is not None and url_data.scheme in ("http", "https"):
            netloc = url_data.netloc
            at_sign_pos = netloc.rfind("@")
            if at_sign_pos != -1:
                netloc = netloc[at_sign_pos + 1 :]
            fixed_url = urllib.parse.urlunparse(
                (
                    url_data.scheme,
                    netloc,
                    url_data.path,
                    url_data.params,
                    url_data.query,
                    url_data.fragment,
                )
            )
            get_kwargs["auth"] = (url_data.username, url_data.password)
        else:
            fixed_url = url

        destfp = None
        dest_etag = f"{dest}.etag"
        try:
            # Tornado calls streaming_callback on redirect response bodies.
            # But we need streaming to support fetching large files (> RAM
            # avail). Here we are working around this by disabling recording
            # the body for redirections. The issue is fixed in Tornado 4.3.0
            # so on_header callback could be removed when we'll deprecate
            # Tornado<4.3.0. See #27093 and #30431 for details.

            # Use list here to make it writable inside the on_header callback.
            # Simple bool doesn't work here: on_header creates a new local
            # variable instead. This could be avoided in Py3 with 'nonlocal'
            # statement. There is no Py2 alternative for this.
            #
            # write_body[0] is used by the on_chunk callback to tell it whether
            #   or not we need to write the body of the request to disk. For
            #   30x redirects we set this to False because we don't want to
            #   write the contents to disk, as we will need to wait until we
            #   get to the redirected URL.
            #
            # write_body[1] will contain a tornado.httputil.HTTPHeaders
            #   instance that we will use to parse each header line. We
            #   initialize this to False, and after we parse the status line we
            #   will replace it with the HTTPHeaders instance. If/when we have
            #   found the encoding used in the request, we set this value to
            #   False to signify that we are done parsing.
            #
            # write_body[2] is where the encoding will be stored
            #
            # write_body[3] is where the etag will be stored if use_etag is
            #   enabled. This allows us to iterate over the headers until
            #   both content encoding and etag are found.
            write_body = [None, False, None, None]

            def on_header(hdr):
                if write_body[1] is not False and (
                    write_body[2] is None or (use_etag and write_body[3] is None)
                ):
                    if not hdr.strip() and "Content-Type" not in write_body[1]:
                        # If write_body[0] is True, then we are not following a
                        # redirect (initial response was a 200 OK). So there is
                        # no need to reset write_body[0].
                        if write_body[0] is not True:
                            # We are following a redirect, so we need to reset
                            # write_body[0] so that we properly follow it.
                            write_body[0] = None
                        # We don't need the HTTPHeaders object anymore
                        if not use_etag or write_body[3]:
                            write_body[1] = False
                        return
                    # Try to find out what content type encoding is used if
                    # this is a text file
                    write_body[1].parse_line(hdr)  # pylint: disable=no-member
                    # Case insensitive Etag header checking below. Don't break case
                    # insensitivity unless you really want to mess with people's heads
                    # in the tests. Note: http.server and apache2 use "Etag" and nginx
                    # uses "ETag" as the header key. Yay standards!
                    if use_etag and "etag" in map(str.lower, write_body[1]):
                        etag = write_body[3] = [
                            val
                            for key, val in write_body[1].items()
                            if key.lower() == "etag"
                        ][0]
                        with salt.utils.files.fopen(dest_etag, "w") as etagfp:
                            etag = etagfp.write(etag)
                    elif "Content-Type" in write_body[1]:
                        content_type = write_body[1].get(
                            "Content-Type"
                        )  # pylint: disable=no-member
                        if not content_type.startswith("text"):
                            write_body[2] = False
                            if not use_etag or write_body[3]:
                                write_body[1] = False
                        else:
                            encoding = "utf-8"
                            fields = content_type.split(";")
                            for field in fields:
                                if "encoding" in field:
                                    encoding = field.split("encoding=")[-1]
                            write_body[2] = encoding
                            # We have found our encoding. Stop processing headers.
                            if not use_etag or write_body[3]:
                                write_body[1] = False

                        # If write_body[0] is False, this means that this
                        # header is a 30x redirect, so we need to reset
                        # write_body[0] to None so that we parse the HTTP
                        # status code from the redirect target. Additionally,
                        # we need to reset write_body[2] so that we inspect the
                        # headers for the Content-Type of the URL we're
                        # following.
                        if write_body[0] is write_body[1] is False:
                            write_body[0] = write_body[2] = None

                # Check the status line of the HTTP request
                if write_body[0] is None:
                    try:
                        hdr = parse_response_start_line(hdr)
                    except HTTPInputError:
                        # Not the first line, do nothing
                        return
                    write_body[0] = hdr.code not in [301, 302, 303, 307]
                    write_body[1] = HTTPHeaders()

            if no_cache:
                result = []

                def on_chunk(chunk):
                    if write_body[0]:
                        if write_body[2]:
                            chunk = chunk.decode(write_body[2])
                        result.append(chunk)

            else:
                dest_tmp = f"{dest}.part"
                # We need an open filehandle to use in the on_chunk callback,
                # that's why we're not using a with clause here.
                # pylint: disable=resource-leakage
                destfp = salt.utils.files.fopen(dest_tmp, "wb")
                # pylint: enable=resource-leakage

                def on_chunk(chunk):
                    if write_body[0]:
                        destfp.write(chunk)

            # ETag is only used for refetch. Cached file and previous ETag
            # should be present for verification.
            header_dict = {}
            if use_etag and os.path.exists(dest_etag) and os.path.exists(dest):
                with salt.utils.files.fopen(dest_etag, "r") as etagfp:
                    etag = etagfp.read().replace("\n", "").strip()
                header_dict["If-None-Match"] = etag

            query = salt.utils.http.query(
                fixed_url,
                stream=True,
                streaming_callback=on_chunk,
                header_callback=on_header,
                username=url_data.username,
                password=url_data.password,
                opts=self.opts,
                verify_ssl=verify_ssl,
                header_dict=header_dict,
                **get_kwargs,
            )

            # 304 Not Modified is returned when If-None-Match header
            # matches server ETag for requested file.
            if use_etag and query.get("status") == 304:
                if not no_cache:
                    destfp.close()
                    destfp = None
                    os.remove(dest_tmp)
                return dest
            if "handle" not in query:
                raise MinionError(
                    "Error: {} reading {}".format(query["error"], url_data.path)
                )
            if no_cache:
                if write_body[2]:
                    return "".join(result)
                return b"".join(result)
            else:
                destfp.close()
                destfp = None
                salt.utils.files.rename(dest_tmp, dest)
                return dest
        except urllib.error.HTTPError as exc:
            raise MinionError(
                "HTTP error {0} reading {1}: {3}".format(
                    exc.code,
                    url,
                    *http.server.BaseHTTPRequestHandler.responses[exc.code],
                )
            )
        except urllib.error.URLError as exc:
            raise MinionError(f"Error reading {url}: {exc.reason}")
        finally:
            if destfp is not None:
                destfp.close()

    def get_template(
        self,
        url,
        dest,
        template="jinja",
        makedirs=False,
        saltenv="base",
        cachedir=None,
        **kwargs,
    ):
        """
        Cache a file then process it as a template
        """
        if "env" in kwargs:
            # "env" is not supported; Use "saltenv".
            kwargs.pop("env")

        kwargs["saltenv"] = saltenv
        sfn = self.cache_file(url, saltenv, cachedir=cachedir)
        if not sfn or not os.path.exists(sfn):
            return ""
        if template in salt.utils.templates.TEMPLATE_REGISTRY:
            data = salt.utils.templates.TEMPLATE_REGISTRY[template](sfn, **kwargs)
        else:
            log.error(
                "Attempted to render template with unavailable engine %s", template
            )
            return ""
        if not data["result"]:
            # Failed to render the template
            log.error("Failed to render template with error: %s", data["data"])
            return ""
        if not dest:
            # No destination passed, set the dest as an extrn_files cache
            dest = self._extrn_path(url, saltenv, cachedir=cachedir)
            # If Salt generated the dest name, create any required dirs
            makedirs = True

        destdir = os.path.dirname(dest)
        if not os.path.isdir(destdir):
            if makedirs:
                os.makedirs(destdir)
            else:
                salt.utils.files.safe_rm(data["data"])
                return ""
        shutil.move(data["data"], dest)
        return dest

    def _extrn_path(self, url, saltenv, cachedir=None):
        """
        Return the extrn_filepath for a given url
        """
        url_data = urllib.parse.urlparse(url)
        if salt.utils.platform.is_windows():
            netloc = salt.utils.path.sanitize_win_path(url_data.netloc)
        else:
            netloc = url_data.netloc

        # Strip user:pass from URLs
        netloc = netloc.split("@")[-1]
        try:
            if url_data.port:
                # Remove : from path
                netloc = netloc.replace(":", "")
        except ValueError:
            # On Windows urllib raises a ValueError
            # when using a file:// source and trying
            # to access the port attribute.
            pass

        if cachedir is None:
            cachedir = self.opts["cachedir"]
        elif not os.path.isabs(cachedir):
            cachedir = os.path.join(self.opts["cachedir"], cachedir)

        if url_data.query:
            file_name = "-".join([url_data.path, url_data.query])
        else:
            file_name = url_data.path

        # clean_path returns an empty string if the check fails
        root_path = salt.utils.path.join(cachedir, "extrn_files", saltenv, netloc)
        new_path = os.path.sep.join([root_path, file_name])
        if not salt.utils.verify.clean_path(root_path, new_path, subdir=True):
            return "Invalid path"

        if len(file_name) > MAX_FILENAME_LENGTH:
            file_name = salt.utils.hashutils.sha256_digest(file_name)

        return salt.utils.path.join(cachedir, "extrn_files", saltenv, netloc, file_name)


class PillarClient(Client):
    """
    Used by pillar to handle fileclient requests
    """

    def _find_file(self, path, saltenv="base"):
        """
        Locate the file path
        """
        fnd = {"path": "", "rel": ""}

        if salt.utils.url.is_escaped(path):
            # The path arguments are escaped
            path = salt.utils.url.unescape(path)
        for root in self.opts["pillar_roots"].get(saltenv, []):
            full = os.path.join(root, path)
            if os.path.isfile(full):
                fnd["path"] = full
                fnd["rel"] = path
                return fnd
        return fnd

    def get_file(
        self, path, dest="", makedirs=False, saltenv="base", gzip=None, cachedir=None
    ):
        """
        Copies a file from the local files directory into :param:`dest`
        gzip compression settings are ignored for local files
        """
        path = self._check_proto(path)
        fnd = self._find_file(path, saltenv)
        fnd_path = fnd.get("path")
        if not fnd_path:
            return ""

        return fnd_path

    def file_list(self, saltenv="base", prefix=""):
        """
        Return a list of files in the given environment
        with optional relative prefix path to limit directory traversal
        """
        ret = []
        prefix = prefix.strip("/")
        for path in self.opts["pillar_roots"].get(saltenv, []):
            for root, dirs, files in salt.utils.path.os_walk(
                os.path.join(path, prefix), followlinks=True
            ):
                # Don't walk any directories that match file_ignore_regex or glob
                dirs[:] = [
                    d for d in dirs if not salt.fileserver.is_file_ignored(self.opts, d)
                ]
                for fname in files:
                    relpath = os.path.relpath(os.path.join(root, fname), path)
                    ret.append(salt.utils.data.decode(relpath))
        return ret

    def file_list_emptydirs(self, saltenv="base", prefix=""):
        """
        List the empty dirs in the pillar_roots
        with optional relative prefix path to limit directory traversal
        """
        ret = []
        prefix = prefix.strip("/")
        for path in self.opts["pillar_roots"].get(saltenv, []):
            for root, dirs, files in salt.utils.path.os_walk(
                os.path.join(path, prefix), followlinks=True
            ):
                # Don't walk any directories that match file_ignore_regex or glob
                dirs[:] = [
                    d for d in dirs if not salt.fileserver.is_file_ignored(self.opts, d)
                ]
                if not dirs and not files:
                    ret.append(salt.utils.data.decode(os.path.relpath(root, path)))
        return ret

    def dir_list(self, saltenv="base", prefix=""):
        """
        List the dirs in the pillar_roots
        with optional relative prefix path to limit directory traversal
        """
        ret = []
        prefix = prefix.strip("/")
        for path in self.opts["pillar_roots"].get(saltenv, []):
            for root, dirs, files in salt.utils.path.os_walk(
                os.path.join(path, prefix), followlinks=True
            ):
                ret.append(salt.utils.data.decode(os.path.relpath(root, path)))
        return ret

    def __get_file_path(self, path, saltenv="base"):
        """
        Return either a file path or the result of a remote find_file call.
        """
        try:
            path = self._check_proto(path)
        except MinionError as err:
            # Local file path
            if not os.path.isfile(path):
                log.warning(
                    "specified file %s is not present to generate hash: %s", path, err
                )
                return None
            else:
                return path
        return self._find_file(path, saltenv)

    def hash_file(self, path, saltenv="base"):
        """
        Return the hash of a file, to get the hash of a file in the pillar_roots
        prepend the path with salt://<file on server> otherwise, prepend the
        file with / for a local file.
        """
        ret = {}
        fnd = self.__get_file_path(path, saltenv)
        if fnd is None:
            return ret

        try:
            # Remote file path (self._find_file() invoked)
            fnd_path = fnd["path"]
        except TypeError:
            # Local file path
            fnd_path = fnd

        hash_type = self.opts.get("hash_type", DEFAULT_HASH_TYPE)
        ret["hsum"] = salt.utils.hashutils.get_hash(fnd_path, form=hash_type)
        ret["hash_type"] = hash_type
        return ret

    def hash_and_stat_file(self, path, saltenv="base"):
        """
        Return the hash of a file, to get the hash of a file in the pillar_roots
        prepend the path with salt://<file on server> otherwise, prepend the
        file with / for a local file.

        Additionally, return the stat result of the file, or None if no stat
        results were found.
        """
        ret = {}
        fnd = self.__get_file_path(path, saltenv)
        if fnd is None:
            return ret, None

        try:
            # Remote file path (self._find_file() invoked)
            fnd_path = fnd["path"]
            fnd_stat = fnd.get("stat")
        except TypeError:
            # Local file path
            fnd_path = fnd
            try:
                fnd_stat = list(os.stat(fnd_path))
            except Exception:  # pylint: disable=broad-except
                fnd_stat = None

        hash_type = self.opts.get("hash_type", DEFAULT_HASH_TYPE)
        ret["hsum"] = salt.utils.hashutils.get_hash(fnd_path, form=hash_type)
        ret["hash_type"] = hash_type
        return ret, fnd_stat

    def list_env(self, saltenv="base"):
        """
        Return a list of the files in the file server's specified environment
        """
        return self.file_list(saltenv)

    def master_opts(self):
        """
        Return the master opts data
        """
        return self.opts

    def envs(self):
        """
        Return the available environments
        """
        ret = []
        for saltenv in self.opts["pillar_roots"]:
            ret.append(saltenv)
        return ret

    def master_tops(self):
        """
        Originally returned information via the external_nodes subsystem.
        External_nodes was deprecated and removed in
        2014.1.6 in favor of master_tops (which had been around since pre-0.17).
             salt-call --local state.show_top
        ends up here, but master_tops has not been extended to support
        show_top in a completely local environment yet.  It's worth noting
        that originally this fn started with
            if 'external_nodes' not in opts: return {}
        So since external_nodes is gone now, we are just returning the
        empty dict.
        """
        return {}


class RemoteClient(Client):
    """
    Interact with the salt master file server.
    """

    def __init__(self, opts):
        Client.__init__(self, opts)
        self._closing = False
        self.channel = salt.channel.client.ReqChannel.factory(self.opts)
        if hasattr(self.channel, "auth"):
            self.auth = self.channel.auth
        else:
            self.auth = ""

    def _refresh_channel(self):
        """
        Reset the channel, in the event of an interruption
        """
        # Close the previous channel
        self.channel.close()
        # Instantiate a new one
        self.channel = salt.channel.client.ReqChannel.factory(self.opts)
        return self.channel

    def _channel_send(self, load, raw=False):
        start = time.monotonic()
        try:
            return self.channel.send(
                load,
                raw=raw,
            )
        except salt.exceptions.SaltReqTimeoutError:
            raise SaltClientError(
                f"File client timed out after {int(time.monotonic() - start)} seconds"
            )

    def destroy(self):
        if self._closing:
            return

        self._closing = True
        channel = None
        try:
            channel = self.channel
        except AttributeError:
            pass
        if channel is not None:
            channel.close()

    def get_file(
        self, path, dest="", makedirs=False, saltenv="base", gzip=None, cachedir=None
    ):
        """
        Get a single file from the salt-master
        path must be a salt server location, aka, salt://path/to/file, if
        dest is omitted, then the downloaded file will be placed in the minion
        cache
        """
        path, senv = salt.utils.url.split_env(path)
        if senv:
            saltenv = senv

        if not salt.utils.platform.is_windows():
            hash_server, stat_server = self.hash_and_stat_file(path, saltenv)
        else:
            hash_server = self.hash_file(path, saltenv)

        # Check if file exists on server, before creating files and
        # directories
        if hash_server == "":
            log.debug("Could not find file '%s' in saltenv '%s'", path, saltenv)
            return False

        # If dest is a directory, rewrite dest with filename
        if dest is not None and (os.path.isdir(dest) or dest.endswith(("/", "\\"))):
            dest = os.path.join(dest, os.path.basename(path))
            log.debug(
                "In saltenv '%s', '%s' is a directory. Changing dest to '%s'",
                saltenv,
                os.path.dirname(dest),
                dest,
            )

        # Hash compare local copy with master and skip download
        # if no difference found.
        dest2check = dest
        if not dest2check:
            rel_path = self._check_proto(path)

            log.debug(
                "In saltenv '%s', looking at rel_path '%s' to resolve '%s'",
                saltenv,
                rel_path,
                path,
            )
            with self._cache_loc(rel_path, saltenv, cachedir=cachedir) as cache_dest:
                dest2check = cache_dest

        log.debug(
            "In saltenv '%s', ** considering ** path '%s' to resolve '%s'",
            saltenv,
            dest2check,
            path,
        )

        if dest2check and os.path.isfile(dest2check):
            if not salt.utils.platform.is_windows():
                hash_local, stat_local = self.hash_and_stat_file(dest2check, saltenv)
            else:
                hash_local = self.hash_file(dest2check, saltenv)

            if hash_local == hash_server:
                return dest2check

        log.debug(
            "Fetching file from saltenv '%s', ** attempting ** '%s'", saltenv, path
        )
        d_tries = 0
        transport_tries = 0
        path = self._check_proto(path)
        load = {"path": path, "saltenv": saltenv, "cmd": "_serve_file"}
        if gzip:
            gzip = int(gzip)
            load["gzip"] = gzip

        fn_ = None
        if dest:
            destdir = os.path.dirname(dest)
            if not os.path.isdir(destdir):
                if makedirs:
                    try:
                        os.makedirs(destdir)
                    except OSError as exc:
                        if exc.errno != errno.EEXIST:  # ignore if it was there already
                            raise
                else:
                    return False
            # We need an open filehandle here, that's why we're not using a
            # with clause:
            # pylint: disable=resource-leakage
            fn_ = salt.utils.files.fopen(dest, "wb+")
            # pylint: enable=resource-leakage
        else:
            log.debug("No dest file found")

        while True:
            if not fn_:
                load["loc"] = 0
            else:
                load["loc"] = fn_.tell()
            data = self._channel_send(
                load,
                raw=True,
            )
            # Sometimes the source is local (eg when using
            # 'salt.fileserver.FSChan'), in which case the keys are
            # already strings. Sometimes the source is remote, in which
            # case the keys are bytes due to raw mode. Standardize on
            # strings for the top-level keys to simplify things.
            data = decode_dict_keys_to_str(data)
            try:
                if not data["data"]:
                    if not fn_ and data["dest"]:
                        # This is a 0 byte file on the master
                        with self._cache_loc(
                            data["dest"], saltenv, cachedir=cachedir
                        ) as cache_dest:
                            dest = cache_dest
                            with salt.utils.files.fopen(cache_dest, "wb+") as ofile:
                                ofile.write(data["data"])
                    if "hsum" in data and d_tries < 3:
                        # Master has prompted a file verification, if the
                        # verification fails, re-download the file. Try 3 times
                        d_tries += 1
                        hsum = salt.utils.hashutils.get_hash(
                            dest,
                            salt.utils.stringutils.to_str(
                                data.get("hash_type", DEFAULT_HASH_TYPE)
                            ),
                        )
                        if hsum != data["hsum"]:
                            log.warning(
                                "Bad download of file %s, attempt %d of 3",
                                path,
                                d_tries,
                            )
                            continue
                    break
                if not fn_:
                    with self._cache_loc(
                        data["dest"], saltenv, cachedir=cachedir
                    ) as cache_dest:
                        dest = cache_dest
                        # If a directory was formerly cached at this path, then
                        # remove it to avoid a traceback trying to write the file
                        if os.path.isdir(dest):
                            salt.utils.files.rm_rf(dest)
                        fn_ = salt.utils.atomicfile.atomic_open(dest, "wb+")
                if data.get("gzip", None):
                    data = salt.utils.gzip_util.uncompress(data["data"])
                else:
                    data = data["data"]
                if isinstance(data, str):
                    data = data.encode()
                fn_.write(data)
            except (TypeError, KeyError) as exc:
                try:
                    data_type = type(data).__name__
                except AttributeError:
                    # Shouldn't happen, but don't let this cause a traceback.
                    data_type = str(type(data))
                transport_tries += 1
                log.warning(
                    "Data transport is broken, got: %s, type: %s, "
                    "exception: %s, attempt %d of 3",
                    data,
                    data_type,
                    exc,
                    transport_tries,
                )
                self._refresh_channel()
                if transport_tries > 3:
                    log.error(
                        "Data transport is broken, got: %s, type: %s, "
                        "exception: %s, retry attempts exhausted",
                        data,
                        data_type,
                        exc,
                    )
                    break

        if fn_:
            fn_.close()
            log.info("Fetching file from saltenv '%s', ** done ** '%s'", saltenv, path)
        else:
            log.debug(
                "In saltenv '%s', we are ** missing ** the file '%s'", saltenv, path
            )

        return dest

    def file_list(self, saltenv="base", prefix=""):
        """
        List the files on the master
        """
        load = {"saltenv": saltenv, "prefix": prefix, "cmd": "_file_list"}
        return self._channel_send(
            load,
        )

    def file_list_emptydirs(self, saltenv="base", prefix=""):
        """
        List the empty dirs on the master
        """
        load = {"saltenv": saltenv, "prefix": prefix, "cmd": "_file_list_emptydirs"}
        return self._channel_send(
            load,
        )

    def dir_list(self, saltenv="base", prefix=""):
        """
        List the dirs on the master
        """
        load = {"saltenv": saltenv, "prefix": prefix, "cmd": "_dir_list"}
        return self._channel_send(
            load,
        )

    def symlink_list(self, saltenv="base", prefix=""):
        """
        List symlinked files and dirs on the master
        """
        load = {"saltenv": saltenv, "prefix": prefix, "cmd": "_symlink_list"}
        return self._channel_send(
            load,
        )

    def __hash_and_stat_file(self, path, saltenv="base"):
        """
        Common code for hashing and stating files
        """
        try:
            path = self._check_proto(path)
        except MinionError as err:
            if not os.path.isfile(path):
                log.warning(
                    "specified file %s is not present to generate hash: %s", path, err
                )
                return {}, None
            else:
                ret = {}
                hash_type = self.opts.get("hash_type", DEFAULT_HASH_TYPE)
                ret["hsum"] = salt.utils.hashutils.get_hash(path, form=hash_type)
                ret["hash_type"] = hash_type
                return ret
        load = {"path": path, "saltenv": saltenv, "cmd": "_file_hash"}
        return self._channel_send(
            load,
        )

    def hash_file(self, path, saltenv="base"):
        """
        Return the hash of a file, to get the hash of a file on the salt
        master file server prepend the path with salt://<file on server>
        otherwise, prepend the file with / for a local file.
        """
        return self.__hash_and_stat_file(path, saltenv)

    def hash_and_stat_file(self, path, saltenv="base"):
        """
        The same as hash_file, but also return the file's mode, or None if no
        mode data is present.
        """
        hash_result = self.hash_file(path, saltenv)
        try:
            path = self._check_proto(path)
        except MinionError as err:
            if not os.path.isfile(path):
                return hash_result, None
            else:
                try:
                    return hash_result, list(os.stat(path))
                except Exception:  # pylint: disable=broad-except
                    return hash_result, None
        load = {"path": path, "saltenv": saltenv, "cmd": "_file_find"}
        fnd = self._channel_send(
            load,
        )
        try:
            stat_result = fnd.get("stat")
        except AttributeError:
            stat_result = None
        return hash_result, stat_result

    def list_env(self, saltenv="base"):
        """
        Return a list of the files in the file server's specified environment
        """
        load = {"saltenv": saltenv, "cmd": "_file_list"}
        return self._channel_send(
            load,
        )

    def envs(self):
        """
        Return a list of available environments
        """
        load = {"cmd": "_file_envs"}
        return self._channel_send(
            load,
        )

    def master_opts(self):
        """
        Return the master opts data
        """
        load = {"cmd": "_master_opts"}
        return self._channel_send(
            load,
        )

    def master_tops(self):
        """
        Return the metadata derived from the master_tops system
        """
        load = {"cmd": "_master_tops", "id": self.opts["id"], "opts": self.opts}
        if self.auth:
            load["tok"] = self.auth.gen_token(b"salt")
        return self._channel_send(
            load,
        )

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.destroy()


class FSClient(RemoteClient):
    """
    A local client that uses the RemoteClient but substitutes the channel for
    the FSChan object
    """

    def __init__(self, opts):  # pylint: disable=W0231
        Client.__init__(self, opts)  # pylint: disable=W0233
        self._closing = False
        self.channel = salt.fileserver.FSChan(opts)
        self.auth = DumbAuth()


# Provide backward compatibility for anyone directly using LocalClient (but no
# one should be doing this).
LocalClient = FSClient


class DumbAuth:
    """
    The dumbauth class is used to stub out auth calls fired from the FSClient
    subsystem
    """

    def gen_token(self, clear_tok):
        return clear_tok


class ContextlessFileClient:
    def __init__(self, file_client):
        self.file_client = file_client

    def __getattr__(self, key):
        return getattr(self.file_client, key)

    def __exit__(self, *_):
        pass

    def __enter__(self):
        return self