File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/__pycache__/zeromq.cpython-310.pyc
o
�N�g3� � @ s� d Z ddlZddlZddlZddlZddlZddlZddlZddlm Z ddl
ZddlZddl
ZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZ ddlmZmZ ddlmZm Z mZ zddl!ZdZ"W n e#y� dZ"Y nw e�$e%�Z&dd d
�Z'G dd� dej(j)j*�Z*G d
d� dej(j)j+�Z,dd� Z-G dd� d�Z.G dd� d�Z/G dd� dej(j)j0�Z1G dd� dej(j)j2�Z2dS )z
Zeromq transport classes
� N)�randint)� ipaddress)�
SaltException�SaltReqTimeoutError)�LIBZMQ_VERSION_INFO�ZMQ_VERSION_INFO�zmqTFc C s� ddl m} dj|| �|d�}|s|rntdkr_tdkr_|r.|r.dj||�||| �|d�}|S |r@|s@d j||�|| �|d
�}|S |r]|s]t�| �jdkrNdn|d
�}dj|||| �|d�}|S t� d� t� d� t� d� |S )aR
Return the ZeroMQ URI to connect the Minion to the Master.
It supports different source IP / port, given the ZeroMQ syntax:
// Connecting using a IP address and bind to an IP address
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
Source: http://api.zeromq.org/4-1:zmq-tcp
r )�
ip_bracketztcp://{master_ip}:{master_port})� master_ip�master_port)� � � )� r r
z9tcp://{source_ip}:{source_port};{master_ip}:{master_port})� source_ip�source_portr
r z-tcp://{source_ip}:0;{master_ip}:{master_port})r r
r r z0.0.0.0z::z6tcp://{ip_any}:{source_port};{master_ip}:{master_port})�ip_anyr r
r zAUnable to connect to the Master using a specific source IP / portz9Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6zVSpecific source IP / port for connecting to master returner port: configuraion ignored)
Zsalt.utils.networkr �formatr r r �
ip_address�version�log�warning)r
r r r r �
master_urir � r �I/opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/zeromq.py�_get_master_uri, sV ���&���������
�r c s� e Zd ZdZdZ� fdd�Zdd� Zdd� Zd d
� Ze j
jjj
ddd
��Zedd� �Ze j
jjj
dd� �Zedd� �Zdd� Ze j
jjj
dd� �Z� ZS )�
PublishClientzw
A transport channel backed by ZeroMQ for a Salt Publisher to use to
publish commands to connected minions
�zeromqc sv t � j||fi |�� || _|| _t�tjj� | jd ���
� | _d| _t
�� | _| j�t
j�| _| jd r^| j�t
jd� | j�d�dkrO| j�t
jd� n| j�t
jtjj� | j�� n| j�t
jd� | j�t
jtjj� | jd �� tt
d �r�| j�t
j| jd
� | j�t
j| jd � | j�t
j| jd � | j�t
j| jd
� | jd }| jd r�t| jd | jd | jd �}t�d| jd | jd | jd |� t�d|� | j�t
j|� tt
d��rt�d| jd | jd � | j�t
j | jd � | jd du �sd| jd v �rtt
d��r| j�t
j!d� t"�r7| jd �r9t#| j�| _$| j$�%| j� d S d S d S )N�idF�
zmq_filtering� broadcast�__role�syndic� syndic� �
TCP_KEEPALIVE�
tcp_keepalive�tcp_keepalive_idle�tcp_keepalive_cnt�tcp_keepalive_intvlZ
recon_defaultZrecon_randomizeZ recon_maxz?Generated random reconnect delay between '%sms' and '%sms' (%s)z#Setting zmq_reconnect_ivl to '%sms'�RECONNECT_IVL_MAXz'Setting zmq_reconnect_ivl_max to '%sms'�ipv6T�:r
�IPV4ONLYr �zmq_monitor)&�super�__init__�opts�io_loop�hashlib�sha1�salt�utils�stringutils�to_bytes� hexdigest�hexid�_closingr �Context�context�socketZSUB�_socket�
setsockoptZ SUBSCRIBE�getZIDENTITY�hasattrr% �TCP_KEEPALIVE_IDLE�TCP_KEEPALIVE_CNT�TCP_KEEPALIVE_INTVLr r �debugZ
RECONNECT_IVLr* r- �HAS_ZMQ_MONITOR�ZeroMQSocketMonitor�_monitor�
start_io_loop)�selfr1 r2 �kwargsZrecon_delay�� __class__r r r0 p sz ��
��
���
���"��zPublishClient.__init__c C s� | j du rd S d| _ t| d�r| jd ur| j�� d | _t| d�r(| j�d� nt| d�r3| j�d� t| d�rE| jjdu rG| j� � d S d S d S )NTrI �_streamr r? r= F)
r; rB rI �stoprO �closer? r= �closed�term�rK r r r rQ � s
�zPublishClient.closec C � | S �Nr rT r r r � __enter__� � zPublishClient.__enter__c C � | � � d S rV �rQ �rK �exc_typeZexc_valZexc_tbr r r �__exit__� � zPublishClient.__exit__Nc C sP d| _ || _t�d| j� t�d| | j� | j�| j� |d ur&|d� d S d S )NTzCConnecting the Minion to the Master publish port, using the URI: %sz%r connecting to %s)�_connect_called�publish_portr rF �
master_pubr? �connect)rK r` Zconnect_callbackZdisconnect_callbackr r r rb � s ��zPublishClient.connectc C s( t | jd | j| j�d�| j�d�d�S )z0
Return the master publish port
r
r Zsource_publish_port�r r )r r1 r` rA rT r r r ra � s
�zPublishClient.master_pubc C s� t |�}|dkrtj�|d �}nH|dkrPtjj�|d �}| j�d�dkr-|d| j fvs9| j�d�dkrG|dvrGt
�d|� tjj
j�d ��tj�|d �}n td
�t |����tjj
j�|��)z�
Take the zmq messages, decrypt/decode them into a payload
:param list messages: A list of messages to be decoded
r
r � r! r"