diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 47fc4a6f9be..16822033154 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -24,6 +24,7 @@ from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources as callback_resources from neutron_lib import constants from neutron_lib.plugins import utils +from neutron_lib import rpc as lib_rpc from oslo_log import log as logging import oslo_messaging from oslo_utils import uuidutils @@ -83,7 +84,7 @@ class PluginReportStateAPI(object): def report_state(self, context, agent_state, use_call=False): cctxt = self.client.prepare( - timeout=n_rpc.TRANSPORT.conf.rpc_response_timeout) + timeout=lib_rpc.TRANSPORT.conf.rpc_response_timeout) # add unique identifier to a report # that can be logged on server side. # This create visible correspondence between events on diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py index 865f974342e..f15cc39a6db 100644 --- a/neutron/common/exceptions.py +++ b/neutron/common/exceptions.py @@ -13,343 +13,104 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron_lib import exceptions as e +from neutron_lib import exceptions +from neutron_lib.exceptions import l3 +from neutron_lib.exceptions import qos from neutron._i18n import _ -# TODO(boden): remove rpc shims -PortBindingNotFound = e.PortBindingNotFound - - -class SubnetPoolNotFound(e.NotFound): - message = _("Subnet pool %(subnetpool_id)s could not be found.") - - -class QosPolicyNotFound(e.NotFound): - message = _("QoS policy %(policy_id)s could not be found.") - - -class QosRuleNotFound(e.NotFound): - message = _("QoS rule %(rule_id)s for policy %(policy_id)s " - "could not be found.") - - -class QoSPolicyDefaultAlreadyExists(e.Conflict): - message = _("A default QoS policy exists for project %(project_id)s.") - - -class PortQosBindingNotFound(e.NotFound): - message = _("QoS binding for port %(port_id)s and policy %(policy_id)s " - "could not be found.") - - -class PortQosBindingError(e.NeutronException): - message = _("QoS binding for port %(port_id)s and policy %(policy_id)s " - "could not be created: %(db_error)s.") - - -class NetworkQosBindingNotFound(e.NotFound): - message = _("QoS binding for network %(net_id)s and policy %(policy_id)s " - "could not be found.") - - -class FloatingIPQosBindingNotFound(e.NotFound): - message = _("QoS binding for floating IP %(fip_id)s and policy " - "%(policy_id)s could not be found.") - - -class FloatingIPQosBindingError(e.NeutronException): - message = _("QoS binding for floating IP %(fip_id)s and policy " - "%(policy_id)s could not be created: %(db_error)s.") - - -class NetworkQosBindingError(e.NeutronException): - message = _("QoS binding for network %(net_id)s and policy %(policy_id)s " - "could not be created: %(db_error)s.") - - -class PolicyRemoveAuthorizationError(e.NotAuthorized): - message = _("Failed to remove provided policy %(policy_id)s " - "because you are not authorized.") - - -class StateInvalid(e.BadRequest): - message = _("Unsupported port state: %(port_state)s.") - - -class QosPolicyInUse(e.InUse): - message = _("QoS Policy %(policy_id)s is used by " - "%(object_type)s %(object_id)s.") - - -class DhcpPortInUse(e.InUse): - message = _("Port %(port_id)s is already acquired by another DHCP agent") - - -class HostRoutesExhausted(e.BadRequest): - # NOTE(xchenum): probably make sense to use quota exceeded exception? - message = _("Unable to complete operation for %(subnet_id)s. " - "The number of host routes exceeds the limit %(quota)s.") - - -class DNSNameServersExhausted(e.BadRequest): - # NOTE(xchenum): probably make sense to use quota exceeded exception? - message = _("Unable to complete operation for %(subnet_id)s. " - "The number of DNS nameservers exceeds the limit %(quota)s.") - - -class FlatNetworkInUse(e.InUse): - message = _("Unable to create the flat network. " - "Physical network %(physical_network)s is in use.") - - -class NoNetworkFoundInMaximumAllowedAttempts(e.ServiceUnavailable): - message = _("Unable to create the network. " - "No available network found in maximum allowed attempts.") - - -class MalformedRequestBody(e.BadRequest): - message = _("Malformed request body: %(reason)s.") - - -class InvalidAllocationPool(e.BadRequest): - message = _("The allocation pool %(pool)s is not valid.") - - -class QosRuleNotSupported(e.Conflict): - message = _("Rule %(rule_type)s is not supported by port %(port_id)s") - - -class UnsupportedPortDeviceOwner(e.Conflict): - message = _("Operation %(op)s is not supported for device_owner " - "%(device_owner)s on port %(port_id)s.") - - -class OverlappingAllocationPools(e.Conflict): - message = _("Found overlapping allocation pools: " - "%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.") - - -class OutOfBoundsAllocationPool(e.BadRequest): - message = _("The allocation pool %(pool)s spans " - "beyond the subnet cidr %(subnet_cidr)s.") - - -class BridgeDoesNotExist(e.NeutronException): - message = _("Bridge %(bridge)s does not exist.") - - -class QuotaResourceUnknown(e.NotFound): - message = _("Unknown quota resources %(unknown)s.") - - -class QuotaMissingTenant(e.BadRequest): - message = _("Tenant-id was missing from quota request.") - - -class InvalidQuotaValue(e.Conflict): - message = _("Change would make usage less than 0 for the following " - "resources: %(unders)s.") - - -class InvalidSharedSetting(e.Conflict): - message = _("Unable to reconfigure sharing settings for network " - "%(network)s. Multiple tenants are using it.") - - -class QoSRuleParameterConflict(e.Conflict): - message = _("Unable to add the rule with value %(rule_value)s to the " - "policy %(policy_id)s as the existing rule of type " - "%(existing_rule)s restricts the bandwidth to " - "%(existing_value)s.") - - -class QoSRulesConflict(e.Conflict): - message = _("Rule %(new_rule_type)s conflicts with " - "rule %(rule_id)s which already exists in " - "QoS Policy %(policy_id)s.") - - -class ExtensionsNotFound(e.NotFound): - message = _("Extensions not found: %(extensions)s.") - - -class GatewayConflictWithAllocationPools(e.InUse): - message = _("Gateway ip %(ip_address)s conflicts with " - "allocation pool %(pool)s.") - - -class GatewayIpInUse(e.InUse): - message = _("Current gateway ip %(ip_address)s already in use " - "by port %(port_id)s. Unable to update.") - - -class NetworkVxlanPortRangeError(e.NeutronException): - message = _("Invalid network VXLAN port range: '%(vxlan_range)s'.") - - -class VxlanNetworkUnsupported(e.NeutronException): - message = _("VXLAN network unsupported.") - - -class DuplicatedExtension(e.NeutronException): - message = _("Found duplicate extension: %(alias)s.") - - -class DriverCallError(e.MultipleExceptions): - def __init__(self, exc_list=None): - super(DriverCallError, self).__init__(exc_list or []) - - -class DeviceIDNotOwnedByTenant(e.Conflict): - message = _("The following device_id %(device_id)s is not owned by your " - "tenant or matches another tenants router.") - - -class InvalidCIDR(e.BadRequest): - message = _("Invalid CIDR %(input)s given as IP prefix.") - - -class RouterNotCompatibleWithAgent(e.NeutronException): - message = _("Router '%(router_id)s' is not compatible with this agent.") - - -class FailToDropPrivilegesExit(SystemExit): - """Exit exception raised when a drop privileges action fails.""" - code = 99 - - -class FloatingIpSetupException(e.NeutronException): - def __init__(self, message=None): - self.message = message - super(FloatingIpSetupException, self).__init__() - - -class IpTablesApplyException(e.NeutronException): - def __init__(self, message=None): - self.message = message - super(IpTablesApplyException, self).__init__() - - -class NetworkIdOrRouterIdRequiredError(e.NeutronException): - message = _('Both network_id and router_id are None. ' - 'One must be provided.') - - -class AbortSyncRouters(e.NeutronException): - message = _("Aborting periodic_sync_routers_task due to an error.") - - -class EmptySubnetPoolPrefixList(e.BadRequest): - message = _("Empty subnet pool prefix list.") - - -class PrefixVersionMismatch(e.BadRequest): - message = _("Cannot mix IPv4 and IPv6 prefixes in a subnet pool.") - - -class UnsupportedMinSubnetPoolPrefix(e.BadRequest): - message = _("Prefix '%(prefix)s' not supported in IPv%(version)s pool.") - - -class IllegalSubnetPoolPrefixBounds(e.BadRequest): - message = _("Illegal prefix bounds: %(prefix_type)s=%(prefixlen)s, " - "%(base_prefix_type)s=%(base_prefixlen)s.") - - -class IllegalSubnetPoolPrefixUpdate(e.BadRequest): - message = _("Illegal update to prefixes: %(msg)s.") - - -class SubnetAllocationError(e.NeutronException): - message = _("Failed to allocate subnet: %(reason)s.") - - -class AddressScopePrefixConflict(e.Conflict): - message = _("Failed to associate address scope: subnetpools " - "within an address scope must have unique prefixes.") - - -class IllegalSubnetPoolAssociationToAddressScope(e.BadRequest): - message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s " - "cannot be associated with address scope " - "%(address_scope_id)s.") - - -class IllegalSubnetPoolIpVersionAssociationToAddressScope(e.BadRequest): - message = _("Illegal subnetpool association: subnetpool %(subnetpool_id)s " - "cannot associate with address scope %(address_scope_id)s " - "because subnetpool ip_version is not %(ip_version)s.") - - -class IllegalSubnetPoolUpdate(e.BadRequest): - message = _("Illegal subnetpool update : %(reason)s.") - - -class MinPrefixSubnetAllocationError(e.BadRequest): - message = _("Unable to allocate subnet with prefix length %(prefixlen)s, " - "minimum allowed prefix is %(min_prefixlen)s.") - - -class MaxPrefixSubnetAllocationError(e.BadRequest): - message = _("Unable to allocate subnet with prefix length %(prefixlen)s, " - "maximum allowed prefix is %(max_prefixlen)s.") - - -class SubnetPoolDeleteError(e.BadRequest): - message = _("Unable to delete subnet pool: %(reason)s.") - - -class SubnetPoolQuotaExceeded(e.OverQuota): - message = _("Per-tenant subnet pool prefix quota exceeded.") - - -class NetworkSubnetPoolAffinityError(e.BadRequest): - message = _("Subnets hosted on the same network must be allocated from " - "the same subnet pool.") - - -class ObjectActionError(e.NeutronException): - message = _('Object action %(action)s failed because: %(reason)s.') - - -class CTZoneExhaustedError(e.NeutronException): - message = _("IPtables conntrack zones exhausted, iptables rules cannot " - "be applied.") - - -class TenantQuotaNotFound(e.NotFound): - message = _("Quota for tenant %(tenant_id)s could not be found.") - - -class TenantIdProjectIdFilterConflict(e.BadRequest): - message = _("Both tenant_id and project_id passed as filters.") - - -class MultipleFilterIDForIPFound(e.Conflict): - message = _("Multiple filter IDs for IP %(ip)s found.") - - -class FilterIDForIPNotFound(e.NotFound): - message = _("Filter ID for IP %(ip)s could not be found.") - - -class FailedToAddQdiscToDevice(e.NeutronException): - message = _("Failed to add %(direction)s qdisc " - "to device %(device)s.") - - -class PortBindingAlreadyActive(e.Conflict): +# TODO(boden): remove lib shims +SubnetPoolNotFound = exceptions.SubnetPoolNotFound +StateInvalid = exceptions.StateInvalid +DhcpPortInUse = exceptions.DhcpPortInUse +HostRoutesExhausted = exceptions.HostRoutesExhausted +DNSNameServersExhausted = exceptions.DNSNameServersExhausted +FlatNetworkInUse = exceptions.FlatNetworkInUse +NoNetworkFoundInMaximumAllowedAttempts = \ + exceptions.NoNetworkFoundInMaximumAllowedAttempts +MalformedRequestBody = exceptions.MalformedRequestBody +InvalidAllocationPool = exceptions.InvalidAllocationPool +UnsupportedPortDeviceOwner = \ + exceptions.UnsupportedPortDeviceOwner +OverlappingAllocationPools = exceptions.OverlappingAllocationPools +OutOfBoundsAllocationPool = exceptions.OutOfBoundsAllocationPool +BridgeDoesNotExist = exceptions.BridgeDoesNotExist +QuotaResourceUnknown = exceptions.QuotaResourceUnknown +QuotaMissingTenant = exceptions.QuotaMissingTenant +InvalidQuotaValue = exceptions.InvalidQuotaValue +InvalidSharedSetting = exceptions.InvalidSharedSetting +ExtensionsNotFound = exceptions.ExtensionsNotFound +GatewayConflictWithAllocationPools = \ + exceptions.GatewayConflictWithAllocationPools +GatewayIpInUse = exceptions.GatewayIpInUse +NetworkVxlanPortRangeError = exceptions.NetworkVxlanPortRangeError +VxlanNetworkUnsupported = exceptions.VxlanNetworkUnsupported +DuplicatedExtension = exceptions.DuplicatedExtension +DriverCallError = exceptions.DriverCallError +DeviceIDNotOwnedByTenant = exceptions.DeviceIDNotOwnedByTenant +InvalidCIDR = exceptions.InvalidCIDR +FailToDropPrivilegesExit = exceptions.FailToDropPrivilegesExit +NetworkIdOrRouterIdRequiredError = exceptions.NetworkIdOrRouterIdRequiredError +EmptySubnetPoolPrefixList = exceptions.EmptySubnetPoolPrefixList +PrefixVersionMismatch = exceptions.PrefixVersionMismatch +UnsupportedMinSubnetPoolPrefix = exceptions.UnsupportedMinSubnetPoolPrefix +IllegalSubnetPoolPrefixBounds = exceptions.IllegalSubnetPoolPrefixBounds +IllegalSubnetPoolPrefixUpdate = exceptions.IllegalSubnetPoolPrefixUpdate +SubnetAllocationError = exceptions.SubnetAllocationError +AddressScopePrefixConflict = exceptions.AddressScopePrefixConflict +IllegalSubnetPoolAssociationToAddressScope = \ + exceptions.IllegalSubnetPoolAssociationToAddressScope +IllegalSubnetPoolIpVersionAssociationToAddressScope = \ + exceptions.IllegalSubnetPoolIpVersionAssociationToAddressScope +IllegalSubnetPoolUpdate = exceptions.IllegalSubnetPoolUpdate +MinPrefixSubnetAllocationError = exceptions.MinPrefixSubnetAllocationError +MaxPrefixSubnetAllocationError = exceptions.MaxPrefixSubnetAllocationError +SubnetPoolDeleteError = exceptions.SubnetPoolDeleteError +SubnetPoolQuotaExceeded = exceptions.SubnetPoolQuotaExceeded +NetworkSubnetPoolAffinityError = exceptions.NetworkSubnetPoolAffinityError +ObjectActionError = exceptions.ObjectActionError +CTZoneExhaustedError = exceptions.CTZoneExhaustedError +TenantQuotaNotFound = exceptions.TenantQuotaNotFound +TenantIdProjectIdFilterConflict = exceptions.TenantIdProjectIdFilterConflict +MultipleFilterIDForIPFound = exceptions.MultipleFilterIDForIPFound +FilterIDForIPNotFound = exceptions.FilterIDForIPNotFound +FailedToAddQdiscToDevice = exceptions.FailedToAddQdiscToDevice +PortBindingNotFound = exceptions.PortBindingNotFound + +QosPolicyNotFound = qos.QosPolicyNotFound +QosRuleNotFound = qos.QosRuleNotFound +QoSPolicyDefaultAlreadyExists = qos.QoSPolicyDefaultAlreadyExists +PortQosBindingNotFound = qos.PortQosBindingNotFound +PortQosBindingError = qos.PortQosBindingError +NetworkQosBindingNotFound = qos.NetworkQosBindingNotFound +FloatingIPQosBindingNotFound = qos.FloatingIPQosBindingNotFound +FloatingIPQosBindingError = qos.FloatingIPQosBindingError +NetworkQosBindingError = qos.NetworkQosBindingError +PolicyRemoveAuthorizationError = qos.PolicyRemoveAuthorizationError +QosPolicyInUse = qos.QosPolicyInUse +QosRuleNotSupported = qos.QosRuleNotSupported +QoSRuleParameterConflict = qos.QoSRuleParameterConflict +QoSRulesConflict = qos.QoSRulesConflict + +RouterNotCompatibleWithAgent = l3.RouterNotCompatibleWithAgent +FloatingIpSetupException = l3.FloatingIpSetupException +IpTablesApplyException = l3.IpTablesApplyException +AbortSyncRouters = l3.AbortSyncRouters + + +# TODO(boden): rehome these + +class PortBindingAlreadyActive(exceptions.Conflict): message = _("Binding for port %(port_id)s on host %(host)s is already " "active.") -class PortBindingAlreadyExists(e.Conflict): +class PortBindingAlreadyExists(exceptions.Conflict): message = _("Binding for port %(port_id)s on host %(host)s already " "exists.") -class PortBindingError(e.NeutronException): +class PortBindingError(exceptions.NeutronException): message = _("Binding for port %(port_id)s on host %(host)s could not be " "created or updated.") diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index 73ed735657c..5a790b7de90 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -14,316 +14,20 @@ # License for the specific language governing permissions and limitations # under the License. -import collections -import random -import time - -from neutron_lib import context -from neutron_lib import exceptions as lib_exceptions -from oslo_config import cfg -from oslo_log import log as logging -import oslo_messaging -from oslo_messaging.rpc import dispatcher -from oslo_messaging import serializer as om_serializer -from oslo_service import service -from oslo_utils import excutils -from osprofiler import profiler - -from neutron.common import exceptions +from neutron_lib import rpc -LOG = logging.getLogger(__name__) +# TODO(boden): remove lib rpc shims +TRANSPORT = rpc.TRANSPORT +NOTIFICATION_TRANSPORT = rpc.NOTIFICATION_TRANSPORT +NOTIFIER = rpc.NOTIFIER - -TRANSPORT = None -NOTIFICATION_TRANSPORT = None -NOTIFIER = None - -_DFT_EXMODS = [ - exceptions.__name__, - lib_exceptions.__name__, -] - - -def init(conf, rpc_ext_mods=None): - global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER - - if rpc_ext_mods is None: - rpc_ext_mods = _DFT_EXMODS - else: - rpc_ext_mods = list(set(rpc_ext_mods + _DFT_EXMODS)) - - TRANSPORT = oslo_messaging.get_rpc_transport( - conf, allowed_remote_exmods=rpc_ext_mods) - NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport( - conf, allowed_remote_exmods=rpc_ext_mods) - serializer = RequestContextSerializer() - NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT, - serializer=serializer) - - -def cleanup(): - global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER - if TRANSPORT is None: - raise AssertionError("'TRANSPORT' must not be None") - if NOTIFICATION_TRANSPORT is None: - raise AssertionError("'NOTIFICATION_TRANSPORT' must not be None") - if NOTIFIER is None: - raise AssertionError("'NOTIFIER' must not be None") - TRANSPORT.cleanup() - NOTIFICATION_TRANSPORT.cleanup() - _BackingOffContextWrapper.reset_timeouts() - TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None - - -def _get_default_method_timeout(): - return TRANSPORT.conf.rpc_response_timeout - - -def _get_default_method_timeouts(): - return collections.defaultdict(_get_default_method_timeout) - - -class _ContextWrapper(object): - def __init__(self, original_context): - self._original_context = original_context - - def __getattr__(self, name): - return getattr(self._original_context, name) - - def cast(self, ctxt, method, **kwargs): - try: - self._original_context.cast(ctxt, method, **kwargs) - except Exception as e: - # TODO(kevinbenton): make catch specific to missing exchange once - # bug/1705351 is resolved on the oslo.messaging side; if - # oslo.messaging auto-creates the exchange, then just remove the - # code completely - LOG.debug("Ignored exception during cast: %e", e) - - -class _BackingOffContextWrapper(_ContextWrapper): - """Wraps oslo messaging contexts to set the timeout for calls. - - This intercepts RPC calls and sets the timeout value to the globally - adapting value for each method. An oslo messaging timeout results in - a doubling of the timeout value for the method on which it timed out. - There currently is no logic to reduce the timeout since busy Neutron - servers are more frequently the cause of timeouts rather than lost - messages. - """ - _METHOD_TIMEOUTS = _get_default_method_timeouts() - _max_timeout = None - - @classmethod - def reset_timeouts(cls): - # restore the original default timeout factory - cls._METHOD_TIMEOUTS = _get_default_method_timeouts() - cls._max_timeout = None - - @classmethod - def get_max_timeout(cls): - return cls._max_timeout or _get_default_method_timeout() * 10 - - @classmethod - def set_max_timeout(cls, max_timeout): - if max_timeout < cls.get_max_timeout(): - cls._METHOD_TIMEOUTS = collections.defaultdict( - lambda: max_timeout, **{ - k: min(v, max_timeout) - for k, v in cls._METHOD_TIMEOUTS.items() - }) - cls._max_timeout = max_timeout - - def call(self, ctxt, method, **kwargs): - # two methods with the same name in different namespaces should - # be tracked independently - if self._original_context.target.namespace: - scoped_method = '%s.%s' % (self._original_context.target.namespace, - method) - else: - scoped_method = method - # set the timeout from the global method timeout tracker for this - # method - self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method] - try: - return self._original_context.call(ctxt, method, **kwargs) - except oslo_messaging.MessagingTimeout: - with excutils.save_and_reraise_exception(): - wait = random.uniform( - 0, - min(self._METHOD_TIMEOUTS[scoped_method], - TRANSPORT.conf.rpc_response_timeout) - ) - LOG.error("Timeout in RPC method %(method)s. Waiting for " - "%(wait)s seconds before next attempt. If the " - "server is not down, consider increasing the " - "rpc_response_timeout option as Neutron " - "server(s) may be overloaded and unable to " - "respond quickly enough.", - {'wait': int(round(wait)), 'method': scoped_method}) - new_timeout = min( - self._original_context.timeout * 2, self.get_max_timeout()) - if new_timeout > self._METHOD_TIMEOUTS[scoped_method]: - LOG.warning("Increasing timeout for %(method)s calls " - "to %(new)s seconds. Restart the agent to " - "restore it to the default value.", - {'method': scoped_method, 'new': new_timeout}) - self._METHOD_TIMEOUTS[scoped_method] = new_timeout - time.sleep(wait) - - -class BackingOffClient(oslo_messaging.RPCClient): - """An oslo messaging RPC Client that implements a timeout backoff. - - This has all of the same interfaces as oslo_messaging.RPCClient but - if the timeout parameter is not specified, the _BackingOffContextWrapper - returned will track when call timeout exceptions occur and exponentially - increase the timeout for the given call method. - """ - def prepare(self, *args, **kwargs): - ctx = super(BackingOffClient, self).prepare(*args, **kwargs) - # don't back off contexts that explicitly set a timeout - if 'timeout' in kwargs: - return _ContextWrapper(ctx) - return _BackingOffContextWrapper(ctx) - - @staticmethod - def set_max_timeout(max_timeout): - '''Set RPC timeout ceiling for all backing-off RPC clients.''' - _BackingOffContextWrapper.set_max_timeout(max_timeout) - - -def get_client(target, version_cap=None, serializer=None): - if TRANSPORT is None: - raise AssertionError("'TRANSPORT' must not be None") - serializer = RequestContextSerializer(serializer) - return BackingOffClient(TRANSPORT, - target, - version_cap=version_cap, - serializer=serializer) - - -def get_server(target, endpoints, serializer=None): - if TRANSPORT is None: - raise AssertionError("'TRANSPORT' must not be None") - serializer = RequestContextSerializer(serializer) - access_policy = dispatcher.DefaultRPCAccessPolicy - return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints, - 'eventlet', serializer, - access_policy=access_policy) - - -def get_notifier(service=None, host=None, publisher_id=None): - if NOTIFIER is None: - raise AssertionError("'NOTIFIER' must not be None") - if not publisher_id: - publisher_id = "%s.%s" % (service, host or cfg.CONF.host) - return NOTIFIER.prepare(publisher_id=publisher_id) - - -class RequestContextSerializer(om_serializer.Serializer): - """This serializer is used to convert RPC common context into - Neutron Context. - """ - def __init__(self, base=None): - super(RequestContextSerializer, self).__init__() - self._base = base - - def serialize_entity(self, ctxt, entity): - if not self._base: - return entity - return self._base.serialize_entity(ctxt, entity) - - def deserialize_entity(self, ctxt, entity): - if not self._base: - return entity - return self._base.deserialize_entity(ctxt, entity) - - def serialize_context(self, ctxt): - _context = ctxt.to_dict() - prof = profiler.get() - if prof: - trace_info = { - "hmac_key": prof.hmac_key, - "base_id": prof.get_base_id(), - "parent_id": prof.get_id() - } - _context['trace_info'] = trace_info - return _context - - def deserialize_context(self, ctxt): - rpc_ctxt_dict = ctxt.copy() - trace_info = rpc_ctxt_dict.pop("trace_info", None) - if trace_info: - profiler.init(**trace_info) - return context.Context.from_dict(rpc_ctxt_dict) - - -@profiler.trace_cls("rpc") -class Service(service.Service): - """Service object for binaries running on hosts. - - A service enables rpc by listening to queues based on topic and host. - """ - def __init__(self, host, topic, manager=None, serializer=None): - super(Service, self).__init__() - self.host = host - self.topic = topic - self.serializer = serializer - if manager is None: - self.manager = self - else: - self.manager = manager - - def start(self): - super(Service, self).start() - - self.conn = Connection() - LOG.debug("Creating Consumer connection for Service %s", - self.topic) - - endpoints = [self.manager] - - self.conn.create_consumer(self.topic, endpoints) - - # Hook to allow the manager to do other initializations after - # the rpc connection is created. - if callable(getattr(self.manager, 'initialize_service_hook', None)): - self.manager.initialize_service_hook(self) - - # Consume from all consumers in threads - self.conn.consume_in_threads() - - def stop(self): - # Try to shut the connection down, but if we get any sort of - # errors, go ahead and ignore them.. as we're shutting down anyway - try: - self.conn.close() - except Exception: # nosec - pass - super(Service, self).stop() - - -class Connection(object): - - def __init__(self): - super(Connection, self).__init__() - self.servers = [] - - def create_consumer(self, topic, endpoints, fanout=False): - target = oslo_messaging.Target( - topic=topic, server=cfg.CONF.host, fanout=fanout) - server = get_server(target, endpoints) - self.servers.append(server) - - def consume_in_threads(self): - for server in self.servers: - server.start() - return self.servers - - def close(self): - for server in self.servers: - server.stop() - for server in self.servers: - server.wait() +init = rpc.init +cleanup = rpc.cleanup +BackingOffClient = rpc.BackingOffClient +get_client = rpc.get_client +get_server = rpc.get_server +get_notifier = rpc.get_notifier +RequestContextSerializer = rpc.RequestContextSerializer +Service = rpc.Service +Connection = rpc.Connection diff --git a/neutron/tests/base.py b/neutron/tests/base.py index a6821ad75aa..be28d5a3bed 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -29,10 +29,10 @@ import mock from neutron_lib.callbacks import manager as registry_manager from neutron_lib.db import api as db_api from neutron_lib import fixture +from neutron_lib.tests.unit import fake_notifier from oslo_concurrency.fixture import lockutils from oslo_config import cfg from oslo_db import options as db_options -from oslo_messaging import conffixture as messaging_conffixture from oslo_utils import excutils from oslo_utils import fileutils from oslo_utils import strutils @@ -45,7 +45,6 @@ from neutron.agent.linux import external_process from neutron.api.rpc.callbacks.consumer import registry as rpc_consumer_reg from neutron.api.rpc.callbacks.producer import registry as rpc_producer_reg from neutron.common import config -from neutron.common import rpc as n_rpc from neutron.conf.agent import common as agent_config from neutron.db import _model_query as model_query from neutron.db import _resource_extend as resource_extend @@ -53,7 +52,6 @@ from neutron.db import agentschedulers_db from neutron import manager from neutron import policy from neutron.quota import resource_registry -from neutron.tests import fake_notifier from neutron.tests import post_mortem_debug from neutron.tests import tools @@ -325,7 +323,8 @@ class BaseTestCase(DietTestCase): 'oslo_config.cfg.find_config_files', lambda project=None, prog=None, extension=None: [])) - self.setup_rpc_mocks() + self.useFixture(fixture.RPCFixture()) + self.setup_config() self._callback_manager = registry_manager.CallbacksManager() @@ -377,24 +376,6 @@ class BaseTestCase(DietTestCase): root = root or self.get_default_temp_dir() return root.join(filename) - def setup_rpc_mocks(self): - # don't actually start RPC listeners when testing - mock.patch( - 'neutron.common.rpc.Connection.consume_in_threads', - return_value=[]).start() - - self.useFixture(fixtures.MonkeyPatch( - 'oslo_messaging.Notifier', fake_notifier.FakeNotifier)) - - self.messaging_conf = messaging_conffixture.ConfFixture(CONF) - self.messaging_conf.transport_url = 'fake://' - # NOTE(russellb) We want all calls to return immediately. - self.messaging_conf.response_timeout = 0 - self.useFixture(self.messaging_conf) - - self.addCleanup(n_rpc.cleanup) - n_rpc.init(CONF) - def setup_config(self, args=None): """Tests that need a non-default config can override this method.""" self.config_parse(args=args) diff --git a/neutron/tests/fake_notifier.py b/neutron/tests/fake_notifier.py deleted file mode 100644 index ba68f50c92d..00000000000 --- a/neutron/tests/fake_notifier.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import functools - - -NOTIFICATIONS = [] - - -def reset(): - del NOTIFICATIONS[:] - - -FakeMessage = collections.namedtuple('Message', - ['publisher_id', 'priority', - 'event_type', 'payload']) - - -class FakeNotifier(object): - - def __init__(self, transport, publisher_id=None, - driver=None, topics=None, - serializer=None, retry=None): - self.transport = transport - self.publisher_id = publisher_id - for priority in ('debug', 'info', 'warn', 'error', 'critical'): - setattr(self, priority, - functools.partial(self._notify, priority=priority.upper())) - - def prepare(self, publisher_id=None): - if publisher_id is None: - publisher_id = self.publisher_id - return self.__class__(self.transport, publisher_id) - - def _notify(self, ctxt, event_type, payload, priority): - msg = dict(publisher_id=self.publisher_id, - priority=priority, - event_type=event_type, - payload=payload) - NOTIFICATIONS.append(msg) diff --git a/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py b/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py index f2872384cd3..5d08023745b 100644 --- a/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py +++ b/neutron/tests/unit/agent/l3/extensions/qos/test_fip.py @@ -26,6 +26,7 @@ from neutron.agent.l3 import router_info as l3router from neutron.api.rpc.callbacks.consumer import registry from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc as n_rpc from neutron.objects.qos import policy from neutron.objects.qos import rule from neutron.tests import base @@ -136,8 +137,7 @@ class FipQosExtensionInitializeTestCase(QosExtensionBaseTestCase): @mock.patch.object(registry, 'register') @mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback') def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock): - call_to_patch = 'neutron.common.rpc.Connection' - with mock.patch(call_to_patch, + with mock.patch.object(n_rpc, 'Connection', return_value=self.connection) as create_connection: self.fip_qos_ext.initialize( self.connection, lib_const.L3_AGENT_MODE) diff --git a/neutron/tests/unit/agent/test_rpc.py b/neutron/tests/unit/agent/test_rpc.py index 4dcd31c8848..71315c5c8f4 100644 --- a/neutron/tests/unit/agent/test_rpc.py +++ b/neutron/tests/unit/agent/test_rpc.py @@ -26,6 +26,7 @@ from oslo_utils import uuidutils from neutron.agent import rpc from neutron.common import constants as n_const +from neutron.common import rpc as n_rpc from neutron.objects import network from neutron.objects import ports from neutron.tests import base @@ -125,8 +126,7 @@ class AgentRPCMethods(base.BaseTestCase): def _test_create_consumers( self, endpoints, method, expected, topics, listen): - call_to_patch = 'neutron.common.rpc.Connection' - with mock.patch(call_to_patch) as create_connection: + with mock.patch.object(n_rpc, 'Connection') as create_connection: rpc.create_consumers( endpoints, method, topics, start_listening=listen) create_connection.assert_has_calls(expected) @@ -167,8 +167,7 @@ class AgentRPCMethods(base.BaseTestCase): mock.call().consume_in_threads() ] - call_to_patch = 'neutron.common.rpc.Connection' - with mock.patch(call_to_patch) as create_connection: + with mock.patch.object(n_rpc, 'Connection') as create_connection: rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')]) create_connection.assert_has_calls(expected) diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_l3_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_l3_rpc_agent_api.py index fa6d31f8f1a..cf83b2374d0 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_l3_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_l3_rpc_agent_api.py @@ -16,6 +16,7 @@ import mock from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api +from neutron.common import rpc from neutron.tests import base @@ -23,8 +24,8 @@ class TestL3AgentNotifyAPI(base.BaseTestCase): def setUp(self): super(TestL3AgentNotifyAPI, self).setUp() - self.rpc_client_mock = mock.patch( - 'neutron.common.rpc.get_client').start().return_value + self.rpc_client_mock = mock.patch.object( + rpc, 'get_client').start().return_value self.l3_notifier = l3_rpc_agent_api.L3AgentNotifyAPI() def _test_arp_update(self, method): diff --git a/neutron/tests/unit/api/v2/test_base.py b/neutron/tests/unit/api/v2/test_base.py index 902733b36f3..dd14c991d96 100644 --- a/neutron/tests/unit/api/v2/test_base.py +++ b/neutron/tests/unit/api/v2/test_base.py @@ -24,6 +24,7 @@ from neutron_lib import context from neutron_lib import exceptions as n_exc from neutron_lib import fixture from neutron_lib.plugins import directory +from neutron_lib.tests.unit import fake_notifier from oslo_config import cfg from oslo_db import exception as db_exc from oslo_policy import policy as oslo_policy @@ -42,7 +43,6 @@ from neutron import policy from neutron import quota from neutron.quota import resource_registry from neutron.tests import base -from neutron.tests import fake_notifier from neutron.tests import tools from neutron.tests.unit import dummy_plugin from neutron.tests.unit import testlib_api diff --git a/neutron/tests/unit/common/test_rpc.py b/neutron/tests/unit/common/test_rpc.py deleted file mode 100644 index e6f79a9d48f..00000000000 --- a/neutron/tests/unit/common/test_rpc.py +++ /dev/null @@ -1,499 +0,0 @@ -# Copyright 2015 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy - -import fixtures -import mock -from oslo_config import cfg -import oslo_messaging as messaging -from oslo_messaging import conffixture as messaging_conffixture -from oslo_messaging.rpc import dispatcher -import testtools - -from neutron.common import rpc -from neutron.tests import base - - -CONF = cfg.CONF -CONF.import_opt('state_path', 'neutron.conf.common') - - -class RPCFixture(fixtures.Fixture): - def _setUp(self): - self.trans = copy.copy(rpc.TRANSPORT) - self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT) - self.noti = copy.copy(rpc.NOTIFIER) - self.addCleanup(self._reset_everything) - - def _reset_everything(self): - rpc.TRANSPORT = self.trans - rpc.NOTIFICATION_TRANSPORT = self.noti_trans - rpc.NOTIFIER = self.noti - - -class TestRPC(base.DietTestCase): - def setUp(self): - super(TestRPC, self).setUp() - self.useFixture(RPCFixture()) - - @mock.patch.object(rpc, 'RequestContextSerializer') - @mock.patch.object(messaging, 'get_rpc_transport') - @mock.patch.object(messaging, 'get_notification_transport') - @mock.patch.object(messaging, 'Notifier') - def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser): - notifier = mock.Mock() - transport = mock.Mock() - noti_transport = mock.Mock() - serializer = mock.Mock() - conf = mock.Mock() - - mock_trans.return_value = transport - mock_noti_trans.return_value = noti_transport - mock_ser.return_value = serializer - mock_not.return_value = notifier - - rpc.init(conf, rpc_ext_mods=['foo']) - - expected_mods = list(set(['foo'] + rpc._DFT_EXMODS)) - mock_trans.assert_called_once_with( - conf, allowed_remote_exmods=expected_mods) - mock_noti_trans.assert_called_once_with( - conf, allowed_remote_exmods=expected_mods) - mock_not.assert_called_once_with(noti_transport, - serializer=serializer) - self.assertIsNotNone(rpc.TRANSPORT) - self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT) - self.assertIsNotNone(rpc.NOTIFIER) - - def test_cleanup_transport_null(self): - rpc.NOTIFIER = mock.Mock() - rpc.NOTIFICATION_TRANSPORT = mock.Mock() - self.assertRaises(AssertionError, rpc.cleanup) - - def test_cleanup_notification_transport_null(self): - rpc.TRANSPORT = mock.Mock() - rpc.NOTIFIER = mock.Mock() - self.assertRaises(AssertionError, rpc.cleanup) - - def test_cleanup_notifier_null(self): - rpc.TRANSPORT = mock.Mock() - rpc.NOTIFICATION_TRANSPORT = mock.Mock() - self.assertRaises(AssertionError, rpc.cleanup) - - def test_cleanup(self): - rpc.NOTIFIER = mock.Mock() - rpc.NOTIFICATION_TRANSPORT = mock.Mock() - rpc.TRANSPORT = mock.Mock() - trans_cleanup = mock.Mock() - not_trans_cleanup = mock.Mock() - rpc.TRANSPORT.cleanup = trans_cleanup - rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup - - rpc.cleanup() - - trans_cleanup.assert_called_once_with() - not_trans_cleanup.assert_called_once_with() - self.assertIsNone(rpc.TRANSPORT) - self.assertIsNone(rpc.NOTIFICATION_TRANSPORT) - self.assertIsNone(rpc.NOTIFIER) - - @mock.patch.object(rpc, 'RequestContextSerializer') - @mock.patch.object(rpc, 'BackingOffClient') - def test_get_client(self, mock_client, mock_ser): - rpc.TRANSPORT = mock.Mock() - tgt = mock.Mock() - ser = mock.Mock() - mock_client.return_value = 'client' - mock_ser.return_value = ser - - client = rpc.get_client(tgt, version_cap='1.0', serializer='foo') - - mock_ser.assert_called_once_with('foo') - mock_client.assert_called_once_with(rpc.TRANSPORT, - tgt, version_cap='1.0', - serializer=ser) - self.assertEqual('client', client) - - @mock.patch.object(rpc, 'RequestContextSerializer') - @mock.patch.object(messaging, 'get_rpc_server') - def test_get_server(self, mock_get, mock_ser): - rpc.TRANSPORT = mock.Mock() - ser = mock.Mock() - tgt = mock.Mock() - ends = mock.Mock() - mock_ser.return_value = ser - mock_get.return_value = 'server' - - server = rpc.get_server(tgt, ends, serializer='foo') - - mock_ser.assert_called_once_with('foo') - access_policy = dispatcher.DefaultRPCAccessPolicy - mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends, - 'eventlet', ser, - access_policy=access_policy) - self.assertEqual('server', server) - - def test_get_notifier(self): - rpc.NOTIFIER = mock.Mock() - mock_prep = mock.Mock() - mock_prep.return_value = 'notifier' - rpc.NOTIFIER.prepare = mock_prep - - notifier = rpc.get_notifier('service', publisher_id='foo') - - mock_prep.assert_called_once_with(publisher_id='foo') - self.assertEqual('notifier', notifier) - - def test_get_notifier_null_publisher(self): - rpc.NOTIFIER = mock.Mock() - mock_prep = mock.Mock() - mock_prep.return_value = 'notifier' - rpc.NOTIFIER.prepare = mock_prep - - notifier = rpc.get_notifier('service', host='bar') - - mock_prep.assert_called_once_with(publisher_id='service.bar') - self.assertEqual('notifier', notifier) - - -class TestRequestContextSerializer(base.DietTestCase): - def setUp(self): - super(TestRequestContextSerializer, self).setUp() - self.mock_base = mock.Mock() - self.ser = rpc.RequestContextSerializer(self.mock_base) - self.ser_null = rpc.RequestContextSerializer(None) - - def test_serialize_entity(self): - self.mock_base.serialize_entity.return_value = 'foo' - - ser_ent = self.ser.serialize_entity('context', 'entity') - - self.mock_base.serialize_entity.assert_called_once_with('context', - 'entity') - self.assertEqual('foo', ser_ent) - - def test_deserialize_entity(self): - self.mock_base.deserialize_entity.return_value = 'foo' - - deser_ent = self.ser.deserialize_entity('context', 'entity') - - self.mock_base.deserialize_entity.assert_called_once_with('context', - 'entity') - self.assertEqual('foo', deser_ent) - - def test_deserialize_entity_null_base(self): - deser_ent = self.ser_null.deserialize_entity('context', 'entity') - - self.assertEqual('entity', deser_ent) - - def test_serialize_context(self): - context = mock.Mock() - - self.ser.serialize_context(context) - - context.to_dict.assert_called_once_with() - - def test_deserialize_context(self): - context_dict = {'foo': 'bar', - 'user_id': 1, - 'tenant_id': 1, - 'is_admin': True} - - c = self.ser.deserialize_context(context_dict) - - self.assertEqual(1, c.user_id) - self.assertEqual(1, c.project_id) - - def test_deserialize_context_no_user_id(self): - context_dict = {'foo': 'bar', - 'user': 1, - 'tenant_id': 1, - 'is_admin': True} - - c = self.ser.deserialize_context(context_dict) - - self.assertEqual(1, c.user_id) - self.assertEqual(1, c.project_id) - - def test_deserialize_context_no_tenant_id(self): - context_dict = {'foo': 'bar', - 'user_id': 1, - 'project_id': 1, - 'is_admin': True} - - c = self.ser.deserialize_context(context_dict) - - self.assertEqual(1, c.user_id) - self.assertEqual(1, c.project_id) - - def test_deserialize_context_no_ids(self): - context_dict = {'foo': 'bar', 'is_admin': True} - - c = self.ser.deserialize_context(context_dict) - - self.assertIsNone(c.user_id) - self.assertIsNone(c.project_id) - - -class ServiceTestCase(base.DietTestCase): - # the class cannot be based on BaseTestCase since it mocks rpc.Connection - - def setUp(self): - super(ServiceTestCase, self).setUp() - self.host = 'foo' - self.topic = 'neutron-agent' - - self.target_mock = mock.patch('oslo_messaging.Target') - self.target_mock.start() - - self.messaging_conf = messaging_conffixture.ConfFixture(CONF) - self.messaging_conf.transport_url = 'fake://' - self.messaging_conf.response_timeout = 0 - self.useFixture(self.messaging_conf) - - self.addCleanup(rpc.cleanup) - rpc.init(CONF) - - def test_operations(self): - with mock.patch('oslo_messaging.get_rpc_server') as get_rpc_server: - rpc_server = get_rpc_server.return_value - - service = rpc.Service(self.host, self.topic) - service.start() - rpc_server.start.assert_called_once_with() - - service.stop() - rpc_server.stop.assert_called_once_with() - rpc_server.wait.assert_called_once_with() - - -class TimeoutTestCase(base.DietTestCase): - def setUp(self): - super(TimeoutTestCase, self).setUp() - - self.messaging_conf = messaging_conffixture.ConfFixture(CONF) - self.messaging_conf.transport_url = 'fake://' - self.messaging_conf.response_timeout = 0 - self.useFixture(self.messaging_conf) - - self.addCleanup(rpc.cleanup) - rpc.init(CONF) - rpc.TRANSPORT = mock.MagicMock() - rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout - target = messaging.Target(version='1.0', topic='testing') - self.client = rpc.get_client(target) - self.call_context = mock.Mock() - self.sleep = mock.patch('time.sleep').start() - rpc.TRANSPORT.conf.rpc_response_timeout = 10 - - def test_timeout_unaffected_when_explicitly_set(self): - rpc.TRANSPORT.conf.rpc_response_timeout = 5 - ctx = self.client.prepare(topic='sandwiches', timeout=77) - with testtools.ExpectedException(messaging.MessagingTimeout): - ctx.call(self.call_context, 'create_pb_and_j') - # ensure that the timeout was not increased and the back-off sleep - # wasn't called - self.assertEqual( - 5, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j']) - self.assertFalse(self.sleep.called) - - def test_timeout_store_defaults(self): - # any method should default to the configured timeout - self.assertEqual( - rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) - self.assertEqual( - rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) - # a change to an existing should not affect new or existing ones - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000 - self.assertEqual( - rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) - self.assertEqual( - rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3']) - - def test_method_timeout_sleep(self): - rpc.TRANSPORT.conf.rpc_response_timeout = 2 - for i in range(100): - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, 'method_1') - # sleep value should always be between 0 and configured timeout - self.assertGreaterEqual(self.sleep.call_args_list[0][0][0], 0) - self.assertLessEqual(self.sleep.call_args_list[0][0][0], 2) - self.sleep.reset_mock() - - def test_method_timeout_increases_on_timeout_exception(self): - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 - for i in range(5): - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, 'method_1') - - # we only care to check the timeouts sent to the transport - timeouts = [call[1]['timeout'] - for call in rpc.TRANSPORT._send.call_args_list] - self.assertEqual([1, 2, 4, 8, 16], timeouts) - - def test_method_timeout_10x_config_ceiling(self): - rpc.TRANSPORT.conf.rpc_response_timeout = 10 - # 5 doublings should max out at the 10xdefault ceiling - for i in range(5): - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, 'method_1') - self.assertEqual( - 10 * rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, 'method_1') - self.assertEqual( - 10 * rpc.TRANSPORT.conf.rpc_response_timeout, - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) - - def test_timeout_unchanged_on_other_exception(self): - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 - rpc.TRANSPORT._send.side_effect = ValueError - with testtools.ExpectedException(ValueError): - self.client.call(self.call_context, 'method_1') - rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, 'method_1') - timeouts = [call[1]['timeout'] - for call in rpc.TRANSPORT._send.call_args_list] - self.assertEqual([1, 1], timeouts) - - def test_timeouts_for_methods_tracked_independently(self): - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1 - for method in ('method_1', 'method_1', 'method_2', - 'method_1', 'method_2'): - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, method) - timeouts = [call[1]['timeout'] - for call in rpc.TRANSPORT._send.call_args_list] - self.assertEqual([1, 2, 1, 4, 2], timeouts) - - def test_timeouts_for_namespaces_tracked_independently(self): - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1 - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1 - for ns in ('ns1', 'ns2'): - self.client.target.namespace = ns - for i in range(4): - with testtools.ExpectedException(messaging.MessagingTimeout): - self.client.call(self.call_context, 'method') - timeouts = [call[1]['timeout'] - for call in rpc.TRANSPORT._send.call_args_list] - self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts) - - def test_method_timeout_increases_with_prepare(self): - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1 - ctx = self.client.prepare(version='1.4') - with testtools.ExpectedException(messaging.MessagingTimeout): - ctx.call(self.call_context, 'method_1') - with testtools.ExpectedException(messaging.MessagingTimeout): - ctx.call(self.call_context, 'method_1') - - # we only care to check the timeouts sent to the transport - timeouts = [call[1]['timeout'] - for call in rpc.TRANSPORT._send.call_args_list] - self.assertEqual([1, 2], timeouts) - - def test_set_max_timeout_caps_all_methods(self): - rpc.TRANSPORT.conf.rpc_response_timeout = 300 - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 - rpc.BackingOffClient.set_max_timeout(50) - # both explicitly tracked - self.assertEqual( - 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) - # as well as new methods - self.assertEqual( - 50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2']) - - def test_set_max_timeout_retains_lower_timeouts(self): - rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 - rpc.BackingOffClient.set_max_timeout(50) - self.assertEqual( - 10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1']) - - def test_set_max_timeout_overrides_default_timeout(self): - rpc.TRANSPORT.conf.rpc_response_timeout = 10 - self.assertEqual( - 10 * 10, rpc._BackingOffContextWrapper.get_max_timeout()) - rpc._BackingOffContextWrapper.set_max_timeout(10) - self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout()) - - -class CastExceptionTestCase(base.DietTestCase): - def setUp(self): - super(CastExceptionTestCase, self).setUp() - - self.messaging_conf = messaging_conffixture.ConfFixture(CONF) - self.messaging_conf.transport_url = 'fake://' - self.messaging_conf.response_timeout = 0 - self.useFixture(self.messaging_conf) - - self.addCleanup(rpc.cleanup) - rpc.init(CONF) - rpc.TRANSPORT = mock.MagicMock() - rpc.TRANSPORT._send.side_effect = Exception - target = messaging.Target(version='1.0', topic='testing') - self.client = rpc.get_client(target) - self.cast_context = mock.Mock() - - def test_cast_catches_exception(self): - self.client.cast(self.cast_context, 'method_1') - - -class TestConnection(base.DietTestCase): - def setUp(self): - super(TestConnection, self).setUp() - self.conn = rpc.Connection() - - @mock.patch.object(messaging, 'Target') - @mock.patch.object(cfg, 'CONF') - @mock.patch.object(rpc, 'get_server') - def test_create_consumer(self, mock_get, mock_cfg, mock_tgt): - mock_cfg.host = 'foo' - server = mock.Mock() - target = mock.Mock() - mock_get.return_value = server - mock_tgt.return_value = target - - self.conn.create_consumer('topic', 'endpoints', fanout=True) - - mock_tgt.assert_called_once_with(topic='topic', server='foo', - fanout=True) - mock_get.assert_called_once_with(target, 'endpoints') - self.assertEqual([server], self.conn.servers) - - def test_consume_in_threads(self): - self.conn.servers = [mock.Mock(), mock.Mock()] - - servs = self.conn.consume_in_threads() - - for serv in self.conn.servers: - serv.start.assert_called_once_with() - self.assertEqual(servs, self.conn.servers) - - def test_close(self): - self.conn.servers = [mock.Mock(), mock.Mock()] - - self.conn.close() - - for serv in self.conn.servers: - serv.stop.assert_called_once_with() - serv.wait.assert_called_once_with() diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 1002591447e..4ead873e2ea 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -21,6 +21,7 @@ from neutron_lib import constants from neutron_lib import context from neutron_lib.plugins import constants as plugin_constants from neutron_lib.plugins import directory +from neutron_lib.tests.unit import fake_notifier from oslo_config import cfg from oslo_db import exception as db_exc import oslo_messaging @@ -31,6 +32,7 @@ from neutron.api import extensions from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import l3_rpc +from neutron.common import rpc as n_rpc from neutron.db import agents_db from neutron.db import agentschedulers_db from neutron.db.models import agent as agent_model @@ -38,7 +40,6 @@ from neutron.extensions import l3agentscheduler from neutron.objects import agent as ag_obj from neutron.objects import l3agent as rb_obj from neutron.tests.common import helpers -from neutron.tests import fake_notifier from neutron.tests.unit.api import test_extensions from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin from neutron.tests.unit.extensions import test_agent @@ -239,8 +240,8 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, # NOTE(ivasilevskaya) mocking this way allows some control over mocked # client like further method mocking with asserting calls self.client_mock = mock.MagicMock(name="mocked client") - mock.patch('neutron.common.rpc.get_client' - ).start().return_value = self.client_mock + mock.patch.object( + n_rpc, 'get_client').start().return_value = self.client_mock super(OvsAgentSchedulerTestCaseBase, self).setUp( 'ml2', service_plugins=service_plugins) mock.patch.object( diff --git a/neutron/tests/unit/extensions/test_data_plane_status.py b/neutron/tests/unit/extensions/test_data_plane_status.py index 767a2813346..593fc0454cf 100644 --- a/neutron/tests/unit/extensions/test_data_plane_status.py +++ b/neutron/tests/unit/extensions/test_data_plane_status.py @@ -17,12 +17,12 @@ from webob import exc as web_exc from neutron_lib.api.definitions import data_plane_status as dps_lib from neutron_lib.api.definitions import port as port_def from neutron_lib import constants +from neutron_lib.tests.unit import fake_notifier from neutron.db import _resource_extend as resource_extend from neutron.db import data_plane_status_db as dps_db from neutron.db import db_base_plugin_v2 from neutron.extensions import data_plane_status as dps_ext -from neutron.tests import fake_notifier from neutron.tests.unit.db import test_db_base_plugin_v2 diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py index 6c54fa79787..b3a9fe41528 100644 --- a/neutron/tests/unit/extensions/test_l3.py +++ b/neutron/tests/unit/extensions/test_l3.py @@ -32,6 +32,7 @@ from neutron_lib import exceptions as n_exc from neutron_lib.exceptions import l3 as l3_exc from neutron_lib.plugins import constants as plugin_constants from neutron_lib.plugins import directory +from neutron_lib.tests.unit import fake_notifier from oslo_config import cfg from oslo_utils import importutils from oslo_utils import uuidutils @@ -58,7 +59,6 @@ from neutron.extensions import l3 from neutron.services.revisions import revision_plugin from neutron.tests import base from neutron.tests.common import helpers -from neutron.tests import fake_notifier from neutron.tests.unit.api import test_extensions from neutron.tests.unit.api.v2 import test_base from neutron.tests.unit.db import test_db_base_plugin_v2 diff --git a/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py b/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py index 6c7d49cadd8..dbfce8d07aa 100644 --- a/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py +++ b/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py @@ -32,6 +32,7 @@ from sqlalchemy import orm import testscenarios import testtools +from neutron.common import rpc as n_rpc from neutron.db import db_base_plugin_v2 as db_v2 from neutron.db import l3_db from neutron.db import l3_dvr_db @@ -1379,7 +1380,7 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase, super(L3HATestCaseMixin, self).setUp() self.adminContext = n_context.get_admin_context() - mock.patch('neutron.common.rpc.get_client').start() + mock.patch.object(n_rpc, 'get_client').start() self.setup_coreplugin('ml2', load_plugins=False) cfg.CONF.set_override('service_plugins', diff --git a/neutron/tests/unit/services/logapi/rpc/test_server.py b/neutron/tests/unit/services/logapi/rpc/test_server.py index 1f7701df823..e88233223e9 100644 --- a/neutron/tests/unit/services/logapi/rpc/test_server.py +++ b/neutron/tests/unit/services/logapi/rpc/test_server.py @@ -14,6 +14,7 @@ # under the License. import mock +from neutron_lib import rpc from oslo_config import cfg import oslo_messaging @@ -84,7 +85,7 @@ class TestRegisterValidateRPCMethods(base.BaseTestCase): class LoggingApiSkeletonTestCase(base.BaseTestCase): - @mock.patch("neutron.common.rpc.get_server") + @mock.patch.object(rpc, "get_server") def test___init__(self, mocked_get_server): test_obj = server_rpc.LoggingApiSkeleton() _target = oslo_messaging.Target( diff --git a/neutron/tests/unit/services/metering/agents/test_metering_agent.py b/neutron/tests/unit/services/metering/agents/test_metering_agent.py index f657f9ffd28..9358034b8c5 100644 --- a/neutron/tests/unit/services/metering/agents/test_metering_agent.py +++ b/neutron/tests/unit/services/metering/agents/test_metering_agent.py @@ -13,6 +13,7 @@ # under the License. import mock +from neutron_lib.tests.unit import fake_notifier from oslo_config import cfg from oslo_utils import fixture as utils_fixture from oslo_utils import timeutils @@ -21,7 +22,6 @@ from oslo_utils import uuidutils from neutron.conf.services import metering_agent as metering_agent_config from neutron.services.metering.agents import metering_agent from neutron.tests import base -from neutron.tests import fake_notifier _uuid = uuidutils.generate_uuid diff --git a/neutron/tests/unit/services/trunk/rpc/test_agent.py b/neutron/tests/unit/services/trunk/rpc/test_agent.py index 8ed87c2ce65..52a07b6bb1d 100644 --- a/neutron/tests/unit/services/trunk/rpc/test_agent.py +++ b/neutron/tests/unit/services/trunk/rpc/test_agent.py @@ -17,6 +17,7 @@ import oslo_messaging from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc from neutron.services.trunk.rpc import agent from neutron.tests import base @@ -25,24 +26,29 @@ class TrunkSkeletonTest(base.BaseTestCase): # TODO(fitoduarte): add more test to improve coverage of module @mock.patch("neutron.api.rpc.callbacks.resource_manager." "ConsumerResourceCallbacksManager.register") - @mock.patch("neutron.common.rpc.get_server") - def test___init__(self, mocked_get_server, mocked_register): - test_obj = agent.TrunkSkeleton() - self.assertEqual(2, mocked_register.call_count) - calls = [mock.call(test_obj.handle_trunks, resources.TRUNK), - mock.call(test_obj.handle_subports, resources.SUBPORT)] - mocked_register.assert_has_calls(calls, any_order=True) + def test___init__(self, mocked_register): + mock_conn = mock.MagicMock() + with mock.patch.object(rpc.Connection, 'create_consumer', + new_callable=mock_conn): + test_obj = agent.TrunkSkeleton() + self.assertEqual(2, mocked_register.call_count) + calls = [mock.call(test_obj.handle_trunks, resources.TRUNK), + mock.call(test_obj.handle_subports, resources.SUBPORT)] + mocked_register.assert_has_calls(calls, any_order=True) - # Test to see if the call to rpc.get_server has the correct - # target and the correct endpoints - topic = resources_rpc.resource_type_versioned_topic(resources.SUBPORT) - subport_target = oslo_messaging.Target( - topic=topic, server=cfg.CONF.host, fanout=True) - topic = resources_rpc.resource_type_versioned_topic(resources.TRUNK) - trunk_target = oslo_messaging.Target( - topic=topic, server=cfg.CONF.host, fanout=True) - calls = [mock.call(subport_target, mock.ANY), - mock.call(trunk_target, mock.ANY)] - mocked_get_server.assert_has_calls(calls, any_order=True) - self.assertIn("ResourcesPushRpcCallback", - str(mocked_get_server.call_args_list)) + # Test to see if the call to rpc.get_server has the correct + # target and the correct endpoints + topic = resources_rpc.resource_type_versioned_topic( + resources.SUBPORT) + subport_target = oslo_messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=True) + topic = resources_rpc.resource_type_versioned_topic( + resources.TRUNK) + trunk_target = oslo_messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=True) + calls = [mock.call(subport_target.topic, mock.ANY, fanout=True), + mock.call(trunk_target.topic, mock.ANY, fanout=True)] + self.assertIn(calls[0], mock_conn().mock_calls) + self.assertIn(calls[1], mock_conn().mock_calls) + self.assertIn("ResourcesPushRpcCallback", + str(mock_conn().call_args_list)) diff --git a/neutron/tests/unit/services/trunk/rpc/test_server.py b/neutron/tests/unit/services/trunk/rpc/test_server.py index e94a04ecf55..858c48517fc 100644 --- a/neutron/tests/unit/services/trunk/rpc/test_server.py +++ b/neutron/tests/unit/services/trunk/rpc/test_server.py @@ -14,19 +14,17 @@ import mock from neutron_lib.api.definitions import portbindings from neutron_lib.plugins import directory -from oslo_config import cfg -import oslo_messaging from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc as n_rpc from neutron.objects import trunk as trunk_obj from neutron.plugins.ml2 import plugin as ml2_plugin from neutron.services.trunk import constants from neutron.services.trunk import drivers from neutron.services.trunk import exceptions as trunk_exc from neutron.services.trunk import plugin as trunk_plugin -from neutron.services.trunk.rpc import constants as rpc_consts from neutron.services.trunk.rpc import server from neutron.tests import base from neutron.tests.unit.plugins.ml2 import test_plugin @@ -58,16 +56,16 @@ class TrunkSkeletonTest(test_plugin.Ml2PluginV2TestCase): @mock.patch("neutron.api.rpc.callbacks.resource_manager." "ResourceCallbacksManager.register") - @mock.patch("neutron.common.rpc.get_server") - def test___init__(self, mocked_get_server, mocked_registered): - test_obj = server.TrunkSkeleton() - self.mock_registry_provide.assert_called_with( - server.trunk_by_port_provider, - resources.TRUNK) - trunk_target = oslo_messaging.Target(topic=rpc_consts.TRUNK_BASE_TOPIC, - server=cfg.CONF.host, - fanout=False) - mocked_get_server.assert_called_with(trunk_target, [test_obj]) + def test___init__(self, mocked_get_server): + mock_conn = mock.MagicMock() + with mock.patch.object(n_rpc.Connection, 'create_consumer', + new_callable=mock_conn): + test_obj = server.TrunkSkeleton() + self.mock_registry_provide.assert_called_with( + server.trunk_by_port_provider, + resources.TRUNK) + self.assertItemsEqual(('trunk', [test_obj],), + mock_conn.mock_calls[1][1]) def test_update_subport_bindings(self): with self.port() as _parent_port: