Update neutron files for new over-indentation hacking rule (E117)

Change-Id: I594e2d1238f6ffa3c1039624e3b3ed6569485837
This commit is contained in:
Doug Wiegley 2019-01-29 11:30:54 -07:00 committed by Brian Haley
parent e55918ef9e
commit 8914f8247f
26 changed files with 314 additions and 314 deletions

View File

@ -947,9 +947,9 @@ class RouterInfo(object):
except (n_exc.FloatingIpSetupException,
n_exc.IpTablesApplyException):
# All floating IPs must be put in error state
LOG.exception("Failed to process floating IPs.")
fip_statuses = self.put_fips_in_error_state()
# All floating IPs must be put in error state
LOG.exception("Failed to process floating IPs.")
fip_statuses = self.put_fips_in_error_state()
finally:
self.update_fip_statuses(fip_statuses)

View File

@ -58,9 +58,10 @@ def upgrade():
# as a result of Ifd3e007aaf2a2ed8123275aa3a9f540838e3c003 being
# back-ported
for router_port in session.query(router_ports).filter(
router_ports.c.port_type == lib_const.DEVICE_OWNER_ROUTER_HA_INTF):
router_port_tuples.discard((router_port.router_id,
router_port.port_id))
router_ports.c.port_type ==
lib_const.DEVICE_OWNER_ROUTER_HA_INTF):
router_port_tuples.discard((router_port.router_id,
router_port.port_id))
new_records = [dict(router_id=router_id, port_id=port_id,
port_type=lib_const.DEVICE_OWNER_ROUTER_HA_INTF)
for router_id, port_id in router_port_tuples]

View File

@ -31,20 +31,20 @@ def check_bandwidth_rule_conflict(policy, rule_data):
continue
elif rule.rule_type == qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH:
if "max_kbps" in rule_data and (
int(rule.min_kbps) > int(rule_data["max_kbps"])):
raise n_exc.QoSRuleParameterConflict(
rule_value=rule_data["max_kbps"],
policy_id=policy["id"],
existing_rule=rule.rule_type,
existing_value=rule.min_kbps)
int(rule.min_kbps) > int(rule_data["max_kbps"])):
raise n_exc.QoSRuleParameterConflict(
rule_value=rule_data["max_kbps"],
policy_id=policy["id"],
existing_rule=rule.rule_type,
existing_value=rule.min_kbps)
elif rule.rule_type == qos_consts.RULE_TYPE_BANDWIDTH_LIMIT:
if "min_kbps" in rule_data and (
int(rule.max_kbps) < int(rule_data["min_kbps"])):
raise n_exc.QoSRuleParameterConflict(
rule_value=rule_data["min_kbps"],
policy_id=policy["id"],
existing_rule=rule.rule_type,
existing_value=rule.max_kbps)
int(rule.max_kbps) < int(rule_data["min_kbps"])):
raise n_exc.QoSRuleParameterConflict(
rule_value=rule_data["min_kbps"],
policy_id=policy["id"],
existing_rule=rule.rule_type,
existing_value=rule.max_kbps)
def check_rules_conflict(policy, rule_obj):

View File

@ -333,15 +333,15 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
return
port = port_context.current
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
status == n_const.PORT_STATUS_ACTIVE and
port[portbindings.HOST_ID] != host and
not l3_hamode_db.is_ha_router_port(rpc_context,
port['device_owner'],
port['device_id'])):
# don't setup ACTIVE forwarding entries unless bound to this
# host or if it's an HA or DVR port (which is special-cased in
# the mech driver)
return
status == n_const.PORT_STATUS_ACTIVE and
port[portbindings.HOST_ID] != host and
not l3_hamode_db.is_ha_router_port(rpc_context,
port['device_owner'],
port['device_id'])):
# don't setup ACTIVE forwarding entries unless bound to this
# host or if it's an HA or DVR port (which is special-cased in
# the mech driver)
return
port_context.current['status'] = status
port_context.current[portbindings.HOST_ID] = host
if status == n_const.PORT_STATUS_ACTIVE:

View File

