HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.109.1.lve.el8.x86_64 #1 SMP Thu Mar 5 20:23:46 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //proc/self/root/opt/saltstack/salt/lib/python3.10/site-packages/salt/pillar/azureblob.py
"""
Use Azure Blob as a Pillar source.

.. versionadded:: 3001

:maintainer: <devops@eitr.tech>
:maturity: new
:depends:
    * `azure-storage-blob <https://pypi.org/project/azure-storage-blob/>`_ >= 12.0.0

The Azure Blob ext_pillar can be configured with the following parameters:

.. code-block:: yaml

    ext_pillar:
      - azureblob:
          container: 'test_container'
          connection_string: 'connection_string'
          multiple_env: False
          environment: 'base'
          blob_cache_expire: 30
          blob_sync_on_update: True

:param container: The name of the target Azure Blob Container.

:param connection_string: The connection string to use to access the specified Azure Blob Container.

:param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
    Defaults to false.

:param environment: Specifies which environment the container represents when in single environment mode. Defaults
    to 'base' and is ignored if multiple_env is set as True.

:param blob_cache_expire: Specifies expiration time of the Azure Blob metadata cache file. Defaults to 30s.

:param blob_sync_on_update: Specifies if the cache is synced on update. Defaults to True.

"""

import logging
import os
import pickle
import time
from copy import deepcopy

import salt.utils.files
import salt.utils.hashutils
from salt.pillar import Pillar

HAS_LIBS = False
try:
    # pylint: disable=no-name-in-module
    from azure.storage.blob import BlobServiceClient

    # pylint: enable=no-name-in-module
    HAS_LIBS = True
except ImportError:
    pass


__virtualname__ = "azureblob"

# Set up logging
log = logging.getLogger(__name__)


def __virtual__():
    if not HAS_LIBS:
        return (
            False,
            "The following dependency is required to use the Azure Blob ext_pillar: "
            "Microsoft Azure Storage Blob >= 12.0.0 ",
        )

    return __virtualname__


def ext_pillar(
    minion_id,
    pillar,  # pylint: disable=W0613
    container,
    connection_string,
    multiple_env=False,
    environment="base",
    blob_cache_expire=30,
    blob_sync_on_update=True,
):
    """
    Execute a command and read the output as YAML.

    :param container: The name of the target Azure Blob Container.

    :param connection_string: The connection string to use to access the specified Azure Blob Container.

    :param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
        Defaults to false.

    :param environment: Specifies which environment the container represents when in single environment mode. Defaults
        to 'base' and is ignored if multiple_env is set as True.

    :param blob_cache_expire: Specifies expiration time of the Azure Blob metadata cache file. Defaults to 30s.

    :param blob_sync_on_update: Specifies if the cache is synced on update. Defaults to True.

    """
    # normpath is needed to remove appended '/' if root is empty string.
    pillar_dir = os.path.normpath(
        os.path.join(_get_cache_dir(), environment, container)
    )

    if __opts__["pillar_roots"].get(environment, []) == [pillar_dir]:
        return {}

    metadata = _init(
        connection_string, container, multiple_env, environment, blob_cache_expire
    )

    log.debug("Blob metadata: %s", metadata)

    if blob_sync_on_update:
        # sync the containers to the local cache
        log.info("Syncing local pillar cache from Azure Blob...")
        for saltenv, env_meta in metadata.items():
            for container, files in _find_files(env_meta).items():
                for file_path in files:
                    cached_file_path = _get_cached_file_name(
                        container, saltenv, file_path
                    )
                    log.info("%s - %s : %s", container, saltenv, file_path)
                    # load the file from Azure Blob if not in the cache or too old
                    _get_file_from_blob(
                        connection_string,
                        metadata,
                        saltenv,
                        container,
                        file_path,
                        cached_file_path,
                    )

        log.info("Sync local pillar cache from Azure Blob completed.")

    opts = deepcopy(__opts__)
    opts["pillar_roots"][environment] = (
        [os.path.join(pillar_dir, environment)] if multiple_env else [pillar_dir]
    )

    # Avoid recursively re-adding this same pillar
    opts["ext_pillar"] = [x for x in opts["ext_pillar"] if "azureblob" not in x]

    pil = Pillar(opts, __grains__, minion_id, environment)

    compiled_pillar = pil.compile_pillar(ext=False)

    return compiled_pillar


