#!/usr/bin/python3
# This file is part of Cockpit.
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import unittest

# import Cockpit's machinery for test VMs and its browser test API
TEST_DIR = os.path.dirname(__file__)
sys.path.append(os.path.join(TEST_DIR, "common"))
sys.path.append(os.path.join(os.path.dirname(TEST_DIR), "bots/machine"))
from packagelib import *
from testlib import *
import testvm

# candlepin on the services image has a lot of demo data preloaded
# useful info/commands:
#    Login: doc      password: password
#    org:   snowwhite
#
#    Login: admin    password: admin
#    org:   admin
#
# if you download product files onto the test machine, these will show up as installed products
# local directory: /etc/pki/product
# sample products used in the tests (different subscription results for doc and admin):
# /home/admin/candlepin/generated_certs/6050.pem
# /home/admin/candlepin/generated_certs/88888.pem
#
# to use the candlepin image on a test machine, either add the certificate or
# allow insecure connections (/etc/rhsm/rhsm.conf -> "insecure = 1")
#
# $IP is the ip of the candlepin machine
#
# add an activation key to a pool:
# curl --insecure --request POST --user admin:admin \
# https://$IP:8443/candlepin/activation_keys/ff80808155ca50b10155ca50cd280010/pools/ff80808155ca50b10155ca51f04607e5
# register with: activation key "awesome_os_pool" and org "admin"
# or: subscription-manager register --activationkey awesome_os_pool --org admin --serverurl https://$IP:8443/candlepin
#
# in order to get the right ids for the activation key and pool, see ACTIVATION_KEY_SCRIPT

# Python snippet run on a test machine via machine_python(); sys.argv[1] is the
# URL to poll.  It first sleeps a fixed 10 seconds, then issues a GET request,
# retrying after a one second pause (at most 199 attempts) until a request
# completes without raising.
# NOTE(review): the loop is best-effort -- every exception is swallowed by the
# bare "except", the HTTP status is never inspected, and the script still ends
# normally when all attempts failed.
WAIT_SCRIPT = """
import sys
import time
import requests

# minimum waiting time for Candlepin to answer (in seconds)
waiting_time = 10

sys.stderr.write("Waiting %s seconds for %s\\n" % (waiting_time, sys.argv[1]))
sys.stderr.flush()
time.sleep(waiting_time)
for i in range(1,200):
  try:
     sys.stderr.write("Waiting for %s\\n" % sys.argv[1])
     sys.stderr.flush()
     res = requests.get(sys.argv[1])
     break
  except:
     time.sleep(1)
"""

# Python snippet run on a test machine via machine_python(); sys.argv[1] is the
# Candlepin base URL.  Authenticating as admin/admin it
#   1. looks up the id of the activation key named "awesome_os_pool" whose
#      owner is displayed as "Admin Owner",
#   2. looks up the id of a pool owned by "admin" with contract number "0"
#      that provides product 88888,
#   3. POSTs to activation_keys/<key>/pools/<pool> to associate the two.
# NOTE(review): both "[0]" lookups raise IndexError when nothing matches, and
# the response of the final POST is not checked.
ACTIVATION_KEY_SCRIPT = """
import sys
import json
import requests

data = requests.get(sys.argv[1] + "/activation_keys", auth=("admin","admin")).json()
key = [e['id'] for e in data if e['name'] == 'awesome_os_pool' and e['owner']['displayName'] == 'Admin Owner'][0]

data = requests.get(sys.argv[1] + "/pools", auth=("admin","admin")).json()
pool = [ e['id'] for e in data if e['owner']['key'] == 'admin' and e['contractNumber'] == '0' and [p for p in e['providedProducts'] if p['productId'] == '88888'] ][0]

key_url = sys.argv[1] + "/activation_keys/{key}/pools/{pool}".format(key=key, pool=pool)
requests.post(key_url, auth=("admin","admin"))
"""

# fmt: off
# Network layout of the test setup: the address given to the machine under
# test, and the address/hostname under which the Candlepin machine is reached.
CLIENT_ADDR = "10.111.112.1"
CANDLEPIN_ADDR = "10.111.112.100"
CANDLEPIN_HOSTNAME = "services.cockpit.lan"
CANDLEPIN_URL = f"https://{CANDLEPIN_HOSTNAME}:8443/candlepin"

# Sample products; "id" selects the <id>.pem product certificate and "name" is
# the text the tests look for in the subscriptions list.
PRODUCT_SNOWY = dict(
    id="6050",
    name="Snowy OS Premium Architecture Bits",
)

PRODUCT_SHARED = dict(
    id="88888",
    name="Shared File System Bits (no content)",
)
# fmt: on