@ -165,7 +165,7 @@ class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
def include_object(self, object_, name, type_, reflected, compare_to):
if type_ == 'table' and (name == 'alembic_version' or
name in external.TABLES):
return False
return False
return super(_TestModelsMigrations, self).include_object(
object_, name, type_, reflected, compare_to)

View File

@ -583,9 +583,9 @@ class TestDvrRouterOperations(base.BaseTestCase):
state = True
with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as rtrdev,\
mock.patch.object(ri, '_cache_arp_entry') as arp_cache:
rtrdev.return_value.exists.return_value = False
state = ri._update_arp_entry(
mock.ANY, mock.ANY, subnet_id, 'add')
rtrdev.return_value.exists.return_value = False
state = ri._update_arp_entry(
mock.ANY, mock.ANY, subnet_id, 'add')
self.assertFalse(state)
self.assertTrue(arp_cache.called)
arp_cache.assert_called_once_with(mock.ANY, mock.ANY,

View File

@ -239,7 +239,7 @@ class TestFindForkTopParent(base.BaseTestCase):
side_effect=_find_parent_pid), \
mock.patch.object(utils, 'pid_invoked_with_cmdline',
**pid_invoked_with_cmdline):
actual = utils.find_fork_top_parent(_marker)
actual = utils.find_fork_top_parent(_marker)
self.assertEqual(expected, actual)
def test_returns_own_pid_no_parent(self):
@ -343,10 +343,10 @@ class TestGetRoothelperChildPid(base.BaseTestCase):
pid_invoked_with_cmdline['return_value'] = False
with mock.patch.object(utils, 'find_child_pids',
side_effect=_find_child_pids), \
mock.patch.object(utils, 'pid_invoked_with_cmdline',
**pid_invoked_with_cmdline):
actual = utils.get_root_helper_child_pid(
mock_pid, mock.ANY, run_as_root)
mock.patch.object(utils, 'pid_invoked_with_cmdline',
**pid_invoked_with_cmdline):
actual = utils.get_root_helper_child_pid(
mock_pid, mock.ANY, run_as_root)
if expected is _marker:
expected = str(mock_pid)
self.assertEqual(expected, actual)

View File

@ -158,21 +158,21 @@ class ResourceExtensionTest(base.BaseTestCase):
return {'collection': 'value'}
class DummySvcPlugin(wsgi.Controller):
@classmethod
def get_plugin_type(cls):
return dummy_plugin.DUMMY_SERVICE_TYPE
@classmethod
def get_plugin_type(cls):
return dummy_plugin.DUMMY_SERVICE_TYPE
def index(self, request, **kwargs):
return "resource index"
def index(self, request, **kwargs):
return "resource index"
def custom_member_action(self, request, **kwargs):
return {'member_action': 'value'}
def custom_member_action(self, request, **kwargs):
return {'member_action': 'value'}
def collection_action(self, request, **kwargs):
return {'collection': 'value'}
def collection_action(self, request, **kwargs):
return {'collection': 'value'}
def show(self, request, id):
return {'data': {'id': id}}
def show(self, request, id):
return {'data': {'id': id}}
def test_exceptions_notimplemented(self):
controller = self.ResourceExtensionController()

View File

@ -54,7 +54,7 @@ class TestQuotaDbApi(testlib_api.SqlTestCaseLight):
if expected_resource:
self.assertEqual(expected_resource, usage_info.resource)
if expected_dirty is not None:
self.assertEqual(expected_dirty, usage_info.dirty)
self.assertEqual(expected_dirty, usage_info.dirty)
if expected_used is not None:
self.assertEqual(expected_used, usage_info.used)

View File

@ -1531,7 +1531,7 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
mock.patch.object(plugin,
'get_dhcp_agents_hosting_networks',
return_value=[]):
return dhcp_notifier_schedule.call_count > 1
return dhcp_notifier_schedule.call_count > 1
def test_reserved_dhcp_port_creation(self):
device_id = constants.DEVICE_ID_RESERVED_DHCP_PORT

View File

