File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/__pycache__/ipc.cpython-310.pyc
o

�N�gVl�@sdZddlZddlZddlZddlZddlZddlZddlZddl	Zddl
ZddlZddlZddl	m
Z
ddl	mZddlmZmZddlmZe�e�Zdd�ZGd	d
�d
ejjjj�ZGdd�d�ZGd
d�d�ZGdd�de�Z Gdd�de�Z!Gdd�d�Z"Gdd�de�Z#dS)z
IPC transport classes
�N)�IOLoop)�TimeoutError)�IOStream�StreamClosedError)�LockcCs|jdur
|j�|�dSdS�N)�_future_with_timeout�_done_callback)�future�r�F/opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/ipc.py�future_with_timeout_callbacks
�r
cs,eZdZ�fdd�Zdd�Zdd�Z�ZS)�FutureWithTimeoutcs�t���||_||_|dur$|dkrd}|j�|j��||j�|_nd|_t|jd�r?||j_	|j�
�r=t|j�dSdS||j_	|j�t�dS)Ng�������?r)
�super�__init__�io_loop�_futureZadd_timeout�time�_timeout_callback�_timeout_handle�hasattrr�doner
�add_done_callback)�selfrr
�timeout��	__class__rrr#s"
�
�zFutureWithTimeout.__init__cCsd|_d|j_|�t��dSr)rrr�
set_exception�TornadoTimeoutError�rrrrr;sz#FutureWithTimeout._timeout_callbackc
Csdz|jdur|j�|j�d|_|�|���WdSty1}z|�|�WYd}~dSd}~wwr)rrZremove_timeout�
set_result�result�	Exceptionr)rr
�excrrrr	Ds
��z FutureWithTimeout._done_callback)�__name__�
__module__�__qualname__rrr	�
__classcell__rrrrr"s	rc@sjeZdZdZdgZdgZddd�Zdd�Zej	j
jjd	d
��Z
dd�Zd
d�Zdd�Zdd�Zdd�ZdS)�	IPCServerz�
    A Tornado IPC server very similar to Tornado's TCPServer class
    but using either UNIX domain sockets or TCP sockets
    �
handle_stream�closeNcCs8||_d|_||_d|_|ptjjjj�	�|_
d|_dS)a	
        Create a new Tornado IPC server

        :param str/int socket_path: Path on the filesystem for the
                                    socket to bind to. This socket does
                                    not need to exist prior to calling
                                    this method, but parent directories
                                    should.
                                    It may also be of type 'int', in
                                    which case it is used as the port
                                    for a tcp localhost connection.
        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
        :param func payload_handler: A function to customize handling of
                                     incoming data.
        FN)�socket_path�_started�payload_handler�sock�salt�ext�tornado�ioloopr�currentr�_closing)rr+rr-rrrr\s
zIPCServer.__init__cC��t�d|j�t|jt�r6t�tjtj�|_|j�	tj
tjd�|j�d�|j�
d|jf�|j�d�n
tjjj�|j�|_tjj�|j��tjjj�|j|j�Wd�n1s^wYd|_dS)�y
        Perform the work necessary to start up a Tornado IPC server

        Blocks until socket is established
        z IPCServer: binding to socket: %s�r�	127.0.0.1�NT��log�tracer+�
isinstance�int�socket�AF_INET�SOCK_STREAMr.�
setsockopt�
SOL_SOCKET�SO_REUSEADDR�setblocking�bind�listenr/r0r1ZnetutilZbind_unix_socket�utils�asynchronous�current_iolooprZadd_accept_handler�handle_connectionr,rrrr�startu�
��
zIPCServer.startc
#s.�tjjjjdd����fdd�}tjjjdd�}|��s�z&|j	ddd	�V}|�
|�|D]}|d
}|j�|j
||||d��q-WnKtyTt�d|j�YdStyw}z|jd
krgt�d|�nt�d|�WYd}~nd}~wty�}zt�d|�WYd}~nd}~ww|��rdSdS)z�
        Override this to handle the streams as they arrive

        :param IOStream stream: An IOStream for processing

        See https://tornado.readthedocs.io/en/latest/iostream.html#tornado.iostream.IOStream
        for additional details.
        cSstjjj�d��r)r/r0r1�gen�Return)�msgrrr�_null�sz&IPCServer.handle_stream.<locals>._nullcs,��d�rtjjjj��fdd��}|S�S)N�midc3s.�tjjj|d�didd�}��|�VdS)NrRT)�header�raw_body)r/�	transport�frame�
