Merge "Split engine service test cases (6)"

This commit is contained in:
Jenkins 2015-06-11 17:38:08 +00:00 committed by Gerrit Code Review
commit b0995e89f2
3 changed files with 501 additions and 455 deletions

View File

@ -0,0 +1,370 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslo_config import cfg
from heat.common import context
from heat.common import service_utils
from heat.engine import service
from heat.engine import worker
from heat.objects import service as service_objects
from heat.openstack.common import threadgroup
from heat.rpc import worker_api
from heat.tests import common
from heat.tests.engine import tools
from heat.tests import utils
class ServiceEngineTest(common.HeatTestCase):
def setUp(self):
super(ServiceEngineTest, self).setUp()
self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
self.eng = service.EngineService('a-host', 'a-topic')
self.eng.engine_id = 'engine-fake-uuid'
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.9',
service.EngineService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
'added in new version'))
@mock.patch.object(service_objects.Service, 'get_all')
@mock.patch.object(service_utils, 'format_service')
def test_service_get_all(self, mock_format_service, mock_get_all):
mock_get_all.return_value = [mock.Mock()]
mock_format_service.return_value = mock.Mock()
self.assertEqual(1, len(self.eng.list_services(self.ctx)))
self.assertTrue(mock_get_all.called)
mock_format_service.assert_called_once_with(mock.ANY)
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(service_objects.Service, 'create')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_start(self,
mock_admin_context,
mock_service_create,
mock_service_update):
self.eng.service_id = None
mock_admin_context.return_value = self.ctx
srv = dict(id='mock_id')
mock_service_create.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_create.assert_called_once_with(
self.ctx,
dict(host=self.eng.host,
hostname=self.eng.hostname,
binary=self.eng.binary,
engine_id=self.eng.engine_id,
topic=self.eng.topic,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(srv['id'], self.eng.service_id)
mock_service_update.assert_called_once_with(
self.ctx,
self.eng.service_id,
dict(deleted_at=None))
@mock.patch.object(service_objects.Service, 'get_all_by_args')
@mock.patch.object(service_objects.Service, 'delete')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_cleanup(self,
mock_admin_context,
mock_service_delete,
mock_get_all):
mock_admin_context.return_value = self.ctx
ages_a_go = datetime.datetime.utcnow() - datetime.timedelta(
seconds=4000)
mock_get_all.return_value = [{'id': 'foo',
'deleted_at': None,
'updated_at': ages_a_go}]
self.eng.service_manage_cleanup()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_delete.assert_called_once_with(
self.ctx, 'foo')
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_update(self, mock_admin_context,
mock_service_update):
self.eng.service_id = 'mock_id'
mock_admin_context.return_value = self.ctx
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_update.assert_called_once_with(
self.ctx,
'mock_id',
dict(deleted_at=None))
def test_stop_rpc_server(self):
with mock.patch.object(self.eng,
'_rpc_server') as mock_rpc_server:
self.eng._stop_rpc_server()
mock_rpc_server.stop.assert_called_once_with()
mock_rpc_server.wait.assert_called_once_with()
def _test_engine_service_start(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# engine id
sample_uuid_method.assert_called_once_with()
sampe_uuid = sample_uuid_method.return_value
self.assertEqual(sampe_uuid,
self.eng.engine_id,
'Failed to generated engine_id')
# Thread group manager
thread_group_manager_class.assert_called_once_with()
thread_group_manager = thread_group_manager_class.return_value
self.assertEqual(thread_group_manager,
self.eng.thread_group_mgr,
'Failed to create Thread Group Manager')
# Engine Listener
engine_listener_class.assert_called_once_with(
self.eng.host,
self.eng.engine_id,
self.eng.thread_group_mgr
)
engine_lister = engine_listener_class.return_value
engine_lister.start.assert_called_once_with()
# Worker Service
if cfg.CONF.convergence_engine:
worker_service_class.assert_called_once_with(
host=self.eng.host,
topic=worker_api.TOPIC,
engine_id=self.eng.engine_id,
thread_group_mgr=self.eng.thread_group_mgr
)
worker_service = worker_service_class.return_value
worker_service.start.assert_called_once_with()
# RPC Target
target_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION,
server=self.eng.host,
topic=self.eng.topic)
# RPC server
target = target_class.return_value
rpc_server_method.assert_called_once_with(target,
self.eng)
rpc_server = rpc_server_method.return_value
self.assertEqual(rpc_server,
self.eng._rpc_server,
"Failed to create RPC server")
rpc_server.start.assert_called_once_with()
# RPC client
rpc_client = rpc_client_class.return_value
rpc_client_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION)
self.assertEqual(rpc_client,
self.eng._client,
"Failed to create RPC client")
# Manage Thread group
thread_group_class.assert_called_once_with()
manage_thread_group = thread_group_class.return_value
manage_thread_group.add_timer.assert_called_once_with(
cfg.CONF.periodic_interval,
self.eng.service_manage_report
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value='sample-uuid')
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_non_convergence_mode(
self,
thread_group_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_start(
thread_group_class,
None,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value=mock.Mock())
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.engine.worker.WorkerService',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_convergence_mode(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_start(
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
def _test_engine_service_stop(
self,
service_delete_method,
admin_context_method):
cfg.CONF.set_default('periodic_interval', 60)
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# Add dummy thread group to test thread_group_mgr.stop() is executed?
dtg1 = tools.DummyThreadGroup()
dtg2 = tools.DummyThreadGroup()
self.eng.thread_group_mgr.groups['sample-uuid1'] = dtg1
self.eng.thread_group_mgr.groups['sample-uuid2'] = dtg2
self.eng.service_id = 'sample-service-uuid'
orig_stop = self.eng.thread_group_mgr.stop
with mock.patch.object(self.eng.thread_group_mgr, 'stop') as stop:
stop.side_effect = orig_stop
self.eng.stop()
# RPC server
self.eng._stop_rpc_server.assert_called_once_with()
if cfg.CONF.convergence_engine:
# WorkerService
self.eng.worker_service.stop.assert_called_once_with()
# Wait for all active threads to be finished
calls = [mock.call('sample-uuid1', True),
mock.call('sample-uuid2', True)]
self.eng.thread_group_mgr.stop.assert_has_calls(calls, True)
# # Manage Thread group
self.eng.manage_thread_grp.stop.assert_called_with(False)
# Service delete
admin_context_method.assert_called_once_with()
ctxt = admin_context_method.return_value
service_delete_method.assert_called_once_with(
ctxt,
self.eng.service_id
)
@mock.patch.object(service.EngineService,
'_stop_rpc_server')
@mock.patch.object(worker.WorkerService,
'stop')
@mock.patch.object(threadgroup.ThreadGroup,
'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
worker_service_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch.object(service.EngineService, '_stop_rpc_server')
@mock.patch.object(threadgroup.ThreadGroup, 'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_non_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch('oslo_log.log.setup')
def test_engine_service_reset(self, setup_logging_mock):
self.eng.reset()
setup_logging_mock.assertCalledOnceWith(cfg.CONF, 'heat')

View File

@ -0,0 +1,131 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import mock
from heat.engine import service
from heat.tests import common
class ThreadGroupManagerTest(common.HeatTestCase):
def setUp(self):
super(ThreadGroupManagerTest, self).setUp()
self.f = 'function'
self.fargs = ('spam', 'ham', 'eggs')
self.fkwargs = {'foo': 'bar'}
self.cnxt = 'ctxt'
self.engine_id = 'engine_id'
self.stack = mock.Mock()
self.lock_mock = mock.Mock()
self.stlock_mock = self.patch('heat.engine.service.stack_lock')
self.stlock_mock.StackLock.return_value = self.lock_mock
self.tg_mock = mock.Mock()
self.thg_mock = self.patch('heat.engine.service.threadgroup')
self.thg_mock.ThreadGroup.return_value = self.tg_mock
self.cfg_mock = self.patch('heat.engine.service.cfg')
def test_tgm_start_with_lock(self):
thm = service.ThreadGroupManager()
with self.patchobject(thm, 'start_with_acquired_lock'):
mock_thread_lock = mock.Mock()
mock_thread_lock.__enter__ = mock.Mock(return_value=None)
mock_thread_lock.__exit__ = mock.Mock(return_value=None)
self.lock_mock.thread_lock.return_value = mock_thread_lock
thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
*self.fargs, **self.fkwargs)
self.stlock_mock.StackLock.assert_called_with(self.cnxt,
self.stack.id,
self.engine_id)
thm.start_with_acquired_lock.assert_called_once_with(
self.stack, self.lock_mock,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_start(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups['test'])
self.tg_mock.add_thread.assert_called_with(
thm._start_with_trace, None,
self.f, *self.fargs, **self.fkwargs)
self.assertEqual(ret, self.tg_mock.add_thread())
def test_tgm_add_timer(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups[stack_id])
self.tg_mock.add_timer.assert_called_with(
self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_add_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
self.assertEqual([e1, e2], thm.events[stack_id])
def test_tgm_remove_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.remove_event(None, stack_id, e2)
self.assertEqual([e1], thm.events[stack_id])
thm.remove_event(None, stack_id, e1)
self.assertNotIn(stack_id, thm.events)
def test_tgm_send(self):
stack_id = 'send_test'
e1, e2 = mock.MagicMock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.send(stack_id, 'test_message')
class ThreadGroupManagerStopTest(common.HeatTestCase):
def test_tgm_stop(self):
stack_id = 'test'
done = []
def function():
while True:
eventlet.sleep()
def linked(gt, thread):
for i in range(10):
eventlet.sleep()
done.append(thread)
thm = service.ThreadGroupManager()
thm.add_event(stack_id, mock.Mock())
thread = thm.start(stack_id, function)
thread.link(linked, thread)
thm.stop(stack_id)
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)

View File

@ -11,10 +11,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
import uuid import uuid
import eventlet
from eventlet import event as grevent from eventlet import event as grevent
import mock import mock
import mox import mox
@ -27,7 +25,6 @@ from heat.common import context
from heat.common import exception from heat.common import exception
from heat.common import identifier from heat.common import identifier
from heat.common import messaging from heat.common import messaging
from heat.common import service_utils
from heat.common import template_format from heat.common import template_format
from heat.engine import dependencies from heat.engine import dependencies
from heat.engine import environment from heat.engine import environment
@ -39,17 +36,14 @@ from heat.engine import stack as parser
from heat.engine import stack_lock from heat.engine import stack_lock
from heat.engine import template as templatem from heat.engine import template as templatem
from heat.engine import watchrule from heat.engine import watchrule
from heat.engine import worker
from heat.objects import event as event_object from heat.objects import event as event_object
from heat.objects import resource as resource_objects from heat.objects import resource as resource_objects
from heat.objects import service as service_objects
from heat.objects import stack as stack_object from heat.objects import stack as stack_object
from heat.objects import sync_point as sync_point_object from heat.objects import sync_point as sync_point_object
from heat.objects import watch_data as watch_data_object from heat.objects import watch_data as watch_data_object
from heat.objects import watch_rule as watch_rule_object from heat.objects import watch_rule as watch_rule_object
from heat.openstack.common import threadgroup from heat.openstack.common import threadgroup
from heat.rpc import api as rpc_api from heat.rpc import api as rpc_api
from heat.rpc import worker_api
from heat.rpc import worker_client from heat.rpc import worker_client
from heat.tests import common from heat.tests import common
from heat.tests.engine import tools from heat.tests.engine import tools
@ -1357,14 +1351,6 @@ class StackServiceTest(common.HeatTestCase):
res._register_class('ResourceWithPropsType', res._register_class('ResourceWithPropsType',
generic_rsrc.ResourceWithProps) generic_rsrc.ResourceWithProps)
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.9',
service.EngineService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
'added in new version'))
@mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task') @mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task')
@mock.patch.object(stack_object.Stack, 'get_all') @mock.patch.object(stack_object.Stack, 'get_all')
@mock.patch.object(service.service.Service, 'start') @mock.patch.object(service.service.Service, 'start')
@ -3026,167 +3012,6 @@ class StackServiceTest(common.HeatTestCase):
self.eng._validate_new_stack, self.eng._validate_new_stack,
self.ctx, 'test_existing_stack', parsed_template) self.ctx, 'test_existing_stack', parsed_template)
@mock.patch.object(service_objects.Service, 'get_all')
@mock.patch.object(service_utils, 'format_service')
def test_service_get_all(self, mock_format_service, mock_get_all):
mock_get_all.return_value = [mock.Mock()]
mock_format_service.return_value = mock.Mock()
self.assertEqual(1, len(self.eng.list_services(self.ctx)))
self.assertTrue(mock_get_all.called)
mock_format_service.assert_called_once_with(mock.ANY)
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(service_objects.Service, 'create')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_start(self,
mock_admin_context,
mock_service_create,
mock_service_update):
self.eng.service_id = None
mock_admin_context.return_value = self.ctx
srv = dict(id='mock_id')
mock_service_create.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_create.assert_called_once_with(
self.ctx,
dict(host=self.eng.host,
hostname=self.eng.hostname,
binary=self.eng.binary,
engine_id=self.eng.engine_id,
topic=self.eng.topic,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(self.eng.service_id, srv['id'])
mock_service_update.assert_called_once_with(
self.ctx,
self.eng.service_id,
dict(deleted_at=None))
@mock.patch.object(service_objects.Service, 'get_all_by_args')
@mock.patch.object(service_objects.Service, 'delete')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_cleanup(self,
mock_admin_context,
mock_service_delete,
mock_get_all):
mock_admin_context.return_value = self.ctx
ages_a_go = datetime.datetime.utcnow() - datetime.timedelta(
seconds=4000)
mock_get_all.return_value = [{'id': 'foo',
'deleted_at': None,
'updated_at': ages_a_go}]
self.eng.service_manage_cleanup()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_delete.assert_called_once_with(
self.ctx, 'foo')
@mock.patch.object(service_objects.Service, 'update_by_id')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_update(
self,
mock_admin_context,
mock_service_update):
self.eng.service_id = 'mock_id'
mock_admin_context.return_value = self.ctx
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_update.assert_called_once_with(
self.ctx,
'mock_id',
dict(deleted_at=None))
def test_stop_rpc_server(self):
with mock.patch.object(self.eng,
'_rpc_server') as mock_rpc_server:
self.eng._stop_rpc_server()
mock_rpc_server.stop.assert_called_once_with()
mock_rpc_server.wait.assert_called_once_with()
def _test_engine_service_start(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# engine id
sample_uuid_method.assert_called_once_with()
sampe_uuid = sample_uuid_method.return_value
self.assertEqual(sampe_uuid,
self.eng.engine_id,
'Failed to generated engine_id')
# Thread group manager
thread_group_manager_class.assert_called_once_with()
thread_group_manager = thread_group_manager_class.return_value
self.assertEqual(thread_group_manager,
self.eng.thread_group_mgr,
'Failed to create Thread Group Manager')
# Engine Listener
engine_listener_class.assert_called_once_with(
self.eng.host,
self.eng.engine_id,
self.eng.thread_group_mgr
)
engine_lister = engine_listener_class.return_value
engine_lister.start.assert_called_once_with()
# Worker Service
if cfg.CONF.convergence_engine:
worker_service_class.assert_called_once_with(
host=self.eng.host,
topic=worker_api.TOPIC,
engine_id=self.eng.engine_id,
thread_group_mgr=self.eng.thread_group_mgr
)
worker_service = worker_service_class.return_value
worker_service.start.assert_called_once_with()
# RPC Target
target_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION,
server=self.eng.host,
topic=self.eng.topic)
# RPC server
target = target_class.return_value
rpc_server_method.assert_called_once_with(target,
self.eng)
rpc_server = rpc_server_method.return_value
self.assertEqual(rpc_server,
self.eng._rpc_server,
"Failed to create RPC server")
rpc_server.start.assert_called_once_with()
# RPC client
rpc_client = rpc_client_class.return_value
rpc_client_class.assert_called_once_with(
version=service.EngineService.RPC_API_VERSION)
self.assertEqual(rpc_client,
self.eng._client,
"Failed to create RPC client")
# Manage Thread group
thread_group_class.assert_called_once_with()
manage_thread_group = thread_group_class.return_value
manage_thread_group.add_timer.assert_called_once_with(
cfg.CONF.periodic_interval,
self.eng.service_manage_report
)
@mock.patch('heat.engine.service.ThreadGroupManager', @mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock()) return_value=mock.Mock())
@mock.patch.object(stack_object.Stack, 'get_all') @mock.patch.object(stack_object.Stack, 'get_all')
@ -3237,283 +3062,3 @@ class StackServiceTest(common.HeatTestCase):
fake_stack, fake_stack.state_set, fake_stack.action, fake_stack, fake_stack.state_set, fake_stack.action,
parser.Stack.FAILED, 'Engine went down during stack CREATE' parser.Stack.FAILED, 'Engine went down during stack CREATE'
) )
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value='sample-uuid')
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_non_convergence_mode(
self,
thread_group_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_start(
thread_group_class,
None,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
@mock.patch('heat.engine.stack_lock.StackLock.generate_engine_id',
return_value=mock.Mock())
@mock.patch('heat.engine.service.ThreadGroupManager',
return_value=mock.Mock())
@mock.patch('heat.engine.service.EngineListener',
return_value=mock.Mock())
@mock.patch('heat.engine.worker.WorkerService',
return_value=mock.Mock())
@mock.patch('heat.openstack.common.threadgroup.ThreadGroup',
return_value=mock.Mock())
def test_engine_service_start_in_convergence_mode(
self,
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_start(
thread_group_class,
worker_service_class,
engine_listener_class,
thread_group_manager_class,
sample_uuid_method,
rpc_client_class,
target_class,
rpc_server_method
)
def _test_engine_service_stop(
self,
service_delete_method,
admin_context_method):
cfg.CONF.set_default('periodic_interval', 60)
self.patchobject(self.eng, 'service_manage_cleanup')
self.patchobject(self.eng, 'reset_stack_status')
self.eng.start()
# Add dummy thread group to test thread_group_mgr.stop() is executed?
dtg1 = tools.DummyThreadGroup()
dtg2 = tools.DummyThreadGroup()
self.eng.thread_group_mgr.groups['sample-uuid1'] = dtg1
self.eng.thread_group_mgr.groups['sample-uuid2'] = dtg2
self.eng.service_id = 'sample-service-uuid'
orig_stop = self.eng.thread_group_mgr.stop
with mock.patch.object(self.eng.thread_group_mgr, 'stop') as stop:
stop.side_effect = orig_stop
self.eng.stop()
# RPC server
self.eng._stop_rpc_server.assert_called_once_with()
if cfg.CONF.convergence_engine:
# WorkerService
self.eng.worker_service.stop.assert_called_once_with()
# Wait for all active threads to be finished
calls = [mock.call('sample-uuid1', True),
mock.call('sample-uuid2', True)]
self.eng.thread_group_mgr.stop.assert_has_calls(
calls,
True)
# # Manage Thread group
self.eng.manage_thread_grp.stop.assert_called_with(False)
# Service delete
admin_context_method.assert_called_once_with()
ctxt = admin_context_method.return_value
service_delete_method.assert_called_once_with(
ctxt,
self.eng.service_id
)
@mock.patch.object(service.EngineService,
'_stop_rpc_server')
@mock.patch.object(worker.WorkerService,
'stop')
@mock.patch.object(threadgroup.ThreadGroup,
'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
worker_service_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', True)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch.object(service.EngineService,
'_stop_rpc_server')
@mock.patch.object(threadgroup.ThreadGroup,
'stop')
@mock.patch('heat.common.context.get_admin_context',
return_value=mock.Mock())
@mock.patch('heat.objects.service.Service.delete',
return_value=mock.Mock())
def test_engine_service_stop_in_non_convergence_mode(
self,
service_delete_method,
admin_context_method,
thread_group_stop,
rpc_server_stop):
cfg.CONF.set_default('convergence_engine', False)
self._test_engine_service_stop(
service_delete_method,
admin_context_method
)
@mock.patch('oslo_log.log.setup')
def test_engine_service_reset(self, setup_logging_mock):
self.eng.reset()
setup_logging_mock.assertCalledOnceWith(cfg.CONF, 'heat')
class ThreadGroupManagerTest(common.HeatTestCase):
def setUp(self):
super(ThreadGroupManagerTest, self).setUp()
self.f = 'function'
self.fargs = ('spam', 'ham', 'eggs')
self.fkwargs = {'foo': 'bar'}
self.cnxt = 'ctxt'
self.engine_id = 'engine_id'
self.stack = mock.Mock()
self.lock_mock = mock.Mock()
self.stlock_mock = self.patch('heat.engine.service.stack_lock')
self.stlock_mock.StackLock.return_value = self.lock_mock
self.tg_mock = mock.Mock()
self.thg_mock = self.patch('heat.engine.service.threadgroup')
self.thg_mock.ThreadGroup.return_value = self.tg_mock
self.cfg_mock = self.patch('heat.engine.service.cfg')
def test_tgm_start_with_lock(self):
thm = service.ThreadGroupManager()
with self.patchobject(thm, 'start_with_acquired_lock'):
mock_thread_lock = mock.Mock()
mock_thread_lock.__enter__ = mock.Mock(return_value=None)
mock_thread_lock.__exit__ = mock.Mock(return_value=None)
self.lock_mock.thread_lock.return_value = mock_thread_lock
thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
*self.fargs, **self.fkwargs)
self.stlock_mock.StackLock.assert_called_with(self.cnxt,
self.stack.id,
self.engine_id)
thm.start_with_acquired_lock.assert_called_once_with(
self.stack, self.lock_mock,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_start(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups['test'])
self.tg_mock.add_thread.assert_called_with(
thm._start_with_trace, None,
self.f, *self.fargs, **self.fkwargs)
self.assertEqual(ret, self.tg_mock.add_thread())
def test_tgm_add_timer(self):
stack_id = 'test'
thm = service.ThreadGroupManager()
thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs)
self.assertEqual(self.tg_mock, thm.groups[stack_id])
self.tg_mock.add_timer.assert_called_with(
self.cfg_mock.CONF.periodic_interval,
self.f, *self.fargs, **self.fkwargs)
def test_tgm_add_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
self.assertEqual([e1, e2], thm.events[stack_id])
def test_tgm_remove_event(self):
stack_id = 'add_events_test'
e1, e2 = mock.Mock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.remove_event(None, stack_id, e2)
self.assertEqual([e1], thm.events[stack_id])
thm.remove_event(None, stack_id, e1)
self.assertNotIn(stack_id, thm.events)
def test_tgm_send(self):
stack_id = 'send_test'
e1, e2 = mock.MagicMock(), mock.Mock()
thm = service.ThreadGroupManager()
thm.add_event(stack_id, e1)
thm.add_event(stack_id, e2)
thm.send(stack_id, 'test_message')
class ThreadGroupManagerStopTest(common.HeatTestCase):
def test_tgm_stop(self):
stack_id = 'test'
done = []
def function():
while True:
eventlet.sleep()
def linked(gt, thread):
for i in range(10):
eventlet.sleep()
done.append(thread)
thm = service.ThreadGroupManager()
thm.add_event(stack_id, mock.Mock())
thread = thm.start(stack_id, function)
thread.link(linked, thread)
thm.stop(stack_id)
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)