#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

from testlib import *

import socket
import time
import unittest
import os
import subprocess

def kill_user_admin(machine):
    """Kill the "admin" user's processes on `machine`.

    Runs a single shell command on the machine: `loginctl kill-user admin`
    is tried first, and `systemctl kill user-1000.slice` is only run when
    that fails.
    """
    # logind from systemd 208 is buggy, so systemd is asked directly when
    # loginctl fails: https://bugs.freedesktop.org/show_bug.cgi?id=71092
    via_logind = "loginctl kill-user admin"
    # NOTE(review): assumes the admin account has UID 1000 -- confirm
    via_systemd = "systemctl kill user-1000.slice"
    machine.execute(via_logind + " || " + via_systemd)

def authorize_user(m, user):
    """Install the test public key as `user`'s authorized_keys on machine `m`.

    Creates /home/<user>/.ssh, uploads files/ssh/id_rsa.pub as the
    authorized_keys file, hands ownership of the directory to `user` and
    then sets mode 600 on the key file and mode 700 on the directory.
    """
    ssh_dir = "/home/{0}/.ssh".format(user)
    authorized_keys = "{0}/authorized_keys".format(ssh_dir)

    m.execute("mkdir -p {0}".format(ssh_dir))
    m.upload(["files/ssh/id_rsa.pub"], authorized_keys)
    # Ownership first, then permissions: key file 600, directory 700.
    m.execute("chown -R {0}:{0} {1}/".format(user, ssh_dir))
    m.execute("chmod 600 {0}".format(authorized_keys))
    m.execute("chmod 700 {0}".format(ssh_dir))

# Names of the key files under files/ssh/ that testBasic uploads (together
# with their ".pub" counterparts) into /home/admin/.ssh/. The comment above
# each entry records the passphrase protecting that private key.
LOAD_KEYS = [
    # password: foobar
    "id_rsa",
    # no password
    "id_ecdsa",
    # password: badbad
    "id_dsa",
    # password: foobar
    "id_ed25519",
]

# Lines expected from `ssh-add -l` (SHA256 fingerprint format) once the keys
# have been loaded at login; compared verbatim by check_keys().
# NOTE(review): id_dsa is uploaded but not listed here -- presumably because
# its passphrase differs from the login password; confirm.
KEY_IDS = [
    "2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw"
    " /home/admin/.ssh/id_rsa (RSA)",
    "256 SHA256:dyHF4jiKz6RolQqORIATqhbZ4kil5cyiMQWizbQWU8k"
    " /home/admin/.ssh/id_ecdsa (ECDSA)",
    "256 SHA256:Uht4NX54Gjz62cNA8+LrHo63HiFW/i5aWg/cl/A3X+c"
    " /home/admin/.ssh/id_ed25519 (ED25519)",
]

# The same expected `ssh-add -l` lines as KEY_IDS, but with colon-separated
# MD5 fingerprints; testBasic uses this variant on "rhel" images.
KEY_IDS_MD5 = [
    "2048 93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1"
    " /home/admin/.ssh/id_rsa (RSA)",
    "256 bd:56:df:c3:ff:e4:1d:9d:f5:c4:b9:cc:64:00:d5:93"
    " /home/admin/.ssh/id_ecdsa (ECDSA)",
    "256 7a:8a:54:6d:85:b3:e0:f2:7e:1f:14:7a:75:a8:33:d9"
    " /home/admin/.ssh/id_ed25519 (ED25519)",
]

