Removed create_rpc_dispatcher methods
Now that we don't have a special dispatcher class and we pass a list of endpoints to corresponding functions instead, those methods are unneeded. blueprint oslo-messaging Change-Id: If2b187fd8e553594212264f34b51b5b99c4630b2
This commit is contained in:
parent
4482f46690
commit
b282574048
@ -108,15 +108,15 @@ class Service(service.Service):
|
||||
LOG.debug("Creating Consumer connection for Service %s" %
|
||||
self.topic)
|
||||
|
||||
dispatcher = [self.manager]
|
||||
endpoints = [self.manager]
|
||||
|
||||
# Share this same connection for these Consumers
|
||||
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
|
||||
self.conn.create_consumer(self.topic, endpoints, fanout=False)
|
||||
|
||||
node_topic = '%s.%s' % (self.topic, self.host)
|
||||
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
|
||||
self.conn.create_consumer(node_topic, endpoints, fanout=False)
|
||||
|
||||
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
|
||||
self.conn.create_consumer(self.topic, endpoints, fanout=True)
|
||||
|
||||
# Hook to allow the manager to do other initializations after
|
||||
# the rpc connection is created.
|
||||
|
@ -30,9 +30,6 @@ class MeteringRpcCallbacks(object):
|
||||
def __init__(self, meter_plugin):
|
||||
self.meter_plugin = meter_plugin
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def get_sync_data_metering(self, context, **kwargs):
|
||||
l3_plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
|
@ -105,10 +105,10 @@ class RestProxyAgent(rpc_compat.RpcCallback,
|
||||
self.topic = topics.AGENT
|
||||
self.plugin_rpc = PluginApi(topics.PLUGIN)
|
||||
self.context = q_context.get_admin_context_without_session()
|
||||
self.dispatcher = [self]
|
||||
self.endpoints = [self]
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
|
@ -119,9 +119,6 @@ class RestProxyCallbacks(rpc_compat.RpcCallback,
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
def get_port_from_device(self, device):
|
||||
port_id = re.sub(r"^tap", "", device)
|
||||
port = self.get_port_and_sgs(port_id)
|
||||
@ -505,9 +502,9 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
|
||||
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
|
||||
self._dhcp_agent_notifier
|
||||
)
|
||||
self.callbacks = RestProxyCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
self.endpoints = [RestProxyCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -91,14 +91,6 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
|
||||
# 1.1 Support Security Group RPC
|
||||
TAP_PREFIX_LEN = 3
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
"""
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
@classmethod
|
||||
def get_port_from_device(cls, device):
|
||||
"""Get port from the brocade specific db."""
|
||||
@ -262,10 +254,10 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.rpc_context = context.RequestContext('neutron', 'neutron',
|
||||
is_admin=False)
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.callbacks = BridgeRpcCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [BridgeRpcCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
|
@ -68,14 +68,6 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback,
|
||||
# Set RPC API version to 1.1 by default.
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this rpc manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
"""
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
|
||||
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
external_net_db.External_net_db_mixin,
|
||||
@ -135,9 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher()
|
||||
self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||
# Consume from all consumers in threads
|
||||
|
@ -97,16 +97,13 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
|
||||
|
||||
def _setup_rpc(self):
|
||||
self.topic = topics.AGENT
|
||||
self.dispatcher = self._create_rpc_dispatcher()
|
||||
self.endpoints = [HyperVSecurityCallbackMixin(self)]
|
||||
consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
def _create_rpc_dispatcher(self):
|
||||
return [HyperVSecurityCallbackMixin(self)]
|
||||
|
||||
|
||||
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
|
||||
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
|
||||
@ -165,13 +162,13 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
|
||||
# RPC network init
|
||||
self.context = context.get_admin_context_without_session()
|
||||
# Handle updates from service
|
||||
self.dispatcher = self._create_rpc_dispatcher()
|
||||
self.endpoints = [self]
|
||||
# Define the listening consumers for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.NETWORK, topics.DELETE],
|
||||
[topics.PORT, topics.DELETE],
|
||||
[constants.TUNNEL, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
@ -233,9 +230,6 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
|
||||
network_type, physical_network,
|
||||
segmentation_id, port['admin_state_up'])
|
||||
|
||||
def _create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def _get_vswitch_name(self, network_type, physical_network):
|
||||
if network_type != p_const.TYPE_LOCAL:
|
||||
vswitch_name = self._get_vswitch_for_physical_network(
|
||||
|
@ -190,10 +190,10 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.notifier = agent_notifier_api.AgentNotifierApi(
|
||||
topics.AGENT)
|
||||
self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import l3_rpc_base
|
||||
from neutron.openstack.common import log as logging
|
||||
@ -41,14 +40,6 @@ class HyperVRpcCallbacks(
|
||||
self.notifier = notifier
|
||||
self._db = hyperv_db.HyperVPluginDB()
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
def get_device_details(self, rpc_context, **kwargs):
|
||||
"""Agent requests device details."""
|
||||
agent_id = kwargs.get('agent_id')
|
||||
|
@ -123,10 +123,10 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
|
||||
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
|
||||
|
||||
self.context = context.get_admin_context_without_session()
|
||||
self.dispatcher = self.create_rpc_dispatcher()
|
||||
self.endpoints = [self]
|
||||
consumers = [[constants.INFO, topics.UPDATE]]
|
||||
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
if self.polling_interval:
|
||||
@ -154,9 +154,6 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
|
||||
"connection-mode",
|
||||
"out-of-band")
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def setup_integration_br(self, bridge_name, reset_br, out_of_band,
|
||||
controller_ip=None):
|
||||
'''Sets up the integration bridge.
|
||||
|
@ -48,13 +48,6 @@ class SdnveRpcCallbacks():
|
||||
def __init__(self, notifier):
|
||||
self.notifier = notifier # used to notify the agent
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
def sdnve_info(self, rpc_context, **kwargs):
|
||||
'''Update new information.'''
|
||||
info = kwargs.get('info')
|
||||
@ -140,9 +133,9 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.callbacks = SdnveRpcCallbacks(self.notifier)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
self.endpoints = [SdnveRpcCallbacks(self.notifier),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -809,14 +809,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
|
||||
|
||||
getattr(self, method)(context, values)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self]
|
||||
|
||||
|
||||
class LinuxBridgePluginApi(agent_rpc.PluginApi,
|
||||
sg_rpc.SecurityGroupServerRpcApiMixin):
|
||||
@ -876,9 +868,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
# RPC network init
|
||||
self.context = context.get_admin_context_without_session()
|
||||
# Handle updates from service
|
||||
self.callbacks = LinuxBridgeRpcCallbacks(self.context,
|
||||
self)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self)]
|
||||
# Define the listening consumers for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.NETWORK, topics.DELETE],
|
||||
@ -886,7 +876,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
if cfg.CONF.VXLAN.l2_population:
|
||||
consumers.append([topics.L2POPULATION,
|
||||
topics.UPDATE, cfg.CONF.host])
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
|
@ -65,14 +65,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
|
||||
# Device names start with "tap"
|
||||
TAP_PREFIX_LEN = 3
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
@classmethod
|
||||
def get_port_from_device(cls, device):
|
||||
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
|
||||
@ -281,10 +273,10 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.callbacks = LinuxBridgeRpcCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [LinuxBridgeRpcCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
|
@ -180,16 +180,6 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback,
|
||||
dhcp_rpc_base.DhcpRpcCallbackMixin):
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager.
|
||||
|
||||
This a basic implementation that will call the plugin like get_ports
|
||||
and handle basic events
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
"""
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
|
||||
class MidonetPluginException(n_exc.NeutronException):
|
||||
message = _("%(msg)s")
|
||||
@ -382,9 +372,9 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
# RPC support
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.callbacks = MidoRpcCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
self.endpoints = [MidoRpcCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -25,6 +25,7 @@ from neutron.common import constants as const
|
||||
from neutron.common import exceptions as exc
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import allowedaddresspairs_db as addr_pair_db
|
||||
from neutron.db import db_base_plugin_v2
|
||||
@ -126,11 +127,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
)
|
||||
|
||||
def start_rpc_listeners(self):
|
||||
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
|
||||
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
return self.conn.consume_in_threads()
|
||||
|
||||
|
@ -19,7 +19,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
@ -58,14 +57,6 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
|
||||
# test in H3.
|
||||
super(RpcCallbacks, self).__init__(notifier, type_manager)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
@classmethod
|
||||
def _device_to_port_id(cls, device):
|
||||
# REVISIT(rkukura): Consider calling into MechanismDrivers to
|
||||
|
@ -210,15 +210,6 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
|
||||
else:
|
||||
LOG.debug(_("No port %s defined on agent."), port['id'])
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version,
|
||||
or support more than one class as the target of rpc messages,
|
||||
override this method.
|
||||
"""
|
||||
return [self]
|
||||
|
||||
|
||||
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
|
||||
sg_rpc.SecurityGroupServerRpcApiMixin):
|
||||
@ -268,14 +259,12 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
|
||||
# RPC network init
|
||||
self.context = context.get_admin_context_without_session()
|
||||
# Handle updates from service
|
||||
self.callbacks = MlnxEswitchRpcCallbacks(self.context,
|
||||
self)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [MlnxEswitchRpcCallbacks(self.context, self)]
|
||||
# Define the listening consumers for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.NETWORK, topics.DELETE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
|
@ -28,6 +28,7 @@ from neutron.common import exceptions as n_exc
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import external_net_db
|
||||
@ -120,10 +121,10 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
|
||||
|
@ -18,7 +18,6 @@ from oslo.config import cfg
|
||||
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import l3_rpc_base
|
||||
@ -40,15 +39,6 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback,
|
||||
#to be compatible with Linux Bridge Agent on Network Node
|
||||
TAP_PREFIX_LEN = 3
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an RPC API version,
|
||||
or support more than one class as the target of RPC messages,
|
||||
override this method.
|
||||
"""
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
@classmethod
|
||||
def get_port_from_device(cls, device):
|
||||
"""Get port according to device.
|
||||
|
@ -156,11 +156,11 @@ class NECNeutronAgent(object):
|
||||
self, self.sg_agent)
|
||||
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
|
||||
self.sg_agent)
|
||||
self.dispatcher = [self.callback_nec, self.callback_sg]
|
||||
self.endpoints = [self.callback_nec, self.callback_sg]
|
||||
# Define the listening consumer for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
|
@ -146,14 +146,14 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
|
||||
# NOTE: callback_sg is referred to from the sg unit test.
|
||||
self.callback_sg = SecurityGroupServerRpcCallback()
|
||||
self.dispatcher = [
|
||||
self.endpoints = [
|
||||
NECPluginV2RPCCallbacks(self.safe_reference),
|
||||
DhcpRpcCallback(),
|
||||
L3RpcCallback(),
|
||||
self.callback_sg,
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
@ -715,14 +715,6 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
|
||||
super(NECPluginV2RPCCallbacks, self).__init__()
|
||||
self.plugin = plugin
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self]
|
||||
|
||||
def update_ports(self, rpc_context, **kwargs):
|
||||
"""Update ports' information and activate/deavtivate them.
|
||||
|
||||
|
@ -286,13 +286,13 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
|
||||
# RPC network init
|
||||
self.context = context.get_admin_context_without_session()
|
||||
# Handle updates from service
|
||||
self.dispatcher = self.create_rpc_dispatcher()
|
||||
self.endpoints = [self]
|
||||
# Define the listening consumers for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.NETWORK, topics.DELETE],
|
||||
[constants.TUNNEL, topics.UPDATE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
@ -344,14 +344,6 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
|
||||
return
|
||||
self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
"""
|
||||
return [self]
|
||||
|
||||
def _provision_local_vlan_outbound_for_tunnel(self, lvid,
|
||||
segmentation_id, ofports):
|
||||
br = self.tun_br
|
||||
|
@ -119,11 +119,11 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback):
|
||||
self, self.sg_agent)
|
||||
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
|
||||
self.sg_agent)
|
||||
self.dispatcher = [self.callback_oc, self.callback_sg]
|
||||
self.endpoints = [self.callback_oc, self.callback_sg]
|
||||
# Define the listening consumer for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
|
@ -58,10 +58,6 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback,
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager."""
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
@staticmethod
|
||||
def get_port_from_device(device):
|
||||
port = nvsd_db.get_port_from_device(device)
|
||||
@ -165,10 +161,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||
)
|
||||
self.callbacks = NVSDPluginRpcCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [NVSDPluginRpcCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -249,7 +249,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
|
||||
# RPC network init
|
||||
self.context = context.get_admin_context_without_session()
|
||||
# Handle updates from service
|
||||
self.dispatcher = self.create_rpc_dispatcher()
|
||||
self.endpoints = [self]
|
||||
# Define the listening consumers for the agent
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.NETWORK, topics.DELETE],
|
||||
@ -258,7 +258,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
|
||||
if self.l2_pop:
|
||||
consumers.append([topics.L2POPULATION,
|
||||
topics.UPDATE, cfg.CONF.host])
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
report_interval = cfg.CONF.AGENT.report_interval
|
||||
@ -493,14 +493,6 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
|
||||
else:
|
||||
LOG.warning(_('Action %s not supported'), action)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self]
|
||||
|
||||
def provision_local_vlan(self, net_uuid, network_type, physical_network,
|
||||
segmentation_id):
|
||||
'''Provisions a local VLAN.
|
||||
|
@ -73,14 +73,6 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback,
|
||||
self.notifier = notifier
|
||||
self.tunnel_type = tunnel_type
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
@classmethod
|
||||
def get_port_from_device(cls, device):
|
||||
port = ovs_db_v2.get_port_from_device(device)
|
||||
@ -341,10 +333,10 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||
)
|
||||
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
|
@ -200,16 +200,13 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback,
|
||||
self.topic = topics.AGENT
|
||||
self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
|
||||
self.context = q_context.get_admin_context_without_session()
|
||||
self.dispatcher = self._create_rpc_dispatcher()
|
||||
self.endpoints = [self]
|
||||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE]]
|
||||
self.connection = agent_rpc.create_consumers(self.dispatcher,
|
||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||
self.topic,
|
||||
consumers)
|
||||
|
||||
def _create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def _setup_integration_br(self, root_helper, integ_br,
|
||||
tunnel_ip, ovsdb_port, ovsdb_ip):
|
||||
self.int_br = OVSBridge(integ_br, root_helper)
|
||||
|
@ -57,9 +57,6 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback,
|
||||
super(RyuRpcCallbacks, self).__init__()
|
||||
self.ofp_rest_api_addr = ofp_rest_api_addr
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def get_ofp_rest_api(self, context, **kwargs):
|
||||
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
|
||||
return self.ofp_rest_api_addr
|
||||
@ -143,10 +140,9 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _create_all_tenant_network(self):
|
||||
|
@ -25,7 +25,6 @@ from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as const
|
||||
from neutron.common import exceptions as ntn_exc
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import db_base_plugin_v2
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import l3_db
|
||||
@ -48,14 +47,6 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback,
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
'''Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
'''
|
||||
return [self, agents_db.AgentExtRpcCallback()]
|
||||
|
||||
|
||||
def handle_network_dhcp_access(plugin, context, network, action):
|
||||
pass
|
||||
|
@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.common import constants as const
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.db import agents_db
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.vmware.common import config
|
||||
@ -70,8 +71,9 @@ class DhcpMetadataAccess(object):
|
||||
def _setup_rpc_dhcp_metadata(self, notifier=None):
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
|
||||
self.endpoints = [nsx_rpc.NSXRpcCallbacks(),
|
||||
agents_db.AgentExtRpcCallback()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
|
||||
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
|
||||
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
|
||||
self.conn.consume_in_threads()
|
||||
|
@ -40,9 +40,6 @@ class FirewallCallbacks(rpc_compat.RpcCallback):
|
||||
super(FirewallCallbacks, self).__init__()
|
||||
self.plugin = plugin
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def set_firewall_status(self, context, firewall_id, status, **kwargs):
|
||||
"""Agent uses this to set a firewall's status."""
|
||||
LOG.debug(_("set_firewall_status() called"))
|
||||
@ -165,13 +162,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
|
||||
"""Do the initialization for the firewall service plugin here."""
|
||||
qdbapi.register_models()
|
||||
|
||||
self.callbacks = FirewallCallbacks(self)
|
||||
self.endpoints = [FirewallCallbacks(self)]
|
||||
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.conn.create_consumer(
|
||||
topics.FIREWALL_PLUGIN,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
self.agent_rpc = FirewallAgentApi(
|
||||
|
@ -39,14 +39,6 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback,
|
||||
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
"""Get the rpc dispatcher for this manager.
|
||||
|
||||
If a manager would like to set an rpc API version, or support more than
|
||||
one class as the target of rpc messages, override this method.
|
||||
"""
|
||||
return [self]
|
||||
|
||||
|
||||
class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
|
||||
extraroute_db.ExtraRoute_db_mixin,
|
||||
@ -76,9 +68,8 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.agent_notifiers.update(
|
||||
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
|
||||
self.callbacks = L3RouterPluginRpcCallbacks()
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
self.endpoints = [L3RouterPluginRpcCallbacks()]
|
||||
self.conn.create_consumer(self.topic, self.endpoints,
|
||||
fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
|
@ -64,9 +64,6 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback):
|
||||
super(LoadBalancerCallbacks, self).__init__()
|
||||
self.plugin = plugin
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self, agents_db.AgentExtRpcCallback(self.plugin)]
|
||||
|
||||
def get_ready_devices(self, context, host=None):
|
||||
with context.session.begin(subtransactions=True):
|
||||
agents = self.plugin.get_lbaas_agents(context,
|
||||
@ -342,11 +339,14 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
|
||||
if hasattr(self.plugin, 'agent_callbacks'):
|
||||
return
|
||||
|
||||
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
|
||||
self.plugin.agent_endpoints = [
|
||||
LoadBalancerCallbacks(self.plugin),
|
||||
agents_db.AgentExtRpcCallback(self.plugin)
|
||||
]
|
||||
self.plugin.conn = rpc_compat.create_connection(new=True)
|
||||
self.plugin.conn.create_consumer(
|
||||
topics.LOADBALANCER_PLUGIN,
|
||||
self.plugin.agent_callbacks.create_rpc_dispatcher(),
|
||||
self.plugin.agent_endpoints,
|
||||
fanout=False)
|
||||
self.plugin.conn.consume_in_threads()
|
||||
|
||||
|
@ -28,13 +28,11 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
|
||||
def __init__(self):
|
||||
super(MeteringPlugin, self).__init__()
|
||||
|
||||
self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
|
||||
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
|
||||
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.conn.create_consumer(
|
||||
topics.METERING_PLUGIN,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
topics.METERING_PLUGIN, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
|
||||
|
@ -198,10 +198,8 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
|
||||
|
||||
self.service_state = {}
|
||||
|
||||
self.conn.create_consumer(
|
||||
node_topic,
|
||||
self.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.endpoints = [self]
|
||||
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = (
|
||||
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
|
||||
@ -225,9 +223,6 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
|
||||
v['timeout']))
|
||||
for k, v in csrs_found.items()])
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def vpnservice_updated(self, context, **kwargs):
|
||||
"""Handle VPNaaS service driver change notifications."""
|
||||
LOG.debug(_("Handling VPN service update notification '%s'"),
|
||||
|
@ -504,10 +504,8 @@ class IPsecDriver(device_drivers.DeviceDriver):
|
||||
self.processes = {}
|
||||
self.process_status_cache = {}
|
||||
|
||||
self.conn.create_consumer(
|
||||
node_topic,
|
||||
self.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.endpoints = [self]
|
||||
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
|
||||
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
|
||||
@ -515,9 +513,6 @@ class IPsecDriver(device_drivers.DeviceDriver):
|
||||
self.process_status_cache_check.start(
|
||||
interval=self.conf.ipsec.ipsec_status_check_interval)
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def _update_nat(self, vpnservice, func):
|
||||
"""Setting up nat rule in iptables.
|
||||
|
||||
|
@ -53,9 +53,6 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback):
|
||||
super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
|
||||
self.driver = driver
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def get_vpn_services_on_host(self, context, host=None):
|
||||
"""Retuns info on the vpnservices on the host."""
|
||||
plugin = self.driver.service_plugin
|
||||
@ -88,12 +85,10 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
|
||||
|
||||
def __init__(self, service_plugin):
|
||||
super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
|
||||
self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self)
|
||||
self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.conn.create_consumer(
|
||||
topics.CISCO_IPSEC_DRIVER_TOPIC,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
|
||||
topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
|
||||
|
@ -40,9 +40,6 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback):
|
||||
super(IPsecVpnDriverCallBack, self).__init__()
|
||||
self.driver = driver
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return [self]
|
||||
|
||||
def get_vpn_services_on_host(self, context, host=None):
|
||||
"""Returns the vpnservices on the host."""
|
||||
plugin = self.driver.service_plugin
|
||||
@ -73,12 +70,10 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
|
||||
|
||||
def __init__(self, service_plugin):
|
||||
super(IPsecVPNDriver, self).__init__(service_plugin)
|
||||
self.callbacks = IPsecVpnDriverCallBack(self)
|
||||
self.endpoints = [IPsecVpnDriverCallBack(self)]
|
||||
self.conn = rpc_compat.create_connection(new=True)
|
||||
self.conn.create_consumer(
|
||||
topics.IPSEC_DRIVER_TOPIC,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = IPsecVpnAgentApi(
|
||||
topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
|
||||
|
@ -33,8 +33,6 @@ from neutron.db import portbindings_db # noqa
|
||||
|
||||
RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
|
||||
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
|
||||
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
|
||||
DISPATCHER = CALLBACKS + '.create_rpc_dispatcher'
|
||||
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
|
||||
SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
|
||||
HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection'
|
||||
@ -61,15 +59,11 @@ class BigSwitchTestBase(object):
|
||||
|
||||
def setup_patches(self):
|
||||
self.plugin_notifier_p = mock.patch(NOTIFIER)
|
||||
# prevent rpc callback dispatcher from being created
|
||||
self.callbacks_p = mock.patch(DISPATCHER,
|
||||
new=lambda *args, **kwargs: None)
|
||||
# prevent any greenthreads from spawning
|
||||
self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
|
||||
# prevent the consistency watchdog from starting
|
||||
self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
|
||||
self.addCleanup(db.clear_db)
|
||||
self.callbacks_p.start()
|
||||
self.plugin_notifier_p.start()
|
||||
self.spawn_p.start()
|
||||
self.watch_p.start()
|
||||
|
@ -32,7 +32,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
|
||||
super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
self.notifier = plugin.notifier
|
||||
self.rpc = plugin.callbacks
|
||||
self.rpc = plugin.endpoints[0]
|
||||
self.startHttpPatch()
|
||||
|
||||
|
||||
|
@ -62,7 +62,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
||||
bound, status)
|
||||
port_id = port['port']['id']
|
||||
neutron_context = context.get_admin_context()
|
||||
details = self.plugin.callbacks.get_device_details(
|
||||
details = self.plugin.endpoints[0].get_device_details(
|
||||
neutron_context, agent_id="theAgentId", device=port_id)
|
||||
if bound:
|
||||
self.assertEqual(details['network_type'], 'local')
|
||||
|
@ -74,7 +74,8 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
|
||||
req.get_response(self.api))
|
||||
port_id = res['port']['id']
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device(port_id)
|
||||
callbacks = plugin.endpoints[0]
|
||||
port_dict = callbacks.get_port_from_device(port_id)
|
||||
self.assertEqual(port_id, port_dict['id'])
|
||||
self.assertEqual([security_group_id],
|
||||
port_dict[ext_sg.SECURITYGROUPS])
|
||||
@ -85,7 +86,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
|
||||
|
||||
def test_security_group_get_port_from_device_with_no_port(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
|
||||
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
|
||||
self.assertIsNone(port_dict)
|
||||
|
||||
|
||||
|
@ -136,7 +136,8 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
|
||||
req.get_response(self.api))
|
||||
port_id = res['port']['id']
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device(port_id)
|
||||
callbacks = plugin.endpoints[0]
|
||||
port_dict = callbacks.get_port_from_device(port_id)
|
||||
self.assertEqual(port_id, port_dict['id'])
|
||||
self.assertEqual([security_group_id],
|
||||
port_dict[ext_sg.SECURITYGROUPS])
|
||||
@ -148,7 +149,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
|
||||
def test_security_group_get_port_from_device_with_no_port(self):
|
||||
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
|
||||
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
|
||||
self.assertIsNone(port_dict)
|
||||
|
||||
|
||||
|
@ -84,7 +84,8 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
|
||||
req.get_response(self.api))
|
||||
port_id = res['port']['id']
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device(port_id)
|
||||
callbacks = plugin.endpoints[0]
|
||||
port_dict = callbacks.get_port_from_device(port_id)
|
||||
self.assertEqual(port_id, port_dict['id'])
|
||||
self.assertEqual([security_group_id],
|
||||
port_dict[ext_sg.SECURITYGROUPS])
|
||||
@ -95,7 +96,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
|
||||
|
||||
def test_security_group_get_port_from_device_with_no_port(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
|
||||
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
|
||||
self.assertIsNone(port_dict)
|
||||
|
||||
|
||||
|
@ -73,7 +73,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
|
||||
req.get_response(self.api))
|
||||
port_id = res['port']['id']
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device(port_id)
|
||||
port_dict = plugin.endpoints[0].get_port_from_device(port_id)
|
||||
self.assertEqual(port_id, port_dict['id'])
|
||||
self.assertEqual([security_group_id],
|
||||
port_dict[ext_sg.SECURITYGROUPS])
|
||||
@ -84,7 +84,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
|
||||
|
||||
def test_security_group_get_port_from_device_with_no_port(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
|
||||
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
|
||||
self.assertIsNone(port_dict)
|
||||
|
||||
|
||||
|
@ -41,7 +41,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
|
||||
def setUp(self):
|
||||
super(TestFirewallCallbacks,
|
||||
self).setUp(fw_plugin=FW_PLUGIN_KLASS)
|
||||
self.callbacks = self.plugin.callbacks
|
||||
self.callbacks = self.plugin.endpoints[0]
|
||||
|
||||
def test_set_firewall_status(self):
|
||||
ctx = context.get_admin_context()
|
||||
@ -210,7 +210,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
|
||||
|
||||
def setUp(self):
|
||||
super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
|
||||
self.callbacks = self.plugin.callbacks
|
||||
self.callbacks = self.plugin.endpoints[0]
|
||||
|
||||
def test_create_second_firewall_not_permitted(self):
|
||||
with self.firewall():
|
||||
@ -342,7 +342,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
|
||||
for k, v in attrs.iteritems():
|
||||
self.assertEqual(fw_db[k], v)
|
||||
# cleanup the pending firewall
|
||||
self.plugin.callbacks.firewall_deleted(ctx, fw_id)
|
||||
self.plugin.endpoints[0].firewall_deleted(ctx, fw_id)
|
||||
|
||||
def test_delete_firewall_after_agent_delete(self):
|
||||
ctx = context.get_admin_context()
|
||||
|
@ -83,31 +83,31 @@ class AgentPluginReportState(base.BaseTestCase):
|
||||
|
||||
class AgentRPCMethods(base.BaseTestCase):
|
||||
def test_create_consumers(self):
|
||||
dispatcher = mock.Mock()
|
||||
endpoints = [mock.Mock()]
|
||||
expected = [
|
||||
mock.call(new=True),
|
||||
mock.call().create_consumer('foo-topic-op', dispatcher,
|
||||
mock.call().create_consumer('foo-topic-op', endpoints,
|
||||
fanout=True),
|
||||
mock.call().consume_in_threads()
|
||||
]
|
||||
|
||||
call_to_patch = 'neutron.common.rpc_compat.create_connection'
|
||||
with mock.patch(call_to_patch) as create_connection:
|
||||
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
|
||||
rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
|
||||
create_connection.assert_has_calls(expected)
|
||||
|
||||
def test_create_consumers_with_node_name(self):
|
||||
dispatcher = mock.Mock()
|
||||
endpoints = [mock.Mock()]
|
||||
expected = [
|
||||
mock.call(new=True),
|
||||
mock.call().create_consumer('foo-topic-op', dispatcher,
|
||||
mock.call().create_consumer('foo-topic-op', endpoints,
|
||||
fanout=True),
|
||||
mock.call().create_consumer('foo-topic-op.node1', dispatcher,
|
||||
mock.call().create_consumer('foo-topic-op.node1', endpoints,
|
||||
fanout=False),
|
||||
mock.call().consume_in_threads()
|
||||
]
|
||||
|
||||
call_to_patch = 'neutron.common.rpc_compat.create_connection'
|
||||
with mock.patch(call_to_patch) as create_connection:
|
||||
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
|
||||
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
|
||||
create_connection.assert_has_calls(expected)
|
||||
|
Loading…
x
Reference in New Issue
Block a user