def _init(connection_string, container, multiple_env, environment, blob_cache_expire):
    """
    .. versionadded:: 3001

    Connect to Blob Storage and download the metadata for each file in all containers specified and
        cache the data to disk.

    :param connection_string: The connection string to use to access the specified Azure Blob Container.

    :param container: The name of the target Azure Blob Container.

    :param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
        Defaults to false.

    :param environment: Specifies which environment the container represents when in single environment mode. Defaults
        to 'base' and is ignored if multiple_env is set as True.

    :param blob_cache_expire: Specifies expiration time of the Azure Blob metadata cache file. Defaults to 30s.

    """
    cache_file = _get_containers_cache_filename(container)
    exp = time.time() - blob_cache_expire

    # Check if cache_file exists and its mtime
    if os.path.isfile(cache_file):
        cache_file_mtime = os.path.getmtime(cache_file)
    else:
        # If the file does not exist then set mtime to 0 (aka epoch)
        cache_file_mtime = 0

    expired = cache_file_mtime <= exp

    log.debug(
        "Blob storage container cache file %s is %sexpired, mtime_diff=%ss,"
        " expiration=%ss",
        cache_file,
        "" if expired else "not ",
        cache_file_mtime - exp,
        blob_cache_expire,
    )

    if expired:
        pillars = _refresh_containers_cache_file(
            connection_string, container, cache_file, multiple_env, environment
        )
    else:
        pillars = _read_containers_cache_file(cache_file)

    log.debug("Blob container retrieved pillars %s", pillars)

    return pillars


def _get_cache_dir():
    """
    .. versionadded:: 3001

    Get pillar cache directory. Initialize it if it does not exist.

    """
    cache_dir = os.path.join(__opts__["cachedir"], "pillar_azureblob")

    if not os.path.isdir(cache_dir):
        log.debug("Initializing Azure Blob Pillar Cache")
        os.makedirs(cache_dir)

    return cache_dir


def _get_cached_file_name(container, saltenv, path):
    """
    .. versionadded:: 3001

    Return the cached file name for a container path file.

    :param container: The name of the target Azure Blob Container.

    :param saltenv: Specifies which environment the container represents.

    :param path: The path of the file in the container.

    """
    file_path = os.path.join(_get_cache_dir(), saltenv, container, path)

    # make sure container and saltenv directories exist
    if not os.path.exists(os.path.dirname(file_path)):
        os.makedirs(os.path.dirname(file_path))

    return file_path


def _get_containers_cache_filename(container):
    """
    .. versionadded:: 3001

    Return the filename of the cache for container contents. Create the path if it does not exist.

    :param container: The name of the target Azure Blob Container.

    """
    cache_dir = _get_cache_dir()
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)

    return os.path.join(cache_dir, f"{container}-files.cache")


def _refresh_containers_cache_file(
    connection_string, container, cache_file, multiple_env=False, environment="base"
):
    """
    .. versionadded:: 3001

    Downloads the entire contents of an Azure storage container to the local filesystem.

    :param connection_string: The connection string to use to access the specified Azure Blob Container.

    :param container: The name of the target Azure Blob Container.

    :param cache_file: The path of where the file will be cached.

    :param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.

    :param environment: Specifies which environment the container represents when in single environment mode. This is
        ignored if multiple_env is set as True.

    """
    try:
        # Create the BlobServiceClient object which will be used to create a container client
        blob_service_client = BlobServiceClient.from_connection_string(
            connection_string
        )

        # Create the ContainerClient object
        container_client = blob_service_client.get_container_client(container)
    except Exception as exc:  # pylint: disable=broad-except
        log.error("Exception: %s", exc)
        return False

    metadata = {}

    def _walk_blobs(saltenv="base", prefix=None):
        # Walk the blobs in the container with a generator
        blob_list = container_client.walk_blobs(name_starts_with=prefix)

        # Iterate over the generator
        while True:
            try:
                blob = next(blob_list)
            except StopIteration:
                break

            log.debug("Raw blob attributes: %s", blob)

            # Directories end with "/".
            if blob.name.endswith("/"):
                # Recurse into the directory
                _walk_blobs(prefix=blob.name)
                continue

            if multiple_env:
                saltenv = "base" if (not prefix or prefix == ".") else prefix[:-1]

            if saltenv not in metadata:
                metadata[saltenv] = {}

            if container not in metadata[saltenv]:
                metadata[saltenv][container] = []

            metadata[saltenv][container].append(blob)

    _walk_blobs(saltenv=environment)

    # write the metadata to disk
    if os.path.isfile(cache_file):
        os.remove(cache_file)

    log.debug("Writing Azure blobs pillar cache file")

    with salt.utils.files.fopen(cache_file, "wb") as fp_:
        pickle.dump(metadata, fp_)

    return metadata


