Skip to content

policy.py

Policy.

AutoAddPolicy (MissingHostKeyPolicy)

thread-safe AutoAddPolicy

Source code in wizardwebssh/policy.py
class AutoAddPolicy(paramiko.client.MissingHostKeyPolicy):
    """
    thread-safe AutoAddPolicy
    """

    lock = threading.Lock()

    def is_missing_host_key(self, client, hostname, key):
        k = client._system_host_keys.lookup(hostname) or client._host_keys.lookup(hostname)
        if k is None:
            return True
        host_key = k.get(key.get_name(), None)
        if host_key is None:
            return True
        if host_key != key:
            raise paramiko.BadHostKeyException(hostname, key, host_key)

    def missing_host_key(self, client, hostname, key):
        with self.lock:
            if self.is_missing_host_key(client, hostname, key):
                keytype = key.get_name()
                logging.info("Adding {} host key for {}".format(keytype, hostname))
                client._host_keys._entries.append(paramiko.hostkeys.HostKeyEntry([hostname], key))

                with open(client._host_keys_filename, "a") as f:
                    f.write("{} {} {}\n".format(hostname, keytype, key.get_base64()))

missing_host_key(self, client, hostname, key)

Called when an .SSHClient receives a server key for a server that isn't in either the system or local .HostKeys object. To accept the key, simply return. To reject, raised an exception (which will be passed to the calling application).

Source code in wizardwebssh/policy.py
def missing_host_key(self, client, hostname, key):
    with self.lock:
        if self.is_missing_host_key(client, hostname, key):
            keytype = key.get_name()
            logging.info("Adding {} host key for {}".format(keytype, hostname))
            client._host_keys._entries.append(paramiko.hostkeys.HostKeyEntry([hostname], key))

            with open(client._host_keys_filename, "a") as f:
                f.write("{} {} {}\n".format(hostname, keytype, key.get_base64()))