frame_msg_ipc�write)rP�pack�rS�streamrr�return_message�s�
�zGIPCServer.handle_stream.<locals>.write_callback.<locals>.return_message)�getr/r0r1rN�	coroutine)r[rSr\�rQrZr�write_callback�s


z/IPCServer.handle_stream.<locals>.write_callbackF��raw�T��partial�body�head�Client disconnected from IPC %srz>Exception occurred with error number 0, spurious exception: %s�,Exception occurred while handling stream: %sN)r/r0r1rNr^rH�msgpack�Unpacker�closed�
read_bytes�feedr�spawn_callbackr-rr;r<r+�OSError�errno�errorr")rr[r`�unpacker�
wire_bytes�
framed_msgrfr#rr_rr)�sB�


��
�����zIPCServer.handle_streamc
Cs�t�d|r|n|�z'tjj�|j��t|�}Wd�n1s"wY|j�|j	|�WdSt
yJ}z
t�d|�WYd}~dSd}~ww)N�-IPCServer: Handling connection to address: %s�IPC streaming error: %s)r;r<r/rHrIrJrrror)r"rr)r�
connection�addressr[r#rrrrK�s
�����zIPCServer.handle_connectioncCs.|jrdSd|_t|jd�r|j��dSdS���
        Routines to handle any cleanup before the instance shuts down.
        Sockets and filehandles should be closed explicitly, to prevent
        leaks.
        NTr*)r4rr.r*rrrrr*�s�zIPCServer.closecC�$z|��WdStyYdSwr�r*�	TypeErrorrrrr�__del__��
�zIPCServer.__del__cC�|Srrrrrr�	__enter__��zIPCServer.__enter__cG�|��dSr�r*�r�argsrrr�__exit__��zIPCServer.__exit__�NN)r$r%r&�__doc__�
async_methods�
close_methodsrrLr/r0r1rNr^r)rKr*rr�r�rrrrr(Os��


;

r(c@sbeZdZdZddd�Zdd�Zddd�Zejj	j
jdd	d
��Zdd�Z
d
d�Zdd�Zdd�ZdS)�	IPCClienta�
    A Tornado IPC client very similar to Tornado's TCPClient class
    but using either UNIX domain sockets or TCP sockets

    This was written because Tornado does not have its own IPC
    server/client implementation.

    :param IOLoop io_loop: A Tornado ioloop to handle scheduling
    :param str/int socket_path: A path on the filesystem where a socket
                                belonging to a running IPCServer can be
                                found.
                                It may also be of type 'int', in which
                                case it is used as the port for a tcp
                                localhost connection.
    NcCsD|p	tjjjj��|_||_d|_d|_	tj
jjdd�|_
d|_dS)z�
        Create a new IPC client

        IPC clients cannot bind to ports, but must connect to
        existing IPC servers. Clients can then send messages
        to the server.

        FNra)r/r0r1r2rr3rr+r4r[rHrjrkrs�_connecting_future�rr+rrrrrs	
zIPCClient.__init__cCs|jduo
|j��Sr)r[rlrrrr�	connectedszIPCClient.connectedcsr�jdur�j��s�j}n�jdur�j��tjjj��}|�_��|��dur7��fdd�}|�	|�|S)z+
        Connect to the IPC socket
        Ncs|��}�j��|�dSr)r!rZadd_callback)r
