File: //usr/local/CyberCP/lib/python3.10/site-packages/docker/transport/npipesocket.py
import functools
import io
import time
import pywintypes
import win32api
import win32event
import win32file
import win32pipe
cERROR_PIPE_BUSY = 0xe7
cSECURITY_SQOS_PRESENT = 0x100000
cSECURITY_ANONYMOUS = 0
MAXIMUM_RETRY_COUNT = 10
def check_closed(f):
    @functools.wraps(f)
    def wrapped(self, *args, **kwargs):
        if self._closed:
            raise RuntimeError(
                'Can not reuse socket after connection was closed.'
            )
        return f(self, *args, **kwargs)
    return wrapped
class NpipeSocket:
    """ Partial implementation of the socket API over windows named pipes.
        This implementation is only designed to be used as a client socket,
        and server-specific methods (bind, listen, accept...) are not
        implemented.
    """
    def __init__(self, handle=None):
        self._timeout = win32pipe.NMPWAIT_USE_DEFAULT_WAIT
        self._handle = handle
        self._closed = False
    def accept(self):
        raise NotImplementedError()
    def bind(self, address):
        raise NotImplementedError()
    def close(self):
        self._handle.Close()
        self._closed = True
    @check_closed
    def connect(self, address, retry_count=0):
        try:
            handle = win32file.CreateFile(
                address,
                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                0,
                None,
                win32file.OPEN_EXISTING,
                (cSECURITY_ANONYMOUS
                    | cSECURITY_SQOS_PRESENT
                    | win32file.FILE_FLAG_OVERLAPPED),
                0
            )
        except win32pipe.error as e:
            # See Remarks:
            # https://msdn.microsoft.com/en-us/library/aa365800.aspx
            if e.winerror == cERROR_PIPE_BUSY:
                # Another program or thread has grabbed our pipe instance
                # before we got to it. Wait for availability and attempt to
                # connect again.
                retry_count = retry_count + 1
                if (retry_count < MAXIMUM_RETRY_COUNT):
                    time.sleep(1)
                    return self.connect(address, retry_count)
            raise e
        self.flags = win32pipe.GetNamedPipeInfo(handle)[0]
        self._handle = handle
        self._address = address
    @check_closed
    def connect_ex(self, address):
        return self.connect(address)
    @check_closed
    def detach(self):
        self._closed = True
        return self._handle
    @check_closed
    def dup(self):
        return NpipeSocket(self._handle)
    def getpeername(self):
        return self._address
    def getsockname(self):
        return self._address
    def getsockopt(self, level, optname, buflen=None):
        raise NotImplementedError()
    def ioctl(self, control, option):
        raise NotImplementedError()
    def listen(self, backlog):
        raise NotImplementedError()
    def makefile(self, mode=None, bufsize=None):
        if mode.strip('b') != 'r':
            raise NotImplementedError()
        rawio = NpipeFileIOBase(self)
        if bufsize is None or bufsize <= 0:
            bufsize = io.DEFAULT_BUFFER_SIZE
        return io.BufferedReader(rawio, buffer_size=bufsize)
    @check_closed
    def recv(self, bufsize, flags=0):
        err, data = win32file.ReadFile(self._handle, bufsize)
        return data
    @check_closed
    def recvfrom(self, bufsize, flags=0):
        data = self.recv(bufsize, flags)
        return (data, self._address)
    @check_closed
    def recvfrom_into(self, buf, nbytes=0, flags=0):
        return self.recv_into(buf, nbytes, flags), self._address
    @check_closed
    def recv_into(self, buf, nbytes=0):
        readbuf = buf
        if not isinstance(buf, memoryview):
            readbuf = memoryview(buf)
        event = win32event.CreateEvent(None, True, True, None)
        try:
            overlapped = pywintypes.OVERLAPPED()
            overlapped.hEvent = event
            err, data = win32file.ReadFile(
                self._handle,
                readbuf[:nbytes] if nbytes else readbuf,
                overlapped
            )
            wait_result = win32event.WaitForSingleObject(event, self._timeout)
            if wait_result == win32event.WAIT_TIMEOUT:
                win32file.CancelIo(self._handle)
                raise TimeoutError
            return win32file.GetOverlappedResult(self._handle, overlapped, 0)
        finally:
            win32api.CloseHandle(event)
    @check_closed
    def send(self, string, flags=0):
        event = win32event.CreateEvent(None, True, True, None)
        try:
            overlapped = pywintypes.OVERLAPPED()
            overlapped.hEvent = event
            win32file.WriteFile(self._handle, string, overlapped)
            wait_result = win32event.WaitForSingleObject(event, self._timeout)
            if wait_result == win32event.WAIT_TIMEOUT:
                win32file.CancelIo(self._handle)
                raise TimeoutError
            return win32file.GetOverlappedResult(self._handle, overlapped, 0)
        finally:
            win32api.CloseHandle(event)
    @check_closed
    def sendall(self, string, flags=0):
        return self.send(string, flags)
    @check_closed
    def sendto(self, string, address):
        self.connect(address)
        return self.send(string)
    def setblocking(self, flag):
        if flag:
            return self.settimeout(None)
        return self.settimeout(0)
    def settimeout(self, value):
        if value is None:
            # Blocking mode
            self._timeout = win32event.INFINITE
        elif not isinstance(value, (float, int)) or value < 0:
            raise ValueError('Timeout value out of range')
        else:
            # Timeout mode - Value converted to milliseconds
            self._timeout = int(value * 1000)
    def gettimeout(self):
        return self._timeout
    def setsockopt(self, level, optname, value):
        raise NotImplementedError()
    @check_closed
    def shutdown(self, how):
        return self.close()
class NpipeFileIOBase(io.RawIOBase):
    def __init__(self, npipe_socket):
        self.sock = npipe_socket
    def close(self):
        super().close()
        self.sock = None
    def fileno(self):
        return self.sock.fileno()
    def isatty(self):
        return False
    def readable(self):
        return True
    def readinto(self, buf):
        return self.sock.recv_into(buf)
    def seekable(self):
        return False
    def writable(self):
        return False