# https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=805030
@unittest.skipIf("debian" in os.environ.get("TEST_OS", ""), "No ssh keys on Debian (yet).")
class TestMultiMachineKeyAuth(MachineCase):
    """Add a second machine to Cockpit and log into it with SSH keys only.

    setUp() boots a second machine with password authentication disabled in
    sshd, so every connection made from the primary machine has to succeed
    through the keys held by the session's ssh-agent.
    """

    def load_key(self, name, password):
        """Load key `name`, unlocked with `password`, through the shell's credentials module.

        Both values are pasted unescaped into a JavaScript snippet, so only
        call this with trusted literals that contain no single quotes.
        """
        self.browser.switch_to_top()
        self.browser.eval_js("require('shell/credentials').keys_instance().load('{0}', '{1}')".format(name, password))

    def check_keys(self, keys):
        """Assert that `ssh-add -l`, spawned via cockpit in the browser, prints exactly `keys`.

        `keys` is a list of expected output lines, in order, without
        trailing newlines.
        """
        self.assertEqual(self.browser.eval_js("require('base1/cockpit').spawn([ 'ssh-add', '-l' ])"),
                         "\n".join(keys) + "\n")

    def _ssh_agent_running(self, machine):
        """Return True when `ps` on `machine` lists an ssh-agent process.

        The grep pipeline exits non-zero when nothing matches; this relies on
        execute() reporting that as subprocess.CalledProcessError.
        """
        try:
            machine.execute("ps xa | grep ssh-agent | grep -v grep")
        except subprocess.CalledProcessError:
            return False
        return True

    def setUp(self):
        """Boot a second machine that only accepts key based SSH logins."""
        MachineCase.setUp(self)
        self.machine2 = self.new_machine()
        self.machine2.start()
        self.machine2.wait_boot()
        # Add user
        self.machine2.execute("useradd user -c 'User' || true")
        # Turn off password logins, then restart sshd (stopping the socket
        # unit first when it is active) so the new configuration is used.
        self.machine2.execute("sed -i 's/.*PasswordAuthentication.*/PasswordAuthentication no/' /etc/ssh/sshd_config")
        self.machine2.execute("( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service")
        self.machine2.wait_ssh()

    def tearDown(self):
        """Check the second machine's journal as well, but only after a successful run."""
        if self.runner and self.runner.wasSuccessful():
            self.check_journal_messages(self.machine2)
        MachineCase.tearDown(self)

    def testBasic(self):
        """Walk through adding the second machine and switching the remote user.

        Steps: upload the keys and log in, verify the agent's keys, add the
        second machine as "user", log out and verify the agent is gone, log
        in again and connect as the default user, then change the stored
        user back to "user" from the dashboard.
        """
        b = self.browser
        m1 = self.machine
        m2 = self.machine2

        # Load keys
        m1.execute("mkdir -p /home/admin/.ssh")

        m1.upload(["files/ssh/{0}".format(k) for k in LOAD_KEYS],
                   "/home/admin/.ssh/")
        m1.upload(["files/ssh/{0}.pub".format(k) for k in LOAD_KEYS],
                   "/home/admin/.ssh/")
        m1.execute("chmod 400 /home/admin/.ssh/*")
        m1.execute("chown -R admin:admin /home/admin/.ssh")

        self.login_and_go("/system")

        # self.fail() rather than a bare assert: it still fires under "python -O".
        if not self._ssh_agent_running(m1):
            self.fail("No running ssh-agent found")

        # pam-ssh-add isn't used on atomic
        if 'atomic' in m1.image:
            self.load_key('id_rsa', 'foobar')
            if 'rhel' in m1.image:
                self.check_keys([
                    "2048 93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1 id_rsa (RSA)"])
            else:
                self.check_keys([
                    "2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw id_rsa (RSA)"])
        elif 'rhel' in m1.image:
            self.check_keys(KEY_IDS_MD5)
        else:
            # Check our keys were loaded.
            self.check_keys(KEY_IDS)

        # Add machine
        b.switch_to_top()
        b.go("/@%s" % m2.address)
        b.wait_visible("#machine-troubleshoot")
        b.click('#machine-troubleshoot')
        b.wait_popup('troubleshoot-dialog')

        b.wait_text('#troubleshoot-dialog .btn-primary', "Add")
        b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])")
        b.click('#troubleshoot-dialog .btn-primary')
        b.wait_in_text('#troubleshoot-dialog', "Fingerprint")
        b.click('#troubleshoot-dialog .btn-primary')
        b.wait_in_text('#troubleshoot-dialog h4', "Log in to")
        b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])")
        b.wait_visible("#login-available")
        b.wait_not_present("#login-diff-password")
        b.wait_present("#login-custom-user")
        self.assertEqual(b.val("#login-custom-user"), "")
        b.set_val("#login-custom-user", "user")
        self.assertEqual(b.val("#login-type"), "stored")
        b.wait_not_in_text("#login-type", "Password")
        b.wait_visible("#login-available")
        b.wait_not_in_text("#login-available", "Login Password")
        b.wait_in_text("#login-available", "id_rsa")

        # On atomic only id_rsa was loaded (by hand, above).
        if 'atomic' not in m1.image:
            b.wait_in_text("#login-available", "id_ecdsa")
            b.wait_in_text("#login-available", "id_ed25519")

        # add key
        authorize_user(m2, "user")

        # Login
        b.click("#troubleshoot-dialog .btn-primary")
        b.wait_popdown('troubleshoot-dialog')
        b.enter_page("/system", host="user@{0}".format(m2.address))

        # Logout
        b.logout()
        b.wait_visible("#login")
        if self._ssh_agent_running(m1):
            self.fail("Logout did not stop ssh agent.")

        # Remove authorized keys
        m2.execute("rm /home/user/.ssh/authorized_keys")
        authorize_user(m2, "admin")

        self.login_and_go("/system")
        b.switch_to_top()
        # pam-ssh-add isn't used on atomic
        if 'atomic' in m1.image:
            self.load_key('id_rsa', 'foobar')

        b.go("/@%s" % m2.address)
        b.wait_visible("#machine-troubleshoot")
        b.click('#machine-troubleshoot')
        b.wait_popup('troubleshoot-dialog')

        b.wait_in_text('#troubleshoot-dialog h4', "Log in to")
        b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])")
        b.wait_visible("#login-available")
        b.wait_visible("#login-custom-user")
        # The user name entered in the previous session is still filled in;
        # clear it so the connection is made without an explicit user.
        self.assertEqual(b.val("#login-custom-user"), "user")
        b.set_val("#login-custom-user", "")
        b.click("#troubleshoot-dialog .btn-primary")
        b.wait_popdown('troubleshoot-dialog')
        b.enter_page("/system", host=m2.address)
        b.wait_not_present("iframe.container-frame[name='cockpit1:user@{0}/system']".format(m2.address))

        # Change user
        authorize_user(m2, "user")

        b.go("/dashboard")
        b.enter_page("/dashboard")
        b.click('#dashboard-enable-edit')
        edit_button = "#dashboard-hosts .list-group-item[data-address='{0}'] button.pficon-edit".format(m2.address)
        b.wait_visible(edit_button)
        b.click(edit_button)
        b.wait_popup('host-edit-dialog')
        b.wait_visible('#host-edit-user')
        self.assertEqual(b.val("#host-edit-user"), "")
        b.set_val('#host-edit-user', 'user')
        b.click('#host-edit-apply')
        b.wait_popdown('host-edit-dialog')
        b.wait_not_present("iframe.container-frame[name='cockpit1:{0}/system']".format(m2.address))

        b.click("#dashboard-hosts .list-group-item[data-address='{0}']".format(m2.address))
        b.enter_page("/system", host="user@{0}".format(m2.address))

        self.allow_restart_journal_messages()
        self.allow_hostkey_messages()
        # Might happen when killing the bridge.
        self.allow_journal_messages("localhost: dropping message while waiting for child to exit",
                                    "Received message for unknown channel: .*",
                                    ".*: error reading from ssh",
                                    ".*: bridge program failed: Child process exited with code .*",
                                    # Since there is no password,
                                    # reauthorize doesn't work on m2
                                    ".*: user admin reauthorization failed",
                                    "Error executing command as another user: Not authorized",
                                    "This incident has been reported.",
                                    "sudo: a password is required")

# When executed as a script, hand control to test_main(), which is expected
# to be provided by the testlib star-import at the top of this file.
if __name__ == "__main__":
    test_main()