def machine_python(machine, script, *args):
    """
    Run the Python source [script] with "python3 -c" on [machine].

    Any additional [args] are appended to the command line, so the script
    sees them as sys.argv[1:].  Returns whatever machine.execute() returns.
    """
    return machine.execute(["python3", "-c", script, *args])


def machine_restorecon(machine, path, *args):
    """
    Run "restorecon -R" for [path] on [machine].

    Any additional [args] are appended after the path.  Returns whatever
    machine.execute() returns.
    """
    return machine.execute(["restorecon", "-R", path, *args])


def skipUnlessDistroFamily(distro, reason):
    """
    Decorator: skip the test with [reason] unless the default test image
    belongs to the distribution family [distro].

    An image is considered part of the family when its name starts with
    "<distro>-".  The check uses testvm.DEFAULT_IMAGE and is evaluated when
    the decorator is applied.
    """
    family_prefix = distro + "-"
    in_family = testvm.DEFAULT_IMAGE.startswith(family_prefix)
    return unittest.skipUnless(in_family, reason)


class SubscriptionsCase(MachineCase):
    """
    Shared fixture for the subscription tests.

    Provisions the machine under test together with a "services" machine
    running Candlepin, and prepares the former to talk to the latter.
    """

    # fmt: off
    provision = {
        "0": {"address": f"{CLIENT_ADDR}/20"},
        "services": {"image": "services"}
    }
    # fmt: on

    def setUp(self):
        """
        Start Candlepin and set up the machine under test: hostname
        resolution, product certificates, the Candlepin CA, the insights
        client configuration (RHEL images only) and the mock-insights helper.
        """
        super().setUp()
        services = self.candlepin = self.machines["services"]
        client = self.machine

        # Start candlepin: run /root/run-candlepin when it is executable,
        # otherwise start tomcat.
        # this changed in https://github.com/cockpit-project/bots/pull/1768
        services.execute(
            "if [ -x /root/run-candlepin ]; then /root/run-candlepin; else systemctl start tomcat; fi"
        )

        # let the cockpit machine resolve the hostname of the services machine
        client.execute(f"echo '{CANDLEPIN_ADDR} {CANDLEPIN_HOSTNAME}' >> /etc/hosts")

        # copy the product certificates from the candlepin machine into the
        # product directory of the cockpit machine
        client.execute(["mkdir", "-p", "/etc/pki/product"])
        for product in (PRODUCT_SNOWY, PRODUCT_SHARED):
            cert_name = f"{product['id']}.pem"
            local_cert = os.path.join(self.tmpdir, cert_name)
            services.download(f"/home/admin/candlepin/generated_certs/{cert_name}", local_cert)
            client.upload([local_cert], "/etc/pki/product")

        # fetch the candlepin CA certificate ...
        ca_name = "candlepin-ca.pem"
        local_ca = os.path.join(self.tmpdir, ca_name)
        services.download("/home/admin/candlepin/certs/candlepin-ca.crt", local_ca)
        # ... add it to the system certificate store ...
        client.upload([local_ca], f"/etc/pki/ca-trust/source/anchors/{ca_name}")
        machine_restorecon(client, "/etc/pki")
        client.execute(["update-ca-trust", "extract"])
        # ... and to the CA directory of subscription-manager
        client.upload([local_ca], f"/etc/rhsm/ca/{ca_name}")
        machine_restorecon(client, "/etc/rhsm/ca/")

        # block until the candlepin web service answers (best effort)
        machine_python(client, WAIT_SCRIPT, CANDLEPIN_URL)

        hostname = client.execute(["hostname"]).rstrip()

        if client.image.startswith("rhel-"):
            # point insights-client at this machine's own port 8443
            # NOTE(review): presumably served by mock-insights below -- confirm
            insights_conf = f"""
[insights-client]
auto_config=False
auto_update=False
base_url={hostname}:8443/r/insights
cert_verify=/var/lib/insights/mock-certs/ca.crt
username=admin
password=foobar
"""
            client.write("/etc/insights-client/insights-client.conf", insights_conf)

        # the helper is uploaded and started on every image, not only on RHEL
        client.upload(["files/mock-insights"], "/var/tmp")
        client.spawn("env PYTHONUNBUFFERED=1 /var/tmp/mock-insights", "mock-insights.log")

        if client.image == "rhel-8-5":
            self.allow_journal_messages("json_object_get_string_member: assertion 'node != NULL' failed")

    def wait_subscription(self, product, is_subscribed):
        """
        Wait until the row of [product] shows (is_subscribed is True) or
        does not show (is_subscribed is False) the "Subscribed" label.

        Any other value of [is_subscribed] makes this a no-op.
        """
        if is_subscribed is True:
            wait = self.browser.wait_text
        elif is_subscribed is False:
            wait = self.browser.wait_text_not
        else:
            return
        wait(f"tr[data-row-id='{product['name']}'] .pf-c-label", "Subscribed")


