diff --git a/neutron/api/v2/base.py b/neutron/api/v2/base.py index 2ed735e277..89ef47ed6d 100644 --- a/neutron/api/v2/base.py +++ b/neutron/api/v2/base.py @@ -27,8 +27,8 @@ from neutron.api.v2 import attributes from neutron.api.v2 import resource as wsgi_resource from neutron.common import constants as const from neutron.common import exceptions +from neutron.common import rpc as n_rpc from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import api as notifier_api from neutron import policy from neutron import quota @@ -69,7 +69,7 @@ class Controller(object): self._native_sorting = self._is_native_sorting_supported() self._policy_attrs = [name for (name, info) in self._attr_info.items() if info.get('required_by_policy')] - self._publisher_id = notifier_api.publisher_id('network') + self._notifier = n_rpc.get_notifier('network') # use plugin's dhcp notifier, if this is already instantiated agent_notifiers = getattr(plugin, 'agent_notifiers', {}) self._dhcp_agent_notifier = ( @@ -372,10 +372,8 @@ class Controller(object): def create(self, request, body=None, **kwargs): """Creates a new instance of the requested entity.""" parent_id = kwargs.get(self._parent_id_name) - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.create.start', - notifier_api.CONF.default_notification_level, body) body = Controller.prepare_request_body(request.context, body, True, self._resource, self._attr_info, @@ -419,10 +417,8 @@ class Controller(object): def notify(create_result): notifier_method = self._resource + '.create.end' - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, notifier_method, - notifier_api.CONF.default_notification_level, create_result) self._send_dhcp_notification(request.context, create_result, @@ -458,10 +454,8 @@ class Controller(object): def delete(self, request, id, **kwargs): """Deletes the specified entity.""" - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.delete.start', - notifier_api.CONF.default_notification_level, {self._resource + '_id': id}) action = self._plugin_handlers[self.DELETE] @@ -482,10 +476,8 @@ class Controller(object): obj_deleter = getattr(self._plugin, action) obj_deleter(request.context, id, **kwargs) notifier_method = self._resource + '.delete.end' - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, notifier_method, - notifier_api.CONF.default_notification_level, {self._resource + '_id': id}) result = {self._resource: self._view(request.context, obj)} self._send_nova_notification(action, {}, result) @@ -502,10 +494,8 @@ class Controller(object): msg = _("Invalid format: %s") % request.body raise exceptions.BadRequest(resource='body', msg=msg) payload['id'] = id - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.update.start', - notifier_api.CONF.default_notification_level, payload) body = Controller.prepare_request_body(request.context, body, False, self._resource, self._attr_info, @@ -541,11 +531,7 @@ class Controller(object): obj = obj_updater(request.context, id, **kwargs) result = {self._resource: self._view(request.context, obj)} notifier_method = self._resource + '.update.end' - notifier_api.notify(request.context, - self._publisher_id, - notifier_method, - notifier_api.CONF.default_notification_level, - result) + self._notifier.info(request.context, notifier_method, result) self._send_dhcp_notification(request.context, result, notifier_method) diff --git a/neutron/cmd/usage_audit.py b/neutron/cmd/usage_audit.py index f48e0c691c..6294d710d9 100644 --- a/neutron/cmd/usage_audit.py +++ b/neutron/cmd/usage_audit.py @@ -26,9 +26,9 @@ import sys from oslo.config import cfg from neutron.common import config +from neutron.common import rpc as n_rpc from neutron import context from neutron import manager -from neutron.openstack.common.notifier import api as notifier_api def main(): @@ -37,33 +37,14 @@ def main(): cxt = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() + notifier = n_rpc.get_notifier('network') for network in plugin.get_networks(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'network.exists', - notifier_api.INFO, - {'network': network}) + notifier.info(cxt, 'network.exists', {'network': network}) for subnet in plugin.get_subnets(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'subnet.exists', - notifier_api.INFO, - {'subnet': subnet}) + notifier.info(cxt, 'subnet.exists', {'subnet': subnet}) for port in plugin.get_ports(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'port.exists', - notifier_api.INFO, - {'port': port}) + notifier.info(cxt, 'port.exists', {'port': port}) for router in plugin.get_routers(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'router.exists', - notifier_api.INFO, - {'router': router}) + notifier.info(cxt, 'router.exists', {'router': router}) for floatingip in plugin.get_floatingips(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'floatingip.exists', - notifier_api.INFO, - {'floatingip': floatingip}) + notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip}) diff --git a/neutron/common/config.py b/neutron/common/config.py index a7b7a95594..0a8232fa02 100644 --- a/neutron/common/config.py +++ b/neutron/common/config.py @@ -20,13 +20,13 @@ Routines for configuring Neutron import os from oslo.config import cfg +from oslo import messaging from paste import deploy from neutron.api.v2 import attributes from neutron.common import utils from neutron.openstack.common.db import options as db_options from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc from neutron import version @@ -125,7 +125,7 @@ cfg.CONF.register_opts(core_opts) cfg.CONF.register_cli_opts(core_cli_opts) # Ensure that the control exchange is set correctly -rpc.set_defaults(control_exchange='neutron') +messaging.set_transport_defaults(control_exchange='neutron') _SQL_CONNECTION_DEFAULT = 'sqlite://' # Update the default QueuePool parameters. These can be tweaked by the # configuration variables - max_pool_size, max_overflow and pool_timeout @@ -139,6 +139,11 @@ def init(args, **kwargs): version='%%prog %s' % version.version_info.release_string(), **kwargs) + # FIXME(ihrachys): if import is put in global, circular import + # failure occurs + from neutron.common import rpc as n_rpc + n_rpc.init(cfg.CONF) + # Validate that the base_mac is of the correct format msg = attributes._validate_regex(cfg.CONF.base_mac, attributes.MAC_PATTERN) diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index 643cf59344..98d4681404 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -15,31 +15,122 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg +from oslo import messaging +from oslo.messaging import serializer as om_serializer + +from neutron.common import exceptions from neutron import context from neutron.openstack.common import log as logging -from neutron.openstack.common.rpc import dispatcher LOG = logging.getLogger(__name__) -class PluginRpcDispatcher(dispatcher.RpcDispatcher): - """This class is used to convert RPC common context into +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + exceptions.__name__, +] +EXTRA_EXMODS = [] + + +TRANSPORT_ALIASES = { + 'neutron.openstack.common.rpc.impl_fake': 'fake', + 'neutron.openstack.common.rpc.impl_qpid': 'qpid', + 'neutron.openstack.common.rpc.impl_kombu': 'rabbit', + 'neutron.openstack.common.rpc.impl_zmq': 'zmq', + 'neutron.rpc.impl_fake': 'fake', + 'neutron.rpc.impl_qpid': 'qpid', + 'neutron.rpc.impl_kombu': 'rabbit', + 'neutron.rpc.impl_zmq': 'zmq', +} + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = messaging.get_transport(conf, + allowed_remote_exmods=exmods, + aliases=TRANSPORT_ALIASES) + NOTIFIER = messaging.Notifier(TRANSPORT) + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +def get_client(target, version_cap=None, serializer=None): + assert TRANSPORT is not None + serializer = PluginRpcSerializer(serializer) + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert TRANSPORT is not None + serializer = PluginRpcSerializer(serializer) + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer) + + +def get_notifier(service=None, host=None, publisher_id=None): + assert NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or cfg.CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) + + +class PluginRpcSerializer(om_serializer.Serializer): + """This serializer is used to convert RPC common context into Neutron Context. """ + def __init__(self, base): + super(PluginRpcSerializer, self).__init__() + self._base = base - def __init__(self, callbacks): - super(PluginRpcDispatcher, self).__init__(callbacks) + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) - def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs): - rpc_ctxt_dict = rpc_ctxt.to_dict() + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + def serialize_context(self, ctxt): + return ctxt.to_dict() + + def deserialize_context(self, ctxt): + rpc_ctxt_dict = ctxt.copy() user_id = rpc_ctxt_dict.pop('user_id', None) if not user_id: user_id = rpc_ctxt_dict.pop('user', None) tenant_id = rpc_ctxt_dict.pop('tenant_id', None) if not tenant_id: tenant_id = rpc_ctxt_dict.pop('project_id', None) - neutron_ctxt = context.Context(user_id, tenant_id, - load_admin_roles=False, **rpc_ctxt_dict) - return super(PluginRpcDispatcher, self).dispatch( - neutron_ctxt, version, method, namespace, **kwargs) + return context.Context(user_id, tenant_id, + load_admin_roles=False, **rpc_ctxt_dict) diff --git a/neutron/common/rpc_compat.py b/neutron/common/rpc_compat.py index f494d53380..939551d493 100644 --- a/neutron/common/rpc_compat.py +++ b/neutron/common/rpc_compat.py @@ -13,24 +13,63 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg +from oslo import messaging + +from neutron.common import rpc as n_rpc from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import common as rpc_common -from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher -from neutron.openstack.common.rpc import proxy from neutron.openstack.common import service LOG = logging.getLogger(__name__) -class RpcProxy(proxy.RpcProxy): +class RpcProxy(object): ''' This class is created to facilitate migration from oslo-incubator RPC layer implementation to oslo.messaging and is intended to emulate RpcProxy class behaviour using oslo.messaging API once the migration is applied. ''' + RPC_API_NAMESPACE = None + + def __init__(self, topic, default_version, version_cap=None): + self.topic = topic + target = messaging.Target(topic=topic, version=default_version) + self._client = n_rpc.get_client(target, version_cap=version_cap) + + def make_msg(self, method, **kwargs): + return {'method': method, + 'namespace': self.RPC_API_NAMESPACE, + 'args': kwargs} + + def call(self, context, msg, **kwargs): + return self.__call_rpc_method( + context, msg, rpc_method='call', **kwargs) + + def cast(self, context, msg, **kwargs): + self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs) + + def fanout_cast(self, context, msg, **kwargs): + kwargs['fanout'] = True + self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs) + + def __call_rpc_method(self, context, msg, **kwargs): + options = dict( + ((opt, kwargs[opt]) + for opt in ('fanout', 'timeout', 'topic', 'version') + if kwargs.get(opt)) + ) + if msg['namespace']: + options['namespace'] = msg['namespace'] + + if options: + callee = self._client.prepare(**options) + else: + callee = self._client + + func = getattr(callee, kwargs['rpc_method']) + return func(context, msg['method'], **msg['args']) class RpcCallback(object): @@ -40,6 +79,11 @@ class RpcCallback(object): callback version using oslo.messaging API once the migration is applied. ''' + RPC_API_VERSION = '1.0' + + def __init__(self): + super(RpcCallback, self).__init__() + self.target = messaging.Target(version=self.RPC_API_VERSION) class Service(service.Service): @@ -64,8 +108,7 @@ class Service(service.Service): LOG.debug("Creating Consumer connection for Service %s" % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], - self.serializer) + dispatcher = [self.manager] # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) @@ -93,11 +136,30 @@ class Service(service.Service): super(Service, self).stop() +class Connection(object): + + def __init__(self): + super(Connection, self).__init__() + self.servers = [] + + def create_consumer(self, topic, proxy, fanout=False): + target = messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=fanout) + server = n_rpc.get_server(target, proxy) + self.servers.append(server) + + def consume_in_thread(self): + for server in self.servers: + server.start() + return self.servers + + # functions -create_connection = rpc.create_connection +def create_connection(new=True): + return Connection() # exceptions -RPCException = rpc_common.RPCException -RemoteError = rpc_common.RemoteError -MessagingTimeout = rpc_common.Timeout +RPCException = messaging.MessagingException +RemoteError = messaging.RemoteError +MessagingTimeout = messaging.MessagingTimeout diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 547026277a..5d2aa6e1aa 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as l3_constants from neutron.common import exceptions as n_exc +from neutron.common import rpc as n_rpc from neutron.common import utils from neutron.db import model_base from neutron.db import models_v2 @@ -28,7 +29,6 @@ from neutron.extensions import external_net from neutron.extensions import l3 from neutron import manager from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import api as notifier_api from neutron.openstack.common import uuidutils from neutron.plugins.common import constants @@ -481,11 +481,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'tenant_id': port['tenant_id'], 'port_id': port['id'], 'subnet_id': port['fixed_ips'][0]['subnet_id']} - notifier_api.notify(context, - notifier_api.publisher_id('network'), - 'router.interface.create', - notifier_api.CONF.default_notification_level, - {'router_interface': info}) + notifier = n_rpc.get_notifier('network') + notifier.info( + context, 'router.interface.create', {'router_interface': info}) return info def _confirm_router_interface_not_in_use(self, context, router_id, @@ -560,11 +558,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'tenant_id': port['tenant_id'], 'port_id': port['id'], 'subnet_id': subnet['id']} - notifier_api.notify(context, - notifier_api.publisher_id('network'), - 'router.interface.delete', - notifier_api.CONF.default_notification_level, - {'router_interface': info}) + notifier = n_rpc.get_notifier('network') + notifier.info( + context, 'router.interface.delete', {'router_interface': info}) return info def _get_floatingip(self, context, id): diff --git a/neutron/db/metering/metering_rpc.py b/neutron/db/metering/metering_rpc.py index 82e7d3dd1f..b55a0cf4c7 100644 --- a/neutron/db/metering/metering_rpc.py +++ b/neutron/db/metering/metering_rpc.py @@ -15,7 +15,6 @@ # under the License. from neutron.common import constants as consts -from neutron.common import rpc as p_rpc from neutron.common import utils from neutron import manager from neutron.openstack.common import log as logging @@ -32,7 +31,7 @@ class MeteringRpcCallbacks(object): self.meter_plugin = meter_plugin def create_rpc_dispatcher(self): - return p_rpc.PluginRpcDispatcher([self]) + return [self] def get_sync_data_metering(self, context, **kwargs): l3_plugin = manager.NeutronManager.get_service_plugins().get( diff --git a/neutron/openstack/common/service.py b/neutron/openstack/common/service.py index 79ae9bc5d0..4575de4b47 100644 --- a/neutron/openstack/common/service.py +++ b/neutron/openstack/common/service.py @@ -45,7 +45,9 @@ from neutron.openstack.common import systemd from neutron.openstack.common import threadgroup -rpc = importutils.try_import('neutron.openstack.common.rpc') +#rpc = importutils.try_import('neutron.openstack.common.rpc') +# TODO(ihrachys): restore once oslo-rpc code is removed from the tree +rpc = None CONF = cfg.CONF LOG = logging.getLogger(__name__) diff --git a/neutron/plugins/bigswitch/agent/restproxy_agent.py b/neutron/plugins/bigswitch/agent/restproxy_agent.py index a9c1e6653a..6cdf5913b0 100644 --- a/neutron/plugins/bigswitch/agent/restproxy_agent.py +++ b/neutron/plugins/bigswitch/agent/restproxy_agent.py @@ -36,7 +36,6 @@ from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import excutils from neutron.openstack.common import log -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.bigswitch import config as pl_config LOG = log.getLogger(__name__) @@ -106,7 +105,7 @@ class RestProxyAgent(rpc_compat.RpcCallback, self.topic = topics.AGENT self.plugin_rpc = PluginApi(topics.PLUGIN) self.context = q_context.get_admin_context_without_session() - self.dispatcher = dispatcher.RpcDispatcher([self]) + self.dispatcher = [self] consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] self.connection = agent_rpc.create_consumers(self.dispatcher, diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index 9249f5d6b0..712f02b3c3 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -57,7 +57,6 @@ from neutron.api import extensions as neutron_extensions from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.common import constants as const from neutron.common import exceptions -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -121,8 +120,7 @@ class RestProxyCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def get_port_from_device(self, device): port_id = re.sub(r"^tap", "", device) diff --git a/neutron/plugins/brocade/NeutronPlugin.py b/neutron/plugins/brocade/NeutronPlugin.py index fc1d1ad5d7..5ec3fb4016 100644 --- a/neutron/plugins/brocade/NeutronPlugin.py +++ b/neutron/plugins/brocade/NeutronPlugin.py @@ -31,7 +31,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -98,8 +97,7 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py index d3749f19da..e5c701e7d2 100644 --- a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py +++ b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py @@ -28,7 +28,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -75,8 +74,7 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, diff --git a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py index 3447356257..f76f751f84 100644 --- a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py +++ b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py @@ -38,7 +38,6 @@ from neutron.common import topics from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.hyperv.agent import utils from neutron.plugins.hyperv.agent import utilsfactory @@ -106,8 +105,7 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback, consumers) def _create_rpc_dispatcher(self): - rpc_callback = HyperVSecurityCallbackMixin(self) - return dispatcher.RpcDispatcher([rpc_callback]) + return [HyperVSecurityCallbackMixin(self)] class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback, @@ -236,7 +234,7 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback): segmentation_id, port['admin_state_up']) def _create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def _get_vswitch_name(self, network_type, physical_network): if network_type != p_const.TYPE_LOCAL: diff --git a/neutron/plugins/hyperv/rpc_callbacks.py b/neutron/plugins/hyperv/rpc_callbacks.py index dafc160e75..e967286d58 100644 --- a/neutron/plugins/hyperv/rpc_callbacks.py +++ b/neutron/plugins/hyperv/rpc_callbacks.py @@ -17,7 +17,6 @@ # @author: Alessandro Pilotti, Cloudbase Solutions Srl from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import dhcp_rpc_base @@ -48,8 +47,7 @@ class HyperVRpcCallbacks( If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def get_device_details(self, rpc_context, **kwargs): """Agent requests device details.""" diff --git a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py index 1a5190d90e..b1fa1e8b65 100644 --- a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py +++ b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py @@ -37,7 +37,6 @@ from neutron.common import utils as n_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.ibm.common import config # noqa from neutron.plugins.ibm.common import constants @@ -156,7 +155,7 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback): "out-of-band") def create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def setup_integration_br(self, bridge_name, reset_br, out_of_band, controller_ip=None): diff --git a/neutron/plugins/ibm/sdnve_neutron_plugin.py b/neutron/plugins/ibm/sdnve_neutron_plugin.py index d3be17e517..8a6615f2e4 100644 --- a/neutron/plugins/ibm/sdnve_neutron_plugin.py +++ b/neutron/plugins/ibm/sdnve_neutron_plugin.py @@ -23,7 +23,6 @@ from oslo.config import cfg from neutron.common import constants as n_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -54,8 +53,7 @@ class SdnveRpcCallbacks(): If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def sdnve_info(self, rpc_context, **kwargs): '''Update new information.''' diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index d586b2eb9e..5af3f674a3 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -45,7 +45,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.linuxbridge.common import config # noqa from neutron.plugins.linuxbridge.common import constants as lconst @@ -816,7 +815,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return dispatcher.RpcDispatcher([self]) + return [self] class LinuxBridgePluginApi(agent_rpc.PluginApi, diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 9af9a616d5..61089f63cf 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -72,8 +71,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/midonet/plugin.py b/neutron/plugins/midonet/plugin.py index 3902278e46..4495dda01d 100644 --- a/neutron/plugins/midonet/plugin.py +++ b/neutron/plugins/midonet/plugin.py @@ -29,7 +29,6 @@ from sqlalchemy.orm import exc as sa_exc from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -189,8 +188,7 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] class MidonetPluginException(n_exc.NeutronException): diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index ff4e6e7bce..e5068afb4c 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -13,9 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging + from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -46,13 +47,15 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, # 1.0 Initial version (from openvswitch/linuxbridge) # 1.1 Support Security Group RPC + # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to + # inheritance problems + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, notifier, type_manager): # REVISIT(kmestery): This depends on the first three super classes # not having their own __init__ functions. If an __init__() is added # to one, this could break. Fix this and add a unit test to cover this # test in H3. - # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due - # to inheritance problems super(RpcCallbacks, self).__init__(notifier, type_manager) def create_rpc_dispatcher(self): @@ -61,8 +64,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def _device_to_port_id(cls, device): diff --git a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py index 90a97ce443..94fd2b89a4 100644 --- a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py +++ b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py @@ -35,7 +35,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.mlnx.agent import utils from neutron.plugins.mlnx.common import config # noqa @@ -218,7 +217,7 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback, or support more than one class as the target of rpc messages, override this method. """ - return dispatcher.RpcDispatcher([self]) + return [self] class MlnxEswitchPluginApi(agent_rpc.PluginApi, diff --git a/neutron/plugins/mlnx/rpc_callbacks.py b/neutron/plugins/mlnx/rpc_callbacks.py index fff970c43a..0eda514368 100644 --- a/neutron/plugins/mlnx/rpc_callbacks.py +++ b/neutron/plugins/mlnx/rpc_callbacks.py @@ -17,7 +17,6 @@ from oslo.config import cfg from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import api as db_api @@ -48,8 +47,7 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback, or support more than one class as the target of RPC messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/nec/agent/nec_neutron_agent.py b/neutron/plugins/nec/agent/nec_neutron_agent.py index 38b13b5b7c..c1f580ac2e 100755 --- a/neutron/plugins/nec/agent/nec_neutron_agent.py +++ b/neutron/plugins/nec/agent/nec_neutron_agent.py @@ -38,7 +38,6 @@ from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.nec.common import config @@ -157,8 +156,7 @@ class NECNeutronAgent(object): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec, - self.callback_sg]) + self.dispatcher = [self.callback_nec, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] diff --git a/neutron/plugins/nec/common/config.py b/neutron/plugins/nec/common/config.py index ed35dcb17f..70f4a1a63d 100644 --- a/neutron/plugins/nec/common/config.py +++ b/neutron/plugins/nec/common/config.py @@ -18,7 +18,6 @@ from oslo.config import cfg from neutron.agent.common import config -from neutron.openstack.common import rpc # noqa from neutron.plugins.nec.common import constants as nconst diff --git a/neutron/plugins/nec/nec_plugin.py b/neutron/plugins/nec/nec_plugin.py index e36f9d63ee..2bea5c04ef 100644 --- a/neutron/plugins/nec/nec_plugin.py +++ b/neutron/plugins/nec/nec_plugin.py @@ -22,7 +22,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.v2 import attributes as attrs from neutron.common import constants as const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -147,12 +146,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, # NOTE: callback_sg is referred to from the sg unit test. self.callback_sg = SecurityGroupServerRpcCallback() - callbacks = [NECPluginV2RPCCallbacks(self.safe_reference), - DhcpRpcCallback(), - L3RpcCallback(), - self.callback_sg, - agents_db.AgentExtRpcCallback()] - self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks) + self.dispatcher = [ + NECPluginV2RPCCallbacks(self.safe_reference), + DhcpRpcCallback(), + L3RpcCallback(), + self.callback_sg, + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) # Consume from all consumers in a thread @@ -722,7 +721,7 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback): If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self]) + return [self] def update_ports(self, rpc_context, **kwargs): """Update ports' information and activate/deavtivate them. diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 7ff3040b06..c79d77a915 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -39,7 +39,6 @@ from neutron.common import utils as n_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.ofagent.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -351,7 +350,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return dispatcher.RpcDispatcher([self]) + return [self] def _provision_local_vlan_outbound_for_tunnel(self, lvid, segmentation_id, ofports): diff --git a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py index d1d3daf6ef..0ef6348dfb 100644 --- a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py +++ b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py @@ -32,7 +32,6 @@ from neutron.common import topics from neutron import context as n_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.oneconvergence.lib import config LOG = logging.getLogger(__name__) @@ -120,8 +119,7 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc, - self.callback_sg]) + self.dispatcher = [self.callback_oc, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] diff --git a/neutron/plugins/oneconvergence/plugin.py b/neutron/plugins/oneconvergence/plugin.py index 7d7af13b0d..732ead70ae 100644 --- a/neutron/plugins/oneconvergence/plugin.py +++ b/neutron/plugins/oneconvergence/plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const from neutron.common import exceptions as nexception -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -61,8 +60,7 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback, def create_rpc_dispatcher(self): """Get the rpc dispatcher for this manager.""" - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @staticmethod def get_port_from_device(device): diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index e6a58567a4..31c6274848 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -41,7 +41,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -500,7 +499,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return dispatcher.RpcDispatcher([self]) + return [self] def provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id): diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index 01867c4164..5e3f387b0f 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -80,8 +79,7 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/ryu/agent/ryu_neutron_agent.py b/neutron/plugins/ryu/agent/ryu_neutron_agent.py index 746a0c2f54..6086113c7f 100755 --- a/neutron/plugins/ryu/agent/ryu_neutron_agent.py +++ b/neutron/plugins/ryu/agent/ryu_neutron_agent.py @@ -42,7 +42,6 @@ from neutron.common import topics from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.ryu.common import config # noqa @@ -209,7 +208,7 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback, consumers) def _create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def _setup_integration_br(self, root_helper, integ_br, tunnel_ip, ovsdb_port, ovsdb_ip): diff --git a/neutron/plugins/ryu/ryu_neutron_plugin.py b/neutron/plugins/ryu/ryu_neutron_plugin.py index 35065a41e5..787ccb21c7 100644 --- a/neutron/plugins/ryu/ryu_neutron_plugin.py +++ b/neutron/plugins/ryu/ryu_neutron_plugin.py @@ -23,7 +23,6 @@ from ryu.app import rest_nw_id from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import api as db @@ -59,7 +58,7 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback, self.ofp_rest_api_addr = ofp_rest_api_addr def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def get_ofp_rest_api(self, context, **kwargs): LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr) diff --git a/neutron/plugins/vmware/dhcp_meta/rpc.py b/neutron/plugins/vmware/dhcp_meta/rpc.py index 057e94d970..c32a39b372 100644 --- a/neutron/plugins/vmware/dhcp_meta/rpc.py +++ b/neutron/plugins/vmware/dhcp_meta/rpc.py @@ -24,7 +24,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as const from neutron.common import exceptions as ntn_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import db_base_plugin_v2 @@ -55,8 +54,7 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def handle_network_dhcp_access(plugin, context, network, action): diff --git a/neutron/policy.py b/neutron/policy.py index 4c64432b6f..747638287f 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -26,7 +26,6 @@ from oslo.config import cfg from neutron.api.v2 import attributes from neutron.common import exceptions import neutron.common.utils as utils -from neutron import manager from neutron.openstack.common import excutils from neutron.openstack.common import importutils from neutron.openstack.common import log as logging @@ -263,6 +262,9 @@ class OwnerCheck(policy.Check): # resource is handled by the core plugin. It might be worth # having a way to map resources to plugins so to make this # check more general + # FIXME(ihrachys): if import is put in global, circular + # import failure occurs + from neutron import manager f = getattr(manager.NeutronManager.get_instance().plugin, 'get_%s' % parent_res) # f *must* exist, if not found it is better to let neutron diff --git a/neutron/service.py b/neutron/service.py index 9b3073b5fb..f14021769e 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -13,13 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet import inspect import logging as std_logging import os import random from oslo.config import cfg +from oslo.messaging import server as rpc_server from neutron.common import config from neutron.common import rpc_compat @@ -112,23 +112,25 @@ class RpcWorker(object): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, plugin): self._plugin = plugin - self._server = None + self._servers = [] def start(self): # We may have just forked from parent process. A quick disposal of the # existing sql connections avoids producing errors later when they are # discovered to be broken. session.get_engine().pool.dispose() - self._server = self._plugin.start_rpc_listener() + self._servers = self._plugin.start_rpc_listener() def wait(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.wait() + for server in self._servers: + if isinstance(server, rpc_server.MessageHandlingServer): + server.wait() def stop(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.kill() - self._server = None + for server in self._servers: + if isinstance(server, rpc_server.MessageHandlingServer): + server.kill() + self._servers = [] def serve_rpc(): diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index fd2131e219..0238902f3b 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -20,7 +20,6 @@ from oslo.config import cfg from neutron.common import exceptions as n_exception -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron import context as neutron_context @@ -42,7 +41,7 @@ class FirewallCallbacks(rpc_compat.RpcCallback): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def set_firewall_status(self, context, firewall_id, status, **kwargs): """Agent uses this to set a firewall's status.""" diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index c5505817d5..29950c984d 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -21,7 +21,6 @@ from oslo.config import cfg from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import api as qdbapi @@ -46,7 +45,7 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self]) + return [self] class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin, diff --git a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py index 8436cb8354..85be0bacd0 100644 --- a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py +++ b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py @@ -22,7 +22,6 @@ from oslo.config import cfg from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -66,8 +65,7 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher( - [self, agents_db.AgentExtRpcCallback(self.plugin)]) + return [self, agents_db.AgentExtRpcCallback(self.plugin)] def get_ready_devices(self, context, host=None): with context.session.begin(subtransactions=True): diff --git a/neutron/services/metering/agents/metering_agent.py b/neutron/services/metering/agents/metering_agent.py index ba1fe6bac2..80883f41b3 100644 --- a/neutron/services/metering/agents/metering_agent.py +++ b/neutron/services/metering/agents/metering_agent.py @@ -26,6 +26,7 @@ from neutron.agent.common import config from neutron.agent import rpc as agent_rpc from neutron.common import config as common_config from neutron.common import constants as constants +from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -34,7 +35,6 @@ from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.notifier import api as notifier_api from neutron.openstack.common import periodic_task from neutron.openstack.common import service from neutron import service as neutron_service @@ -114,11 +114,8 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager): 'host': self.host} LOG.debug(_("Send metering report: %s"), data) - notifier_api.notify(self.context, - notifier_api.publisher_id('metering'), - 'l3.meter', - notifier_api.CONF.default_notification_level, - data) + notifier = n_rpc.get_notifier('metering') + notifier.info(self.context, 'l3.meter', data) info['pkts'] = 0 info['bytes'] = 0 info['time'] = 0 diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index ba19460d77..12904f23e3 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -20,10 +20,10 @@ import requests import netaddr from oslo.config import cfg +from oslo import messaging import six from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron import context as ctx from neutron.openstack.common import lockutils @@ -184,12 +184,13 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): # history # 1.0 Initial version - RPC_API_VERSION = '1.0' + # TODO(ihrachys): we can't use RpcCallback here due to inheritance + # issues + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, agent, host): - # TODO(ihrachys): we can't use RpcCallback here due to - # inheritance issues self.host = host self.conn = rpc_compat.create_connection(new=True) context = ctx.get_admin_context_without_session() @@ -225,7 +226,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): for k, v in csrs_found.items()]) def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def vpnservice_updated(self, context, **kwargs): """Handle VPNaaS service driver change notifications.""" diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index 0d9ded9604..2480eb2727 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -23,11 +23,11 @@ import shutil import jinja2 import netaddr from oslo.config import cfg +from oslo import messaging import six from neutron.agent.linux import ip_lib from neutron.agent.linux import utils -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron import context from neutron.openstack.common import lockutils @@ -487,9 +487,11 @@ class IPsecDriver(device_drivers.DeviceDriver): RPC_API_VERSION = '1.0' + # TODO(ihrachys): we can't use RpcCallback here due to inheritance + # issues + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, agent, host): - # TODO(ihrachys): we can't use RpcCallback here due to - # inheritance issues self.agent = agent self.conf = self.agent.conf self.root_helper = self.agent.root_helper @@ -514,7 +516,7 @@ class IPsecDriver(device_drivers.DeviceDriver): interval=self.conf.ipsec.ipsec_status_check_interval) def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def _update_nat(self, vpnservice, func): """Setting up nat rule in iptables. diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index 8565723197..c2b39da9e3 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -16,7 +16,6 @@ import netaddr from netaddr import core as net_exc from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.openstack.common import excutils from neutron.openstack.common import log as logging @@ -55,7 +54,7 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback): self.driver = driver def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def get_vpn_services_on_host(self, context, host=None): """Retuns info on the vpnservices on the host.""" diff --git a/neutron/services/vpn/service_drivers/ipsec.py b/neutron/services/vpn/service_drivers/ipsec.py index cf4b055d89..13b7c171b4 100644 --- a/neutron/services/vpn/service_drivers/ipsec.py +++ b/neutron/services/vpn/service_drivers/ipsec.py @@ -16,7 +16,6 @@ # under the License. import netaddr -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.openstack.common import log as logging from neutron.services.vpn.common import topics @@ -42,7 +41,7 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback): self.driver = driver def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def get_vpn_services_on_host(self, context, host=None): """Returns the vpnservices on the host.""" diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 87412f9244..95034f6538 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -29,15 +29,14 @@ import eventlet.timeout import fixtures import mock from oslo.config import cfg +from oslo.messaging import conffixture as messaging_conffixture import testtools from neutron.common import config +from neutron.common import rpc as n_rpc from neutron.db import agentschedulers_db from neutron import manager -from neutron.openstack.common.notifier import api as notifier_api -from neutron.openstack.common.notifier import test_notifier -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import impl_fake +from neutron.tests import fake_notifier from neutron.tests import post_mortem_debug @@ -58,6 +57,10 @@ def fake_use_fatal_exceptions(*args): return True +def fake_consume_in_threads(self): + return [] + + class BaseTestCase(testtools.TestCase): def cleanup_core_plugin(self): @@ -90,16 +93,10 @@ class BaseTestCase(testtools.TestCase): if core_plugin is not None: cfg.CONF.set_override('core_plugin', core_plugin) - def _cleanup_test_notifier(self): - test_notifier.NOTIFICATIONS = [] - def setup_notification_driver(self, notification_driver=None): - # to reload the drivers - self.addCleanup(notifier_api._reset_drivers) - self.addCleanup(self._cleanup_test_notifier) - notifier_api._reset_drivers() + self.addCleanup(fake_notifier.reset) if notification_driver is None: - notification_driver = [test_notifier.__name__] + notification_driver = [fake_notifier.__name__] cfg.CONF.set_override("notification_driver", notification_driver) @staticmethod @@ -113,10 +110,6 @@ class BaseTestCase(testtools.TestCase): else: conf(args) - def _cleanup_rpc_backend(self): - rpc._RPCIMPL = None - impl_fake.CONSUMERS.clear() - def setUp(self): super(BaseTestCase, self).setUp() @@ -124,8 +117,6 @@ class BaseTestCase(testtools.TestCase): # test-specific cleanup has a chance to release references. self.addCleanup(self.cleanup_core_plugin) - self.addCleanup(self._cleanup_rpc_backend) - # Configure this first to ensure pm debugging support for setUp() if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING: self.addOnException(post_mortem_debug.exception_handler) @@ -179,6 +170,25 @@ class BaseTestCase(testtools.TestCase): 'neutron.common.exceptions.NeutronException.use_fatal_exceptions', fake_use_fatal_exceptions)) + # don't actually start RPC listeners when testing + self.useFixture(fixtures.MonkeyPatch( + 'neutron.common.rpc_compat.Connection.consume_in_thread', + fake_consume_in_threads)) + + self.useFixture(fixtures.MonkeyPatch( + 'oslo.messaging.Notifier', fake_notifier.FakeNotifier)) + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + self.messaging_conf.response_timeout = 15 + self.useFixture(self.messaging_conf) + + self.addCleanup(n_rpc.clear_extra_exmods) + n_rpc.add_extra_exmods('neutron.test') + + self.addCleanup(n_rpc.cleanup) + n_rpc.init(CONF) + if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml': raise self.skipException('XML Testing Skipped in Py26') diff --git a/neutron/tests/fake_notifier.py b/neutron/tests/fake_notifier.py new file mode 100644 index 0000000000..012f3351eb --- /dev/null +++ b/neutron/tests/fake_notifier.py @@ -0,0 +1,50 @@ +# Copyright 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools + + +NOTIFICATIONS = [] + + +def reset(): + del NOTIFICATIONS[:] + + +FakeMessage = collections.namedtuple('Message', + ['publisher_id', 'priority', + 'event_type', 'payload']) + + +class FakeNotifier(object): + + def __init__(self, transport, publisher_id=None): + self.transport = transport + self.publisher_id = publisher_id + for priority in ('debug', 'info', 'warn', 'error', 'critical'): + setattr(self, priority, + functools.partial(self._notify, priority=priority.upper())) + + def prepare(self, publisher_id=None): + if publisher_id is None: + publisher_id = self.publisher_id + return self.__class__(self.transport, publisher_id) + + def _notify(self, ctxt, event_type, payload, priority): + msg = dict(publisher_id=self.publisher_id, + priority=priority, + event_type=event_type, + payload=payload) + NOTIFICATIONS.append(msg) diff --git a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py index 4af19fc546..965842738b 100644 --- a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -23,9 +23,9 @@ Unit Tests for hyperv neutron rpc import mock from neutron.agent import rpc as agent_rpc +from neutron.common import rpc_compat from neutron.common import topics from neutron.openstack.common import context -from neutron.openstack.common import rpc from neutron.plugins.hyperv import agent_notifier_api as ana from neutron.plugins.hyperv.common import constants from neutron.tests import base @@ -38,19 +38,19 @@ class rpcHyperVApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False - with mock.patch.object(rpc, rpc_method) as rpc_method_mock: + proxy = rpc_compat.RpcProxy + with mock.patch.object(proxy, rpc_method) as rpc_method_mock: rpc_method_mock.return_value = expected_retval retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) + expected = [ + mock.call(ctxt, expected_msg, topic=topic) + ] + rpc_method_mock.assert_has_calls(expected) def test_delete_network(self): rpcapi = ana.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/linuxbridge/test_rpcapi.py b/neutron/tests/unit/linuxbridge/test_rpcapi.py index 762a65be1d..616a06acd9 100644 --- a/neutron/tests/unit/linuxbridge/test_rpcapi.py +++ b/neutron/tests/unit/linuxbridge/test_rpcapi.py @@ -35,7 +35,6 @@ class rpcApiTestCase(base.BaseTestCase): expected_retval = 'foo' if method == 'call' else None if not expected_msg: expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -49,15 +48,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(expected_arg, arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = plb.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a2d3bf0eb5..af48a74f17 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -20,9 +20,9 @@ Unit Tests for ml2 rpc import mock from neutron.agent import rpc as agent_rpc +from neutron.common import rpc_compat from neutron.common import topics from neutron.openstack.common import context -from neutron.openstack.common import rpc from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2 import rpc as plugin_rpc from neutron.tests import base @@ -34,20 +34,19 @@ class RpcApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False + rpc = rpc_compat.RpcProxy with mock.patch.object(rpc, rpc_method) as rpc_method_mock: rpc_method_mock.return_value = expected_retval retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) + expected = [ + mock.call(ctxt, expected_msg, topic=topic) + ] + rpc_method_mock.assert_has_calls(expected) def test_delete_network(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/mlnx/test_rpcapi.py b/neutron/tests/unit/mlnx/test_rpcapi.py index 80dcf78277..ea34a840b1 100644 --- a/neutron/tests/unit/mlnx/test_rpcapi.py +++ b/neutron/tests/unit/mlnx/test_rpcapi.py @@ -37,7 +37,6 @@ class rpcApiTestCase(base.BaseTestCase): expected_retval = 'foo' if method == 'call' else None if not expected_msg: expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -51,15 +50,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(expected_arg, arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py index 1b6a7370a7..e8f75b9f4c 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py +++ b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py @@ -34,7 +34,6 @@ class rpcApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -48,15 +47,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(arg, expected_arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = povs.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/services/metering/test_metering_agent.py b/neutron/tests/unit/services/metering/test_metering_agent.py index 3e1d0db299..b3e3511fea 100644 --- a/neutron/tests/unit/services/metering/test_metering_agent.py +++ b/neutron/tests/unit/services/metering/test_metering_agent.py @@ -18,10 +18,10 @@ import mock from oslo.config import cfg from neutron.agent.common import config -from neutron.openstack.common.notifier import test_notifier from neutron.openstack.common import uuidutils from neutron.services.metering.agents import metering_agent from neutron.tests import base +from neutron.tests import fake_notifier _uuid = uuidutils.generate_uuid @@ -96,8 +96,8 @@ class TestMeteringOperations(base.BaseTestCase): 'bytes': 444}} self.agent._metering_loop() - self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0) - for n in test_notifier.NOTIFICATIONS: + self.assertNotEqual(len(fake_notifier.NOTIFICATIONS), 0) + for n in fake_notifier.NOTIFICATIONS: if n['event_type'] == 'l3.meter': break diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index bc4ae4a178..569a739566 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -27,7 +27,7 @@ class AgentRPCPluginApi(base.BaseTestCase): agent = rpc.PluginApi('fake_topic') ctxt = context.RequestContext('fake_user', 'fake_project') expect_val = 'foo' - with mock.patch('neutron.openstack.common.rpc.call') as rpc_call: + with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call: rpc_call.return_value = expect_val func_obj = getattr(agent, method) if method == 'tunnel_sync': diff --git a/neutron/tests/unit/test_api_v2.py b/neutron/tests/unit/test_api_v2.py index c09dd21ee9..38d54f7cac 100644 --- a/neutron/tests/unit/test_api_v2.py +++ b/neutron/tests/unit/test_api_v2.py @@ -33,12 +33,12 @@ from neutron.api.v2 import router from neutron.common import exceptions as n_exc from neutron import context from neutron import manager -from neutron.openstack.common.notifier import api as notifer_api from neutron.openstack.common import policy as common_policy from neutron.openstack.common import uuidutils from neutron import policy from neutron import quota from neutron.tests import base +from neutron.tests import fake_notifier from neutron.tests.unit import testlib_api @@ -1242,41 +1242,42 @@ class V2Views(base.BaseTestCase): class NotificationTest(APIv2TestBase): - def _resource_op_notifier(self, opname, resource, expected_errors=False, - notification_level='INFO'): + + def setUp(self): + super(NotificationTest, self).setUp() + fake_notifier.reset() + + def _resource_op_notifier(self, opname, resource, expected_errors=False): initial_input = {resource: {'name': 'myname'}} instance = self.plugin.return_value instance.get_networks.return_value = initial_input instance.get_networks_count.return_value = 0 expected_code = exc.HTTPCreated.code - with mock.patch.object(notifer_api, 'notify') as mynotifier: - if opname == 'create': - initial_input[resource]['tenant_id'] = _uuid() - res = self.api.post_json( - _get_path('networks'), - initial_input, expect_errors=expected_errors) - if opname == 'update': - res = self.api.put_json( - _get_path('networks', id=_uuid()), - initial_input, expect_errors=expected_errors) - expected_code = exc.HTTPOk.code - if opname == 'delete': - initial_input[resource]['tenant_id'] = _uuid() - res = self.api.delete( - _get_path('networks', id=_uuid()), - expect_errors=expected_errors) - expected_code = exc.HTTPNoContent.code - expected = [mock.call(mock.ANY, - 'network.' + cfg.CONF.host, - resource + "." + opname + ".start", - notification_level, - mock.ANY), - mock.call(mock.ANY, - 'network.' + cfg.CONF.host, - resource + "." + opname + ".end", - notification_level, - mock.ANY)] - self.assertEqual(expected, mynotifier.call_args_list) + if opname == 'create': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.post_json( + _get_path('networks'), + initial_input, expect_errors=expected_errors) + if opname == 'update': + res = self.api.put_json( + _get_path('networks', id=_uuid()), + initial_input, expect_errors=expected_errors) + expected_code = exc.HTTPOk.code + if opname == 'delete': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.delete( + _get_path('networks', id=_uuid()), + expect_errors=expected_errors) + expected_code = exc.HTTPNoContent.code + + expected_events = ('.'.join([resource, opname, "start"]), + '.'.join([resource, opname, "end"])) + self.assertEqual(len(fake_notifier.NOTIFICATIONS), + len(expected_events)) + for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events): + self.assertEqual('INFO', msg['priority']) + self.assertEqual(event, msg['event_type']) + self.assertEqual(res.status_int, expected_code) def test_network_create_notifer(self): @@ -1288,11 +1289,6 @@ class NotificationTest(APIv2TestBase): def test_network_update_notifer(self): self._resource_op_notifier('update', 'network') - def test_network_create_notifer_with_log_level(self): - cfg.CONF.set_override('default_notification_level', 'DEBUG') - self._resource_op_notifier('create', 'network', - notification_level='DEBUG') - class DHCPNotificationTest(APIv2TestBase): def _test_dhcp_notifier(self, opname, resource, initial_input=None): diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index b02ba15f87..4eb80d0d33 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -38,9 +38,9 @@ from neutron.extensions import l3 from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import test_notifier from neutron.openstack.common import uuidutils from neutron.plugins.common import constants as service_constants +from neutron.tests import fake_notifier from neutron.tests.unit import test_agent_ext_plugin from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_api_v2_extension @@ -660,7 +660,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): 'subnet.create.end', 'router.interface.create', 'router.interface.delete'] - test_notifier.NOTIFICATIONS = [] + fake_notifier.reset() with self.router() as r: with self.subnet() as s: body = self._router_interface_action('add', @@ -683,9 +683,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): self.assertEqual( set(exp_notifications), - set(n['event_type'] for n in test_notifier.NOTIFICATIONS)) + set(n['event_type'] for n in fake_notifier.NOTIFICATIONS)) - for n in test_notifier.NOTIFICATIONS: + for n in fake_notifier.NOTIFICATIONS: if n['event_type'].startswith('router.interface.'): payload = n['payload']['router_interface'] self.assertIn('id', payload) diff --git a/requirements.txt b/requirements.txt index 5ba04f255f..f34177ab21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,6 +23,7 @@ alembic>=0.4.1 six>=1.7.0 stevedore>=0.14 oslo.config>=1.2.1 +oslo.messaging>=1.3.0 oslo.rootwrap python-novaclient>=2.17.0 diff --git a/setup.cfg b/setup.cfg index cc4db51b9c..0eaaaed0f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -169,6 +169,14 @@ neutron.ml2.mechanism_drivers = fslsdn = neutron.plugins.ml2.drivers.mechanism_fslsdn:FslsdnMechanismDriver neutron.openstack.common.cache.backends = memory = neutron.openstack.common.cache._backends.memory:MemoryBackend +# These are for backwards compat with Icehouse notification_driver configuration values +oslo.messaging.notify.drivers = + neutron.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver + neutron.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver + neutron.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver + neutron.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver + neutron.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver + [build_sphinx] all_files = 1