Janet Yu e2ef492311 Fix addition of plugin panel to panel group
Revert fix from bug #1329050, which adds a plugin panel to its dashboard's
class. Adding a plugin panel to a dashboard whose class has its panels defined
in a tuple will fail because the new plugin cannot be appended to the tuple.
The code errors out before the panel gets added to the dashboard's panel list.
However, at this point, the panel has already been registered with the
dashboard. This causes Dashboard.get_panels() to see the panel in its registry
but not in its panel groups. Consequently, the panel gets put in the "Other"
panel group instead of its configured one, as described in bug #1378558.

By not adding the the plugin panel to its dashboard's class, the panel gets
added to its dashboard's panel list. Now the original issue from bug #1329050
resurfaces. The root cause of that is that every time Dashboard._autodiscover()
is called, it instantiates all its panel group classes, wiping out any panels
previously added to any of the panel groups. Avoid this by always processing
all the plugin panel groups before any of the plugin panels, which also ensures
that the panel groups exist before their panels.

Making this change exposed an issue where the Horizon URLs were being loaded
twice during the plugin infrastructure's unit tests, causing plugin panels to
be added multiple times to their dashboards. Fix that by removing the second,
unnecessary call. Also create a second test plugin panel group and panel to
test the scenarios described in the two bugs addressed here, adding two panel
groups at a time.

Change-Id: Id6a99c3ff18102c8f47431638d4dda98f14ec641
Closes-Bug: 1378558
2015-04-22 23:02:43 -07:00

600 lines
24 KiB
Python

