Moved rpc_compat.py code back into rpc.py

Most of this code will probably stay with us for quite some time, so
let's make things easier and consider them as our way of doing RPC.

blueprint oslo-messaging

Change-Id: Iaf353b23f9c54b82d1e02a6bd5a5960cec827c88
This commit is contained in:
Ihar Hrachyshka 2014-06-09 16:09:26 +02:00
parent f7ef6ea7ec
commit b43307b768
72 changed files with 323 additions and 346 deletions

View File

@ -31,7 +31,7 @@ from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import context
@ -137,7 +137,7 @@ class DhcpAgent(manager.Manager):
% {'net_id': network.id, 'action': action})
except Exception as e:
self.schedule_resync(e)
if (isinstance(e, rpc_compat.RemoteError)
if (isinstance(e, n_rpc.RemoteError)
and e.exc_type == 'NetworkNotFound'
or isinstance(e, exceptions.NetworkNotFound)):
LOG.warning(_("Network %s has been deleted."), network.id)
@ -377,7 +377,7 @@ class DhcpAgent(manager.Manager):
pm.disable()
class DhcpPluginApi(rpc_compat.RpcProxy):
class DhcpPluginApi(n_rpc.RpcProxy):
"""Agent side of the dhcp rpc API.
API version history:

View File

@ -30,7 +30,7 @@ from neutron.agent.linux import ovs_lib # noqa
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as common_utils
from neutron import context
@ -54,7 +54,7 @@ RPC_LOOP_INTERVAL = 1
FLOATING_IP_CIDR_SUFFIX = '/32'
class L3PluginApi(rpc_compat.RpcProxy):
class L3PluginApi(n_rpc.RpcProxy):
"""Agent side of the l3 agent RPC API.
API version history:
@ -80,9 +80,9 @@ class L3PluginApi(rpc_compat.RpcProxy):
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
@raise rpc_compat.RemoteError: with TooManyExternalNetworks
as exc_type if there are
more than one external network
@raise n_rpc.RemoteError: with TooManyExternalNetworks as
exc_type if there are more than one
external network
"""
return self.call(context,
self.make_msg('get_external_network_id',
@ -328,7 +328,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self.target_ex_net_id = self.plugin_rpc.get_external_network_id(
self.context)
return self.target_ex_net_id
except rpc_compat.RemoteError as e:
except n_rpc.RemoteError as e:
with excutils.save_and_reraise_exception() as ctx:
if e.exc_type == 'TooManyExternalNetworks':
ctx.reraise = False
@ -861,7 +861,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self._process_routers(routers, all_routers=True)
self.fullsync = False
LOG.debug(_("_sync_routers_task successfully completed"))
except rpc_compat.RPCException:
except n_rpc.RPCException:
LOG.exception(_("Failed synchronizing routers due to RPC error"))
self.fullsync = True
return

View File

@ -15,7 +15,7 @@
import itertools
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
@ -37,7 +37,7 @@ def create_consumers(endpoints, prefix, topic_details):
:returns: A common Connection.
"""
connection = rpc_compat.create_connection(new=True)
connection = n_rpc.create_connection(new=True)
for details in topic_details:
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)
@ -53,7 +53,7 @@ def create_consumers(endpoints, prefix, topic_details):
return connection
class PluginReportStateAPI(rpc_compat.RpcProxy):
class PluginReportStateAPI(n_rpc.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
@ -71,7 +71,7 @@ class PluginReportStateAPI(rpc_compat.RpcProxy):
return self.cast(context, msg, topic=self.topic)
class PluginApi(rpc_compat.RpcProxy):
class PluginApi(n_rpc.RpcProxy):
'''Agent side of the rpc API.
API version history:

View File

@ -14,7 +14,7 @@
# limitations under the License.
from neutron.common import constants
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import manager
@ -24,7 +24,7 @@ from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class DhcpAgentNotifyAPI(rpc_compat.RpcProxy):
class DhcpAgentNotifyAPI(n_rpc.RpcProxy):
"""API for plugin to notify DHCP agent."""
BASE_RPC_API_VERSION = '1.0'
# It seems dhcp agent does not support bulk operation

View File

@ -14,7 +14,7 @@
# limitations under the License.
from neutron.common import constants
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import manager
@ -25,7 +25,7 @@ from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
class L3AgentNotifyAPI(rpc_compat.RpcProxy):
class L3AgentNotifyAPI(n_rpc.RpcProxy):
"""API for plugin to notify L3 agent."""
BASE_RPC_API_VERSION = '1.0'

View File

@ -15,7 +15,7 @@
# under the License.
from neutron.common import constants
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import manager
@ -25,7 +25,7 @@ from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
class MeteringAgentNotifyAPI(rpc_compat.RpcProxy):
class MeteringAgentNotifyAPI(n_rpc.RpcProxy):
"""API for plugin to notify L3 metering agent."""
BASE_RPC_API_VERSION = '1.0'

View File

@ -1,4 +1,5 @@
# Copyright (c) 2012 OpenStack Foundation.
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -20,6 +21,7 @@ from oslo.messaging import serializer as om_serializer
from neutron.common import exceptions
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import service
LOG = logging.getLogger(__name__)
@ -133,3 +135,144 @@ class RequestContextSerializer(om_serializer.Serializer):
tenant_id = rpc_ctxt_dict.pop('project_id', None)
return context.Context(user_id, tenant_id,
load_admin_roles=False, **rpc_ctxt_dict)
class RpcProxy(object):
'''
This class is created to facilitate migration from oslo-incubator
RPC layer implementation to oslo.messaging and is intended to
emulate RpcProxy class behaviour using oslo.messaging API once the
migration is applied.
'''
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None):
self.topic = topic
target = messaging.Target(topic=topic, version=default_version)
self._client = get_client(target, version_cap=version_cap)
def make_msg(self, method, **kwargs):
return {'method': method,
'namespace': self.RPC_API_NAMESPACE,
'args': kwargs}
def call(self, context, msg, **kwargs):
return self.__call_rpc_method(
context, msg, rpc_method='call', **kwargs)
def cast(self, context, msg, **kwargs):
self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
def fanout_cast(self, context, msg, **kwargs):
kwargs['fanout'] = True
self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
def __call_rpc_method(self, context, msg, **kwargs):
options = dict(
((opt, kwargs[opt])
for opt in ('fanout', 'timeout', 'topic', 'version')
if kwargs.get(opt))
)
if msg['namespace']:
options['namespace'] = msg['namespace']
if options:
callee = self._client.prepare(**options)
else:
callee = self._client
func = getattr(callee, kwargs['rpc_method'])
return func(context, msg['method'], **msg['args'])
class RpcCallback(object):
'''
This class is created to facilitate migration from oslo-incubator
RPC layer implementation to oslo.messaging and is intended to set
callback version using oslo.messaging API once the migration is
applied.
'''
RPC_API_VERSION = '1.0'
def __init__(self):
super(RpcCallback, self).__init__()
self.target = messaging.Target(version=self.RPC_API_VERSION)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = create_connection(new=True)
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
endpoints = [self.manager]
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, endpoints, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, endpoints, fanout=False)
self.conn.create_consumer(self.topic, endpoints, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in threads
self.conn.consume_in_threads()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
super(Service, self).stop()
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
# functions
def create_connection(new=True):
return Connection()
# exceptions
RPCException = messaging.MessagingException
RemoteError = messaging.RemoteError
MessagingTimeout = messaging.MessagingTimeout

View File

@ -1,165 +0,0 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo import messaging
from neutron.common import rpc as n_rpc
from neutron.openstack.common import log as logging
from neutron.openstack.common import service
LOG = logging.getLogger(__name__)
class RpcProxy(object):
'''
This class is created to facilitate migration from oslo-incubator
RPC layer implementation to oslo.messaging and is intended to
emulate RpcProxy class behaviour using oslo.messaging API once the
migration is applied.
'''
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None):
self.topic = topic
target = messaging.Target(topic=topic, version=default_version)
self._client = n_rpc.get_client(target, version_cap=version_cap)
def make_msg(self, method, **kwargs):
return {'method': method,
'namespace': self.RPC_API_NAMESPACE,
'args': kwargs}
def call(self, context, msg, **kwargs):
return self.__call_rpc_method(
context, msg, rpc_method='call', **kwargs)
def cast(self, context, msg, **kwargs):
self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
def fanout_cast(self, context, msg, **kwargs):
kwargs['fanout'] = True
self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
def __call_rpc_method(self, context, msg, **kwargs):
options = dict(
((opt, kwargs[opt])
for opt in ('fanout', 'timeout', 'topic', 'version')
if kwargs.get(opt))
)
if msg['namespace']:
options['namespace'] = msg['namespace']
if options:
callee = self._client.prepare(**options)
else:
callee = self._client
func = getattr(callee, kwargs['rpc_method'])
return func(context, msg['method'], **msg['args'])
class RpcCallback(object):
'''
This class is created to facilitate migration from oslo-incubator
RPC layer implementation to oslo.messaging and is intended to set
callback version using oslo.messaging API once the migration is
applied.
'''
RPC_API_VERSION = '1.0'
def __init__(self):
super(RpcCallback, self).__init__()
self.target = messaging.Target(version=self.RPC_API_VERSION)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = create_connection(new=True)
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
endpoints = [self.manager]
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, endpoints, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, endpoints, fanout=False)
self.conn.create_consumer(self.topic, endpoints, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in threads
self.conn.consume_in_threads()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
super(Service, self).stop()
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = n_rpc.get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
# functions
def create_connection(new=True):
return Connection()
# exceptions
RPCException = messaging.MessagingException
RemoteError = messaging.RemoteError
MessagingTimeout = messaging.MessagingTimeout

View File

@ -19,7 +19,7 @@ from oslo.config import cfg
import sqlalchemy as sa
from sqlalchemy.orm import exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import agent as ext_agent
@ -196,7 +196,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
return self._create_or_update_agent(context, agent)
class AgentExtRpcCallback(rpc_compat.RpcCallback):
class AgentExtRpcCallback(n_rpc.RpcCallback):
"""Processes the rpc report in plugin implementations."""
RPC_API_VERSION = '1.0'

View File

@ -17,7 +17,7 @@ import weakref
from oslo.config import cfg
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
@ -30,7 +30,7 @@ from stevedore import driver
LOG = logging.getLogger(__name__)
class Manager(rpc_compat.RpcCallback, periodic_task.PeriodicTasks):
class Manager(n_rpc.RpcCallback, periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'

View File

@ -30,7 +30,7 @@ from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
@ -84,7 +84,7 @@ class SecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
self.init_firewall()
class RestProxyAgent(rpc_compat.RpcCallback,
class RestProxyAgent(n_rpc.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
RPC_API_VERSION = '1.1'

View File

@ -56,7 +56,7 @@ from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import context as qcontext
@ -94,7 +94,7 @@ SYNTAX_ERROR_MESSAGE = _('Syntax error in server config file, aborting plugin')
METADATA_SERVER_IP = '169.254.169.254'
class AgentNotifierApi(rpc_compat.RpcProxy,
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
BASE_RPC_API_VERSION = '1.1'
@ -112,7 +112,7 @@ class AgentNotifierApi(rpc_compat.RpcProxy,
topic=self.topic_port_update)
class RestProxyCallbacks(rpc_compat.RpcCallback,
class RestProxyCallbacks(n_rpc.RpcCallback,
sg_rpc_base.SecurityGroupServerRpcCallbackMixin,
dhcp_rpc_base.DhcpRpcCallbackMixin):
@ -493,7 +493,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
LOG.debug(_("NeutronRestProxyV2: initialization done"))
def _setup_rpc(self):
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.topic = topics.PLUGIN
self.notifier = AgentNotifierApi(topics.AGENT)
# init dhcp agent support

View File

@ -29,7 +29,7 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
@ -77,7 +77,7 @@ cfg.CONF.register_opts(SWITCH_OPTS, "SWITCH")
cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE")
class BridgeRpcCallbacks(rpc_compat.RpcCallback,
class BridgeRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
@ -154,7 +154,7 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
return entry
class AgentNotifierApi(rpc_compat.RpcProxy,
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the linux bridge rpc API.
@ -251,7 +251,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.rpc_context = context.RequestContext('neutron', 'neutron',
is_admin=False)
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [BridgeRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():

View File

@ -26,7 +26,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
@ -57,7 +57,7 @@ from neutron.plugins.common import constants as svc_constants
LOG = logging.getLogger(__name__)
class N1kvRpcCallbacks(rpc_compat.RpcCallback,
class N1kvRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
@ -124,7 +124,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -31,7 +31,7 @@ from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context
from neutron.openstack.common import log as logging
@ -79,7 +79,7 @@ CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)
class HyperVSecurityAgent(rpc_compat.RpcCallback,
class HyperVSecurityAgent(n_rpc.RpcCallback,
sg_rpc.SecurityGroupAgentRpcMixin):
# Set RPC API version to 1.1 by default.
RPC_API_VERSION = '1.1'
@ -103,7 +103,7 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
consumers)
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
class HyperVSecurityCallbackMixin(n_rpc.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
# Set RPC API version to 1.1 by default.
RPC_API_VERSION = '1.1'
@ -118,7 +118,7 @@ class HyperVPluginApi(agent_rpc.PluginApi,
pass
class HyperVNeutronAgent(rpc_compat.RpcCallback):
class HyperVNeutronAgent(n_rpc.RpcCallback):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'

View File

@ -14,7 +14,7 @@
# under the License.
# @author: Alessandro Pilotti, Cloudbase Solutions Srl
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
from neutron.plugins.hyperv.common import constants
@ -22,7 +22,7 @@ from neutron.plugins.hyperv.common import constants
LOG = logging.getLogger(__name__)
class AgentNotifierApi(rpc_compat.RpcProxy):
class AgentNotifierApi(n_rpc.RpcProxy):
'''Agent side of the openvswitch rpc API.
API version history:

View File

@ -18,7 +18,7 @@ from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
@ -185,7 +185,7 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.notifier = agent_notifier_api.AgentNotifierApi(
topics.AGENT)
self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),

