# File: //proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/fd_ops.py
"""fd-based file operations for symlink-attack mitigation.
All helpers in this module use O_NOFOLLOW and dir_fd-relative syscalls
so that no path-based resolution can be redirected by a concurrent
symlink swap.
This module is intentionally kept separate from utils/__init__.py to
avoid loading these OS-specific helpers into every agent component.
"""
import errno
import logging
import os
import stat
from contextlib import contextmanager, suppress
from pathlib import Path
logger = logging.getLogger(__name__)
def rmtree_fd(dir_fd) -> None:
    """Empty the directory behind *dir_fd* without following any symlink.

    Sub-directories are opened relative to their parent descriptor with
    ``O_NOFOLLOW | O_DIRECTORY``; every other entry (symlinks included)
    is unlinked by name relative to the descriptor being scanned.  The
    directory that *dir_fd* refers to is emptied but **not** removed, and
    *dir_fd* itself is never closed here -- the caller still has to
    ``os.rmdir()`` the entry from its parent and close the descriptor.

    The walk is iterative: an explicit stack holds one open descriptor
    per level currently being descended, so nesting depth is not limited
    by Python's recursion limit.

    *dir_fd* must be an open ``O_RDONLY | O_DIRECTORY`` descriptor.

    Any exception raised by the underlying calls is re-raised after the
    descriptors that are still on the stack (except *dir_fd*) are closed.
    """
    subdir_flags = os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW

    # Each frame is (descriptor, name of that directory inside its parent).
    # The caller-owned root frame carries None as its name, which marks it
    # as "do not close, do not rmdir".
    frames = [(dir_fd, None)]
    try:
        while frames:
            top_fd = frames[-1][0]
            descended = False
            with os.scandir(top_fd) as listing:
                for item in listing:
                    if not item.is_dir(follow_symlinks=False):
                        # Files, symlinks, sockets, ...: remove by name.
                        os.unlink(item.name, dir_fd=top_fd)
                        continue
                    sub_fd = os.open(item.name, subdir_flags, dir_fd=top_fd)
                    frames.append((sub_fd, item.name))
                    descended = True
                    # Stop scanning; the next pass works on the child.
                    break
            if descended:
                continue

            # The scan finished without finding a sub-directory, so the
            # directory on top of the stack is empty now.
            finished_fd, finished_name = frames.pop()
            if finished_name is None:
                # Root frame: leave it to the caller.
                continue
            os.close(finished_fd)
            os.rmdir(finished_name, dir_fd=frames[-1][0])
    except BaseException:
        # Release every descriptor this function opened and still holds.
        for held_fd, held_name in frames:
            if held_name is not None:
                os.close(held_fd)
        raise
def open_dir_no_symlinks(path) -> int:
    """Open a directory, refusing symlinks at every path component.

    *path* (``str``, ``bytes`` or ``os.PathLike``) is made absolute and
    then walked one component at a time.  The first component (the root)
    is opened by path; every following component is opened with
    ``O_NOFOLLOW | O_DIRECTORY`` relative to the descriptor of its parent,
    so a symlink at *any* depth makes the open fail instead of being
    followed.

    Returns an ``O_RDONLY`` file descriptor for the final directory.
    The caller is responsible for closing it.

    Raises ``OSError`` (from ``os.open``) when a component is missing, is
    not a directory, or is a symlink.  No descriptor opened here is left
    open when an exception propagates.
    """
    # os.fsdecode() accepts str, bytes and os.PathLike.  pathlib, used
    # below to split the path, rejects bytes, so normalise to str first.
    path = os.path.abspath(os.fsdecode(path))
    parts = Path(path).parts  # ('/', 'home', 'user', ...)
    component_flags = os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW

    fd = os.open(parts[0], os.O_RDONLY | os.O_DIRECTORY)
    try:
        for part in parts[1:]:
            parent_fd = fd
            # If this open fails, `fd` still names the parent and the
            # handler below closes it.
            fd = os.open(part, component_flags, dir_fd=parent_fd)
            # From here on `fd` is the child; should closing the parent
            # fail, the handler closes the child rather than closing the
            # parent a second time and leaking the child.
            os.close(parent_fd)
        return fd
    except BaseException:
        os.close(fd)
        raise
@contextmanager
def open_nofollow(path, flags=os.O_RDONLY, *, dir_fd=None):
    """Context manager around ``os.open`` that always adds ``O_NOFOLLOW``.

    *path* is converted with ``str()`` and opened with *flags* OR-ed with
    ``O_NOFOLLOW``, so the open fails (ELOOP) instead of following a
    symlink in the final path component.  The raw file descriptor is
    yielded and closed again when the ``with`` block exits, whether
    normally or through an exception.

    When *dir_fd* is given, *path* is resolved relative to that directory
    descriptor.
    """
    # None is os.open()'s own default for dir_fd, so it is forwarded as-is.
    raw_fd = os.open(str(path), os.O_NOFOLLOW | flags, dir_fd=dir_fd)
    try:
        yield raw_fd
    finally:
        os.close(raw_fd)
