Convergence message bus

Provides the convergence message bus for heat-engine.

Implements: blueprint convergence-message-bus

Change-Id: I19b9edc9f17c881b542926783c8918e536d12ec5
This commit is contained in:
Kanagaraj Manickam 2015-02-20 10:22:36 +05:30
parent 6d106dee26
commit cb66169200
6 changed files with 320 additions and 0 deletions

View File

@ -52,15 +52,18 @@ from heat.engine import stack as parser
from heat.engine import stack_lock
from heat.engine import template as templatem
from heat.engine import watchrule
from heat.engine import worker
from heat.openstack.common import service
from heat.openstack.common import threadgroup
from heat.rpc import api as rpc_api
from heat.rpc import worker_api as rpc_worker_api
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
cfg.CONF.import_opt('max_resources_per_stack', 'heat.common.config')
cfg.CONF.import_opt('max_stacks_per_tenant', 'heat.common.config')
cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config')
cfg.CONF.import_opt('enable_stack_adopt', 'heat.common.config')
cfg.CONF.import_opt('convergence_engine', 'heat.common.config')
LOG = logging.getLogger(__name__)
@ -271,6 +274,7 @@ class EngineService(service.Service):
# happens after the fork when spawning multiple worker processes
self.stack_watch = None
self.listener = None
self.worker_service = None
self.engine_id = None
self.thread_group_mgr = None
self.target = None
@ -313,9 +317,22 @@ class EngineService(service.Service):
self.thread_group_mgr)
LOG.debug("Starting listener for engine %s" % self.engine_id)
self.listener.start()
if cfg.CONF.convergence_engine:
self.worker_service = worker.WorkerService(
host=self.host,
topic=rpc_worker_api.TOPIC,
engine_id=self.engine_id,
thread_group_mgr=self.thread_group_mgr
)
self.worker_service.start()
LOG.debug("WorkerService is started in engine %s" %
self.engine_id)
target = messaging.Target(
version=self.RPC_API_VERSION, server=self.host,
topic=self.topic)
self.target = target
self._rpc_server = rpc_messaging.get_rpc_server(target, self)
self._rpc_server.start()
@ -341,6 +358,12 @@ class EngineService(service.Service):
def stop(self):
self._stop_rpc_server()
if cfg.CONF.convergence_engine:
# Stop the WorkerService
self.worker_service.stop()
LOG.info(_LI("WorkerService is stopped in engine %s"),
self.engine_id)
# Wait for all active threads to be finished
for stack_id in self.thread_group_mgr.groups.keys():
# Ignore dummy service task

78
heat/engine/worker.py Normal file
View File

@ -0,0 +1,78 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
import oslo_messaging
from osprofiler import profiler
from heat.common.i18n import _LE
from heat.common.i18n import _LI
from heat.common import messaging as rpc_messaging
from heat.openstack.common import service
from heat.rpc import worker_client as rpc_client
LOG = logging.getLogger(__name__)
@profiler.trace_cls("rpc")
class WorkerService(service.Service):
"""
This service is dedicated to handle internal messages to the 'worker'
(a.k.a. 'converger') actor in convergence. Messages on this bus will
use the 'cast' rather than 'call' method to anycast the message to
an engine that will handle it asynchronously. It won't wait for
or expect replies from these messages.
"""
RPC_API_VERSION = '1.0'
def __init__(self,
host,
topic,
engine_id,
thread_group_mgr):
super(WorkerService, self).__init__()
self.host = host
self.topic = topic
self.engine_id = engine_id
self.thread_group_mgr = thread_group_mgr
self._rpc_client = None
self._rpc_server = None
def start(self):
target = oslo_messaging.Target(
version=self.RPC_API_VERSION,
server=self.host,
topic=self.topic)
LOG.info(_LI("Starting WorkerService ..."))
self._rpc_server = rpc_messaging.get_rpc_server(target, self)
self._rpc_server.start()
self._rpc_client = rpc_client.WorkerClient()
super(WorkerService, self).start()
def stop(self):
# Stop rpc connection at first for preventing new requests
LOG.info(_LI("Stopping WorkerService ..."))
try:
self._rpc_server.stop()
self._rpc_server.wait()
except Exception as e:
LOG.error(_LE("WorkerService is failed to stop, %s"), e)
super(WorkerService, self).stop()

