HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.109.1.lve.el8.x86_64 #1 SMP Thu Mar 5 20:23:46 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/__pycache__/zeromq.cpython-310.pyc
o

�N�g3��@s�dZddlZddlZddlZddlZddlZddlZddlZddlm	Z	ddl
ZddlZddl
ZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZddlmZmZddlmZm Z mZzddl!ZdZ"Wne#y�dZ"Ynwe�$e%�Z&dd	d
�Z'Gdd�dej(j)j*�Z*Gd
d�dej(j)j+�Z,dd�Z-Gdd�d�Z.Gdd�d�Z/Gdd�dej(j)j0�Z1Gdd�dej(j)j2�Z2dS)z
Zeromq transport classes
�N)�randint)�	ipaddress)�
SaltException�SaltReqTimeoutError)�LIBZMQ_VERSION_INFO�ZMQ_VERSION_INFO�zmqTFcCs�ddlm}dj||�|d�}|s|rntdkr_tdkr_|r.|r.dj||�|||�|d�}|S|r@|s@d	j||�||�|d
�}|S|r]|s]t�|�jdkrNdn|d
�}dj||||�|d�}|St�	d�t�	d�t�	d�|S)aR
    Return the ZeroMQ URI to connect the Minion to the Master.
    It supports different source IP / port, given the ZeroMQ syntax:
    // Connecting using a IP address and bind to an IP address
    rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
    Source: http://api.zeromq.org/4-1:zmq-tcp
    r)�
ip_bracketztcp://{master_ip}:{master_port})�	master_ip�master_port)���)�rr
z9tcp://{source_ip}:{source_port};{master_ip}:{master_port})�	source_ip�source_portr
rz-tcp://{source_ip}:0;{master_ip}:{master_port})rr
rrz0.0.0.0z::z6tcp://{ip_any}:{source_port};{master_ip}:{master_port})�ip_anyrr
rzAUnable to connect to the Master using a specific source IP / portz9Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6zVSpecific source IP / port for connecting to master returner port: configuraion ignored)
Zsalt.utils.networkr	�formatrrr�
ip_address�version�log�warning)r
rrrr	�
master_urir�r�I/opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/zeromq.py�_get_master_uri,sV���&���������
�rcs�eZdZdZdZ�fdd�Zdd�Zdd�Zd	d
�Ze	j
jjj
ddd
��Zedd��Ze	j
jjj
dd��Zedd��Zdd�Ze	j
jjj
dd��Z�ZS)�
PublishClientzw
    A transport channel backed by ZeroMQ for a Salt Publisher to use to
    publish commands to connected minions
    �zeromqcsvt�j||fi|��||_||_t�tjj�	|jd���
�|_d|_t
��|_|j�t
j�|_|jdr^|j�t
jd�|j�d�dkrO|j�t
jd�n|j�t
jtjj�	|j��n|j�t
jd�|j�t
jtjj�	|jd��tt
d	�r�|j�t
j|jd
�|j�t
j|jd�|j�t
j|jd�|j�t
j|jd
�|jd}|jdr�t|jd|jd|jd�}t�d|jd|jd|jd|�t�d|�|j�t
j|�tt
d��rt�d|jd|jd�|j�t
j |jd�|jddu�sd|jdv�rtt
d��r|j�t
j!d�t"�r7|jd�r9t#|j�|_$|j$�%|j�dSdSdS)N�idF�
zmq_filtering�	broadcast�__role�syndic�syndic��
TCP_KEEPALIVE�
tcp_keepalive�tcp_keepalive_idle�tcp_keepalive_cnt�tcp_keepalive_intvlZ
recon_defaultZrecon_randomizeZ	recon_maxz?Generated random reconnect delay between '%sms' and '%sms' (%s)z#Setting zmq_reconnect_ivl to '%sms'�RECONNECT_IVL_MAXz'Setting zmq_reconnect_ivl_max to '%sms'�ipv6T�:r
�IPV4ONLYr�zmq_monitor)&�super�__init__�opts�io_loop�hashlib�sha1�salt�utils�stringutils�to_bytes�	hexdigest�hexid�_closingr�Context�context�socketZSUB�_socket�
setsockoptZ	SUBSCRIBE�getZIDENTITY�hasattrr%�TCP_KEEPALIVE_IDLE�TCP_KEEPALIVE_CNT�TCP_KEEPALIVE_INTVLrr�debugZ
RECONNECT_IVLr*r-�HAS_ZMQ_MONITOR�ZeroMQSocketMonitor�_monitor�
start_io_loop)�selfr1r2�kwargsZrecon_delay��	__class__rrr0psz��

��
���

���"��zPublishClient.__init__cCs�|jdurdSd|_t|d�r|jdur|j��d|_t|d�r(|j�d�nt|d�r3|j�d�t|d�rE|jjdurG|j�	�dSdSdS)NTrI�_streamrr?r=F)
r;rBrI�stoprO�closer?r=�closed�term�rKrrrrQ�s



�zPublishClient.closecC�|S�NrrTrrr�	__enter__��zPublishClient.__enter__cC�|��dSrV�rQ�rK�exc_typeZexc_valZexc_tbrrr�__exit__��zPublishClient.__exit__NcCsPd|_||_t�d|j�t�d||j�|j�|j�|dur&|d�dSdS)NTzCConnecting the Minion to the Master publish port, using the URI: %sz%r connecting to %s)�_connect_called�publish_portrrF�
master_pubr?�connect)rKr`Zconnect_callbackZdisconnect_callbackrrrrb�s��zPublishClient.connectcCs(t|jd|j|j�d�|j�d�d�S)z0
        Return the master publish port
        r
rZsource_publish_port�rr)rr1r`rArTrrrra�s

�zPublishClient.master_pubcCs�t|�}|dkrtj�|d�}nH|dkrPtjj�|d�}|j�d�dkr-|d|j	fvs9|j�d�dkrG|dvrGt
�d|�tjj
j�d	��tj�|d�}n	td
�t|����tjj
j�|��)z�
        Take the zmq messages, decrypt/decode them into a payload

        :param list messages: A list of messages to be decoded
        r
r�r!r"�	broadcast)rer"z(Publish received for not this minion: %sNz@Invalid number of messages ({}) in zeromq pubmessage from master)�lenr5�payload�loadsr6r7Zto_strr1rAr:rrF�ext�tornado�gen�Return�	Exceptionr)rK�messagesZmessages_lenrgZmessage_targetrrr�_decode_messages�s$��zPublishClient._decode_messagescCs(t|d�stjjj|j|jd�|_|jS)zI
        Return the current zmqstream, creating one if necessary
        rO�r2)rBr�	eventloop�	zmqstream�	ZMQStreamr?r2rOrTrrr�streams

�zPublishClient.streamc
CsLz|j�|�WSty%}z|dur t|�dkr WYd}~dS�d}~ww)z�
        Register a callback for received messages (that we didn't initiate)

        :param func callback: A function which should be called when data is received
        NzStream is closed)rt�on_recv�OSError�str)rK�callback�excrrrrus��zPublishClient.on_recvcCs|jj|dd�dS)NT)Znoblock)rt�send�rK�msgrrrrz$szPublishClient.send�NN)�__name__�
__module__�__qualname__�__doc__�ttyper0rQrWr]r5rirjrk�	coroutinerb�propertyrarortrurz�
__classcell__rrrMrrhs$K



!



