HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.111.1.lve.el8.x86_64 #1 SMP Fri Mar 13 13:42:17 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //home/posscale/subdomains/xibo/vendor/react/zmq/src/React/ZMQ/SocketWrapper.php
<?php

namespace React\ZMQ;

use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;

class SocketWrapper extends EventEmitter
{
    public $fd;
    public $closed = false;
    private $socket;
    private $loop;
    private $buffer;

    public function __construct(\ZMQSocket $socket, LoopInterface $loop)
    {
        $this->socket = $socket;
        $this->loop = $loop;

        $this->fd = $this->socket->getSockOpt(\ZMQ::SOCKOPT_FD);

        $writeListener = array($this, 'handleEvent');
        $this->buffer = new Buffer($socket, $this->fd, $this->loop, $writeListener);
    }

    public function attachReadListener()
    {
        $this->loop->addReadStream($this->fd, array($this, 'handleEvent'));
    }

    public function handleEvent()
    {
        while (true) {
            $events = $this->socket->getSockOpt(\ZMQ::SOCKOPT_EVENTS);

            $hasEvents = ($events & \ZMQ::POLL_IN) || ($events & \ZMQ::POLL_OUT && $this->buffer->listening);
            if (!$hasEvents) {
                break;
            }

            if ($events & \ZMQ::POLL_IN) {
                $this->handleReadEvent();
            }

            if ($events & \ZMQ::POLL_OUT && $this->buffer->listening) {
                $this->buffer->handleWriteEvent();
            }
        }
    }

    public function handleReadEvent()
    {
        $messages = $this->socket->recvmulti(\ZMQ::MODE_NOBLOCK);
        if (false !== $messages) {
            if (1 === count($messages)) {
                $this->emit('message', array($messages[0]));
            }
            $this->emit('messages', array($messages));
        }
    }

    public function getWrappedSocket()
    {
        return $this->socket;
    }

    public function subscribe($channel)
    {
        $this->socket->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, $channel);
    }

    public function unsubscribe($channel)
    {
        $this->socket->setSockOpt(\ZMQ::SOCKOPT_UNSUBSCRIBE, $channel);
    }

    public function send($message)
    {
        $this->buffer->send($message);
    }

    public function close()
    {
        if ($this->closed) {
            return;
        }

        $this->emit('end', array($this));
        $this->loop->removeStream($this->fd);
        $this->buffer->removeAllListeners();
        $this->removeAllListeners();
        unset($this->socket);
        $this->closed = true;
    }

    public function end()
    {
        if ($this->closed) {
            return;
        }

        $that = $this;

        $this->buffer->on('end', function () use ($that) {
            $that->close();
        });

        $this->buffer->end();
    }

    public function __call($method, $args)
    {
        return call_user_func_array(array($this->socket, $method), $args);
    }
}