HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.109.1.lve.el8.x86_64 #1 SMP Thu Mar 5 20:23:46 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //proc/thread-self/root/opt/saltstack/salt/lib/python3.10/site-packages/salt/transport/tcp.py
"""
TCP transport classes

Wire protocol: "len(payload) msgpack({'head': SOMEHEADER, 'body': SOMEBODY})"


"""

import errno
import logging
import multiprocessing
import os
import queue
import socket
import threading
import urllib
import uuid

import salt.ext.tornado
import salt.ext.tornado.concurrent
import salt.ext.tornado.gen
import salt.ext.tornado.iostream
import salt.ext.tornado.netutil
import salt.ext.tornado.tcpclient
import salt.ext.tornado.tcpserver
import salt.master
import salt.payload
import salt.transport.frame
import salt.transport.ipc
import salt.utils.asynchronous
import salt.utils.files
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.versions
from salt.exceptions import SaltClientError, SaltReqTimeoutError
from salt.utils.network import ip_bracket
from salt.utils.process import SignalHandlingProcess

# The load balancer process is only used on Windows; everywhere else the
# listening socket is created directly by the request server.
USE_LOAD_BALANCER = bool(salt.utils.platform.is_windows())

# ``salt.ext.tornado.util`` is only needed by the load balancer accept loop.
if USE_LOAD_BALANCER:
    import salt.ext.tornado.util

log = logging.getLogger(__name__)


class ClosingError(Exception):
    """
    Raised by ``MessageClient.send`` when a message is submitted while the
    client is in the process of closing.
    """


def _get_socket(opts):
    """
    Create an unconnected TCP stream socket.

    The address family is IPv6 when ``opts["ipv6"]`` is truthy and IPv4
    otherwise (including when the key is absent).
    """
    use_ipv6 = opts.get("ipv6", False)
    address_family = socket.AF_INET6 if use_ipv6 else socket.AF_INET
    return socket.socket(address_family, socket.SOCK_STREAM)


def _get_bind_addr(opts, port_type):
    """
    Build the ``(host, port)`` tuple used to bind a listening socket.

    The host comes from ``opts["interface"]`` passed through
    ``ip_bracket(..., strip=True)``; the port is ``opts[port_type]``
    converted to ``int``.
    """
    host = ip_bracket(opts["interface"], strip=True)
    port = int(opts[port_type])
    return (host, port)


def _set_tcp_keepalive(sock, opts):
    """
    Apply the TCP keepalive configuration from ``opts`` to ``sock``.

    When ``opts["tcp_keepalive"]`` is falsy, keepalive is switched off.
    Otherwise it is switched on and any of ``tcp_keepalive_idle``,
    ``tcp_keepalive_cnt`` and ``tcp_keepalive_intvl`` that are greater than
    zero are applied through whichever mechanism the platform's ``socket``
    module exposes. Nothing is done if ``SO_KEEPALIVE`` is unavailable.
    """
    if not hasattr(socket, "SO_KEEPALIVE"):
        return

    if not opts.get("tcp_keepalive", False):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)
        return

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    if hasattr(socket, "SOL_TCP"):
        # (socket constant name, opts key) pairs, applied in this order.
        posix_options = (
            ("TCP_KEEPIDLE", "tcp_keepalive_idle"),
            ("TCP_KEEPCNT", "tcp_keepalive_cnt"),
            ("TCP_KEEPINTVL", "tcp_keepalive_intvl"),
        )
        for const_name, opt_key in posix_options:
            if not hasattr(socket, const_name):
                continue
            configured = opts.get(opt_key, -1)
            if configured > 0:
                sock.setsockopt(
                    socket.SOL_TCP, getattr(socket, const_name), int(configured)
                )

    if hasattr(socket, "SIO_KEEPALIVE_VALS"):
        # Windows has no TCP_KEEPIDLE, TCP_KEEPCNT or TCP_KEEPINTVL; it uses
        # its own SIO_KEEPALIVE_VALS ioctl and offers nothing equivalent to
        # TCP_KEEPCNT.
        idle_secs = opts.get("tcp_keepalive_idle", -1)
        interval_secs = opts.get("tcp_keepalive_intvl", -1)
        if idle_secs > 0 or interval_secs > 0:
            # SIO_KEEPALIVE_VALS always needs both values and both must be
            # valid, so an unset one falls back to the Windows default
            # ('KeepAliveTime' / 'KeepAliveInterval', see
            # https://technet.microsoft.com/en-us/library/bb726981.aspx#EDAA).
            if idle_secs <= 0:
                idle_secs = 7200
            if interval_secs <= 0:
                interval_secs = 1
            # The ioctl expects milliseconds.
            keepalive_vals = (
                1,
                int(idle_secs * 1000),
                int(interval_secs * 1000),
            )
            sock.ioctl(socket.SIO_KEEPALIVE_VALS, keepalive_vals)


