Merge pull request #463 from asadoughi/ncp1667

Validate ip_address input on update_port and get_ports
This commit is contained in:
Justin Hammond
2015-10-13 14:25:15 -05:00
2 changed files with 29 additions and 2 deletions

View File

@@ -334,7 +334,12 @@ def update_port(context, id, port):
msg="subnet_id required for ip_address allocation")
if subnet_id and ip_address:
ip_netaddr = netaddr.IPAddress(ip_address).ipv6()
ip_netaddr = None
try:
ip_netaddr = netaddr.IPAddress(ip_address).ipv6()
except netaddr.AddrFormatError:
raise exceptions.InvalidInput(
error_message="Invalid format provided for ip_address")
ip_addresses[ip_netaddr] = subnet_id
else:
subnet_ids.append(subnet_id)
@@ -453,7 +458,12 @@ def get_ports(context, limit=None, sorts=None, marker=None, page_reverse=False,
if "ip_address" in filters:
if not context.is_admin:
raise exceptions.NotAuthorized()
ips = [netaddr.IPAddress(ip) for ip in filters.pop("ip_address")]
ips = []
try:
ips = [netaddr.IPAddress(ip) for ip in filters.pop("ip_address")]
except netaddr.AddrFormatError:
raise exceptions.InvalidInput(
error_message="Invalid format provided for ip_address")
query = db_api.port_find_by_ip_address(context, ip_address=ips,
scope=db_api.ALL, **filters)
ports = []

View File

@@ -192,6 +192,13 @@ class TestQuarkGetPortsByIPAddress(test_quark_plugin.TestQuarkPlugin):
self.plugin.get_ports(self.context, filters=filters,
fields=None)
def test_port_list_malformed_address_bad_request(self):
with self._stubs(ports=[]):
filters = {"ip_address": ["malformed-address-here"]}
admin_ctx = self.context.elevated()
with self.assertRaises(exceptions.BadRequest):
self.plugin.get_ports(admin_ctx, filters=filters, fields=None)
class TestQuarkCreatePortFailure(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
@@ -689,6 +696,16 @@ class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
with self.assertRaises(exceptions.BadRequest):
self.plugin.update_port(self.context, 1, new_port)
def test_update_port_fixed_ip_bad_request_malformed_address(self):
with self._stubs(
port=dict(id=1, name="myport", mac_address="0:0:0:0:0:1")
) as (port_find, port_update, alloc_ip, dealloc_ip):
new_port = dict(port=dict(
fixed_ips=[dict(subnet_id=1,
ip_address="malformed-address-here")]))
with self.assertRaises(exceptions.BadRequest):
self.plugin.update_port(self.context, 1, new_port)
def test_update_port_fixed_ip(self):
with self._stubs(
port=dict(id=1, name="myport", mac_address="0:0:0:0:0:1")