HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.109.1.lve.el8.x86_64 #1 SMP Thu Mar 5 20:23:46 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/__pycache__/tcp.cpython-310.pyc
o

�N�gT��@s6dZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
ZddlZddl
ZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZmZddlmZddl m!Z!ej"j#�$�r�dZ%ndZ%e%r�ddl&Ze�'e(�Z)Gdd	�d	e*�Z+d
d�Z,dd
�Z-dd�Z.Gdd�de!�Z/Gdd�d�Z0Gdd�dej1j2j3�Z4Gdd�dej1j2j5�Z6Gdd�dej7j8j9j:�Z;Gdd�de;�Z<Gdd�dej7j8j=j>�Z?Gdd�d�Z@Gd d!�d!�ZAGd"d#�d#ej7j8j9j:�ZBGd$d%�d%ej1j2jC�ZDGd&d'�d'ej1j2jE�ZFdS)(zh
TCP transport classes

Wire protocol: "len(payload) msgpack({'head': SOMEHEADER, 'body': SOMEBODY})"


�N)�SaltClientError�SaltReqTimeoutError)�
ip_bracket)�SignalHandlingProcessTFc@seZdZdZdS)�ClosingError� N)�__name__�
__module__�__qualname__�__doc__�rr�F/opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/tcp.pyr2srcCs&tj}|�dd�rtj}t�|tj�S)NZipv6F)�socket�AF_INET�get�AF_INET6�SOCK_STREAM)�opts�familyrrr
�_get_socket6srcCst|ddd�t||�fS)NZ	interfaceT��strip)r�int)rZ	port_typerrr
�_get_bind_addr=s
�rcCsbttd�r�|�dd�r�|�tjtjd�ttd�rgttd�r3|�dd�}|d	kr3|�tjtjt|��ttd
�rM|�dd�}|d	krM|�tjtj	t|��ttd�rg|�d
d�}|d	krg|�tjtj
t|��ttd�r�|�dd�}|�d
d�}|d	ks�|d	kr�|d	kr�d}|d	kr�d}|�tjdt|d�t|d�f�dSdSdS|�tjtjd	�dSdS)z<
    Ensure that TCP keepalives are set for the socket.
    �SO_KEEPALIVEZ
tcp_keepaliveF��SOL_TCP�TCP_KEEPIDLE�tcp_keepalive_idle���r�TCP_KEEPCNT�tcp_keepalive_cnt�
TCP_KEEPINTVL�tcp_keepalive_intvl�SIO_KEEPALIVE_VALSi i�N)
�hasattrrr�
setsockopt�
SOL_SOCKETrrrrr r"�ioctlr$)�sockrrr!r#rrr
�_set_tcp_keepaliveDsV


�
�
�


����r*cs<eZdZdZdZ�fdd�Zdd�Zdd�Zd	d
�Z�Z	S)�LoadBalancerServera
    Raw TCP server which runs in its own process and will listen
    for incoming connections. Each incoming connection will be
    sent via multiprocessing queue to the workers.
    Since the queue is shared amongst workers, only one worker will
    handle a given connection.
    �cs(t�jdi|��||_||_d|_dS)Nr)�super�__init__r�socket_queue�_socket)�selfrr/�kwargs��	__class__rr
r.�s
zLoadBalancerServer.__init__cCs0|jdur|j�tj�|j��d|_dSdS�N)r0�shutdownr�	SHUT_RDWR�close�r1rrr
r8�s



