# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """ test_exceptions ---------------------------------- Tests for `neutron_lib.exception` module. """ import functools from neutron_lib._i18n import _ import neutron_lib.exceptions as ne from neutron_lib.tests import _base as base def _raise(exc_class, **kwargs): raise exc_class(**kwargs) class TestExceptionsBase(base.BaseTestCase): def _check_nexc(self, exc_class, expected_msg, **kwargs): raise_exc_class = functools.partial(_raise, exc_class) e = self.assertRaises(exc_class, raise_exc_class, **kwargs) self.assertEqual(expected_msg, str(e)) class TestExceptions(TestExceptionsBase): def test_base(self): self._check_nexc( ne.NeutronException, _('An unknown exception occurred.')) def test_not_found(self): self._check_nexc( ne.NotFound, _('An unknown exception occurred.')) def test_conflict(self): self._check_nexc( ne.Conflict, _('An unknown exception occurred.')) def test_bad_request(self): self._check_nexc( ne.BadRequest, _('Bad A request: B.'), resource='A', msg='B') def test_bad_request_misused(self): try: self._check_nexc( ne.BadRequest, _('Bad A request: B.'), resource='A', msg='B') except AttributeError: pass def test_not_authorized(self): self._check_nexc( ne.NotAuthorized, _("Not authorized.")) def test_service_unavailable(self): self._check_nexc( ne.ServiceUnavailable, _("The service is unavailable.")) def test_admin_required(self): self._check_nexc( ne.AdminRequired, _("User does not have admin privileges: hoser."), reason="hoser") def test_object_not_found(self): self._check_nexc( ne.ObjectNotFound, _("Object fallout tato not found."), id="fallout tato") def test_network_not_found(self): self._check_nexc( ne.NetworkNotFound, _("Network spam could not be found."), net_id="spam") def test_subnet_not_found(self): self._check_nexc( ne.SubnetNotFound, _("Subnet root could not be found."), subnet_id="root") def test_port_not_found(self): self._check_nexc( ne.PortNotFound, _("Port harbor could not be found."), port_id="harbor") def test_port_not_found_on_network(self): self._check_nexc( ne.PortNotFoundOnNetwork, _("Port serial could not be found on network USB."), port_id="serial", net_id="USB") def test_device_not_found_error(self): self._check_nexc( ne.DeviceNotFoundError, _("Device 'device' does not exist."), device_name="device") def test_in_use(self): self._check_nexc( ne.InUse, _("The resource is in use.")) def test_network_in_use(self): self._check_nexc( ne.NetworkInUse, _("Unable to complete operation on network foo. " "There are one or more ports still in use on the network."), net_id="foo") def test_network_in_use_custom_reason(self): self._check_nexc( ne.NetworkInUse, _("Unable to complete operation on network foo. not full."), net_id="foo", reason="not full") def test_subnet_in_use(self): self._check_nexc( ne.SubnetInUse, _("Unable to complete operation on subnet garbage: not full."), subnet_id="garbage", reason="not full") def test_subnet_in_use_no_reason(self): self._check_nexc( ne.SubnetInUse, _("Unable to complete operation on subnet garbage: " "One or more ports have an IP allocation from this subnet."), subnet_id="garbage") def test_subnet_pool_in_use(self): self._check_nexc( ne.SubnetPoolInUse, _("Unable to complete operation on subnet pool ymca. because."), subnet_pool_id="ymca", reason="because") def test_subnet_pool_in_use_no_reason(self): self._check_nexc( ne.SubnetPoolInUse, _("Unable to complete operation on subnet pool ymca. " "Two or more concurrent subnets allocated."), subnet_pool_id="ymca") def test_port_in_use(self): self._check_nexc( ne.PortInUse, _("Unable to complete operation on port a for network c. " "Port already has an attached device b."), port_id='a', device_id='b', net_id='c') def test_service_port_in_use(self): self._check_nexc( ne.ServicePortInUse, _("Port harbor cannot be deleted directly via the " "port API: docking."), port_id='harbor', reason='docking') def test_port_bound(self): self._check_nexc( ne.PortBound, _("Unable to complete operation on port bigmac, " "port is already bound, port type: ketchup, " "old_mac onions, new_mac salt."), port_id='bigmac', vif_type='ketchup', old_mac='onions', new_mac='salt') def test_mac_address_in_use(self): self._check_nexc( ne.MacAddressInUse, _("Unable to complete operation for network nutters. " "The mac address grill is in use."), net_id='nutters', mac='grill') def test_invalid_ip_for_network(self): self._check_nexc( ne.InvalidIpForNetwork, _("IP address shazam! is not a valid IP " "for any of the subnets on the specified network."), ip_address='shazam!') def test_invalid_ip_for_subnet(self): self._check_nexc( ne.InvalidIpForSubnet, _("IP address 300.400.500.600 is not a valid IP " "for the specified subnet."), ip_address='300.400.500.600') def test_ip_address_in_use(self): self._check_nexc( ne.IpAddressInUse, _("Unable to complete operation for network boredom. " "The IP address crazytown is in use."), net_id='boredom', ip_address='crazytown') def test_vlan_id_in_use(self): self._check_nexc( ne.VlanIdInUse, _("Unable to create the network. The VLAN virtual on physical " "network phys is in use."), vlan_id='virtual', physical_network='phys') def test_tunnel_id_in_use(self): self._check_nexc( ne.TunnelIdInUse, _("Unable to create the network. The tunnel ID sewer is in use."), tunnel_id='sewer') def test_resource_exhausted(self): self._check_nexc( ne.ResourceExhausted, _("The service is unavailable.")) def test_no_network_available(self): self._check_nexc( ne.NoNetworkAvailable, _("Unable to create the network. " "No tenant network is available for allocation.")) def test_subnet_mismatch_for_port(self): self._check_nexc( ne.SubnetMismatchForPort, _("Subnet on port porter does not match " "the requested subnet submit."), port_id='porter', subnet_id='submit') def test_invalid(self): try: raise ne.Invalid("hello world") except ne.Invalid as e: self.assertEqual(e.msg, "hello world") def test_invalid_input(self): self._check_nexc( ne.InvalidInput, _("Invalid input for operation: warp core breach."), error_message='warp core breach') def test_ip_address_generation_failure(self): self._check_nexc( ne.IpAddressGenerationFailure, _("No more IP addresses available on network nuke."), net_id='nuke') def test_preexisting_device_failure(self): self._check_nexc( ne.PreexistingDeviceFailure, _("Creation failed. hal9000 already exists."), dev_name='hal9000') def test_over_quota(self): self._check_nexc( ne.OverQuota, _("Quota exceeded for resources: tube socks."), overs='tube socks') def test_invalid_content_type(self): self._check_nexc( ne.InvalidContentType, _("Invalid content type porn."), content_type='porn') def test_external_ip_address_exhausted(self): self._check_nexc( ne.ExternalIpAddressExhausted, _("Unable to find any IP address on external network darpanet."), net_id='darpanet') def test_invalid_configuration_option(self): self._check_nexc( ne.InvalidConfigurationOption, _("An invalid value was provided for which muppet: big bird."), opt_name='which muppet', opt_value='big bird') def test_network_tunnel_range_error(self): self._check_nexc( ne.NetworkTunnelRangeError, _("Invalid network tunnel range: 'rats' - present."), tunnel_range='rats', error='present') def test_network_tunnel_range_error_tuple(self): self._check_nexc( ne.NetworkTunnelRangeError, _("Invalid network tunnel range: '3:4' - present."), tunnel_range=(3, 4), error='present') def test_policy_init_error(self): self._check_nexc( ne.PolicyInitError, _("Failed to initialize policy policy because reason."), policy='policy', reason='reason') def test_policy_check_error(self): self._check_nexc( ne.PolicyCheckError, _("Failed to check policy policy because reason."), policy='policy', reason='reason')