class LoadBalancerServer(SignalHandlingProcess):
    """
    Raw TCP server which runs in its own process and will listen
    for incoming connections. Each incoming connection will be
    sent via multiprocessing queue to the workers.
    Since the queue is shared amongst workers, only one worker will
    handle a given connection.
    """

    # TODO: opts!
    # Based on default used in salt.ext.tornado.netutil.bind_sockets()
    backlog = 128

    def __init__(self, opts, socket_queue, **kwargs):
        """
        :param dict opts: Salt config options; read for the socket family,
            keepalive settings and the ``interface``/``ret_port`` bind address.
        :param socket_queue: Queue that accepted ``(connection, address)``
            tuples are put on.
        :param kwargs: Passed through to ``SignalHandlingProcess``.
        """
        super().__init__(**kwargs)
        self.opts = opts
        self.socket_queue = socket_queue
        self._socket = None

    def close(self):
        """
        Shut down and close the listening socket, if one was created.

        A socket that is not connected (``ENOTCONN``) is still closed, the
        same way ``TCPReqServer.close`` tolerates that condition. Any other
        ``OSError`` from ``shutdown()`` is re-raised after the socket has
        been closed.
        """
        # getattr: __del__ may run on an instance whose __init__ did not get
        # as far as assigning _socket.
        sock = getattr(self, "_socket", None)
        if sock is None:
            return
        self._socket = None
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError as exc:
            if exc.errno != errno.ENOTCONN:
                raise
        finally:
            # Always release the file descriptor, even if shutdown failed.
            sock.close()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def run(self):
        """
        Start the load balancer

        Binds a blocking listening socket to the ``ret_port`` address and
        loops forever, putting every accepted connection on the socket queue.
        """
        self._socket = _get_socket(self.opts)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _set_tcp_keepalive(self._socket, self.opts)
        self._socket.setblocking(1)
        self._socket.bind(_get_bind_addr(self.opts, "ret_port"))
        self._socket.listen(self.backlog)

        while True:
            try:
                # Wait for a connection to occur since the socket is
                # blocking.
                connection, address = self._socket.accept()
                # Wait for a free slot to be available to put
                # the connection into.
                # Sockets are picklable on Windows in Python 3.
                self.socket_queue.put((connection, address), True, None)
            except OSError as e:
                # ECONNABORTED indicates that there was a connection
                # but it was closed while still in the accept queue.
                # (observed on FreeBSD).
                if salt.ext.tornado.util.errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise


class Resolver:
    """
    Configures tornado's resolver to the threaded implementation the first
    time an instance is created.
    """

    # Class-wide flag; flipped to True once the resolver has been configured.
    _resolver_configured = False

    @classmethod
    def _config_resolver(cls, num_threads=10):
        """
        Select ``ThreadedResolver`` with ``num_threads`` threads and record
        on the class that configuration has been done.
        """
        resolver_impl = "salt.ext.tornado.netutil.ThreadedResolver"
        salt.ext.tornado.netutil.Resolver.configure(
            resolver_impl, num_threads=num_threads
        )
        cls._resolver_configured = True

    def __init__(self, *args, **kwargs):
        """
        Positional and keyword arguments are accepted and ignored.
        """
        if self._resolver_configured:
            return
        # TODO: add opt to specify number of resolver threads
        self._config_resolver()