�response��callbackrrr�
handle_future*sz(IPCClient.connect.<locals>.handle_future)
r�r�	exceptionr/r0r1�
concurrent�Future�_connectr)rr�rr
r�rr�r�connects



zIPCClient.connectc
cs`�t|jt�rtj}d|jf}ntj}|j}d|_|dur#t��|}	|jr)dS|jdurPt	j
j�|j
��tt�|tj��|_Wd�n1sKwYzt�d|j�|j�|�V|j�d�WdSty�}z;|j��rwd|_|dus�t��|kr�|jdur�|j��d|_|j�|�WYd}~dSt	jjj�d�VWYd}~nd}~wwq$)z0
        Connect to a running IPCServer
        r8NTz#IPCClient: Connecting to socket: %sr7)r=r+r>r?r@�AF_UNIXr[rr4r/rHrIrJrrrAr;r<r�r�r r"rlr*rr0r1rN�sleep)rrZ	sock_typeZ	sock_addrZ
timeout_at�errrr�2sD�
�


���zIPCClient._connectc
Cs�|jrdSd|_d|_t�d|jj�|jdur@|j��sBz|j��WdSt	y?}z|j
t
jkr4�WYd}~dSd}~wwdSdS)r{NTzClosing %s instance)r4r�r;�debugrr$r[rlr*rprq�EBADF�rr#rrrr*[s����zIPCClient.closecCr|rr}rrrrrrr�zIPCClient.__del__cCr�rrrrrrr�|r�zIPCClient.__enter__cGr�rr�r�rrrr�r�zIPCClient.__exit__rr�)r$r%r&r�rr�r�r/r0r1rNr^r�r*rr�r�rrrrr��s


(
r�c@s4eZdZdZgd�ZdgZejjj	j
ddd��ZdS)�IPCMessageClienta�
    Salt IPC message client

    Create an IPC client to send messages to an IPC server

    An example of a very simple IPCMessageClient connecting to an IPCMessageServer.
    This example assumes an IPCMessageServer is already running.

    IMPORTANT: The below example also assumes a running IOLoop process.

    # Import Tornado libs
    import salt.ext.tornado.ioloop

    # Import Salt libs
    import salt.config
    import salt.transport.ipc

    io_loop = salt.ext.tornado.ioloop.IOLoop.current()

    ipc_server_socket_path = '/var/run/ipc_server.ipc'

    ipc_client = salt.transport.ipc.IPCMessageClient(ipc_server_socket_path, io_loop=io_loop)

    # Connect to the server
    ipc_client.connect()

    # Send some data
    ipc_client.send('Hello world')
    )�sendr�r�r*Nccs8�|��s
|��Vtjjj|dd�}|j�|�VdS)a

        Send a message to an IPC socket

        If the socket is not currently connected, a connection will be established.

        :param dict msg: The message to be sent
        :param int timeout: Timeout when sending message (Currently unimplemented)
        T�rTN)r�r�r/rUrVrWr[rX)rrPrZtriesrYrrrr��s
�

zIPCMessageClient.sendr�)r$r%r&r�r�r�r/r0r1rNr^r�rrrrr��s�
r�c@seZdZdZdS)�IPCMessageServera
    Salt IPC message server

    Creates a message server which can create and bind to a socket on a given
    path and then respond to messages asynchronously.

    An example of a very simple IPCServer which prints received messages to
    a console:

        # Import Tornado libs
        import salt.ext.tornado.ioloop

        # Import Salt libs
        import salt.transport.ipc

        # This callback is run whenever a message is received.
        # It must be defined before it is passed to the server below.
        def print_to_console(payload):
            print(payload)

        io_loop = salt.ext.tornado.ioloop.IOLoop.current()
        ipc_server_socket_path = '/var/run/ipc_server.ipc'
        ipc_server = salt.transport.ipc.IPCMessageServer(ipc_server_socket_path, io_loop=io_loop,
                                                         payload_handler=print_to_console)
        # Bind to the socket and prepare to run
        ipc_server.start()

        # Start the server (blocks until the IO loop is stopped)
        io_loop.start()

    See IPCMessageClient() for an example of sending messages to an IPCMessageServer instance
    N)r$r%r&r�rrrrr��sr�c@s^eZdZdZddd�Zdd�Zejjj	j
dd��Zd	d
�Zdd�Z
d
d�Zdd�Zdd�ZdS)�IPCMessagePublisherz~
    A Tornado IPC Publisher similar to Tornado's TCPServer class
    but using either UNIX domain sockets or TCP sockets
    NcCs8||_||_d|_d|_|pt��|_d|_t�|_	dS)a�
        Create a new Tornado IPC publisher
        :param dict opts: Salt options
        :param str/int socket_path: Path on the filesystem for the
                                    socket to bind to. This socket does
                                    not need to exist prior to calling
                                    this method, but parent directories
                                    should.
                                    It may also be of type 'int', in
                                    which case it is used as the port
                                    for a tcp localhost connection.
        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
        FN)
�optsr+r,r.rr3rr4�set�streams)rr�r+rrrrr�szIPCMessagePublisher.__init__cCr5)r6z*IPCMessagePublisher: binding to socket: %sr7rr8r9NTr:rrrrrL�rMzIPCMessagePublisher.startc
cs��z	|�|�VWdSty!t�d|j�|j�|�YdStyG}zt�d|�|�	�s6|�
�|j�|�WYd}~dSd}~ww)Nrhri)rXrr;r<r+r��discardr"rrrlr*)rr[rYr#rrr�_writes���zIPCMessagePublisher._writecCs>|jsdStjjj|dd�}|jD]}|j�|j||�qdS)z7
        Send message to all connected sockets
        NTr�)r�r/rUrVrWrror�)rrPrYr[rrr�publish#s
�zIPCMessagePublisher.publishc
s�t�d|�zNi}�jddkr �jd|d<t�d�jd�tjj��j��t|fi|���Wd�n1s;wY�j	�
����fdd�}��|�WdStym}z
t�
d|�WYd}~dSd}~ww)	NrvZipc_write_bufferrZmax_write_buffer_sizez'Setting IPC connection write buffer: %scs�j���dSr)r�r�r�rr[rr�discard_after_closed<szCIPCMessagePublisher.handle_connection.<locals>.discard_after_closedrw)r;r<r�r/rHrIrJrrr��addZset_close_callbackr"rr)rrxry�kwargsr�r#rr�rrK.s&����z%IPCMessagePublisher.handle_connectioncCsL|jrdSd|_|jD]}|��q|j��t|jd�r$|j��dSdSrz)r4r�r*�clearrr.r�rrrr*Cs


�zIPCMessagePublisher.closecCr�rrrrrrr�Rr�zIPCMessagePublisher.__enter__cGr�rr�r�rrrr�Ur�zIPCMessagePublisher.__exit__r)r$r%r&r�rrLr/r0r1rNr^r�r�rKr*r�r�rrrrr��s


r�cs�eZdZdZddgZdgZd�fdd�	Zejj	j
jddd	��Zejj	j
jd
d��Z
ddd
�Zejj	j
jdd��Z�fdd�Z�ZS)�IPCMessageSubscribera�
    Salt IPC message subscriber

    Create an IPC client to receive messages from IPC publisher

    An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher.
    This example assumes an already running IPCMessagePublisher.

    IMPORTANT: The below example also assumes the IOLoop is NOT running.

    # Import Tornado libs
    import salt.ext.tornado.ioloop

    # Import Salt libs
    import salt.config
    import salt.transport.ipc

    # Create a new IO Loop.
    # We know that this new IO Loop is not currently running.
    io_loop = salt.ext.tornado.ioloop.IOLoop()

    ipc_publisher_socket_path = '/var/run/ipc_publisher.ipc'

    ipc_subscriber = salt.transport.ipc.IPCMessageSubscriber(ipc_publisher_socket_path, io_loop=io_loop)

    # Connect to the server
    # Use the associated IO Loop that isn't running.
    io_loop.run_sync(ipc_subscriber.connect)

    # Wait for some data
    package = ipc_subscriber.read_sync()
    �readr�r*Ncs(t�j||d�d|_g|_t�|_dS)N)r)rr�_read_stream_future�_saved_datar�_read_in_progressr�rrrr�szIPCMessageSubscriber.__init__c	
cs��z�z
|jjdd�VWntjjjjytjjj�d��wd}d}zW	|jdur4|j	j
ddd�|_|dur=|jV}n	t|j|j|�V}d|_d}|j
�|�d}|j
D]}|rd|j�||d�qV|rm|d}d}qV|j�|d�qV|synq&WnBty�d}Yn8ty�}zt�d|j�d|_WYd}~n!d}~wty�}zt�d	|�d|_|}WYd}~nd}~ww|j��|dur�|�tjjj�|��ty�YdSw)
Ng:�0�yE>�rTrcrdrfFz#Subscriber disconnected from IPC %sz:Exception occurred in Subscriber while handling stream: %s)r��acquirer/r0r1rNrrOr�r[rmrrrsrnror��appendrrr;r<r+r"rr�releaser~)	rrr�Zexc_to_raise�retrtZfirst_sync_msgrur#rrr�_read�sp��
�

�
�����
�zIPCMessageSubscriber._readc
cs��|jr|j�d�}tjjj�|��|��s]z	|jdd�VWn9t	y8t
�d|j�tjjj�
d�VYn!tyX}zt
�d|�tjjj�
d�VWYd}~nd}~ww|��r|�|�V}tjjj�|��)z�
        Asynchronously read messages and invoke a callback when they are ready.
        :param callback: A callback with the received data
        r�r��1Subscriber closed stream on IPC %s before connectr7�2Exception occurred while Subscriber connecting: %sN)r��popr/r0r1rNrOr�r�rr;r<r+r�r"rrr�)rr�resr#rrrr��s*�����zIPCMessageSubscriber.readcs(�jr	�j�d�S�j���fdd��S)aZ
        Read a message from an IPC socket

        The socket must already be connected.
        The associated IO Loop must NOT be running.
        :param int timeout: Timeout when receiving message
        :return: message data if successful. None if timed out. Will raise an
                 exception for all other error conditions.
        rcs
����Sr)r�r�rrrr�<lambda>�s
z0IPCMessageSubscriber.read_sync.<locals>.<lambda>)r�r�rZrun_syncr�rr�r�	read_sync�s
zIPCMessageSubscriber.read_syncc
cs��|��sLz	|jdd�VWn9ty't�d|j�tjjj	�
d�VYn!tyG}zt�d|�tjjj	�
d�VWYd}~nd}~ww|��r|�
d|�VdS)z�
        Asynchronously read messages and invoke a callback when they are ready.

        :param callback: A callback with the received data
        r�r�r�r7r�N)r�r�rr;r<r+r/r0r1rNr�r"rrr�)rr�r#rrr�
read_async�s"�����zIPCMessageSubscriber.read_asynccs`|jrdSt���|jdur(|j��r*|j��}|r,t|t�s.t�	d|�dSdSdSdSdS)r{Nz!Read future returned exception %r)
r4rr*r�rr�r=rr;rrr�rrrr*s

�zIPCMessageSubscriber.closer)r$r%r&r�r�r�rr/r0r1rNr^r�r�r�r�r*r'rrrrr�Ys "��

A



r�)$r�rq�loggingr?rZsalt.ext.tornador/Zsalt.ext.tornado.concurrentZsalt.ext.tornado.genZsalt.ext.tornado.ioloopZsalt.ext.tornado.netutilZsalt.transport.frameZsalt.utils.msgpackrrrZsalt.ext.tornado.iostreamrrZsalt.ext.tornado.locksr�	getLoggerr$r;r
r0r1r�r�rr(r�r�r�r�r�rrrr�<module>s6
-(:"z