View File

@ -15,7 +15,7 @@
# @author: Alessandro Pilotti, Cloudbase Solutions Srl
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
from neutron.openstack.common import log as logging
@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__)
class HyperVRpcCallbacks(
rpc_compat.RpcCallback,
n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):

View File

@ -31,7 +31,7 @@ from neutron.agent.linux import ovs_lib
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron import context
@ -52,7 +52,7 @@ class SdnvePluginApi(agent_rpc.PluginApi):
topic=self.topic)
class SdnveNeutronAgent(rpc_compat.RpcCallback):
class SdnveNeutronAgent(n_rpc.RpcCallback):
RPC_API_VERSION = '1.1'

View File

@ -23,7 +23,7 @@ from oslo.config import cfg
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
@ -56,7 +56,7 @@ class SdnveRpcCallbacks():
return info
class AgentNotifierApi(rpc_compat.RpcProxy):
class AgentNotifierApi(n_rpc.RpcProxy):
'''Agent side of the SDN-VE rpc API.'''
BASE_RPC_API_VERSION = '1.0'
@ -131,7 +131,7 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.endpoints = [SdnveRpcCallbacks(self.notifier),
agents_db.AgentExtRpcCallback()]

View File

@ -37,7 +37,7 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
@ -644,7 +644,7 @@ class LinuxBridgeManager:
self.remove_fdb_bridge_entry(mac, agent_ip, interface)
class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin,
l2pop_rpc.L2populationRpcCallBackMixin):

