Split engine service test cases (6)

This patch separates out the test cases related to thread group manager
and the service engine itself.

Change-Id: I8b2f28f3796d611d72c42ae57e6f19f8bec71d42
This commit is contained in:
tengqm 2015-06-10 03:51:31 -04:00
parent e4bab2eee2
commit 035c0ea8ab
3 changed files with 501 additions and 455 deletions

View File

@ -0,0 +1,370 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslo_config import cfg
from heat.common import context
from heat.common import service_utils
from heat.engine import service
from heat.engine import worker
from heat.objects import service as service_objects
from heat.openstack.common import threadgroup
from heat.rpc import worker_api
from heat.tests import common
from heat.tests.engine import tools
from heat.tests import utils
class ServiceEngineTest(common.HeatTestCase):
def setUp(self):
super(ServiceEngineTest, self).setUp()
self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
self.eng = service.EngineService('a-host', 'a-topic')
self.eng.engine_id = 'engine-fake-uuid'
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.9',
service.EngineService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
'added in new version'))
@mock.patch.object(service_objects.Service, 'get_all')
@mock.patch.object(service_utils, 'format_service')
def test_service_get_all(self, mock_format_service, mock_get_all):
mock_get_all.return_value = [mock.Mock()]
mock_format_service.return_value = mock.Mock()
self.assertEqual(1, len(self.eng.list_services(self.ctx)))
self.assertTrue(mock_get_all.called)
mock_format_service.assert_called_once_with(mock.ANY)
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(service_objects.Service, 'create')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_start(self,
mock_admin_context,
mock_service_create,
mock_service_update):
self.eng.service_id = None
mock_admin_context.return_value = self.ctx
srv = dict(id='mock_id')
mock_service_create.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_create.assert_called_once_with(
self.ctx,
dict(host=self.eng.host,
hostname=self.eng.hostname,
binary=self.eng.binary,
engine_id=self.eng.engine_id,
topic=self.eng.topic,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(srv['id'], self.eng.service_id)
mock_service_update.assert_called_once_with(
self.ctx,
self.eng.service_id,
dict(deleted_at=None))
@mock.patch.object(service_objects.Service, 'get_all_by_args')
@mock.patch.object(service_objects.Service, 'delete')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_cleanup(self,
mock_admin_context,
mock_service_delete,
mock_get_all):
mock_admin_context.return_value = self.ctx
ages_a_go = datetime.datetime.utcnow() - datetime.timedelta(
seconds=4000)
mock_get_all.return_value = [{'id': 'foo',
'deleted_at': None,
'updated_at': ages_a_go}]
self.eng.service_manage_cleanup()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_delete.assert_called_once_with(
self.ctx, 'foo')
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_update(self, mock_admin_context,
mock_service_update):
self.eng.service_id = 'mock_id'
mock_admin_context.return_value = self.ctx
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_update.assert_called_once_with(
self.ctx,
'mock_id',
dict(deleted_at=None))
def test_stop_rpc_server(self):
with mock.patch.object(self.eng,
'_rpc_server') as mock_rpc_server:
self.eng._stop_rpc_server()
mock_rpc_server.stop.assert_called_once_with()
mock_rpc_server.wait.assert_called_once_with()
def _test_engine_service_start(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# engine id
sample_uuid_method.assert_called_once_with()
sampe_uuid = sample_uuid_method.return_value
self.assertEqual(sampe_uuid,
self.eng.engine_id,
'Failed to generated engine_id')
# Thread group manager
thread_group_manager_class.assert_called_once_with()
thread_group_manager = thread_group_manager_class.return_value
self.assertEqual(thread_group_manager,
self.eng.thread_group_mgr,
'Failed to create Thread Group Manager')
# Engine Listener
engine_listener_class.assert_called_once_with(
self.eng.host,
self.eng.engine_id,
self.eng.thread_group_mgr
)
engine_lister = engine_listener_class.return_value
engine_lister.start.assert_called_once_with()
# Worker Service
if cfg.CONF.convergence_engine:
worker_service_class.assert_called_once_with(
host=self.eng.host,
topic=worker_api.TOPIC,
engine_id=self.eng.engine_id,
thread_group_mgr=self.eng.thread_group_mgr
)
worker_service = worker_service_class.return_value
worker_service.start.assert_called_once_with()
# RPC Target
target_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION,
server=self.eng.host,
topic=self.eng.topic)
# RPC server
target = target_class.return_value
rpc_server_method.assert_called_once_with(target,
self.eng)
rpc_server = rpc_server_method.return_value
self.assertEqual(rpc_server,
self.eng._rpc_server,
"Failed to create RPC server")
rpc_server.start.assert_called_once_with()
# RPC client
rpc_client = rpc_client_class.return_value
rpc_client_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION)
self.assertEqual(rpc_client,
self.eng._client,
"Failed to create RPC client")
# Manage Thread group
thread_group_class.assert_called_once_with()
manage_thread_group = thread_group_class.return_value
manage_thread_group.add_timer.assert_called_once_with(
cfg.CONF.periodic_interval,
self.eng.service_manage_report
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value='sample-uuid')
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_non_convergence_mode(
self,
thread_group_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_start(
thread_group_class,
None,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value=mock.Mock())
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.engine.worker.WorkerService',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_convergence_mode(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_start(
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
def _test_engine_service_stop(
self,
service_delete_method,
admin_context_method):
cfg.CONF.set_default('periodic_interval', 60)
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# Add dummy thread group to test thread_group_mgr.stop() is executed?
dtg1 = tools.DummyThreadGroup()
dtg2 = tools.DummyThreadGroup()
self.eng.thread_group_mgr.groups['sample-uuid1'] = dtg1
self.eng.thread_group_mgr.groups['sample-uuid2'] = dtg2
self.eng.service_id = 'sample-service-uuid'
orig_stop = self.eng.thread_group_mgr.stop
with mock.patch.object(self.eng.thread_group_mgr, 'stop') as stop:
stop.side_effect = orig_stop
self.eng.stop()
# RPC server
self.eng._stop_rpc_server.assert_called_once_with()
if cfg.CONF.convergence_engine:
# WorkerService
self.eng.worker_service.stop.assert_called_once_with()
# Wait for all active threads to be finished
calls = [mock.call('sample-uuid1', True),
mock.call('sample-uuid2', True)]
self.eng.thread_group_mgr.stop.assert_has_calls(calls, True)
# # Manage Thread group
self.eng.manage_thread_grp.stop.assert_called_with(False)
# Service delete
admin_context_method.assert_called_once_with()
ctxt = admin_context_method.return_value
service_delete_method.assert_called_once_with(
ctxt,
self.eng.service_id
)
@mock.patch.object(service.EngineService,
'_stop_rpc_server')
@mock.patch.object(worker.WorkerService,
'stop')
@mock.patch.object(threadgroup.ThreadGroup,
'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
worker_service_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch.object(service.EngineService, '_stop_rpc_server')
@mock.patch.object(threadgroup.ThreadGroup, 'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_non_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch('oslo_log.log.setup')
def test_engine_service_reset(self, setup_logging_mock):
self.eng.reset()
setup_logging_mock.assertCalledOnceWith(cfg.CONF, 'heat')

View File

@ -0,0 +1,131 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import mock
from heat.engine import service
from heat.tests import common
class ThreadGroupManagerTest(common.HeatTestCase):
def setUp(self):
super(ThreadGroupManagerTest, self).setUp()
self.f = 'function'
self.fargs = ('spam', 'ham', 'eggs')
self.fkwargs = {'foo': 'bar'}
self.cnxt = 'ctxt'
self.engine_id = 'engine_id'
self.stack = mock.Mock()
self.lock_mock = mock.Mock()
self.stlock_mock = self.patch('heat.engine.service.stack_lock')
self.stlock_mock.StackLock.return_value = self.lock_mock
self.tg_mock = mock.Mock()
self.thg_mock = self.patch('heat.engine.service.threadgroup')
self.thg_mock.ThreadGroup.return_value = self.tg_mock
self.cfg_mock = self.patch('heat.engine.service.cfg')
def test_tgm_start_with_lock(self):
thm = service.ThreadGroupManager()
with self.patchobject(thm, 'start_with_acquired_lock'):
mock_thread_lock = mock.Mock()
mock_thread_lock.__enter__ = mock.Mock(return_value=None)
mock_thread_lock.__exit__ = mock.Mock(return_value=None)
self.lock_mock.thread_lock.return_value = mock_thread_lock
thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
*self.fargs, **self.fkwargs)
self.stlock_mock.StackLock.assert_called_with(self.cnxt,
self.stack.id,
self.engine_id)
thm.start_with_acquired_lock.assert_called_once_with(
self.stack, self.lock_mock,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_start(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups['test'])
self.tg_mock.add_thread.assert_called_with(
thm._start_with_trace, None,
self.f, *self.fargs, **self.fkwargs)
self.assertEqual(ret, self.tg_mock.add_thread())
def test_tgm_add_timer(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups[stack_id])
self.tg_mock.add_timer.assert_called_with(
self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_add_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
self.assertEqual([e1, e2], thm.events[stack_id])
def test_tgm_remove_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.remove_event(None, stack_id, e2)
self.assertEqual([e1], thm.events[stack_id])
thm.remove_event(None, stack_id, e1)
self.assertNotIn(stack_id, thm.events)
def test_tgm_send(self):
stack_id = 'send_test'
e1, e2 = mock.MagicMock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.send(stack_id, 'test_message')
class ThreadGroupManagerStopTest(common.HeatTestCase):
def test_tgm_stop(self):
stack_id = 'test'
done = []
def function():
while True:
eventlet.sleep()
def linked(gt, thread):
for i in range(10):
eventlet.sleep()
done.append(thread)
thm = service.ThreadGroupManager()
thm.add_event(stack_id, mock.Mock())
thread = thm.start(stack_id, function)
thread.link(linked, thread)
thm.stop(stack_id)
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)

View File

@ -11,10 +11,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
import eventlet
from eventlet import event as grevent
import mock
import mox
@ -27,7 +25,6 @@ from heat.common import context
from heat.common import exception
from heat.common import identifier
from heat.common import messaging
from heat.common import service_utils
from heat.common import template_format
from heat.engine import dependencies
from heat.engine import environment
@ -39,17 +36,14 @@ from heat.engine import stack as parser
from heat.engine import stack_lock
from heat.engine import template as templatem
from heat.engine import watchrule
from heat.engine import worker
from heat.objects import event as event_object
from heat.objects import resource as resource_objects
from heat.objects import service as service_objects
from heat.objects import stack as stack_object
from heat.objects import sync_point as sync_point_object
from heat.objects import watch_data as watch_data_object
from heat.objects import watch_rule as watch_rule_object
from heat.openstack.common import threadgroup
from heat.rpc import api as rpc_api
from heat.rpc import worker_api
from heat.rpc import worker_client
from heat.tests import common
from heat.tests.engine import tools
@ -1357,14 +1351,6 @@ class StackServiceTest(common.HeatTestCase):
res._register_class('ResourceWithPropsType',
generic_rsrc.ResourceWithProps)
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.9',
service.EngineService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
'added in new version'))
@mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task')
@mock.patch.object(stack_object.Stack, 'get_all')
@mock.patch.object(service.service.Service, 'start')
@ -3026,167 +3012,6 @@ class StackServiceTest(common.HeatTestCase):
self.eng._validate_new_stack,
self.ctx, 'test_existing_stack', parsed_template)
@mock.patch.object(service_objects.Service, 'get_all')
@mock.patch.object(service_utils, 'format_service')
def test_service_get_all(self, mock_format_service, mock_get_all):
mock_get_all.return_value = [mock.Mock()]
mock_format_service.return_value = mock.Mock()
self.assertEqual(1, len(self.eng.list_services(self.ctx)))
self.assertTrue(mock_get_all.called)
mock_format_service.assert_called_once_with(mock.ANY)
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(service_objects.Service, 'create')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_start(self,
mock_admin_context,
mock_service_create,
mock_service_update):
self.eng.service_id = None
mock_admin_context.return_value = self.ctx
srv = dict(id='mock_id')
mock_service_create.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_create.assert_called_once_with(
self.ctx,
dict(host=self.eng.host,
hostname=self.eng.hostname,
binary=self.eng.binary,
engine_id=self.eng.engine_id,
topic=self.eng.topic,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(self.eng.service_id, srv['id'])
mock_service_update.assert_called_once_with(
self.ctx,
self.eng.service_id,
dict(deleted_at=None))
@mock.patch.object(service_objects.Service, 'get_all_by_args')
@mock.patch.object(service_objects.Service, 'delete')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_cleanup(self,
mock_admin_context,
mock_service_delete,
mock_get_all):
mock_admin_context.return_value = self.ctx
ages_a_go = datetime.datetime.utcnow() - datetime.timedelta(
seconds=4000)
mock_get_all.return_value = [{'id': 'foo',
'deleted_at': None,
'updated_at': ages_a_go}]
self.eng.service_manage_cleanup()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_delete.assert_called_once_with(
self.ctx, 'foo')
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_update(
self,
mock_admin_context,
mock_service_update):
self.eng.service_id = 'mock_id'
mock_admin_context.return_value = self.ctx
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_update.assert_called_once_with(
self.ctx,
'mock_id',
dict(deleted_at=None))
def test_stop_rpc_server(self):
with mock.patch.object(self.eng,
'_rpc_server') as mock_rpc_server:
self.eng._stop_rpc_server()
mock_rpc_server.stop.assert_called_once_with()
mock_rpc_server.wait.assert_called_once_with()
def _test_engine_service_start(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# engine id
sample_uuid_method.assert_called_once_with()
sampe_uuid = sample_uuid_method.return_value
self.assertEqual(sampe_uuid,
self.eng.engine_id,
'Failed to generated engine_id')
# Thread group manager
thread_group_manager_class.assert_called_once_with()
thread_group_manager = thread_group_manager_class.return_value
self.assertEqual(thread_group_manager,
self.eng.thread_group_mgr,
'Failed to create Thread Group Manager')
# Engine Listener
engine_listener_class.assert_called_once_with(
self.eng.host,
self.eng.engine_id,
self.eng.thread_group_mgr
)
engine_lister = engine_listener_class.return_value
engine_lister.start.assert_called_once_with()
# Worker Service
if cfg.CONF.convergence_engine:
worker_service_class.assert_called_once_with(
host=self.eng.host,
topic=worker_api.TOPIC,
engine_id=self.eng.engine_id,
thread_group_mgr=self.eng.thread_group_mgr
)
worker_service = worker_service_class.return_value
worker_service.start.assert_called_once_with()
# RPC Target
target_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION,
server=self.eng.host,
topic=self.eng.topic)
# RPC server
target = target_class.return_value
rpc_server_method.assert_called_once_with(target,
self.eng)
rpc_server = rpc_server_method.return_value
self.assertEqual(rpc_server,
self.eng._rpc_server,
"Failed to create RPC server")
rpc_server.start.assert_called_once_with()
# RPC client
rpc_client = rpc_client_class.return_value
rpc_client_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION)
self.assertEqual(rpc_client,
self.eng._client,
"Failed to create RPC client")
# Manage Thread group
thread_group_class.assert_called_once_with()
manage_thread_group = thread_group_class.return_value
manage_thread_group.add_timer.assert_called_once_with(
cfg.CONF.periodic_interval,
self.eng.service_manage_report
)
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch.object(stack_object.Stack, 'get_all')
@ -3237,283 +3062,3 @@ class StackServiceTest(common.HeatTestCase):
fake_stack, fake_stack.state_set, fake_stack.action,
parser.Stack.FAILED, 'Engine went down during stack CREATE'
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value='sample-uuid')
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_non_convergence_mode(
self,
thread_group_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_start(
thread_group_class,
None,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value=mock.Mock())
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.engine.worker.WorkerService',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_convergence_mode(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_start(
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
def _test_engine_service_stop(
self,
service_delete_method,
admin_context_method):
cfg.CONF.set_default('periodic_interval', 60)
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# Add dummy thread group to test thread_group_mgr.stop() is executed?
dtg1 = tools.DummyThreadGroup()
dtg2 = tools.DummyThreadGroup()
self.eng.thread_group_mgr.groups['sample-uuid1'] = dtg1
self.eng.thread_group_mgr.groups['sample-uuid2'] = dtg2
self.eng.service_id = 'sample-service-uuid'
orig_stop = self.eng.thread_group_mgr.stop
with mock.patch.object(self.eng.thread_group_mgr, 'stop') as stop:
stop.side_effect = orig_stop
self.eng.stop()
# RPC server
self.eng._stop_rpc_server.assert_called_once_with()
if cfg.CONF.convergence_engine:
# WorkerService
self.eng.worker_service.stop.assert_called_once_with()
# Wait for all active threads to be finished
calls = [mock.call('sample-uuid1', True),
mock.call('sample-uuid2', True)]
self.eng.thread_group_mgr.stop.assert_has_calls(
calls,
True)
# # Manage Thread group
self.eng.manage_thread_grp.stop.assert_called_with(False)
# Service delete
admin_context_method.assert_called_once_with()
ctxt = admin_context_method.return_value
service_delete_method.assert_called_once_with(
ctxt,
self.eng.service_id
)
@mock.patch.object(service.EngineService,
'_stop_rpc_server')
@mock.patch.object(worker.WorkerService,
'stop')
@mock.patch.object(threadgroup.ThreadGroup,
'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
worker_service_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch.object(service.EngineService,
'_stop_rpc_server')
@mock.patch.object(threadgroup.ThreadGroup,
'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_non_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch('oslo_log.log.setup')
def test_engine_service_reset(self, setup_logging_mock):
self.eng.reset()
setup_logging_mock.assertCalledOnceWith(cfg.CONF, 'heat')
class ThreadGroupManagerTest(common.HeatTestCase):
def setUp(self):
super(ThreadGroupManagerTest, self).setUp()
self.f = 'function'
self.fargs = ('spam', 'ham', 'eggs')
self.fkwargs = {'foo': 'bar'}
self.cnxt = 'ctxt'
self.engine_id = 'engine_id'
self.stack = mock.Mock()
self.lock_mock = mock.Mock()
self.stlock_mock = self.patch('heat.engine.service.stack_lock')
self.stlock_mock.StackLock.return_value = self.lock_mock
self.tg_mock = mock.Mock()
self.thg_mock = self.patch('heat.engine.service.threadgroup')
self.thg_mock.ThreadGroup.return_value = self.tg_mock
self.cfg_mock = self.patch('heat.engine.service.cfg')
def test_tgm_start_with_lock(self):
thm = service.ThreadGroupManager()
with self.patchobject(thm, 'start_with_acquired_lock'):
mock_thread_lock = mock.Mock()
mock_thread_lock.__enter__ = mock.Mock(return_value=None)
mock_thread_lock.__exit__ = mock.Mock(return_value=None)
self.lock_mock.thread_lock.return_value = mock_thread_lock
thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
*self.fargs, **self.fkwargs)
self.stlock_mock.StackLock.assert_called_with(self.cnxt,
self.stack.id,
self.engine_id)
thm.start_with_acquired_lock.assert_called_once_with(
self.stack, self.lock_mock,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_start(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups['test'])
self.tg_mock.add_thread.assert_called_with(
thm._start_with_trace, None,
self.f, *self.fargs, **self.fkwargs)
self.assertEqual(ret, self.tg_mock.add_thread())
def test_tgm_add_timer(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups[stack_id])
self.tg_mock.add_timer.assert_called_with(
self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_add_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
self.assertEqual([e1, e2], thm.events[stack_id])
def test_tgm_remove_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.remove_event(None, stack_id, e2)
self.assertEqual([e1], thm.events[stack_id])
thm.remove_event(None, stack_id, e1)
self.assertNotIn(stack_id, thm.events)
def test_tgm_send(self):
stack_id = 'send_test'
e1, e2 = mock.MagicMock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.send(stack_id, 'test_message')
class ThreadGroupManagerStopTest(common.HeatTestCase):
def test_tgm_stop(self):
stack_id = 'test'
done = []
def function():
while True:
eventlet.sleep()
def linked(gt, thread):
for i in range(10):
eventlet.sleep()
done.append(thread)
thm = service.ThreadGroupManager()
thm.add_event(stack_id, mock.Mock())
thread = thm.start(stack_id, function)
thread.link(linked, thread)
thm.stop(stack_id)
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)