# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from __future__ import print_function
import string
import sys
import time

import ddt
import mock
import testtools

from rally.common import utils
from rally import exceptions
from tests.unit import test


class ImmutableMixinTestCase(test.TestCase):

    def test_without_base_values(self):
        im = utils.ImmutableMixin()
        self.assertRaises(exceptions.ImmutableException,
                          im.__setattr__, "test", "test")

    def test_with_base_values(self):

        class A(utils.ImmutableMixin):
            def __init__(self, test):
                self.test = test
                super(A, self).__init__()

        a = A("test")
        self.assertRaises(exceptions.ImmutableException,
                          a.__setattr__, "abc", "test")
        self.assertEqual(a.test, "test")


class EnumMixinTestCase(test.TestCase):

    def test_enum_mix_in(self):

        class Foo(utils.EnumMixin):
            a = 10
            b = 20
            CC = "2000"

        self.assertEqual(set(list(Foo())), set([10, 20, "2000"]))

    def test_with_underscore(self):

        class Foo(utils.EnumMixin):
            a = 10
            b = 20
            _CC = "2000"

        self.assertEqual(set(list(Foo())), set([10, 20]))


class StdIOCaptureTestCase(test.TestCase):

    def test_stdout_capture(self):
        stdout = sys.stdout
        messages = ["abcdef", "defgaga"]
        with utils.StdOutCapture() as out:
            for msg in messages:
                print(msg)

        self.assertEqual(out.getvalue().rstrip("\n").split("\n"), messages)
        self.assertEqual(stdout, sys.stdout)

    def test_stderr_capture(self):
        stderr = sys.stderr
        messages = ["abcdef", "defgaga"]
        with utils.StdErrCapture() as err:
            for msg in messages:
                print(msg, file=sys.stderr)

        self.assertEqual(err.getvalue().rstrip("\n").split("\n"), messages)
        self.assertEqual(stderr, sys.stderr)


class TimerTestCase(test.TestCase):

    def test_timer_duration(self):
        start_time = time.time()
        end_time = time.time()

        with mock.patch("rally.common.utils.time") as mock_time:
            mock_time.time = mock.MagicMock(return_value=start_time)
            with utils.Timer() as timer:
                mock_time.time = mock.MagicMock(return_value=end_time)

        self.assertIsNone(timer.error)
        self.assertEqual(end_time - start_time, timer.duration())

    def test_timer_exception(self):
        try:
            with utils.Timer() as timer:
                raise Exception()
        except Exception:
            pass
        self.assertEqual(3, len(timer.error))
        self.assertEqual(timer.error[0], type(Exception()))


def module_level_method():
    pass


class MethodClassTestCase(test.TestCase):

    @testtools.skipIf(sys.version_info > (2, 9), "Problems with access to "
                                                 "class from <locals>")
    def test_method_class_for_class_level_method(self):
        class A:
            def m(self):
                pass
        self.assertEqual(A, utils.get_method_class(A.m))

    def test_method_class_for_module_level_method(self):
        self.assertIsNone(utils.get_method_class(module_level_method))


class FirstIndexTestCase(test.TestCase):

    def test_list_with_existing_matching_element(self):
        lst = [1, 3, 5, 7]
        self.assertEqual(utils.first_index(lst, lambda e: e == 1), 0)
        self.assertEqual(utils.first_index(lst, lambda e: e == 5), 2)
        self.assertEqual(utils.first_index(lst, lambda e: e == 7), 3)

    def test_list_with_non_existing_matching_element(self):
        lst = [1, 3, 5, 7]
        self.assertIsNone(utils.first_index(lst, lambda e: e == 2))


class EditDistanceTestCase(test.TestCase):

    def test_distance_empty_strings(self):
        dist = utils.distance("", "")
        self.assertEqual(0, dist)

    def test_distance_equal_strings(self):
        dist = utils.distance("abcde", "abcde")
        self.assertEqual(0, dist)

    def test_distance_replacement(self):
        dist = utils.distance("abcde", "__cde")
        self.assertEqual(2, dist)

    def test_distance_insertion(self):
        dist = utils.distance("abcde", "ab__cde")
        self.assertEqual(2, dist)

    def test_distance_deletion(self):
        dist = utils.distance("abcde", "abc")
        self.assertEqual(2, dist)


class TenantIteratorTestCase(test.TestCase):

    def test_iterate_per_tenant(self):
        users = []
        tenants_count = 2
        users_per_tenant = 5
        for tenant_id in range(tenants_count):
            for user_id in range(users_per_tenant):
                users.append({"id": str(user_id),
                              "tenant_id": str(tenant_id)})

        expected_result = [
            ({"id": "0", "tenant_id": str(i)}, str(i)) for i in range(
                tenants_count)]
        real_result = [i for i in utils.iterate_per_tenants(users)]

        self.assertEqual(expected_result, real_result)