rc@speZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Ze	j
jjj
d
d��Zdd�Zdd�Zdd�Zdd�ZdS)�
RequestServercCs||_d|_d|_d|_dS)NF)r1r;rI�
_w_monitor�rKr1rrrr0*s
zRequestServer.__init__c
Cs(|��t�|jd�}djdi|j��|_|�tj�|_|j�	tj
d�|jddur9ttd�r9|j�	tjd�|j�	tj
|j�dd	��|��|�tj�|_|j�	tj
d�|jd
rutjj��sut�d|jd
�t�|jd
�|j�dd
�dkr�d�|j�dd��|_nd�tj�|jdd��|_t�d�t�d|j�|j�|j�t�d|j�|j�|j�|j�dd
�dkr�t�tj�|jdd�d�	|jjs�|jjr�n4zt� tj!|j|j�Wn&tj"�y}z
|j#t#j$kr�WYd}~qЂd}~wt%t&f�yYnwq�|�'�dS)zA
        Multiprocessing target for the zmq queue device
        Zworker_threadsztcp://{interface}:{ret_port}���r+Tr-r�zmq_backlog��Zmworker_queue_nicenessz$setting mworker_queue niceness to %d�ipc_mode��tcp�tcp://127.0.0.1:{}�tcp_master_workers��ipc://{}�sock_dir�workers.ipcz*Setting up the master communication serverzReqServer clients %szReqServer workers %s�Nr)(�_RequestServer__setup_signalsrr<r1rZurir>ZROUTER�clientsr@�LINGERrBr-�BACKLOGrA�_start_zmq_monitorZDEALER�workersr5r6�platformZ
is_windowsr�info�os�nice�w_uri�path�join�bind�chmodrRZdeviceZQUEUE�ZMQError�errnoZEINTR�KeyboardInterrupt�
SystemExitrS)rKr=ryrrr�
zmq_device0s\���
���zRequestServer.zmq_devicecCs|jrdSt�dt���d|_t|dd�dur |j��d|_t|dd�dur0|j��d|_t	|d�r@|j
jdur@|j
��t	|d�rP|j
jdurP|j
��t	|d	�rZ|j��t	|d
�rj|jjdurj|j��t	|d�r||jjdur~|j��dSdSdS)z4
        Cleanly shutdown the router socket
        Nz$MWorkerQueue under PID %s is closingTrIr�r�Fr�rtr?r=)r;rr�r��getpid�getattrrIrPr�rBr�rRrQr�rtr?r=rSrTrrrrQgs*






�zRequestServer.closecCs|j|jdd�dS)z�
        Pre-fork we need to create the zmq router device

        :param func process_manager: An instance of salt.utils.process.ProcessManager
        ZMWorkerQueue)�nameN)�add_processr��rKZprocess_managerrrr�pre_fork�szRequestServer.pre_forkcCsNtr#|jdr%t�d�t|j�|_tj|jj	d��
�t�d�dSdSdS)zM
        Starts ZMQ monitor for debugging purposes.
        :return:
        r.zStarting ZMQ monitor)�targetz$ZMQ monitor has been started startedN)rGr1rrFrHr?r��	threading�Thread�
start_poll�startrTrrrr��s
�z RequestServer._start_zmq_monitorcCs
t�d�}|�tj�|_|j�tjd�|��|j�	dd�dkr-d�
|j�	dd��|_nd	�
tj
�|jd
d��|_t�d|j�|j�|j�|j�	dd�dkrntj
�tj
�|jd
d��rnt�tj
�|jd
d�d
�tjjj|j|d�|_||_|j�|j�dS)ad
        After forking we need to create all of the local sockets to listen to the
        router

        :param func message_handler: A function to called to handle incoming payloads as
                                     they are picked up off the wire
        :param IOLoop io_loop: An instance of a Tornado IOLoop, to handle event scheduling
        r
r�r�r�r�r�r�r�r�r�r�zWorker binding to socket %sr�rpN)rr<r>ZREPr?r@r�r�r1rArr�r�r�r�rr�rb�isfiler�rqrrrsrt�message_handlerZon_recv_stream�handle_message)rKr�r2r=rrr�	post_fork�s(
	���zRequestServer.post_forkc	csb�z|�|�}Wntjjy|j�|�ddi��YdSw|�|�V}|j�|�|��dS)Nr|zbad load)�decode_payloadr5�
exceptionsZSaltDeserializationErrorrtrz�encode_payloadr�)rKrtrgZreplyrrrr��s��zRequestServer.handle_messagecCstj�|�SrV)r5rg�dumps�rKrgrrrr��r^zRequestServer.encode_payloadcCs$t�tj|j�t�tj|j�dSrV)�signal�SIGINT�_handle_signals�SIGTERMrTrrrZ__setup_signals�szRequestServer.__setup_signalscCsb|jj�d�}|tjkr|d7}n	|tjkr|d7}|d7}t�|�|��t�	t
jjj
�dS)Nz received a r�r�z	. Exiting)rNr~r�r�r�rrFrQ�sys�exitr5�defaultsZ	exitcodes�EX_OK)rK�signumZsigframer|rrrr��s



zRequestServer._handle_signalscCstj�|d�}|S)Nr)r5rgrhr�rrrr��szRequestServer.decode_payloadN)r~rr�r0r�rQr�r�r�r5rirjrkr�r�r�r�r�r�rrrrr�)s7
!

r�cCs�ttd�r=|r?d|vr|�tj|d�d|vr!|�tj|d�d|vr.|�tj|d�d|vrA|�tj|d�dSdSdSdS)a�
    Ensure that TCP keepalives are set as specified in "opts".

    Warning: Failure to set TCP keepalives on the salt-master can result in
    not detecting the loss of a minion when the connection is lost or when
    its host has been terminated without first closing the socket.
    Salt's Presence System depends on this connection status to know if a minion
    is "present".

    Warning: Failure to set TCP keepalives on minions can result in frequent or
    unexpected disconnects!
    r%r&r'r(r)N)rBrr@r%rCrDrE)Z
zmq_socketr1rrr�_set_tcp_keepalive�s
�r�c@sdeZdZdZddd�Zdd�Zdd	�Zd
d�Zej	j
jjddd
��Z
dd�Zej	j
jjdd��ZdS)�AsyncReqMessageClienta�
    This class wraps the underlying zeromq REQ socket and gives a future-based
    interface to sending and recieving messages. This works around the primary
    limitation of serialized send/recv on the underlying socket by queueing the
    message sends in this class. In the future if we decide to attempt to multiplex
    we can manage a pool of REQ/REP sockets-- but for now we'll just do them in serial
    rNcCsr||_||_||_|durtjjjj��|_	n||_	t
jj�
�|_g|_d|_i|_tjjj��|_t��|_dS)a�
        Create an asynchronous message client

        :param dict opts: The salt opts dictionary
        :param str addr: The interface IP address to bind to
        :param int linger: The number of seconds to linger on a ZMQ socket. See
                           http://api.zeromq.org/2-1:zmq-setsockopt [ZMQ_LINGER]
        :param IOLoop io_loop: A Tornado IOLoop event scheduler [tornado.ioloop.IOLoop]
        NF)r1�addr�lingerr5rirj�ioloop�IOLoop�currentr2rrq�futurer<r=Z
send_queuer;Z_send_future_map�locks�Lock�lockr��	get_ident�ident)rKr1r�r�r2rrrr0�s
zAsyncReqMessageClient.__init__cCs t|d�r
|jr
dS|��dS)Nr>)rBr>�_init_socketrTrrrrbszAsyncReqMessageClient.connectcCsT|jrdSd|_t|d�r|jdur|j�d�d|_|jjdur(|j��dSdS)NTr>rF)r;rBr>rQr=rRrSrTrrrrQs�zAsyncReqMessageClient.closecCs�|j�tj�|_ttd�r|j�tjd�t|j|j�|j	�
d�r=ttd�r0|j�tjd�n
ttd�r=|j�tjd�|j�tj
|j�|j�|j	�dS)Nr*i�ztcp://[�IPV6r
r-r)r=r>rZREQrBr@r*r�r1r��
startswithr�r-r�r�rbrTrrrr�'s