�zLoadBalancerServer.closecC�|��dSr5�r8r9rrr
�__del__��zLoadBalancerServer.__del__c
Cs�t|j�|_|j�tjtjd�t|j|j�|j�d�|j�	t
|jd��|j�|j�	z|j�
�\}}|j�||fdd�Wnty`}ztjjj�|�tjkr[WYd}~q.�d}~wwq/)z)
        Start the load balancer
        r�ret_portTN)rrr0r&rr'�SO_REUSEADDRr*�setblocking�bindr�listen�backlog�acceptr/�put�OSError�salt�ext�tornado�utilZerrno_from_exception�errnoZECONNABORTED)r1�
connection�address�errr
�run�s"���zLoadBalancerServer.run)
rr	r
rrCr.r8r<rO�
__classcell__rrr3r
r+�s
r+c@s&eZdZdZeddd��Zdd�ZdS)	�ResolverF�
cCs tjjjjjd|d�d|_dS)Nz)salt.ext.tornado.netutil.ThreadedResolver)�num_threadsT)rGrHrIZnetutilrQZ	configure�_resolver_configured)�clsrSrrr
�_config_resolver�s�
zResolver._config_resolvercOs|js	|��dSdSr5)rTrV)r1�argsr2rrr
r.�s�zResolver.__init__N)rR)rr	r
rT�classmethodrVr.rrrr
rQ�s
rQcs�eZdZdZdZ�fdd�Zdd�Zejj	j
jddd	��Zejj	j
jd
d��Z
ejj	j
jdd
��Zdd�Zdd�Zdd�Z�ZS)�TCPPubClientz&
    Tornado based TCP Pub Client
    �tcpcs@t�j||fi|��||_||_d|_d|_d|_t�|_dS�NF)	r-r.r�io_loop�message_client�	connected�_closingrQ�resolver)r1rr\r2r3rr
r.�szTCPPubClient.__init__cCs2|jrdSd|_|jdur|j��d|_dSdS�NT�r_r]r8r9rrr
r8�s


�zTCPPubClient.closeNccs^�d|_||_t|j|jdt|j�|j|||j�d�|j�d�d�|_|j��Vd|_	dS)NTZ	master_ip�	source_ipZsource_publish_port)r\�connect_callback�disconnect_callbackrc�source_port)
�_connect_called�publish_port�
MessageClientrrr\rr]�connectr^)r1rhrdrerrr
rj�s�

�

zTCPPubClient.connectcCs<t|t�stjj�|�}tjj�|�}n|}tj	j
j�|��r5)
�
isinstance�dictrG�utils�msgpack�loads�	transport�frame�decode_embedded_strsrHrI�gen�Return)r1�messages�bodyrrr
�_decode_messages�s

zTCPPubClient._decode_messagesccs�|jj�|�VdSr5)r]�_stream�write)r1�msgrrr
�sends�zTCPPubClient.sendcCs|j�|�S)z.
        Register an on_recv callback
        )r]�on_recv)r1�callbackrrr
r|szTCPPubClient.on_recvcC�|Sr5rr9rrr
�	__enter__
�zTCPPubClient.__enter__cCr:r5r;)r1�exc_typeZexc_valZexc_tbrrr
�__exit__r=zTCPPubClient.__exit__�NN)rr	r
r�ttyper.r8rGrHrIrs�	coroutinerjrwr{r|rr�rPrrr3r
rY�s	





rYc@sneZdZdZdZdd�Zedd��Zdd�Zd	d
�Z	dd�Z
d
d�Zdd�Ze
jjjjddd��Zdd�ZdS)�TCPReqServerzc
    Tornado based TCP Request/Reply Server

    :param dict opts: Salt master config options.
    �cCs||_d|_d|_dSr5)rr0�
req_server�r1rrrr
r.s
zTCPReqServer.__init__cCs|jSr5)r0r9rrr
r#szTCPReqServer.socketc
Cs�|jdur6z	|j�tj�Wnty(}z|jtjkrn�WYd}~nd}~ww|jdur3|j��d|_|jdurhz|j��Wn tyb}z|jdkrP�t	�
dt|��WYd}~nd}~wwd|_dSdS)N�	z4TCPReqServerChannel close generated an exception: %s)r0r6rr7rFrK�ENOTCONNr�r8�log�	exception�str)r1�excrrr
r8's4
���



���
�zTCPReqServer.closecCr~r5rr9rrr
rEr�zTCPReqServer.__enter__cGr:r5r;)r1rWrrr
r�Hr=zTCPReqServer.__exit__cCs�trt��|_|jt|j|jfdd�dStjj	�
�sDt|j�|_|j�
tjtjd�t|j|j�|j�d�|j�t|jd��dSdS)zB
        Pre-fork we need to create the zmq router device
        r+)rW�namerrr>N)�USE_LOAD_BALANCER�multiprocessing�Queuer/�add_processr+rrGrm�platform�
is_windowsrr0r&rr'r?r*r@rAr�r1Zprocess_managerrrr
�pre_forkKs


��zTCPReqServer.pre_forkcCs�||_tjj�|��jtrt|j|j|j	�
d�d�|_nPtjj�
�rIt|j	�|_|j�tjtjd�t|j|j	�|j�d�|j�t|j	d��t|j|j	�
d�|d�|_|j�|j�|j�|j�Wd�dSWd�dS1swwYdS)z�
        After forking we need to create all of the local sockets to listen to the
        router

        message_handler: function to call with your payloads
        �ssl��ssl_optionsrrr>)r�r\N)�message_handlerrGrm�asynchronous�current_ioloopr��LoadBalancerWorkerr/�handle_messagerrr�r�r�rr0r&rr'r?r*r@rAr�SaltMessageServer�
add_socketrBrC)r1r�r\rrr
�	post_fork]s0

�
��"�zTCPReqServer.post_forkNccs4�|�|�}|�|�V}|�tjjj||d��dS)N��header)�decode_payloadr�ryrGrprq�	frame_msg)r1�stream�payloadr�Zreplyrrr
r�|s�
zTCPReqServer.handle_messagecC�|Sr5r)r1r�rrr
r��r�zTCPReqServer.decode_payloadr5)rr	r
rrCr.�propertyrr8rr�r�r�rGrHrIrsr�r�r�rrrr
r�s

r�csPeZdZdZ�fdd�Zejjjj	ejjj
jfdd��Zdd�Z
dd	�Z�ZS)
r�z{
    Raw TCP server which will receive all of the TCP streams and re-assemble
    messages that are sent through to us
    csJ|�dd�p
tjjjj��}d|_t�j	|i|��||_
g|_||_dS)Nr\F)
�poprGrHrI�ioloop�IOLoop�currentr_r-r.r\�clientsr�)r1r�rWr2r\r3rr
r.�s�
zSaltMessageServer.__init__c	
cs��t�d|�|j�||f�tjj��}z+	|jddd�V}|�	|�|D]}tj
j�|�}|d}|j
�|j||d|�q&q|yWt�d|�|�||f�Yd
Sty|}ztjd|dd	�|�||f�|��WYd
}~d
Sd
}~ww)zP
        Handle incoming streams and add messages to the incoming queue
        zReq client %s connectedT���partial�headrvzreq client disconnected %szother master-side exception: %s��exc_infoN)r��tracer��appendrGrmrn�Unpacker�
read_bytes�feedrprqrrr\�spawn_callbackr��
remove_client�	Exceptionr8)	r1r�rMZ_StreamClosedError�unpacker�
wire_bytes�
framed_msgr�rNrrr
�
handle_stream�s0�

��	��zSaltMessageServer.handle_streamcCs2z	|j�|�WdStyt�d�YdSw)Nz/Message server client was not in list to remove)r��remove�
ValueErrorr�r��r1�clientrrr
r��s
�zSaltMessageServer.remove_clientc
Csx|jrdSd|_|jD]}|\}}|��|�|�qz|��WdSty;}z
|jdkr0�WYd}~dSd}~ww)z"
        Close the server
        NTr�)r_r�r8r��stoprFrK)r1�itemr�rMr�rrr
r8�s

���zSaltMessageServer.close)rr	r
rr.rGrHrIrsr��iostream�StreamClosedErrorr�r�r8rPrrr3r
r��s


�r�cs4eZdZdZ�fdd�Z�fdd�Zdd�Z�ZS)r�z�
    This will receive TCP connections from 'LoadBalancerServer' via
    a multiprocessing queue.
    Since the queue is shared amongst workers, only one worker will handle
    a given connection.
    csHt�j|g|�Ri|��||_t��|_tj|jd�|_|j�	�dS)N)�target)
r-r.r/�	threading�Event�_stop�Thread�socket_queue_thread�thread�start)r1r/r�rWr2r3rr
r.�s

zLoadBalancerWorker.__init__cs"|j��|j��t���dSr5)r��setr��joinr-r8r9r3rr
r8�s

zLoadBalancerWorker.closec	Csnz*	z|j�dd�\}}Wntjy |j��rYWdSYqw|j�|j||�qt	t
fy6YdSw�NTr)r/r�queue�Emptyr��is_setr\r�Z_handle_connection�KeyboardInterrupt�
SystemExit)r1Z
client_socketrMrrr
r��s 
���
�z&LoadBalancerWorker.socket_queue_thread)rr	r
rr.r8r�rPrrr3r
r��s
r�cs*eZdZdZd�fdd�	Zdd�Z�ZS)�TCPClientKeepAlivezN
    Override _create_stream() in TCPClient to enable keep alive support.
    Ncs||_t�j|d�dS)N�r`)rr-r.)r1rr`r3rr
r.�szTCPClientKeepAlive.__init__cKsPt|j�}t||j�tjjjj||d�}tjjjdkr!|�	|�S||�	|�fS)z�
        Override _create_stream() in TCPClient.

        Tornado 4.5 added the kwargs 'source_ip' and 'source_port'.
        Due to this, use **kwargs to swallow these and any future
        kwargs to maintain compatibility.
        )�max_buffer_size)r�)
rrr*rGrHrIr�ZIOStream�version_inforj)r1r��af�addrr2r)r�rrr
�_create_stream�s

�
z!TCPClientKeepAlive._create_streamr5)rr	r
rr.r�rPrrr3r
r��sr�c@s�eZdZdZ						ddd�Zdd�Zejjj	j
dd��Zd	d
�Zejjj	j
dd��Z
ejjj	j
d
d��Zejjj	j
dd��Zdd�Zdd�Zdd�Zdd�Zejjj	j
ddd��ZdS)riz*
    Low-level message sending client
    Nc

Cs�||_||_||_||_|	|_||_||_|ptjj	j
j��|_
tjj�|j
��t||d�|_Wd�n1s:wYi|_d|_d|_d|_d|_tjj	j��|_d|_d|_|�dd�|_dS)Nr�FZtcp_reconnect_backoffr)r�host�portrcrfrdrerGrHrIr�r�r�r\rmr�r�r��_tcp_client�send_future_map�_read_until_future�_on_recvr_�_closed�
concurrent�FutureZ_connecting_future�_stream_return_runningrxr�backoff)
r1rr�r�r\r`rdrercrfrrr
r.s(�zMessageClient.__init__cCs$|jrdSd|_|j�d|j�dSr�)r_r\�add_timeout�check_closer9rrr
r8?szMessageClient.closecCs:|js|j��d|_d|_d|_dS|j�d|j�dS)NFTr)	r�r�r8rxr_r�r\r�r�r9rrr
r�Es