@contextmanager
def safe_dir(path):
    """Context manager yielding a directory fd from open_dir_no_symlinks.

    The descriptor for *path* is obtained through
    ``open_dir_no_symlinks`` and handed to the ``with`` block; it is
    closed when the block exits, whether normally or through an
    exception.
    """
    directory_fd = open_dir_no_symlinks(path)
    try:
        yield directory_fd
    finally:
        os.close(directory_fd)
def atomic_rewrite_fd(
    filename,
    data: bytes,
    *,
    uid,
    gid,
    allow_empty_content,
    permissions,
    dir_fd: int,
) -> bool:
    """dir_fd-relative implementation of atomic_rewrite.

    The caller opens the directory with O_NOFOLLOW before any file I/O
    begins.  All file operations use dir_fd so that a concurrent rename
    of the directory to a symlink cannot redirect writes to a privileged
    path.

    Parameters:
        filename: target file; only its final path component is used and
            it is resolved relative to *dir_fd*.
        data: the new content.
        uid, gid: ownership for the new file.  ``chown`` is applied only
            when *both* are not None.
        allow_empty_content: when false and *data* is empty, an error is
            logged and nothing is written.
        permissions: mode for the new file.  When None, the mode of the
            existing entry is reused, or ``0o666 & ~umask`` if there is
            no existing entry.
        dir_fd: open descriptor of the directory holding the file.

    Returns:
        True when the file was replaced; False when the existing content
        already equals *data* or empty content was refused.

    Raises:
        FileExistsError: no unused temporary name found in 100 attempts.
        OSError: any other failure of the underlying calls, including
            ELOOP when *permissions* is None and the existing entry is a
            symlink.
    """
    _, basename = os.path.split(filename)

    # Read current content without following symlinks.
    # NOTE(review): the open below has no O_NONBLOCK, so a FIFO under
    # this name would presumably block until a writer appears -- confirm
    # whether callers need protection against that.
    try:
        content_fd = os.open(
            basename, os.O_RDONLY | os.O_NOFOLLOW, dir_fd=dir_fd
        )
        try:
            f = os.fdopen(content_fd, "rb")
        except BaseException:
            # os.fdopen() leaves the descriptor open when it fails (for
            # instance when the entry is a directory), so close it here
            # to avoid leaking it on every such call.
            os.close(content_fd)
            raise
        with f:
            # One byte more than len(data) is enough to detect a longer
            # existing file without reading all of it.
            old_content = f.read(len(data) + 1)
        if old_content == data:
            return False
    except FileNotFoundError:
        pass  # file does not exist yet; will be created
    except OSError as exc:
        if exc.errno != errno.ELOOP:
            raise
        # ELOOP: existing entry is a symlink; overwrite it

    if not allow_empty_content and not data:
        logger.error("empty content: %r for file: %s", data, filename)
        return False

    if permissions is None:
        try:
            st = os.stat(basename, dir_fd=dir_fd, follow_symlinks=False)
            if stat.S_ISLNK(st.st_mode):
                raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), basename)
            permissions = stat.S_IMODE(st.st_mode)
        except FileNotFoundError:
            # os.umask() can only be read by setting it, so set it to 0
            # and restore the previous value straight away.
            current_umask = os.umask(0)
            os.umask(current_umask)
            permissions = 0o666 & ~current_umask

    # Create temp file atomically inside the directory referenced by dir_fd.
    # O_NOFOLLOW + O_EXCL ensures the name cannot be a pre-existing symlink.
    tmp_basename = None
    tmp_fd = -1
    for _ in range(100):
        tmp_basename = f"{basename}_{os.urandom(4).hex()}.i360edit"
        try:
            tmp_fd = os.open(
                tmp_basename,
                os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW,
                0o600,
                dir_fd=dir_fd,
            )
            break
        except FileExistsError:
            continue
    else:
        raise FileExistsError("Could not create temporary file (100 attempts)")

    try:
        # os.write() may write fewer bytes than requested; loop until the
        # whole buffer is out.  memoryview slicing avoids copying.
        view = memoryview(data)
        written = 0
        while written < len(data):
            written += os.write(tmp_fd, view[written:])
        if uid is not None and gid is not None:
            os.chown(tmp_fd, uid, gid)
        os.chmod(tmp_fd, permissions)
        os.fsync(tmp_fd)
        os.close(tmp_fd)
        tmp_fd = -1  # closed; the finally block must not close it again
        # Atomic rename entirely within the directory we hold open.
        os.rename(tmp_basename, basename, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        tmp_basename = None  # rename succeeded; no cleanup needed
    finally:
        if tmp_fd >= 0:
            os.close(tmp_fd)
        if tmp_basename is not None:
            # Something failed before the rename: remove the temp file.
            with suppress(FileNotFoundError):
                os.unlink(tmp_basename, dir_fd=dir_fd)
    return True