@ -1903,46 +1903,45 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_subnet_id_v4_and_v6(self):
with self.subnet() as subnet:
# Get a IPv4 and IPv6 address
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(
self.fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::/124',
ip_version=constants.IP_VERSION_6,
gateway_ip=constants.ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(self.fmt, res)
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(self.fmt, res)
ips = port3['port']['fixed_ips']
cidr_v4 = subnet['subnet']['cidr']
cidr_v6 = subnet2['subnet']['cidr']
self.assertEqual(2, len(ips))
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
# Get a IPv4 and IPv6 address
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(
self.fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::/124',
ip_version=constants.IP_VERSION_6,
gateway_ip=constants.ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(self.fmt, res)
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(self.fmt, res)
ips = port3['port']['fixed_ips']
cidr_v4 = subnet['subnet']['cidr']
cidr_v6 = subnet2['subnet']['cidr']
self.assertEqual(2, len(ips))
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
res = self._create_port(self.fmt, net_id=net_id)
port4 = self.deserialize(self.fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEqual(2, len(ips))
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
res = self._create_port(self.fmt, net_id=net_id)
port4 = self.deserialize(self.fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEqual(2, len(ips))
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def _test_requested_port_subnet_ids(self, ips, expected_subnet_ids):
self.assertEqual(set(x['subnet_id'] for x in ips),
@ -2352,49 +2351,47 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_range_allocation(self):
with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(5, len(ips))
alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5',
'10.0.0.6']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(0, len(alloc))
self._delete('ports', port['port']['id'])
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(5, len(ips))
alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5',
'10.0.0.6']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(0, len(alloc))
self._delete('ports', port['port']['id'])
with self.subnet(gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(5, len(ips))
alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4',
'11.0.0.5']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(0, len(alloc))
self._delete('ports', port['port']['id'])
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(5, len(ips))
alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4',
'11.0.0.5']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(0, len(alloc))
self._delete('ports', port['port']['id'])
def test_requested_invalid_fixed_ips(self):
with self.subnet() as subnet:

View File

@ -911,15 +911,16 @@ class TestRollback(test_db_base.NeutronDbPluginV2TestCase):
Backend = ipam_pluggable_backend.IpamPluggableBackend
with mock.patch.object(Backend, '_store_ip_allocation', wraps=store),\
mock.patch.object(Backend, '_safe_rollback', wraps=rollback),\
mock.patch.object(Backend, '_allocate_ips_for_port', wraps=alloc):
# Create port with two addresses. The wrapper lets one succeed
# then simulates race for the second to trigger IPAM rollback.
response = self._create_port(
self.fmt,
net_id=net['network']['id'],
fixed_ips=[{'subnet_id': subnet1['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}])
mock.patch.object(Backend, '_safe_rollback', wraps=rollback),\
mock.patch.object(Backend, '_allocate_ips_for_port',
wraps=alloc):
# Create port with two addresses. The wrapper lets one succeed
# then simulates race for the second to trigger IPAM rollback.
response = self._create_port(
self.fmt,
net_id=net['network']['id'],
fixed_ips=[{'subnet_id': subnet1['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}])
# When all goes well, retry kicks in and the operation is successful.
self.assertEqual(webob.exc.HTTPCreated.code, response.status_int)

View File

@ -633,8 +633,8 @@ class L3HATestCase(L3HATestFramework):
with mock.patch.object(self.plugin, '_create_ha_subnet',
side_effect=ValueError):
self.assertRaises(ValueError, self.plugin._create_ha_network,
self.admin_ctx, _uuid())
self.assertRaises(ValueError, self.plugin._create_ha_network,
self.admin_ctx, _uuid())
networks_after = self.core_plugin.get_networks(self.admin_ctx)
self.assertEqual(networks_before, networks_after)

View File

@ -1167,21 +1167,21 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
with self.router() as r, self.network() as n:
with self.subnet(network=n, cidr='10.0.0.0/24') as s1, (
self.subnet(network=n, cidr='10.0.1.0/24')) as s2:
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertNotEqual(pid1, pid2)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertNotEqual(pid1, pid2)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
def test_router_add_interface_multiple_ipv6_subnets_same_net(self):
"""Test router-interface-add for multiple ipv6 subnets on a network.
@ -1195,27 +1195,27 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
) as s1, self.subnet(network=n, cidr='fd01::1/64',
ip_version=lib_constants.IP_VERSION_6
) as s2:
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertEqual(pid1, pid2)
port = self._show('ports', pid1)
self.assertEqual(2, len(port['port']['fixed_ips']))
port_subnet_ids = [fip['subnet_id'] for fip in
port['port']['fixed_ips']]
self.assertIn(s1['subnet']['id'], port_subnet_ids)
self.assertIn(s2['subnet']['id'], port_subnet_ids)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertEqual(pid1, pid2)
port = self._show('ports', pid1)
self.assertEqual(2, len(port['port']['fixed_ips']))
port_subnet_ids = [fip['subnet_id'] for fip in
port['port']['fixed_ips']]
self.assertIn(s1['subnet']['id'], port_subnet_ids)
self.assertIn(s2['subnet']['id'], port_subnet_ids)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
def test_router_add_interface_multiple_ipv6_subnets_different_net(self):
"""Test router-interface-add for ipv6 subnets on different networks.
@ -1229,21 +1229,21 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
) as s1, self.subnet(network=n2, cidr='fd01::1/64',
ip_version=lib_constants.IP_VERSION_6
) as s2:
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertNotEqual(pid1, pid2)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertNotEqual(pid1, pid2)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
"""Test router-interface-add for in-valid ipv6 subnets.
@ -3781,43 +3781,43 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
with self.router() as r, self.subnet(
cidr='9.0.1.0/24') as subnet, self.port(
subnet=subnet,
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p1, self.port(
cidr='9.0.1.0/24') as subnet, self.port(
subnet=subnet,
fixed_ips=[{'ip_address': '9.0.1.100'},
{'ip_address': '9.0.1.101'}]) as p2:
# Cannot have multiple IPv4 subnets on router port,
# see neutron.db.l3_db line L752-L754.
self._router_interface_action(
'add', r['router']['id'],
None, p2['port']['id'],
expected_code=exc.HTTPBadRequest.code)
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p1, self.port(
subnet=subnet,
fixed_ips=[{'ip_address': '9.0.1.100'},
{'ip_address': '9.0.1.101'}]) as p2:
# Cannot have multiple IPv4 subnets on router port,
# see neutron.db.l3_db line L752-L754.
self._router_interface_action(
'add', r['router']['id'],
None, p2['port']['id'],
expected_code=exc.HTTPBadRequest.code)
self._router_interface_action('add',
r['router']['id'],
None,
p1['port']['id'])
port = {'port': {'fixed_ips':
[{'ip_address': '9.0.1.4',
'subnet_id': subnet['subnet']['id']},
{'ip_address': '9.0.1.5',
'subnet_id': subnet['subnet']['id']}]}}
ctx = context.get_admin_context()
self.assertRaises(
n_exc.BadRequest,
self.core_plugin.update_port,
ctx, p1['port']['id'], port)
self._router_interface_action('add',
r['router']['id'],
None,
p1['port']['id'])
port = {'port': {'fixed_ips':
[{'ip_address': '9.0.1.4',
'subnet_id': subnet['subnet']['id']},
{'ip_address': '9.0.1.5',
'subnet_id': subnet['subnet']['id']}]}}
ctx = context.get_admin_context()
self.assertRaises(
n_exc.BadRequest,
self.core_plugin.update_port,
ctx, p1['port']['id'], port)
routers = self.plugin.get_sync_data(ctx, None)
self.assertEqual(1, len(routers))
interfaces = routers[0].get(lib_constants.INTERFACE_KEY,
[])
self.assertEqual(1, len(interfaces))
self._router_interface_action('remove',
r['router']['id'],
None,
p1['port']['id'])
routers = self.plugin.get_sync_data(ctx, None)
self.assertEqual(1, len(routers))
interfaces = routers[0].get(lib_constants.INTERFACE_KEY,
[])
self.assertEqual(1, len(interfaces))
self._router_interface_action('remove',
r['router']['id'],
None,
p1['port']['id'])
def test_l3_agent_routers_query_gateway(self):
with self.router() as r:

View File

@ -405,10 +405,10 @@ class ExtGwModeIntTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
ext_net_id = s['subnet']['network_id']
self._set_net_external(ext_net_id)
with mock.patch.object(
l3_db.L3_NAT_dbonly_mixin, '_validate_gw_info',
side_effect=[db_exc.RetryRequest(None), ext_net_id]):
self._set_router_external_gateway(r['router']['id'],
ext_net_id)
l3_db.L3_NAT_dbonly_mixin, '_validate_gw_info',
side_effect=[db_exc.RetryRequest(None), ext_net_id]):
self._set_router_external_gateway(r['router']['id'],
ext_net_id)
res = self._show('routers', r['router']['id'])['router']
self.assertEqual(ext_net_id,
res['external_gateway_info']['network_id'])

View File

@ -296,14 +296,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self._assert_sg_rule_has_kvs(v6_rule, expected)
def test_create_security_group_bulk(self):
rule1 = self._build_security_group("sg_1", "sec_grp_1")
rule2 = self._build_security_group("sg_2", "sec_grp_2")
rules = {'security_groups': [rule1['security_group'],
rule2['security_group']]}
res = self._create_security_group_response(self.fmt, rules)
ret = self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
self.assertEqual(2, len(ret['security_groups']))
rule1 = self._build_security_group("sg_1", "sec_grp_1")
rule2 = self._build_security_group("sg_2", "sec_grp_2")
rules = {'security_groups': [rule1['security_group'],
rule2['security_group']]}
res = self._create_security_group_response(self.fmt, rules)
ret = self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
self.assertEqual(2, len(ret['security_groups']))
def test_skip_duplicate_default_sg_error(self):
num_called = [0]

View File

@ -2418,8 +2418,8 @@ class TestSegmentHostRoutes(TestSegmentML2):
segment_id=segment['id'],
gateway_ip=gateway_ips[1],
cidr=cidrs[1]) as subnet1:
subnet0 = subnet0['subnet']
subnet1 = subnet1['subnet']
subnet0 = subnet0['subnet']
subnet1 = subnet1['subnet']
req = self.new_show_request('subnets', subnet0['id'])
res = req.get_response(self.api)

View File

@ -970,9 +970,9 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
@mock.patch.object(obj_db_api, 'update_objects')
@mock.patch.object(obj_db_api, 'update_object', return_value={})
def test_update_objects_without_validate_filters(self, *mocks):
self._test_class.update_objects(
self.context, {'unknown_filter': 'new_value'},
validate_filters=False, unknown_filter='value')
self._test_class.update_objects(
self.context, {'unknown_filter': 'new_value'},
validate_filters=False, unknown_filter='value')
def _prep_string_field(self):
self.filter_string_field = None
@ -1066,9 +1066,9 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
self.assertEqual(expected, self._test_class.count(self.context))
def test_count_invalid_fields(self):
self.assertRaises(n_exc.InvalidInput,
self._test_class.count, self.context,
fake_field='xxx')
self.assertRaises(n_exc.InvalidInput,
self._test_class.count, self.context,
fake_field='xxx')
def _check_equal(self, expected, observed):
self.assertItemsEqual(get_obj_persistent_fields(expected),

View File

@ -573,9 +573,9 @@ class TestLinuxBridgeManager(base.BaseTestCase):
def test_ensure_physical_in_bridge_with_existed_brq(self):
with mock.patch.object(linuxbridge_neutron_agent.LOG, 'error') as log:
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_FLAT,
"physnet9", "1", 1450)
self.assertEqual(1, log.call_count)
self.lbm.ensure_physical_in_bridge("123", constants.TYPE_FLAT,
"physnet9", "1", 1450)
self.assertEqual(1, log.call_count)
@mock.patch.object(ip_lib, "device_exists", return_value=False)
def test_add_tap_interface_with_interface_disappearing(self, exists):
@ -1019,9 +1019,9 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
with mock.patch.object(linuxbridge_neutron_agent.LOG, 'info') as log,\
mock.patch.object(self.lb_rpc.agent.mgr,
"delete_bridge") as del_fn:
self.lb_rpc.network_delete("anycontext", network_id="123")
self.assertEqual(0, del_fn.call_count)
self.assertEqual(1, log.call_count)
self.lb_rpc.network_delete("anycontext", network_id="123")
self.assertEqual(0, del_fn.call_count)
self.assertEqual(1, log.call_count)
def test_binding_deactivate(self):
with mock.patch.object(self.lb_rpc.agent.mgr,
@ -1059,9 +1059,9 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
self.assertIn("tap456", self.lb_rpc.updated_devices)
def test_binding_activate_not_for_host(self):
self.lb_rpc.binding_activate(mock.ANY, host="other-host",
port_id="456")
self.assertFalse(self.lb_rpc.updated_devices)
self.lb_rpc.binding_activate(mock.ANY, host="other-host",
port_id="456")
self.assertFalse(self.lb_rpc.updated_devices)
def _test_fdb_add(self, proxy_enabled=False):
fdb_entries = {'net_id':

View File

@ -50,11 +50,12 @@ class TestMacvtapRPCCallbacks(base.BaseTestCase):
def test_network_delete_vlan(self):
self.rpc.network_map = {NETWORK_ID: NETWORK_SEGMENT_VLAN}
with mock.patch.object(ip_lib.IpLinkCommand, 'delete') as mock_del,\
mock.patch.object(macvtap_common, 'get_vlan_device_name',
return_value='vlan1'),\
mock.patch.object(ip_lib.IPDevice, 'exists', return_value=True):
self.rpc.network_delete("anycontext", network_id=NETWORK_ID)
self.assertTrue(mock_del.called)
mock.patch.object(macvtap_common, 'get_vlan_device_name',
return_value='vlan1'),\
mock.patch.object(ip_lib.IPDevice, 'exists',
return_value=True):
self.rpc.network_delete("anycontext", network_id=NETWORK_ID)
self.assertTrue(mock_del.called)
def test_network_delete_flat(self):
self.rpc.network_map = {NETWORK_ID: NETWORK_SEGMENT_FLAT}

View File

@ -28,20 +28,20 @@ from neutron.tests.unit.plugins.ml2 import _test_mech_agent as base
class TestFakePortContext(base.FakePortContext):
def __init__(self, agent_type, agents, segments,
vnic_type=portbindings.VNIC_NORMAL,
profile=None):
super(TestFakePortContext, self).__init__(agent_type,
agents,
segments,
vnic_type=vnic_type,
profile=profile)
def __init__(self, agent_type, agents, segments,
vnic_type=portbindings.VNIC_NORMAL,
profile=None):
super(TestFakePortContext, self).__init__(agent_type,
agents,
segments,
vnic_type=vnic_type,
profile=profile)
def set_binding(self, segment_id, vif_type, vif_details, state):
self._bound_segment_id = segment_id
self._bound_vif_type = vif_type
self._bound_vif_details = vif_details
self._bound_state = state
def set_binding(self, segment_id, vif_type, vif_details, state):
self._bound_segment_id = segment_id
self._bound_vif_type = vif_type
self._bound_vif_details = vif_details
self._bound_state = state
class SriovNicSwitchMechanismBaseTestCase(base.AgentMechanismBaseTestCase):

View File

@ -2044,15 +2044,15 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.'
'_attempt_binding',
side_effect=plugin._attempt_binding) as at_mock:
plugin._bind_port_if_needed(port_context)
if bound_vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
# An unsuccessful binding attempt should be retried
# MAX_BIND_TRIES amount of times.
self.assertEqual(ml2_plugin.MAX_BIND_TRIES,
at_mock.call_count)
else:
# Successful binding should only be attempted once.
self.assertEqual(1, at_mock.call_count)
plugin._bind_port_if_needed(port_context)
if bound_vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
# An unsuccessful binding attempt should be retried
# MAX_BIND_TRIES amount of times.
self.assertEqual(ml2_plugin.MAX_BIND_TRIES,
at_mock.call_count)
else:
# Successful binding should only be attempted once.
self.assertEqual(1, at_mock.call_count)
def test_port_binding_profile_not_changed(self):
profile = {'e': 5}
@ -2858,36 +2858,36 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
'update_port_precommit') as port_pre,\
mock.patch.object(
ml2_db, 'get_distributed_port_bindings') as dist_bindings:
dist_bindings.return_value = [binding]
port_pre.return_value = True
with self.network() as network:
with self.subnet(network=network) as subnet:
subnet_id = subnet['subnet']['id']
data = {'port': {
dist_bindings.return_value = [binding]
port_pre.return_value = True
with self.network() as network:
with self.subnet(network=network) as subnet:
subnet_id = subnet['subnet']['id']
data = {'port': {
'network_id': network['network']['id'],
'tenant_id':
network['network']['tenant_id'],
network['network']['tenant_id'],
'name': 'port1',
'device_owner':
constants.DEVICE_OWNER_DVR_INTERFACE,
constants.DEVICE_OWNER_DVR_INTERFACE,
'admin_state_up': 1,
'fixed_ips':
[{'subnet_id': subnet_id}]}}
port_req = self.new_create_request('ports', data)
port_res = port_req.get_response(self.api)
self.assertEqual(201, port_res.status_int)
port = self.deserialize(self.fmt, port_res)
port_id = port['port']['id']
new_name = 'a_brand_new_name'
data = {'port': {'name': new_name}}
req = self.new_update_request('ports', data, port_id)
res = req.get_response(self.api)
self.assertEqual(200, res.status_int)
self.assertTrue(dist_bindings.called)
self.assertTrue(port_pre.called)
self.assertTrue(port_post.called)
port = self._show('ports', port_id)
self.assertEqual(new_name, port['port']['name'])
[{'subnet_id': subnet_id}]}}
port_req = self.new_create_request('ports', data)
port_res = port_req.get_response(self.api)
self.assertEqual(201, port_res.status_int)
port = self.deserialize(self.fmt, port_res)
port_id = port['port']['id']
new_name = 'a_brand_new_name'
data = {'port': {'name': new_name}}
req = self.new_update_request('ports', data, port_id)
res = req.get_response(self.api)
self.assertEqual(200, res.status_int)
self.assertTrue(dist_bindings.called)
self.assertTrue(port_pre.called)
self.assertTrue(port_post.called)
port = self._show('ports', port_id)
self.assertEqual(new_name, port['port']['name'])
class TestML2PluggableIPAM(test_ipam.UseIpamMixin, TestMl2SubnetsV2):

View File

@ -224,5 +224,5 @@ class Test_LegacyPlusProviderConfiguration(base.BaseTestCase):
@mock.patch.object(provider_configuration.ProviderConfiguration,
"add_provider")
def test__update_router_provider_invalid(self, mock_method):
mock_method.side_effect = lib_exc.Invalid(message='message')
driver_controller._LegacyPlusProviderConfiguration()
mock_method.side_effect = lib_exc.Invalid(message='message')
driver_controller._LegacyPlusProviderConfiguration()

View File

@ -362,9 +362,9 @@ class TrunkPluginCompatDriversTestCase(test_plugin.Ml2PluginV2TestCase):
fakes.FakeDriver.create()
with mock.patch.object(
validators, 'get_validator', side_effect=KeyError), \
testtools.ExpectedException(
trunk_exc.SegmentationTypeValidatorNotFound):
trunk_plugin.TrunkPlugin()
testtools.ExpectedException(
trunk_exc.SegmentationTypeValidatorNotFound):
trunk_plugin.TrunkPlugin()
def test_plugins_fails_to_start_conflicting_seg_types(self):
fakes.FakeDriver.create()

View File

@ -622,7 +622,7 @@ class ResourceTest(base.BaseTestCase):
@staticmethod
def my_fault_body_function():
return 'off'
return 'off'
class Controller(object):
def index(self, request, index=None):

View File

@ -50,10 +50,10 @@ class TestTesttoolsExceptionHandler(base.BaseTestCase):
with mock.patch('six.moves.builtins.__import__',
side_effect=import_mock):
pdb_debugger = post_mortem_debug._get_debugger('pdb')
pudb_debugger = post_mortem_debug._get_debugger('pudb')
self.assertEqual('pdb', pdb_debugger.__name__)
self.assertEqual('pudb', pudb_debugger.__name__)
pdb_debugger = post_mortem_debug._get_debugger('pdb')
pudb_debugger = post_mortem_debug._get_debugger('pudb')
self.assertEqual('pdb', pdb_debugger.__name__)
self.assertEqual('pudb', pudb_debugger.__name__)
class TestFilteredTraceback(base.BaseTestCase):