Merge pull request #366 from Cerberus98/rm11016_merge
Adds validation around port create with fixed ips
This commit is contained in:
@@ -42,6 +42,39 @@ def _raise_if_unauthorized(tenant_id, net):
|
|||||||
raise exceptions.NotAuthorized()
|
raise exceptions.NotAuthorized()
|
||||||
|
|
||||||
|
|
||||||
|
def split_and_validate_requested_subnets(context, net_id, segment_id,
|
||||||
|
fixed_ips):
|
||||||
|
subnets = []
|
||||||
|
ip_addresses = {}
|
||||||
|
for fixed_ip in fixed_ips:
|
||||||
|
subnet_id = fixed_ip.get("subnet_id")
|
||||||
|
ip_address = fixed_ip.get("ip_address")
|
||||||
|
if not subnet_id:
|
||||||
|
raise exceptions.BadRequest(resource="fixed_ips",
|
||||||
|
msg="subnet_id required")
|
||||||
|
if ip_address:
|
||||||
|
ip_addresses[ip_address] = subnet_id
|
||||||
|
else:
|
||||||
|
subnets.append(subnet_id)
|
||||||
|
|
||||||
|
subnets = ip_addresses.values() + subnets
|
||||||
|
|
||||||
|
sub_models = db_api.subnet_find(context, id=subnets, scope=db_api.ALL)
|
||||||
|
if len(sub_models) == 0:
|
||||||
|
raise exceptions.NotFound(msg="Requested subnet(s) not found")
|
||||||
|
|
||||||
|
for s in sub_models:
|
||||||
|
if s["network_id"] != net_id:
|
||||||
|
raise exceptions.InvalidInput(
|
||||||
|
error_message="Requested subnet doesn't belong to requested "
|
||||||
|
"network")
|
||||||
|
|
||||||
|
if segment_id and segment_id != s["segment_id"]:
|
||||||
|
raise q_exc.AmbiguousNetworkId(net_id=net_id)
|
||||||
|
|
||||||
|
return ip_addresses, subnets
|
||||||
|
|
||||||
|
|
||||||
def create_port(context, port):
|
def create_port(context, port):
|
||||||
"""Create a port
|
"""Create a port
|
||||||
|
|
||||||
@@ -128,34 +161,22 @@ def create_port(context, port):
|
|||||||
with utils.CommandManager().execute() as cmd_mgr:
|
with utils.CommandManager().execute() as cmd_mgr:
|
||||||
@cmd_mgr.do
|
@cmd_mgr.do
|
||||||
def _allocate_ips(fixed_ips, net, port_id, segment_id, mac):
|
def _allocate_ips(fixed_ips, net, port_id, segment_id, mac):
|
||||||
subnets = []
|
fixed_ip_kwargs = {}
|
||||||
ip_addresses = {}
|
|
||||||
if fixed_ips:
|
if fixed_ips:
|
||||||
for fixed_ip in fixed_ips:
|
if STRATEGY.is_parent_network(net_id) and not context.is_admin:
|
||||||
subnet_id = fixed_ip.get("subnet_id")
|
raise exceptions.NotAuthorized()
|
||||||
ip_address = fixed_ip.get("ip_address")
|
|
||||||
if not subnet_id:
|
|
||||||
raise exceptions.BadRequest(
|
|
||||||
resource="fixed_ips",
|
|
||||||
msg="subnet_id required")
|
|
||||||
if ip_address:
|
|
||||||
ip_addresses[ip_address] = subnet_id
|
|
||||||
else:
|
|
||||||
subnets.append(subnet_id)
|
|
||||||
|
|
||||||
ips = ip_addresses.keys()
|
ips, subnets = split_and_validate_requested_subnets(context,
|
||||||
subnets = ip_addresses.values() + subnets
|
net_id,
|
||||||
|
segment_id,
|
||||||
|
fixed_ips)
|
||||||
|
fixed_ip_kwargs["ip_addresses"] = ips
|
||||||
|
fixed_ip_kwargs["subnets"] = subnets
|
||||||
|
|
||||||
ipam_driver.allocate_ip_address(
|
ipam_driver.allocate_ip_address(
|
||||||
context, addresses, net["id"], port_id,
|
context, addresses, net["id"], port_id,
|
||||||
CONF.QUARK.ipam_reuse_after, segment_id=segment_id,
|
CONF.QUARK.ipam_reuse_after, segment_id=segment_id,
|
||||||
ip_addresses=ips, subnets=subnets,
|
mac_address=mac, **fixed_ip_kwargs)
|
||||||
mac_address=mac)
|
|
||||||
else:
|
|
||||||
ipam_driver.allocate_ip_address(
|
|
||||||
context, addresses, net["id"], port_id,
|
|
||||||
CONF.QUARK.ipam_reuse_after, segment_id=segment_id,
|
|
||||||
mac_address=mac)
|
|
||||||
|
|
||||||
@cmd_mgr.undo
|
@cmd_mgr.undo
|
||||||
def _allocate_ips_undo(addr):
|
def _allocate_ips_undo(addr):
|
||||||
|
|||||||
@@ -95,12 +95,12 @@ class QuarkPortsPaginationFunctionalTest(BaseFunctionalTest):
|
|||||||
|
|
||||||
ip_network = netaddr.IPNetwork(cidr)
|
ip_network = netaddr.IPNetwork(cidr)
|
||||||
|
|
||||||
network = dict(id=1, name="public", tenant_id="make",
|
network = dict(id="1", name="public", tenant_id="make",
|
||||||
network_plugin="BASE",
|
network_plugin="BASE",
|
||||||
ipam_strategy="ANY")
|
ipam_strategy="ANY")
|
||||||
network = {"network": network}
|
network = {"network": network}
|
||||||
|
|
||||||
subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
|
subnet = dict(id="1", ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=cidr, first_ip=ip_network.first,
|
cidr=cidr, first_ip=ip_network.first,
|
||||||
last_ip=ip_network.last, ip_policy=None,
|
last_ip=ip_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
|
|||||||
@@ -75,15 +75,15 @@ class QuarkCreatePortSatisfyIpam(BaseFunctionalTest):
|
|||||||
cidr_v6 = "2001:db8::/32"
|
cidr_v6 = "2001:db8::/32"
|
||||||
ip_network = netaddr.IPNetwork(cidr)
|
ip_network = netaddr.IPNetwork(cidr)
|
||||||
ipv6_network = netaddr.IPNetwork(cidr_v6)
|
ipv6_network = netaddr.IPNetwork(cidr_v6)
|
||||||
network = dict(id=1, name="public", tenant_id="make",
|
network = dict(id='1', name="public", tenant_id="make",
|
||||||
network_plugin="BASE",
|
network_plugin="BASE",
|
||||||
ipam_strategy="BOTH_REQUIRED")
|
ipam_strategy="BOTH_REQUIRED")
|
||||||
network = {"network": network}
|
network = {"network": network}
|
||||||
subnet_v4 = dict(id=1, ip_version=4, next_auto_assign_ip=2,
|
subnet_v4 = dict(id='1', ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=cidr, first_ip=ip_network.first,
|
cidr=cidr, first_ip=ip_network.first,
|
||||||
last_ip=ip_network.last, ip_policy=None,
|
last_ip=ip_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
subnet_v6 = dict(id=2, ip_version=6, next_auto_assign_ip=2,
|
subnet_v6 = dict(id='2', ip_version=6, next_auto_assign_ip=2,
|
||||||
cidr=cidr_v6, first_ip=ipv6_network.first,
|
cidr=cidr_v6, first_ip=ipv6_network.first,
|
||||||
last_ip=ipv6_network.last, ip_policy=None,
|
last_ip=ipv6_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
@@ -111,11 +111,11 @@ class QuarkCreatePortSatisfyIpam(BaseFunctionalTest):
|
|||||||
def test_port_created_doesnt_satisfy_ipam_strategy_raises(self):
|
def test_port_created_doesnt_satisfy_ipam_strategy_raises(self):
|
||||||
cidr = "192.168.1.0/24"
|
cidr = "192.168.1.0/24"
|
||||||
ip_network = netaddr.IPNetwork(cidr)
|
ip_network = netaddr.IPNetwork(cidr)
|
||||||
network = dict(id=1, name="public", tenant_id="make",
|
network = dict(id='1', name="public", tenant_id="make",
|
||||||
network_plugin="BASE",
|
network_plugin="BASE",
|
||||||
ipam_strategy="BOTH_REQUIRED")
|
ipam_strategy="BOTH_REQUIRED")
|
||||||
network = {"network": network}
|
network = {"network": network}
|
||||||
subnet_v4 = dict(id=1, ip_version=4, next_auto_assign_ip=2,
|
subnet_v4 = dict(id='1', ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=cidr, first_ip=ip_network.first,
|
cidr=cidr, first_ip=ip_network.first,
|
||||||
last_ip=ip_network.last, ip_policy=None,
|
last_ip=ip_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
@@ -160,11 +160,11 @@ class QuarkCreatePortWithIpNotMandatory(BaseFunctionalTest):
|
|||||||
def test_port_created_with_only_subnet(self):
|
def test_port_created_with_only_subnet(self):
|
||||||
cidr = "192.168.1.0/24"
|
cidr = "192.168.1.0/24"
|
||||||
ip_network = netaddr.IPNetwork(cidr)
|
ip_network = netaddr.IPNetwork(cidr)
|
||||||
network = dict(id=1, name="public", tenant_id="make",
|
network = dict(id='1', name="public", tenant_id="make",
|
||||||
network_plugin="BASE",
|
network_plugin="BASE",
|
||||||
ipam_strategy="ANY")
|
ipam_strategy="ANY")
|
||||||
network = {"network": network}
|
network = {"network": network}
|
||||||
subnet_v4 = dict(id=1, ip_version=4, next_auto_assign_ip=2,
|
subnet_v4 = dict(id='1', ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=cidr, first_ip=ip_network.first,
|
cidr=cidr, first_ip=ip_network.first,
|
||||||
last_ip=ip_network.last, ip_policy=None,
|
last_ip=ip_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
@@ -192,20 +192,20 @@ class QuarkCreatePortWithIpNotMandatory(BaseFunctionalTest):
|
|||||||
ip_network = netaddr.IPNetwork(cidr)
|
ip_network = netaddr.IPNetwork(cidr)
|
||||||
another_ip_network = netaddr.IPNetwork(another_cidr)
|
another_ip_network = netaddr.IPNetwork(another_cidr)
|
||||||
ipv6_network = netaddr.IPNetwork(cidr_v6)
|
ipv6_network = netaddr.IPNetwork(cidr_v6)
|
||||||
network = dict(id=1, name="public", tenant_id="make",
|
network = dict(id='1', name="public", tenant_id="make",
|
||||||
network_plugin="BASE",
|
network_plugin="BASE",
|
||||||
ipam_strategy="BOTH_REQUIRED")
|
ipam_strategy="BOTH_REQUIRED")
|
||||||
network = {"network": network}
|
network = {"network": network}
|
||||||
subnet_v4 = dict(id=1, ip_version=4, next_auto_assign_ip=2,
|
subnet_v4 = dict(id='1', ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=cidr, first_ip=ip_network.first,
|
cidr=cidr, first_ip=ip_network.first,
|
||||||
last_ip=ip_network.last, ip_policy=None,
|
last_ip=ip_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
another_subnet_v4 = dict(id=2, ip_version=4, next_auto_assign_ip=2,
|
another_subnet_v4 = dict(id='2', ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=another_cidr,
|
cidr=another_cidr,
|
||||||
first_ip=another_ip_network.first,
|
first_ip=another_ip_network.first,
|
||||||
last_ip=another_ip_network.last,
|
last_ip=another_ip_network.last,
|
||||||
ip_policy=None, tenant_id="fake")
|
ip_policy=None, tenant_id="fake")
|
||||||
subnet_v6 = dict(id=3, ip_version=6, next_auto_assign_ip=2,
|
subnet_v6 = dict(id='3', ip_version=6, next_auto_assign_ip=2,
|
||||||
cidr=cidr_v6, first_ip=ipv6_network.first,
|
cidr=cidr_v6, first_ip=ipv6_network.first,
|
||||||
last_ip=ipv6_network.last, ip_policy=None,
|
last_ip=ipv6_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
@@ -261,11 +261,11 @@ class QuarkCreatePortWithForbiddenMacRange(BaseFunctionalTest):
|
|||||||
def test_port_created_with_forbidden_mac_range(self):
|
def test_port_created_with_forbidden_mac_range(self):
|
||||||
cidr = "192.168.1.0/24"
|
cidr = "192.168.1.0/24"
|
||||||
ip_network = netaddr.IPNetwork(cidr)
|
ip_network = netaddr.IPNetwork(cidr)
|
||||||
network = dict(id=1, name="public", tenant_id="make",
|
network = dict(id='1', name="public", tenant_id="make",
|
||||||
network_plugin="BASE",
|
network_plugin="BASE",
|
||||||
ipam_strategy="ANY")
|
ipam_strategy="ANY")
|
||||||
network = {"network": network}
|
network = {"network": network}
|
||||||
subnet_v4 = dict(id=1, ip_version=4, next_auto_assign_ip=2,
|
subnet_v4 = dict(id='1', ip_version=4, next_auto_assign_ip=2,
|
||||||
cidr=cidr, first_ip=ip_network.first,
|
cidr=cidr, first_ip=ip_network.first,
|
||||||
last_ip=ip_network.last, ip_policy=None,
|
last_ip=ip_network.last, ip_policy=None,
|
||||||
tenant_id="fake")
|
tenant_id="fake")
|
||||||
|
|||||||
@@ -497,7 +497,13 @@ class TestQuarkCreatePortRM9305(test_quark_plugin.TestQuarkPlugin):
|
|||||||
class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _stubs(self, port=None, network=None, addr=None, mac=None,
|
def _stubs(self, port=None, network=None, addr=None, mac=None,
|
||||||
limit_checks=None):
|
limit_checks=None, subnet=None):
|
||||||
|
subnet_model = None
|
||||||
|
|
||||||
|
if subnet:
|
||||||
|
subnet_model = models.Subnet()
|
||||||
|
subnet_model.update(subnet)
|
||||||
|
|
||||||
if network:
|
if network:
|
||||||
network["network_plugin"] = "BASE"
|
network["network_plugin"] = "BASE"
|
||||||
network["ipam_strategy"] = "ANY"
|
network["ipam_strategy"] = "ANY"
|
||||||
@@ -520,13 +526,16 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
|||||||
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
|
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
|
||||||
mock.patch("quark.ipam.QuarkIpam.allocate_mac_address"),
|
mock.patch("quark.ipam.QuarkIpam.allocate_mac_address"),
|
||||||
mock.patch("quark.db.api.port_count_all"),
|
mock.patch("quark.db.api.port_count_all"),
|
||||||
mock.patch("neutron.quota.QuotaEngine.limit_check")
|
mock.patch("neutron.quota.QuotaEngine.limit_check"),
|
||||||
|
mock.patch("quark.db.api.subnet_find"),
|
||||||
) as (port_create, net_find, alloc_ip, alloc_mac, port_count,
|
) as (port_create, net_find, alloc_ip, alloc_mac, port_count,
|
||||||
limit_check):
|
limit_check, subnet_find):
|
||||||
port_create.side_effect = _create_db_port
|
port_create.side_effect = _create_db_port
|
||||||
net_find.return_value = network
|
net_find.return_value = network
|
||||||
alloc_ip.side_effect = _alloc_ip
|
alloc_ip.side_effect = _alloc_ip
|
||||||
alloc_mac.return_value = mac
|
alloc_mac.return_value = mac
|
||||||
|
if subnet:
|
||||||
|
subnet_find.return_value = [subnet_model]
|
||||||
port_count.return_value = 0
|
port_count.return_value = 0
|
||||||
if limit_checks:
|
if limit_checks:
|
||||||
limit_check.side_effect = limit_checks
|
limit_check.side_effect = limit_checks
|
||||||
@@ -604,15 +613,16 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
|||||||
self.assertEqual(result[key], expected[key])
|
self.assertEqual(result[key], expected[key])
|
||||||
|
|
||||||
def test_create_port_fixed_ip(self):
|
def test_create_port_fixed_ip(self):
|
||||||
network = dict(id=1, tenant_id=self.context.tenant_id)
|
network = dict(id='1', tenant_id=self.context.tenant_id)
|
||||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||||
|
subnet = dict(id=1, network_id=network["id"])
|
||||||
ip = mock.MagicMock()
|
ip = mock.MagicMock()
|
||||||
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
||||||
ip.formatted = lambda: "192.168.10.45"
|
ip.formatted = lambda: "192.168.10.45"
|
||||||
ip.enabled_for_port = lambda x: True
|
ip.enabled_for_port = lambda x: True
|
||||||
fixed_ips = [dict(subnet_id=1, enabled=True,
|
fixed_ips = [dict(subnet_id=1, enabled=True,
|
||||||
ip_address="192.168.10.45")]
|
ip_address="192.168.10.45")]
|
||||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
port = dict(port=dict(mac_address=mac["address"], network_id='1',
|
||||||
tenant_id=self.context.tenant_id, device_id=2,
|
tenant_id=self.context.tenant_id, device_id=2,
|
||||||
fixed_ips=fixed_ips, ip_addresses=[ip]))
|
fixed_ips=fixed_ips, ip_addresses=[ip]))
|
||||||
|
|
||||||
@@ -625,12 +635,96 @@ class TestQuarkCreatePortsSameDevBadRequest(test_quark_plugin.TestQuarkPlugin):
|
|||||||
'fixed_ips': fixed_ips,
|
'fixed_ips': fixed_ips,
|
||||||
'device_id': 2}
|
'device_id': 2}
|
||||||
with self._stubs(port=port["port"], network=network, addr=ip,
|
with self._stubs(port=port["port"], network=network, addr=ip,
|
||||||
mac=mac) as port_create:
|
mac=mac, subnet=subnet) as port_create:
|
||||||
result = self.plugin.create_port(self.context, port)
|
result = self.plugin.create_port(self.context, port)
|
||||||
self.assertTrue(port_create.called)
|
self.assertTrue(port_create.called)
|
||||||
for key in expected.keys():
|
for key in expected.keys():
|
||||||
self.assertEqual(result[key], expected[key])
|
self.assertEqual(result[key], expected[key])
|
||||||
|
|
||||||
|
@mock.patch("quark.network_strategy.JSONStrategy.is_parent_network")
|
||||||
|
def test_create_providernet_port_fixed_ip_not_authorized(self, is_parent):
|
||||||
|
is_parent.return_value = True
|
||||||
|
network = dict(id='1', tenant_id=self.context.tenant_id)
|
||||||
|
subnet = dict(id=1, network_id=network["id"])
|
||||||
|
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||||
|
ip = mock.MagicMock()
|
||||||
|
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
||||||
|
ip.formatted = lambda: "192.168.10.45"
|
||||||
|
ip.enabled_for_port = lambda x: True
|
||||||
|
fixed_ips = [dict(subnet_id=1, enabled=True,
|
||||||
|
ip_address="192.168.10.45")]
|
||||||
|
port = dict(port=dict(mac_address=mac["address"], network_id='1',
|
||||||
|
tenant_id=self.context.tenant_id, device_id=2,
|
||||||
|
fixed_ips=fixed_ips, ip_addresses=[ip],
|
||||||
|
segment_id="provider_segment"))
|
||||||
|
|
||||||
|
with self._stubs(port=port["port"], network=network, addr=ip,
|
||||||
|
mac=mac, subnet=subnet):
|
||||||
|
with self.assertRaises(exceptions.NotAuthorized):
|
||||||
|
self.plugin.create_port(self.context, port)
|
||||||
|
|
||||||
|
@mock.patch("quark.network_strategy.JSONStrategy.is_parent_network")
|
||||||
|
def test_create_providernet_port_fixed_ip_wrong_segment(self, is_parent):
|
||||||
|
is_parent.return_value = True
|
||||||
|
network = dict(id='1', tenant_id=self.context.tenant_id)
|
||||||
|
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||||
|
subnet = dict(id=1, network_id=network["id"])
|
||||||
|
ip = mock.MagicMock()
|
||||||
|
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
||||||
|
ip.formatted = lambda: "192.168.10.45"
|
||||||
|
ip.enabled_for_port = lambda x: True
|
||||||
|
fixed_ips = [dict(subnet_id=1, enabled=True,
|
||||||
|
ip_address="192.168.10.45")]
|
||||||
|
port = dict(port=dict(mac_address=mac["address"], network_id='1',
|
||||||
|
tenant_id=self.context.tenant_id, device_id=2,
|
||||||
|
fixed_ips=fixed_ips, ip_addresses=[ip],
|
||||||
|
segment_id="provider_segment"))
|
||||||
|
|
||||||
|
with self._stubs(port=port["port"], network=network, addr=ip,
|
||||||
|
mac=mac, subnet=subnet):
|
||||||
|
with self.assertRaises(q_exc.AmbiguousNetworkId):
|
||||||
|
self.plugin.create_port(self.context.elevated(), port)
|
||||||
|
|
||||||
|
def test_create_port_fixed_ip_subnet_not_found(self):
|
||||||
|
network = dict(id='1', tenant_id=self.context.tenant_id)
|
||||||
|
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||||
|
ip = mock.MagicMock()
|
||||||
|
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
||||||
|
ip.formatted = lambda: "192.168.10.45"
|
||||||
|
ip.enabled_for_port = lambda x: True
|
||||||
|
fixed_ips = [dict(subnet_id=1, enabled=True,
|
||||||
|
ip_address="192.168.10.45")]
|
||||||
|
port = dict(port=dict(mac_address=mac["address"], network_id='1',
|
||||||
|
tenant_id=self.context.tenant_id, device_id=2,
|
||||||
|
fixed_ips=fixed_ips, ip_addresses=[ip],
|
||||||
|
segment_id="provider_segment"))
|
||||||
|
|
||||||
|
with self._stubs(port=port["port"], network=network, addr=ip,
|
||||||
|
mac=mac):
|
||||||
|
with self.assertRaises(exceptions.NotFound):
|
||||||
|
self.plugin.create_port(self.context.elevated(), port)
|
||||||
|
|
||||||
|
def test_create_port_fixed_ip_subnet_not_in_network(self):
|
||||||
|
network = dict(id='1', tenant_id=self.context.tenant_id)
|
||||||
|
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||||
|
subnet = dict(id=1, network_id='2')
|
||||||
|
|
||||||
|
ip = mock.MagicMock()
|
||||||
|
ip.get = lambda x, *y: 1 if x == "subnet_id" else None
|
||||||
|
ip.formatted = lambda: "192.168.10.45"
|
||||||
|
ip.enabled_for_port = lambda x: True
|
||||||
|
fixed_ips = [dict(subnet_id=1, enabled=True,
|
||||||
|
ip_address="192.168.10.45")]
|
||||||
|
port = dict(port=dict(mac_address=mac["address"], network_id='1',
|
||||||
|
tenant_id=self.context.tenant_id, device_id=2,
|
||||||
|
fixed_ips=fixed_ips, ip_addresses=[ip],
|
||||||
|
segment_id="provider_segment"))
|
||||||
|
|
||||||
|
with self._stubs(port=port["port"], network=network, addr=ip,
|
||||||
|
mac=mac, subnet=subnet):
|
||||||
|
with self.assertRaises(exceptions.InvalidInput):
|
||||||
|
self.plugin.create_port(self.context.elevated(), port)
|
||||||
|
|
||||||
def test_create_port_fixed_ips_bad_request(self):
|
def test_create_port_fixed_ips_bad_request(self):
|
||||||
network = dict(id=1, tenant_id=self.context.tenant_id)
|
network = dict(id=1, tenant_id=self.context.tenant_id)
|
||||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||||
|
|||||||
Reference in New Issue
Block a user