# Copyright 2012 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2012 Nebula, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from functools import wraps
import importlib
import logging
import os
import traceback
from unittest import mock
from urllib import parse

from django.conf import settings
from django.contrib.messages.storage import cookie as cookie_storage
from django.contrib.messages.storage import default_storage
from django.core.handlers import wsgi
from django.test.client import RequestFactory
from django.test import tag
from django.test import testcases
from django import urls
from openstack_auth import user
from openstack_auth import utils
from requests.packages.urllib3.connection import HTTPConnection

from horizon import base
from horizon import conf
from horizon import exceptions
from horizon.test import helpers as horizon_helpers
from openstack_dashboard import api
from openstack_dashboard import context_processors
from openstack_dashboard.test.test_data import utils as test_utils

# Module-level logger for the test helpers.
LOG = logging.getLogger(__name__)

# Replace WSGIRequest's repr with a constant so that failing-test output is
# not flooded with the full request dump.
# NOTE(review): the constant is an empty string here; it may have lost an
# angle-bracketed placeholder at some point -- confirm the intended text.
wsgi.WSGIRequest.__repr__ = lambda self: ""

# Shortcuts to avoid importing horizon_helpers and for backward compatibility.
update_settings = horizon_helpers.update_settings
IsA = horizon_helpers.IsA
IsHttpRequest = horizon_helpers.IsHttpRequest


def create_mocks(target_methods, stop_mock=True):
    """Decorator to simplify setting up multiple mocks at once.

    :param target_methods: a dict to define methods to be patched using mock.

        A key of "target_methods" is a target object whose attribute(s)
        are patched.

        A value of "target_methods" is a list of methods to be patched
        using mock. Each element of the list can be a string or a tuple
        consisting of two strings.

        A string specifies a method name of "target" object to be mocked.
        The decorator creates a mock object for the method and the started
        mock can be accessed via 'mock_<method-name>' of the test class.
        For example, in case of::

            @create_mocks({api.nova: ['server_list',
                                      'flavor_list']})
            def test_example(self):
                ...
                self.mock_server_list.return_value = ...
                self.mock_flavor_list.side_effect = ...

        you can access the mocked method via "self.mock_server_list"
        inside a test class.

        The tuple version is useful when there are multiple methods with
        a same name are mocked in a single test.
        The format of the tuple is::

            ("<method-name-to-be-mocked>", "<attr-name>")

        The decorator creates a mock object for "<method-name-to-be-mocked>"
        and the started mock can be accessed via 'mock_<attr-name>' of the
        test class.

        Example::

            @create_mocks({
                api.nova: [
                    'usage_get',
                    ('tenant_absolute_limits', 'nova_tenant_absolute_limits'),
                ],
                api.cinder: [
                    ('tenant_absolute_limits',
                     'cinder_tenant_absolute_limits'),
                ],
            })
            def test_example(self):
                ...
                self.mock_usage_get.return_value = ...
                self.mock_nova_tenant_absolute_limits.return_value = ...
                self.mock_cinder_tenant_absolute_limits.return_value = ...
                ...

    :param stop_mock: If True (default), mocks started in this decorator will
        be stopped. Set this to False only if you cannot stop mocks when
        exiting this decorator. The default value, True, should work for
        most cases.
    """
    def wrapper(function):
        @wraps(function)
        def wrapped(inst, *args, **kwargs):
            patchers = []
            try:
                for target, methods in target_methods.items():
                    for method in methods:
                        if isinstance(method, str):
                            method_mocked = attr_name = method
                        else:
                            method_mocked, attr_name = method[0], method[1]
                        patcher = mock.patch.object(target, method_mocked)
                        started = patcher.start()
                        # Record the patcher only once it is really started
                        # so that the cleanup below never stops a patcher
                        # whose start() failed.
                        patchers.append(patcher)
                        setattr(inst, 'mock_%s' % attr_name, started)
                return function(inst, *args, **kwargs)
            finally:
                # Stop the mocks even when the wrapped test (or a later
                # patcher) raised; otherwise they leak into other tests.
                if stop_mock:
                    for patcher in patchers:
                        patcher.stop()
        return wrapped
    return wrapper