zMessageClient.check_closecCr:r5r;r9rrr
r<Pr=zMessageClient.__del__c
ks��|js|jr|j|jd�}d}|durg|jsg|jsgz|jjt|jdd�|jfd|j	�
d�i|��V}Wn(ty\}zt�
d|j|j||j�tjjj�|j�VWYd}~nd}~ww|durg|jsg|jrtjjj�|��)N)rcrfTrr�r�zgTCP Message Client encountered an exception while connecting to %s:%s: %r, will reconnect in %d seconds)rcrfr�r_r�rjrr�r�rrr�r��warningr�rGrHrIrs�sleeprt)r1r2r�r�rrr
�	getstreamUs:���
��� ���zMessageClient.getstreamccsR�|jdur#|��V|_|jr%|js|j�|j�|jr'|�d�dSdSdSdSra)rxr�r�r\r��_stream_returnrdr9rrr
rjqs�
�zMessageClient.connectc

csP�d|_tjj��}|j�s#zM|jjddd�V}|�|�|D]:}tj	j
�|�}|d}|d}|�d�}||j
vrD|j
�|��|�q|jdurS|j�|j||�qt�d|�qWn�tjjjjy�}zJt�d|j|j�|j
��D]}|�|�qui|_
|js�|jr�WYd}~dS|jr�|��|j}	d|_|	r�|	� �tjj��}|�!�VWYd}~nod}~wt"y�d	|j#vr�t�$d
�nt%�YnVt&�y}zItjddd�|j
��D]}|�|�q�i|_
|js�|jr�WYd}~dS|jr�|��|j}	d|_|	�r
|	� �tjj��}|�!�VWYd}~nd}~ww|jrd
|_dS)NTr�r�r�rv�midz7Got response for message_id %s that we are not trackingz*tcp stream to %s:%s closed, unable to recv�detect_modez[There was an error trying to use TCP transport; attempting to fallback to another transportzException parsing responser�F)'r�rGrmrnr�r_rxr�r�rprqrrrr�r�Z
set_resultr�r\r�r��errorrHrIr�r��debugr�r��values�
set_exceptionr�rer8rj�	TypeErrorr�inforr�)
r1r�r�r�r�rv�
message_idrN�futurer�rrr
r�{s��



����
�����
AzMessageClient._stream_returncCstt���Sr5)r��uuidZuuid4r9rrr
�_message_id�r=zMessageClient._message_idcs(�dur	�|_dS�fdd�}||_dS)zU
        Register a callback for received messages (that we didn't initiate)
        Ncs�|�dSr5r)r�rv�r}rr
