HEX
Server: Apache
System: Linux server2.voipitup.com.au 4.18.0-553.109.1.lve.el8.x86_64 #1 SMP Thu Mar 5 20:23:46 UTC 2026 x86_64
User: posscale (1027)
PHP: 8.2.30
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/client.py
import asyncio
import concurrent.futures
import contextlib
import json
import logging
import os
import time
import uuid
from typing import Generator

from defence360agent.api.server import (
    APIError,
    APIErrorTooManyRequests,
    APITokenError,
    send_message,
)
from defence360agent.contracts import license
from defence360agent.contracts.config import Core
from defence360agent.contracts.messages import (
    Message,
    MessageList,
    MessageType,
)
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.internals.persistent_message import (
    PersistentMessagesQueue,
)
from defence360agent.utils import recurring_check, safe_cancel_task, Scope
from defence360agent.utils.json import ServerJSONEncoder

logger = logging.getLogger(__name__)


class SendToServerClient:
    """Send messages to server.

    * process Reportable messages;
    * add them to a pending messages list;
    * send all pending messages to server when list is full (contains
      _PENDING_MESSAGES_LIMIT items or more);
    * send all pending messages on plugin shutdown.

    Messages are kept in a PersistentMessagesQueue as JSON-encoded bytes
    (see _encode_data_to_put_in_queue); whatever is still buffered at the
    end of shutdown() is pushed to the queue's storage.
    """

    # Queue size that triggers a send; overridable via the environment.
    _PENDING_MESSAGES_LIMIT = int(
        os.environ.get("IMUNIFYAV_MESSAGES_COUNT_TO_SEND", 20)
    )
    _SEND_MESSAGE_RECURRING_TIME = 60 * 5  # 5 minutes
    # 50 second because it should be less than DefaultTimeoutStopSec
    _SHUTDOWN_SEND_TIMEOUT = 50

    async def create_sink(self, loop: asyncio.AbstractEventLoop):
        """Initialise the queue, synchronisation primitives and tasks.

        Starts two background tasks on *loop*: the sender (_send) and
        the periodic trigger (_invoke_send_message).
        """
        self._loop = loop
        self._pending = PersistentMessagesQueue()
        # Set whenever the sender task should re-check the queue.
        self._try_send = asyncio.Event()
        # Held for the whole duration of one _send_pending_messages() round.
        self._lock = asyncio.Lock()
        # Set by shutdown() to abort in-flight sends.
        self._shutting_down = asyncio.Event()
        self._sender_task = loop.create_task(self._send())
        self._invoke_send_message_task = loop.create_task(
            self._invoke_send_message()
        )

    async def shutdown(self) -> None:
        """
        When shutdown begins it signals any in-flight HTTP sends to
        abort immediately (via _shutting_down event), then gives 50
        seconds to finish the stop() sequence.  If stop() isn't done
        in 50 seconds it force-cancels the sender task.
        Finally, any messages still in the buffer are flushed to
        persistent storage so nothing is lost.
        """
        # Signal shutdown — aborts in-flight HTTP requests from the
        # _send task via the asyncio.wait race in _send_one_message.
        # This lets stop() acquire the lock quickly instead of waiting
        # for a slow HTTP response.  The event is cleared in stop()
        # before the final _send_pending_messages() flush so that
        # remaining messages are actually delivered during shutdown.
        self._shutting_down.set()

        try:
            await asyncio.wait_for(self.stop(), self._SHUTDOWN_SEND_TIMEOUT)
        except asyncio.TimeoutError:
            # Used logger.error to notify sentry
            logger.error(
                "Timeout (%ds) sending messages to server on shutdown.",
                self._SHUTDOWN_SEND_TIMEOUT,
            )
            if not self._sender_task.cancelled():
                await safe_cancel_task(self._sender_task)
        if self._pending.buffer_size > 0:
            logger.warning(
                "Save %s messages to persistent storage",
                self._pending.buffer_size,
            )
            self._pending.push_buffer_to_storage()
            logger.warning("Stored queue %r", self._pending.qsize())

    async def stop(self):
        """
        Stop sending.
        1. wait for the lock being available,
            i.e., while _sender_task finishes the current round
            of sending message (if it takes too long, then
            the timeout in shutdown() is triggered)
        2. once the sending round complete (we got the lock),
            cancel the next iteration of the _sender_task (it exits)
        3. send _pending messages (again, if it takes too long,
            the timeout in shutdown() is triggered
            and the coroutine is cancelled)

        That method makes sure that the coroutine
        that was started in it has ended.

        It excludes a situation when:
            -> The result of a coroutine that started
                BEFORE shutdown() is started.
            -> And the process of sending messages
                from _pending is interrupted because of it
        """
        # The _lock allows you to be sure that the _send_pending_messages
        # coroutine is not running and _pending is not being used
        logger.info("SendToServer.stop cancel _invoke_send_message_task")
        await safe_cancel_task(self._invoke_send_message_task)
        logger.info("SendToServer.stop wait lock")
        async with self._lock:
            # Cancel _sender_task. The lock ensures that the coroutine
            # is not in its critical part
            logger.info("SendToServer.stop lock acquired, cancel _sender_task")
            await safe_cancel_task(self._sender_task)
            # Clear the shutdown signal so the final flush actually
            # delivers messages instead of re-queuing them.
            self._shutting_down.clear()
            # send messages that are in _pending at the time of agent shutdown
            await self._send_pending_messages()

    @staticmethod
    def _set_api_attrs(api):
        """Copy product name, server id and token from LicenseCLN to *api*.

        Returns the same *api* object so the call can be chained.
        """
        api.set_product_name(license.LicenseCLN.get_product_name())
        api.set_server_id(license.LicenseCLN.get_server_id())
        api.set_license(license.LicenseCLN.get_token())
        return api

    @contextlib.contextmanager
    def _get_api(self) -> Generator[send_message.SendMessageAPI, None, None]:
        """Yield a configured SendMessageAPI backed by a private executor.

        The executor lives only for the duration of the ``with`` block;
        the base URL is taken from IMUNIFYAV_API_BASE (may be unset).
        """
        base_url = os.environ.get("IMUNIFYAV_API_BASE")
        # we send messages sequentially, so max_workers=1
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            api = send_message.SendMessageAPI(
                Core.VERSION, base_url, executor=executor
            )

            yield self._set_api_attrs(api)

    @expect(MessageType.Reportable)
    async def send_to_server(self, message: Message) -> None:
        """Stamp *message*, enqueue it and wake up the sender task."""
        # add message handling time if it does not exist, so that
        # the server does not depend on the time it was received
        if "timestamp" not in message:
            message["timestamp"] = time.time()
        if "message_id" not in message:
            message["message_id"] = uuid.uuid4().hex
        self._pending.put(self._encode_data_to_put_in_queue(message))
        self._try_send.set()

    @recurring_check(_SEND_MESSAGE_RECURRING_TIME)
    async def _invoke_send_message(self):
        """Periodically wake up the sender task."""
        self._try_send.set()

    @recurring_check(0)
    async def _send(self):
        """Wait for a wake-up and send the queue if it reached the limit.

        A CancelledError raised while sending is held back until the
        lock has been released and the release has been logged, then
        re-raised.
        """
        await self._try_send.wait()
        self._try_send.clear()
        # NOTE(review): the periodic wake-up from _invoke_send_message is
        # also subject to this size check, so a queue that stays below
        # the limit is not sent by the timer — confirm this is intended.
        if self._pending.qsize() >= self._PENDING_MESSAGES_LIMIT:
            # The _lock protects critical part of _send method
            logger.info("SendToServer._send wait lock")
            need_to_cancel = None
            async with self._lock:
                logger.info("SendToServer._send lock acquired")
                try:
                    await self._send_pending_messages()
                except asyncio.CancelledError as e:
                    logger.info("SendToServer._send cancelled unlocking")
                    need_to_cancel = e
            logger.info("SendToServer._send lock released")
            if need_to_cancel:
                raise need_to_cancel

    def _encode_data_to_put_in_queue(self, data: Message) -> bytes:
        """Serialise *data* to newline-terminated JSON bytes."""
        msg = json.dumps(data, cls=ServerJSONEncoder) + "\n"
        return msg.encode()

    def _decode_message(self, message: bytes) -> Message:
        """Rebuild a Message (or MessageList) from queued JSON bytes.

        A payload with a truthy "list" entry becomes a MessageList of
        that entry; anything else becomes a plain Message.
        """
        data = json.loads(message)
        if data.get("list"):
            return MessageList(data.get("list"))
        return Message(data)

    def _requeue_message(self, message, timestamp):
        """Put *message* back with its retry counter incremented.

        The original queue *timestamp* is preserved.
        """
        message["api_retries_count"] = message.get("api_retries_count", 0) + 1
        self._pending.put(
            self._encode_data_to_put_in_queue(message), timestamp=timestamp
        )

    async def _send_one_message(self, api, message):
        """Race the HTTP send against the shutdown signal.

        Returns True on success, raises on API error,
        or returns False if shutdown interrupted the send.
        If this coroutine itself is cancelled, both helper tasks are
        cancelled before the CancelledError propagates.
        """
        send_task = asyncio.ensure_future(api.send_message(message))
        shutdown_task = asyncio.ensure_future(self._shutting_down.wait())
        try:
            done, pending_tasks = await asyncio.wait(
                {send_task, shutdown_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
        except asyncio.CancelledError:
            # asyncio.wait() does not cancel the awaited tasks when the
            # waiter is cancelled.  Without this the shutdown waiter
            # would leak and the send could still complete in the
            # background after the caller has re-queued the message.
            send_task.cancel()
            shutdown_task.cancel()
            raise
        for task in pending_tasks:
            await safe_cancel_task(task)

        if send_task in done:
            # Send completed (possibly with an error) — re-raise any
            # API error.  We check this before shutdown_task to avoid
            # re-queuing a message that was already delivered (both
            # tasks can land in `done` in the same tick).
            send_task.result()
            return True

        # Shutdown won the race
        return False

    async def _try_send_one(self, api, timestamp, message_bytes):
        """Try sending a single message.

        Returns (stop, failed) where *stop* is True if further sends
        should stop (shutdown or server-level error) and *failed* is
        True when the message could not be delivered.

        In every non-success outcome handled here the message is put
        back into the queue before returning.
        """
        if self._shutting_down.is_set():
            logger.warning(
                "Shutdown signal received, saving remaining messages"
            )
            self._pending.put(message_bytes, timestamp=timestamp)
            return True, False

        message = self._decode_message(message_bytes)
        msg_info = {
            "method": message.get("method"),
            "message_id": message.get("message_id"),
        }
        try:
            sent = await self._send_one_message(api, message)
            if not sent:
                logger.warning(
                    "Shutdown signal received during send,"
                    " saving remaining messages"
                )
                # Interrupted, not rejected: keep the original bytes so
                # the retry counter is not incremented.
                self._pending.put(message_bytes, timestamp=timestamp)
                return True, False
            logger.info("message sent %s", msg_info)
            return False, False
        except (APIErrorTooManyRequests, APITokenError) as exc:
            # These errors stop the whole round (stop=True).
            logger.warning(
                "Failed to send message %s to server: %s", msg_info, exc
            )
            self._requeue_message(message, timestamp)
            return True, True
        except APIError as exc:
            # Any other API error: keep going with the next message.
            logger.warning(
                "Failed to send message %s to server: %s", msg_info, exc
            )
            self._requeue_message(message, timestamp)
            return False, True

    async def _send_pending_messages(self) -> None:
        """Pop every queued message and try to deliver each in order.

        Messages that were not attempted — because no server id is
        available, because a send asked to stop, or because an
        exception (including CancelledError or a failure while creating
        the API client) interrupted the round — are put back into the
        queue with their original timestamps.
        """
        messages = self._pending.pop_all()
        logger.info("Sending %s messages", len(messages))
        failure_count = 0
        # Number of leading messages already handled by _try_send_one
        # (which re-queues on its own where needed).
        processed = 0
        try:
            with self._get_api() as api:
                if api.server_id is None:
                    # Nothing can be sent; `processed` is still 0 so the
                    # finally block below re-queues everything.
                    return
                for timestamp, message_bytes in messages:
                    stop, failed = await self._try_send_one(
                        api, timestamp, message_bytes
                    )
                    processed += 1
                    if failed:
                        failure_count += 1
                    if stop:
                        if failed:
                            # Server error — remaining messages can't
                            # be delivered either, count them as failed.
                            failure_count += len(messages) - processed
                        break
        finally:
            # Single re-queue path for everything not yet processed:
            # early stop, missing server id, CancelledError from the
            # shutdown timeout, or an error raised by _get_api().
            for timestamp, message_bytes in messages[processed:]:
                self._pending.put(message_bytes, timestamp=timestamp)
        logger.info("Unsuccessful to send %s messages", failure_count)


class SendToServer(SendToServerClient, MessageSink):
    """MessageSink built on SendToServerClient's queue-and-send logic."""

    # Shutdown late, after Accumulate has flushed
    SHUTDOWN_PRIORITY = 900
    SCOPE = Scope.AV