File: //usr/lib/python3/dist-packages/cloudinit/distros/ug_util.py
# Copyright (C) 2012 Canonical Ltd.
# Copyright (C) 2012, 2013 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2012 Yahoo! Inc.
#
# Author: Scott Moser <scott.moser@canonical.com>
# Author: Juerg Haefliger <juerg.haefliger@hp.com>
# Author: Joshua Harlow <harlowja@yahoo-inc.com>
# Author: Ben Howard <ben.howard@canonical.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import logging
from cloudinit import lifecycle, type_utils, util
LOG = logging.getLogger(__name__)
# Normalizes an input group configuration which can be:
# Comma separated string or a list or a dictionary
#
# Returns dictionary of group names => members of that group which is the
# standard form used in the rest of cloud-init
def _normalize_groups(grp_cfg):
    if isinstance(grp_cfg, str):
        grp_cfg = grp_cfg.strip().split(",")
    if isinstance(grp_cfg, list):
        c_grp_cfg = {}
        for i in grp_cfg:
            if isinstance(i, dict):
                for k, v in i.items():
                    if not isinstance(v, (list, str)):
                        raise TypeError(
                            "Bad group member type %s"
                            % (type_utils.obj_name(v))
                        )
                    if isinstance(v, list):
                        c_grp_cfg.setdefault(k, []).extend(v)
                    else:
                        c_grp_cfg.setdefault(k, []).append(v)
            elif isinstance(i, str):
                if i not in c_grp_cfg:
                    c_grp_cfg[i] = []
            else:
                raise TypeError(
                    "Unknown group name type %s" % (type_utils.obj_name(i))
                )
        grp_cfg = c_grp_cfg
    groups = {}
    if isinstance(grp_cfg, dict):
        for grp_name, grp_members in grp_cfg.items():
            groups[grp_name] = util.uniq_merge_sorted(grp_members)
    else:
        raise TypeError(
            "Group config must be list, dict or string type only but found %s"
            % (type_utils.obj_name(grp_cfg))
        )
    return groups
# Normalizes an input group configuration which can be: a list or a dictionary
#
# components that define the user config + 'name' (if a 'name' field does not
# exist then the default user is assumed to 'own' that configuration.)
#
# Returns a dictionary of user names => user config which is the standard form
# used in the rest of cloud-init. Note the default user will have a special
# config entry 'default' which will be marked true and all other users will be
# marked false.
def _normalize_users(u_cfg, def_user_cfg=None):
    if isinstance(u_cfg, dict):
        ad_ucfg = []
        for k, v in u_cfg.items():
            if isinstance(v, (bool, int, float, str)):
                if util.is_true(v):
                    ad_ucfg.append(str(k))
            elif isinstance(v, dict):
                v["name"] = k
                ad_ucfg.append(v)
            else:
                raise TypeError(
                    "Unmappable user value type %s for key %s"
                    % (type_utils.obj_name(v), k)
                )
        u_cfg = ad_ucfg
    elif isinstance(u_cfg, str):
        u_cfg = util.uniq_merge_sorted(u_cfg)
    users = {}
    for user_config in u_cfg:
        if isinstance(user_config, (list, str)):
            for u in util.uniq_merge(user_config):
                if u and u not in users:
                    users[u] = {}
        elif isinstance(user_config, dict):
            n = user_config.pop("name", "default")
            prev_config = users.get(n) or {}
            users[n] = util.mergemanydict([prev_config, user_config])
        else:
            raise TypeError(
                "User config must be dictionary/list or string "
                " types only and not %s" % (type_utils.obj_name(user_config))
            )
    # Ensure user options are in the right python friendly format
    if users:
        c_users = {}
        for uname, uconfig in users.items():
            c_uconfig = {}
            for k, v in uconfig.items():
                k = k.replace("-", "_").strip()
                if k:
                    c_uconfig[k] = v
            c_users[uname] = c_uconfig
        users = c_users
    # Fix the default user into the actual default user name and replace it.
    def_user = None
    if users and "default" in users:
        def_config = users.pop("default")
        if def_user_cfg:
            # Pickup what the default 'real name' is and any groups that are
            # provided by the default config
            def_user_cfg = def_user_cfg.copy()
            def_user = def_user_cfg.pop("name")
            def_groups = def_user_cfg.pop("groups", [])
            # Pick any config + groups for the user name that we may have
            # extracted previously
            parsed_config = users.pop(def_user, {})
            parsed_groups = parsed_config.get("groups", [])
            # Now merge the extracted groups with the default config provided
            users_groups = util.uniq_merge_sorted(parsed_groups, def_groups)
            parsed_config["groups"] = ",".join(users_groups)
            # The real config for the default user is the combination of the
            # default user config provided by the distro, the default user
            # config provided by the above merging for the user 'default' and
            # then the parsed config from the user's 'real name' which does not
            # have to be 'default' (but could be)
            users[def_user] = util.mergemanydict(
                [def_user_cfg, def_config, parsed_config]
            )
    # Ensure that only the default user that we found (if any) is actually
    # marked as the default user
    for uname, uconfig in users.items():
        uconfig["default"] = uname == def_user if def_user else False
    return users