def _read_containers_cache_file(cache_file):
    """
    .. versionadded:: 3001

    Return the contents of the containers cache file.

    :param cache_file: The path for where the file will be cached.

    """
    log.debug("Reading containers cache file")

    with salt.utils.files.fopen(cache_file, "rb") as fp_:
        data = pickle.load(fp_)

    return data


def _find_files(metadata):
    """
    .. versionadded:: 3001

    Looks for all the files in the Azure Blob container cache metadata.

    :param metadata: The metadata for the container files.

    """
    ret = {}

    for container, data in metadata.items():
        if container not in ret:
            ret[container] = []

        # grab the paths from the metadata
        file_paths = [k["name"] for k in data]
        # filter out the dirs
        ret[container] += [k for k in file_paths if not k.endswith("/")]

    return ret


def _find_file_meta(metadata, container, saltenv, path):
    """
    .. versionadded:: 3001

    Looks for a file's metadata in the Azure Blob Container cache file.

    :param metadata: The metadata for the container files.

    :param container: The name of the target Azure Blob Container.

    :param saltenv: Specifies which environment the container represents.

    :param path: The path of the file in the container.

    """
    env_meta = metadata[saltenv] if saltenv in metadata else {}
    container_meta = env_meta[container] if container in env_meta else {}

    for item_meta in container_meta:
        item_meta = dict(item_meta)
        if "name" in item_meta and item_meta["name"] == path:
            return item_meta


def _get_file_from_blob(
    connection_string, metadata, saltenv, container, path, cached_file_path
):
    """
    .. versionadded:: 3001

    Downloads the entire contents of an Azure storage container to the local filesystem.

    :param connection_string: The connection string to use to access the specified Azure Blob Container.

    :param metadata: The metadata for the container files.

    :param saltenv: Specifies which environment the container represents when in single environment mode. This is
        ignored if multiple_env is set as True.

    :param container: The name of the target Azure Blob Container.

    :param path: The path of the file in the container.

    :param cached_file_path: The path of where the file will be cached.

    """
    # check the local cache...
    if os.path.isfile(cached_file_path):
        file_meta = _find_file_meta(metadata, container, saltenv, path)
        file_md5 = (
            "".join(list(filter(str.isalnum, file_meta["etag"]))) if file_meta else None
        )

        cached_md5 = salt.utils.hashutils.get_hash(cached_file_path, "md5")

        # hashes match we have a cache hit
        log.debug(
            "Cached file: path=%s, md5=%s, etag=%s",
            cached_file_path,
            cached_md5,
            file_md5,
        )
        if cached_md5 == file_md5:
            return

    try:
        # Create the BlobServiceClient object which will be used to create a container client
        blob_service_client = BlobServiceClient.from_connection_string(
            connection_string
        )

        # Create the ContainerClient object
        container_client = blob_service_client.get_container_client(container)

        # Create the BlobClient object
        blob_client = container_client.get_blob_client(path)
    except Exception as exc:  # pylint: disable=broad-except
        log.error("Exception: %s", exc)
        return False

    with salt.utils.files.fopen(cached_file_path, "wb") as outfile:
        outfile.write(blob_client.download_blob().readall())

    return