def _apply_panel_mocks(patchers=None):
    """Global mocks on panels that get called on all views.

    Starts one ``mock.patch`` per entry of
    ``settings.TEST_GLOBAL_MOCKS_ON_PANELS``. Each entry must provide a
    ``'method'`` key (the patch target) and may provide ``'return_value'``
    and/or ``'side_effect'``, which are passed through to ``mock.patch``.

    :param patchers: optional dict to which the started patchers are added,
        keyed by the setting's entry name. A new dict is used when omitted.
    :returns: the dict of started patchers.
    """
    if patchers is None:
        patchers = {}
    mocked_methods = settings.TEST_GLOBAL_MOCKS_ON_PANELS
    for name, mock_config in mocked_methods.items():
        method = mock_config['method']
        mock_params = {param: mock_config[param]
                       for param in ('return_value', 'side_effect')
                       if param in mock_config}
        patcher = mock.patch(method, **mock_params)
        patcher.start()
        patchers[name] = patcher
    return patchers


class RequestFactoryWithMessages(RequestFactory):
    """RequestFactory whose GET/POST requests carry user and messages.

    Every request built by :meth:`get` or :meth:`post` gets ``user``
    (from ``utils.get_user``), an empty ``session`` and a ``_messages``
    storage attached.
    """

    @staticmethod
    def _attach_user_and_messages(req):
        # utils.get_user is looked up at call time on purpose: tests replace
        # it (see TestCase.setActiveUser).
        req.user = utils.get_user(req)
        req.session = []
        req._messages = default_storage(req)
        return req

    def get(self, *args, **kwargs):
        req = super().get(*args, **kwargs)
        return self._attach_user_and_messages(req)

    def post(self, *args, **kwargs):
        req = super().post(*args, **kwargs)
        return self._attach_user_and_messages(req)