class RAMIntTestCase(test.TestCase):

    @mock.patch("rally.common.utils.multiprocessing")
    def test__init__(self, mock_multiprocessing):
        utils.RAMInt()
        mock_multiprocessing.Lock.assert_called_once_with()
        mock_multiprocessing.Value.assert_called_once_with("I", 0)

    @mock.patch("rally.common.utils.multiprocessing")
    def test__int__(self, mock_multiprocessing):
        mock_multiprocessing.Value.return_value = mock.Mock(value=42)
        self.assertEqual(int(utils.RAMInt()), 42)

    @mock.patch("rally.common.utils.multiprocessing")
    def test__str__(self, mock_multiprocessing):
        mock_multiprocessing.Value.return_value = mock.Mock(value=42)
        self.assertEqual(str(utils.RAMInt()), "42")

    @mock.patch("rally.common.utils.multiprocessing")
    def test__iter__(self, mock_multiprocessing):
        ram_int = utils.RAMInt()
        self.assertEqual(iter(ram_int), ram_int)

    @mock.patch("rally.common.utils.multiprocessing")
    def test__next__(self, mock_multiprocessing):
        class MemInt(int):
            THRESHOLD = 5

            def __iadd__(self, i):
                return MemInt((int(self) + i) % self.THRESHOLD)

        mock_lock = mock.MagicMock()
        mock_multiprocessing.Lock.return_value = mock_lock
        mock_multiprocessing.Value.return_value = mock.Mock(value=MemInt(0))

        ram_int = utils.RAMInt()
        self.assertEqual(int(ram_int), 0)
        for i in range(MemInt.THRESHOLD - 1):
            self.assertEqual(ram_int.__next__(), i)
        self.assertRaises(StopIteration, ram_int.__next__)
        self.assertEqual(mock_lock.__enter__.mock_calls,
                         [mock.call()] * MemInt.THRESHOLD)
        self.assertEqual(len(mock_lock.__exit__.mock_calls), MemInt.THRESHOLD)

    @mock.patch("rally.common.utils.RAMInt.__next__",
                return_value="next_value")
    @mock.patch("rally.common.utils.multiprocessing")
    def test_next(self, mock_multiprocessing, mock_ram_int___next__):
        self.assertEqual(next(utils.RAMInt()), "next_value")
        mock_ram_int___next__.assert_called_once_with()

    @mock.patch("rally.common.utils.multiprocessing")
    def test_reset(self, mock_multiprocessing):
        ram_int = utils.RAMInt()
        self.assertRaises(TypeError, int, ram_int)
        ram_int.reset()
        self.assertEqual(int(ram_int), 0)


class GenerateRandomTestCase(test.TestCase):

    @mock.patch("rally.common.utils.random")
    def test_generate_random_name(self, mock_random):
        choice = "foobarspamchoicestring"

        idx = iter(range(100))
        mock_random.choice.side_effect = lambda choice: choice[next(idx)]
        self.assertEqual(utils.generate_random_name(),
                         string.ascii_lowercase[:16])

        idx = iter(range(100))
        mock_random.choice.side_effect = lambda choice: choice[next(idx)]
        self.assertEqual(utils.generate_random_name(length=10),
                         string.ascii_lowercase[:10])

        idx = iter(range(100))
        mock_random.choice.side_effect = lambda choice: choice[next(idx)]
        self.assertEqual(utils.generate_random_name(choice=choice),
                         choice[:16])

        idx = iter(range(100))
        mock_random.choice.side_effect = lambda choice: choice[next(idx)]
        self.assertEqual(utils.generate_random_name(choice=choice, length=5),
                         choice[:5])

        idx = iter(range(100))
        mock_random.choice.side_effect = lambda choice: choice[next(idx)]
        self.assertEqual(
            utils.generate_random_name(prefix="foo_", length=10),
            "foo_" + string.ascii_lowercase[:10])

        idx = iter(range(100))
        mock_random.choice.side_effect = lambda choice: choice[next(idx)]
        self.assertEqual(
            utils.generate_random_name(prefix="foo_",
                                       choice=choice, length=10),
            "foo_" + choice[:10])


