Copy-paste RPC Service class for backwards compatibility

blueprint oslo-messaging

Change-Id: Ie48de6d3636d6404316f19d73c7e8453298ecf14
This commit is contained in:
Ihar Hrachyshka
2014-06-02 17:44:20 +02:00
parent 13b4fed39c
commit cc86e13879
4 changed files with 63 additions and 5 deletions

View File

@@ -13,8 +13,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import common as rpc_common from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
from neutron.openstack.common.rpc import proxy from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import service
LOG = logging.getLogger(__name__)
class RpcProxy(proxy.RpcProxy): class RpcProxy(proxy.RpcProxy):
@@ -35,6 +42,57 @@ class RpcCallback(object):
''' '''
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = rpc.create_connection(new=True)
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
super(Service, self).stop()
# exceptions # exceptions
RPCException = rpc_common.RPCException RPCException = rpc_common.RPCException
RemoteError = rpc_common.RemoteError RemoteError = rpc_common.RemoteError

View File

@@ -22,6 +22,7 @@ import random
from oslo.config import cfg from oslo.config import cfg
from neutron.common import config from neutron.common import config
from neutron.common import rpc_compat
from neutron import context from neutron import context
from neutron.db import api as session from neutron.db import api as session
from neutron import manager from neutron import manager
@@ -29,7 +30,6 @@ from neutron.openstack.common import excutils
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import service
from neutron.openstack.common import service as common_service from neutron.openstack.common import service as common_service
from neutron import wsgi from neutron import wsgi
@@ -178,7 +178,7 @@ def _run_wsgi(app_name):
return server return server
class Service(service.Service): class Service(rpc_compat.Service):
"""Service object for binaries running on hosts. """Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based A service takes a manager and enables rpc by listening to queues based

View File

@@ -23,8 +23,8 @@ from oslo.config import cfg
from neutron.agent.common import config from neutron.agent.common import config
from neutron.agent.linux import interface from neutron.agent.linux import interface
from neutron.common import rpc_compat
from neutron.common import topics from neutron.common import topics
from neutron.openstack.common.rpc import service as rpc_service
from neutron.openstack.common import service from neutron.openstack.common import service
from neutron.services.loadbalancer.agent import agent_manager as manager from neutron.services.loadbalancer.agent import agent_manager as manager
@@ -37,7 +37,7 @@ OPTS = [
] ]
class LbaasAgentService(rpc_service.Service): class LbaasAgentService(rpc_compat.Service):
def start(self): def start(self):
super(LbaasAgentService, self).start() super(LbaasAgentService, self).start()
self.tg.add_timer( self.tg.add_timer(

View File

@@ -27,7 +27,7 @@ from neutron.tests import base
class TestLbaasService(base.BaseTestCase): class TestLbaasService(base.BaseTestCase):
def test_start(self): def test_start(self):
with mock.patch.object( with mock.patch.object(
agent.rpc_service.Service, 'start' agent.rpc_compat.Service, 'start'
) as mock_start: ) as mock_start:
mgr = mock.Mock() mgr = mock.Mock()