# Normalizes a set of user/users and group dictionary configuration into an
# usable format so that the rest of cloud-init can understand using the default
# user provided by the input distribution (if any) to allow mapping of the
# 'default' user.
#
# Output is a dictionary of group names -> [member] (list)
# and a dictionary of user names -> user configuration (dict)
#
# If 'user' exists, it will override
# The 'users'[0] entry (if a list) otherwise it will just become an entry in
# the returned dictionary (no override)
def normalize_users_groups(cfg, distro):
    if not cfg:
        cfg = {}
    # Handle the previous style of doing this where the first user
    # overrides the concept of the default user if provided in the user: XYZ
    # format.
    old_user = {}
    if "user" in cfg and cfg["user"]:
        old_user = cfg["user"]
        # Translate it into a format that will be more useful going forward
        if isinstance(old_user, str):
            old_user = {"name": old_user}
            lifecycle.deprecate(
                deprecated="'user' of type string",
                deprecated_version="22.2",
                extra_message="Use 'users' list instead.",
            )
        elif not isinstance(old_user, dict):
            LOG.warning(
                "Format for 'user' key must be a string or dictionary"
                " and not %s",
                type_utils.obj_name(old_user),
            )
            old_user = {}
    # If no old user format, then assume the distro provides what the 'default'
    # user maps to, but notice that if this is provided, we won't automatically
    # inject a 'default' user into the users list, while if an old user format
    # is provided we will.
    distro_user_config = {}
    try:
        distro_user_config = distro.get_default_user()
    except NotImplementedError:
        LOG.warning(
            "Distro has not implemented default user access. No "
            "distribution provided default user will be normalized."
        )
    # Merge the old user (which may just be an empty dict when not present)
    # with the distro provided default user configuration so that the old user
    # style picks up all the distribution specific attributes (if any)
    default_user_config = util.mergemanydict([old_user, distro_user_config])
    base_users = cfg.get("users", [])
    if isinstance(base_users, (dict, str)):
        lifecycle.deprecate(
            deprecated=f"'users' of type {type(base_users)}",
            deprecated_version="22.2",
            extra_message="Use 'users' as a list.",
        )
    elif not isinstance(base_users, (list)):
        LOG.warning(
            "Format for 'users' key must be a comma-separated string"
            " or a dictionary or a list but found %s",
            type_utils.obj_name(base_users),
        )
        base_users = []
    if old_user:
        # When 'user:' is provided, it should be made as the default user
        if isinstance(base_users, list):
            base_users.append({"name": "default"})
        elif isinstance(base_users, dict):
            base_users["default"] = dict(base_users).get("default", True)
        elif isinstance(base_users, str):
            base_users += ",default"
    groups = {}
    if "groups" in cfg:
        groups = _normalize_groups(cfg["groups"])
    users = _normalize_users(base_users, default_user_config)
    return (users, groups)
# Given a user dictionary config, extract the default user name and user config
# and return them or return (None, None) if no default user is found
def extract_default(users, default_name=None, default_config=None):
    if not users:
        return (default_name, default_config)
    def safe_find(entry):
        config = entry[1]
        if not config or "default" not in config:
            return False
        return config["default"]
    tmp_users = dict(filter(safe_find, users.items()))
    if not tmp_users:
        return (default_name, default_config)
    name = list(tmp_users)[0]
    config = tmp_users[name]
    config.pop("default", None)
    return (name, config)