#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

from testlib import *
import os
import subprocess
import unittest

@unittest.skipIf("atomic" in os.environ.get("TEST_OS", ""), "cockpit-testing dependencies")
class TestConnection(MachineCase):
    """Connection-level tests for Cockpit's web service.

    Covers losing the connection to the web service (on the login page and
    on a loaded page), TLS protocol/cipher/certificate handling, the
    Origins setting in cockpit.conf, and moving the listening port.

    The whole class is skipped when the TEST_OS environment variable
    contains "atomic".
    """

    def _fill_login_form(self):
        """Wait for the login page and type in the admin credentials.

        The login button is deliberately not clicked here, so callers can
        break the connection between filling the form and submitting it.
        """
        b = self.browser
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")

    def _s_client(self, cmd):
        """Log and run an ``openssl s_client`` command line.

        stdin is read from /dev/null and stderr is folded into the returned
        output. The /dev/null handle is closed again as soon as the command
        has finished, so no file descriptor outlives the call.

        Returns the combined output of the command.
        Raises subprocess.CalledProcessError if the command exits non-zero.
        """
        self.machine.message(repr(cmd))
        with open("/dev/null") as null:
            return subprocess.check_output(cmd, stdin=null, stderr=subprocess.STDOUT)

    def testBasic(self):
        """Break the connection in several ways and check the UI recovers.

        The socket unit is stopped, and later port 9090 is rejected with
        iptables, once while on the login page and once while on the
        /system page. Each time the browser must show an error (fatal login
        message or the disconnected dialog) and work again afterwards.
        """
        b = self.browser
        m = self.machine

        m.start_cockpit()

        # take cockpit-ws down on the login page
        b.open("/system")
        self._fill_login_form()
        m.execute("systemctl stop cockpit-testing.socket")
        b.click('#login-button')
        b.wait_text_not('#login-fatal-message', "")
        m.execute("systemctl start cockpit-testing.socket")
        b.reload()
        self._fill_login_form()
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # take cockpit-ws down on the server page
        m.execute("systemctl stop cockpit-testing.socket")
        b.switch_to_top()
        b.wait_popup("disconnected-dialog")
        m.execute("systemctl start cockpit-testing.socket")
        b.click('#disconnected-reconnect')
        b.expect_load()
        self._fill_login_form()

        # sever the connection on the login page
        m.execute("iptables -I INPUT 1 -p tcp --dport 9090 -j REJECT")
        b.click('#login-button')
        with b.wait_timeout(20):
            b.wait_text_not('#login-fatal-message', "")
        # The REJECT rule was inserted at position 1, so delete rule 1 again
        m.execute("iptables -D INPUT 1")
        b.reload()
        self._fill_login_form()
        b.click('#login-button')
        b.expect_load()
        b.enter_page("/system")

        # sever the connection on the server page
        m.execute("iptables -I INPUT 1 -p tcp --dport 9090 -j REJECT")
        b.switch_to_top()
        with b.wait_timeout(60):
            b.wait_popup("disconnected-dialog")
        b.wait_in_text('#disconnected-error', "Connection has timed out.")
        m.execute("iptables -D INPUT 1")
        b.click('#disconnected-reconnect')
        b.expect_load()
        b.enter_page("/system")

        self.allow_restart_journal_messages()

    def testTls(self):
        """Check TLS handshakes against cockpit.socket using openssl s_client.

        A default connection must succeed, SSLv3 and RC4 must be refused,
        and after installing a certificate chain and restarting cockpit the
        whole chain must be presented.
        """
        m = self.machine

        # Start Cockpit with TLS
        m.execute("systemctl start cockpit.socket")

        args = ['openssl', 's_client', '-connect', m.address + ":9090"]

        # A normal TLS connection works
        output = self._s_client(args)
        m.message(output)
        self.assertIn("DONE", output)

        # SSLv3 should not work
        try:
            output = self._s_client(args + ['-ssl3'])
        except subprocess.CalledProcessError as ex:
            m.message(ex.output)
            self.assertIn("wrong version number", ex.output)
        else:
            m.message(output)
            self.fail("SSL3 should not have been successful")

        # RC4 should not work
        try:
            output = self._s_client(args + ['-cipher', 'RC4'])
        except subprocess.CalledProcessError as ex:
            m.message(ex.output)
            self.assertIn("ssl handshake failure", ex.output)
        else:
            m.message(output)
            self.fail("RC4 cipher should not have been usable")

        # Install a certificate chain, and give it an arbitrary bad file context
        m.upload(["files/cert-chain.cert"], "/etc/cockpit/ws-certs.d")
        m.execute("! selinuxenabled || chcon --type svirt_sandbox_file_t /etc/cockpit/ws-certs.d/cert-chain.cert")

        # This should also reset the file context
        m.execute("systemctl restart cockpit")

        # Should use the new certificates and entire chain should show up
        output = self._s_client(args)
        m.message(output)
        self.assertIn("DONE", output)
        self.assertIn("s:/CN=localhost", output)
        self.assertIn("1 s:/OU=Intermediate", output)

        self.allow_journal_messages(
            ".*Peer failed to perform TLS handshake",
            ".*Error performing TLS handshake: Could not negotiate a supported cipher suite.")

    def testConfigOrigins(self):
        """Check that an origin listed in cockpit.conf may open the socket.

        Writes an Origins list to /etc/cockpit/cockpit.conf and sends a
        WebSocket upgrade request with one of those origins to both socket
        URLs; each response must contain "no-session".
        """
        m = self.machine
        m.start_cockpit()
        m.execute('echo "[WebService]\nOrigins = http://other-origin:9090 http://localhost:9090" > /etc/cockpit/cockpit.conf')

        # WebSocket upgrade request; only the URL path differs between probes
        curl = ('curl -s -f -N -H "Connection: Upgrade" -H "Upgrade: websocket" '
                '-H "Origin: http://other-origin:9090" -H "Host: localhost:9090" '
                '-H "Sec-Websocket-Key: 3sc2c9IzwRUc3BlSIYwtSA==" '
                '-H "Sec-Websocket-Version: 13" ')

        # The socket should answer at /cockpit/socket and also at /socket
        for path in ('/cockpit/socket', '/socket'):
            output = m.execute(curl + 'http://localhost:9090' + path)
            self.assertIn('"no-session"', output)

        self.allow_journal_messages('peer did not close io when expected')

    def testSocketPort(self):
        """Move cockpit.socket to port 443 and check where it answers.

        After the systemd drop-in is installed, https://localhost must
        serve Cockpit and port 9090 must refuse connections.
        """
        m = self.machine

        # Change port according to documentation: http://files.cockpit-project.org/guide/latest/listen.html
        m.execute('! selinuxenabled || semanage port -m -t websm_port_t -p tcp 443')
        m.execute('mkdir -p /etc/systemd/system/cockpit.socket.d/ && printf "[Socket]\nListenStream=\nListenStream=443" > /etc/systemd/system/cockpit.socket.d/listen.conf')
        m.execute('systemctl daemon-reload && systemctl restart cockpit.socket')

        # "|| true" keeps a failing wget from failing the command, so its
        # output can be checked instead
        output = m.execute('wget --no-check-certificate -O - https://localhost 2>&1 || true')
        self.assertIn('Cockpit starting...', output)

        output = m.execute('wget --no-check-certificate -O - https://localhost:9090 2>&1 || true')
        self.assertIn('Connection refused', output)

        self.allow_journal_messages(".*Peer failed to perform TLS handshake")

# Only run the tests when this file is executed directly, not when imported.
# test_main is not defined in this file; presumably it comes from the
# testlib star import.
if __name__ == "__main__":
    test_main()