class TestCase(horizon_helpers.TestCase):
    """Specialized base test case class for Horizon.

    It gives access to numerous additional features:

    * A full suite of test data through various attached objects and
      managers (e.g. ``self.servers``, ``self.user``, etc.). See the
      docs for
      :class:`~openstack_dashboard.test.test_data.utils.TestData`
      for more information.
    * A set of request context data via ``self.context``.
    * A ``RequestFactory`` class which supports Django's ``contrib.messages``
      framework via ``self.factory``.
    * A ready-to-go request object via ``self.request``.
    * The ability to override specific time data controls for easier testing.
    * Several handy additional assertion methods.
    """

    # To force test failures when unmocked API calls are attempted, provide
    # boolean variable to store failures
    missing_mocks = False

    def fake_conn_request(self):
        """Stand-in for ``HTTPConnection.connect`` installed by setUp()."""
        # print a stacktrace to illustrate where the unmocked API call
        # is being made from
        traceback.print_stack()
        # forcing a test failure for missing mock
        self.missing_mocks = True

    def setUp(self):
        # Any real HTTP connection attempt is redirected to
        # fake_conn_request so unmocked API calls are detected.
        self._real_conn_request = HTTPConnection.connect
        HTTPConnection.connect = self.fake_conn_request

        self._real_context_processor = context_processors.openstack
        context_processors.openstack = lambda request: self.context

        self.patchers = _apply_panel_mocks()

        super().setUp()

    def _setup_test_data(self):
        super()._setup_test_data()
        test_utils.load_test_data(self)
        self.context = {
            'authorized_tenants': self.tenants.list(),
            'JS_CATALOG': context_processors.get_js_catalog(settings),
        }

    def _setup_factory(self):
        # For some magical reason we need a copy of this here.
        self.factory = RequestFactoryWithMessages()

    def _setup_user(self, **kwargs):
        # Remember the real get_user so tearDown() can restore it.
        self._real_get_user = utils.get_user
        tenants = self.context['authorized_tenants']
        base_kwargs = {
            'id': self.user.id,
            'token': self.token,
            'username': self.user.name,
            'domain_id': self.domain.id,
            'user_domain_name': self.domain.name,
            'tenant_id': self.tenant.id,
            'service_catalog': self.service_catalog,
            'authorized_tenants': tenants
        }
        base_kwargs.update(kwargs)
        self.setActiveUser(**base_kwargs)

    def _setup_request(self):
        super()._setup_request()
        self.request.session['token'] = self.token.id

    def tearDown(self):
        HTTPConnection.connect = self._real_conn_request
        context_processors.openstack = self._real_context_processor
        utils.get_user = self._real_get_user
        mock.patch.stopall()
        super().tearDown()

        # cause a test failure if an unmocked API call was attempted
        if self.missing_mocks:
            raise AssertionError("An unmocked API call was made.")

    def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
                      service_catalog=None, tenant_name=None, roles=None,
                      authorized_tenants=None, enabled=True, domain_id=None,
                      user_domain_name=None):
        """Replace ``utils.get_user`` with one returning the described user.

        The replacement ignores the request and builds a fresh
        ``user.User`` from the given values on every call.
        """
        def get_user(request):
            ret = user.User(
                id=id,
                token=token,
                user=username,
                domain_id=domain_id,
                user_domain_name=user_domain_name,
                tenant_id=tenant_id,
                tenant_name=tenant_name,
                service_catalog=service_catalog,
                roles=roles,
                enabled=enabled,
                authorized_tenants=authorized_tenants,
                endpoint=settings.OPENSTACK_KEYSTONE_URL,
            )
            ret._is_system_user = False
            return ret
        utils.get_user = get_user

    def assertRedirectsNoFollow(self, response, expected_url):
        """Check for redirect.

        Asserts that the given response issued a 302 redirect without
        processing the view which is redirected to.
        """
        if response.has_header('location'):
            loc = response['location']
        else:
            loc = ''
        # Compare unquoted URLs so percent-encoding differences are ignored.
        loc = parse.unquote(loc)
        expected_url = parse.unquote(expected_url)
        self.assertEqual(loc, expected_url)
        self.assertEqual(response.status_code, 302)

    def assertNoFormErrors(self, response, context_name="form"):
        """Checks for no form errors.

        Asserts that the response either does not contain a form in its
        context, or that if it does, that form has no errors.
        """
        context = getattr(response, "context", {})
        if not context or context_name not in context:
            return True
        errors = response.context[context_name]._errors
        assert len(errors) == 0, \
            "Unexpected errors were found on the form: %s" % errors

    def assertFormErrors(self, response, count=0, message=None,
                         context_name="form"):
        """Check for form errors.

        Asserts that the response does contain a form in its context, and that
        form has errors, if count were given, it must match the exact numbers
        of errors
        """
        context = getattr(response, "context", {})
        assert (context and context_name in context), \
            "The response did not contain a form."
        errors = response.context[context_name]._errors
        if count:
            assert len(errors) == count, \
                "%d errors were found on the form, %d expected" % \
                (len(errors), count)
            if message:
                text = testcases.assert_and_parse_html(
                    self, message, None, '"message" contains invalid HTML:')
                content = testcases.assert_and_parse_html(
                    self, str(errors), None,
                    '"_errors" in the response context is not valid HTML:')
                match_count = content.count(text)
                self.assertGreaterEqual(match_count, 1)
        else:
            assert len(errors) > 0, "No errors were found on the form"

    def assertStatusCode(self, response, expected_code):
        """Validates an expected status code.

        Matches camel case of other assert functions
        """
        if response.status_code == expected_code:
            return
        self.fail('status code %r != %r: %s' % (response.status_code,
                                                expected_code,
                                                response.content))

    def assertItemsCollectionEqual(self, response, items_list):
        """Assert that ``response.json`` equals ``{"items": items_list}``."""
        self.assertEqual(response.json, {"items": items_list})

    def getAndAssertTableRowAction(self, response, table_name,
                                   action_name, row_id):
        """Return the single row action named ``action_name``.

        The table is read from ``response.context[table_name + '_table']``.
        Fails unless exactly one row has ``row_id`` and exactly one of its
        row actions has the given name.
        """
        table = response.context[table_name + '_table']
        rows = list(filter(lambda x: x.id == row_id, table.data))
        self.assertEqual(1, len(rows),
                         "Did not find a row matching id '%s'" % row_id)
        row_actions = table.get_row_actions(rows[0])
        actions = list(filter(lambda x: x.name == action_name, row_actions))

        msg_args = (action_name, table_name, row_id)
        self.assertGreater(
            len(actions), 0,
            "No action named '%s' found in '%s' table for id '%s'" % msg_args)

        self.assertEqual(
            1, len(actions),
            "Multiple actions named '%s' found in '%s' table for id '%s'"
            % msg_args)

        return actions[0]

    def getAndAssertTableAction(self, response, table_name, action_name):
        """Return the single table action named ``action_name``.

        The table is read from ``response.context[table_name + '_table']``.
        Fails unless exactly one table action has the given name.
        """
        table = response.context[table_name + '_table']
        table_actions = table.get_table_actions()
        actions = list(filter(lambda x: x.name == action_name, table_actions))
        msg_args = (action_name, table_name)
        self.assertGreater(
            len(actions), 0,
            "No action named '%s' found in '%s' table" % msg_args)

        self.assertEqual(
            1, len(actions),
            "More than one action named '%s' found in '%s' table" % msg_args)

        return actions[0]

    @staticmethod
    def mock_rest_request(**args):
        """Build a ``mock.Mock`` request; ``args`` override the defaults."""
        mock_args = {
            'user.is_authenticated': True,
            'policy.check.return_value': True,
            'body': ''
        }
        mock_args.update(args)
        return mock.Mock(**mock_args)

    def assert_mock_multiple_calls_with_same_arguments(
            self, mocked_method, count, expected_call):
        """Assert ``mocked_method`` was called ``count`` times identically."""
        self.assertEqual(count, mocked_method.call_count)
        mocked_method.assert_has_calls([expected_call] * count)

    def assertNoWorkflowErrors(self, response, context_name="workflow"):
        """Checks for no workflow errors.

        Asserts that the response either does not contain a workflow in its
        context, or that if it does, that workflow has no errors.
        """
        context = getattr(response, "context", {})
        if not context or context_name not in context:
            return True
        # Keep only the steps that actually carry errors: counting every
        # step (as an unfiltered list would) fails for any workflow that
        # simply has steps, even when none of them reports an error.
        errors = [step.action._errors
                  for step in context[context_name].steps
                  if step.action._errors]
        self.assertEqual(
            0, len(errors),
            "Unexpected errors were found on the workflow: %s" % errors)

    def assertWorkflowErrors(self, response, count=0, message=None,
                             context_name="workflow"):
        """Check for workflow errors.

        Asserts that the response does contain a workflow in its
        context, and that workflow has errors, if count were given,
        it must match the exact numbers of errors
        """
        # A missing or None context must produce the assertion failure
        # below rather than a TypeError from the membership test.
        context = getattr(response, "context", None) or {}
        self.assertIn(context_name, context,
                      msg="The response did not contain a workflow.")
        errors = {}
        for step in context[context_name].steps:
            # A step whose _errors is None contributes no errors.
            errors.update(step.action._errors or {})
        if count:
            self.assertEqual(
                count, len(errors),
                "%d errors were found on the workflow, %d expected" %
                (len(errors), count))
            if message and message not in str(errors):
                self.fail("Expected message not found, instead found: %s"
                          % ["%s: %s" % (key, [e for e in field_errors]) for
                             (key, field_errors) in errors.items()])
        else:
            self.assertGreater(
                len(errors), 0,
                "No errors were found on the workflow")

    def assertCookieMessage(self, response, expected_msg, detail_msg=None):
        """Assert the "messages" cookie of the response holds a message.

        When ``detail_msg`` is given, the expected text is
        ``exceptions._append_detail(expected_msg, detail_msg)``.
        """
        data = response.cookies["messages"]
        storage = cookie_storage.CookieStorage(None)
        messages = [m.message for m in storage._decode(data.value)]
        if detail_msg is not None:
            _expected = exceptions._append_detail(expected_msg, detail_msg)
        else:
            _expected = expected_msg
        self.assertIn(_expected, messages)