# Copyright 2012 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2012 Nebula, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
from functools import wraps # noqa
import json
import os
from ceilometerclient.v2 import client as ceilometer_client
from cinderclient import client as cinder_client
from django.conf import settings
from django.contrib.messages.storage import default_storage # noqa
from django.core.handlers import wsgi
from django.core import urlresolvers
from django.test.client import RequestFactory # noqa
from django.test import utils as django_test_utils
from django.utils.importlib import import_module # noqa
from django.utils import unittest
import glanceclient
from heatclient import client as heat_client
import httplib2
from keystoneclient.v2_0 import client as keystone_client
import mock
import mox
from neutronclient.v2_0 import client as neutron_client
from novaclient.v2 import client as nova_client
from openstack_auth import user
from openstack_auth import utils
from saharaclient import client as sahara_client
from swiftclient import client as swift_client
from troveclient import client as trove_client
from horizon import base
from horizon import conf
from horizon.test import helpers as horizon_helpers
from openstack_dashboard import api
from openstack_dashboard import context_processors
from openstack_dashboard.test.test_data import utils as test_utils
# Makes output of failing mox tests much easier to read.
wsgi.WSGIRequest.__repr__ = lambda self: "<class 'django.http.HttpRequest'>"
def create_stubs(stubs_to_create={}):
"""decorator to simplify setting up multiple stubs at once via mox
:param stubs_to_create: methods to stub in one or more modules
:type stubs_to_create: dict
The keys are python paths to the module containing the methods to mock.
To mock a method in openstack_dashboard/api/nova.py, the key is::
api.nova
The values are either a tuple of list of methods to mock in the module
indicated by the key.
For example::
('server_list',)
-or-
('flavor_list', 'server_list',)
-or-
['flavor_list', 'server_list']
Additionally, multiple modules can be mocked at once::
{
api.nova: ('flavor_list', 'server_list'),
api.glance: ('image_list_detailed',),
}
"""
if not isinstance(stubs_to_create, dict):
raise TypeError("create_stub must be passed a dict, but a %s was "
"given." % type(stubs_to_create).__name__)
def inner_stub_out(fn):
@wraps(fn)
def instance_stub_out(self, *args, **kwargs):
for key in stubs_to_create:
if not (isinstance(stubs_to_create[key], tuple) or
isinstance(stubs_to_create[key], list)):
raise TypeError("The values of the create_stub "
"dict must be lists or tuples, but "
"is a %s."
% type(stubs_to_create[key]).__name__)
for value in stubs_to_create[key]:
self.mox.StubOutWithMock(key, value)
return fn(self, *args, **kwargs)
return instance_stub_out
return inner_stub_out
class RequestFactoryWithMessages(RequestFactory):
def get(self, *args, **kwargs):
req = super(RequestFactoryWithMessages, self).get(*args, **kwargs)
req.user = utils.get_user(req)
req.session = []
req._messages = default_storage(req)
return req
def post(self, *args, **kwargs):
req = super(RequestFactoryWithMessages, self).post(*args, **kwargs)
req.user = utils.get_user(req)
req.session = []
req._messages = default_storage(req)
return req
@unittest.skipIf(os.environ.get('SKIP_UNITTESTS', False),
"The SKIP_UNITTESTS env variable is set.")
class TestCase(horizon_helpers.TestCase):
"""Specialized base test case class for Horizon.
It gives access to numerous additional features:
* A full suite of test data through various attached objects and
managers (e.g. ``self.servers``, ``self.user``, etc.). See the
docs for
:class:`~openstack_dashboard.test.test_data.utils.TestData`
for more information.
* The ``mox`` mocking framework via ``self.mox``.
* A set of request context data via ``self.context``.
* A ``RequestFactory`` class which supports Django's ``contrib.messages``
framework via ``self.factory``.
* A ready-to-go request object via ``self.request``.
* The ability to override specific time data controls for easier testing.
* Several handy additional assertion methods.
"""
def setUp(self):
def fake_conn_request(*args, **kwargs):
raise Exception("An external URI request tried to escape through "
"an httplib2 client. Args: %s, kwargs: %s"
% (args, kwargs))
self._real_conn_request = httplib2.Http._conn_request
httplib2.Http._conn_request = fake_conn_request
self._real_context_processor = context_processors.openstack
context_processors.openstack = lambda request: self.context
self.patchers = {}
self.add_panel_mocks()
super(TestCase, self).setUp()
def _setup_test_data(self):
super(TestCase, self)._setup_test_data()
test_utils.load_test_data(self)
self.context = {'authorized_tenants': self.tenants.list()}
def _setup_factory(self):
# For some magical reason we need a copy of this here.
self.factory = RequestFactoryWithMessages()
def _setup_user(self):
self._real_get_user = utils.get_user
tenants = self.context['authorized_tenants']
self.setActiveUser(id=self.user.id,
token=self.token,
username=self.user.name,
domain_id=self.domain.id,
tenant_id=self.tenant.id,
service_catalog=self.service_catalog,
authorized_tenants=tenants)
def _setup_request(self):
super(TestCase, self)._setup_request()
self.request.session['token'] = self.token.id
def add_panel_mocks(self):
"""Global mocks on panels that get called on all views."""
self.patchers['aggregates'] = mock.patch(
'openstack_dashboard.dashboards.admin'
'.aggregates.panel.Aggregates.can_access',
mock.Mock(return_value=True))
self.patchers['aggregates'].start()
def tearDown(self):
httplib2.Http._conn_request = self._real_conn_request
context_processors.openstack = self._real_context_processor
utils.get_user = self._real_get_user
mock.patch.stopall()
super(TestCase, self).tearDown()
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
service_catalog=None, tenant_name=None, roles=None,
authorized_tenants=None, enabled=True, domain_id=None):
def get_user(request):
return user.User(id=id,
token=token,
user=username,
domain_id=domain_id,
tenant_id=tenant_id,
service_catalog=service_catalog,
roles=roles,
enabled=enabled,
authorized_tenants=authorized_tenants,
endpoint=settings.OPENSTACK_KEYSTONE_URL)
utils.get_user = get_user
def assertRedirectsNoFollow(self, response, expected_url):
"""Check for redirect.
Asserts that the given response issued a 302 redirect without
processing the view which is redirected to.
"""
assert (response.status_code / 100 == 3), \
"The response did not return a redirect."
self.assertEqual(response._headers.get('location', None),
('Location', settings.TESTSERVER + expected_url))
self.assertEqual(response.status_code, 302)
def assertNoFormErrors(self, response, context_name="form"):
"""Checks for no form errors.
Asserts that the response either does not contain a form in its
context, or that if it does, that form has no errors.
"""
context = getattr(response, "context", {})
if not context or context_name not in context:
return True
errors = response.context[context_name]._errors
assert len(errors) == 0, \
"Unexpected errors were found on the form: %s" % errors
def assertFormErrors(self, response, count=0, message=None,
context_name="form"):
"""Check for form errors.
Asserts that the response does contain a form in its
context, and that form has errors, if count were given,
it must match the exact numbers of errors
"""
context = getattr(response, "context", {})
assert (context and context_name in context), \
"The response did not contain a form."
errors = response.context[context_name]._errors
if count:
assert len(errors) == count, \
"%d errors were found on the form, %d expected" % \
(len(errors), count)
if message and message not in unicode(errors):
self.fail("Expected message not found, instead found: %s"
% ["%s: %s" % (key, [e for e in field_errors]) for
(key, field_errors) in errors.items()])
else:
assert len(errors) > 0, "No errors were found on the form"
def assertStatusCode(self, response, expected_code):
"""Validates an expected status code.
Matches camel case of other assert functions
"""
if response.status_code == expected_code:
return
self.fail('status code %r != %r: %s' % (response.status_code,
expected_code,
response.content))
def assertItemsCollectionEqual(self, response, items_list):
self.assertEqual(response.content,
'{"items": ' + json.dumps(items_list) + "}")
@staticmethod
def mock_rest_request(**args):
mock_args = {
'user.is_authenticated.return_value': True,
'is_ajax.return_value': True,
'policy.check.return_value': True,
'body': ''
}
mock_args.update(args)
return mock.Mock(**mock_args)
class BaseAdminViewTests(TestCase):
"""Sets an active user with the "admin" role.
For testing admin-only views and functionality.
"""
def setActiveUser(self, *args, **kwargs):
if "roles" not in kwargs:
kwargs['roles'] = [self.roles.admin._info]
super(BaseAdminViewTests, self).setActiveUser(*args, **kwargs)
def setSessionValues(self, **kwargs):
settings.SESSION_ENGINE = 'django.contrib.sessions.backends.file'
engine = import_module(settings.SESSION_ENGINE)
store = engine.SessionStore()
for key in kwargs:
store[key] = kwargs[key]
self.request.session[key] = kwargs[key]
store.save()
self.session = store
self.client.cookies[settings.SESSION_COOKIE_NAME] = store.session_key
class APITestCase(TestCase):
"""Testing APIs.
For use with tests which deal with the underlying clients rather than
stubbing out the openstack_dashboard.api.* methods.
"""
def setUp(self):
super(APITestCase, self).setUp()
utils.patch_middleware_get_user()
def fake_keystoneclient(request, admin=False):
"""Returns the stub keystoneclient.
Only necessary because the function takes too many arguments to
conveniently be a lambda.
"""
return self.stub_keystoneclient()
# Store the original clients
self._original_glanceclient = api.glance.glanceclient
self._original_keystoneclient = api.keystone.keystoneclient
self._original_novaclient = api.nova.novaclient
self._original_neutronclient = api.neutron.neutronclient
self._original_cinderclient = api.cinder.cinderclient
self._original_heatclient = api.heat.heatclient
self._original_ceilometerclient = api.ceilometer.ceilometerclient
self._original_troveclient = api.trove.troveclient
self._original_saharaclient = api.sahara.client
# Replace the clients with our stubs.
api.glance.glanceclient = lambda request: self.stub_glanceclient()
api.keystone.keystoneclient = fake_keystoneclient
api.nova.novaclient = lambda request: self.stub_novaclient()
api.neutron.neutronclient = lambda request: self.stub_neutronclient()
api.cinder.cinderclient = lambda request: self.stub_cinderclient()
api.heat.heatclient = (lambda request, password=None:
self.stub_heatclient())
api.ceilometer.ceilometerclient = (lambda request:
self.stub_ceilometerclient())
api.trove.troveclient = lambda request: self.stub_troveclient()
api.sahara.client = lambda request: self.stub_saharaclient()
def tearDown(self):
super(APITestCase, self).tearDown()
api.glance.glanceclient = self._original_glanceclient
api.nova.novaclient = self._original_novaclient
api.keystone.keystoneclient = self._original_keystoneclient
api.neutron.neutronclient = self._original_neutronclient
api.cinder.cinderclient = self._original_cinderclient
api.heat.heatclient = self._original_heatclient
api.ceilometer.ceilometerclient = self._original_ceilometerclient
api.trove.troveclient = self._original_troveclient
api.sahara.client = self._original_saharaclient
def stub_novaclient(self):
if not hasattr(self, "novaclient"):
self.mox.StubOutWithMock(nova_client, 'Client')
self.novaclient = self.mox.CreateMock(nova_client.Client)
return self.novaclient
def stub_cinderclient(self):
if not hasattr(self, "cinderclient"):
self.mox.StubOutWithMock(cinder_client, 'Client')
self.cinderclient = self.mox.CreateMock(cinder_client.Client)
return self.cinderclient
def stub_keystoneclient(self):
if not hasattr(self, "keystoneclient"):
self.mox.StubOutWithMock(keystone_client, 'Client')
# NOTE(saschpe): Mock properties, MockObject.__init__ ignores them:
keystone_client.Client.auth_token = 'foo'
keystone_client.Client.service_catalog = None
keystone_client.Client.tenant_id = '1'
keystone_client.Client.tenant_name = 'tenant_1'
keystone_client.Client.management_url = ""
keystone_client.Client.__dir__ = lambda: []
self.keystoneclient = self.mox.CreateMock(keystone_client.Client)
return self.keystoneclient
def stub_glanceclient(self):
if not hasattr(self, "glanceclient"):
self.mox.StubOutWithMock(glanceclient, 'Client')
self.glanceclient = self.mox.CreateMock(glanceclient.Client)
return self.glanceclient
def stub_neutronclient(self):
if not hasattr(self, "neutronclient"):
self.mox.StubOutWithMock(neutron_client, 'Client')
self.neutronclient = self.mox.CreateMock(neutron_client.Client)
return self.neutronclient
def stub_swiftclient(self, expected_calls=1):
if not hasattr(self, "swiftclient"):
self.mox.StubOutWithMock(swift_client, 'Connection')
self.swiftclient = self.mox.CreateMock(swift_client.Connection)
while expected_calls:
swift_client.Connection(None,
mox.IgnoreArg(),
None,
preauthtoken=mox.IgnoreArg(),
preauthurl=mox.IgnoreArg(),
cacert=None,
insecure=False,
auth_version="2.0") \
.AndReturn(self.swiftclient)
expected_calls -= 1
return self.swiftclient
def stub_heatclient(self):
if not hasattr(self, "heatclient"):
self.mox.StubOutWithMock(heat_client, 'Client')
self.heatclient = self.mox.CreateMock(heat_client.Client)
return self.heatclient
def stub_ceilometerclient(self):
if not hasattr(self, "ceilometerclient"):
self.mox.StubOutWithMock(ceilometer_client, 'Client')
self.ceilometerclient = self.mox.\
CreateMock(ceilometer_client.Client)
return self.ceilometerclient
def stub_troveclient(self):
if not hasattr(self, "troveclient"):
self.mox.StubOutWithMock(trove_client, 'Client')
self.troveclient = self.mox.CreateMock(trove_client.Client)
return self.troveclient
def stub_saharaclient(self):
if not hasattr(self, "saharaclient"):
self.mox.StubOutWithMock(sahara_client, 'Client')
self.saharaclient = self.mox.CreateMock(sahara_client.Client)
return self.saharaclient
@unittest.skipUnless(os.environ.get('WITH_SELENIUM', False),
"The WITH_SELENIUM env variable is not set.")
class SeleniumTestCase(horizon_helpers.SeleniumTestCase):
def setUp(self):
super(SeleniumTestCase, self).setUp()
test_utils.load_test_data(self)
self.mox = mox.Mox()
self._real_get_user = utils.get_user
self.setActiveUser(id=self.user.id,
token=self.token,
username=self.user.name,
tenant_id=self.tenant.id,
service_catalog=self.service_catalog,
authorized_tenants=self.tenants.list())
self.patchers = {}
self.patchers['aggregates'] = mock.patch(
'openstack_dashboard.dashboards.admin'
'.aggregates.panel.Aggregates.can_access',
mock.Mock(return_value=True))
self.patchers['aggregates'].start()
os.environ["HORIZON_TEST_RUN"] = "True"
def tearDown(self):
self.mox.UnsetStubs()
utils.get_user = self._real_get_user
mock.patch.stopall()
self.mox.VerifyAll()
del os.environ["HORIZON_TEST_RUN"]
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
service_catalog=None, tenant_name=None, roles=None,
authorized_tenants=None, enabled=True):
def get_user(request):
return user.User(id=id,
token=token,
user=username,
tenant_id=tenant_id,
service_catalog=service_catalog,
roles=roles,
enabled=enabled,
authorized_tenants=authorized_tenants,
endpoint=settings.OPENSTACK_KEYSTONE_URL)
utils.get_user = get_user
class SeleniumAdminTestCase(SeleniumTestCase):
"""Version of AdminTestCase for Selenium.
Sets an active user with the "admin" role for testing admin-only views and
functionality.
"""
def setActiveUser(self, *args, **kwargs):
if "roles" not in kwargs:
kwargs['roles'] = [self.roles.admin._info]
super(SeleniumAdminTestCase, self).setActiveUser(*args, **kwargs)
def my_custom_sort(flavor):
sort_order = {
'm1.secret': 0,
'm1.tiny': 1,
'm1.massive': 2,
'm1.metadata': 3,
}
return sort_order[flavor.name]
class PluginTestCase(TestCase):
"""Test case for testing plugin system of Horizon.
For use with tests which deal with the pluggable dashboard and panel
configuration, it takes care of backing up and restoring the Horizon
configuration.
"""
def setUp(self):
super(PluginTestCase, self).setUp()
self.old_horizon_config = conf.HORIZON_CONFIG
conf.HORIZON_CONFIG = conf.LazySettings()
base.Horizon._urls()
# Store our original dashboards
self._discovered_dashboards = base.Horizon._registry.keys()
# Gather up and store our original panels for each dashboard
self._discovered_panels = {}
for dash in self._discovered_dashboards:
panels = base.Horizon._registry[dash]._registry.keys()
self._discovered_panels[dash] = panels
def tearDown(self):
super(PluginTestCase, self).tearDown()
conf.HORIZON_CONFIG = self.old_horizon_config
# Destroy our singleton and re-create it.
base.HorizonSite._instance = None
del base.Horizon
base.Horizon = base.HorizonSite()
# Reload the convenience references to Horizon stored in __init__
reload(import_module("horizon"))
# Re-register our original dashboards and panels.
# This is necessary because autodiscovery only works on the first
# import, and calling reload introduces innumerable additional
# problems. Manual re-registration is the only good way for testing.
for dash in self._discovered_dashboards:
base.Horizon.register(dash)
for panel in self._discovered_panels[dash]:
dash.register(panel)
self._reload_urls()
def _reload_urls(self):
"""CLeans up URLs.
Clears out the URL caches, reloads the root urls module, and
re-triggers the autodiscovery mechanism for Horizon. Allows URLs
to be re-calculated after registering new dashboards. Useful
only for testing and should never be used on a live site.
"""
urlresolvers.clear_url_caches()
reload(import_module(settings.ROOT_URLCONF))
base.Horizon._urls()
class update_settings(django_test_utils.override_settings):
"""override_settings which allows override an item in dict.
django original override_settings replaces a dict completely,
however OpenStack dashboard setting has many dictionary configuration
and there are test case where we want to override only one item in
a dictionary and keep other items in the dictionary.
This version of override_settings allows this if keep_dict is True.
If keep_dict False is specified, the original behavior of
Django override_settings is used.
"""
def __init__(self, keep_dict=True, **kwargs):
if keep_dict:
for key, new_value in kwargs.items():
value = getattr(settings, key, None)
if (isinstance(new_value, collections.Mapping) and
isinstance(value, collections.Mapping)):
copied = copy.copy(value)
copied.update(new_value)
kwargs[key] = copied
super(update_settings, self).__init__(**kwargs)