diff --git a/heat/engine/stack_lock.py b/heat/engine/stack_lock.py index 314863cdd0..f7b7d65532 100644 --- a/heat/engine/stack_lock.py +++ b/heat/engine/stack_lock.py @@ -14,20 +14,15 @@ import contextlib import uuid -from oslo_config import cfg from oslo_log import log as logging -import oslo_messaging as messaging from oslo_utils import excutils from heat.common import exception from heat.common.i18n import _LI from heat.common.i18n import _LW -from heat.common import messaging as rpc_messaging from heat.objects import stack as stack_object from heat.objects import stack_lock as stack_lock_object -from heat.rpc import api as rpc_api - -cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config') +from heat.rpc import listener_client LOG = logging.getLogger(__name__) @@ -41,15 +36,8 @@ class StackLock(object): @staticmethod def engine_alive(context, engine_id): - client = rpc_messaging.get_rpc_client( - version='1.0', topic=rpc_api.LISTENER_TOPIC, - server=engine_id) - client_context = client.prepare( - timeout=cfg.CONF.engine_life_check_timeout) - try: - return client_context.call(context, 'listening') - except messaging.MessagingTimeout: - return False + return listener_client.EngineListenerClient( + engine_id).is_alive(context) @staticmethod def generate_engine_id(): diff --git a/heat/rpc/listener_client.py b/heat/rpc/listener_client.py new file mode 100644 index 0000000000..fe626492ab --- /dev/null +++ b/heat/rpc/listener_client.py @@ -0,0 +1,50 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Client side of the heat worker RPC API. +""" + +from oslo_config import cfg +import oslo_messaging as messaging + +from heat.common import messaging as rpc_messaging +from heat.rpc import api as rpc_api + +cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config') + + +class EngineListenerClient(object): + '''Client side of the heat listener RPC API. + + API version history:: + + 1.0 - Initial version. + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, engine_id): + _client = rpc_messaging.get_rpc_client( + topic=rpc_api.LISTENER_TOPIC, + version=self.BASE_RPC_API_VERSION, + server=engine_id) + self._client = _client.prepare( + timeout=cfg.CONF.engine_life_check_timeout) + + def is_alive(self, ctxt): + try: + return self._client.call(ctxt, 'listening') + except messaging.MessagingTimeout: + return False diff --git a/heat/tests/test_rpc_listener_client.py b/heat/tests/test_rpc_listener_client.py new file mode 100644 index 0000000000..fe10f33339 --- /dev/null +++ b/heat/tests/test_rpc_listener_client.py @@ -0,0 +1,70 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import oslo_messaging as messaging + +from heat.rpc import api as rpc_api +from heat.rpc import listener_client as rpc_client +from heat.tests import common + + +class ListenerClientTest(common.HeatTestCase): + + @mock.patch('heat.common.messaging.get_rpc_client', + return_value=mock.Mock()) + def test_engine_alive_ok(self, rpc_client_method): + mock_rpc_client = rpc_client_method.return_value + mock_prepare_method = mock_rpc_client.prepare + mock_prepare_client = mock_prepare_method.return_value + mock_cnxt = mock.Mock() + + listener_client = rpc_client.EngineListenerClient('engine-007') + rpc_client_method.assert_called_once_with( + version=rpc_client.EngineListenerClient.BASE_RPC_API_VERSION, + topic=rpc_api.LISTENER_TOPIC, server='engine-007', + ) + mock_prepare_method.assert_called_once_with(timeout=2) + self.assertEqual(mock_prepare_client, + listener_client._client, + "Failed to create RPC client") + + ret = listener_client.is_alive(mock_cnxt) + self.assertTrue(ret) + mock_prepare_client.call.assert_called_once_with(mock_cnxt, + 'listening') + + @mock.patch('heat.common.messaging.get_rpc_client', + return_value=mock.Mock()) + def test_engine_alive_timeout(self, rpc_client_method): + mock_rpc_client = rpc_client_method.return_value + mock_prepare_method = mock_rpc_client.prepare + mock_prepare_client = mock_prepare_method.return_value + mock_cnxt = mock.Mock() + + listener_client = rpc_client.EngineListenerClient('engine-007') + rpc_client_method.assert_called_once_with( + version=rpc_client.EngineListenerClient.BASE_RPC_API_VERSION, + topic=rpc_api.LISTENER_TOPIC, server='engine-007', + ) + mock_prepare_method.assert_called_once_with(timeout=2) + self.assertEqual(mock_prepare_client, + listener_client._client, + "Failed to create RPC client") + + mock_prepare_client.call.side_effect = messaging.MessagingTimeout( + 'too slow') + ret = listener_client.is_alive(mock_cnxt) + self.assertFalse(ret) + mock_prepare_client.call.assert_called_once_with(mock_cnxt, + 'listening') diff --git a/heat/tests/test_stack_lock.py b/heat/tests/test_stack_lock.py index babc406dff..cccb793996 100644 --- a/heat/tests/test_stack_lock.py +++ b/heat/tests/test_stack_lock.py @@ -12,7 +12,6 @@ # under the License. import mock -import oslo_messaging as messaging from heat.common import exception from heat.engine import stack_lock @@ -216,29 +215,3 @@ class StackLockTest(common.HeatTestCase): raise self.TestThreadLockException self.assertRaises(self.TestThreadLockException, check_thread_lock) assert not stack_lock_object.StackLock.release.called - - def test_engine_alive_ok(self): - slock = stack_lock.StackLock(self.context, self.stack_id, - self.engine_id) - mget_client = self.patchobject(stack_lock.rpc_messaging, - 'get_rpc_client') - mclient = mget_client.return_value - mclient_ctx = mclient.prepare.return_value - mclient_ctx.call.return_value = True - ret = slock.engine_alive(self.context, self.engine_id) - self.assertTrue(ret) - mclient.prepare.assert_called_once_with(timeout=2) - mclient_ctx.call.assert_called_once_with(self.context, 'listening') - - def test_engine_alive_timeout(self): - slock = stack_lock.StackLock(self.context, self.stack_id, - self.engine_id) - mget_client = self.patchobject(stack_lock.rpc_messaging, - 'get_rpc_client') - mclient = mget_client.return_value - mclient_ctx = mclient.prepare.return_value - mclient_ctx.call.side_effect = messaging.MessagingTimeout('too slow') - ret = slock.engine_alive(self.context, self.engine_id) - self.assertIs(False, ret) - mclient.prepare.assert_called_once_with(timeout=2) - mclient_ctx.call.assert_called_once_with(self.context, 'listening')