16
heat/rpc/worker_api.py Normal file
View File

@ -0,0 +1,16 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TOPIC = 'engine_worker'

49
heat/rpc/worker_client.py Normal file
View File

@ -0,0 +1,49 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client side of the heat worker RPC API.
"""
from heat.common import messaging
from heat.rpc import worker_api
class WorkerClient(object):
'''Client side of the heat worker RPC API.
API version history::
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
self._client = messaging.get_rpc_client(
topic=worker_api.TOPIC,
version=self.BASE_RPC_API_VERSION)
@staticmethod
def make_msg(method, **kwargs):
return method, kwargs
def cast(self, ctxt, msg, version=None):
method, kwargs = msg
if version is not None:
client = self._client.prepare(version=version)
else:
client = self._client
return client.cast(ctxt, method, **kwargs)

View File

@ -0,0 +1,82 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from heat.engine import worker
from heat.tests import common
class WorkerServiceTest(common.HeatTestCase):
def setUp(self):
super(WorkerServiceTest, self).setUp()
thread_gruop_mgr = mock.Mock()
self.worker = worker.WorkerService('host-1',
'topic-1',
'engine_id',
thread_gruop_mgr)
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.0',
worker.WorkerService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
'added in new version'))
@mock.patch('heat.common.messaging.get_rpc_server',
return_value=mock.Mock())
@mock.patch('oslo_messaging.Target',
return_value=mock.Mock())
@mock.patch('heat.rpc.worker_client.WorkerClient',
return_value=mock.Mock())
def test_service_start(self,
rpc_client_class,
target_class,
rpc_server_method
):
self.worker.start()
# Make sure target is called with proper parameters
target_class.assert_called_once_with(
version=worker.WorkerService.RPC_API_VERSION,
server=self.worker.host,
topic=self.worker.topic)
# Make sure rpc server creation with proper target
# and WorkerService is initialized with it
target = target_class.return_value
rpc_server_method.assert_called_once_with(target,
self.worker)
rpc_server = rpc_server_method.return_value
self.assertEqual(rpc_server,
self.worker._rpc_server,
"Failed to create RPC server")
# Make sure rpc server is started.
rpc_server.start.assert_called_once_with()
# Make sure rpc client is created and initialized in WorkerService
rpc_client = rpc_client_class.return_value
rpc_client_class.assert_called_once_with()
self.assertEqual(rpc_client,
self.worker._rpc_client,
"Failed to create RPC client")
def test_service_stop(self):
with mock.patch.object(self.worker, '_rpc_server') as mock_rpc_server:
self.worker.stop()
mock_rpc_server.stop.assert_called_once_with()
mock_rpc_server.wait.assert_called_once_with()

View File

@ -0,0 +1,72 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from heat.rpc import worker_api as rpc_api
from heat.rpc import worker_client as rpc_client
from heat.tests import common
class WorkerClientTest(common.HeatTestCase):
def setUp(self):
super(WorkerClientTest, self).setUp()
def test_make_msg(self):
method = 'sample_method'
kwargs = {'a': '1',
'b': '2'}
result = method, kwargs
self.assertEqual(
result,
rpc_client.WorkerClient.make_msg(method, **kwargs))
@mock.patch('heat.common.messaging.get_rpc_client',
return_value=mock.Mock())
def test_cast(self, rpc_client_method):
# Mock the rpc client
mock_rpc_client = rpc_client_method.return_value
# Create the WorkerClient
worker_client = rpc_client.WorkerClient()
rpc_client_method.assert_called_once_with(
version=rpc_client.WorkerClient.BASE_RPC_API_VERSION,
topic=rpc_api.TOPIC
)
self.assertEqual(mock_rpc_client,
worker_client._client,
"Failed to create RPC client")
# Check cast in default version
mock_cnxt = mock.Mock()
method = 'sample_method'
kwargs = {'a': '1',
'b': '2'}
msg = method, kwargs
# go with default version
worker_client.cast(mock_cnxt, msg)
mock_rpc_client.cast.assert_called_once_with(mock_cnxt,
method,
**kwargs)
# Check cast in given version
version = '1.2'
worker_client.cast(mock_cnxt, msg, version)
mock_rpc_client.prepare.assert_called_once_with(version=version)
mock_rpc_client.cast.assert_called_once_with(mock_cnxt,
method,
**kwargs)