HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.104.1.lve.el8.x86_64 #1 SMP Tue Feb 10 20:07:30 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/utils/openstack/pyrax/queues.py
import logging

# pylint: disable=3rd-party-module-not-gated
import pyrax
import pyrax.exceptions  # pylint: disable=no-name-in-module

# Import salt classes
from salt.utils.openstack.pyrax import authenticate

log = logging.getLogger(__name__)


# pylint: enable=3rd-party-module-not-gated


class RackspaceQueues:
    def __init__(self, username, password, region, **kwargs):
        self.auth = authenticate.Authenticate(username, password, region, **kwargs)
        self.conn = self.auth.conn.queues

    def create(self, qname):
        """
        Create RackSpace Queue.
        """
        try:
            if self.exists(qname):
                log.error('Queues "%s" already exists. Nothing done.', qname)
                return True

            self.conn.create(qname)

            return True
        except pyrax.exceptions as err_msg:
            log.error("RackSpace API got some problems during creation: %s", err_msg)
        return False

    def delete(self, qname):
        """
        Delete an existings RackSpace Queue.
        """
        try:
            q = self.exists(qname)
            if not q:
                return False
            queue = self.show(qname)
            if queue:
                queue.delete()
        except pyrax.exceptions as err_msg:
            log.error("RackSpace API got some problems during deletion: %s", err_msg)
            return False

        return True

    def exists(self, qname):
        """
        Check to see if a Queue exists.
        """
        try:
            # First if not exists() -> exit
            if self.conn.queue_exists(qname):
                return True
            return False
        except pyrax.exceptions as err_msg:
            log.error(
                "RackSpace API got some problems during existing queue check: %s",
                err_msg,
            )
        return False

    def show(self, qname):
        """
        Show information about Queue
        """
        try:
            # First if not exists() -> exit
            if not self.conn.queue_exists(qname):
                return {}
            # If exist, search the queue to return the Queue Object
            for queue in self.conn.list():
                if queue.name == qname:
                    return queue
        except pyrax.exceptions as err_msg:
            log.error(
                "RackSpace API got some problems during existing queue check: %s",
                err_msg,
            )
        return {}