�	wrap_recv�r=z(MessageClient.on_recv.<locals>.wrap_recv)r�)r1r}rrr�r
r|�s

zMessageClient.on_recvcCs*||jvrdS|j�|�}|j�|�dSr5)Zsend_timeout_mapr�r\Zremove_timeout)r1r��timeoutrrr
�remove_message_timeout�s
z$MessageClient.remove_message_timeoutcCs8||jvrdS|j�|�}|dur|�td��dSdS)NzMessage timed out)r�r�r�r)r1r�rzr�rrr
�timeout_message�s
�zMessageClient.timeout_messageFc#s���jrt�����}d|i}tjjj��}�dur&��fdd�}|�|�|�j	|<�j
�d�dur5d}|durC�j�
|�j||�tjjj||d��tjjjj��fdd	��}	�j�|	�|V}
tjjj�|
��)
Nr�cs|��}�j��|�dSr5)�resultr\�add_callback)r��response)r}r1rr
�
handle_future�sz)MessageClient.send.<locals>.handle_futurer�Trr�c3s(����V�jr�j���VdSdSr5)rjrxryr)r�r1rr
�_do_send�s
�
�z$MessageClient.send.<locals>._do_send)r_rr�rGrHrIr�r�Zadd_done_callbackr�rrr\Z
call_laterrrprqr�rsr�rrt)r1rzrr}�rawr�r�r�rr�recvr)r}r�r1r
r{�s(�


zMessageClient.send)NNNNNN)NNF)rr	r
rr.r8rGrHrIrsr�r�r<r�rjr�r�r|rrr{rrrr
ris2	
�$







	
F

ric@s(eZdZdZdd�Zdd�Zdd�ZdS)	�
Subscriberz=
    Client object for use with the TCP publisher server
    cCs"||_||_d|_d|_d|_dSr[)r�rMr_r��id_)r1r�rMrrr
r.s

zSubscriber.__init__cCsR|jrdSd|_|j��s#|j��|jdur%|j��r'|j��dSdSdSdSra)r_r��closedr8r�Zdoner�r9rrr
r8s

�zSubscriber.closecCr:r5r;r9rrr
r<%r=zSubscriber.__del__N)rr	r
rr.r8r<rrrr
r	s
rcsfeZdZdZ	d�fdd�	Zdd�Zdd�Zejj	j
jd	d
��Zdd�Z
ejj	j
jdd
d��Z�ZS)�	PubServerz
    TCP publisher
    Ncsft�j|�d�d�||_||_d|_t�|_d|_|r ||_	ndd�|_	|r,||_
