File: //usr/local/CyberCP/lib/python3.10/site-packages/asyncssh/encryption.py
# Copyright (c) 2013-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation
"""Symmetric key encryption handlers"""
from typing import Dict, List, Optional, Tuple, Type
from .crypto import BasicCipher, GCMCipher, ChachaCipher, get_cipher_params
from .mac import MAC, get_mac_params, get_mac
from .packet import UInt64
_EncParams = Tuple[int, int, int, int, int, bool]
_EncParamsMap = Dict[bytes, Tuple[Type['Encryption'], str]]
_enc_algs: List[bytes] = []
_default_enc_algs: List[bytes] = []
_enc_params: _EncParamsMap = {}
class Encryption:
    """Parent class for SSH packet encryption objects"""
    @classmethod
    def new(cls, cipher_name: str, key: bytes, iv: bytes, mac_alg: bytes = b'',
            mac_key: bytes = b'', etm: bool = False) -> 'Encryption':
        """Construct a new SSH packet encryption object"""
        raise NotImplementedError
    @classmethod
    def get_mac_params(cls, mac_alg: bytes) -> Tuple[int, int, bool]:
        """Get parameters of the MAC algorithm used with this encryption"""
        return get_mac_params(mac_alg)
    def encrypt_packet(self, seq: int, header: bytes,
                       packet: bytes) -> Tuple[bytes, bytes]:
        """Encrypt and sign an SSH packet"""
        raise NotImplementedError
    def decrypt_header(self, seq: int, first_block: bytes,
                       header_len: int) -> Tuple[bytes, bytes]:
        """Decrypt an SSH packet header"""
        raise NotImplementedError
    def decrypt_packet(self, seq: int, first: bytes, rest: bytes,
                       header_len: int, mac: bytes) -> Optional[bytes]:
        """Verify the signature of and decrypt an SSH packet"""
        raise NotImplementedError
class BasicEncryption(Encryption):
    """Shim for basic encryption"""
    def __init__(self, cipher: BasicCipher, mac: MAC):
        self._cipher = cipher
        self._mac = mac
    @classmethod
    def new(cls, cipher_name: str, key: bytes, iv: bytes, mac_alg: bytes = b'',
            mac_key: bytes = b'', etm: bool = False) -> 'BasicEncryption':
        """Construct a new SSH packet encryption object for basic ciphers"""
        cipher = BasicCipher(cipher_name, key, iv)
        mac = get_mac(mac_alg, mac_key)
        if etm:
            return ETMEncryption(cipher, mac)
        else:
            return cls(cipher, mac)
    def encrypt_packet(self, seq: int, header: bytes,
                       packet: bytes) -> Tuple[bytes, bytes]:
        """Encrypt and sign an SSH packet"""
        packet = header + packet
        mac = self._mac.sign(seq, packet) if self._mac else b''
        return self._cipher.encrypt(packet), mac
    def decrypt_header(self, seq: int, first_block: bytes,
                       header_len: int) -> Tuple[bytes, bytes]:
        """Decrypt an SSH packet header"""
        first_block = self._cipher.decrypt(first_block)
        return first_block, first_block[:header_len]
    def decrypt_packet(self, seq: int, first: bytes, rest: bytes,
                       header_len: int, mac: bytes) -> Optional[bytes]:
        """Verify the signature of and decrypt an SSH packet"""
        packet = first + self._cipher.decrypt(rest)
        if self._mac.verify(seq, packet, mac):
            return packet[header_len:]
        else:
            return None
class ETMEncryption(BasicEncryption):
    """Shim for encrypt-then-mac encryption"""
    def encrypt_packet(self, seq: int, header: bytes,
                       packet: bytes) -> Tuple[bytes, bytes]:
        """Encrypt and sign an SSH packet"""
        packet = header + self._cipher.encrypt(packet)
        return packet, self._mac.sign(seq, packet)
    def decrypt_header(self, seq: int, first_block: bytes,
                       header_len: int) -> Tuple[bytes, bytes]:
        """Decrypt an SSH packet header"""
        return first_block, first_block[:header_len]
    def decrypt_packet(self, seq: int, first: bytes, rest: bytes,
                       header_len: int, mac: bytes) -> Optional[bytes]:
        """Verify the signature of and decrypt an SSH packet"""
        packet = first + rest
        if self._mac.verify(seq, packet, mac):
            return self._cipher.decrypt(packet[header_len:])
        else:
            return None
class GCMEncryption(Encryption):
    """Shim for GCM encryption"""
    def __init__(self, cipher: GCMCipher):
        self._cipher = cipher
    @classmethod
    def new(cls, cipher_name: str, key: bytes, iv: bytes, mac_alg: bytes = b'',
            mac_key: bytes = b'', etm: bool = False) -> 'GCMEncryption':
        """Construct a new SSH packet encryption object for GCM ciphers"""
        return cls(GCMCipher(cipher_name, key, iv))
    @classmethod
    def get_mac_params(cls, mac_alg: bytes) -> Tuple[int, int, bool]:
        """Get parameters of the MAC algorithm used with this encryption"""
        return 0, 16, True
    def encrypt_packet(self, seq: int, header: bytes,
                       packet: bytes) -> Tuple[bytes, bytes]:
        """Encrypt and sign an SSH packet"""
        return self._cipher.encrypt_and_sign(header, packet)
    def decrypt_header(self, seq: int, first_block: bytes,
                       header_len: int) -> Tuple[bytes, bytes]:
        """Decrypt an SSH packet header"""
        return first_block, first_block[:header_len]
    def decrypt_packet(self, seq: int, first: bytes, rest: bytes,
                       header_len: int, mac: bytes) -> Optional[bytes]:
        """Verify the signature of and decrypt an SSH packet"""
        return self._cipher.verify_and_decrypt(first[:header_len],
                                               first[header_len:] + rest, mac)