class TCPPubClient(salt.transport.base.PublishClient):
    """
    Tornado based TCP Pub Client
    """

    ttype = "tcp"

    def __init__(self, opts, io_loop, **kwargs):  # pylint: disable=W0231
        """
        :param dict opts: Salt config options.
        :param io_loop: IOLoop handed to the underlying ``MessageClient``.
        """
        super().__init__(opts, io_loop, **kwargs)
        self.opts = opts
        self.io_loop = io_loop
        self._closing = False
        self.connected = False
        self.message_client = None
        self.resolver = Resolver()

    def close(self):
        """
        Close the underlying message client. Repeated calls are no-ops.
        """
        if self._closing:
            return
        self._closing = True
        client = self.message_client
        if client is None:
            return
        client.close()
        self.message_client = None

    @salt.ext.tornado.gen.coroutine
    def connect(self, publish_port, connect_callback=None, disconnect_callback=None):
        """
        Create a ``MessageClient`` for ``opts["master_ip"]`` on
        ``publish_port`` and wait until it is connected.
        """
        self._connect_called = True
        self.publish_port = publish_port
        client = MessageClient(
            self.opts,
            self.opts["master_ip"],
            int(self.publish_port),
            io_loop=self.io_loop,
            connect_callback=connect_callback,
            disconnect_callback=disconnect_callback,
            source_ip=self.opts.get("source_ip"),
            source_port=self.opts.get("source_publish_port"),
        )
        self.message_client = client
        # Block this coroutine until the client has a stream.
        yield client.connect()
        self.connected = True

    @salt.ext.tornado.gen.coroutine
    def _decode_messages(self, messages):
        """
        Return ``messages`` unchanged when it is already a dict; otherwise
        msgpack-load it and decode its embedded strings.
        """
        if isinstance(messages, dict):
            decoded = messages
        else:
            # TODO: For some reason we need to decode here for things
            #       to work. Fix this.
            unpacked = salt.utils.msgpack.loads(messages)
            decoded = salt.transport.frame.decode_embedded_strs(unpacked)
        raise salt.ext.tornado.gen.Return(decoded)

    @salt.ext.tornado.gen.coroutine
    def send(self, msg):
        """
        Write ``msg`` directly to the message client's stream.
        """
        yield self.message_client._stream.write(msg)

    def on_recv(self, callback):
        """
        Register an on_recv callback
        """
        return self.message_client.on_recv(callback)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class TCPReqServer(salt.transport.base.DaemonizedRequestServer):
    """
    Tornado based TCP Request/Reply Server

    :param dict opts: Salt master config options.
    """

    # TODO: opts!
    backlog = 5

    def __init__(self, opts):  # pylint: disable=W0231
        self.opts = opts
        self.req_server = None
        self._socket = None

    @property
    def socket(self):
        """The listening socket, or ``None`` when there is none."""
        return self._socket

    def close(self):
        """
        Shut down the listening socket and close the request server.
        """
        listener = self._socket
        if listener is not None:
            try:
                listener.shutdown(socket.SHUT_RDWR)
            except OSError as exc:
                # Shutting down a socket which is already disconnected is
                # harmless; anything else is propagated.
                if exc.errno != errno.ENOTCONN:
                    raise
            if self.req_server is None:
                # Only close the socket ourselves when no req_server owns it.
                # Otherwise req_server.stop() makes tornado raise an
                # AssertionError: it matches socket.fileno() (-1 after close)
                # against the fd in its _sockets cache to remove the socket
                # from the IOLoop handlers.
                listener.close()
            self._socket = None

        server = self.req_server
        if server is None:
            return
        try:
            server.close()
        except OSError as exc:
            if exc.errno != 9:
                raise
            log.exception(
                "TCPReqServerChannel close generated an exception: %s", str(exc)
            )
        self.req_server = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def pre_fork(self, process_manager):
        """
        Prepare the listener before worker processes are forked.

        With the load balancer enabled, create the shared socket queue and
        register a ``LoadBalancerServer`` process with ``process_manager``.
        Otherwise, on non-Windows platforms, create and bind the listening
        socket here.
        """
        if USE_LOAD_BALANCER:
            self.socket_queue = multiprocessing.Queue()
            process_manager.add_process(
                LoadBalancerServer,
                args=(self.opts, self.socket_queue),
                name="LoadBalancerServer",
            )
            return
        if salt.utils.platform.is_windows():
            return
        self._socket = _get_socket(self.opts)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _set_tcp_keepalive(self._socket, self.opts)
        self._socket.setblocking(0)
        self._socket.bind(_get_bind_addr(self.opts, "ret_port"))

    def post_fork(self, message_handler, io_loop):
        """
        Create the per-process request server after forking.

        message_handler: function to call with your payloads
        io_loop: IOLoop made current while the server is constructed
        """
        self.message_handler = message_handler

        with salt.utils.asynchronous.current_ioloop(io_loop):
            if not USE_LOAD_BALANCER:
                if salt.utils.platform.is_windows():
                    # pre_fork() does not create the socket on Windows.
                    self._socket = _get_socket(self.opts)
                    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    _set_tcp_keepalive(self._socket, self.opts)
                    self._socket.setblocking(0)
                    self._socket.bind(_get_bind_addr(self.opts, "ret_port"))
                server = SaltMessageServer(
                    self.handle_message,
                    ssl_options=self.opts.get("ssl"),
                    io_loop=io_loop,
                )
                self.req_server = server
                server.add_socket(self._socket)
                self._socket.listen(self.backlog)
            else:
                self.req_server = LoadBalancerWorker(
                    self.socket_queue,
                    self.handle_message,
                    ssl_options=self.opts.get("ssl"),
                )

    @salt.ext.tornado.gen.coroutine
    def handle_message(self, stream, payload, header=None):
        """
        Run ``payload`` through the message handler and write the framed
        reply, carrying the request ``header``, back to ``stream``.
        """
        decoded = self.decode_payload(payload)
        reply = yield self.message_handler(decoded)
        framed_reply = salt.transport.frame.frame_msg(reply, header=header)
        stream.write(framed_reply)

    def decode_payload(self, payload):
        """Return ``payload`` unchanged."""
        return payload


