From 035c0ea8abdb4051ac1b60c9310ea357241ac638 Mon Sep 17 00:00:00 2001 From: tengqm Date: Wed, 10 Jun 2015 03:51:31 -0400 Subject: [PATCH] Split engine service test cases (6) This patch separates out the test cases related to thread group manager and the service engine itself. Change-Id: I8b2f28f3796d611d72c42ae57e6f19f8bec71d42 --- heat/tests/engine/test_service_engine.py | 370 ++++++++++++++++++ heat/tests/engine/test_threadgroup_mgr.py | 131 +++++++ heat/tests/test_engine_service.py | 455 ---------------------- 3 files changed, 501 insertions(+), 455 deletions(-) create mode 100644 heat/tests/engine/test_service_engine.py create mode 100644 heat/tests/engine/test_threadgroup_mgr.py diff --git a/heat/tests/engine/test_service_engine.py b/heat/tests/engine/test_service_engine.py new file mode 100644 index 0000000000..5f6225963b --- /dev/null +++ b/heat/tests/engine/test_service_engine.py @@ -0,0 +1,370 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime + +import mock +from oslo_config import cfg + +from heat.common import context +from heat.common import service_utils +from heat.engine import service +from heat.engine import worker +from heat.objects import service as service_objects +from heat.openstack.common import threadgroup +from heat.rpc import worker_api +from heat.tests import common +from heat.tests.engine import tools +from heat.tests import utils + + +class ServiceEngineTest(common.HeatTestCase): + + def setUp(self): + super(ServiceEngineTest, self).setUp() + + self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant') + self.eng = service.EngineService('a-host', 'a-topic') + self.eng.engine_id = 'engine-fake-uuid' + + def test_make_sure_rpc_version(self): + self.assertEqual( + '1.9', + service.EngineService.RPC_API_VERSION, + ('RPC version is changed, please update this test to new version ' + 'and make sure additional test cases are added for RPC APIs ' + 'added in new version')) + + @mock.patch.object(service_objects.Service, 'get_all') + @mock.patch.object(service_utils, 'format_service') + def test_service_get_all(self, mock_format_service, mock_get_all): + mock_get_all.return_value = [mock.Mock()] + mock_format_service.return_value = mock.Mock() + self.assertEqual(1, len(self.eng.list_services(self.ctx))) + self.assertTrue(mock_get_all.called) + mock_format_service.assert_called_once_with(mock.ANY) + + @mock.patch.object(service_objects.Service, 'update_by_id') + @mock.patch.object(service_objects.Service, 'create') + @mock.patch.object(context, 'get_admin_context') + def test_service_manage_report_start(self, + mock_admin_context, + mock_service_create, + mock_service_update): + self.eng.service_id = None + mock_admin_context.return_value = self.ctx + srv = dict(id='mock_id') + mock_service_create.return_value = srv + self.eng.service_manage_report() + mock_admin_context.assert_called_once_with() + mock_service_create.assert_called_once_with( + self.ctx, + dict(host=self.eng.host, + hostname=self.eng.hostname, + binary=self.eng.binary, + engine_id=self.eng.engine_id, + topic=self.eng.topic, + report_interval=cfg.CONF.periodic_interval)) + self.assertEqual(srv['id'], self.eng.service_id) + mock_service_update.assert_called_once_with( + self.ctx, + self.eng.service_id, + dict(deleted_at=None)) + + @mock.patch.object(service_objects.Service, 'get_all_by_args') + @mock.patch.object(service_objects.Service, 'delete') + @mock.patch.object(context, 'get_admin_context') + def test_service_manage_report_cleanup(self, + mock_admin_context, + mock_service_delete, + mock_get_all): + mock_admin_context.return_value = self.ctx + ages_a_go = datetime.datetime.utcnow() - datetime.timedelta( + seconds=4000) + mock_get_all.return_value = [{'id': 'foo', + 'deleted_at': None, + 'updated_at': ages_a_go}] + self.eng.service_manage_cleanup() + mock_admin_context.assert_called_once_with() + mock_get_all.assert_called_once_with(self.ctx, + self.eng.host, + self.eng.binary, + self.eng.hostname) + mock_service_delete.assert_called_once_with( + self.ctx, 'foo') + + @mock.patch.object(service_objects.Service, 'update_by_id') + @mock.patch.object(context, 'get_admin_context') + def test_service_manage_report_update(self, mock_admin_context, + mock_service_update): + self.eng.service_id = 'mock_id' + mock_admin_context.return_value = self.ctx + self.eng.service_manage_report() + mock_admin_context.assert_called_once_with() + mock_service_update.assert_called_once_with( + self.ctx, + 'mock_id', + dict(deleted_at=None)) + + def test_stop_rpc_server(self): + with mock.patch.object(self.eng, + '_rpc_server') as mock_rpc_server: + self.eng._stop_rpc_server() + mock_rpc_server.stop.assert_called_once_with() + mock_rpc_server.wait.assert_called_once_with() + + def _test_engine_service_start( + self, + thread_group_class, + worker_service_class, + engine_listener_class, + thread_group_manager_class, + sample_uuid_method, + rpc_client_class, + target_class, + rpc_server_method): + self.patchobject(self.eng, 'service_manage_cleanup') + self.patchobject(self.eng, 'reset_stack_status') + self.eng.start() + + # engine id + sample_uuid_method.assert_called_once_with() + sampe_uuid = sample_uuid_method.return_value + self.assertEqual(sampe_uuid, + self.eng.engine_id, + 'Failed to generated engine_id') + + # Thread group manager + thread_group_manager_class.assert_called_once_with() + thread_group_manager = thread_group_manager_class.return_value + self.assertEqual(thread_group_manager, + self.eng.thread_group_mgr, + 'Failed to create Thread Group Manager') + + # Engine Listener + engine_listener_class.assert_called_once_with( + self.eng.host, + self.eng.engine_id, + self.eng.thread_group_mgr + ) + engine_lister = engine_listener_class.return_value + engine_lister.start.assert_called_once_with() + + # Worker Service + if cfg.CONF.convergence_engine: + worker_service_class.assert_called_once_with( + host=self.eng.host, + topic=worker_api.TOPIC, + engine_id=self.eng.engine_id, + thread_group_mgr=self.eng.thread_group_mgr + ) + worker_service = worker_service_class.return_value + worker_service.start.assert_called_once_with() + + # RPC Target + target_class.assert_called_once_with( + version=service.EngineService.RPC_API_VERSION, + server=self.eng.host, + topic=self.eng.topic) + + # RPC server + target = target_class.return_value + rpc_server_method.assert_called_once_with(target, + self.eng) + rpc_server = rpc_server_method.return_value + self.assertEqual(rpc_server, + self.eng._rpc_server, + "Failed to create RPC server") + + rpc_server.start.assert_called_once_with() + + # RPC client + rpc_client = rpc_client_class.return_value + rpc_client_class.assert_called_once_with( + version=service.EngineService.RPC_API_VERSION) + self.assertEqual(rpc_client, + self.eng._client, + "Failed to create RPC client") + + # Manage Thread group + thread_group_class.assert_called_once_with() + manage_thread_group = thread_group_class.return_value + manage_thread_group.add_timer.assert_called_once_with( + cfg.CONF.periodic_interval, + self.eng.service_manage_report + ) + + @mock.patch('heat.common.messaging.get_rpc_server', + return_value=mock.Mock()) + @mock.patch('oslo_messaging.Target', + return_value=mock.Mock()) + @mock.patch('heat.common.messaging.get_rpc_client', + return_value=mock.Mock()) + @mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id', + return_value='sample-uuid') + @mock.patch('heat.engine.service.ThreadGroupManager', + return_value=mock.Mock()) + @mock.patch('heat.engine.service.EngineListener', + return_value=mock.Mock()) + @mock.patch('heat.openstack.common.threadgroup.ThreadGroup', + return_value=mock.Mock()) + def test_engine_service_start_in_non_convergence_mode( + self, + thread_group_class, + engine_listener_class, + thread_group_manager_class, + sample_uuid_method, + rpc_client_class, + target_class, + rpc_server_method): + cfg.CONF.set_default('convergence_engine', False) + self._test_engine_service_start( + thread_group_class, + None, + engine_listener_class, + thread_group_manager_class, + sample_uuid_method, + rpc_client_class, + target_class, + rpc_server_method + ) + + @mock.patch('heat.common.messaging.get_rpc_server', + return_value=mock.Mock()) + @mock.patch('oslo_messaging.Target', + return_value=mock.Mock()) + @mock.patch('heat.common.messaging.get_rpc_client', + return_value=mock.Mock()) + @mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id', + return_value=mock.Mock()) + @mock.patch('heat.engine.service.ThreadGroupManager', + return_value=mock.Mock()) + @mock.patch('heat.engine.service.EngineListener', + return_value=mock.Mock()) + @mock.patch('heat.engine.worker.WorkerService', + return_value=mock.Mock()) + @mock.patch('heat.openstack.common.threadgroup.ThreadGroup', + return_value=mock.Mock()) + def test_engine_service_start_in_convergence_mode( + self, + thread_group_class, + worker_service_class, + engine_listener_class, + thread_group_manager_class, + sample_uuid_method, + rpc_client_class, + target_class, + rpc_server_method): + cfg.CONF.set_default('convergence_engine', True) + self._test_engine_service_start( + thread_group_class, + worker_service_class, + engine_listener_class, + thread_group_manager_class, + sample_uuid_method, + rpc_client_class, + target_class, + rpc_server_method + ) + + def _test_engine_service_stop( + self, + service_delete_method, + admin_context_method): + cfg.CONF.set_default('periodic_interval', 60) + self.patchobject(self.eng, 'service_manage_cleanup') + self.patchobject(self.eng, 'reset_stack_status') + + self.eng.start() + # Add dummy thread group to test thread_group_mgr.stop() is executed? + dtg1 = tools.DummyThreadGroup() + dtg2 = tools.DummyThreadGroup() + self.eng.thread_group_mgr.groups['sample-uuid1'] = dtg1 + self.eng.thread_group_mgr.groups['sample-uuid2'] = dtg2 + self.eng.service_id = 'sample-service-uuid' + + orig_stop = self.eng.thread_group_mgr.stop + + with mock.patch.object(self.eng.thread_group_mgr, 'stop') as stop: + stop.side_effect = orig_stop + + self.eng.stop() + + # RPC server + self.eng._stop_rpc_server.assert_called_once_with() + + if cfg.CONF.convergence_engine: + # WorkerService + self.eng.worker_service.stop.assert_called_once_with() + + # Wait for all active threads to be finished + calls = [mock.call('sample-uuid1', True), + mock.call('sample-uuid2', True)] + self.eng.thread_group_mgr.stop.assert_has_calls(calls, True) + + # # Manage Thread group + self.eng.manage_thread_grp.stop.assert_called_with(False) + + # Service delete + admin_context_method.assert_called_once_with() + ctxt = admin_context_method.return_value + service_delete_method.assert_called_once_with( + ctxt, + self.eng.service_id + ) + + @mock.patch.object(service.EngineService, + '_stop_rpc_server') + @mock.patch.object(worker.WorkerService, + 'stop') + @mock.patch.object(threadgroup.ThreadGroup, + 'stop') + @mock.patch('heat.common.context.get_admin_context', + return_value=mock.Mock()) + @mock.patch('heat.objects.service.Service.delete', + return_value=mock.Mock()) + def test_engine_service_stop_in_convergence_mode( + self, + service_delete_method, + admin_context_method, + thread_group_stop, + worker_service_stop, + rpc_server_stop): + cfg.CONF.set_default('convergence_engine', True) + self._test_engine_service_stop( + service_delete_method, + admin_context_method + ) + + @mock.patch.object(service.EngineService, '_stop_rpc_server') + @mock.patch.object(threadgroup.ThreadGroup, 'stop') + @mock.patch('heat.common.context.get_admin_context', + return_value=mock.Mock()) + @mock.patch('heat.objects.service.Service.delete', + return_value=mock.Mock()) + def test_engine_service_stop_in_non_convergence_mode( + self, + service_delete_method, + admin_context_method, + thread_group_stop, + rpc_server_stop): + cfg.CONF.set_default('convergence_engine', False) + self._test_engine_service_stop( + service_delete_method, + admin_context_method + ) + + @mock.patch('oslo_log.log.setup') + def test_engine_service_reset(self, setup_logging_mock): + self.eng.reset() + setup_logging_mock.assertCalledOnceWith(cfg.CONF, 'heat') diff --git a/heat/tests/engine/test_threadgroup_mgr.py b/heat/tests/engine/test_threadgroup_mgr.py new file mode 100644 index 0000000000..6a1621a04e --- /dev/null +++ b/heat/tests/engine/test_threadgroup_mgr.py @@ -0,0 +1,131 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import mock + +from heat.engine import service +from heat.tests import common + + +class ThreadGroupManagerTest(common.HeatTestCase): + + def setUp(self): + super(ThreadGroupManagerTest, self).setUp() + self.f = 'function' + self.fargs = ('spam', 'ham', 'eggs') + self.fkwargs = {'foo': 'bar'} + self.cnxt = 'ctxt' + self.engine_id = 'engine_id' + self.stack = mock.Mock() + self.lock_mock = mock.Mock() + self.stlock_mock = self.patch('heat.engine.service.stack_lock') + self.stlock_mock.StackLock.return_value = self.lock_mock + self.tg_mock = mock.Mock() + self.thg_mock = self.patch('heat.engine.service.threadgroup') + self.thg_mock.ThreadGroup.return_value = self.tg_mock + self.cfg_mock = self.patch('heat.engine.service.cfg') + + def test_tgm_start_with_lock(self): + thm = service.ThreadGroupManager() + with self.patchobject(thm, 'start_with_acquired_lock'): + mock_thread_lock = mock.Mock() + mock_thread_lock.__enter__ = mock.Mock(return_value=None) + mock_thread_lock.__exit__ = mock.Mock(return_value=None) + self.lock_mock.thread_lock.return_value = mock_thread_lock + thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f, + *self.fargs, **self.fkwargs) + self.stlock_mock.StackLock.assert_called_with(self.cnxt, + self.stack.id, + self.engine_id) + + thm.start_with_acquired_lock.assert_called_once_with( + self.stack, self.lock_mock, + self.f, *self.fargs, **self.fkwargs) + + def test_tgm_start(self): + stack_id = 'test' + + thm = service.ThreadGroupManager() + ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs) + + self.assertEqual(self.tg_mock, thm.groups['test']) + self.tg_mock.add_thread.assert_called_with( + thm._start_with_trace, None, + self.f, *self.fargs, **self.fkwargs) + self.assertEqual(ret, self.tg_mock.add_thread()) + + def test_tgm_add_timer(self): + stack_id = 'test' + + thm = service.ThreadGroupManager() + thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs) + + self.assertEqual(self.tg_mock, thm.groups[stack_id]) + self.tg_mock.add_timer.assert_called_with( + self.cfg_mock.CONF.periodic_interval, + self.f, *self.fargs, **self.fkwargs) + + def test_tgm_add_event(self): + stack_id = 'add_events_test' + e1, e2 = mock.Mock(), mock.Mock() + thm = service.ThreadGroupManager() + thm.add_event(stack_id, e1) + thm.add_event(stack_id, e2) + self.assertEqual([e1, e2], thm.events[stack_id]) + + def test_tgm_remove_event(self): + stack_id = 'add_events_test' + e1, e2 = mock.Mock(), mock.Mock() + thm = service.ThreadGroupManager() + thm.add_event(stack_id, e1) + thm.add_event(stack_id, e2) + thm.remove_event(None, stack_id, e2) + self.assertEqual([e1], thm.events[stack_id]) + thm.remove_event(None, stack_id, e1) + self.assertNotIn(stack_id, thm.events) + + def test_tgm_send(self): + stack_id = 'send_test' + e1, e2 = mock.MagicMock(), mock.Mock() + thm = service.ThreadGroupManager() + thm.add_event(stack_id, e1) + thm.add_event(stack_id, e2) + thm.send(stack_id, 'test_message') + + +class ThreadGroupManagerStopTest(common.HeatTestCase): + + def test_tgm_stop(self): + stack_id = 'test' + done = [] + + def function(): + while True: + eventlet.sleep() + + def linked(gt, thread): + for i in range(10): + eventlet.sleep() + done.append(thread) + + thm = service.ThreadGroupManager() + thm.add_event(stack_id, mock.Mock()) + thread = thm.start(stack_id, function) + thread.link(linked, thread) + + thm.stop(stack_id) + + self.assertIn(thread, done) + self.assertNotIn(stack_id, thm.groups) + self.assertNotIn(stack_id, thm.events) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 3d399f8652..7b75a1b713 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -11,10 +11,8 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime import uuid -import eventlet from eventlet import event as grevent import mock import mox @@ -27,7 +25,6 @@ from heat.common import context from heat.common import exception from heat.common import identifier from heat.common import messaging -from heat.common import service_utils from heat.common import template_format from heat.engine import dependencies from heat.engine import environment @@ -39,17 +36,14 @@ from heat.engine import stack as parser from heat.engine import stack_lock from heat.engine import template as templatem from heat.engine import watchrule -from heat.engine import worker from heat.objects import event as event_object from heat.objects import resource as resource_objects -from heat.objects import service as service_objects from heat.objects import stack as stack_object from heat.objects import sync_point as sync_point_object from heat.objects import watch_data as watch_data_object from heat.objects import watch_rule as watch_rule_object from heat.openstack.common import threadgroup from heat.rpc import api as rpc_api -from heat.rpc import worker_api from heat.rpc import worker_client from heat.tests import common from heat.tests.engine import tools @@ -1357,14 +1351,6 @@ class StackServiceTest(common.HeatTestCase): res._register_class('ResourceWithPropsType', generic_rsrc.ResourceWithProps) - def test_make_sure_rpc_version(self): - self.assertEqual( - '1.9', - service.EngineService.RPC_API_VERSION, - ('RPC version is changed, please update this test to new version ' - 'and make sure additional test cases are added for RPC APIs ' - 'added in new version')) - @mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task') @mock.patch.object(stack_object.Stack, 'get_all') @mock.patch.object(service.service.Service, 'start') @@ -3026,167 +3012,6 @@ class StackServiceTest(common.HeatTestCase): self.eng._validate_new_stack, self.ctx, 'test_existing_stack', parsed_template) - @mock.patch.object(service_objects.Service, 'get_all') - @mock.patch.object(service_utils, 'format_service') - def test_service_get_all(self, mock_format_service, mock_get_all): - mock_get_all.return_value = [mock.Mock()] - mock_format_service.return_value = mock.Mock() - self.assertEqual(1, len(self.eng.list_services(self.ctx))) - self.assertTrue(mock_get_all.called) - mock_format_service.assert_called_once_with(mock.ANY) - - @mock.patch.object(service_objects.Service, 'update_by_id') - @mock.patch.object(service_objects.Service, 'create') - @mock.patch.object(context, 'get_admin_context') - def test_service_manage_report_start(self, - mock_admin_context, - mock_service_create, - mock_service_update): - self.eng.service_id = None - mock_admin_context.return_value = self.ctx - srv = dict(id='mock_id') - mock_service_create.return_value = srv - self.eng.service_manage_report() - mock_admin_context.assert_called_once_with() - mock_service_create.assert_called_once_with( - self.ctx, - dict(host=self.eng.host, - hostname=self.eng.hostname, - binary=self.eng.binary, - engine_id=self.eng.engine_id, - topic=self.eng.topic, - report_interval=cfg.CONF.periodic_interval)) - self.assertEqual(self.eng.service_id, srv['id']) - mock_service_update.assert_called_once_with( - self.ctx, - self.eng.service_id, - dict(deleted_at=None)) - - @mock.patch.object(service_objects.Service, 'get_all_by_args') - @mock.patch.object(service_objects.Service, 'delete') - @mock.patch.object(context, 'get_admin_context') - def test_service_manage_report_cleanup(self, - mock_admin_context, - mock_service_delete, - mock_get_all): - mock_admin_context.return_value = self.ctx - ages_a_go = datetime.datetime.utcnow() - datetime.timedelta( - seconds=4000) - mock_get_all.return_value = [{'id': 'foo', - 'deleted_at': None, - 'updated_at': ages_a_go}] - self.eng.service_manage_cleanup() - mock_admin_context.assert_called_once_with() - mock_get_all.assert_called_once_with(self.ctx, - self.eng.host, - self.eng.binary, - self.eng.hostname) - mock_service_delete.assert_called_once_with( - self.ctx, 'foo') - - @mock.patch.object(service_objects.Service, 'update_by_id') - @mock.patch.object(context, 'get_admin_context') - def test_service_manage_report_update( - self, - mock_admin_context, - mock_service_update): - self.eng.service_id = 'mock_id' - mock_admin_context.return_value = self.ctx - self.eng.service_manage_report() - mock_admin_context.assert_called_once_with() - mock_service_update.assert_called_once_with( - self.ctx, - 'mock_id', - dict(deleted_at=None)) - - def test_stop_rpc_server(self): - with mock.patch.object(self.eng, - '_rpc_server') as mock_rpc_server: - self.eng._stop_rpc_server() - mock_rpc_server.stop.assert_called_once_with() - mock_rpc_server.wait.assert_called_once_with() - - def _test_engine_service_start( - self, - thread_group_class, - worker_service_class, - engine_listener_class, - thread_group_manager_class, - sample_uuid_method, - rpc_client_class, - target_class, - rpc_server_method): - self.patchobject(self.eng, 'service_manage_cleanup') - self.patchobject(self.eng, 'reset_stack_status') - self.eng.start() - - # engine id - sample_uuid_method.assert_called_once_with() - sampe_uuid = sample_uuid_method.return_value - self.assertEqual(sampe_uuid, - self.eng.engine_id, - 'Failed to generated engine_id') - - # Thread group manager - thread_group_manager_class.assert_called_once_with() - thread_group_manager = thread_group_manager_class.return_value - self.assertEqual(thread_group_manager, - self.eng.thread_group_mgr, - 'Failed to create Thread Group Manager') - - # Engine Listener - engine_listener_class.assert_called_once_with( - self.eng.host, - self.eng.engine_id, - self.eng.thread_group_mgr - ) - engine_lister = engine_listener_class.return_value - engine_lister.start.assert_called_once_with() - - # Worker Service - if cfg.CONF.convergence_engine: - worker_service_class.assert_called_once_with( - host=self.eng.host, - topic=worker_api.TOPIC, - engine_id=self.eng.engine_id, - thread_group_mgr=self.eng.thread_group_mgr - ) - worker_service = worker_service_class.return_value - worker_service.start.assert_called_once_with() - - # RPC Target - target_class.assert_called_once_with( - version=service.EngineService.RPC_API_VERSION, - server=self.eng.host, - topic=self.eng.topic) - - # RPC server - target = target_class.return_value - rpc_server_method.assert_called_once_with(target, - self.eng) - rpc_server = rpc_server_method.return_value - self.assertEqual(rpc_server, - self.eng._rpc_server, - "Failed to create RPC server") - - rpc_server.start.assert_called_once_with() - - # RPC client - rpc_client = rpc_client_class.return_value - rpc_client_class.assert_called_once_with( - version=service.EngineService.RPC_API_VERSION) - self.assertEqual(rpc_client, - self.eng._client, - "Failed to create RPC client") - - # Manage Thread group - thread_group_class.assert_called_once_with() - manage_thread_group = thread_group_class.return_value - manage_thread_group.add_timer.assert_called_once_with( - cfg.CONF.periodic_interval, - self.eng.service_manage_report - ) - @mock.patch('heat.engine.service.ThreadGroupManager', return_value=mock.Mock()) @mock.patch.object(stack_object.Stack, 'get_all') @@ -3237,283 +3062,3 @@ class StackServiceTest(common.HeatTestCase): fake_stack, fake_stack.state_set, fake_stack.action, parser.Stack.FAILED, 'Engine went down during stack CREATE' ) - - @mock.patch('heat.common.messaging.get_rpc_server', - return_value=mock.Mock()) - @mock.patch('oslo_messaging.Target', - return_value=mock.Mock()) - @mock.patch('heat.common.messaging.get_rpc_client', - return_value=mock.Mock()) - @mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id', - return_value='sample-uuid') - @mock.patch('heat.engine.service.ThreadGroupManager', - return_value=mock.Mock()) - @mock.patch('heat.engine.service.EngineListener', - return_value=mock.Mock()) - @mock.patch('heat.openstack.common.threadgroup.ThreadGroup', - return_value=mock.Mock()) - def test_engine_service_start_in_non_convergence_mode( - self, - thread_group_class, - engine_listener_class, - thread_group_manager_class, - sample_uuid_method, - rpc_client_class, - target_class, - rpc_server_method): - cfg.CONF.set_default('convergence_engine', False) - self._test_engine_service_start( - thread_group_class, - None, - engine_listener_class, - thread_group_manager_class, - sample_uuid_method, - rpc_client_class, - target_class, - rpc_server_method - ) - - @mock.patch('heat.common.messaging.get_rpc_server', - return_value=mock.Mock()) - @mock.patch('oslo_messaging.Target', - return_value=mock.Mock()) - @mock.patch('heat.common.messaging.get_rpc_client', - return_value=mock.Mock()) - @mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id', - return_value=mock.Mock()) - @mock.patch('heat.engine.service.ThreadGroupManager', - return_value=mock.Mock()) - @mock.patch('heat.engine.service.EngineListener', - return_value=mock.Mock()) - @mock.patch('heat.engine.worker.WorkerService', - return_value=mock.Mock()) - @mock.patch('heat.openstack.common.threadgroup.ThreadGroup', - return_value=mock.Mock()) - def test_engine_service_start_in_convergence_mode( - self, - thread_group_class, - worker_service_class, - engine_listener_class, - thread_group_manager_class, - sample_uuid_method, - rpc_client_class, - target_class, - rpc_server_method): - cfg.CONF.set_default('convergence_engine', True) - self._test_engine_service_start( - thread_group_class, - worker_service_class, - engine_listener_class, - thread_group_manager_class, - sample_uuid_method, - rpc_client_class, - target_class, - rpc_server_method - ) - - def _test_engine_service_stop( - self, - service_delete_method, - admin_context_method): - cfg.CONF.set_default('periodic_interval', 60) - self.patchobject(self.eng, 'service_manage_cleanup') - self.patchobject(self.eng, 'reset_stack_status') - - self.eng.start() - # Add dummy thread group to test thread_group_mgr.stop() is executed? - dtg1 = tools.DummyThreadGroup() - dtg2 = tools.DummyThreadGroup() - self.eng.thread_group_mgr.groups['sample-uuid1'] = dtg1 - self.eng.thread_group_mgr.groups['sample-uuid2'] = dtg2 - self.eng.service_id = 'sample-service-uuid' - - orig_stop = self.eng.thread_group_mgr.stop - - with mock.patch.object(self.eng.thread_group_mgr, 'stop') as stop: - stop.side_effect = orig_stop - - self.eng.stop() - - # RPC server - self.eng._stop_rpc_server.assert_called_once_with() - - if cfg.CONF.convergence_engine: - # WorkerService - self.eng.worker_service.stop.assert_called_once_with() - - # Wait for all active threads to be finished - calls = [mock.call('sample-uuid1', True), - mock.call('sample-uuid2', True)] - self.eng.thread_group_mgr.stop.assert_has_calls( - calls, - True) - - # # Manage Thread group - self.eng.manage_thread_grp.stop.assert_called_with(False) - - # Service delete - admin_context_method.assert_called_once_with() - ctxt = admin_context_method.return_value - service_delete_method.assert_called_once_with( - ctxt, - self.eng.service_id - ) - - @mock.patch.object(service.EngineService, - '_stop_rpc_server') - @mock.patch.object(worker.WorkerService, - 'stop') - @mock.patch.object(threadgroup.ThreadGroup, - 'stop') - @mock.patch('heat.common.context.get_admin_context', - return_value=mock.Mock()) - @mock.patch('heat.objects.service.Service.delete', - return_value=mock.Mock()) - def test_engine_service_stop_in_convergence_mode( - self, - service_delete_method, - admin_context_method, - thread_group_stop, - worker_service_stop, - rpc_server_stop): - cfg.CONF.set_default('convergence_engine', True) - self._test_engine_service_stop( - service_delete_method, - admin_context_method - ) - - @mock.patch.object(service.EngineService, - '_stop_rpc_server') - @mock.patch.object(threadgroup.ThreadGroup, - 'stop') - @mock.patch('heat.common.context.get_admin_context', - return_value=mock.Mock()) - @mock.patch('heat.objects.service.Service.delete', - return_value=mock.Mock()) - def test_engine_service_stop_in_non_convergence_mode( - self, - service_delete_method, - admin_context_method, - thread_group_stop, - rpc_server_stop): - cfg.CONF.set_default('convergence_engine', False) - self._test_engine_service_stop( - service_delete_method, - admin_context_method - ) - - @mock.patch('oslo_log.log.setup') - def test_engine_service_reset(self, setup_logging_mock): - self.eng.reset() - setup_logging_mock.assertCalledOnceWith(cfg.CONF, 'heat') - - -class ThreadGroupManagerTest(common.HeatTestCase): - def setUp(self): - super(ThreadGroupManagerTest, self).setUp() - self.f = 'function' - self.fargs = ('spam', 'ham', 'eggs') - self.fkwargs = {'foo': 'bar'} - self.cnxt = 'ctxt' - self.engine_id = 'engine_id' - self.stack = mock.Mock() - self.lock_mock = mock.Mock() - self.stlock_mock = self.patch('heat.engine.service.stack_lock') - self.stlock_mock.StackLock.return_value = self.lock_mock - self.tg_mock = mock.Mock() - self.thg_mock = self.patch('heat.engine.service.threadgroup') - self.thg_mock.ThreadGroup.return_value = self.tg_mock - self.cfg_mock = self.patch('heat.engine.service.cfg') - - def test_tgm_start_with_lock(self): - thm = service.ThreadGroupManager() - with self.patchobject(thm, 'start_with_acquired_lock'): - mock_thread_lock = mock.Mock() - mock_thread_lock.__enter__ = mock.Mock(return_value=None) - mock_thread_lock.__exit__ = mock.Mock(return_value=None) - self.lock_mock.thread_lock.return_value = mock_thread_lock - thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f, - *self.fargs, **self.fkwargs) - self.stlock_mock.StackLock.assert_called_with(self.cnxt, - self.stack.id, - self.engine_id) - - thm.start_with_acquired_lock.assert_called_once_with( - self.stack, self.lock_mock, - self.f, *self.fargs, **self.fkwargs) - - def test_tgm_start(self): - stack_id = 'test' - - thm = service.ThreadGroupManager() - ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs) - - self.assertEqual(self.tg_mock, thm.groups['test']) - self.tg_mock.add_thread.assert_called_with( - thm._start_with_trace, None, - self.f, *self.fargs, **self.fkwargs) - self.assertEqual(ret, self.tg_mock.add_thread()) - - def test_tgm_add_timer(self): - stack_id = 'test' - - thm = service.ThreadGroupManager() - thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs) - - self.assertEqual(self.tg_mock, thm.groups[stack_id]) - self.tg_mock.add_timer.assert_called_with( - self.cfg_mock.CONF.periodic_interval, - self.f, *self.fargs, **self.fkwargs) - - def test_tgm_add_event(self): - stack_id = 'add_events_test' - e1, e2 = mock.Mock(), mock.Mock() - thm = service.ThreadGroupManager() - thm.add_event(stack_id, e1) - thm.add_event(stack_id, e2) - self.assertEqual([e1, e2], thm.events[stack_id]) - - def test_tgm_remove_event(self): - stack_id = 'add_events_test' - e1, e2 = mock.Mock(), mock.Mock() - thm = service.ThreadGroupManager() - thm.add_event(stack_id, e1) - thm.add_event(stack_id, e2) - thm.remove_event(None, stack_id, e2) - self.assertEqual([e1], thm.events[stack_id]) - thm.remove_event(None, stack_id, e1) - self.assertNotIn(stack_id, thm.events) - - def test_tgm_send(self): - stack_id = 'send_test' - e1, e2 = mock.MagicMock(), mock.Mock() - thm = service.ThreadGroupManager() - thm.add_event(stack_id, e1) - thm.add_event(stack_id, e2) - thm.send(stack_id, 'test_message') - - -class ThreadGroupManagerStopTest(common.HeatTestCase): - def test_tgm_stop(self): - stack_id = 'test' - done = [] - - def function(): - while True: - eventlet.sleep() - - def linked(gt, thread): - for i in range(10): - eventlet.sleep() - done.append(thread) - - thm = service.ThreadGroupManager() - thm.add_event(stack_id, mock.Mock()) - thread = thm.start(stack_id, function) - thread.link(linked, thread) - - thm.stop(stack_id) - - self.assertIn(thread, done) - self.assertNotIn(stack_id, thm.groups) - self.assertNotIn(stack_id, thm.events)