Create test_trove_handlers.py

This commit is contained in:
Jonathan Arrance 2016-09-03 10:24:57 -04:00 committed by GitHub
parent dbc394a835
commit 0866daeca2

View File

@ -0,0 +1,234 @@
from __future__ import absolute_import
from __future__ import print_function
import unittest
import mock
import reactive.designate_handlers as handlers
_when_args = {}
_when_not_args = {}
def mock_hook_factory(d):
def mock_hook(*args, **kwargs):
def inner(f):
# remember what we were passed. Note that we can't actually
# determine the class we're attached to, as the decorator only gets
# the function.
try:
d[f.__name__].append(dict(args=args, kwargs=kwargs))
except KeyError:
d[f.__name__] = [dict(args=args, kwargs=kwargs)]
return f
return inner
return mock_hook
class TestDesignateHandlers(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._patched_when = mock.patch('charms.reactive.when',
mock_hook_factory(_when_args))
cls._patched_when_started = cls._patched_when.start()
cls._patched_when_not = mock.patch('charms.reactive.when_not',
mock_hook_factory(_when_not_args))
cls._patched_when_not_started = cls._patched_when_not.start()
# force requires to rerun the mock_hook decorator:
# try except is Python2/Python3 compatibility as Python3 has moved
# reload to importlib.
try:
reload(handlers)
except NameError:
import importlib
importlib.reload(handlers)
@classmethod
def tearDownClass(cls):
cls._patched_when.stop()
cls._patched_when_started = None
cls._patched_when = None
cls._patched_when_not.stop()
cls._patched_when_not_started = None
cls._patched_when_not = None
# and fix any breakage we did to the module
try:
reload(handlers)
except NameError:
import importlib
importlib.reload(handlers)
def setUp(self):
self._patches = {}
self._patches_start = {}
def tearDown(self):
for k, v in self._patches.items():
v.stop()
setattr(self, k, None)
self._patches = None
self._patches_start = None
def patch(self, obj, attr, return_value=None):
mocked = mock.patch.object(obj, attr)
self._patches[attr] = mocked
started = mocked.start()
started.return_value = return_value
self._patches_start[attr] = started
setattr(self, attr, started)
def test_registered_hooks(self):
# test that the hooks actually registered the relation expressions that
# are meaningful for this interface: this is to handle regressions.
# The keys are the function names that the hook attaches to.
all_interfaces = (
'dns-backend.available',
'shared-db.available',
'identity-service.available',
'amqp.available')
when_patterns = {
'setup_amqp_req': [('amqp.connected', )],
'setup_database': [('shared-db.connected', )],
'setup_endpoint': [('identity-service.connected', )],
'configure_ssl': [('identity-service.available', )],
'update_peers': [('cluster.available', )],
'config_changed': [('config.changed', )],
'cluster_connected': [('ha.connected', )],
'create_servers_and_domains': [
all_interfaces,
('base-config.rendered', ),
('db.synched', ),
],
'configure_designate_full': [
all_interfaces,
('db.synched', ),
],
'run_db_migration': [
all_interfaces,
('base-config.rendered', ),
],
'configure_designate_basic': [
all_interfaces,
],
}
when_not_patterns = {
'install_packages': [('installed', )],
'run_db_migration': [('db.synched', )],
'configure_designate_basic': [('base-config.rendered', )],
'create_servers_and_domains': [('domains.created', )],
}
# check the when hooks are attached to the expected functions
for t, p in [(_when_args, when_patterns),
(_when_not_args, when_not_patterns)]:
for f, args in t.items():
# check that function is in patterns
print(f)
self.assertTrue(f in p.keys())
# check that the lists are equal
l = [a['args'] for a in args]
self.assertEqual(l, p[f])
def test_install_packages(self):
self.patch(handlers.designate, 'install')
self.patch(handlers.reactive, 'set_state')
handlers.install_packages()
self.install.assert_called_once_with()
self.set_state.assert_called_once_with('installed')
def test_setup_amqp_req(self):
self.patch(handlers.designate, 'assess_status')
amqp = mock.MagicMock()
handlers.setup_amqp_req(amqp)
amqp.request_access.assert_called_once_with(
username='designate', vhost='openstack')
self.assess_status.assert_called_once_with()
def test_database(self):
self.patch(handlers.designate, 'assess_status')
database = mock.MagicMock()
self.patch(handlers.hookenv, 'unit_private_ip', 'private_ip')
handlers.setup_database(database)
calls = [
mock.call(
'designate',
'designate',
'private_ip',
prefix='designate'),
mock.call(
'dpm',
'dpm',
'private_ip',
prefix='dpm'),
]
database.configure.has_calls(calls)
self.assess_status.assert_called_once_with()
def test_setup_endpoint(self):
self.patch(handlers.designate, 'assess_status')
self.patch(handlers.designate, 'register_endpoints')
handlers.setup_endpoint('endpoint_object')
self.register_endpoints.assert_called_once_with('endpoint_object')
self.assess_status.assert_called_once_with()
def test_configure_designate_basic(self):
self.patch(handlers.reactive, 'set_state')
self.patch(handlers.designate, 'render_base_config')
self.patch(handlers.reactive.RelationBase, 'from_state')
handlers.configure_designate_basic('arg1', 'arg2')
self.render_base_config.assert_called_once_with(('arg1', 'arg2', ))
self.set_state.assert_called_once_with('base-config.rendered')
def test_run_db_migration(self):
self.patch(handlers.reactive, 'set_state')
self.patch(handlers.designate, 'db_sync')
self.patch(handlers.designate, 'db_sync_done')
self.db_sync_done.return_value = False
handlers.run_db_migration('arg1', 'arg2')
self.db_sync.assert_called_once_with()
self.assertFalse(self.set_state.called)
self.db_sync.reset_mock()
self.db_sync_done.return_value = True
handlers.run_db_migration('arg1', 'arg2')
self.db_sync.assert_called_once_with()
self.set_state.assert_called_once_with('db.synched')
def test_update_peers(self):
cluster = mock.MagicMock()
self.patch(handlers.designate, 'update_peers')
handlers.update_peers(cluster)
self.update_peers.assert_called_once_with(cluster)
def test_configure_designate_full(self):
self.patch(handlers.reactive.RelationBase, 'from_state',
return_value=None)
self.patch(handlers.designate, 'configure_ssl')
self.patch(handlers.designate, 'render_full_config')
self.patch(handlers.designate, 'create_initial_servers_and_domains')
self.patch(handlers.designate, 'render_sink_configs')
self.patch(handlers.designate, 'render_rndc_keys')
self.patch(handlers.designate, 'update_pools')
handlers.configure_designate_full('arg1', 'arg2')
self.configure_ssl.assert_called_once_with()
self.render_full_config.assert_called_once_with(('arg1', 'arg2', ))
self.create_initial_servers_and_domains.assert_called_once_with()
self.render_sink_configs.assert_called_once_with(('arg1', 'arg2', ))
self.render_rndc_keys.assert_called_once_with()
self.update_pools.assert_called_once_with()
def test_cluster_connected(self):
hacluster = mock.MagicMock()
self.patch(handlers.designate, 'configure_ha_resources')
self.patch(handlers.designate, 'assess_status')
handlers.cluster_connected(hacluster)
self.configure_ha_resources.assert_called_once_with(hacluster)
self.assess_status.assert_called_once_with()
def test_config_changed(self):
self.patch(handlers.designate, 'assess_status')
handlers.config_changed()
self.assess_status.assert_called_once_with()