HEX
Server: LiteSpeed
System: Linux php-prod-3.spaceapp.ru 5.15.0-151-generic #161-Ubuntu SMP Tue Jul 22 14:25:40 UTC 2025 x86_64
User: sarli3128 (1010)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/utils/kwconfig.py
import re
from typing import Optional

from defence360agent.utils import atomic_rewrite


class KWConfig:
    """
    Basic class for working with key-value configuration files.

    Subclasses must define SEARCH_PATTERN and WRITE_PATTERN attributes:

    * SEARCH_PATTERN is a regular expression source that is passed through
      ``str.format(name)`` and compiled with ``re.MULTILINE``; its first
      capture group is returned as the value of the variable.  Because
      ``str.format`` is used, literal braces in the regex must be doubled.
    * WRITE_PATTERN is passed through ``str.format(name, value)`` and the
      result is the text written for the variable.

    DEFAULT_FILENAME is used when no ``filename`` is given to the
    constructor.  ALLOW_EMPTY_CONFIG is forwarded to ``atomic_rewrite`` as
    ``allow_empty_content``.
    """

    SEARCH_PATTERN = DEFAULT_FILENAME = WRITE_PATTERN = ""
    ALLOW_EMPTY_CONFIG = True

    def __init__(self, name, filename=None):
        """
        Bind this object to the variable *name* in a configuration file.

        :param name: variable name, substituted into SEARCH_PATTERN and
            WRITE_PATTERN.  It is inserted into the regex as-is (not
            escaped).
        :param filename: path of the configuration file; falls back to
            DEFAULT_FILENAME when falsy.
        """
        assert self.SEARCH_PATTERN

        self._pattern = re.compile(
            self.SEARCH_PATTERN.format(name), re.MULTILINE
        )
        self._filename = filename or self.DEFAULT_FILENAME
        self._name = name

    def set(self, value) -> Optional[str]:
        """
        Write *value* for the variable into the configuration file.

        Every match of the search pattern is replaced with the formatted
        WRITE_PATTERN; when nothing matches, the formatted line is appended
        to the end of the file.  The new content is handed to
        ``atomic_rewrite``.

        :param value: new value, substituted into WRITE_PATTERN.
        :return: the previously stored value, or None if the variable
            was not found.
        """
        assert self.WRITE_PATTERN

        with open(self._filename) as f:
            content = f.read()

        old_value, content = self._apply(content, value)

        atomic_rewrite(
            self._filename,
            content,
            allow_empty_content=self.ALLOW_EMPTY_CONFIG,
        )
        return old_value

    def get(self) -> Optional[str]:
        """
        Read the configuration file and return the variable's value.

        :return: first capture group of the first match, or None when the
            search pattern does not match.
        """
        with open(self._filename) as f:
            content = f.read()
        return self._parse(content)

    def _apply(self, content, value):
        """
        Compute the file content that results from setting *value*.

        :param content: current text of the configuration file.
        :param value: new value, substituted into WRITE_PATTERN.
        :return: ``(old_value, new_content)`` tuple, where ``old_value`` is
            what ``_parse`` found in *content*.
        """
        line = self.WRITE_PATTERN.format(self._name, value)
        old_value = self._parse(content)
        if old_value is None:
            # If no variable found, just add to the bottom
            return old_value, content + "\n" + line + "\n"
        # A callable replacement is inserted literally.  Passing ``line``
        # directly would make re.sub treat backslashes in the value as
        # escapes / group references (corrupting e.g. "C:\new" or raising
        # re.error on "\d").
        return old_value, self._pattern.sub(lambda _match: line, content)

    def _parse(self, content) -> Optional[str]:
        """
        Extract the variable's value from *content*.

        :return: group 1 of the first match of the search pattern, or None
            when there is no match.
        """
        match = self._pattern.search(content)
        return match.group(1) if match else None