diff --git a/heat/engine/service.py b/heat/engine/service.py index b3bf11b25a..f20bead6eb 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -52,15 +52,18 @@ from heat.engine import stack as parser from heat.engine import stack_lock from heat.engine import template as templatem from heat.engine import watchrule +from heat.engine import worker from heat.openstack.common import service from heat.openstack.common import threadgroup from heat.rpc import api as rpc_api +from heat.rpc import worker_api as rpc_worker_api cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config') cfg.CONF.import_opt('max_resources_per_stack', 'heat.common.config') cfg.CONF.import_opt('max_stacks_per_tenant', 'heat.common.config') cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config') cfg.CONF.import_opt('enable_stack_adopt', 'heat.common.config') +cfg.CONF.import_opt('convergence_engine', 'heat.common.config') LOG = logging.getLogger(__name__) @@ -271,6 +274,7 @@ class EngineService(service.Service): # happens after the fork when spawning multiple worker processes self.stack_watch = None self.listener = None + self.worker_service = None self.engine_id = None self.thread_group_mgr = None self.target = None @@ -313,9 +317,22 @@ class EngineService(service.Service): self.thread_group_mgr) LOG.debug("Starting listener for engine %s" % self.engine_id) self.listener.start() + + if cfg.CONF.convergence_engine: + self.worker_service = worker.WorkerService( + host=self.host, + topic=rpc_worker_api.TOPIC, + engine_id=self.engine_id, + thread_group_mgr=self.thread_group_mgr + ) + self.worker_service.start() + LOG.debug("WorkerService is started in engine %s" % + self.engine_id) + target = messaging.Target( version=self.RPC_API_VERSION, server=self.host, topic=self.topic) + self.target = target self._rpc_server = rpc_messaging.get_rpc_server(target, self) self._rpc_server.start() @@ -341,6 +358,12 @@ class EngineService(service.Service): def stop(self): self._stop_rpc_server() + if cfg.CONF.convergence_engine: + # Stop the WorkerService + self.worker_service.stop() + LOG.info(_LI("WorkerService is stopped in engine %s"), + self.engine_id) + # Wait for all active threads to be finished for stack_id in self.thread_group_mgr.groups.keys(): # Ignore dummy service task diff --git a/heat/engine/worker.py b/heat/engine/worker.py new file mode 100644 index 0000000000..897a1c3509 --- /dev/null +++ b/heat/engine/worker.py @@ -0,0 +1,78 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import oslo_messaging +from osprofiler import profiler + +from heat.common.i18n import _LE +from heat.common.i18n import _LI +from heat.common import messaging as rpc_messaging +from heat.openstack.common import service +from heat.rpc import worker_client as rpc_client + +LOG = logging.getLogger(__name__) + + +@profiler.trace_cls("rpc") +class WorkerService(service.Service): + """ + This service is dedicated to handle internal messages to the 'worker' + (a.k.a. 'converger') actor in convergence. Messages on this bus will + use the 'cast' rather than 'call' method to anycast the message to + an engine that will handle it asynchronously. It won't wait for + or expect replies from these messages. + """ + + RPC_API_VERSION = '1.0' + + def __init__(self, + host, + topic, + engine_id, + thread_group_mgr): + super(WorkerService, self).__init__() + self.host = host + self.topic = topic + self.engine_id = engine_id + self.thread_group_mgr = thread_group_mgr + + self._rpc_client = None + self._rpc_server = None + + def start(self): + target = oslo_messaging.Target( + version=self.RPC_API_VERSION, + server=self.host, + topic=self.topic) + LOG.info(_LI("Starting WorkerService ...")) + + self._rpc_server = rpc_messaging.get_rpc_server(target, self) + self._rpc_server.start() + + self._rpc_client = rpc_client.WorkerClient() + + super(WorkerService, self).start() + + def stop(self): + # Stop rpc connection at first for preventing new requests + LOG.info(_LI("Stopping WorkerService ...")) + try: + self._rpc_server.stop() + self._rpc_server.wait() + except Exception as e: + LOG.error(_LE("WorkerService is failed to stop, %s"), e) + + super(WorkerService, self).stop() diff --git a/heat/rpc/worker_api.py b/heat/rpc/worker_api.py new file mode 100644 index 0000000000..8a97aaaf79 --- /dev/null +++ b/heat/rpc/worker_api.py @@ -0,0 +1,16 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +TOPIC = 'engine_worker' diff --git a/heat/rpc/worker_client.py b/heat/rpc/worker_client.py new file mode 100644 index 0000000000..24d8fd64ed --- /dev/null +++ b/heat/rpc/worker_client.py @@ -0,0 +1,49 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Client side of the heat worker RPC API. +""" + +from heat.common import messaging +from heat.rpc import worker_api + + +class WorkerClient(object): + '''Client side of the heat worker RPC API. + + API version history:: + + 1.0 - Initial version. + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self): + self._client = messaging.get_rpc_client( + topic=worker_api.TOPIC, + version=self.BASE_RPC_API_VERSION) + + @staticmethod + def make_msg(method, **kwargs): + return method, kwargs + + def cast(self, ctxt, msg, version=None): + method, kwargs = msg + if version is not None: + client = self._client.prepare(version=version) + else: + client = self._client + return client.cast(ctxt, method, **kwargs) diff --git a/heat/tests/test_engine_worker.py b/heat/tests/test_engine_worker.py new file mode 100644 index 0000000000..33720b8c38 --- /dev/null +++ b/heat/tests/test_engine_worker.py @@ -0,0 +1,82 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from heat.engine import worker +from heat.tests import common + + +class WorkerServiceTest(common.HeatTestCase): + def setUp(self): + super(WorkerServiceTest, self).setUp() + thread_gruop_mgr = mock.Mock() + self.worker = worker.WorkerService('host-1', + 'topic-1', + 'engine_id', + thread_gruop_mgr) + + def test_make_sure_rpc_version(self): + self.assertEqual( + '1.0', + worker.WorkerService.RPC_API_VERSION, + ('RPC version is changed, please update this test to new version ' + 'and make sure additional test cases are added for RPC APIs ' + 'added in new version')) + + @mock.patch('heat.common.messaging.get_rpc_server', + return_value=mock.Mock()) + @mock.patch('oslo_messaging.Target', + return_value=mock.Mock()) + @mock.patch('heat.rpc.worker_client.WorkerClient', + return_value=mock.Mock()) + def test_service_start(self, + rpc_client_class, + target_class, + rpc_server_method + ): + self.worker.start() + + # Make sure target is called with proper parameters + target_class.assert_called_once_with( + version=worker.WorkerService.RPC_API_VERSION, + server=self.worker.host, + topic=self.worker.topic) + + # Make sure rpc server creation with proper target + # and WorkerService is initialized with it + target = target_class.return_value + rpc_server_method.assert_called_once_with(target, + self.worker) + rpc_server = rpc_server_method.return_value + self.assertEqual(rpc_server, + self.worker._rpc_server, + "Failed to create RPC server") + + # Make sure rpc server is started. + rpc_server.start.assert_called_once_with() + + # Make sure rpc client is created and initialized in WorkerService + rpc_client = rpc_client_class.return_value + rpc_client_class.assert_called_once_with() + self.assertEqual(rpc_client, + self.worker._rpc_client, + "Failed to create RPC client") + + def test_service_stop(self): + with mock.patch.object(self.worker, '_rpc_server') as mock_rpc_server: + self.worker.stop() + mock_rpc_server.stop.assert_called_once_with() + mock_rpc_server.wait.assert_called_once_with() diff --git a/heat/tests/test_rpc_worker_client.py b/heat/tests/test_rpc_worker_client.py new file mode 100644 index 0000000000..75b25f1d70 --- /dev/null +++ b/heat/tests/test_rpc_worker_client.py @@ -0,0 +1,72 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from heat.rpc import worker_api as rpc_api +from heat.rpc import worker_client as rpc_client +from heat.tests import common + + +class WorkerClientTest(common.HeatTestCase): + + def setUp(self): + super(WorkerClientTest, self).setUp() + + def test_make_msg(self): + method = 'sample_method' + kwargs = {'a': '1', + 'b': '2'} + result = method, kwargs + self.assertEqual( + result, + rpc_client.WorkerClient.make_msg(method, **kwargs)) + + @mock.patch('heat.common.messaging.get_rpc_client', + return_value=mock.Mock()) + def test_cast(self, rpc_client_method): + # Mock the rpc client + mock_rpc_client = rpc_client_method.return_value + + # Create the WorkerClient + worker_client = rpc_client.WorkerClient() + rpc_client_method.assert_called_once_with( + version=rpc_client.WorkerClient.BASE_RPC_API_VERSION, + topic=rpc_api.TOPIC + ) + self.assertEqual(mock_rpc_client, + worker_client._client, + "Failed to create RPC client") + + # Check cast in default version + mock_cnxt = mock.Mock() + method = 'sample_method' + kwargs = {'a': '1', + 'b': '2'} + msg = method, kwargs + + # go with default version + worker_client.cast(mock_cnxt, msg) + mock_rpc_client.cast.assert_called_once_with(mock_cnxt, + method, + **kwargs) + + # Check cast in given version + version = '1.2' + worker_client.cast(mock_cnxt, msg, version) + mock_rpc_client.prepare.assert_called_once_with(version=version) + mock_rpc_client.cast.assert_called_once_with(mock_cnxt, + method, + **kwargs)