class ChachaEncryption(Encryption):
    """Shim for chacha20-poly1305 encryption"""
    def __init__(self, cipher: ChachaCipher):
        self._cipher = cipher
    @classmethod
    def new(cls, cipher_name: str, key: bytes, iv: bytes, mac_alg: bytes = b'',
            mac_key: bytes = b'', etm: bool = False) -> 'ChachaEncryption':
        """Construct a new SSH packet encryption object for Chacha ciphers"""
        return cls(ChachaCipher(key))
    @classmethod
    def get_mac_params(cls, mac_alg: bytes) -> Tuple[int, int, bool]:
        """Get parameters of the MAC algorithm used with this encryption"""
        return 0, 16, True
    def encrypt_packet(self, seq: int, header: bytes,
                       packet: bytes) -> Tuple[bytes, bytes]:
        """Encrypt and sign an SSH packet"""
        return self._cipher.encrypt_and_sign(header, packet, UInt64(seq))
    def decrypt_header(self, seq: int, first_block: bytes,
                       header_len: int) -> Tuple[bytes, bytes]:
        """Decrypt an SSH packet header"""
        return (first_block,
                self._cipher.decrypt_header(first_block[:header_len],
                                            UInt64(seq)))
    def decrypt_packet(self, seq: int, first: bytes, rest: bytes,
                       header_len: int, mac: bytes) -> Optional[bytes]:
        """Verify the signature of and decrypt an SSH packet"""
        return self._cipher.verify_and_decrypt(first[:header_len],
                                               first[header_len:] + rest,
                                               UInt64(seq), mac)
def register_encryption_alg(enc_alg: bytes, encryption: Type[Encryption],
                            cipher_name: str, default: bool) -> None:
    """Register an encryption algorithm"""
    try:
        get_cipher_params(cipher_name)
    except KeyError:
        pass
    else:
        _enc_algs.append(enc_alg)
        if default:
            _default_enc_algs.append(enc_alg)
        _enc_params[enc_alg] = (encryption, cipher_name)
def get_encryption_algs() -> List[bytes]:
    """Return supported encryption algorithms"""
    return _enc_algs
def get_default_encryption_algs() -> List[bytes]:
    """Return default encryption algorithms"""
    return _default_enc_algs
def get_encryption_params(enc_alg: bytes,
                          mac_alg: bytes = b'') -> _EncParams:
    """Get parameters of an encryption and MAC algorithm"""
    encryption, cipher_name = _enc_params[enc_alg]
    enc_keysize, enc_ivsize, enc_blocksize = get_cipher_params(cipher_name)
    mac_keysize, mac_hashsize, etm = encryption.get_mac_params(mac_alg)
    return (enc_keysize, enc_ivsize, enc_blocksize,
            mac_keysize, mac_hashsize, etm)
def get_encryption(enc_alg: bytes, key: bytes, iv: bytes, mac_alg: bytes = b'',
                   mac_key: bytes = b'', etm: bool = False) -> Encryption:
    """Return an object which can encrypt and decrypt SSH packets"""
    encryption, cipher_name = _enc_params[enc_alg]
    return encryption.new(cipher_name, key, iv, mac_alg, mac_key, etm)
_enc_alg_list = (
    (b'chacha20-poly1305@openssh.com', ChachaEncryption,
     'chacha20-poly1305', True),
    (b'aes256-gcm@openssh.com',        GCMEncryption,
     'aes256-gcm',        True),
    (b'aes128-gcm@openssh.com',        GCMEncryption,
     'aes128-gcm',        True),
    (b'aes256-ctr',                    BasicEncryption,
     'aes256-ctr',        True),
    (b'aes192-ctr',                    BasicEncryption,
     'aes192-ctr',        True),
    (b'aes128-ctr',                    BasicEncryption,
     'aes128-ctr',        True),
    (b'aes256-cbc',                    BasicEncryption,
     'aes256-cbc',        False),
    (b'aes192-cbc',                    BasicEncryption,
     'aes192-cbc',        False),
    (b'aes128-cbc',                    BasicEncryption,
     'aes128-cbc',        False),
    (b'3des-cbc',                      BasicEncryption,
     'des3-cbc',          False),
    (b'blowfish-cbc',                  BasicEncryption,
     'blowfish-cbc',      False),
    (b'cast128-cbc',                   BasicEncryption,
     'cast128-cbc',       False),
    (b'seed-cbc@ssh.com',              BasicEncryption,
     'seed-cbc',          False),
    (b'arcfour256',                    BasicEncryption,
     'arcfour256',        False),
    (b'arcfour128',                    BasicEncryption,
     'arcfour128',        False),
    (b'arcfour',                       BasicEncryption,
     'arcfour',           False)
)
for _enc_alg_args in _enc_alg_list:
    register_encryption_alg(*_enc_alg_args)