View File

@ -23,7 +23,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
@ -53,7 +53,7 @@ from neutron.plugins.linuxbridge.db import l2network_db_v2 as db
LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin
@ -152,7 +152,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
LOG.debug(_("%s can not be found in database"), device)
class AgentNotifierApi(rpc_compat.RpcProxy,
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
'''Agent side of the linux bridge rpc API.
@ -272,7 +272,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [LinuxBridgeRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():

View File

@ -27,7 +27,7 @@ from sqlalchemy.orm import exc as sa_exc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import agentschedulers_db
@ -174,7 +174,7 @@ def _check_resource_exists(func, id, name, raise_exc=False):
raise MidonetPluginException(msg=exc)
class MidoRpcCallbacks(rpc_compat.RpcCallback,
class MidoRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin):
RPC_API_VERSION = '1.1'
@ -369,7 +369,7 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [MidoRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,

View File

@ -17,7 +17,7 @@
# @author: Francois Eleouet, Orange
# @author: Mathieu Rohon, Orange
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
@ -25,7 +25,7 @@ from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class L2populationAgentNotifyAPI(rpc_compat.RpcProxy):
class L2populationAgentNotifyAPI(n_rpc.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.AGENT):

View File

@ -23,7 +23,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import agentschedulers_db
@ -130,7 +130,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()

View File

@ -17,7 +17,7 @@ from oslo import messaging
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
@ -46,7 +46,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to
# FIXME(ihrachys): we can't use n_rpc.RpcCallback here due to
# inheritance problems
target = messaging.Target(version=RPC_API_VERSION)
@ -198,7 +198,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
q_const.PORT_STATUS_ACTIVE)
class AgentNotifierApi(rpc_compat.RpcProxy,
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.

View File

@ -27,7 +27,7 @@ from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as q_constants
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
@ -143,7 +143,7 @@ class EswitchManager(object):
self.network_map[network_id] = data
class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
class MlnxEswitchRpcCallbacks(n_rpc.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
@ -203,7 +203,7 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
port['mac_address'],
self.agent.agent_id,
cfg.CONF.host)
except rpc_compat.MessagingTimeout:
except n_rpc.MessagingTimeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
else:
LOG.debug(_("No port %s defined on agent."), port['id'])

View File

@ -15,14 +15,14 @@
from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class AgentNotifierApi(rpc_compat.RpcProxy,
class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the Embedded Switch RPC API.

View File

@ -23,7 +23,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
@ -118,7 +118,7 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():

View File

@ -15,7 +15,7 @@
from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
@ -26,7 +26,7 @@ from neutron.plugins.mlnx.db import mlnx_db_v2 as db
LOG = logging.getLogger(__name__)
class MlnxRpcCallbacks(rpc_compat.RpcCallback,
class MlnxRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):

View File

@ -32,7 +32,7 @@ from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
@ -62,7 +62,7 @@ class NECPluginApi(agent_rpc.PluginApi):
port_removed=port_removed))
class NECAgentRpcCallback(rpc_compat.RpcCallback):
class NECAgentRpcCallback(n_rpc.RpcCallback):
RPC_API_VERSION = '1.0'
@ -84,7 +84,7 @@ class NECAgentRpcCallback(rpc_compat.RpcCallback):
self.sg_agent.refresh_firewall()
class SecurityGroupServerRpcApi(rpc_compat.RpcProxy,
class SecurityGroupServerRpcApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupServerRpcApiMixin):
def __init__(self, topic):
@ -93,7 +93,7 @@ class SecurityGroupServerRpcApi(rpc_compat.RpcProxy,
class SecurityGroupAgentRpcCallback(
rpc_compat.RpcCallback,
n_rpc.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION

View File

@ -20,7 +20,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as n_exc
from neutron.common import rpc_compat
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db import agentschedulers_db
@ -133,7 +133,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
def setup_rpc(self):
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.conn = n_rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
@ -657,7 +657,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.notify_security_groups_member_updated(context, port)
class NECPluginV2AgentNotifierApi(rpc_compat.RpcProxy,
class NECPluginV2AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
'''RPC API for NEC plugin agent.'''
@ -676,20 +676,20 @@ class NECPluginV2AgentNotifierApi(rpc_compat.RpcProxy,
topic=self.topic_port_update)
class DhcpRpcCallback(rpc_compat.RpcCallback,
class DhcpRpcCallback(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin):
# DhcpPluginApi BASE_RPC_API_VERSION
RPC_API_VERSION = '1.1'
class L3RpcCallback(rpc_compat.RpcCallback, l3_rpc_base.L3RpcCallbackMixin):
class L3RpcCallback(n_rpc.RpcCallback, l3_rpc_base.L3RpcCallbackMixin):
# 1.0 L3PluginApi BASE_RPC_API_VERSION
# 1.1 Support update_floatingip_statuses
RPC_API_VERSION = '1.1'
class SecurityGroupServerRpcCallback(
rpc_compat.RpcCallback,
n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
@ -705,7 +705,7 @@ class SecurityGroupServerRpcCallback(
return port
class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
class NECPluginV2RPCCallbacks(n_rpc.RpcCallback):
RPC_API_VERSION = '1.0'