class BaseAdminViewTests(TestCase):
    """Sets an active user with the "admin" role.

    For testing admin-only views and functionality.
    """
    def setActiveUser(self, *args, **kwargs):
        if "roles" not in kwargs:
            kwargs['roles'] = [self.roles.admin._info]
        super().setActiveUser(*args, **kwargs)

    def setSessionValues(self, **kwargs):
        """Store ``kwargs`` in a file-backed session used by the client.

        The values are also copied into ``self.request.session``.
        """
        # NOTE: this overwrites settings.SESSION_ENGINE and does not
        # restore it here.
        settings.SESSION_ENGINE = 'django.contrib.sessions.backends.file'
        engine = importlib.import_module(settings.SESSION_ENGINE)
        store = engine.SessionStore()
        for key in kwargs:
            store[key] = kwargs[key]
            self.request.session[key] = kwargs[key]
        store.save()
        self.session = store
        self.client.cookies[settings.SESSION_COOKIE_NAME] = store.session_key


class APITestCase(TestCase):
    """TestCase that also calls ``utils.patch_middleware_get_user()``."""

    def setUp(self):
        super().setUp()
        utils.patch_middleware_get_user()


class RestAPITestCase(TestCase):
    """TestCase that patches ``horizon.utils.http.is_ajax`` to return True."""

    def setUp(self):
        super().setUp()
        # Started but not stopped here; TestCase.tearDown() calls
        # mock.patch.stopall().
        mock.patch('horizon.utils.http.is_ajax',
                   return_value=True).start()