class SaltMessageServer(salt.ext.tornado.tcpserver.TCPServer):
    """
    Raw TCP server which will receive all of the TCP streams and re-assemble
    messages that are sent through to us
    """

    def __init__(self, message_handler, *args, **kwargs):
        """
        :param message_handler: Callable spawned on the IOLoop as
            ``message_handler(stream, body, header)`` for every framed
            message received.
        :param io_loop: Optional keyword; defaults to the current IOLoop.
            Remaining arguments are passed to ``TCPServer``.
        """
        io_loop = (
            kwargs.pop("io_loop", None) or salt.ext.tornado.ioloop.IOLoop.current()
        )
        self._closing = False
        super().__init__(*args, **kwargs)
        self.io_loop = io_loop
        # List of (stream, address) tuples for the connected clients.
        self.clients = []
        self.message_handler = message_handler

    @salt.ext.tornado.gen.coroutine
    def handle_stream(  # pylint: disable=arguments-differ
        self,
        stream,
        address,
        _StreamClosedError=salt.ext.tornado.iostream.StreamClosedError,
    ):
        """
        Handle incoming streams and add messages to the incoming queue
        """
        log.trace("Req client %s connected", address)
        self.clients.append((stream, address))
        unpacker = salt.utils.msgpack.Unpacker()
        try:
            while True:
                wire_bytes = yield stream.read_bytes(4096, partial=True)
                unpacker.feed(wire_bytes)
                # One read may complete zero, one or several framed messages.
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    header = framed_msg["head"]
                    self.io_loop.spawn_callback(
                        self.message_handler, stream, framed_msg["body"], header
                    )
        except _StreamClosedError:
            log.trace("req client disconnected %s", address)
            self.remove_client((stream, address))
        except Exception as e:  # pylint: disable=broad-except
            log.trace("other master-side exception: %s", e, exc_info=True)
            self.remove_client((stream, address))
            stream.close()

    def remove_client(self, client):
        """
        Drop a ``(stream, address)`` tuple from the client list; a client
        that is not in the list is only logged.
        """
        try:
            self.clients.remove(client)
        except ValueError:
            log.trace("Message server client was not in list to remove")

    def close(self):
        """
        Close the server

        Closes every connected client stream and stops the server. Repeated
        calls are no-ops.
        """
        if self._closing:
            return
        self._closing = True
        # Iterate over a snapshot: remove_client() mutates self.clients, and
        # removing from a list while iterating it skips every other entry,
        # which left half of the client streams open.
        for item in list(self.clients):
            client, address = item
            client.close()
            self.remove_client(item)
        try:
            self.stop()
        except OSError as exc:
            # A bad file descriptor here means the socket is already closed.
            if exc.errno != errno.EBADF:
                raise


class LoadBalancerWorker(SaltMessageServer):
    """
    Message server fed by 'LoadBalancerServer': accepted TCP connections
    arrive through a multiprocessing queue instead of a listening socket.
    The queue is shared amongst workers, so a given connection is handled
    by exactly one of them.
    """

    def __init__(self, socket_queue, message_handler, *args, **kwargs):
        """
        Start the background thread that drains ``socket_queue``.
        """
        super().__init__(message_handler, *args, **kwargs)
        self.socket_queue = socket_queue
        self._stop = threading.Event()
        self.thread = threading.Thread(target=self.socket_queue_thread)
        self.thread.start()

    def close(self):
        """
        Signal the queue thread to stop, wait for it, then close the server.
        """
        self._stop.set()
        self.thread.join()
        super().close()

    def socket_queue_thread(self):
        """
        Pull ``(socket, address)`` pairs off the queue and hand each one to
        the IOLoop. The stop event is checked whenever a one second wait on
        the queue comes back empty.
        """
        try:
            while True:
                try:
                    queued = self.socket_queue.get(True, 1)
                except queue.Empty:
                    if self._stop.is_set():
                        return
                    continue
                client_socket, address = queued
                # Both 'self.io_loop' and 'self._handle_connection' come from
                # the 'salt.ext.tornado.tcpserver.TCPServer' super class.
                self.io_loop.spawn_callback(
                    self._handle_connection, client_socket, address
                )
        except (KeyboardInterrupt, SystemExit):
            pass


class TCPClientKeepAlive(salt.ext.tornado.tcpclient.TCPClient):
    """
    TCPClient whose _create_stream() is overridden so that every outgoing
    socket gets the configured keep alive settings.
    """

    def __init__(self, opts, resolver=None):
        self.opts = opts
        super().__init__(resolver=resolver)

    def _create_stream(
        self, max_buffer_size, af, addr, **kwargs
    ):  # pylint: disable=unused-argument,arguments-differ
        """
        Override _create_stream() in TCPClient.

        Tornado 4.5 added the kwargs 'source_ip' and 'source_port'.
        **kwargs swallows these and any future kwargs to stay compatible.

        Returns the connect future on tornado < 5, and a
        ``(stream, connect future)`` tuple otherwise.
        """
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        keepalive_sock = _get_socket(self.opts)
        _set_tcp_keepalive(keepalive_sock, self.opts)
        stream = salt.ext.tornado.iostream.IOStream(
            keepalive_sock, max_buffer_size=max_buffer_size
        )
        is_pre_v5 = salt.ext.tornado.version_info < (5,)
        connect_future = stream.connect(addr)
        if is_pre_v5:
            return connect_future
        return stream, connect_future


