diff --git a/doc/source/contributor/internals.rst b/doc/source/contributor/internals.rst index 6c402ec67..454caf9ea 100644 --- a/doc/source/contributor/internals.rst +++ b/doc/source/contributor/internals.rst @@ -32,3 +32,4 @@ Neutron Lib Internals api_validators callbacks db_model_query + rpc_api diff --git a/doc/source/contributor/rpc_api.rst b/doc/source/contributor/rpc_api.rst new file mode 100644 index 000000000..b93b9292a --- /dev/null +++ b/doc/source/contributor/rpc_api.rst @@ -0,0 +1,175 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +Neutron RPC API Layer +===================== + +Neutron uses the oslo.messaging library to provide an internal communication +channel between Neutron services. This communication is typically done via +AMQP, but those details are mostly hidden by the use of oslo.messaging and it +could be some other protocol in the future. + +RPC APIs are defined in Neutron in two parts: client side and server side. + +Client Side +----------- + +Here is an example of an rpc client definition: + +:: + + import oslo_messaging + + from neutron_lib import rpc as n_rpc + + + class ClientAPI(object): + """Client side RPC interface definition. + + API version history: + 1.0 - Initial version + 1.1 - Added my_remote_method_2 + """ + + def __init__(self, topic): + target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + + def my_remote_method(self, context, arg1, arg2): + cctxt = self.client.prepare() + return cctxt.call(context, 'my_remote_method', arg1=arg1, arg2=arg2) + + def my_remote_method_2(self, context, arg1): + cctxt = self.client.prepare(version='1.1') + return cctxt.call(context, 'my_remote_method_2', arg1=arg1) + + +This class defines the client side interface for an rpc API. The interface has +2 methods. The first method existed in version 1.0 of the interface. The +second method was added in version 1.1. When the newer method is called, it +specifies that the remote side must implement at least version 1.1 to handle +this request. + +Server Side +----------- + +The server side of an rpc interface looks like this: + +:: + + import oslo_messaging + + + class ServerAPI(object): + + target = oslo_messaging.Target(version='1.1') + + def my_remote_method(self, context, arg1, arg2): + return 'foo' + + def my_remote_method_2(self, context, arg1): + return 'bar' + + +This class implements the server side of the interface. The +oslo_messaging.Target() defined says that this class currently implements +version 1.1 of the interface. + +.. _rpc_versioning: + +Versioning +---------- + +Note that changes to rpc interfaces must always be done in a backwards +compatible way. The server side should always be able to handle older clients +(within the same major version series, such as 1.X). + +It is possible to bump the major version number and drop some code only needed +for backwards compatibility. For more information about how to do that, see +https://wiki.openstack.org/wiki/RpcMajorVersionUpdates. + +Example Change +~~~~~~~~~~~~~~ + +As an example minor API change, let's assume we want to add a new parameter to +my_remote_method_2. First, we add the argument on the server side. To be +backwards compatible, the new argument must have a default value set so that the +interface will still work even if the argument is not supplied. Also, the +interface's minor version number must be incremented. So, the new server side +code would look like this: + +:: + + import oslo_messaging + + + class ServerAPI(object): + + target = oslo_messaging.Target(version='1.2') + + def my_remote_method(self, context, arg1, arg2): + return 'foo' + + def my_remote_method_2(self, context, arg1, arg2=None): + if not arg2: + # Deal with the fact that arg2 was not specified if needed. + return 'bar' + +We can now update the client side to pass the new argument. The client must +also specify that version '1.2' is required for this method call to be +successful. The updated client side would look like this: + +:: + + import oslo_messaging + + from neutron.common import rpc as n_rpc + + + class ClientAPI(object): + """Client side RPC interface definition. + + API version history: + 1.0 - Initial version + 1.1 - Added my_remote_method_2 + 1.2 - Added arg2 to my_remote_method_2 + """ + + def __init__(self, topic): + target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + + def my_remote_method(self, context, arg1, arg2): + cctxt = self.client.prepare() + return cctxt.call(context, 'my_remote_method', arg1=arg1, arg2=arg2) + + def my_remote_method_2(self, context, arg1, arg2): + cctxt = self.client.prepare(version='1.2') + return cctxt.call(context, 'my_remote_method_2', + arg1=arg1, arg2=arg2) + +More Info +--------- + +For more information, see the oslo.messaging documentation: +https://docs.openstack.org/oslo.messaging/latest/. diff --git a/neutron_lib/exceptions/__init__.py b/neutron_lib/exceptions/__init__.py index 2dbac6f85..32db10320 100644 --- a/neutron_lib/exceptions/__init__.py +++ b/neutron_lib/exceptions/__init__.py @@ -557,3 +557,227 @@ class PhysicalNetworkNameError(NeutronException): class TenantIdProjectIdFilterConflict(BadRequest): message = _("Both tenant_id and project_id passed as filters.") + + +class SubnetPoolNotFound(NotFound): + message = _("Subnet pool %(subnetpool_id)s could not be found.") + + +class StateInvalid(BadRequest): + message = _("Unsupported port state: %(port_state)s.") + + +class DhcpPortInUse(InUse): + message = _("Port %(port_id)s is already acquired by another DHCP agent") + + +class HostRoutesExhausted(BadRequest): + # NOTE(xchenum): probably make sense to use quota exceeded exception? + message = _("Unable to complete operation for %(subnet_id)s. " + "The number of host routes exceeds the limit %(quota)s.") + + +class DNSNameServersExhausted(BadRequest): + # NOTE(xchenum): probably make sense to use quota exceeded exception? + message = _("Unable to complete operation for %(subnet_id)s. " + "The number of DNS nameservers exceeds the limit %(quota)s.") + + +class FlatNetworkInUse(InUse): + message = _("Unable to create the flat network. " + "Physical network %(physical_network)s is in use.") + + +class NoNetworkFoundInMaximumAllowedAttempts(ServiceUnavailable): + message = _("Unable to create the network. " + "No available network found in maximum allowed attempts.") + + +class MalformedRequestBody(BadRequest): + message = _("Malformed request body: %(reason)s.") + + +class InvalidAllocationPool(BadRequest): + message = _("The allocation pool %(pool)s is not valid.") + + +class UnsupportedPortDeviceOwner(Conflict): + message = _("Operation %(op)s is not supported for device_owner " + "%(device_owner)s on port %(port_id)s.") + + +class OverlappingAllocationPools(Conflict): + message = _("Found overlapping allocation pools: " + "%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.") + + +class OutOfBoundsAllocationPool(BadRequest): + message = _("The allocation pool %(pool)s spans " + "beyond the subnet cidr %(subnet_cidr)s.") + + +class BridgeDoesNotExist(NeutronException): + message = _("Bridge %(bridge)s does not exist.") + + +class QuotaResourceUnknown(NotFound): + message = _("Unknown quota resources %(unknown)s.") + + +class QuotaMissingTenant(BadRequest): + message = _("Tenant-id was missing from quota request.") + + +class InvalidQuotaValue(Conflict): + message = _("Change would make usage less than 0 for the following " + "resources: %(unders)s.") + + +class InvalidSharedSetting(Conflict): + message = _("Unable to reconfigure sharing settings for network " + "%(network)s. Multiple tenants are using it.") + + +class ExtensionsNotFound(NotFound): + message = _("Extensions not found: %(extensions)s.") + + +class GatewayConflictWithAllocationPools(InUse): + message = _("Gateway ip %(ip_address)s conflicts with " + "allocation pool %(pool)s.") + + +class GatewayIpInUse(InUse): + message = _("Current gateway ip %(ip_address)s already in use " + "by port %(port_id)s. Unable to update.") + + +class NetworkVxlanPortRangeError(NeutronException): + message = _("Invalid network VXLAN port range: '%(vxlan_range)s'.") + + +class VxlanNetworkUnsupported(NeutronException): + message = _("VXLAN network unsupported.") + + +class DuplicatedExtension(NeutronException): + message = _("Found duplicate extension: %(alias)s.") + + +class DriverCallError(MultipleExceptions): + def __init__(self, exc_list=None): + super(DriverCallError, self).__init__(exc_list or []) + + +class DeviceIDNotOwnedByTenant(Conflict): + message = _("The following device_id %(device_id)s is not owned by your " + "tenant or matches another tenants router.") + + +class InvalidCIDR(BadRequest): + message = _("Invalid CIDR %(input)s given as IP prefix.") + + +class FailToDropPrivilegesExit(SystemExit): + """Exit exception raised when a drop privileges action fails.""" + code = 99 + + +class NetworkIdOrRouterIdRequiredError(NeutronException): + message = _('Both network_id and router_id are None. ' + 'One must be provided.') + + +class EmptySubnetPoolPrefixList(BadRequest): + message = _("Empty subnet pool prefix list.") + + +class PrefixVersionMismatch(BadRequest): + message = _("Cannot mix IPv4 and IPv6 prefixes in a subnet pool.") + + +class UnsupportedMinSubnetPoolPrefix(BadRequest): + message = _("Prefix '%(prefix)s' not supported in IPv%(version)s pool.") + + +class IllegalSubnetPoolPrefixBounds(BadRequest): + message = _("Illegal prefix bounds: %(prefix_type)s=%(prefixlen)s, " + "%(base_prefix_type)s=%(base_prefixlen)s.") + + +class IllegalSubnetPoolPrefixUpdate(BadRequest): + message = _("Illegal update to prefixes: %(msg)s.") + + +class SubnetAllocationError(NeutronException): + message = _("Failed to allocate subnet: %(reason)s.") + + +class AddressScopePrefixConflict(Conflict): + message = _("Failed to associate address scope: subnetpools " + "within an address scope must have unique prefixes.") + + +class IllegalSubnetPoolAssociationToAddressScope(BadRequest): + message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s " + "cannot be associated with address scope " + "%(address_scope_id)s.") + + +class IllegalSubnetPoolIpVersionAssociationToAddressScope(BadRequest): + message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s " + "cannot associate with address scope %(address_scope_id)s " + "because subnetpool ip_version is not %(ip_version)s.") + + +class IllegalSubnetPoolUpdate(BadRequest): + message = _("Illegal subnetpool update : %(reason)s.") + + +class MinPrefixSubnetAllocationError(BadRequest): + message = _("Unable to allocate subnet with prefix length %(prefixlen)s, " + "minimum allowed prefix is %(min_prefixlen)s.") + + +class MaxPrefixSubnetAllocationError(BadRequest): + message = _("Unable to allocate subnet with prefix length %(prefixlen)s, " + "maximum allowed prefix is %(max_prefixlen)s.") + + +class SubnetPoolDeleteError(BadRequest): + message = _("Unable to delete subnet pool: %(reason)s.") + + +class SubnetPoolQuotaExceeded(OverQuota): + message = _("Per-tenant subnet pool prefix quota exceeded.") + + +class NetworkSubnetPoolAffinityError(BadRequest): + message = _("Subnets hosted on the same network must be allocated from " + "the same subnet pool.") + + +class ObjectActionError(NeutronException): + message = _('Object action %(action)s failed because: %(reason)s.') + + +class CTZoneExhaustedError(NeutronException): + message = _("IPtables conntrack zones exhausted, iptables rules cannot " + "be applied.") + + +class TenantQuotaNotFound(NotFound): + message = _("Quota for tenant %(tenant_id)s could not be found.") + + +class MultipleFilterIDForIPFound(Conflict): + message = _("Multiple filter IDs for IP %(ip)s found.") + + +class FilterIDForIPNotFound(NotFound): + message = _("Filter ID for IP %(ip)s could not be found.") + + +class FailedToAddQdiscToDevice(NeutronException): + message = _("Failed to add %(direction)s qdisc " + "to device %(device)s.") diff --git a/neutron_lib/exceptions/l3.py b/neutron_lib/exceptions/l3.py index 55f3a4320..4ed192f6c 100644 --- a/neutron_lib/exceptions/l3.py +++ b/neutron_lib/exceptions/l3.py @@ -71,3 +71,23 @@ class RouterExternalGatewayInUseByFloatingIp(exceptions.InUse): class RouterInterfaceAttachmentConflict(exceptions.Conflict): message = _("Error %(reason)s while attempting the operation.") + + +class RouterNotCompatibleWithAgent(exceptions.NeutronException): + message = _("Router '%(router_id)s' is not compatible with this agent.") + + +class FloatingIpSetupException(exceptions.NeutronException): + def __init__(self, message=None): + self.message = message + super(FloatingIpSetupException, self).__init__() + + +class AbortSyncRouters(exceptions.NeutronException): + message = _("Aborting periodic_sync_routers_task due to an error.") + + +class IpTablesApplyException(exceptions.NeutronException): + def __init__(self, message=None): + self.message = message + super(IpTablesApplyException, self).__init__() diff --git a/neutron_lib/exceptions/qos.py b/neutron_lib/exceptions/qos.py new file mode 100644 index 000000000..40f650abd --- /dev/null +++ b/neutron_lib/exceptions/qos.py @@ -0,0 +1,87 @@ +# Copyright 2011 VMware, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_lib._i18n import _ +from neutron_lib import exceptions as e + + +class QosPolicyNotFound(e.NotFound): + message = _("QoS policy %(policy_id)s could not be found.") + + +class QosRuleNotFound(e.NotFound): + message = _("QoS rule %(rule_id)s for policy %(policy_id)s " + "could not be found.") + + +class QoSPolicyDefaultAlreadyExists(e.Conflict): + message = _("A default QoS policy exists for project %(project_id)s.") + + +class PortQosBindingNotFound(e.NotFound): + message = _("QoS binding for port %(port_id)s and policy %(policy_id)s " + "could not be found.") + + +class PortQosBindingError(e.NeutronException): + message = _("QoS binding for port %(port_id)s and policy %(policy_id)s " + "could not be created: %(db_error)s.") + + +class NetworkQosBindingNotFound(e.NotFound): + message = _("QoS binding for network %(net_id)s and policy %(policy_id)s " + "could not be found.") + + +class FloatingIPQosBindingNotFound(e.NotFound): + message = _("QoS binding for floating IP %(fip_id)s and policy " + "%(policy_id)s could not be found.") + + +class QosPolicyInUse(e.InUse): + message = _("QoS Policy %(policy_id)s is used by " + "%(object_type)s %(object_id)s.") + + +class FloatingIPQosBindingError(e.NeutronException): + message = _("QoS binding for floating IP %(fip_id)s and policy " + "%(policy_id)s could not be created: %(db_error)s.") + + +class NetworkQosBindingError(e.NeutronException): + message = _("QoS binding for network %(net_id)s and policy %(policy_id)s " + "could not be created: %(db_error)s.") + + +class QosRuleNotSupported(e.Conflict): + message = _("Rule %(rule_type)s is not supported by port %(port_id)s") + + +class QoSRuleParameterConflict(e.Conflict): + message = _("Unable to add the rule with value %(rule_value)s to the " + "policy %(policy_id)s as the existing rule of type " + "%(existing_rule)s restricts the bandwidth to " + "%(existing_value)s.") + + +class QoSRulesConflict(e.Conflict): + message = _("Rule %(new_rule_type)s conflicts with " + "rule %(rule_id)s which already exists in " + "QoS Policy %(policy_id)s.") + + +class PolicyRemoveAuthorizationError(e.NotAuthorized): + message = _("Failed to remove provided policy %(policy_id)s " + "because you are not authorized.") diff --git a/neutron_lib/fixture.py b/neutron_lib/fixture.py index 23a39e06e..3e698ae92 100644 --- a/neutron_lib/fixture.py +++ b/neutron_lib/fixture.py @@ -14,6 +14,8 @@ import copy import fixtures import mock +from oslo_config import cfg +from oslo_messaging import conffixture from neutron_lib.api import attributes from neutron_lib.api import definitions @@ -23,6 +25,11 @@ from neutron_lib.db import api as db_api from neutron_lib.db import model_base from neutron_lib.db import model_query from neutron_lib.plugins import directory +from neutron_lib import rpc +from neutron_lib.tests.unit import fake_notifier + + +CONF = cfg.CONF class PluginDirectoryFixture(fixtures.Fixture): @@ -244,3 +251,22 @@ class DBQueryHooksFixture(fixtures.Fixture): def _restore(self): model_query._model_query_hooks = self._backup + + +class RPCFixture(fixtures.Fixture): + + def _setUp(self): + # don't actually start RPC listeners when testing + mock.patch.object(rpc.Connection, 'consume_in_threads', + return_value=[]).start() + self.useFixture(fixtures.MonkeyPatch( + 'oslo_messaging.Notifier', fake_notifier.FakeNotifier)) + + self.messaging_conf = conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + # NOTE(russellb) We want all calls to return immediately. + self.messaging_conf.response_timeout = 0 + self.useFixture(self.messaging_conf) + + self.addCleanup(rpc.cleanup) + rpc.init(CONF) diff --git a/neutron_lib/rpc.py b/neutron_lib/rpc.py new file mode 100644 index 000000000..5716c2dc2 --- /dev/null +++ b/neutron_lib/rpc.py @@ -0,0 +1,357 @@ +# Copyright (c) 2012 OpenStack Foundation. +# Copyright (c) 2014 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import random +import time + +from neutron_lib._i18n import _ +from neutron_lib import context +from neutron_lib import exceptions +from neutron_lib.utils import runtime +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging +from oslo_messaging.rpc import dispatcher +from oslo_messaging import serializer as om_serializer +from oslo_service import service +from oslo_utils import excutils +from osprofiler import profiler + + +LOG = logging.getLogger(__name__) +TRANSPORT = None +NOTIFICATION_TRANSPORT = None +NOTIFIER = None + +_DFT_EXMODS = runtime.list_package_modules(exceptions.__name__) + + +def init(conf, rpc_ext_mods=None): + """Initialize the global RPC objects. + + :param conf: The oslo conf to use for initialization. + :param rpc_ext_mods: Exception modules to expose via RPC. + :returns: None. + """ + global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER + + if rpc_ext_mods is None: + rpc_ext_mods = _DFT_EXMODS + else: + rpc_ext_mods = list(set(rpc_ext_mods + _DFT_EXMODS)) + + TRANSPORT = oslo_messaging.get_rpc_transport( + conf, allowed_remote_exmods=rpc_ext_mods) + NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport( + conf, allowed_remote_exmods=rpc_ext_mods) + serializer = RequestContextSerializer() + NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT, + serializer=serializer) + + +def cleanup(): + """Deactivate and cleanup the global RPC objects. + + :returns: None. + """ + global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER + if TRANSPORT is None: + raise AssertionError(_("'TRANSPORT' must not be None")) + if NOTIFICATION_TRANSPORT is None: + raise AssertionError( + _("'NOTIFICATION_TRANSPORT' must not be None")) + if NOTIFIER is None: + raise AssertionError(_("'NOTIFIER' must not be None")) + TRANSPORT.cleanup() + NOTIFICATION_TRANSPORT.cleanup() + _BackingOffContextWrapper.reset_timeouts() + TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None + + +def _get_default_method_timeout(): + return TRANSPORT.conf.rpc_response_timeout + + +def _get_default_method_timeouts(): + return collections.defaultdict(_get_default_method_timeout) + + +class _ContextWrapper(object): + def __init__(self, original_context): + self._original_context = original_context + + def __getattr__(self, name): + return getattr(self._original_context, name) + + def cast(self, ctxt, method, **kwargs): + try: + self._original_context.cast(ctxt, method, **kwargs) + except Exception as e: + # TODO(kevinbenton): make catch specific to missing exchange once + # bug/1705351 is resolved on the oslo.messaging side; if + # oslo.messaging auto-creates the exchange, then just remove the + # code completely + LOG.debug("Ignored exception during cast: %e", e) + + +class _BackingOffContextWrapper(_ContextWrapper): + """Wraps oslo messaging contexts to set the timeout for calls. + + This intercepts RPC calls and sets the timeout value to the globally + adapting value for each method. An oslo messaging timeout results in + a doubling of the timeout value for the method on which it timed out. + There currently is no logic to reduce the timeout since busy Neutron + servers are more frequently the cause of timeouts rather than lost + messages. + """ + _METHOD_TIMEOUTS = _get_default_method_timeouts() + _max_timeout = None + + @classmethod + def reset_timeouts(cls): + # restore the original default timeout factory + cls._METHOD_TIMEOUTS = _get_default_method_timeouts() + cls._max_timeout = None + + @classmethod + def get_max_timeout(cls): + return cls._max_timeout or _get_default_method_timeout() * 10 + + @classmethod + def set_max_timeout(cls, max_timeout): + if max_timeout < cls.get_max_timeout(): + cls._METHOD_TIMEOUTS = collections.defaultdict( + lambda: max_timeout, **{ + k: min(v, max_timeout) + for k, v in cls._METHOD_TIMEOUTS.items() + }) + cls._max_timeout = max_timeout + + def call(self, ctxt, method, **kwargs): + # two methods with the same name in different namespaces should + # be tracked independently + if self._original_context.target.namespace: + scoped_method = '%s.%s' % (self._original_context.target.namespace, + method) + else: + scoped_method = method + # set the timeout from the global method timeout tracker for this + # method + self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method] + try: + return self._original_context.call(ctxt, method, **kwargs) + except oslo_messaging.MessagingTimeout: + with excutils.save_and_reraise_exception(): + wait = random.uniform( + 0, + min(self._METHOD_TIMEOUTS[scoped_method], + TRANSPORT.conf.rpc_response_timeout) + ) + LOG.error("Timeout in RPC method %(method)s. Waiting for " + "%(wait)s seconds before next attempt. If the " + "server is not down, consider increasing the " + "rpc_response_timeout option as Neutron " + "server(s) may be overloaded and unable to " + "respond quickly enough.", + {'wait': int(round(wait)), 'method': scoped_method}) + new_timeout = min( + self._original_context.timeout * 2, self.get_max_timeout()) + if new_timeout > self._METHOD_TIMEOUTS[scoped_method]: + LOG.warning("Increasing timeout for %(method)s calls " + "to %(new)s seconds. Restart the agent to " + "restore it to the default value.", + {'method': scoped_method, 'new': new_timeout}) + self._METHOD_TIMEOUTS[scoped_method] = new_timeout + time.sleep(wait) + + +class BackingOffClient(oslo_messaging.RPCClient): + """An oslo messaging RPC Client that implements a timeout backoff. + + This has all of the same interfaces as oslo_messaging.RPCClient but + if the timeout parameter is not specified, the _BackingOffContextWrapper + returned will track when call timeout exceptions occur and exponentially + increase the timeout for the given call method. + """ + def prepare(self, *args, **kwargs): + ctx = super(BackingOffClient, self).prepare(*args, **kwargs) + # don't back off contexts that explicitly set a timeout + if 'timeout' in kwargs: + return _ContextWrapper(ctx) + return _BackingOffContextWrapper(ctx) + + @staticmethod + def set_max_timeout(max_timeout): + '''Set RPC timeout ceiling for all backing-off RPC clients.''' + _BackingOffContextWrapper.set_max_timeout(max_timeout) + + +def get_client(target, version_cap=None, serializer=None): + """Get an RPC client for the said target. + + The init() function must be called prior to calling this. + :param target: The RPC target for the client. + :param version_cap: The optional version cap for the RPC client. + :param serializer: The optional serializer to use for the RPC client. + :returns: A new RPC client. + """ + if TRANSPORT is None: + raise AssertionError(_("'TRANSPORT' must not be None")) + serializer = RequestContextSerializer(serializer) + return BackingOffClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + """Get a new RPC server reference. + + :param target: The target for the new RPC server. + :param endpoints: The endpoints for the RPC server. + :param serializer: The optional serialize to use for the RPC server. + :returns: A new RPC server reference. + """ + if TRANSPORT is None: + raise AssertionError(_("'TRANSPORT' must not be None")) + serializer = RequestContextSerializer(serializer) + access_policy = dispatcher.DefaultRPCAccessPolicy + return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints, + 'eventlet', serializer, + access_policy=access_policy) + + +def get_notifier(service=None, host=None, publisher_id=None): + """Get a new notifier reference. + + :param service: The optional service for the notifier. + :param host: The optional host for the notifier. If not given the host + will be taken from the global CONF. + :param publisher_id: The optional publisher ID for the notifer. + :returns: A new RPC notifier reference. + """ + if NOTIFIER is None: + raise AssertionError(_("'NOTIFIER' must not be None")) + if not publisher_id: + publisher_id = "%s.%s" % (service, host or cfg.CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) + + +class RequestContextSerializer(om_serializer.Serializer): + """Convert RPC common context into Neutron Context.""" + def __init__(self, base=None): + super(RequestContextSerializer, self).__init__() + self._base = base + + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) + + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + def serialize_context(self, ctxt): + _context = ctxt.to_dict() + prof = profiler.get() + if prof: + trace_info = { + "hmac_key": prof.hmac_key, + "base_id": prof.get_base_id(), + "parent_id": prof.get_id() + } + _context['trace_info'] = trace_info + return _context + + def deserialize_context(self, ctxt): + rpc_ctxt_dict = ctxt.copy() + trace_info = rpc_ctxt_dict.pop("trace_info", None) + if trace_info: + profiler.init(**trace_info) + return context.Context.from_dict(rpc_ctxt_dict) + + +@profiler.trace_cls("rpc") +class Service(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host. + """ + def __init__(self, host, topic, manager=None, serializer=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + self.serializer = serializer + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = Connection() + LOG.debug("Creating Consumer connection for Service %s", + self.topic) + + endpoints = [self.manager] + + self.conn.create_consumer(self.topic, endpoints) + + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + + # Consume from all consumers in threads + self.conn.consume_in_threads() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: # nosec + pass + super(Service, self).stop() + + +class Connection(object): + """A utility class that manages a collection of RPC servers.""" + + def __init__(self): + super(Connection, self).__init__() + self.servers = [] + + def create_consumer(self, topic, endpoints, fanout=False): + target = oslo_messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=fanout) + server = get_server(target, endpoints) + self.servers.append(server) + + def consume_in_threads(self): + for server in self.servers: + server.start() + return self.servers + + def close(self): + for server in self.servers: + server.stop() + for server in self.servers: + server.wait() diff --git a/neutron_lib/tests/unit/fake_notifier.py b/neutron_lib/tests/unit/fake_notifier.py new file mode 100644 index 000000000..ba68f50c9 --- /dev/null +++ b/neutron_lib/tests/unit/fake_notifier.py @@ -0,0 +1,52 @@ +# Copyright 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools + + +NOTIFICATIONS = [] + + +def reset(): + del NOTIFICATIONS[:] + + +FakeMessage = collections.namedtuple('Message', + ['publisher_id', 'priority', + 'event_type', 'payload']) + + +class FakeNotifier(object): + + def __init__(self, transport, publisher_id=None, + driver=None, topics=None, + serializer=None, retry=None): + self.transport = transport + self.publisher_id = publisher_id + for priority in ('debug', 'info', 'warn', 'error', 'critical'): + setattr(self, priority, + functools.partial(self._notify, priority=priority.upper())) + + def prepare(self, publisher_id=None): + if publisher_id is None: + publisher_id = self.publisher_id + return self.__class__(self.transport, publisher_id) + + def _notify(self, ctxt, event_type, payload, priority): + msg = dict(publisher_id=self.publisher_id, + priority=priority, + event_type=event_type, + payload=payload) + NOTIFICATIONS.append(msg) diff --git a/neutron_lib/tests/unit/test_rpc.py b/neutron_lib/tests/unit/test_rpc.py new file mode 100644 index 000000000..2c688afa0 --- /dev/null +++ b/neutron_lib/tests/unit/test_rpc.py @@ -0,0 +1,493 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_messaging import conffixture as messaging_conffixture +from oslo_messaging.rpc import dispatcher +import testtools + +from neutron_lib import fixture +from neutron_lib import rpc +from neutron_lib.tests import _base as base + + +CONF = cfg.CONF + + +class TestRPC(base.BaseTestCase): + def setUp(self): + super(TestRPC, self).setUp() + self.useFixture(fixture.RPCFixture()) + + @mock.patch.object(rpc, 'RequestContextSerializer') + @mock.patch.object(messaging, 'get_rpc_transport') + @mock.patch.object(messaging, 'get_notification_transport') + @mock.patch.object(messaging, 'Notifier') + def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser): + notifier = mock.Mock() + transport = mock.Mock() + noti_transport = mock.Mock() + serializer = mock.Mock() + conf = mock.Mock() + + mock_trans.return_value = transport + mock_noti_trans.return_value = noti_transport + mock_ser.return_value = serializer + mock_not.return_value = notifier + + rpc.init(conf, rpc_ext_mods=['foo']) + + expected_mods = list(set(['foo'] + rpc._DFT_EXMODS)) + mock_trans.assert_called_once_with( + conf, allowed_remote_exmods=expected_mods) + mock_noti_trans.assert_called_once_with( + conf, allowed_remote_exmods=expected_mods) + mock_not.assert_called_once_with(noti_transport, + serializer=serializer) + self.assertIsNotNone(rpc.TRANSPORT) + self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT) + self.assertIsNotNone(rpc.NOTIFIER) + + def test_cleanup_transport_null(self): + rpc.NOTIFIER = mock.Mock() + rpc.NOTIFICATION_TRANSPORT = mock.Mock() + rpc.TRANSPORT = None + self.assertRaises(AssertionError, rpc.cleanup) + rpc.TRANSPORT = mock.Mock() + + def test_cleanup_notification_transport_null(self): + rpc.TRANSPORT = mock.Mock() + rpc.NOTIFIER = mock.Mock() + rpc.NOTIFICATION_TRANSPORT = None + self.assertRaises(AssertionError, rpc.cleanup) + rpc.NOTIFICATION_TRANSPORT = mock.Mock() + + def test_cleanup_notifier_null(self): + rpc.TRANSPORT = mock.Mock() + rpc.NOTIFICATION_TRANSPORT = mock.Mock() + rpc.NOTIFIER = None + self.assertRaises(AssertionError, rpc.cleanup) + rpc.NOTIFIER = mock.Mock() + + def test_cleanup(self): + rpc.NOTIFIER = mock.Mock() + rpc.NOTIFICATION_TRANSPORT = mock.Mock() + rpc.TRANSPORT = mock.Mock() + trans_cleanup = mock.Mock() + not_trans_cleanup = mock.Mock() + rpc.TRANSPORT.cleanup = trans_cleanup + rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup + + rpc.cleanup() + + trans_cleanup.assert_called_once_with() + not_trans_cleanup.assert_called_once_with() + self.assertIsNone(rpc.TRANSPORT) + self.assertIsNone(rpc.NOTIFICATION_TRANSPORT) + self.assertIsNone(rpc.NOTIFIER) + + rpc.TRANSPORT = mock.Mock() + rpc.NOTIFIER = mock.Mock() + rpc.NOTIFICATION_TRANSPORT = mock.Mock() + + @mock.patch.object(rpc, 'RequestContextSerializer') + @mock.patch.object(rpc, 'BackingOffClient') + def test_get_client(self, mock_client, mock_ser): + rpc.TRANSPORT = mock.Mock() + tgt = mock.Mock() + ser = mock.Mock() + mock_client.return_value = 'client' + mock_ser.return_value = ser + + client = rpc.get_client(tgt, version_cap='1.0', serializer='foo') + + mock_ser.assert_called_once_with('foo') + mock_client.assert_called_once_with(rpc.TRANSPORT, + tgt, version_cap='1.0', + serializer=ser) + self.assertEqual('client', client) + + @mock.patch.object(rpc, 'RequestContextSerializer') + @mock.patch.object(messaging, 'get_rpc_server') + def test_get_server(self, mock_get, mock_ser): + rpc.TRANSPORT = mock.Mock() + ser = mock.Mock() + tgt = mock.Mock() + ends = mock.Mock() + mock_ser.return_value = ser + mock_get.return_value = 'server' + + server = rpc.get_server(tgt, ends, serializer='foo') + + mock_ser.assert_called_once_with('foo') + access_policy = dispatcher.DefaultRPCAccessPolicy + mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends, + 'eventlet', ser, + access_policy=access_policy) + self.assertEqual('server', server) + + def test_get_notifier(self): + rpc.NOTIFIER = mock.Mock() + mock_prep = mock.Mock() + mock_prep.return_value = 'notifier' + rpc.NOTIFIER.prepare = mock_prep + + notifier = rpc.get_notifier('service', publisher_id='foo') + + mock_prep.assert_called_once_with(publisher_id='foo') + self.assertEqual('notifier', notifier) + + def test_get_notifier_null_publisher(self): + rpc.NOTIFIER = mock.Mock() + mock_prep = mock.Mock() + mock_prep.return_value = 'notifier' + rpc.NOTIFIER.prepare = mock_prep + + notifier = rpc.get_notifier('service', host='bar') + + mock_prep.assert_called_once_with(publisher_id='service.bar') + self.assertEqual('notifier', notifier) + + +class TestRequestContextSerializer(base.BaseTestCase): + def setUp(self): + super(TestRequestContextSerializer, self).setUp() + self.mock_base = mock.Mock() + self.ser = rpc.RequestContextSerializer(self.mock_base) + self.ser_null = rpc.RequestContextSerializer(None) + + def test_serialize_entity(self): + self.mock_base.serialize_entity.return_value = 'foo' + + ser_ent = self.ser.serialize_entity('context', 'entity') + + self.mock_base.serialize_entity.assert_called_once_with('context', + 'entity') + self.assertEqual('foo', ser_ent) + + def test_deserialize_entity(self): + self.mock_base.deserialize_entity.return_value = 'foo' + + deser_ent = self.ser.deserialize_entity('context', 'entity') + + self.mock_base.deserialize_entity.assert_called_once_with('context', + 'entity') + self.assertEqual('foo', deser_ent) + + def test_deserialize_entity_null_base(self): + deser_ent = self.ser_null.deserialize_entity('context', 'entity') + + self.assertEqual('entity', deser_ent) + + def test_serialize_context(self): + context = mock.Mock() + + self.ser.serialize_context(context) + + context.to_dict.assert_called_once_with() + + def test_deserialize_context(self): + context_dict = {'foo': 'bar', + 'user_id': 1, + 'tenant_id': 1, + 'is_admin': True} + + c = self.ser.deserialize_context(context_dict) + + self.assertEqual(1, c.user_id) + self.assertEqual(1, c.project_id) + + def test_deserialize_context_no_user_id(self): + context_dict = {'foo': 'bar', + 'user': 1, + 'tenant_id': 1, + 'is_admin': True} + + c = self.ser.deserialize_context(context_dict) + + self.assertEqual(1, c.user_id) + self.assertEqual(1, c.project_id) + + def test_deserialize_context_no_tenant_id(self): + context_dict = {'foo': 'bar', + 'user_id': 1, + 'project_id': 1, + 'is_admin': True} + + c = self.ser.deserialize_context(context_dict) + + self.assertEqual(1, c.user_id) + self.assertEqual(1, c.project_id) + + def test_deserialize_context_no_ids(self): + context_dict = {'foo': 'bar', 'is_admin': True} + + c = self.ser.deserialize_context(context_dict) + + self.assertIsNone(c.user_id) + self.assertIsNone(c.project_id) + + +class ServiceTestCase(base.BaseTestCase): + # the class cannot be based on BaseTestCase since it mocks rpc.Connection + + def setUp(self): + super(ServiceTestCase, self).setUp() + self.host = 'foo' + self.topic = 'neutron-agent' + + self.target_mock = mock.patch('oslo_messaging.Target') + self.target_mock.start() + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_url = 'fake://' + self.messaging_conf.response_timeout = 0 + self.useFixture(self.messaging_conf) + + self.addCleanup(rpc.cleanup) + rpc.init(CONF) + + @mock.patch.object(cfg, 'CONF') + def test_operations(self, mock_conf): + mock_conf.host = self.host + with mock.patch('oslo_messaging.get_rpc_server') as get_rpc_server: + rpc_server = get_rpc_server.return_value + + service = rpc.Service(self.host, self.topic) + service.start() + rpc_server.start.assert_called_once_with() + + service.stop() + rpc_server.stop.assert_called_once_with() + rpc_server.wait.assert_called_once_with() + + +class TimeoutTestCase(base.BaseTestCase): + def setUp(self): + super(TimeoutTestCase, self).setUp() + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_url = 'fake://' + self.messaging_conf.response_timeout = 0 + self.useFixture(self.messaging_conf) + + self.addCleanup(rpc.cleanup) + rpc.init(CONF) + rpc.TRANSPORT = mock.MagicMock() + rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout + target = messaging.Target(version='1.0', topic='testing') + self.client = rpc.get_client(target) + self.call_context = mock.Mock() + self.sleep = mock.patch('time.sleep').start() + rpc.TRANSPORT.conf.rpc_response_timeout = 10 + + def test_timeout_unaffected_when_explicitly_set(self): + rpc.TRANSPORT.conf.rpc_response_timeout = 5 + ctx = self.client.prepare(topic='sandwiches', timeout=77) + with testtools.ExpectedException(messaging.MessagingTimeout): + ctx.call(self.call_context, 'create_pb_and_j') + # ensure that the timeout was not increased and the back-off sleep + # wasn't called + self.assertEqual( + 5, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j']) + self.assertFalse(self.sleep.called) + + def test_timeout_store_defaults(self): + # any method should default to the configured timeout + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) + # a change to an existing should not affect new or existing ones + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000 + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + self.assertEqual( + rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3']) + + def test_method_timeout_sleep(self): + rpc.TRANSPORT.conf.rpc_response_timeout = 2 + for i in range(100): + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, 'method_1') + # sleep value should always be between 0 and configured timeout + self.assertGreaterEqual(self.sleep.call_args_list[0][0][0], 0) + self.assertLessEqual(self.sleep.call_args_list[0][0][0], 2) + self.sleep.reset_mock() + + def test_method_timeout_increases_on_timeout_exception(self): + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + for i in range(5): + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, 'method_1') + + # we only care to check the timeouts sent to the transport + timeouts = [call[1]['timeout'] + for call in rpc.TRANSPORT._send.call_args_list] + self.assertEqual([1, 2, 4, 8, 16], timeouts) + + def test_method_timeout_10x_config_ceiling(self): + rpc.TRANSPORT.conf.rpc_response_timeout = 10 + # 5 doublings should max out at the 10xdefault ceiling + for i in range(5): + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, 'method_1') + self.assertEqual( + 10 * rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, 'method_1') + self.assertEqual( + 10 * rpc.TRANSPORT.conf.rpc_response_timeout, + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + + def test_timeout_unchanged_on_other_exception(self): + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc.TRANSPORT._send.side_effect = ValueError + with testtools.ExpectedException(ValueError): + self.client.call(self.call_context, 'method_1') + rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, 'method_1') + timeouts = [call[1]['timeout'] + for call in rpc.TRANSPORT._send.call_args_list] + self.assertEqual([1, 1], timeouts) + + def test_timeouts_for_methods_tracked_independently(self): + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1 + for method in ('method_1', 'method_1', 'method_2', + 'method_1', 'method_2'): + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, method) + timeouts = [call[1]['timeout'] + for call in rpc.TRANSPORT._send.call_args_list] + self.assertEqual([1, 2, 1, 4, 2], timeouts) + + def test_timeouts_for_namespaces_tracked_independently(self): + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1 + for ns in ('ns1', 'ns2'): + self.client.target.namespace = ns + for i in range(4): + with testtools.ExpectedException(messaging.MessagingTimeout): + self.client.call(self.call_context, 'method') + timeouts = [call[1]['timeout'] + for call in rpc.TRANSPORT._send.call_args_list] + self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts) + + def test_method_timeout_increases_with_prepare(self): + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 + ctx = self.client.prepare(version='1.4') + with testtools.ExpectedException(messaging.MessagingTimeout): + ctx.call(self.call_context, 'method_1') + with testtools.ExpectedException(messaging.MessagingTimeout): + ctx.call(self.call_context, 'method_1') + + # we only care to check the timeouts sent to the transport + timeouts = [call[1]['timeout'] + for call in rpc.TRANSPORT._send.call_args_list] + self.assertEqual([1, 2], timeouts) + + def test_set_max_timeout_caps_all_methods(self): + rpc.TRANSPORT.conf.rpc_response_timeout = 300 + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 + rpc.BackingOffClient.set_max_timeout(50) + # both explicitly tracked + self.assertEqual( + 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + # as well as new methods + self.assertEqual( + 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) + + def test_set_max_timeout_retains_lower_timeouts(self): + rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 + rpc.BackingOffClient.set_max_timeout(50) + self.assertEqual( + 10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) + + def test_set_max_timeout_overrides_default_timeout(self): + rpc.TRANSPORT.conf.rpc_response_timeout = 10 + self.assertEqual( + 10 * 10, rpc._BackingOffContextWrapper.get_max_timeout()) + rpc._BackingOffContextWrapper.set_max_timeout(10) + self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout()) + + +class CastExceptionTestCase(base.BaseTestCase): + def setUp(self): + super(CastExceptionTestCase, self).setUp() + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_url = 'fake://' + self.messaging_conf.response_timeout = 0 + self.useFixture(self.messaging_conf) + + self.addCleanup(rpc.cleanup) + rpc.init(CONF) + rpc.TRANSPORT = mock.MagicMock() + rpc.TRANSPORT._send.side_effect = Exception + target = messaging.Target(version='1.0', topic='testing') + self.client = rpc.get_client(target) + self.cast_context = mock.Mock() + + def test_cast_catches_exception(self): + self.client.cast(self.cast_context, 'method_1') + + +class TestConnection(base.BaseTestCase): + def setUp(self): + super(TestConnection, self).setUp() + self.conn = rpc.Connection() + + @mock.patch.object(messaging, 'Target') + @mock.patch.object(cfg, 'CONF') + @mock.patch.object(rpc, 'get_server') + def test_create_consumer(self, mock_get, mock_cfg, mock_tgt): + mock_cfg.host = 'foo' + server = mock.Mock() + target = mock.Mock() + mock_get.return_value = server + mock_tgt.return_value = target + + self.conn.create_consumer('topic', 'endpoints', fanout=True) + + mock_tgt.assert_called_once_with(topic='topic', server='foo', + fanout=True) + mock_get.assert_called_once_with(target, 'endpoints') + self.assertEqual([server], self.conn.servers) + + def test_consume_in_threads(self): + self.conn.servers = [mock.Mock(), mock.Mock()] + + servs = self.conn.consume_in_threads() + + for serv in self.conn.servers: + serv.start.assert_called_once_with() + self.assertEqual(servs, self.conn.servers) + + def test_close(self): + self.conn.servers = [mock.Mock(), mock.Mock()] + + self.conn.close() + + for serv in self.conn.servers: + serv.stop.assert_called_once_with() + serv.wait.assert_called_once_with() diff --git a/neutron_lib/tests/unit/utils/test_runtime.py b/neutron_lib/tests/unit/utils/test_runtime.py index e9e995bd4..70df6906d 100644 --- a/neutron_lib/tests/unit/utils/test_runtime.py +++ b/neutron_lib/tests/unit/utils/test_runtime.py @@ -110,3 +110,11 @@ class TestNamespacedPlugins(base.BaseTestCase): plugins.new_plugin_instance('b') mock_epa.plugin.assert_called_once_with('c', 'd', karg='kval') mock_epb.plugin.assert_called_once_with() + + +class TestListPackageModules(base.BaseTestCase): + + def test_list_package_modules(self): + # mainly just to ensure we can import modules for both PY2/PY3 + self.assertTrue( + len(runtime.list_package_modules('neutron_lib.exceptions')) > 3) diff --git a/neutron_lib/utils/runtime.py b/neutron_lib/utils/runtime.py index 7d52479dd..e4534d69d 100644 --- a/neutron_lib/utils/runtime.py +++ b/neutron_lib/utils/runtime.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pkgutil import sys from oslo_concurrency import lockutils @@ -21,6 +22,7 @@ from stevedore import enabled from neutron_lib._i18n import _ + LOG = logging.getLogger(__name__) SYNCHRONIZED_PREFIX = 'neutron-' @@ -123,3 +125,20 @@ def load_class_by_alias_or_classname(namespace, name): exc_info=True) raise ImportError(_("Class not found.")) return class_to_load + + +def list_package_modules(package_name): + """Get a list of the modules for a given package. + + :param package_name: The package name to get modules for. + :returns: A list of module objects for the said package name. + """ + pkg_mod = importutils.import_module(package_name) + modules = [pkg_mod] + + for mod in pkgutil.walk_packages(pkg_mod.__path__): + _, mod_name, _ = mod + fq_name = pkg_mod.__name__ + "." + mod_name + modules.append(importutils.import_module(fq_name)) + + return modules diff --git a/releasenotes/notes/rehome-common-rpc-5d84a9fe0faa71b7.yaml b/releasenotes/notes/rehome-common-rpc-5d84a9fe0faa71b7.yaml new file mode 100644 index 000000000..8485df134 --- /dev/null +++ b/releasenotes/notes/rehome-common-rpc-5d84a9fe0faa71b7.yaml @@ -0,0 +1,15 @@ +--- +features: + - The ``neutron.common.rpc`` module is now available as ``neutron_lib.rpc`` + and automatically exposes all exception modules from + ``neutron_lib.exceptions`` for RPC usage. + - Exceptions from ``neutron.common.exceptions`` are now available + in the ``neutron_lib.exceptions`` package whereupon exceptions are now + in their respective module (e.g. L3 exceptions are in + ``neutron_lib.exceptions.l3``, etc.). + - The ``neutron.tests.fake_notifier`` is now available as + ``neutron_lib.tests.unit.fake_notifier``. + - The ``neutron_lib.utils.runtime.list_package_modules`` function is now + available for listing all modules in a said package. + - The ``RPCFixture`` is now available in ``neutron_lib.fixtures`` for setting + up RPC based unit tests. diff --git a/tox.ini b/tox.ini index 8bdd5795e..e97e4f233 100644 --- a/tox.ini +++ b/tox.ini @@ -99,8 +99,9 @@ import-order-style = pep8 [testenv:bandit] # B104: Possible binding to all interfaces +# B311: Standard pseudo-random generators are not suitable for security/cryptographic purpose deps = -r{toxinidir}/test-requirements.txt -commands = bandit -r neutron_lib -x tests -n5 -s B104 +commands = bandit -r neutron_lib -x tests -n5 -s B104,B311 [hacking] import_exceptions = neutron_lib._i18n