# APIMockTestCase was introduced to support mox to mock migration smoothly
# but it turns out we still have users of APITestCase.
# We keep both for a while.
# Looking at the usage of these classes, it seems better to drop this one.
# TODO(amotoki): Clean up APIMockTestCase usage in horizon plugins.
APIMockTestCase = APITestCase


# Need this to test both Glance API V1 and V2 versions
class ResetImageAPIVersionMixin(object):
    """Mixin clearing the Glance API version cache around each test."""

    def setUp(self):
        super().setUp()
        api.glance.VERSIONS.clear_active_cache()

    def tearDown(self):
        api.glance.VERSIONS.clear_active_cache()
        super().tearDown()


@horizon_helpers.pytest_mark('selenium')
@tag('selenium')
class SeleniumTestCase(horizon_helpers.SeleniumTestCase):
    """Selenium test case with test data, an active user and panel mocks."""

    def setUp(self):
        super().setUp()

        test_utils.load_test_data(self)

        # Remember the real get_user so tearDown() can restore it.
        self._real_get_user = utils.get_user
        self.setActiveUser(id=self.user.id,
                           token=self.token,
                           username=self.user.name,
                           tenant_id=self.tenant.id,
                           service_catalog=self.service_catalog,
                           authorized_tenants=self.tenants.list())
        self.patchers = _apply_panel_mocks()
        os.environ["HORIZON_TEST_RUN"] = "True"

    def tearDown(self):
        utils.get_user = self._real_get_user
        mock.patch.stopall()
        # pop() instead of del: cleanup must not fail if the variable was
        # already removed during the test.
        os.environ.pop("HORIZON_TEST_RUN", None)
        # setUp() chains to super(); mirror that here so that parent
        # classes and cooperating mixins get their tearDown() as well.
        super().tearDown()

    def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
                      service_catalog=None, tenant_name=None, roles=None,
                      authorized_tenants=None, enabled=True):
        """Replace ``utils.get_user`` with one returning the described user.

        NOTE: ``tenant_name`` is accepted but not passed to ``user.User``.
        """
        def get_user(request):
            return user.User(id=id,
                             token=token,
                             user=username,
                             tenant_id=tenant_id,
                             service_catalog=service_catalog,
                             roles=roles,
                             enabled=enabled,
                             authorized_tenants=authorized_tenants,
                             endpoint=settings.OPENSTACK_KEYSTONE_URL)
        utils.get_user = get_user