z"AsyncReqMessageClient._init_socketc#s��tjjj��}tj�|�}�dur��fdd�}|�|��j�	d�dur(d}|dur5�j
�|�j|�}�j
�
�j||�|V}tjjj�|��)zY
        Return a future which will be completed when the message has a response
        Ncs|��}�j��|�dSrV)�resultr2Zadd_callback)r��response�rxrKrr�
handle_futureCsz1AsyncReqMessageClient.send.<locals>.handle_futureZdetect_modeTr
)r5rirjZ
concurrentZFuturergr�Zadd_done_callbackr1rAr2Z
call_later�_timeout_messageZspawn_callback�
_send_recvrkrl)rK�message�timeoutrxr�r�Zsend_timeout�recvrr�rrz8s�
�zAsyncReqMessageClient.sendcCs|��s
|�td��dSdS)NzMessage timed out)�done�
set_exceptionr)rKr�rrrr�Ws�z&AsyncReqMessageClient._timeout_messageccs�za|j��V�>|j�|�Vz|j��V}Wn&tjjjy>}z|�	�s,|�
|�WYd}~Wd�WdSd}~wwWd�n1sIwY|�	�s`tj�
|�}|�|�WdSWdSty�}z|�	�sz|�
|�WYd}~dSWYd}~dSd}~wwrV)r��acquirer>rzr�rrqr�ZCancelledErrorr�r�r5rgrhZ
set_resultrm)rKr�r�r�ry�datarrrr�[s2�

����	����z AsyncReqMessageClient._send_recv)rNr})r~rr�r�r0rbrQr�r5rirjrkr�rzr�r�rrrrr��s


r�c@sDeZdZdZdd�Zdd�Zdd�Zedd	��Zd
d�Z	dd
�Z
dS)rHNcCs||_|j��|_d|_dS)z�
        Create ZMQ monitor sockets

        More information:
            http://api.zeromq.org/4-0:zmq-socket-monitor
        N)r?Zget_monitor_socket�_monitor_socket�_monitor_stream)rKr>rrrr0rs
zZeroMQSocketMonitor.__init__cCs2t�d�tjjj|j|d�|_|j�|j	�dS)N�Event monitor start!rp)
r�tracerrqrrrsr�r�ru�monitor_callback)rKr2rrrrJ}s

�z!ZeroMQSocketMonitor.start_io_loopc	Cs|t�d�z*|jdur'|j��r-|j��}|�|�|jdur*|j��sWdSWdSWdSWdSttjj	fy=YdSw)Nr�)
rr�r��pollZrecv_multipartr��AttributeErrorr�errorZContextTerminatedr{rrrr��s


,��zZeroMQSocketMonitor.start_pollcCsDtjduri}tt�D]}|�d�rtt|�}|||<q|t_tjS)NZEVENT_)rH�_ZeroMQSocketMonitor__EVENT_MAP�dirrr�r�)rK�	event_mapr��valuerrrr��s


�zZeroMQSocketMonitor.event_mapcCsJtjj�|�}|j|d|d<t�d|�|dtjkr#|��dSdS)N�event�descriptionzZeroMQ event: %s)	rr6�monitorZparse_monitor_messager�rrFZEVENT_MONITOR_STOPPEDrP)rKr|Zevtrrrr��s�z$ZeroMQSocketMonitor.monitor_callbackcCsf|jdurdSz|j��Wn
tjyYnwd|_d|_|jdur,|j��d|_t�d�dS)NzEvent monitor done!)	r?Zdisable_monitorr�Errorr�r�rQrr�rTrrrrP�s
�

zZeroMQSocketMonitor.stop)r~rr�r�r0rJr�r�r�r�rPrrrrrHos

rHc@s�eZdZdZe��Zdd�Zdd�Z		d!dd�Z	e
d	d
��Ze
dd��Ze
jjjjd"d
d��Zdd�Ze
dd��Zdd�Zdd�Zdd�Ze
dd��Zdd�Zdd�Zdd �ZdS)#�
PublishServerzD
    Encapsulate synchronous operations for a publisher channel
    cCs
||_dSrV�r1r�rrrr0�s
zPublishServer.__init__cCstjjj�d�S)N�)r5rirjrk�sleeprTrrrrb�szPublishServer.connectNc

s4tjjj��}|��||_t�d�}|�	tj
�}t|�}|�|�t
||j�tjj�|�|_}z|�tj|j�dd��Wn%ttjjfye|�tj|j�dd��|�tj|j�dd��Ynw|jdduryttd�ry|�tjd�|�tj|j�dd��|�tjd	�|�	tj�}tjj�|�}|�tjd	�tj j!�"|j#�t$�%d
|j&�|�'|j&�t$�%d|j#�tj j(�)d��|�'|j#�Wd
�n1s�wYtjjj*j+�fdd��}	|�,|	�z&z|�-�Wn	t.y�YnwW|�/�|�/�d
SW|�/�|�/�d
S|�/�|�/�w)z�
        This method represents the Publish Daemon process. It is intended to be
        run in a thread or process as it creates and runs its own ioloop.
        r
Zpub_hwmr�r+Tr-rr�r�z!Starting the Salt Publisher on %szStarting the Salt Puller on %s�Nc
3sb�z|D]
}tj�|�}�|�VqWdSty0}ztjd|tjd�WYd}~dSd}~ww)Nz Un-handled error in publisher %s)Zexc_info_on_loglevel)r5rgrhrmrr��logging�DEBUG)Zpackages�packagergry��publish_payloadrrru�s�����z-PublishServer.publish_daemon.<locals>.on_recv)0r5rirjr�r�Zmake_currentr2rr<r>ZPUBrHrJr�r1rqrrrs�	dpub_sockr@ZHWMrAr�r�r�ZSNDHWMZRCVHWMrBr-r�r�ZPULLr6rZcheck_ipc_path_max_len�pull_urirr��pub_urir��filesZ	set_umaskrkr�rur�r�rQ)
rKr�Zpresence_callbackZremove_presence_callbackr�r=�pub_sockr�Z	pull_sockrurr�r�publish_daemon�sZ


��

����
zPublishServer.publish_daemoncCsH|j�dd�dkrd�|j�dd��}|Sd�tj�|jdd	��}|S)
Nr�r�r�r��tcp_master_publish_pull�r�r��publish_pull.ipc)r1rArr�r�r�)rKr�rrrr�s���zPublishServer.pull_uricCsdjdi|j��S)Nz tcp://{interface}:{publish_port}r)rr1rTrrrr
szPublishServer.pub_uriccs�tj�|�}|jdrm|rV|D](}t�d|j�tjj�	t
�tjj�	|�����}|j
�||g�Vt�d�q|j�d�rTt�d�|j
�d|g�Vt�d�dSdSt�d|j�|j
�d	|g�Vt�d
�dSt�d|j�|j
�|�Vt�d�dS)
Nrz'Sending filtered data over publisher %szFiltered data has been sentZ
order_masterszSending filtered data to syndicr#z%Filtered data has been sent to syndicz*Sending broadcasted data over publisher %sr zBroadcasted data has been sentz-Sending ZMQ-unfiltered data over publisher %szUnfiltered data has been sent)r5rgr�r1rr�rr6r7r8r3r4r9r�Zsend_multipartrArz)rKrgZ
topic_listZtopicZhtopicrrrr�s,�
�
�zPublishServer.publish_payloadcCs|j|j|jfd�dS)a.
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing

        :param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager
        )�argsN)r�rr�r�rrrr�/s
�zPublishServer.pre_forkcCs z|jjWStyYdSw)z�
        This thread's zmq publisher socket. This socket is stored on the class
        so that multiple instantiations in the same thread will re-use a single
        zmq socket.
        N)�
_sock_data�sockr�rTrrrr<s

�zPublishServer.pub_sockcCs�|jr|��t��}|�tj�|j_|j�tj	d�|j
�dd�dkr/d�|j
�dd��}n
d�t
j�|j
d	d
��}t�d|�|j�|�|jjS)z�
        Create and connect this thread's zmq socket. If a publisher socket
        already exists "pub_close" is called before creating and connecting a
        new socket.
        r�r�r�r�r�rrr�r�rzConnecting to pub server: %s)r�	pub_closerr<r>ZPUSHrr	r@r�r1rArr�r�r�rrFrb)rK�ctxr�rrr�pub_connectHs��zPublishServer.pub_connectcCs,t|jd�r|jj��t|jd�dSdS)zn
        Disconnect an existing publisher socket and remove it from the local
        thread's cache.
        r	N)rBrr	rQ�delattrrTrrrr
_s�zPublishServer.pub_closecKs4|js|��tj�|�}|j�|�t�d�dS)z�
        Publish "load" to minions. This send the load to the publisher daemon
        process with does the actual sending to minions.

        :param dict load: A load to be sent across the wire to minions
        zSent payload to publish daemon.N)rrr5rgr�rzrrF)rKrgrLZ
serializedrrr�publishhs
zPublishServer.publishcCs|j�dd�S)NrF)r1rArTrrr�
topic_supportuszPublishServer.topic_supportcCrYrV)r
rTrrrrQyr^zPublishServer.closecCrUrVrrTrrrrW|rXzPublishServer.__enter__cCrYrVrZr[rrrr]r^zPublishServer.__exit__r}rV)r~rr�r�r��localrr0rbrr�r�rr5rirjrkr�r�r�rrr
rrrQrWr]rrrrr��s2
�D



	
r�cs^eZdZdZ�fdd�Zejjjj	dd��Z
ejjjj	d
dd��Zd	d
�Ze
dd��Z�ZS)�
RequestClientrcs@t��||�||_|�|�}t|j||d�|_d|_d|_dS)NrpF)r/r0r1�get_master_urir��message_clientr;r_)rKr1r2rrMrrr0�s
�
zRequestClient.__init__cCsd|_|j��dS�NT)r_rrbrTrrrrb�szRequestClient.connect�<ccs.�|��V|jj||d�V}tjjj�|��)N)r�)rbrrzr5rirjrkrl)rK�loadr��retrrrrz�s�
zRequestClient.sendcCs|jrdSd|_|j��dSr)r;rrQrTrrrrQ�szRequestClient.closecCsDd|vr|dSd|vrt|d|d|�d�|�d�d�Std��)Nrr
rrZsource_ret_portrcz5ReqChannel: missing master_uri/master_ip in self.opts)rrArr�rrrr�s�zRequestClient.get_master_uri)r)r~rr�r�r0r5rirjrkr�rbrzrQ�staticmethodrr�rrrMrr�s


rr})3r�r�r3r�r�r�r�r��randomrZ	zmq.errorrZzmq.eventloop.futureZzmq.eventloop.zmqstreamZsalt.ext.tornador5Zsalt.ext.tornado.concurrentZsalt.ext.tornado.genZsalt.ext.tornado.ioloopZsalt.ext.tornado.locksZsalt.payloadZsalt.transport.baseZsalt.utils.filesZsalt.utils.processZsalt.utils.stringutilsZsalt.utils.zeromqZsalt._compatrZsalt.exceptionsrrrrZzmq.utils.monitorrG�ImportError�	getLoggerr~rrZ	transport�baserZDaemonizedRequestServerr�r�r�rHZDaemonizedPublishServerr�rrrrr�<module>sX�

<B1}AT