dSdd�|_
dS)Nr�r�FcSr�r5r)�
subscriberrzrrr
�<lambda><�z$PubServer.__init__.<locals>.<lambda>cSr~r5r)rrrr
r@r)r-r.rr\rr_r�r�Zpresence_events�presence_callback�remove_presence_callback)r1rr\rrr3rr
r.0s

zPubServer.__init__cCs*|jrdSd|_|jD]}|j��qdSra)r_r�r�Z
disconnectr�rrr
r8Bs
�zPubServer.closecCr:r5r;r9rrr
r<Jr=zPubServer.__del__c
cs�tjj��}|js�z-|jjddd�|_|jV}|�|�|D]}tj	j
�|�}|d}|jr5|�||�qWnGtj
jjjyd}zt�d|j�|��|�|�|j�|�WYd}~dSd}~wty~}ztjd|jdd�WYd}~qd}~ww|jr
dSdS)Nr�Tr�rvz'tcp stream to %s closed, unable to recvz"Exception parsing response from %sr�)rGrmrnr�r_r�r�r�r�rprqrrrrHrIr�r�r�r�rMr8rr��discardr�r�)r1r�r�r�r�rvrNrrr
�_stream_readOs:�
��
�����zPubServer._stream_readcCs6t�d|�t||�}|j�|�|j�|j|�dS)NzSubscriber at %s connected)r�r�rr��addr\r�r)r1r�rMr�rrr
r�hs
zPubServer.handle_streamc
cs"�t�d||�tjj�|�}g}|rN|D]7}d}|jD]%}||jkrAz|j�	|�Vd}Wqtj
jjj
y@|�|�Yqwq|sLt�d||j�qn!|jD]}z	|j�	|�VWqQtj
jjj
yn|�|�YqQw|D]}t�d|j�|��|�|�|j�|�qqt�d�dS)Nz'TCP PubServer sending payload: %s 

 %rFTz"Publish target %s not connected %rz0Subscriber at %s has disconnected from publisherz)TCP PubServer finished publishing payload)r�r�rGrprqr�r�rr�ryrHrIr�r�r�r�rMr8rr)r1�packageZ