class SeleniumAdminTestCase(SeleniumTestCase):
    """Version of AdminTestCase for Selenium.

    Sets an active user with the "admin" role for testing admin-only views and
    functionality.
    """
    def setActiveUser(self, *args, **kwargs):
        if "roles" not in kwargs:
            kwargs['roles'] = [self.roles.admin._info]
        super().setActiveUser(*args, **kwargs)


def my_custom_sort(flavor):
    """Sort key giving a fixed order to four known flavor names.

    :param flavor: object with a ``name`` attribute.
    :returns: the position (0-3) of ``flavor.name`` in the fixed order.
    :raises KeyError: if ``flavor.name`` is not one of the known names.
    """
    sort_order = {
        'm1.secret': 0,
        'm1.tiny': 1,
        'm1.massive': 2,
        'm1.metadata': 3,
    }
    return sort_order[flavor.name]


# TODO(amotoki): Investigate a way to run PluginTestCase with the main
# unit tests. Currently we fail to find a way to clean up urlpatterns and
# Site registry touched by setUp() cleanly.
# As a workaround, we run
# PluginTestCase as a separate test process. Hopefully this workaround will
# go away in the future. For more detail, see bugs 1809983, 1866666 and
# https://review.opendev.org/#/c/627640/.
@horizon_helpers.pytest_mark('plugin_test')
@tag('plugin-test')
class PluginTestCase(TestCase):
    """Test case for testing plugin system of Horizon.

    For use with tests which deal with the pluggable dashboard and panel
    configuration, it takes care of backing up and restoring the Horizon
    configuration.
    """
    def setUp(self):
        super().setUp()
        self.old_horizon_config = conf.HORIZON_CONFIG
        conf.HORIZON_CONFIG = conf.LazySettings()
        base.Horizon._urls()
        # Store our original dashboards.
        # list() takes a snapshot: dict.keys() is a live view, so without
        # the copy the "original" set would follow every register/unregister
        # done during the test.
        self._discovered_dashboards = list(base.Horizon._registry)
        # Gather up and store our original panels for each dashboard
        self._discovered_panels = {}
        for dash in self._discovered_dashboards:
            panels = list(base.Horizon._registry[dash]._registry)
            self._discovered_panels[dash] = panels

    def tearDown(self):
        try:
            super().tearDown()
        finally:
            # Restore the global Horizon state even when the parent
            # tearDown() raises, so that it does not leak into other tests.
            conf.HORIZON_CONFIG = self.old_horizon_config
            # Destroy our singleton and re-create it.
            base.HorizonSite._instance = None
            del base.Horizon
            base.Horizon = base.HorizonSite()
            # Reload the convenience references to Horizon stored in __init__
            importlib.reload(importlib.import_module("horizon"))
            # Re-register our original dashboards and panels.
            # This is necessary because autodiscovery only works on the first
            # import, and calling reload introduces innumerable additional
            # problems. Manual re-registration is the only good way for
            # testing.
            for dash in self._discovered_dashboards:
                base.Horizon.register(dash)
                for panel in self._discovered_panels[dash]:
                    dash.register(panel)
            self._reload_urls()

    def _reload_urls(self):
        """Cleans up URLs.

        Clears out the URL caches, reloads the root urls module, and
        re-triggers the autodiscovery mechanism for Horizon.
        Allows URLs to be re-calculated after registering new dashboards.
        Useful only for testing and should never be used on a live site.
        """
        urls.clear_url_caches()
        importlib.reload(importlib.import_module(settings.ROOT_URLCONF))
        base.Horizon._urls()


def mock_obj_to_dict(r):
    """Return a Mock whose ``to_dict()`` returns ``r``."""
    return mock.Mock(**{'to_dict.return_value': r})


def mock_factory(r):
    """Mock all the attributes as well as ``to_dict``.

    :param r: a mapping; each key becomes an attribute of the returned mock
        and ``to_dict()`` returns ``r`` itself.
    """
    mocked = mock_obj_to_dict(r)
    mocked.configure_mock(**r)
    return mocked