class TestSubscriptions(SubscriptionsCase):
    """
    Browser tests for the subscriptions page: registration with credentials,
    with an activation key and without auto-attach, access as an unprivileged
    user, and (RHEL images only) the Insights integration.

    The per-test descriptions are plain comments rather than docstrings so
    that unittest keeps reporting the tests by their method names.
    """

    def testRegister(self):
        # Register with user name and password, going through the error
        # paths (wrong password, missing organization) first; then unregister.
        b = self.browser

        self.login_and_go("/subscriptions")

        register_button_sel = "button:contains('Register')"
        unregister_button_sel = "button:contains('Unregister')"

        # Error messages are kept in one place each, so that the "appears"
        # and "disappears" checks below cannot drift apart.
        invalid_credentials_msg = "Invalid Credentials"
        missing_org_msg = "User doc is member of more organizations, but no organization was selected"

        # wait until we can open the registration dialog
        b.click(register_button_sel)

        b.wait_visible("#subscription-register-url")

        # enter server and incorrect login data
        b.set_val("#subscription-register-url", "custom")
        b.set_input_text("#subscription-register-url-custom", CANDLEPIN_URL)
        b.set_input_text("#subscription-register-username", "doc")
        b.set_input_text("#subscription-register-password", "wrongpass")

        # Do not try to connect to insights
        b.set_checked("#subscription-insights", False)

        # try to register
        dialog_register_button_sel = "footer .pf-m-primary"
        b.click(dialog_register_button_sel)

        # wait for message that we used wrong credentials
        self.allow_browser_errors("error registering")
        b.wait_in_text("body", invalid_credentials_msg)

        # enter correct login data and try again, old error should disappear
        b.set_input_text("#subscription-register-password", "password")
        b.click(dialog_register_button_sel)

        # This used to wait for "Invalid credentials" (lower-case "c"), a
        # spelling that differs from the message waited for above; assuming
        # the text match is case-sensitive, that check was always satisfied
        # immediately and verified nothing.
        b.wait_not_in_text("body", invalid_credentials_msg)

        # wait for message that we need to specify our org
        b.wait_in_text("body", missing_org_msg)

        # now specify the org
        b.set_input_text("#subscription-register-org", "snowwhite")

        # try to register again
        b.click(dialog_register_button_sel)

        # old error should disappear
        b.wait_not_in_text("body", missing_org_msg)

        # dialog should disappear
        b.wait_not_present(dialog_register_button_sel)

        # make sure this product is subscribed
        self.wait_subscription(PRODUCT_SNOWY, True)

        # unregister
        with b.wait_timeout(360):
            b.click(unregister_button_sel)

        b.wait_visible(register_button_sel)

    def testRegisterWithKey(self):
        # Register with an activation key instead of user credentials and
        # check which of the sample products end up subscribed.
        b = self.browser

        self.login_and_go("/subscriptions")

        # wait until we can open the registration dialog
        register_button_sel = "button:contains('Register')"
        unregister_button_sel = "button:contains('Unregister')"
        b.click(register_button_sel)

        # enter server data
        b.wait_visible("#subscription-register-url")
        b.set_val("#subscription-register-url", "custom")
        b.set_input_text("#subscription-register-url-custom", CANDLEPIN_URL)

        # Do not try to connect to insights
        b.set_checked("#subscription-insights", False)

        # select registration method "activation key"
        activation_key_checkbox = "#subscription-register-activation-key-method"
        b.click(activation_key_checkbox)

        # make sure we have an activation key on the target machine
        machine_python(self.machine, ACTIVATION_KEY_SCRIPT, CANDLEPIN_URL)
        b.set_input_text("#subscription-register-key", "awesome_os_pool")
        b.set_input_text("#subscription-register-org", "admin")

        dialog_register_button_sel = "footer .pf-m-primary"
        b.click(dialog_register_button_sel)

        # dialog should disappear
        b.wait_not_present(dialog_register_button_sel)

        # make sure this product isn't subscribed
        self.wait_subscription(PRODUCT_SNOWY, False)

        # find another one that is subscribed
        self.wait_subscription(PRODUCT_SHARED, True)

        # unregister
        b.click(unregister_button_sel)

        # make sure this product isn't subscribed
        self.wait_subscription(PRODUCT_SNOWY, False)

        # after unregistering, the other product isn't subscribed either
        self.wait_subscription(PRODUCT_SHARED, False)

    def testRegisterWithoutAutoAttach(self):
        # Register with the auto-attach option toggled, then attach
        # subscriptions afterwards through the "Auto-attach" button.
        b = self.browser

        self.login_and_go("/subscriptions")

        # wait until we can open the registration dialog
        register_button_sel = "button:contains('Register')"
        b.click(register_button_sel)

        b.wait_visible("#subscription-register-url")

        # enter server and correct login data
        b.set_val("#subscription-register-url", "custom")
        b.set_input_text("#subscription-register-url-custom", CANDLEPIN_URL)
        b.set_input_text("#subscription-register-username", "doc")
        b.set_input_text("#subscription-register-password", "password")
        b.set_input_text("#subscription-register-org", "snowwhite")

        # toggle the auto-attach option
        # NOTE(review): presumably it is enabled by default, so this click
        # turns it off -- confirm against the dialog's defaults
        b.click("#subscription-auto-attach-use")

        dialog_register_button_sel = "footer .pf-m-primary"
        b.click(dialog_register_button_sel)

        # dialog should disappear
        b.wait_not_present(dialog_register_button_sel)

        # this product should not be subscribed ATM, because auto-attach was skipped
        self.wait_subscription(PRODUCT_SHARED, False)

        b.click("button:contains('Auto-attach')")

        # now the same product should be subscribed
        self.wait_subscription(PRODUCT_SHARED, True)

    def testUnpriv(self):
        # A freshly created user without extra privileges only gets the
        # "not allowed" empty state.
        self.machine.execute("useradd junior; echo junior:foobar | chpasswd")
        self.login_and_go("/subscriptions", user="junior")
        self.browser.wait_in_text(
            ".pf-c-empty-state__body", "current user isn't allowed to access system subscription"
        )
        self.allow_journal_messages("junior is not in the sudoers file.  This incident will be reported.")

    @skipUnlessDistroFamily("rhel", "Insights support is specific to RHEL")
    def testInsights(self):
        # Register without Insights, connect to Insights afterwards, check
        # the system purpose display and finally disconnect again.
        m = self.machine
        b = self.browser

        self.login_and_go("/subscriptions")

        b.click("button:contains('Register')")
        b.wait_visible("#subscription-register-url")

        b.set_val("#subscription-register-url", "custom")
        b.set_input_text("#subscription-register-url-custom", CANDLEPIN_URL)
        b.set_input_text("#subscription-register-username", "admin")
        b.set_input_text("#subscription-register-password", "admin")
        b.set_input_text("#subscription-register-org", "admin")

        # Do not try to connect to insights
        b.set_checked("#subscription-insights", False)

        dialog_register_button_sel = "footer .pf-m-primary"
        b.click(dialog_register_button_sel)
        b.wait_not_present(dialog_register_button_sel)

        # connect through the separate dialog
        b.click("button:contains('Not connected')")
        b.wait_visible('.pf-c-modal-box__body:contains("This system is not connected")')
        b.click("footer button.apply")
        with b.wait_timeout(360):
            b.wait_not_present(".pf-c-modal-box")

        b.wait_visible("#overview a[href='http://cloud.redhat.com/insights/inventory/123-nice-id']")
        b.wait_visible("#overview a:contains('3 hits, including important')")

        # test system purpose
        m.execute(["subscription-manager", "role", "--set", "Red Hat Enterprise Linux Workstation"])
        m.execute(["subscription-manager", "usage", "--set", "Development/Test"])
        m.execute(["subscription-manager", "service-level", "--set", "Standard"])
        b.wait_in_text("#syspurpose", "Standard")
        b.wait_in_text("#syspurpose", "Development/Test")
        b.wait_in_text("#syspurpose", "Red Hat Enterprise Linux Workstation")

        # disconnect from Insights again
        b.click("button:contains('Connected to Insights')")
        b.wait_visible('.pf-c-modal-box__body:contains("Next Insights data upload")')
        b.wait_visible('.pf-c-modal-box__body:contains("Last Insights data upload")')
        b.click("button.pf-c-expandable-section__toggle:contains('Disconnect from Insights')")
        b.click("button.pf-m-danger:contains('Disconnect from Insights')")
        b.wait_not_present(".pf-c-modal-box")

        b.wait_visible("button:contains('Not connected')")

    @skipUnlessDistroFamily("rhel", "Insights support is specific to RHEL")
    def testSubAndInAndFail(self):
        # Register and connect to Insights in one go, break an upload and
        # check the failure indication, repair it, then unregister.
        m = self.machine
        b = self.browser

        # HACK - https://bugzilla.redhat.com/show_bug.cgi?id=2062136
        #
        # We rely on insights-client.service to be working, let's not
        # get distracted by SELinux.  Denials will still be logged and
        # tracked as known issues even when not enforcing.
        #
        m.execute(["setenforce", "0"])

        self.login_and_go("/subscriptions")

        b.click("button:contains('Register')")
        b.wait_visible("#subscription-register-url")

        b.set_val("#subscription-register-url", "custom")
        b.set_input_text("#subscription-register-url-custom", CANDLEPIN_URL)
        b.set_input_text("#subscription-register-username", "admin")
        b.set_input_text("#subscription-register-password", "admin")
        b.set_input_text("#subscription-register-org", "admin")
        b.set_checked("#subscription-insights", True)
        dialog_register_button_sel = "footer .pf-m-primary"
        b.click(dialog_register_button_sel)
        with b.wait_timeout(360):
            b.wait_not_present(dialog_register_button_sel)

        b.wait_visible("button:contains('Connected to Insights')")

        # Break the next upload and expect the warning triangle to tell us about it
        m.execute(["mv", "/etc/insights-client/machine-id", "/etc/insights-client/machine-id.lost"])
        m.execute(["systemctl", "start", "insights-client"])

        b.wait_visible("button .pf-c-button__icon svg[fill='orange']")

        b.click("button:contains('Connected to Insights')")
        b.wait_visible('.pf-c-modal-box__body:contains("The last Insights data upload has failed")')
        b.click("button.cancel")

        # Unbreak it and retry.
        m.execute(["mv", "/etc/insights-client/machine-id.lost", "/etc/insights-client/machine-id"])
        # start the service and poll until it is no longer active
        m.execute(
            "systemctl start insights-client; while systemctl --quiet is-active insights-client; do sleep 1; done",
            timeout=360,
        )

        b.wait_not_present("button .pf-c-button__icon svg[fill='orange']")

        b.click("button:contains('Unregister')")
        b.wait_not_in_text("#overview", "Insights")
        m.execute(["test", "-f", "/etc/insights-client/.unregistered"])