@ddt.ddt
class RandomNameTestCase(test.TestCase):

    @ddt.data(
        {},
        {"task_id": "fake-task"},
        {"task_id": "2short", "expected": "s_rally_blargles_dweebled"},
        {"task_id": "fake!task",
         "expected": "s_rally_blargles_dweebled"},
        {"fmt": "XXXX-test-XXX-test",
         "expected": "fake-test-bla-test"})
    @ddt.unpack
    @mock.patch("random.choice")
    def test_generate_random_name(self, mock_choice, task_id="faketask",
                                  expected="s_rally_faketask_blargles",
                                  fmt="s_rally_XXXXXXXX_XXXXXXXX"):
        class FakeNameGenerator(utils.RandomNameGeneratorMixin):
            RESOURCE_NAME_FORMAT = fmt
            task = {"uuid": task_id}

        generator = FakeNameGenerator()

        mock_choice.side_effect = iter("blarglesdweebled")
        self.assertEqual(generator.generate_random_name(), expected)

    def test_generate_random_name_bogus_name_format(self):
        class FakeNameGenerator(utils.RandomNameGeneratorMixin):
            RESOURCE_NAME_FORMAT = "invalid_XXX_format"
            task = {"uuid": "fake-task-id"}

        generator = FakeNameGenerator()
        self.assertRaises(ValueError,
                          generator.generate_random_name)

    @ddt.data(
        {"good": ("rally_abcdefgh_abcdefgh", "rally_12345678_abcdefgh",
                  "rally_ABCdef12_ABCdef12"),
         "bad": ("rally_abcd_efgh", "rally_abcd!efg_12345678",
                 "rally_", "rally__", "rally_abcdefgh_",
                 "rally_abcdefghi_12345678", "foo", "foo_abcdefgh_abcdefgh")},
        {"fmt": "][*_XXX_XXX",
         "chars": "abc(.*)",
         "good": ("][*_abc_abc", "][*_abc_((("),
         "bad": ("rally_ab_cd", "rally_ab!_abc", "rally_", "rally__",
                 "rally_abc_", "rally_abcd_abc", "foo", "foo_abc_abc")},
        {"fmt": "XXXX-test-XXX-test",
         "good": ("abcd-test-abc-test",),
         "bad": ("rally-abcdefgh-abcdefgh", "abc-test-abc-test",
                 "abcd_test_abc_test", "abc-test-abcd-test")})
    @ddt.unpack
    def test_name_matches_pattern(self, good=(), bad=(),
                                  fmt="rally_XXXXXXXX_XXXXXXXX",
                                  chars=string.ascii_letters + string.digits):
        for name in good:
            self.assertTrue(
                utils.name_matches_pattern(name, fmt, chars),
                "%s unexpectedly didn't match resource_name_format" % name)

        for name in bad:
            self.assertFalse(
                utils.name_matches_pattern(name, fmt, chars),
                "%s unexpectedly matched resource_name_format" % name)

    @mock.patch("rally.common.utils.name_matches_pattern")
    def test_name_matches_object(self, mock_name_matches_pattern):
        name = "foo"
        self.assertEqual(
            utils.name_matches_object(name,
                                      utils.RandomNameGeneratorMixin),
            mock_name_matches_pattern.return_value)
        mock_name_matches_pattern.assert_called_once_with(
            name,
            utils. RandomNameGeneratorMixin.RESOURCE_NAME_FORMAT,
            utils.RandomNameGeneratorMixin.RESOURCE_NAME_ALLOWED_CHARACTERS)

    def test_name_matches_pattern_identity(self):
        generator = utils.RandomNameGeneratorMixin()
        generator.task = {"uuid": "faketask"}

        self.assertTrue(utils.name_matches_pattern(
            generator.generate_random_name(),
            generator.RESOURCE_NAME_FORMAT,
            generator.RESOURCE_NAME_ALLOWED_CHARACTERS))

    def test_name_matches_object_identity(self):
        generator = utils.RandomNameGeneratorMixin()
        generator.task = {"uuid": "faketask"}

        self.assertTrue(utils.name_matches_object(
            generator.generate_random_name(), generator))
        self.assertTrue(utils.name_matches_object(
            generator.generate_random_name(), utils.RandomNameGeneratorMixin))

    def test_consistent_task_id_part(self):
        class FakeNameGenerator(utils.RandomNameGeneratorMixin):
            RESOURCE_NAME_FORMAT = "XXXXXXXX_XXXXXXXX"

        generator = FakeNameGenerator()
        generator.task = {"uuid": "good-task-id"}

        names = [generator.generate_random_name() for i in range(100)]
        task_id_parts = set([n.split("_")[0] for n in names])
        self.assertEqual(len(task_id_parts), 1)

        generator.task = {"uuid": "bogus! task! id!"}

        names = [generator.generate_random_name() for i in range(100)]
        task_id_parts = set([n.split("_")[0] for n in names])
        self.assertEqual(len(task_id_parts), 1)