# TODO consolidate with IPCClient
# TODO: limit in-flight messages.
# TODO: singleton? Something to not re-create the tcp connection so much
class MessageClient:
    """
    Low-level message sending client

    Frames outgoing messages with a unique message id, keeps a map of
    in-flight futures keyed by that id, and resolves them from a background
    read loop on the TCP stream. Messages arriving with an unknown id are
    passed to the callback registered through ``on_recv``.
    """

    def __init__(
        self,
        opts,
        host,
        port,
        io_loop=None,
        resolver=None,
        connect_callback=None,
        disconnect_callback=None,
        source_ip=None,
        source_port=None,
    ):
        """
        :param dict opts: Salt config options.
        :param host: Host to connect to.
        :param port: Port to connect to.
        :param io_loop: IOLoop to run on; defaults to the current one.
        :param resolver: Resolver passed to the TCP client.
        :param connect_callback: Called with ``True`` once a stream is
            connected.
        :param disconnect_callback: Called without arguments when the stream
            is lost and a reconnect is about to be attempted.
        :param source_ip: Optional source address for the connection.
        :param source_port: Optional source port for the connection.
        """
        self.opts = opts
        self.host = host
        self.port = port
        self.source_ip = source_ip
        self.source_port = source_port
        self.connect_callback = connect_callback
        self.disconnect_callback = disconnect_callback
        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
        # TODO: max queue size
        self.send_future_map = {}  # mapping of request_id -> Future
        # mapping of request_id -> IOLoop timeout handle. It was previously
        # never created, so remove_message_timeout() raised AttributeError.
        self.send_timeout_map = {}

        self._read_until_future = None
        self._on_recv = None
        self._closing = False
        self._closed = False
        self._connecting_future = salt.ext.tornado.concurrent.Future()
        self._stream_return_running = False
        self._stream = None

        self.backoff = opts.get("tcp_reconnect_backoff", 1)

    # TODO: timeout inflight sessions
    def close(self):
        """
        Begin closing the client. The TCP client itself is closed by
        ``check_close`` once no requests are in flight.
        """
        if self._closing:
            return
        self._closing = True
        # NOTE(review): add_timeout() treats a bare number as an absolute
        # deadline on the IOLoop.time() scale, so ``1`` is already in the
        # past and check_close runs on the next loop iteration rather than
        # after one second - confirm which was intended before changing it.
        self.io_loop.add_timeout(1, self.check_close)

    @salt.ext.tornado.gen.coroutine
    def check_close(self):
        """
        Close the TCP client if no requests are in flight, otherwise
        schedule another check.
        """
        if not self.send_future_map:
            self._tcp_client.close()
            self._stream = None
            self._closing = False
            self._closed = True
        else:
            # See the NOTE(review) in close() about the deadline value.
            self.io_loop.add_timeout(1, self.check_close)

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    @salt.ext.tornado.gen.coroutine
    def getstream(self, **kwargs):
        """
        Connect to ``host:port``, retrying every ``backoff`` seconds until a
        stream is obtained or the client is closing/closed.

        Returns the stream, or ``None`` if the client was closed first.
        """
        if self.source_ip or self.source_port:
            # Replaces any kwargs passed in by the caller.
            kwargs = {
                "source_ip": self.source_ip,
                "source_port": self.source_port,
            }
        stream = None
        while stream is None and (not self._closed and not self._closing):
            try:
                stream = yield self._tcp_client.connect(
                    ip_bracket(self.host, strip=True),
                    self.port,
                    ssl_options=self.opts.get("ssl"),
                    **kwargs
                )
            except Exception as exc:  # pylint: disable=broad-except
                log.warning(
                    "TCP Message Client encountered an exception while connecting to"
                    " %s:%s: %r, will reconnect in %d seconds",
                    self.host,
                    self.port,
                    exc,
                    self.backoff,
                )
                yield salt.ext.tornado.gen.sleep(self.backoff)
        raise salt.ext.tornado.gen.Return(stream)

    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Ensure a stream exists; on a new connection start the read loop (if
        it is not already running) and fire ``connect_callback``.
        """
        if self._stream is None:
            self._stream = yield self.getstream()
            if self._stream:
                if not self._stream_return_running:
                    self.io_loop.spawn_callback(self._stream_return)
                if self.connect_callback:
                    self.connect_callback(True)

    @salt.ext.tornado.gen.coroutine
    def _stream_return(self):
        """
        Read loop: unpack framed messages from the stream and dispatch them
        to the matching in-flight future or to the ``on_recv`` callback.

        On a stream error every in-flight future is failed, the stream is
        dropped and a reconnect is attempted unless the client is closing.
        """
        self._stream_return_running = True
        unpacker = salt.utils.msgpack.Unpacker()
        while not self._closing:
            try:
                wire_bytes = yield self._stream.read_bytes(4096, partial=True)
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    header = framed_msg["head"]
                    body = framed_msg["body"]
                    message_id = header.get("mid")

                    if message_id in self.send_future_map:
                        self.send_future_map.pop(message_id).set_result(body)
                        # self.remove_message_timeout(message_id)
                    else:
                        if self._on_recv is not None:
                            self.io_loop.spawn_callback(self._on_recv, header, body)
                        else:
                            log.error(
                                "Got response for message_id %s that we are not"
                                " tracking",
                                message_id,
                            )
            except salt.ext.tornado.iostream.StreamClosedError as e:
                log.debug(
                    "tcp stream to %s:%s closed, unable to recv",
                    self.host,
                    self.port,
                )
                for future in self.send_future_map.values():
                    future.set_exception(e)
                self.send_future_map = {}
                if self._closing or self._closed:
                    return
                if self.disconnect_callback:
                    self.disconnect_callback()
                stream = self._stream
                self._stream = None
                if stream:
                    stream.close()
                # Start from a clean unpacker so a partial frame from the old
                # stream is not mixed with data from the new one.
                unpacker = salt.utils.msgpack.Unpacker()
                yield self.connect()
            except TypeError:
                # This is an invalid transport
                if "detect_mode" in self.opts:
                    log.info(
                        "There was an error trying to use TCP transport; "
                        "attempting to fallback to another transport"
                    )
                else:
                    raise SaltClientError
            except Exception as e:  # pylint: disable=broad-except
                log.error("Exception parsing response", exc_info=True)
                for future in self.send_future_map.values():
                    future.set_exception(e)
                self.send_future_map = {}
                if self._closing or self._closed:
                    return
                if self.disconnect_callback:
                    self.disconnect_callback()
                stream = self._stream
                self._stream = None
                if stream:
                    stream.close()
                unpacker = salt.utils.msgpack.Unpacker()
                yield self.connect()
        self._stream_return_running = False

    def _message_id(self):
        """Return a new random message id as a string."""
        return str(uuid.uuid4())

    # TODO: return a message object which takes care of multiplexing?
    def on_recv(self, callback):
        """
        Register a callback for received messages (that we didn't initiate)

        The callback is invoked with the message body only; passing ``None``
        unregisters the current callback.
        """
        if callback is None:
            self._on_recv = callback
        else:

            def wrap_recv(header, body):
                callback(body)

            self._on_recv = wrap_recv

    def remove_message_timeout(self, message_id):
        """
        Cancel and forget the IOLoop timeout recorded for ``message_id``, if
        there is one.
        """
        if message_id not in self.send_timeout_map:
            return
        timeout = self.send_timeout_map.pop(message_id)
        self.io_loop.remove_timeout(timeout)

    def timeout_message(self, message_id, msg):
        """
        Fail the in-flight future for ``message_id`` with
        ``SaltReqTimeoutError``; does nothing if it is no longer in flight.
        """
        if message_id not in self.send_future_map:
            return
        future = self.send_future_map.pop(message_id)
        if future is not None:
            future.set_exception(SaltReqTimeoutError("Message timed out"))

    @salt.ext.tornado.gen.coroutine
    def send(self, msg, timeout=None, callback=None, raw=False):
        """
        Send ``msg`` and return the body of the matching reply.

        :param msg: Message to frame and send.
        :param timeout: Seconds before the request fails with
            ``SaltReqTimeoutError``; forced to 1 when ``opts["detect_mode"]``
            is ``True``.
        :param callback: Optional callable scheduled on the IOLoop with the
            reply.
        :param raw: Accepted but not used by this method.
        :raises ClosingError: If the client is closing.
        """
        if self._closing:
            raise ClosingError()
        message_id = self._message_id()
        header = {"mid": message_id}

        future = salt.ext.tornado.concurrent.Future()

        if callback is not None:

            def handle_future(future):
                response = future.result()
                self.io_loop.add_callback(callback, response)

            future.add_done_callback(handle_future)
        # Add this future to the mapping
        self.send_future_map[message_id] = future

        if self.opts.get("detect_mode") is True:
            timeout = 1

        if timeout is not None:
            self.io_loop.call_later(timeout, self.timeout_message, message_id, msg)

        item = salt.transport.frame.frame_msg(msg, header=header)

        @salt.ext.tornado.gen.coroutine
        def _do_send():
            yield self.connect()
            # If the _stream is None, we failed to connect.
            if self._stream:
                yield self._stream.write(item)

        # Run send in a callback so we can wait on the future, in case we time
        # out before we are able to connect.
        self.io_loop.add_callback(_do_send)
        recv = yield future
        raise salt.ext.tornado.gen.Return(recv)


class Subscriber:
    """
    One connected client of the TCP publisher server: its stream, its
    address and the read future currently pending on that stream.
    """

    def __init__(self, stream, address):
        self.stream = stream
        self.address = address
        self.id_ = None
        self._read_until_future = None
        self._closing = False

    def close(self):
        """
        Close the stream if it is still open. Repeated calls are no-ops.
        """
        if self._closing:
            return
        self._closing = True
        if self.stream.closed():
            return
        self.stream.close()
        pending_read = self._read_until_future
        if pending_read is not None and pending_read.done():
            # Retrieve the exception so this message is not logged:
            # '[ERROR   ] Future exception was never retrieved:
            # StreamClosedError'
            # There is always a read waiting for the next message, and its
            # future is marked 'StreamClosedError' when the stream closes.
            pending_read.exception()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701


class PubServer(salt.ext.tornado.tcpserver.TCPServer):
    """
    TCP publisher

    Keeps a set of connected ``Subscriber`` objects, reads whatever they
    send (handing it to the presence callback) and writes published
    payloads to them.
    """

    def __init__(
        self, opts, io_loop=None, presence_callback=None, remove_presence_callback=None
    ):
        """
        :param dict opts: Salt config options; ``opts["ssl"]`` is passed to
            the TCP server as its SSL options.
        :param io_loop: IOLoop used to spawn the per-client read loops.
        :param presence_callback: Called as ``(subscriber, body)`` for every
            message read from a subscriber.
        :param remove_presence_callback: Called as ``(subscriber)`` when a
            subscriber is dropped.
        """
        super().__init__(ssl_options=opts.get("ssl"))
        self.io_loop = io_loop
        self.opts = opts
        self._closing = False
        self.clients = set()
        self.presence_events = False
        # Fall back to no-op callbacks so callers never need a None check.
        if presence_callback:
            self.presence_callback = presence_callback
        else:
            self.presence_callback = lambda subscriber, msg: msg
        if remove_presence_callback:
            self.remove_presence_callback = remove_presence_callback
        else:
            self.remove_presence_callback = lambda subscriber: subscriber

    def close(self):
        """
        Close every connected subscriber. Repeated calls are no-ops.
        """
        if self._closing:
            return
        self._closing = True
        # Tornado's IOStream has no ``disconnect()`` method, so the previous
        # ``client.stream.disconnect()`` raised AttributeError as soon as a
        # client was connected. Subscriber.close() closes the stream.
        # Iterate a snapshot because the read loops discard clients from the
        # set when their stream closes.
        for client in list(self.clients):
            client.close()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    @salt.ext.tornado.gen.coroutine
    def _stream_read(self, client):
        """
        Read loop for one subscriber: unpack framed messages and pass each
        body to the presence callback until the stream closes or the server
        is closing.
        """
        unpacker = salt.utils.msgpack.Unpacker()
        while not self._closing:
            try:
                # Keep the future on the client so Subscriber.close() can
                # retrieve its exception.
                client._read_until_future = client.stream.read_bytes(4096, partial=True)
                wire_bytes = yield client._read_until_future
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    body = framed_msg["body"]
                    if self.presence_callback:
                        self.presence_callback(client, body)
            except salt.ext.tornado.iostream.StreamClosedError as e:
                log.debug("tcp stream to %s closed, unable to recv", client.address)
                client.close()
                self.remove_presence_callback(client)
                self.clients.discard(client)
                break
            except Exception as e:  # pylint: disable=broad-except
                log.error(
                    "Exception parsing response from %s", client.address, exc_info=True
                )
                continue

    def handle_stream(self, stream, address):
        """
        Register a newly connected subscriber and start its read loop.
        """
        log.debug("Subscriber at %s connected", address)
        client = Subscriber(stream, address)
        self.clients.add(client)
        self.io_loop.spawn_callback(self._stream_read, client)

    # TODO: ACK the publish through IPC
    @salt.ext.tornado.gen.coroutine
    def publish_payload(self, package, topic_list=None):
        """
        Frame ``package`` and write it to subscribers.

        With a ``topic_list`` only subscribers whose ``id_`` equals one of
        the topics are written to; otherwise every subscriber is. Subscribers
        whose stream turns out to be closed are dropped afterwards.
        """
        log.trace("TCP PubServer sending payload: %s \n\n %r", package, topic_list)
        payload = salt.transport.frame.frame_msg(package)
        to_remove = []
        if topic_list:
            for topic in topic_list:
                sent = False
                # Snapshot the set: each ``yield`` hands control back to the
                # IOLoop, where handle_stream()/_stream_read() may add or
                # discard clients and break iteration over the live set.
                for client in list(self.clients):
                    if topic == client.id_:
                        try:
                            # Write the packed str
                            yield client.stream.write(payload)
                            sent = True
                            # self.io_loop.add_future(f, lambda f: True)
                        except salt.ext.tornado.iostream.StreamClosedError:
                            to_remove.append(client)
                if not sent:
                    log.debug("Publish target %s not connected %r", topic, self.clients)
        else:
            # Snapshot for the same reason as above.
            for client in list(self.clients):
                try:
                    # Write the packed str
                    yield client.stream.write(payload)
                except salt.ext.tornado.iostream.StreamClosedError:
                    to_remove.append(client)
        for client in to_remove:
            log.debug(
                "Subscriber at %s has disconnected from publisher", client.address
            )
            client.close()
            self.remove_presence_callback(client)
            self.clients.discard(client)
        log.trace("TCP PubServer finished publishing payload")


class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
    """
    Tornado based TCP PublishServer
    """

    # TODO: opts!
    # Based on default used in salt.ext.tornado.netutil.bind_sockets()
    backlog = 128

    def __init__(self, opts):
        self.opts = opts
        # IPC client used by publish(); created on first use, reset by close()
        self.pub_sock = None

    @property
    def topic_support(self):
        """
        ``True`` unless the ``order_masters`` option is set.
        """
        return not self.opts.get("order_masters", False)

    def __setstate__(self, state):
        self.__init__(state["opts"])

    def __getstate__(self):
        # Only the options are carried over; sockets and loops are not.
        return {"opts": self.opts}

    def _pull_uri(self):
        """
        Return the address of the IPC pull channel.

        This is a TCP port number when ``ipc_mode`` is ``tcp`` and otherwise
        the path of ``publish_pull.ipc`` inside ``sock_dir``. Both
        publish_daemon() (server side) and publish() (client side) use it so
        the two ends always agree on the address.
        """
        if self.opts.get("ipc_mode", "") == "tcp":
            return int(self.opts.get("tcp_master_publish_pull", 4514))
        return os.path.join(self.opts["sock_dir"], "publish_pull.ipc")

    def publish_daemon(
        self,
        publish_payload,
        presence_callback=None,
        remove_presence_callback=None,
    ):
        """
        Bind to the interface specified in the configuration file

        Creates a new IO loop, starts a ``PubServer`` listening on
        ``publish_port`` and an IPC message server on the pull channel, then
        runs the loop until it is stopped or interrupted.

        :param publish_payload: Handler given to the IPC message server as its
                                ``payload_handler``
        :param presence_callback: Passed through to ``PubServer``
        :param remove_presence_callback: Passed through to ``PubServer``
        """
        io_loop = salt.ext.tornado.ioloop.IOLoop()
        io_loop.make_current()
        self.io_loop = io_loop

        # Spin up the publisher
        self.pub_server = pub_server = PubServer(
            self.opts,
            io_loop=io_loop,
            presence_callback=presence_callback,
            remove_presence_callback=remove_presence_callback,
        )
        sock = _get_socket(self.opts)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _set_tcp_keepalive(sock, self.opts)
        sock.setblocking(0)
        sock.bind(_get_bind_addr(self.opts, "publish_port"))
        sock.listen(self.backlog)
        # pub_server will take ownership of the socket
        pub_server.add_socket(sock)

        # Set up Salt IPC server
        pull_uri = self._pull_uri()
        pull_sock = salt.transport.ipc.IPCMessageServer(
            pull_uri,
            io_loop=io_loop,
            payload_handler=publish_payload,
        )

        # Securely create socket
        # NOTE(review): this routine startup message is logged at warning
        # level; confirm whether that is intentional before lowering it.
        log.warning("Starting the Salt Puller on %s", pull_uri)
        with salt.utils.files.set_umask(0o177):
            pull_sock.start()

        # run forever
        try:
            io_loop.start()
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            pull_sock.close()

    def pre_fork(self, process_manager):
        """
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing
        """
        # publish_daemon() has a required ``publish_payload`` argument; without
        # it the spawned process would fail with a TypeError on start.
        process_manager.add_process(
            self.publish_daemon,
            args=(self.publish_payload,),
            name=self.__class__.__name__,
        )

    @salt.ext.tornado.gen.coroutine
    def publish_payload(self, payload, *args):
        """
        Forward ``payload`` (and any extra positional arguments) to the
        ``PubServer`` created by publish_daemon() and return its result.
        """
        ret = yield self.pub_server.publish_payload(payload, *args)
        raise salt.ext.tornado.gen.Return(ret)

    def publish(self, payload, **kwargs):
        """
        Publish "load" to minions

        The payload is sent over the IPC pull channel; the IPC client is
        created and connected on first use and then reused. ``kwargs`` are
        accepted but not used.
        """
        if not self.pub_sock:
            # The address is only needed when the client is created.
            self.pub_sock = salt.utils.asynchronous.SyncWrapper(
                salt.transport.ipc.IPCMessageClient,
                (self._pull_uri(),),
                loop_kwarg="io_loop",
            )
            self.pub_sock.connect()
        self.pub_sock.send(payload)

    def close(self):
        """
        Close the IPC client used by publish(), if one was created.
        """
        if self.pub_sock:
            self.pub_sock.close()
            self.pub_sock = None


class TCPReqClient(salt.transport.base.RequestClient):
    """
    Tornado based TCP RequestClient
    """

    ttype = "tcp"

    def __init__(self, opts, io_loop, **kwargs):
        """
        Build the underlying ``MessageClient`` for the master named in
        ``opts["master_uri"]``.

        :param opts: Salt options; ``master_uri`` is required, ``source_ip``
                     and ``source_ret_port`` are optional
        :param io_loop: IO loop handed to the message client
        :param kwargs: May contain ``resolver``, which is passed to the
                       message client
        """
        super().__init__(opts, io_loop, **kwargs)
        self.opts = opts
        self.io_loop = io_loop
        parse = urllib.parse.urlparse(self.opts["master_uri"])
        # Split on the last colon only, so colons inside the host part are
        # kept with the host.
        master_host, master_port = parse.netloc.rsplit(":", 1)
        self.message_client = salt.transport.tcp.MessageClient(
            opts,
            master_host,
            int(master_port),
            io_loop=io_loop,
            resolver=kwargs.get("resolver"),
            source_ip=opts.get("source_ip"),
            source_port=opts.get("source_ret_port"),
        )
        self._closing = False
        # Set to True by connect(); defined here so the attribute always exists.
        self._connect_called = False

    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Connect the underlying message client.
        """
        self._connect_called = True
        yield self.message_client.connect()

    @salt.ext.tornado.gen.coroutine
    def send(self, load, timeout=60):
        """
        Send ``load`` through the message client and return its reply.

        :param load: The message to send
        :param timeout: Timeout passed to the message client's ``send``
        """
        ret = yield self.message_client.send(load, timeout=timeout)
        raise salt.ext.tornado.gen.Return(ret)

    def close(self):
        """
        Close the message client. Calls after the first one are no-ops.
        """
        if self._closing:
            return
        self._closing = True
        self.message_client.close()