File: //opt/saltstack/salt/lib/python3.10/site-packages/salt/utils/__pycache__/event.cpython-310.pyc
o

�N�g��@s�dZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlZ
ddlmZe�e�Z dZ!dZ"dZ#dZ$d	d
ddd
dddddd�
Z%							d1dd�Z&d2dd�Z'd3dd�Z(dde$fdd�Z)Gdd �d �Z*Gd!d"�d"e*�Z+Gd#d$�d$e+�Z,Gd%d&�d&�Z-Gd'd(�d(e*�Z.Gd)d*�d*�Z/Gd+d,�d,e
j0j1j2�Z3Gd-d.�d.e
j0j1j2�Z4Gd/d0�d0�Z5dS)4a�
Manage events

Events are all fired off via a zeromq 'pub' socket, and listened to with local
zeromq 'sub' sockets


All of the formatting is self contained in the event module, so we should be
able to modify the structure in the future since the same module used to read
events is the same module used to fire off events.

Old style event messages were comprised of two parts delimited at the 20 char
point. The first 20 characters are used for the zeromq subscriber to match
publications and 20 characters was chosen because it was at the time a few more
characters than the length of a jid (Job ID).  Any tags of length less than 20
characters were padded with "|" chars out to 20 characters.

Although not explicit, the data for an event comprised a python dict that was
serialized by msgpack.

New style event messages support event tags longer than 20 characters while
still being backwards compatible with old style tags.

The longer tags better enable name spaced event tags which tend to be longer.
Moreover, the constraint that the event data be a python dict is now an
explicit constraint, and fire_event will now raise a ValueError if it is not.
Tags must be ASCII-safe strings, that is, every character must have a value
less than 0x80.

Since the msgpack dict (map) indicators have values greater than or equal to
0x80 it can be unambiguously determined if the start of data is at char 21
or not.

In the new style, when the tag is longer than 20 characters, an end of tag
string is appended to the tag given by the string constant TAGEND, that is, two
line feeds '\n\n'.  When the tag is shorter than 20 characters, the tag is
padded with pipes "|" out to 20 characters as before.  When the tag is exactly
20 characters, no padding is done.

The get_event method intelligently figures out if the tag is longer than 20
characters.


The convention for namespacing is to use dot characters "." as the name space
delimiter. The name space "salt" is reserved by SaltStack for internal events.

For example:
Namespaced tag
    'salt.runner.manage.status.start'
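
The framing rules described above can be sketched as follows. This is a minimal illustration of the documented scheme only, not the module's own implementation: `frame_tag`, `split_event`, `TAG_WIDTH` and `PAD` are hypothetical names, and the payload is a hand-written msgpack byte string so that no third-party package is needed.

```python
# Illustrative sketch of the tag framing described in the docstring.
# All names except TAGEND are hypothetical.
TAGEND = b"\n\n"  # end-of-tag marker appended to tags longer than 20 chars
TAG_WIDTH = 20    # legacy fixed tag width
PAD = b"|"        # padding character for short tags


def frame_tag(tag):
    """Encode a tag using the padding / TAGEND rules."""
    raw = tag.encode("ascii")  # tags must be ASCII safe (all bytes < 0x80)
    if len(raw) < TAG_WIDTH:
        return raw.ljust(TAG_WIDTH, PAD)
    if len(raw) == TAG_WIDTH:
        return raw  # exactly 20 characters: no padding
    return raw + TAGEND


def split_event(message):
    """Split a framed message into (tag, serialized payload)."""
    # A msgpack map starts with a byte >= 0x80 while tag bytes are < 0x80,
    # so the byte at offset 20 tells us whether the data starts there.
    if len(message) > TAG_WIDTH and message[TAG_WIDTH] >= 0x80:
        tag, payload = message[:TAG_WIDTH], message[TAG_WIDTH:]
        # Simplification: a tag that really ends in "|" would lose it here.
        return tag.rstrip(PAD).decode("ascii"), payload
    tag, _, payload = message.partition(TAGEND)
    return tag.decode("ascii"), payload


# b"\x81\xa1a\x01" is the msgpack encoding of {"a": 1}
payload = b"\x81\xa1a\x01"
short_msg = frame_tag("salt/job") + payload
long_msg = frame_tag("salt.runner.manage.status.start") + payload
```

Because the check looks only at the byte at offset 20, the same reader handles padded, exact-length and TAGEND-terminated tags without needing a version flag.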

�N)�MutableMapping)�SaltInvocationError)zstate.highstatez	state.slsz

�/�salt�auth�job�key�minion�syndic�run�wheel�cloud�
fileserver�queue)
rrrr	r
rrr
rrTFc	CsR|r
tjj�dd�|p|d}|dkrt||||||d�St|||||||d�S)aG
    Return an event object suitable for the named transport

    :param IOLoop io_loop: Pass in an io_loop if you want asynchronous
                           operation for obtaining events. Eg use of
                           set_event_handler() API. Otherwise, operation
                           will be synchronous.
    ZChlorinez�The 'transport' kwarg has been deprecated and it will be removed in the Chlorine release, as such, its usage is no longer required.�sock_dir�master��listen�io_loop�	keep_loop�raise_errors)r�utilsZversionsZ
warn_until�MasterEvent�	SaltEvent)�noder�	transport�optsrrrr�r�D/opt/saltstack/salt/lib/python3.10/site-packages/salt/utils/event.py�	get_eventjs0���rcCst|||||d�S)zA
    Return an event object suitable for the named transport
    )rrr)r)rrrrrrrr�get_master_event�s
�r �c
Cs�z|dg}Wn
tyYdSwt||�}zt||ddd�}|j||d�WdStyD}zt�d|||�WYd}~dSd}~ww)zO
    Fire an event containing the arguments passed to an orchestration job
    �argsrF)r)�tagz-Failed to fire args event %s with data %s: %sN)�	NameError�tagifyr �
fire_event�	Exception�log�warning)r�jidZtag_data�prefixZ
tag_suffixr#Z_event�excrrr�	fire_args�s�
���r-c	Cs�|t�||�g}t|d�r|�|�n|�|�t|�D]"\}}z
tjj�	||�||<Wqt
y>t||�||<Yqwt�
dd�|D��S)a`
    Convenience function to build a namespaced event tag string
    by joining the base, prefix and suffix with the TAGPARTER character

    If string prefix is a valid key in TAGS Then use the value of key prefix
    Else use prefix string

    If suffix is a list Then join all string elements of suffix individually
    Else use string suffix
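
A rough sketch of the joining rule described here, assuming "/" as the separator (it appears among the module's constants in this dump) and using an abbreviated, hypothetical subset of the TAGS mapping. `tagify_sketch` is an illustrative name, not the module's function.

```python
# Hypothetical, simplified re-creation of the documented behaviour.
TAGPARTER = "/"                      # assumed separator
TAGS = {"job": "job", "run": "run"}  # abbreviated, illustrative subset


def tagify_sketch(suffix="", prefix="", base="salt"):
    """Join base, prefix and suffix into one namespaced tag."""
    # Use the TAGS value when prefix is a known key, else the prefix itself.
    parts = [base, TAGS.get(prefix, prefix)]
    if isinstance(suffix, list):
        parts.extend(suffix)   # list suffix: add each element individually
    else:
        parts.append(suffix)   # plain suffix: add as a single part
    parts = [str(part) for part in parts]
    # Empty parts are skipped so no doubled separators appear.
    return TAGPARTER.join(part for part in parts if part)


job_tag = tagify_sketch(["20240101", "ret", "minion1"], "job")
run_tag = tagify_sketch("start", "run")
```

With these inputs, `job_tag` is `salt/job/20240101/ret/minion1` and `run_tag` is `salt/run/start`.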

    �appendcSsg|]}|r|�qSrr)�.0�partrrr�
<listcomp>�sztagify.<locals>.<listcomp>)�TAGS�get�hasattr�extendr.�	enumeraterr�stringutils�to_str�	TypeError�str�	TAGPARTER�join)�suffixr+�base�parts�index�_rrrr%�s

�r%c@sleZdZdZ						dLdd�Zedd��Zd	d
�ZdMdd�ZdNd
d�Z	dNdd�Z
dd�ZdOdd�Zdd�Z
edd��ZdNdd�ZdNdd�Zedd��Zed d!��Zed"d#��Zd$d%�Zd&d'�Zd(d)�ZdPd*d+�Z	,	-				dQd.d/�Zd0d1�Zd2d3�ZdRd4d5�Zejjj j!dSd7d8��Z"dTd9d:�Z#dTd;d<�Z$d=d>�Z%dUd@dA�Z&dBdC�Z'dDdE�Z(dFdG�Z)dHdI�Z*dJdK�Z+dS)Vrz�
    Warning! Use the get_event function or the code will not be
    RAET compatible
    The base class used to manage salt events
    NTFcCs||_|dur||_d|_ntjjj��|_d|_d|_d|_	d|_
d|_||_|dur.i}|dkr:tj
j��|_ntj
j��|_|j�|�|durQ|jd}n||jd<tjj��red|vred|jd<|�||�\|_|_g|_g|_|��|r�|js�|��dSdSdS)aW
        :param IOLoop io_loop: Pass in an io_loop if you want asynchronous
                               operation for obtaining events. Eg use of
                               set_event_handler() API. Otherwise, operation
                               will be synchronous.
        :param Bool keep_loop: Pass a boolean to determine if we want to keep
                               the io loop or destroy it when the event handle
                               is destroyed. This is useful when using event
                               loops from within third party asynchronous code
        NFTrr�ipc_mode�tcp)rr�_run_io_loop_syncr�ext�tornado�ioloop�IOLoop�cpub�cpush�
subscriber�pusherr�config�DEFAULT_MASTER_OPTS�copyr�DEFAULT_MINION_OPTS�updater�platform�
is_windows�_SaltEvent__load_uri�puburi�pulluri�pending_tags�pending_events�_SaltEvent__load_cache_regex�connect_pub)�selfrrrrrrrrrr�__init__�s:


�zSaltEvent.__init__cCstjjjdd�|_dS)z�
        Initialize the regular expression cache and put it in the
        class namespace. The regex search strings will be prepended with '^'
        �^)ZprependN)rr�cacheZ
CacheRegex�cache_regex)�clsrrrZ__load_cache_regexs
zSaltEvent.__load_cache_regexcCs|dkr)|jddkrt|jd�}t|jd�}n\tj�|d�}tj�|d�}nM|jddkr?t|jd�}t|jd	�}n7tt|jd
�}|j�d|jd�}|tj	j
�|����d
d�}tj�|d|�d��}tj�|d|�d��}t
�d|jj|�t
�d|jj|�||fS)z�
        Return the string URI for the location of the pull and pub sockets to
        use for firing and listening to events
        rrBrC�tcp_master_pub_port�tcp_master_pull_port�master_event_pub.ipc�master_event_pull.ipc�tcp_pub_port�
tcp_pull_port�	hash_typeZhash_id�idN�
�
minion_event_�_pub.ipc�	_pull.ipc�%s PUB socket URI: %s�%s PULL socket URI: %s)r�int�os�pathr<�getattr�hashlibr3rrr7�to_bytes�	hexdigestr(�debug�	__class__�__name__)r[rrrUrVrgZ	minion_id�id_hashrrrZ
__load_uris*�
�zSaltEvent.__load_uricCs*|durdS|�|�}|j�||g�dS)a�
        Subscribe to events matching the passed tag.

        If you do not subscribe to a tag, events will be discarded by calls to
        get_event that request a different tag. In contexts where many different
        jobs are outstanding it is important to subscribe to prevent one call
        to get_event from discarding a response required by a subsequent call
        to get_event.
        N)�_get_match_funcrWr.)r[r#�
match_type�
match_funcrrr�	subscribe<s

zSaltEvent.subscribecs||durdS|�|�}z
|j�||g�Wn	tyYnw|j}g|_|D]�t�fdd�|jD��r;|j���q'dS)zA
        Un-subscribe to events matching the passed tag.
        Nc3�"�|]\}}|�d|�VqdS�r#Nr�r/ZptagZpmatch_func�Zevtrr�	<genexpr>[��
�z(SaltEvent.unsubscribe.<locals>.<genexpr>)rzrW�remove�
ValueErrorrX�anyr.)r[r#r{r|�
old_eventsrr�r�unsubscribeKs"
����zSaltEvent.unsubscribecCs�|jrdS|jr�tjj�|j���|jdur*tjjjtj	j
j|jfd|jidd�|_z|jj
|d�d|_WnItjjjjyHt�d�YnAtye}z|jtjkrV�t�d�WYd}~n1d}~wty}ztjd|tjd	�WYd}~n d}~wwWd�|jSWd�|jSWd�|jSWd�|jS1s�wY|jS|jdur�tj	j
j|j|jd
�|_d|_|jS)z2
        Establish the publish connection
        TNr�r"�kwargsZ
loop_kwarg��timeoutz!Encountered StreamClosedExceptionz)Error opening stream, file does not existz.An exception occurred connecting publisher: %s�Zexc_info_on_loglevel�r)rIrDrr�asynchronous�current_iolooprrK�SyncWrapperr�ipcZIPCMessageSubscriberrU�connectrErF�iostream�StreamClosedErrorr(�error�OSError�errno�ENOENTr'�info�logging�DEBUG�r[r�r,rrrrZ`sb
�
�����
� �
� �
� �
� � 
��zSaltEvent.connect_pubcCs*|jsdS|j��d|_g|_d|_dS)z?
        Close the publish connection (if established)
        NF)rIrK�closerX�r[rrr�	close_pub�s

zSaltEvent.close_pub�cCsT|jrdS|jr�tjj�|j��x|jdur*tjjjtj	j
j|jfd|jidd�|_z|jj
|d�d|_Wn6tjjjjyR}zt�d|�WYd}~n(d}~wtyl}ztjd|tjd�WYd}~nd}~wwWd�|jSWd�|jSWd�|jS1s�wY|jS|jdur�tj	j
j|j|jd�|_d|_|jS)	zb
        Establish a connection with the event pull socket
        Default timeout is 1 s
        TNrr�r�zUnable to connect pusher: %sr�r�)rJrDrrr�r�rrLr�rr�ZIPCMessageClientrVr�rErFr�r�r(rvr'r�r�r�r�rrr�connect_pull�sT
�
�����
��
��
��
��zSaltEvent.connect_pullcCs$|jsdS|j��d|_d|_dS)z>
        Close the pusher connection (if established)
        NF)rJrLr�r�rrr�
close_pull�s


zSaltEvent.close_pullcCs@|�tjj�t��\}}}tjj�|�}tjj|dd�}||fS)N�utf-8)�encoding)	�	partitionrrr7rt�TAGENDr8�payload�loads)r`�raw�mtag�sepZmdata�datarrr�unpack�s
�zSaltEvent.unpackcCs$|dur	|jd}t|d|��d�S)NZevent_match_typeZ_match_tag_)rrr)r[r{rrrrz�s
zSaltEvent._get_match_funccs�|dur|��}|j}g|_d}|D]6�|�d|�r/|dur(�}t�d|�q|j���qt�fdd�|jD��rB|j���qt�d��q|S)a	Check the pending_events list for events that match the tag

        :param tag: The tag to search for
        :type tag: str
        :param tags_regex: List of re expressions to search for also
        :type tags_regex: list[re.compile()]
        :return:
        Nr#z'get_event() returning cached event = %sc3r~rrr�r�rrr��r�z+SaltEvent._check_pending.<locals>.<genexpr>zMget_event() discarding cached event that no longer has any subscriptions = %s)rzrXr(�tracer.r�rW)r[r#r|r��retrr�r�_check_pending�s(	��zSaltEvent._check_pendingcC�
|�|�S)z�
        Check if the event_tag matches the search check.
        Uses startswith to check.
        Return True (matches) or False (no match)
        )�
startswith��	event_tag�
search_tagrrr�_match_tag_startswith��
zSaltEvent._match_tag_startswithcCr�)z�
        Check if the event_tag matches the search check.
        Uses endswith to check.
        Return True (matches) or False (no match)
        )�endswithr�rrr�_match_tag_endswith�r�zSaltEvent._match_tag_endswithcCs|�|�dkS)z�
        Check if the event_tag matches the search check.
        Uses find to check.
        Return True (matches) or False (no match)
        r)�findr�rrr�_match_tag_findszSaltEvent._match_tag_findcCs|j�|��|�duS)z�
        Check if the event_tag matches the search check.
        Uses regular expression search to check.
        Return True (matches) or False (no match)
        N)r_r3�search�r[r�r�rrr�_match_tag_regexszSaltEvent._match_tag_regexcCst�||�S)z�
        Check if the event_tag matches the search check.
        Uses fnmatch to check.
        Return True (matches) or False (no match)
        )�fnmatchr�rrr�_match_tag_fnmatchszSaltEvent._match_tag_fnmatchcCs(|j�dd�r|jd|�dd�kSdS)N�subproxyFrh�proxy_targetT)rr3)r[r�rrr�_subproxy_match!szSaltEvent._subproxy_matchcs�|dur|��}t��}||}d}|durd}n|dkrd}|dur%|r+t��|kr�|dur6|dur4n�d}z2|jsB|j|d�sBWn�|jsNt�d�td��|jj	|d�}|dur[Wnm|�
|�\}	}
|
|	d��Wn(tyvdid	�YStj
jjjy�|jr��YdSty�YdSw|�d
|�r�|��d�s�t�fdd
�|jD��r�t�d��|j���|r�|t��}qt�d���St�d|�dS)NFTrr�z)Trying to get event with async subscriberz&get_event needs synchronous subscriber�r�r#�salt/event/exit�r#r�r#r�c3r~rrr��r�rrr�Os
��
�z'SaltEvent._get_event.<locals>.<genexpr>z'get_event() caching unwanted event = %szget_event() received = %sz3_get_event() waited %s seconds and received nothing)rz�timerIrZrDr(r�rrK�readr��KeyboardInterruptrrErFr�r�r�RuntimeErrorr�r�rWr�rXr.)r[�waitr#r|�no_block�startZ
timeout_atZrun_oncer�r�r�rr�r�
_get_event&s^
��zSaltEvent._get_event�r!c
	Cs�t�d|�|jsJ�|�|�}|�||�}|duritjj�|j	��?|rR|j
}	d|_
	z
|�||||�}Wntjj
jjyM|��|j|d�Yq+w|	|_
n|�||||�}Wd�n1sdwY|duso|rq|S|dS)ad
        Get a single publication.
        If no publication is available, then block for up to ``wait`` seconds.
        Return publication if it is available or ``None`` if no publication is
        available.

        If wait is 0, then block forever.

        tag
            Only return events matching the given tag. If not specified, or set
            to an empty string, all events are returned. It is recommended to
            always be selective on what is to be returned in the event that
            multiple requests are being multiplexed.

        match_type
            Set the function to match the search tag with event tags.
             - 'startswith' : search for event tags that start with tag
             - 'endswith' : search for event tags that end with tag
             - 'find' : search for event tags that contain tag
             - 'regex' : regex search '^' + tag event tags
             - 'fnmatch' : fnmatch tag event tags matching
            Default is opts['event_match_type'] or 'startswith'

            .. versionadded:: 2015.8.0

        no_block
            Define if getting the event should be a blocking call or not.
            Defaults to False to keep backwards compatibility.

            .. versionadded:: 2015.8.0

        Notes:

        Searches cached publications first. If no cached publications are found
        that match the given tag specification, new publications are received
        and checked.

        If a publication is received that does not match the tag specification,
        it is DISCARDED unless it is subscribed to via subscribe() which will
        cause it to be cached.

        If a caller is not going to call get_event immediately after sending a
        request, it MUST subscribe the result to ensure the response is not lost
        should other regions of code call get_event for other purposes.
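
The five match types listed above can be illustrated with a small standalone sketch. The `MATCHERS` table and `tag_matches` helper are hypothetical names for illustration. The real class caches compiled regular expressions, which this sketch does not.

```python
import fnmatch
import re

# Hypothetical lookup table mirroring the documented match_type values.
MATCHERS = {
    "startswith": lambda event_tag, search_tag: event_tag.startswith(search_tag),
    "endswith": lambda event_tag, search_tag: event_tag.endswith(search_tag),
    "find": lambda event_tag, search_tag: event_tag.find(search_tag) >= 0,
    # 'regex' anchors the pattern at the start by prepending '^'.
    "regex": lambda event_tag, search_tag: re.search("^" + search_tag, event_tag)
    is not None,
    "fnmatch": lambda event_tag, search_tag: fnmatch.fnmatch(event_tag, search_tag),
}


def tag_matches(event_tag, search_tag, match_type="startswith"):
    """Return True if event_tag satisfies search_tag under match_type."""
    return MATCHERS[match_type](event_tag, search_tag)


example_tag = "salt/job/123/ret/web1"
prefix_hit = tag_matches(example_tag, "salt/job")
glob_hit = tag_matches(example_tag, "salt/job/*/ret/*", "fnmatch")
```

An empty search tag matches every event under `startswith`, which is consistent with the note that an unspecified tag returns all events.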
        zGet event. tag: %sNTr�r�)r(r�rDrzr�rrr�r�rrr�rErFr�r�r�rZ)
r[r�r#�fullr{r��auto_reconnectr|r�rrrrr^s26

���zSaltEvent.get_eventcCsN|jsJ�|js|��sdS|jjdd�}|durdS|�|�\}}||d�S)zJ
        Get the raw event without blocking or any other niceties
        Nrr�r��rDrIrZrK�_readr��r[r�r�r�rrr�get_event_noblock�s

zSaltEvent.get_event_noblockcCsN|jsJ�|js|��sdS|jjdd�}|durdS|�|�\}}||d�S)z�
        Get the raw event in a blocking fashion. This is slower, but it decreases the
        possibility of dropped events.
        Nr�r�r�r�rrr�get_event_block�s

zSaltEvent.get_event_blockccs(�	|j||||d�}|durq|Vq)zJ
        Creates a generator that continuously listens for events
        T)r#r�r{r�N)r)r[r#r�r{r�r�rrr�iter_events�s���zSaltEvent.iter_events��ccs"�|j�dd�r|jd|d<t|�std��t|t�s$td|�d���|js<|dur2t|�d	}nd}|j|d
�s<dSt	j	�
���|d<t}t
jj|dd
�}t
jjj||jdddd�}t�d||�d�t
jj�|�t
jj�|�|g�}	t
jj�|	d�}
|j�|
�V}|dur�||�dSdS)��
        Send a single event into the publisher with payload dict "data" and
        event identifier "tag"

        The default is 1000 ms
        r�Frhr��
Empty tag.�Dict object expected, not '�'.Nr�r��_stampT��use_bin_type�max_event_size�Zis_msgpackedr��"Sending event: tag = %s; data = %s�r�)rr3r:r��
isinstancerrJ�floatr��datetime�utcnow�	isoformatr�rr��dumpsr�dicttrim�	trim_dictr(rvr<r7rtrL�send)r[r�r#�cbr��	timeout_s�tagend�	dump_data�serialized_data�event�msgr�rrr�fire_event_async�sD�
����zSaltEvent.fire_event_asynccCs�|j�dd�r|jd|d<t|�std��t|t�s#td|�d���|js;|dur1t|�d	}nd}|j|d
�s;dSt	j	�
���|d<t}t
jj|dd
�}t
jjj||jdddd�}t�d||�d�t
jj�|�t
jj�|�|g�}t
jj�|d�}	|jr�t
jj�|j��(z|j�|	�Wnty�}
z
tjd|
tj d��d}
~
wwWd�dS1s�wYdS|j�!|jj|	�dS)r�r�Frhr�r�r�r�Nr�r�r�Tr�r�r�r�r�r�z(Publisher send failed with exception: %sr�)"rr3r:r�r�rrJr�r�r�r�r�r�rr�r�rr�r�r(rvr<r7rtrDr�r�rrLr�r'r�r�Zspawn_callback)r[r�r#r�r�r�r�r�r�r�r,rrrr&sb
�������
���zSaltEvent.fire_eventcCs||ddd�}|�|d|�S)z�'
        Send a single event to the master, with the payload "data" and the
        event identifier "tag".

        Default timeout is 1000ms
        N)r#r��eventsZpretag�fire_master)r&)r[r�r#r�r�rrrr�OszSaltEvent.fire_mastercCsF|jdur	|��|jdur|��|jr|js!|j��dSdSdS�N)rKr�rLr�rDrrr�r�rrr�destroyYs

�zSaltEvent.destroyrc

Cs�t|dt�rE|d|}t|dt�r2|�d�}t|t�r)t|�|kr)||}ni}|d|}n!|�di�}|�|i�}|d|}n|d}|�di�}|d}zi|��D]a\}}||d<|�d�}|�d�dur�|�||d�d|d	���|d
|d
<|d|d<d|d<d
|d�d|d	��|d<||d<d|vr�|d|d<|�|t|d
d|dd|gd��qXWdSty�}	zt	j
d|	tjd�WYd}	~	dSd}	~	ww)z3
        Helper function for fire_ret_load
        �fun�retcode�returnz_|-�resultFr�.���r*rh�successzError: �user�subr�rz)Event iteration failed with exception: %sr�N)
r��listr3�len�items�splitr&r%r'r(r�r�r�)
r[�load�	fun_indexr�r�r�r#r��tagsr,rrr�_fire_ret_load_specific_funasR


������z%SaltEvent._fire_ret_load_specific_funcCs�|�d�rk|�d�rmt|dt�r^t|dt�rd}nd}tdt|d��D]5}|d|}|rIt|d�|krH|d|rH|tvrH|�||�q&|d�|d�r[|tvr[|�||�q&dS|dtvro|�|�dSdSdSdS)zE
        Fire events based on information in the return load
        r�r�TFrN)r3r�r�ranger�	SUB_EVENTr)r[rZmultifunc_orderedr	r�rrr�
fire_ret_load�s,
�����zSaltEvent.fire_ret_loadcCs$|jrJ�|js|��|j�|�S)zO
        Invoke the event_handler callback each time an event arrives.
        )rDrIrZrKZ
read_async)r[Z
event_handlerrrr�set_event_handler�s
zSaltEvent.set_event_handlercCs$z|��WdStyYdSwr�)r�r'r�rrr�__del__�s
�zSaltEvent.__del__cC�|Sr�rr�rrr�	__enter__��zSaltEvent.__enter__cG�|��dSr��r��r[r"rrr�__exit__��zSaltEvent.__exit__)NNTNFF)NNr�)r�)NF)r�r!FNFF)r!FNF)Nr�)r�)r),rx�
__module__�__qualname__�__doc__r\�classmethodrYrTr}r�rZr�r�r�r�rzr��staticmethodr�r�r�r�r�r�r�rr�r�r�rrErF�gen�	coroutiner�r&r�r�rrrrrrrrrrr�sj	
�;



*
&


!



:
�R


4
?

3 
rcs,eZdZdZ					d�fdd�	Z�ZS)r��
    Warning! Use the get_event function or the code will not be
    RAET compatible
    Create a master event management object
    NTFc	st�jd||||||d�dS)Nrr)�superr\)r[rrrrrr�rwrrr\�s	
�zMasterEvent.__init__)NTNFF�rxrrrr\�
__classcell__rrr"rr�s	�rc@seZdZdZdS)�LocalClientEventz�
    Warning! Use the get_event function or the code will not be
    RAET compatible
    This class is just used to differentiate who is handling the events,
    especially in logs, but it is otherwise the same as MasterEvent.
    N)rxrrrrrrrr%�sr%c@s:eZdZdZd
dd�Zdd�Zdd�Zd	d
�Zdd�ZdS)�NamespacedEventzG
    A wrapper for sending events within a specific base namespace
    NcCs||_||_||_dSr�)r�r>�
print_func)r[r�r>r'rrrr\�s
zNamespacedEvent.__init__cCs6|j�|t||jd��|jdur|�||�dSdS)N)r>)r�r&r%r>r')r[r�r#rrrr&�s
�zNamespacedEvent.fire_eventcCs|j��dSr�)r�r�r�rrrr�szNamespacedEvent.destroycCrr�rr�rrrrrzNamespacedEvent.__enter__cGrr�rrrrrr
rzNamespacedEvent.__exit__r�)	rxrrrr\r&r�rrrrrrr&�s
r&cs"eZdZdZd�fdd�	Z�ZS)�MinionEventr TNFcs"t�jd|�d�||||d�dS)Nr	r)rrrrr)r!r\r3)r[rrrrr"rrr\s
�zMinionEvent.__init__�TNFr#rrr"rr(sr(c@s*eZdZdZd	dd�Zdd�Zdd�ZdS)
�AsyncEventPublisherz�
    An event publisher class intended to run in an ioloop (within a single process)

    TODO: remove references to "minion_event" whenever we need to use this for other things
    Nc
Csltjj��|_|jd}|j�|�|ptjjjj	�
�|_d|_d|_
d|_tt|jd�}|tjj�|jd����dd�}tj�|jdd|�d��}tj�|�rYt�|�tj�|jdd|�d��}tj�|�rrt�|�|jd	d
kr�t|jd�}t|jd�}	n|}|}	t�d
|jj|�t�d|jj|	�|jd}
tj�|
�s�zt� |
d�Wn?t!y�}z3t�"d|�|
|krĂtj�|�s�zt� |d�Wnt!y�}zt�"d|��d}~wwWYd}~nd}~wwtj#j$j%|j||jd�|_
tj#j$j&|	|j|j'd�|_t�(d|	�tjj)�*d��|j
�+�|j�+�Wd�dS1�s/wYdS)NrFrgrhrirjrkrlrBrCrerfrmrni�zCould not create SOCK_DIR: %sr��rZpayload_handlerzStarting pull socket on %s�),rrMrPrOrrQrErFrGrH�currentr�_closing�	publisher�pullerrrrsrr7rtrurprqr<�exists�unlinkror(rvrwrx�isdir�makedirsr�r�rr��IPCMessagePublisher�IPCMessageServer�handle_publishr��files�	set_umaskr�)r[rrZdefault_minion_sock_dirrgryZepub_sock_pathZepull_sock_path�epub_uri�	epull_uriZminion_sock_dirr,rrrr\'sv
�
��
�

����
�
�
$�zAsyncEventPublisher.__init__cC�6z	|j�|�|WStytjddd�YdSw)�a
        Get something from epull, publish it out epub, and return the package (or None)
        z,Unexpected error while polling minion eventsT��exc_infoN�r/Zpublishr'r(�critical�r[�packagerArrrr7o��z"AsyncEventPublisher.handle_publishcCs@|jrdSd|_|jdur|j��|jdur|j��dSdS�NT)r.r/r�r0r�rrrr�{s


�zAsyncEventPublisher.closer�)rxrrrr\r7r�rrrrr* s

Hr*csDeZdZdZ�fdd�Zdd�Zdd�Zdd	�Z�fd
d�Z�Z	S)�EventPublisherzk
    The interface that takes master events and republishes them out to anyone
    who wants to listen
    csHt�jdi|��tjj��|_|j�|�d|_d|_	d|_
d|_dS)NFr)r!r\rrMrNrOrrQr.rr0r/)r[rr�r"rrr\�s
zEventPublisher.__init__c
Cs�|jdrtjj��st�d|jd�t�|jd�tj	j
j��|_
tjj�|j
���|jddkrCt|jd�}t|jd�}ntj�|jdd�}tj�|jdd	�}tjjj|j||j
d
�|_tjjj||j
|jd�|_tjj�d��1|j��|j��|jddkr�|jd
s�|jdr�t�tj�|jdd�d�Wd�n1s�wYt�|j �t!�"t#��z|j
��W|� �n|� �wWd�n1s�wYWd�dSWd�dS1s�wYdS)z:
        Bind the pub and pull sockets for events
        Zevent_publisher_nicenessz%setting EventPublisher niceness to %irBrCrarbrrcrdr�r+r,Z
publisher_aclZ
external_authi�N)$rrrrRrSr(r�rp�nicerErFrGrHrr�r�rorqr<rr�r5r/r6r7r0r8r9r��chmod�atexit�registerr��
contextlib�suppressr�)r[r:r;rrrr�s^�
��
��

�������"�zEventPublisher.runcCr<)r=z,Unexpected error while polling master eventsTr>Nr@rBrrrr7�rDzEventPublisher.handle_publishcCsr|jrdSd|_t�|j�|jdur|j��d|_|jdur(|j��d|_|jdur7|j��d|_dSdSrE)r.rI�
unregisterr�r/r0rr�rrrr��s






�zEventPublisher.closecs|��t��||�dSr�)r�r!�_handle_signals�r[�signumZsigframer"rrrN�szEventPublisher._handle_signals)
rxrrrr\rr7r�rNr$rrr"rrF�s	5rFcsLeZdZdZ�fdd�Z�fdd�Zdd�Zdd	�Zd
d�Zdd
�Z	�Z
S)�EventReturnz�
    A dedicated process which listens to the master event bus and queues
    and forwards events to the specified returner.
    cslddl}t�jdi|��||_|jd|_|j�dd�|_|j��}d|d<|j�	|�|_g|_
d|_dS)	z[
        Initialize the EventReturn system

        Return an EventReturn instance
        rN�event_return_queue�event_return_queue_max_seconds�localZfile_clientFr)Zsalt.minionr!r\rrRr3rSrOr	ZMasterMinion�event_queue�stop)r[rr�rZlocal_minion_optsr"rrr\�s�

zEventReturn.__init__cs&|jr|��d|_t��||�dSrE)rU�flush_eventsrVr!rNrOr"rrrNszEventReturn._handle_signalscCs~t|jdt�r!|jdD]}t�d|�|�d�}|�|�q
nt�d|jd�d�|jd�}|�|�|jdd�=dS)N�event_returnz'Calling event returner %s, one of many.z
.event_returnz/Calling event returner %s, only one configured.z{}.event_return)r�rrr(rv�_flush_event_single�formatrU)r[�rrXrrrrW
s
��
zEventReturn.flush_eventsc
Cs�||jjvrAz|jj||j�WdSty@}z"t�d||�tjtjkr5t�	d|j�WYd}~dSWYd}~dSd}~wwt�d|�dS)Nz;Could not store events - returner '%s' raised exception: %sz'Event data that caused an exception: %sz>Could not store return for event(s) - returner '%s' not found.)
r	Z	returnersrUr'r(r��levelr�r�rv)r[rXr,rrrrYs(�����
�zEventReturn._flush_event_singlecCs�|jdrtjj��st�d|jd�t�|jd�t	d|jdd�|_
|j
jdd�}|j
�id�z�d}|D]f}|d	d
krCd|_
|�|�rN|j�|�d}|jdkr�tj��}|s^|}||j}|dkrmt�d
|�||jkrwd}d}nd}|r�t�d�t|j�|jks�|r�t�dt|j��|��d}|j
r�nq8W|jr�t�dt|j��|��dSdS|jr�t�dt|j��|��ww)z9
        Spin up the multiprocess event returner
        Zevent_return_nicenessz"setting EventReturn niceness to %irT)rr)r�zsalt/event_listen/startNr#r�Frz(Oldest event in queue is %s seconds old.z9Oldest event has been in queue too long, will flush queuezFlushing %s events.)rrrrRrSr(r�rprGrr�r�r&rV�_filterrUr.rSr��now�secondsrvrrRrW)r[r�Zoldesteventr�Ztoo_long_in_queueZrightnowZage_in_secondsrrrr4sd
�



�
����
�
�zEventReturn.runcCsj|d}|jdrd}nd}|jdD]}t�||�rd}nq|jdD]
}t�||�r2d}|Sq%|S)z�
        Take an event and run it through configured filters.

        Returns True if event should be stored, else False
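
One plausible reading of this filter, sketched under assumptions: the option names `event_return_whitelist` and `event_return_blacklist` appear in this dump, and the sketch assumes both hold glob patterns matched with `fnmatch`, with the blacklist taking precedence. `should_store` is a hypothetical helper, not the method itself.

```python
import fnmatch


def should_store(tag, whitelist=(), blacklist=()):
    """Decide whether an event tag passes the whitelist/blacklist globs."""
    # Assumption: with no whitelist configured, everything is kept by default.
    keep = not whitelist
    if any(fnmatch.fnmatch(tag, pattern) for pattern in whitelist):
        keep = True
    # Assumption: a blacklist match always wins over a whitelist match.
    if any(fnmatch.fnmatch(tag, pattern) for pattern in blacklist):
        keep = False
    return keep


kept = should_store("salt/job/1/ret/web1", whitelist=["salt/job/*"])
dropped = should_store("salt/auth", whitelist=["salt/job/*"])
```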
        r#Zevent_return_whitelistFTZevent_return_blacklist)rr�)r[r�r#r�Zwhitelist_matchZblacklist_matchrrrr]ys 
��zEventReturn._filter)rxrrrr\rNrWrYrr]r$rrr"rrQ�sErQc@s,eZdZdZd	dd�Zd	dd�Zdd�ZdS)
�	StateFirez�
    Evaluate the data from a state run and fire events on the master and minion
    for each returned chunk that is not "green"
    This object is made to only run on a minion
    NcCs(||_|stj�|j�|_dS||_dSr�)rrZcryptZSAuthr)r[rrrrrr\�s
zStateFire.__init__cCs�i}|r	|�|�|�|jd||d|j�d�d��tjjj�|j��4}z|�	|�Wnt
yG}ztjd|t
jd�WYd}~n
d}~wwWd�dSWd�dS1s[wYdS)	z�
        Fire an event off on the master server

        CLI Example:

        .. code-block:: bash

            salt '*' event.fire_master 'stuff to be in the event' 'tag'
        rh�
_minion_eventssalt)rhr#r��cmd�tok�(An exception occurred on fire_master: %sr�NT)rQrrZ	gen_tokenr�channel�client�
ReqChannel�factoryr�r'r(r�r�r�)r[r�r#Zpreloadrrer,rrrr��s<


��
����
�	�
�	�	zStateFire.fire_mastercs|jdgdd�}t��fdd�d�D]/}�|dr"�|ds"qd	�t�|d��|dr3d
nd�}|d�|�|d
��qtjjj�	|j��4}z|�
|�Wntyo}ztj
d|tjd�WYd}~n
d}~wwWd�dSWd�dS1s�wYdS)z�
        Pass in a state "running" dict, this is the return dict from a state
        call. The dict will be processed and fire events.

        By default yellows and reds fire events on the master and minion, but
        this can be configured.
        rhra)rhr�rbcs�|�dd�S)NZ__run_num__r)r3)�k��runningrr�<lambda>�sz(StateFire.fire_running.<locals>.<lambda>)rr�Zchangeszstate_{}_{}�True�Falser�r�rdr�NT)r�sortedrZr:r.rrerfrgrhr�r'r(r�r�r�)r[rkrZstagr#rer,rrjr�fire_running�s8�����
�	�
�	�	zStateFire.fire_runningr�)rxrrrr\r�rprrrrr`�s


#r`)NNNTNFFr))r!)6rrIrKr�r�r�rsr�rpr��collections.abcrZsalt.channel.clientrZsalt.configZsalt.defaults.exitcodesZsalt.ext.tornado.ioloopZsalt.ext.tornado.iostreamZsalt.payloadZsalt.transport.ipcZsalt.utils.asynchronousZsalt.utils.cacheZsalt.utils.dicttrimZsalt.utils.filesZsalt.utils.platformZsalt.utils.processZsalt.utils.stringutilsZsalt.utils.zeromqZsalt.exceptionsr�	getLoggerrxr(r
r�r;ZSALTr2rr r-r%rrr%r&r(r*r�processZSignalHandlingProcessrFrQr`rrrr�<module>s�3
�
�
/
		ed'