topic_listr��	to_removeZtopic�sentr�rrr
�publish_payloadosD�

����
��
zPubServer.publish_payload)NNNr5)rr	r
rr.r8r<rGrHrIrsr�rr�rrPrrr3r
r+s�


rc@sreZdZdZdZdd�Zedd��Zdd�Zd	d
�Z			ddd
�Z
dd�Zej
jjjdd��Zdd�Zdd�ZdS)�TCPPublishServerz)
    Tornado based TCP PublishServer
    r,cCs||_d|_dSr5)r�pub_sockr�rrr
r.�s
zTCPPublishServer.__init__cCs|j�dd�S)NZ
order_mastersF)rrr9rrr
�
topic_support�szTCPPublishServer.topic_supportcCs|�|d�dS�Nr)r.)r1�staterrr
�__setstate__�szTCPPublishServer.__setstate__cCs
d|jiSr)rr9rrr
�__getstate__�s
zTCPPublishServer.__getstate__Nc		Csptjjj��}|��||_t|j|||d�|_	}t
|j�}|�tj
tjd�t||j�|�d�|�t|jd��|�|j�|�|�|j�dd�dkrZt|j�dd	��}n
tj�|jd
d�}||_	tjjj|||d�}t�d
|�tj j!�"d��|�#�Wd�n1s�wYz z|�#�Wnt$t%fy�YnwW|�&�dSW|�&�dS|�&�w)zK
        Bind to the interface specified in the configuration file
        )r\rrrrrh�ipc_mode�rZ�tcp_master_publish_pull��sock_dir�publish_pull.ipc)r\Zpayload_handlerzStarting the Salt Puller on %s�N)'rGrHrIr�r�Zmake_currentr\rr�
pub_serverrr&rr'r?r*r@rArrBrCr�rr�os�pathr�rp�ipcZIPCMessageServerr�r�rm�filesZ	set_umaskr�r�r�r8)	r1rrrr\r)r)�pull_uriZ	pull_sockrrr
�publish_daemon�sL	�


�
����zTCPPublishServer.publish_daemoncCs|j|j|jjd�dS)z�
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing
        )r�N)r�r/r4rr�rrr
r��szTCPPublishServer.pre_forkcgs(�|jj|g|�R�V}tjjj�|��r5)r)rrGrHrIrsrt)r1r�rW�retrrr
r�s�z TCPPublishServer.publish_payloadcKsx|j�dd�dkrt|j�dd��}n
tj�|jdd�}|js4tjj	j
tjjj
|fdd	�|_|j��|j�|�d
S)z+
        Publish "load" to minions
        r"r#rZr$r%r&r'r\)Z
loop_kwargN)rrrr*r+r�rrGrmr�ZSyncWrapperrpr,ZIPCMessageClientrjr{)r1r�r2r.rrr
�publish�s�
zTCPPublishServer.publishcCs|jr
|j��d|_dSdSr5)rr8r9rrr
r8�s

�zTCPPublishServer.closer�)rr	r
rrCr.r�rr r!r/r�rGrHrIrsr�rr1r8rrrr
r�s 

�6

rcsVeZdZdZdZ�fdd�Zejjj	j
dd��Zejjj	j
ddd	��Zd
d�Z
�ZS)
�TCPReqClientz)
    Tornado based TCP RequestClient
    rZc	
s�t�j||fi|��||_||_tj�|jd�}|j�dd�\}}|t	|�f}|�
d�}tjj
j||t	|�|||�
d�|�
d�d�|_d|_dS)	NZ
master_uri�:rr`rcZsource_ret_port)r\r`rcrfF)r-r.rr\�urllib�parse�urlparse�netloc�rsplitrrrGrprZrir]r_)	r1rr\r2r5Zmaster_hostZmaster_portZmaster_addrr`r3rr
r.s"
�
	zTCPReqClient.__init__ccs�d|_|j��VdSra)rgr]rjr9rrr
rjs�zTCPReqClient.connect�<ccs$�|jj||d�V}tjjj�|��)N)r)r]r{rGrHrIrsrt)r1�loadrr0rrr
r{$s�zTCPReqClient.sendcCs|jrdSd|_|j��dSrarbr9rrr
r8)szTCPReqClient.close)r9)rr	r
rr�r.rGrHrIrsr�rjr{r8rPrrr3r
r2s


r2)GrrK�loggingr�r*r�rr�r4r�Zsalt.ext.tornadorGZsalt.ext.tornado.concurrentZsalt.ext.tornado.genZsalt.ext.tornado.iostreamZsalt.ext.tornado.netutilZsalt.ext.tornado.tcpclientZsalt.ext.tornado.tcpserverZsalt.masterZsalt.payloadZsalt.transport.frameZsalt.transport.ipcZsalt.utils.asynchronousZsalt.utils.filesZsalt.utils.msgpackZsalt.utils.platformZsalt.utils.versionsZsalt.exceptionsrrZsalt.utils.networkrZsalt.utils.processrrmr�r�r�Zsalt.ext.tornado.util�	getLoggerrr�r�rrrr*r+rQrp�baseZ
PublishClientrYZDaemonizedRequestServerr�rHrIZ	tcpserverZ	TCPServerr�r�Z	tcpclientZ	TCPClientr�rirrZDaemonizedPublishServerrZ
RequestClientr2rrrr
�<module>sh
><ErG'"t"hq