class TestSubscriptionsPackages(SubscriptionsCase, PackageCase):
    """
    Checks how the subscriptions page offers to install the
    insights-client package when it is missing.
    """

    def testMissingPackages(self):
        # The repository only offers a test package named insights-client
        # (version 999, release 1).
        # NOTE(review): presumably it carries no real client binary and only
        # leaves the stamp file checked below -- confirm in packagelib
        machine, browser = self.machine, self.browser

        package = "insights-client"
        stamp_file = "/stamp-insights-client-999-1"
        modal_body = ".pf-c-modal-box__body"
        install_notice = f'{modal_body}:contains("The insights-client package will be installed")'

        if machine.image.startswith("rhel-"):
            machine.execute(["pkcon", "remove", "-y", package])

        self.createPackage(package, "999", "1")
        self.enableRepo()
        machine.execute(["pkcon", "refresh"])

        self.login_and_go("/subscriptions")

        browser.click("button:contains('Register')")
        browser.wait_visible("#subscription-register-url")

        browser.set_val("#subscription-register-url", "custom")
        browser.set_input_text("#subscription-register-url-custom", CANDLEPIN_URL)
        for field in ("username", "password", "org"):
            browser.set_input_text(f"#subscription-register-{field}", "admin")
        browser.set_checked("#subscription-insights", True)
        browser.wait_visible(install_notice)
        browser.click("footer button.apply")
        with browser.wait_timeout(360):
            browser.wait_not_present(".pf-c-modal-box")

        # Connecting to Insights will not have worked because the
        # insights-client binary is not actually there.
        browser.wait_visible(".pf-c-alert:contains('not-found')")

        # The package did get installed, though; remove it so that the
        # connection dialog has to install it again.
        machine.execute(["test", "-f", stamp_file])
        machine.execute(["pkcon", "remove", "-y", package])
        machine.execute(["pkcon", "refresh"])

        # Try again with the connection dialog.
        browser.click("button:contains('Not connected')")
        browser.wait_visible(f'{modal_body}:contains("This system is not connected")')
        browser.wait_visible(install_notice)
        browser.click("footer button.apply")
        with browser.wait_timeout(360):
            browser.wait_visible('footer:contains("not-found")')
        browser.click("footer button.cancel")

        machine.execute(["test", "-f", stamp_file])


# When executed as a script, hand over to test_main(), which is provided by
# one of the star imports at the top of this file.
if __name__ == "__main__":
    test_main()
