Switch networking function in cloud layer to proxy

We need to complete rework of our cloud layer to rely on proxy layer
instead of reimplementing direct API calls. This time networking
functions are touched.

Change-Id: I23dc30abc8977c8ff14f5e7b9c9940af0d0894c7
This commit is contained in:
Artem Goncharov 2021-01-15 15:38:08 +01:00 committed by Artem Goncharov
parent 03b2b3b00e
commit 6db391f674
17 changed files with 540 additions and 347 deletions

View File

@ -82,8 +82,8 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
def _neutron_list_floating_ips(self, filters=None): def _neutron_list_floating_ips(self, filters=None):
if not filters: if not filters:
filters = {} filters = {}
data = self.network.get('/floatingips', params=filters) data = list(self.network.ips(**filters))
return self._get_and_munchify('floatingips', data) return data
def _nova_list_floating_ips(self): def _nova_list_floating_ips(self):
try: try:
@ -228,11 +228,8 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
error_message = "Error getting floating ip with ID {id}".format(id=id) error_message = "Error getting floating ip with ID {id}".format(id=id)
if self._use_neutron_floating(): if self._use_neutron_floating():
data = proxy._json_response( fip = self.network.get_ip(id)
self.network.get('/floatingips/{id}'.format(id=id)), return self._normalize_floating_ip(fip)
error_message=error_message)
return self._normalize_floating_ip(
self._get_and_munchify('floatingip', data))
else: else:
data = proxy._json_response( data = proxy._json_response(
self.compute.get('/os-floating-ips/{id}'.format(id=id)), self.compute.get('/os-floating-ips/{id}'.format(id=id)),
@ -461,10 +458,8 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
def _submit_create_fip(self, kwargs): def _submit_create_fip(self, kwargs):
# Split into a method to aid in test mocking # Split into a method to aid in test mocking
data = self.network.post( data = self.network.create_ip(**kwargs)
"/floatingips", json={"floatingip": kwargs}) return self._normalize_floating_ip(data)
return self._normalize_floating_ip(
self._get_and_munchify('floatingip', data))
def _neutron_create_floating_ip( def _neutron_create_floating_ip(
self, network_name_or_id=None, server=None, self, network_name_or_id=None, server=None,
@ -474,8 +469,9 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
if not network_id: if not network_id:
if network_name_or_id: if network_name_or_id:
network = self.get_network(network_name_or_id) try:
if not network: network = self.network.find_network(network_name_or_id)
except exceptions.ResourceNotFound:
raise exc.OpenStackCloudResourceNotFound( raise exc.OpenStackCloudResourceNotFound(
"unable to find network for floating ips with ID " "unable to find network for floating ips with ID "
"{0}".format(network_name_or_id)) "{0}".format(network_name_or_id))
@ -612,15 +608,11 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
def _neutron_delete_floating_ip(self, floating_ip_id): def _neutron_delete_floating_ip(self, floating_ip_id):
try: try:
proxy._json_response(self.network.delete( self.network.delete_ip(
"/floatingips/{fip_id}".format(fip_id=floating_ip_id), floating_ip_id, ignore_missing=False
error_message="unable to delete floating IP")) )
except exc.OpenStackCloudResourceNotFound: except exceptions.ResourceNotFound:
return False return False
except Exception as e:
raise exc.OpenStackCloudException(
"Unable to delete floating IP ID {fip_id}: {msg}".format(
fip_id=floating_ip_id, msg=str(e)))
return True return True
def _nova_delete_floating_ip(self, floating_ip_id): def _nova_delete_floating_ip(self, floating_ip_id):
@ -751,14 +743,9 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
if fixed_address is not None: if fixed_address is not None:
floating_ip_args['fixed_ip_address'] = fixed_address floating_ip_args['fixed_ip_address'] = fixed_address
return proxy._json_response( return self.network.update_ip(
self.network.put( floating_ip,
"/floatingips/{fip_id}".format(fip_id=floating_ip['id']), **floating_ip_args)
json={'floatingip': floating_ip_args}),
error_message=("Error attaching IP {ip} to "
"server {server_id}".format(
ip=floating_ip['id'],
server_id=server['id'])))
def _nova_attach_ip_to_server(self, server_id, floating_ip_id, def _nova_attach_ip_to_server(self, server_id, floating_ip_id,
fixed_address=None): fixed_address=None):
@ -809,11 +796,14 @@ class FloatingIPCloudMixin(_normalize.Normalizer):
f_ip = self.get_floating_ip(id=floating_ip_id) f_ip = self.get_floating_ip(id=floating_ip_id)
if f_ip is None or not f_ip['attached']: if f_ip is None or not f_ip['attached']:
return False return False
exceptions.raise_from_response( try:
self.network.put( self.network.update_ip(
"/floatingips/{fip_id}".format(fip_id=floating_ip_id), floating_ip_id,
json={"floatingip": {"port_id": None}}), port_id=None
error_message=("Error detaching IP {ip} from " )
except exceptions.SDKException:
raise exceptions.SDKException(
("Error detaching IP {ip} from "
"server {server_id}".format( "server {server_id}".format(
ip=floating_ip_id, server_id=server_id))) ip=floating_ip_id, server_id=server_id)))

View File

@ -21,7 +21,6 @@ from openstack.cloud import _normalize
from openstack.cloud import _utils from openstack.cloud import _utils
from openstack.cloud import exc from openstack.cloud import exc
from openstack import exceptions from openstack import exceptions
from openstack import proxy
class NetworkCloudMixin(_normalize.Normalizer): class NetworkCloudMixin(_normalize.Normalizer):
@ -53,9 +52,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: ``OpenStackCloudException`` if something goes wrong during the :raises: ``OpenStackCloudException`` if something goes wrong during the
OpenStack API call. OpenStack API call.
""" """
networks = self.list_networks( query = {}
filters if isinstance(filters, dict) else None) if name_or_id:
return _utils._filter_list(networks, name_or_id, filters) query['name'] = name_or_id
if filters:
query.update(filters)
return list(self.network.networks(**query))
def search_routers(self, name_or_id=None, filters=None): def search_routers(self, name_or_id=None, filters=None):
"""Search routers """Search routers
@ -69,9 +71,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: ``OpenStackCloudException`` if something goes wrong during the :raises: ``OpenStackCloudException`` if something goes wrong during the
OpenStack API call. OpenStack API call.
""" """
routers = self.list_routers( query = {}
filters if isinstance(filters, dict) else None) if name_or_id:
return _utils._filter_list(routers, name_or_id, filters) query['name'] = name_or_id
if filters:
query.update(filters)
return list(self.network.routers(**query))
def search_subnets(self, name_or_id=None, filters=None): def search_subnets(self, name_or_id=None, filters=None):
"""Search subnets """Search subnets
@ -85,9 +90,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: ``OpenStackCloudException`` if something goes wrong during the :raises: ``OpenStackCloudException`` if something goes wrong during the
OpenStack API call. OpenStack API call.
""" """
subnets = self.list_subnets( query = {}
filters if isinstance(filters, dict) else None) if name_or_id:
return _utils._filter_list(subnets, name_or_id, filters) query['name'] = name_or_id
if filters:
query.update(filters)
return list(self.network.subnets(**query))
def search_ports(self, name_or_id=None, filters=None): def search_ports(self, name_or_id=None, filters=None):
"""Search ports """Search ports
@ -121,11 +129,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
# If the cloud is running nova-network, just return an empty list. # If the cloud is running nova-network, just return an empty list.
if not self.has_service('network'): if not self.has_service('network'):
return [] return []
# Translate None from search interface to empty {} for kwargs below # Translate None from search interface to empty {} for kwargs below
if not filters: if not filters:
filters = {} filters = {}
data = self.network.get("/networks", params=filters) return list(self.network.networks(**filters))
return self._get_and_munchify('networks', data)
def list_routers(self, filters=None): def list_routers(self, filters=None):
"""List all available routers. """List all available routers.
@ -137,14 +145,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
# If the cloud is running nova-network, just return an empty list. # If the cloud is running nova-network, just return an empty list.
if not self.has_service('network'): if not self.has_service('network'):
return [] return []
# Translate None from search interface to empty {} for kwargs below # Translate None from search interface to empty {} for kwargs below
if not filters: if not filters:
filters = {} filters = {}
resp = self.network.get("/routers", params=filters) return list(self.network.routers(**filters))
data = proxy._json_response(
resp,
error_message="Error fetching router list")
return self._get_and_munchify('routers', data)
def list_subnets(self, filters=None): def list_subnets(self, filters=None):
"""List all available subnets. """List all available subnets.
@ -156,11 +161,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
# If the cloud is running nova-network, just return an empty list. # If the cloud is running nova-network, just return an empty list.
if not self.has_service('network'): if not self.has_service('network'):
return [] return []
# Translate None from search interface to empty {} for kwargs below # Translate None from search interface to empty {} for kwargs below
if not filters: if not filters:
filters = {} filters = {}
data = self.network.get("/subnets", params=filters) return list(self.network.subnets(**filters))
return self._get_and_munchify('subnets', data)
def list_ports(self, filters=None): def list_ports(self, filters=None):
"""List all available ports. """List all available ports.
@ -199,11 +204,10 @@ class NetworkCloudMixin(_normalize.Normalizer):
# If the cloud is running nova-network, just return an empty list. # If the cloud is running nova-network, just return an empty list.
if not self.has_service('network'): if not self.has_service('network'):
return [] return []
resp = self.network.get("/ports", params=filters)
data = proxy._json_response( if not filters:
resp, filters = {}
error_message="Error fetching port list") return list(self.network.ports(**filters))
return self._get_and_munchify('ports', data)
def get_qos_policy(self, name_or_id, filters=None): def get_qos_policy(self, name_or_id, filters=None):
"""Get a QoS policy by name or ID. """Get a QoS policy by name or ID.
@ -231,6 +235,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not self._has_neutron_extension('qos'): if not self._has_neutron_extension('qos'):
raise exc.OpenStackCloudUnavailableExtension( raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud') 'QoS extension is not available on target cloud')
if not filters: if not filters:
filters = {} filters = {}
return self.network.find_qos_policy( return self.network.find_qos_policy(
@ -253,6 +258,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not self._has_neutron_extension('qos'): if not self._has_neutron_extension('qos'):
raise exc.OpenStackCloudUnavailableExtension( raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud') 'QoS extension is not available on target cloud')
query = {} query = {}
if name_or_id: if name_or_id:
query['name'] = name_or_id query['name'] = name_or_id
@ -334,7 +340,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
found. found.
""" """
return _utils._get_entity(self, 'network', name_or_id, filters) if not filters:
filters = {}
return self.network.find_network(
name_or_id=name_or_id,
ignore_missing=True,
**filters)
def get_network_by_id(self, id): def get_network_by_id(self, id):
""" Get a network by ID """ Get a network by ID
@ -342,14 +353,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
:param id: ID of the network. :param id: ID of the network.
:returns: A network ``munch.Munch``. :returns: A network ``munch.Munch``.
""" """
resp = self.network.get('/networks/{id}'.format(id=id)) return self.network.get_network(id)
data = proxy._json_response(
resp,
error_message="Error getting network with ID {id}".format(id=id)
)
network = self._get_and_munchify('network', data)
return network
def get_router(self, name_or_id, filters=None): def get_router(self, name_or_id, filters=None):
"""Get a router by name or ID. """Get a router by name or ID.
@ -374,7 +378,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
found. found.
""" """
return _utils._get_entity(self, 'router', name_or_id, filters) if not filters:
filters = {}
return self.network.find_router(
name_or_id=name_or_id,
ignore_missing=True,
**filters)
def get_subnet(self, name_or_id, filters=None): def get_subnet(self, name_or_id, filters=None):
"""Get a subnet by name or ID. """Get a subnet by name or ID.
@ -395,7 +404,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
found. found.
""" """
return _utils._get_entity(self, 'subnet', name_or_id, filters) if not filters:
filters = {}
return self.network.find_subnet(
name_or_id=name_or_id,
ignore_missing=True,
**filters)
def get_subnet_by_id(self, id): def get_subnet_by_id(self, id):
""" Get a subnet by ID """ Get a subnet by ID
@ -403,14 +417,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
:param id: ID of the subnet. :param id: ID of the subnet.
:returns: A subnet ``munch.Munch``. :returns: A subnet ``munch.Munch``.
""" """
resp = self.network.get('/subnets/{id}'.format(id=id)) return self.network.get_subnet(id)
data = proxy._json_response(
resp,
error_message="Error getting subnet with ID {id}".format(id=id)
)
subnet = self._get_and_munchify('subnet', data)
return subnet
def get_port(self, name_or_id, filters=None): def get_port(self, name_or_id, filters=None):
"""Get a port by name or ID. """Get a port by name or ID.
@ -434,7 +441,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
:returns: A port ``munch.Munch`` or None if no matching port is found. :returns: A port ``munch.Munch`` or None if no matching port is found.
""" """
return _utils._get_entity(self, 'port', name_or_id, filters) if not filters:
filters = {}
return self.network.find_port(
name_or_id=name_or_id,
ignore_missing=True,
**filters)
def get_port_by_id(self, id): def get_port_by_id(self, id):
""" Get a port by ID """ Get a port by ID
@ -442,14 +454,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
:param id: ID of the port. :param id: ID of the port.
:returns: A port ``munch.Munch``. :returns: A port ``munch.Munch``.
""" """
resp = self.network.get('/ports/{id}'.format(id=id)) return self.network.get_port(id)
data = proxy._json_response(
resp,
error_message="Error getting port with ID {id}".format(id=id)
)
port = self._get_and_munchify('port', data)
return port
def create_network(self, name, shared=False, admin_state_up=True, def create_network(self, name, shared=False, admin_state_up=True,
external=False, provider=None, project_id=None, external=False, provider=None, project_id=None,
@ -487,7 +492,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
network['shared'] = shared network['shared'] = shared
if project_id is not None: if project_id is not None:
network['tenant_id'] = project_id network['project_id'] = project_id
if availability_zone_hints is not None: if availability_zone_hints is not None:
if not isinstance(availability_zone_hints, list): if not isinstance(availability_zone_hints, list):
@ -535,11 +540,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
if dns_domain: if dns_domain:
network['dns_domain'] = dns_domain network['dns_domain'] = dns_domain
data = self.network.post("/networks", json={'network': network}) network = self.network.create_network(**network)
# Reset cache so the new network is picked up # Reset cache so the new network is picked up
self._reset_network_caches() self._reset_network_caches()
return self._get_and_munchify('network', data) return network
@_utils.valid_kwargs("name", "shared", "admin_state_up", "external", @_utils.valid_kwargs("name", "shared", "admin_state_up", "external",
"provider", "mtu_size", "port_security_enabled", "provider", "mtu_size", "port_security_enabled",
@ -598,14 +603,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudException( raise exc.OpenStackCloudException(
"Network %s not found." % name_or_id) "Network %s not found." % name_or_id)
data = proxy._json_response(self.network.put( network = self.network.update_network(network, **kwargs)
"/networks/{net_id}".format(net_id=network.id),
json={"network": kwargs}),
error_message="Error updating network {0}".format(name_or_id))
self._reset_network_caches() self._reset_network_caches()
return self._get_and_munchify('network', data) return network
def delete_network(self, name_or_id): def delete_network(self, name_or_id):
"""Delete a network. """Delete a network.
@ -621,8 +623,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
self.log.debug("Network %s not found for deleting", name_or_id) self.log.debug("Network %s not found for deleting", name_or_id)
return False return False
exceptions.raise_from_response(self.network.delete( self.network.delete_network(network)
"/networks/{network_id}".format(network_id=network['id'])))
# Reset cache so the deleted network is removed # Reset cache so the deleted network is removed
self._reset_network_caches() self._reset_network_caches()
@ -643,12 +644,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not proj: if not proj:
raise exc.OpenStackCloudException("project does not exist") raise exc.OpenStackCloudException("project does not exist")
exceptions.raise_from_response( self.network.update_quota(proj, **kwargs)
self.network.put(
'/quotas/{project_id}'.format(project_id=proj.id),
json={'quota': kwargs}),
error_message=("Error setting Neutron's quota for "
"project {0}".format(proj.id)))
def get_network_quotas(self, name_or_id, details=False): def get_network_quotas(self, name_or_id, details=False):
""" Get network quotas for a project """ Get network quotas for a project
@ -663,14 +659,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
proj = self.get_project(name_or_id) proj = self.get_project(name_or_id)
if not proj: if not proj:
raise exc.OpenStackCloudException("project does not exist") raise exc.OpenStackCloudException("project does not exist")
url = '/quotas/{project_id}'.format(project_id=proj.id) return self.network.get_quota(proj, details)
if details:
url = url + "/details"
data = proxy._json_response(
self.network.get(url),
error_message=("Error fetching Neutron's quota for "
"project {0}".format(proj.id)))
return self._get_and_munchify('quota', data)
def get_network_extensions(self): def get_network_extensions(self):
"""Get Cloud provided network extensions """Get Cloud provided network extensions
@ -691,11 +680,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
proj = self.get_project(name_or_id) proj = self.get_project(name_or_id)
if not proj: if not proj:
raise exc.OpenStackCloudException("project does not exist") raise exc.OpenStackCloudException("project does not exist")
exceptions.raise_from_response( self.network.delete_quota(proj)
self.network.delete(
'/quotas/{project_id}'.format(project_id=proj.id)),
error_message=("Error deleting Neutron's quota for "
"project {0}".format(proj.id)))
@_utils.valid_kwargs( @_utils.valid_kwargs(
'action', 'description', 'destination_firewall_group_id', 'action', 'description', 'destination_firewall_group_id',
@ -770,7 +755,10 @@ class NetworkCloudMixin(_normalize.Normalizer):
""" """
if not filters: if not filters:
filters = {} filters = {}
return self.network.find_firewall_rule(name_or_id, **filters) return self.network.find_firewall_rule(
name_or_id,
ignore_missing=True,
**filters)
def list_firewall_rules(self, filters=None): def list_firewall_rules(self, filters=None):
""" """
@ -894,7 +882,10 @@ class NetworkCloudMixin(_normalize.Normalizer):
""" """
if not filters: if not filters:
filters = {} filters = {}
return self.network.find_firewall_policy(name_or_id, **filters) return self.network.find_firewall_policy(
name_or_id,
ignore_missing=True,
**filters)
def list_firewall_policies(self, filters=None): def list_firewall_policies(self, filters=None):
""" """
@ -1093,7 +1084,10 @@ class NetworkCloudMixin(_normalize.Normalizer):
""" """
if not filters: if not filters:
filters = {} filters = {}
return self.network.find_firewall_group(name_or_id, **filters) return self.network.find_firewall_group(
name_or_id,
ignore_missing=True,
**filters)
def list_firewall_groups(self, filters=None): def list_firewall_groups(self, filters=None):
""" """
@ -1230,6 +1224,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not curr_policy: if not curr_policy:
raise exc.OpenStackCloudException( raise exc.OpenStackCloudException(
"QoS policy %s not found." % name_or_id) "QoS policy %s not found." % name_or_id)
return self.network.update_qos_policy(curr_policy, **kwargs) return self.network.update_qos_policy(curr_policy, **kwargs)
def delete_qos_policy(self, name_or_id): def delete_qos_policy(self, name_or_id):
@ -1248,6 +1243,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not policy: if not policy:
self.log.debug("QoS policy %s not found for deleting", name_or_id) self.log.debug("QoS policy %s not found for deleting", name_or_id)
return False return False
self.network.delete_qos_policy(policy) self.network.delete_qos_policy(policy)
return True return True
@ -1358,6 +1354,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
"target cloud") "target cloud")
kwargs['max_kbps'] = max_kbps kwargs['max_kbps'] = max_kbps
return self.network.create_qos_bandwidth_limit_rule(policy, **kwargs) return self.network.create_qos_bandwidth_limit_rule(policy, **kwargs)
@_utils.valid_kwargs("max_kbps", "max_burst_kbps", "direction") @_utils.valid_kwargs("max_kbps", "max_burst_kbps", "direction")
@ -1703,6 +1700,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
name_or_id=policy_name_or_id)) name_or_id=policy_name_or_id))
kwargs['min_kbps'] = min_kbps kwargs['min_kbps'] = min_kbps
return self.network.create_qos_minimum_bandwidth_rule(policy, **kwargs) return self.network.create_qos_minimum_bandwidth_rule(policy, **kwargs)
@_utils.valid_kwargs("min_kbps", "direction") @_utils.valid_kwargs("min_kbps", "direction")
@ -1793,19 +1791,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error. :raises: OpenStackCloudException on operation error.
""" """
json_body = {} return self.network.add_interface_to_router(
if subnet_id: router=router,
json_body['subnet_id'] = subnet_id subnet_id=subnet_id,
if port_id: port_id=port_id
json_body['port_id'] = port_id )
return proxy._json_response(
self.network.put(
"/routers/{router_id}/add_router_interface".format(
router_id=router['id']),
json=json_body),
error_message="Error attaching interface to router {0}".format(
router['id']))
def remove_router_interface(self, router, subnet_id=None, port_id=None): def remove_router_interface(self, router, subnet_id=None, port_id=None):
"""Detach a subnet from an internal router interface. """Detach a subnet from an internal router interface.
@ -1824,23 +1814,15 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error. :raises: OpenStackCloudException on operation error.
""" """
json_body = {} if not subnet_id and not port_id:
if subnet_id:
json_body['subnet_id'] = subnet_id
if port_id:
json_body['port_id'] = port_id
if not json_body:
raise ValueError( raise ValueError(
"At least one of subnet_id or port_id must be supplied.") "At least one of subnet_id or port_id must be supplied.")
exceptions.raise_from_response( self.network.remove_interface_from_router(
self.network.put( router=router,
"/routers/{router_id}/remove_router_interface".format( subnet_id=subnet_id,
router_id=router['id']), port_id=port_id
json=json_body), )
error_message="Error detaching interface from router {0}".format(
router['id']))
def list_router_interfaces(self, router, interface_type=None): def list_router_interfaces(self, router, interface_type=None):
"""List all interfaces for a router. """List all interfaces for a router.
@ -1923,10 +1905,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
'target cloud') 'target cloud')
router['availability_zone_hints'] = availability_zone_hints router['availability_zone_hints'] = availability_zone_hints
data = proxy._json_response( return self.network.create_router(**router)
self.network.post("/routers", json={"router": router}),
error_message="Error creating router {0}".format(name))
return self._get_and_munchify('router', data)
def update_router(self, name_or_id, name=None, admin_state_up=None, def update_router(self, name_or_id, name=None, admin_state_up=None,
ext_gateway_net_id=None, enable_snat=None, ext_gateway_net_id=None, enable_snat=None,
@ -1991,13 +1970,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudException( raise exc.OpenStackCloudException(
"Router %s not found." % name_or_id) "Router %s not found." % name_or_id)
resp = self.network.put( return self.network.update_router(curr_router, **router)
"/routers/{router_id}".format(router_id=curr_router['id']),
json={"router": router})
data = proxy._json_response(
resp,
error_message="Error updating router {0}".format(name_or_id))
return self._get_and_munchify('router', data)
def delete_router(self, name_or_id): def delete_router(self, name_or_id):
"""Delete a logical router. """Delete a logical router.
@ -2012,14 +1985,12 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error. :raises: OpenStackCloudException on operation error.
""" """
router = self.get_router(name_or_id) router = self.network.find_router(name_or_id, ignore_missing=True)
if not router: if not router:
self.log.debug("Router %s not found for deleting", name_or_id) self.log.debug("Router %s not found for deleting", name_or_id)
return False return False
exceptions.raise_from_response(self.network.delete( self.network.delete_router(router)
"/routers/{router_id}".format(router_id=router['id']),
error_message="Error deleting router {0}".format(name_or_id)))
return True return True
@ -2168,9 +2139,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if use_default_subnetpool: if use_default_subnetpool:
subnet['use_default_subnetpool'] = True subnet['use_default_subnetpool'] = True
response = self.network.post("/subnets", json={"subnet": subnet}) return self.network.create_subnet(**subnet)
return self._get_and_munchify('subnet', response)
def delete_subnet(self, name_or_id): def delete_subnet(self, name_or_id):
"""Delete a subnet. """Delete a subnet.
@ -2185,13 +2154,13 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error. :raises: OpenStackCloudException on operation error.
""" """
subnet = self.get_subnet(name_or_id) subnet = self.network.find_subnet(name_or_id, ignore_missing=True)
if not subnet: if not subnet:
self.log.debug("Subnet %s not found for deleting", name_or_id) self.log.debug("Subnet %s not found for deleting", name_or_id)
return False return False
exceptions.raise_from_response(self.network.delete( self.network.delete_subnet(subnet)
"/subnets/{subnet_id}".format(subnet_id=subnet['id'])))
return True return True
def update_subnet(self, name_or_id, subnet_name=None, enable_dhcp=None, def update_subnet(self, name_or_id, subnet_name=None, enable_dhcp=None,
@ -2276,10 +2245,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudException( raise exc.OpenStackCloudException(
"Subnet %s not found." % name_or_id) "Subnet %s not found." % name_or_id)
response = self.network.put( return self.network.update_subnet(curr_subnet, **subnet)
"/subnets/{subnet_id}".format(subnet_id=curr_subnet['id']),
json={"subnet": subnet})
return self._get_and_munchify('subnet', response)
@_utils.valid_kwargs('name', 'admin_state_up', 'mac_address', 'fixed_ips', @_utils.valid_kwargs('name', 'admin_state_up', 'mac_address', 'fixed_ips',
'subnet_id', 'ip_address', 'security_groups', 'subnet_id', 'ip_address', 'security_groups',
@ -2346,11 +2312,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
""" """
kwargs['network_id'] = network_id kwargs['network_id'] = network_id
data = proxy._json_response( return self.network.create_port(**kwargs)
self.network.post("/ports", json={'port': kwargs}),
error_message="Error creating port for network {0}".format(
network_id))
return self._get_and_munchify('port', data)
@_utils.valid_kwargs('name', 'admin_state_up', 'fixed_ips', @_utils.valid_kwargs('name', 'admin_state_up', 'fixed_ips',
'security_groups', 'allowed_address_pairs', 'security_groups', 'allowed_address_pairs',
@ -2417,12 +2379,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudException( raise exc.OpenStackCloudException(
"failed to find port '{port}'".format(port=name_or_id)) "failed to find port '{port}'".format(port=name_or_id))
data = proxy._json_response( return self.network.update_port(port, **kwargs)
self.network.put(
"/ports/{port_id}".format(port_id=port['id']),
json={"port": kwargs}),
error_message="Error updating port {0}".format(name_or_id))
return self._get_and_munchify('port', data)
def delete_port(self, name_or_id): def delete_port(self, name_or_id):
"""Delete a port """Delete a port
@ -2433,15 +2390,14 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error. :raises: OpenStackCloudException on operation error.
""" """
port = self.get_port(name_or_id=name_or_id) port = self.network.find_port(name_or_id)
if port is None: if port is None:
self.log.debug("Port %s not found for deleting", name_or_id) self.log.debug("Port %s not found for deleting", name_or_id)
return False return False
exceptions.raise_from_response( self.network.delete_port(port)
self.network.delete(
"/ports/{port_id}".format(port_id=port['id'])),
error_message="Error deleting port {0}".format(name_or_id))
return True return True
def _get_port_ids(self, name_or_id_list, filters=None): def _get_port_ids(self, name_or_id_list, filters=None):

View File

@ -94,9 +94,8 @@ class NetworkCommonCloudMixin(_normalize.Normalizer):
if (network['name'] in self._external_ipv4_names if (network['name'] in self._external_ipv4_names
or network['id'] in self._external_ipv4_names): or network['id'] in self._external_ipv4_names):
external_ipv4_networks.append(network) external_ipv4_networks.append(network)
elif ((('router:external' in network elif ((network.is_router_external
and network['router:external']) or network.provider_physical_network)
or network.get('provider:physical_network'))
and network['name'] not in self._internal_ipv4_names and network['name'] not in self._internal_ipv4_names
and network['id'] not in self._internal_ipv4_names): and network['id'] not in self._internal_ipv4_names):
external_ipv4_networks.append(network) external_ipv4_networks.append(network)
@ -105,8 +104,8 @@ class NetworkCommonCloudMixin(_normalize.Normalizer):
if (network['name'] in self._internal_ipv4_names if (network['name'] in self._internal_ipv4_names
or network['id'] in self._internal_ipv4_names): or network['id'] in self._internal_ipv4_names):
internal_ipv4_networks.append(network) internal_ipv4_networks.append(network)
elif (not network.get('router:external', False) elif (not network.is_router_external
and not network.get('provider:physical_network') and not network.provider_physical_network
and network['name'] not in self._external_ipv4_names and network['name'] not in self._external_ipv4_names
and network['id'] not in self._external_ipv4_names): and network['id'] not in self._external_ipv4_names):
internal_ipv4_networks.append(network) internal_ipv4_networks.append(network)
@ -115,7 +114,7 @@ class NetworkCommonCloudMixin(_normalize.Normalizer):
if (network['name'] in self._external_ipv6_names if (network['name'] in self._external_ipv6_names
or network['id'] in self._external_ipv6_names): or network['id'] in self._external_ipv6_names):
external_ipv6_networks.append(network) external_ipv6_networks.append(network)
elif (network.get('router:external') elif (network.is_router_external
and network['name'] not in self._internal_ipv6_names and network['name'] not in self._internal_ipv6_names
and network['id'] not in self._internal_ipv6_names): and network['id'] not in self._internal_ipv6_names):
external_ipv6_networks.append(network) external_ipv6_networks.append(network)
@ -124,7 +123,7 @@ class NetworkCommonCloudMixin(_normalize.Normalizer):
if (network['name'] in self._internal_ipv6_names if (network['name'] in self._internal_ipv6_names
or network['id'] in self._internal_ipv6_names): or network['id'] in self._internal_ipv6_names):
internal_ipv6_networks.append(network) internal_ipv6_networks.append(network)
elif (not network.get('router:external', False) elif (not network.is_router_external
and network['name'] not in self._external_ipv6_names and network['name'] not in self._external_ipv6_names
and network['id'] not in self._external_ipv6_names): and network['id'] not in self._external_ipv6_names):
internal_ipv6_networks.append(network) internal_ipv6_networks.append(network)
@ -144,7 +143,7 @@ class NetworkCommonCloudMixin(_normalize.Normalizer):
external_ipv4_floating_networks.append(network) external_ipv4_floating_networks.append(network)
nat_source = network nat_source = network
elif self._nat_source is None: elif self._nat_source is None:
if network.get('router:external'): if network.is_router_external:
external_ipv4_floating_networks.append(network) external_ipv4_floating_networks.append(network)
nat_source = nat_source or network nat_source = nat_source or network

View File

@ -610,11 +610,20 @@ class Normalizer:
] ]
def _normalize_floating_ip(self, ip): def _normalize_floating_ip(self, ip):
ret = munch.Munch()
# Copy incoming floating ip because of shared dicts in unittests # Copy incoming floating ip because of shared dicts in unittests
if isinstance(ip, resource.Resource):
ip = ip.to_dict(ignore_none=True, original_names=True)
location = ip.pop(
'location',
self._get_current_location(project_id=ip.get('owner')))
else:
location = self._get_current_location(
project_id=ip.get('owner'))
# This copy is to keep things from getting epically weird in tests
ip = ip.copy() ip = ip.copy()
ret = munch.Munch(location=location)
fixed_ip_address = ip.pop('fixed_ip_address', ip.pop('fixed_ip', None)) fixed_ip_address = ip.pop('fixed_ip_address', ip.pop('fixed_ip', None))
floating_ip_address = ip.pop('floating_ip_address', ip.pop('ip', None)) floating_ip_address = ip.pop('floating_ip_address', ip.pop('ip', None))
network_id = ip.pop( network_id = ip.pop(

View File

@ -66,21 +66,28 @@ class TestNetworkQuotas(base.BaseFunctionalTest):
def test_quotas(self): def test_quotas(self):
'''Test quotas functionality''' '''Test quotas functionality'''
quotas = self.operator_cloud.get_network_quotas('demo') quotas = self.operator_cloud.get_network_quotas('demo')
network = quotas['network'] network = quotas['networks']
self.operator_cloud.set_network_quotas('demo', network=network + 1) self.operator_cloud.set_network_quotas('demo', networks=network + 1)
self.assertEqual( self.assertEqual(
network + 1, network + 1,
self.operator_cloud.get_network_quotas('demo')['network']) self.operator_cloud.get_network_quotas('demo')['networks'])
self.operator_cloud.delete_network_quotas('demo') self.operator_cloud.delete_network_quotas('demo')
self.assertEqual( self.assertEqual(
network, network,
self.operator_cloud.get_network_quotas('demo')['network']) self.operator_cloud.get_network_quotas('demo')['networks'])
def test_get_quotas_details(self): def test_get_quotas_details(self):
quotas = [
'floating_ips', 'networks', 'ports',
'rbac_policies', 'routers', 'subnets',
'subnet_pools', 'security_group_rules',
'security_groups']
expected_keys = ['limit', 'used', 'reserved'] expected_keys = ['limit', 'used', 'reserved']
'''Test getting details about quota usage''' '''Test getting details about quota usage'''
quota_details = self.operator_cloud.get_network_quotas( quota_details = self.operator_cloud.get_network_quotas(
'demo', details=True) 'demo', details=True)
for quota_values in quota_details.values(): for quota in quotas:
quota_val = quota_details[quota]
if quota_val:
for expected_key in expected_keys: for expected_key in expected_keys:
self.assertTrue(expected_key in quota_values.keys()) self.assertTrue(expected_key in quota_val)

View File

@ -24,8 +24,8 @@ from openstack.tests.functional import base
EXPECTED_TOPLEVEL_FIELDS = ( EXPECTED_TOPLEVEL_FIELDS = (
'id', 'name', 'admin_state_up', 'external_gateway_info', 'id', 'name', 'is_admin_state_up', 'external_gateway_info',
'tenant_id', 'routes', 'status' 'project_id', 'routes', 'status'
) )
EXPECTED_GW_INFO_FIELDS = ('network_id', 'enable_snat', 'external_fixed_ips') EXPECTED_GW_INFO_FIELDS = ('network_id', 'enable_snat', 'external_fixed_ips')

View File

@ -23,6 +23,7 @@ from testscenarios import load_tests_apply_scenarios as load_tests # noqa
from openstack.cloud import exc from openstack.cloud import exc
from openstack import exceptions from openstack import exceptions
from openstack.network.v2 import port as _port
from openstack.tests import fakes from openstack.tests import fakes
from openstack.tests.unit import base from openstack.tests.unit import base
@ -1657,8 +1658,9 @@ class TestBaremetalNode(base.IronicTestCase):
uri=self.get_mock_url( uri=self.get_mock_url(
service_type='network', service_type='network',
resource='ports', resource='ports',
base_url_append='v2.0'), base_url_append='v2.0',
json={'ports': [{'id': vif_id}]}), append=[vif_id]),
json={'id': vif_id}),
dict( dict(
method='POST', method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -1683,8 +1685,9 @@ class TestBaremetalNode(base.IronicTestCase):
uri=self.get_mock_url( uri=self.get_mock_url(
service_type='network', service_type='network',
resource='ports', resource='ports',
base_url_append='v2.0'), base_url_append='v2.0',
json={'ports': [{'id': vif_id}]}), append=[vif_id]),
json={'id': vif_id}),
dict( dict(
method='DELETE', method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -1717,13 +1720,16 @@ class TestBaremetalNode(base.IronicTestCase):
uri=self.get_mock_url( uri=self.get_mock_url(
service_type='network', service_type='network',
resource='ports', resource='ports',
base_url_append='v2.0'), base_url_append='v2.0',
json={'ports': [fake_port]}), append=[vif_id]),
json=fake_port),
]) ])
res = self.cloud.list_ports_attached_to_machine( res = self.cloud.list_ports_attached_to_machine(
self.fake_baremetal_node['uuid']) self.fake_baremetal_node['uuid'])
self.assert_calls() self.assert_calls()
self.assertEqual([fake_port], res) self.assertEqual(
[_port.Port(**fake_port).to_dict(computed=False)],
[i.to_dict(computed=False) for i in res])
class TestUpdateMachinePatch(base.IronicTestCase): class TestUpdateMachinePatch(base.IronicTestCase):

View File

@ -19,6 +19,7 @@ import openstack
import openstack.cloud import openstack.cloud
from openstack.cloud import meta from openstack.cloud import meta
from openstack.compute.v2 import flavor as _flavor from openstack.compute.v2 import flavor as _flavor
from openstack.network.v2 import port as _port
from openstack import exceptions from openstack import exceptions
from openstack.tests import fakes from openstack.tests import fakes
from openstack.tests.unit import base from openstack.tests.unit import base
@ -565,7 +566,10 @@ class TestMemoryCache(base.TestCase):
]}), ]}),
]) ])
ports = self.cloud.list_ports(filters={'status': 'DOWN'}) ports = self.cloud.list_ports(filters={'status': 'DOWN'})
self.assertCountEqual([down_port], ports) for a, b in zip([down_port], ports):
self.assertDictEqual(
_port.Port(**a).to_dict(computed=False),
b.to_dict(computed=False))
self.assert_calls() self.assert_calls()

View File

@ -555,7 +555,14 @@ class TestCreateServer(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', 'network-name']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=network-name']),
json={'networks': [network]}), json={'networks': [network]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -600,7 +607,14 @@ class TestCreateServer(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', 'network-name']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=network-name']),
json={'networks': [network]}), json={'networks': [network]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(

View File

@ -180,10 +180,10 @@ class TestFloatingIP(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=('https://network.example.com/v2.0/floatingips?' uri=('https://network.example.com/v2.0/floatingips?'
'Foo=42'), 'description=42'),
json={'floatingips': []})]) json={'floatingips': []})])
self.cloud.list_floating_ips(filters={'Foo': 42}) self.cloud.list_floating_ips(filters={'description': 42})
self.assert_calls() self.assert_calls()
@ -260,7 +260,11 @@ class TestFloatingIP(base.TestCase):
def test_create_floating_ip(self): def test_create_floating_ip(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri='https://network.example.com/v2.0/networks', uri='https://network.example.com/v2.0/networks/my-network',
status_code=404),
dict(method='GET',
uri='https://network.example.com/v2.0/networks'
'?name=my-network',
json={'networks': [self.mock_get_network_rep]}), json={'networks': [self.mock_get_network_rep]}),
dict(method='POST', dict(method='POST',
uri='https://network.example.com/v2.0/floatingips', uri='https://network.example.com/v2.0/floatingips',
@ -279,8 +283,8 @@ class TestFloatingIP(base.TestCase):
def test_create_floating_ip_port_bad_response(self): def test_create_floating_ip_port_bad_response(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri='https://network.example.com/v2.0/networks', uri='https://network.example.com/v2.0/networks/my-network',
json={'networks': [self.mock_get_network_rep]}), json=self.mock_get_network_rep),
dict(method='POST', dict(method='POST',
uri='https://network.example.com/v2.0/floatingips', uri='https://network.example.com/v2.0/floatingips',
json=self.mock_floating_ip_new_rep, json=self.mock_floating_ip_new_rep,
@ -300,7 +304,11 @@ class TestFloatingIP(base.TestCase):
def test_create_floating_ip_port(self): def test_create_floating_ip_port(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri='https://network.example.com/v2.0/networks', uri='https://network.example.com/v2.0/networks/my-network',
status_code=404),
dict(method='GET',
uri='https://network.example.com/v2.0/networks'
'?name=my-network',
json={'networks': [self.mock_get_network_rep]}), json={'networks': [self.mock_get_network_rep]}),
dict(method='POST', dict(method='POST',
uri='https://network.example.com/v2.0/floatingips', uri='https://network.example.com/v2.0/floatingips',
@ -395,7 +403,10 @@ class TestFloatingIP(base.TestCase):
# payloads taken from citycloud # payloads taken from citycloud
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri='https://network.example.com/v2.0/networks', uri='https://network.example.com/v2.0/networks/ext-net',
status_code=404),
dict(method='GET',
uri='https://network.example.com/v2.0/networks?name=ext-net',
json={"networks": [{ json={"networks": [{
"status": "ACTIVE", "status": "ACTIVE",
"subnets": [ "subnets": [
@ -416,24 +427,6 @@ class TestFloatingIP(base.TestCase):
"shared": False, "shared": False,
"id": "0232c17f-2096-49bc-b205-d3dcd9a30ebf", "id": "0232c17f-2096-49bc-b205-d3dcd9a30ebf",
"description": None "description": None
}, {
"status": "ACTIVE",
"subnets": ["f0ad1df5-53ee-473f-b86b-3604ea5591e9"],
"availability_zone_hints": [],
"availability_zones": ["nova"],
"name": "private",
"admin_state_up": True,
"tenant_id": "65222a4d09ea4c68934fa1028c77f394",
"created_at": "2016-10-22T13:46:26",
"tags": [],
"updated_at": "2016-10-22T13:46:26",
"ipv6_address_scope": None,
"router:external": False,
"ipv4_address_scope": None,
"shared": False,
"mtu": 1450,
"id": "2c9adcb5-c123-4c5a-a2ba-1ad4c4e1481f",
"description": ""
}]}), }]}),
dict(method='GET', dict(method='GET',
uri='https://network.example.com/v2.0/ports' uri='https://network.example.com/v2.0/ports'

View File

@ -881,8 +881,15 @@ class TestFirewallGroup(FirewallTestCase):
json={'firewall_policies': [self.mock_ingress_policy]}), json={'firewall_policies': [self.mock_ingress_policy]}),
dict(method='GET', dict(method='GET',
uri=self.get_mock_url('network', 'public', uri=self.get_mock_url(
append=['v2.0', 'ports']), 'network', 'public',
append=['v2.0', 'ports', self.mock_port['name']]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'ports'],
qs_elements=['name=%s' % self.mock_port['name']]),
json={'ports': [self.mock_port]}), json={'ports': [self.mock_port]}),
dict(method='POST', dict(method='POST',
uri=self._make_mock_url('firewall_groups'), uri=self._make_mock_url('firewall_groups'),
@ -1078,8 +1085,15 @@ class TestFirewallGroup(FirewallTestCase):
deepcopy(self.mock_ingress_policy)]}), deepcopy(self.mock_ingress_policy)]}),
dict(method='GET', dict(method='GET',
uri=self.get_mock_url('network', 'public', uri=self.get_mock_url(
append=['v2.0', 'ports']), 'network', 'public',
append=['v2.0', 'ports', self.mock_port['name']]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'ports'],
qs_elements=['name=%s' % self.mock_port['name']]),
json={'ports': [self.mock_port]}), json={'ports': [self.mock_port]}),
dict(method='PUT', dict(method='PUT',
uri=self._make_mock_url('firewall_groups', uri=self._make_mock_url('firewall_groups',

View File

@ -15,6 +15,7 @@ import testtools
import openstack import openstack
import openstack.cloud import openstack.cloud
from openstack.network.v2 import network as _network
from openstack.tests.unit import base from openstack.tests.unit import base
@ -43,10 +44,11 @@ class TestNetwork(base.TestCase):
'qos_policy_id': None, 'qos_policy_id': None,
'name': 'netname', 'name': 'netname',
'admin_state_up': True, 'admin_state_up': True,
'tenant_id': '861808a93da0484ea1767967c4df8a23',
'created_at': '2017-04-22T19:22:53Z', 'created_at': '2017-04-22T19:22:53Z',
'mtu': 0, 'mtu': 0,
'dns_domain': 'sample.openstack.org.' 'dns_domain': 'sample.openstack.org.',
'vlan_transparent': None,
'segments': None,
} }
network_availability_zone_extension = { network_availability_zone_extension = {
@ -59,6 +61,11 @@ class TestNetwork(base.TestCase):
enabled_neutron_extensions = [network_availability_zone_extension] enabled_neutron_extensions = [network_availability_zone_extension]
def _compare_networks(self, exp, real):
self.assertDictEqual(
_network.Network(**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_list_networks(self): def test_list_networks(self):
net1 = {'id': '1', 'name': 'net1'} net1 = {'id': '1', 'name': 'net1'}
net2 = {'id': '2', 'name': 'net2'} net2 = {'id': '2', 'name': 'net2'}
@ -69,7 +76,10 @@ class TestNetwork(base.TestCase):
json={'networks': [net1, net2]}) json={'networks': [net1, net2]})
]) ])
nets = self.cloud.list_networks() nets = self.cloud.list_networks()
self.assertEqual([net1, net2], nets) self.assertEqual(
[_network.Network(**i).to_dict(computed=False) for i in [
net1, net2]],
[i.to_dict(computed=False) for i in nets])
self.assert_calls() self.assert_calls()
def test_list_networks_filtered(self): def test_list_networks_filtered(self):
@ -95,7 +105,8 @@ class TestNetwork(base.TestCase):
'name': 'netname'}})) 'name': 'netname'}}))
]) ])
network = self.cloud.create_network("netname") network = self.cloud.create_network("netname")
self.assertEqual(self.mock_new_network_rep, network) self._compare_networks(
self.mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_specific_tenant(self): def test_create_network_specific_tenant(self):
@ -111,10 +122,10 @@ class TestNetwork(base.TestCase):
json={'network': { json={'network': {
'admin_state_up': True, 'admin_state_up': True,
'name': 'netname', 'name': 'netname',
'tenant_id': project_id}})) 'project_id': project_id}}))
]) ])
network = self.cloud.create_network("netname", project_id=project_id) network = self.cloud.create_network("netname", project_id=project_id)
self.assertEqual(mock_new_network_rep, network) self._compare_networks(mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_external(self): def test_create_network_external(self):
@ -132,7 +143,7 @@ class TestNetwork(base.TestCase):
'router:external': True}})) 'router:external': True}}))
]) ])
network = self.cloud.create_network("netname", external=True) network = self.cloud.create_network("netname", external=True)
self.assertEqual(mock_new_network_rep, network) self._compare_networks(mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_provider(self): def test_create_network_provider(self):
@ -160,7 +171,7 @@ class TestNetwork(base.TestCase):
json={'network': expected_send_params})) json={'network': expected_send_params}))
]) ])
network = self.cloud.create_network("netname", provider=provider_opts) network = self.cloud.create_network("netname", provider=provider_opts)
self.assertEqual(mock_new_network_rep, network) self._compare_networks(mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_with_availability_zone_hints(self): def test_create_network_with_availability_zone_hints(self):
@ -181,7 +192,7 @@ class TestNetwork(base.TestCase):
]) ])
network = self.cloud.create_network("netname", network = self.cloud.create_network("netname",
availability_zone_hints=['nova']) availability_zone_hints=['nova'])
self.assertEqual(self.mock_new_network_rep, network) self._compare_networks(self.mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_provider_ignored_value(self): def test_create_network_provider_ignored_value(self):
@ -210,7 +221,7 @@ class TestNetwork(base.TestCase):
json={'network': expected_send_params})) json={'network': expected_send_params}))
]) ])
network = self.cloud.create_network("netname", provider=provider_opts) network = self.cloud.create_network("netname", provider=provider_opts)
self.assertEqual(mock_new_network_rep, network) self._compare_networks(mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_wrong_availability_zone_hints_type(self): def test_create_network_wrong_availability_zone_hints_type(self):
@ -249,7 +260,7 @@ class TestNetwork(base.TestCase):
"netname", "netname",
port_security_enabled=port_security_state port_security_enabled=port_security_state
) )
self.assertEqual(mock_new_network_rep, network) self._compare_networks(mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_with_mtu(self): def test_create_network_with_mtu(self):
@ -270,7 +281,7 @@ class TestNetwork(base.TestCase):
network = self.cloud.create_network("netname", network = self.cloud.create_network("netname",
mtu_size=mtu_size mtu_size=mtu_size
) )
self.assertEqual(mock_new_network_rep, network) self._compare_networks(mock_new_network_rep, network)
self.assert_calls() self.assert_calls()
def test_create_network_with_wrong_mtu_size(self): def test_create_network_with_wrong_mtu_size(self):
@ -294,7 +305,13 @@ class TestNetwork(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks'],
qs_elements=['name=%s' % network_name]),
json={'networks': [network]}), json={'networks': [network]}),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -309,7 +326,13 @@ class TestNetwork(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', 'test-net']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks'],
qs_elements=['name=test-net']),
json={'networks': []}), json={'networks': []}),
]) ])
self.assertFalse(self.cloud.delete_network('test-net')) self.assertFalse(self.cloud.delete_network('test-net'))
@ -322,7 +345,13 @@ class TestNetwork(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks'],
qs_elements=['name=%s' % network_name]),
json={'networks': [network]}), json={'networks': [network]}),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(

View File

@ -20,6 +20,7 @@ Test port resource (managed by neutron)
""" """
from openstack.cloud.exc import OpenStackCloudException from openstack.cloud.exc import OpenStackCloudException
from openstack.network.v2 import port as _port
from openstack.tests.unit import base from openstack.tests.unit import base
@ -139,6 +140,11 @@ class TestPort(base.TestCase):
] ]
} }
def _compare_ports(self, exp, real):
self.assertDictEqual(
_port.Port(**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_create_port(self): def test_create_port(self):
self.register_uris([ self.register_uris([
dict(method="POST", dict(method="POST",
@ -154,7 +160,7 @@ class TestPort(base.TestCase):
port = self.cloud.create_port( port = self.cloud.create_port(
network_id='test-net-id', name='test-port-name', network_id='test-net-id', name='test-port-name',
admin_state_up=True) admin_state_up=True)
self.assertEqual(self.mock_neutron_port_create_rep['port'], port) self._compare_ports(self.mock_neutron_port_create_rep['port'], port)
self.assert_calls() self.assert_calls()
def test_create_port_parameters(self): def test_create_port_parameters(self):
@ -187,7 +193,7 @@ class TestPort(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports']), 'network', 'public', append=['v2.0', 'ports', port_id]),
json=self.mock_neutron_port_list_rep), json=self.mock_neutron_port_list_rep),
dict(method='PUT', dict(method='PUT',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -200,7 +206,7 @@ class TestPort(base.TestCase):
port = self.cloud.update_port( port = self.cloud.update_port(
name_or_id=port_id, name='test-port-name-updated') name_or_id=port_id, name='test-port-name-updated')
self.assertEqual(self.mock_neutron_port_update_rep['port'], port) self._compare_ports(self.mock_neutron_port_update_rep['port'], port)
self.assert_calls() self.assert_calls()
def test_update_port_parameters(self): def test_update_port_parameters(self):
@ -214,7 +220,7 @@ class TestPort(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports']), 'network', 'public', append=['v2.0', 'ports', port_id]),
json=self.mock_neutron_port_list_rep), json=self.mock_neutron_port_list_rep),
dict(method='PUT', dict(method='PUT',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -238,7 +244,8 @@ class TestPort(base.TestCase):
json=self.mock_neutron_port_list_rep) json=self.mock_neutron_port_list_rep)
]) ])
ports = self.cloud.list_ports() ports = self.cloud.list_ports()
self.assertCountEqual(self.mock_neutron_port_list_rep['ports'], ports) for a, b in zip(self.mock_neutron_port_list_rep['ports'], ports):
self._compare_ports(a, b)
self.assert_calls() self.assert_calls()
def test_list_ports_filtered(self): def test_list_ports_filtered(self):
@ -250,7 +257,8 @@ class TestPort(base.TestCase):
json=self.mock_neutron_port_list_rep) json=self.mock_neutron_port_list_rep)
]) ])
ports = self.cloud.list_ports(filters={'status': 'DOWN'}) ports = self.cloud.list_ports(filters={'status': 'DOWN'})
self.assertCountEqual(self.mock_neutron_port_list_rep['ports'], ports) for a, b in zip(self.mock_neutron_port_list_rep['ports'], ports):
self._compare_ports(a, b)
self.assert_calls() self.assert_calls()
def test_list_ports_exception(self): def test_list_ports_exception(self):
@ -306,7 +314,13 @@ class TestPort(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports']), 'network', 'public',
append=['v2.0', 'ports', 'first-port']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports'],
qs_elements=['name=first-port']),
json=self.mock_neutron_port_list_rep), json=self.mock_neutron_port_list_rep),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -321,8 +335,14 @@ class TestPort(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports']), 'network', 'public', append=['v2.0', 'ports',
json=self.mock_neutron_port_list_rep) 'non-existent']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports'],
qs_elements=['name=non-existent']),
json={'ports': []})
]) ])
self.assertFalse(self.cloud.delete_port(name_or_id='non-existent')) self.assertFalse(self.cloud.delete_port(name_or_id='non-existent'))
self.assert_calls() self.assert_calls()
@ -334,7 +354,12 @@ class TestPort(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports']), 'network', 'public', append=['v2.0', 'ports', port_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports'],
qs_elements=['name=%s' % port_name]),
json={'ports': [port1, port2]}) json={'ports': [port1, port2]})
]) ])
self.assertRaises(OpenStackCloudException, self.assertRaises(OpenStackCloudException,
@ -348,7 +373,8 @@ class TestPort(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'ports']), 'network', 'public',
append=['v2.0', 'ports', port1['id']]),
json={'ports': [port1, port2]}), json={'ports': [port1, port2]}),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -371,5 +397,5 @@ class TestPort(base.TestCase):
]) ])
r = self.cloud.get_port_by_id(fake_port['id']) r = self.cloud.get_port_by_id(fake_port['id'])
self.assertIsNotNone(r) self.assertIsNotNone(r)
self.assertDictEqual(fake_port, r) self._compare_ports(fake_port, r)
self.assert_calls() self.assert_calls()

View File

@ -11,6 +11,7 @@
# under the License. # under the License.
from openstack.cloud import exc from openstack.cloud import exc
from openstack.network.v2 import quota as _quota
from openstack.tests.unit import base from openstack.tests.unit import base
fake_quota_set = { fake_quota_set = {
@ -183,8 +184,16 @@ class TestQuotas(base.TestCase):
append=['v2.0', 'quotas', project.project_id]), append=['v2.0', 'quotas', project.project_id]),
json={'quota': quota}) json={'quota': quota})
]) ])
received_quota = self.cloud.get_network_quotas(project.project_id) received_quota = self.cloud.get_network_quotas(
self.assertDictEqual(quota, received_quota) project.project_id).to_dict(computed=False)
expected_quota = _quota.Quota(**quota).to_dict(computed=False)
received_quota.pop('id')
received_quota.pop('name')
expected_quota.pop('id')
expected_quota.pop('name')
self.assertDictEqual(expected_quota, received_quota)
self.assert_calls() self.assert_calls()
def test_neutron_get_quotas_details(self): def test_neutron_get_quotas_details(self):
@ -233,12 +242,14 @@ class TestQuotas(base.TestCase):
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', 'network', 'public',
append=['v2.0', 'quotas', append=['v2.0', 'quotas',
'%s/details' % project.project_id]), project.project_id, 'details']),
json={'quota': quota_details}) json={'quota': quota_details})
]) ])
received_quota_details = self.cloud.get_network_quotas( received_quota_details = self.cloud.get_network_quotas(
project.project_id, details=True) project.project_id, details=True)
self.assertDictEqual(quota_details, received_quota_details) self.assertDictEqual(
_quota.QuotaDetails(**quota_details).to_dict(computed=False),
received_quota_details.to_dict(computed=False))
self.assert_calls() self.assert_calls()
def test_neutron_delete_quotas(self): def test_neutron_delete_quotas(self):

View File

@ -17,6 +17,8 @@ import copy
import testtools import testtools
from openstack.cloud import exc from openstack.cloud import exc
from openstack.network.v2 import router as _router
from openstack.network.v2 import port as _port
from openstack.tests.unit import base from openstack.tests.unit import base
@ -78,23 +80,40 @@ class TestRouter(base.TestCase):
router_availability_zone_extension, router_availability_zone_extension,
router_extraroute_extension] router_extraroute_extension]
def _compare_routers(self, exp, real):
self.assertDictEqual(
_router.Router(**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_get_router(self): def test_get_router(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']), 'network', 'public',
append=['v2.0', 'routers', self.router_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers'],
qs_elements=['name=%s' % self.router_name]),
json={'routers': [self.mock_router_rep]}) json={'routers': [self.mock_router_rep]})
]) ])
r = self.cloud.get_router(self.router_name) r = self.cloud.get_router(self.router_name)
self.assertIsNotNone(r) self.assertIsNotNone(r)
self.assertDictEqual(self.mock_router_rep, r) self._compare_routers(self.mock_router_rep, r)
self.assert_calls() self.assert_calls()
def test_get_router_not_found(self): def test_get_router_not_found(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']), 'network', 'public',
append=['v2.0', 'routers', 'mickey']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers'],
qs_elements=['name=mickey']),
json={'routers': []}) json={'routers': []})
]) ])
r = self.cloud.get_router('mickey') r = self.cloud.get_router('mickey')
@ -114,7 +133,8 @@ class TestRouter(base.TestCase):
]) ])
new_router = self.cloud.create_router(name=self.router_name, new_router = self.cloud.create_router(name=self.router_name,
admin_state_up=True) admin_state_up=True)
self.assertDictEqual(self.mock_router_rep, new_router)
self._compare_routers(self.mock_router_rep, new_router)
self.assert_calls() self.assert_calls()
def test_create_router_specific_tenant(self): def test_create_router_specific_tenant(self):
@ -269,8 +289,9 @@ class TestRouter(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}), json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']), 'network', 'public', append=['v2.0', 'routers',
json={'routers': [self.mock_router_rep]}), self.router_id]),
json=self.mock_router_rep),
dict(method='PUT', dict(method='PUT',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', 'network', 'public',
@ -283,14 +304,21 @@ class TestRouter(base.TestCase):
]) ])
new_router = self.cloud.update_router( new_router = self.cloud.update_router(
self.router_id, name=new_router_name, routes=new_routes) self.router_id, name=new_router_name, routes=new_routes)
self.assertDictEqual(expected_router_rep, new_router)
self._compare_routers(expected_router_rep, new_router)
self.assert_calls() self.assert_calls()
def test_delete_router(self): def test_delete_router(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']), 'network', 'public',
append=['v2.0', 'routers', self.router_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers'],
qs_elements=['name=%s' % self.router_name]),
json={'routers': [self.mock_router_rep]}), json={'routers': [self.mock_router_rep]}),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -305,8 +333,14 @@ class TestRouter(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']), 'network', 'public',
json={'routers': []}), append=['v2.0', 'routers', self.router_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers'],
qs_elements=['name=%s' % self.router_name]),
json={'routers': []})
]) ])
self.assertFalse(self.cloud.delete_router(self.router_name)) self.assertFalse(self.cloud.delete_router(self.router_name))
self.assert_calls() self.assert_calls()
@ -317,31 +351,20 @@ class TestRouter(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']), 'network', 'public',
json={'routers': [router1, router2]}), append=['v2.0', 'routers', 'mickey']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers'],
qs_elements=['name=mickey']),
json={'routers': [router1, router2]})
]) ])
self.assertRaises(exc.OpenStackCloudException, self.assertRaises(exc.OpenStackCloudException,
self.cloud.delete_router, self.cloud.delete_router,
'mickey') 'mickey')
self.assert_calls() self.assert_calls()
def test_delete_router_multiple_using_id(self):
router1 = dict(id='123', name='mickey')
router2 = dict(id='456', name='mickey')
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'routers']),
json={'routers': [router1, router2]}),
dict(method='DELETE',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'routers', '123']),
json={})
])
self.assertTrue(self.cloud.delete_router("123"))
self.assert_calls()
def _get_mock_dict(self, owner, json): def _get_mock_dict(self, owner, json):
return dict(method='GET', return dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -389,16 +412,20 @@ class TestRouter(base.TestCase):
for port_type in ['router_interface', for port_type in ['router_interface',
'router_interface_distributed', 'router_interface_distributed',
'ha_router_replicated_interface']: 'ha_router_replicated_interface']:
ports = {}
if port_type == device_owner: if port_type == device_owner:
ports = {'ports': [internal_port]} ports = {'ports': [internal_port]}
else:
ports = {'ports': []}
mock_uris.append(self._get_mock_dict(port_type, ports)) mock_uris.append(self._get_mock_dict(port_type, ports))
mock_uris.append(self._get_mock_dict('router_gateway', mock_uris.append(self._get_mock_dict('router_gateway',
{'ports': [external_port]})) {'ports': [external_port]}))
self.register_uris(mock_uris) self.register_uris(mock_uris)
ret = self.cloud.list_router_interfaces(router, interface_type) ret = self.cloud.list_router_interfaces(router, interface_type)
self.assertEqual(expected_result, ret) self.assertEqual(
[_port.Port(**i).to_dict(computed=False) for i in expected_result],
[i.to_dict(computed=False) for i in ret]
)
self.assert_calls() self.assert_calls()
router = { router = {

View File

@ -17,6 +17,7 @@ import copy
import testtools import testtools
from openstack.cloud import exc from openstack.cloud import exc
from openstack.network.v2 import subnet as _subnet
from openstack.tests.unit import base from openstack.tests.unit import base
@ -36,8 +37,8 @@ class TestSubnet(base.TestCase):
mock_subnet_rep = { mock_subnet_rep = {
'allocation_pools': [{ 'allocation_pools': [{
'start': u'192.168.199.2', 'start': '192.168.199.2',
'end': u'192.168.199.254' 'end': '192.168.199.254'
}], }],
'cidr': subnet_cidr, 'cidr': subnet_cidr,
'created_at': '2017-04-24T20:22:23Z', 'created_at': '2017-04-24T20:22:23Z',
@ -66,16 +67,27 @@ class TestSubnet(base.TestCase):
] ]
} }
def _compare_subnets(self, exp, real):
self.assertDictEqual(
_subnet.Subnet(**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_get_subnet(self): def test_get_subnet(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
append=['v2.0', 'subnets', self.subnet_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets'],
qs_elements=['name=%s' % self.subnet_name]),
json={'subnets': [self.mock_subnet_rep]}) json={'subnets': [self.mock_subnet_rep]})
]) ])
r = self.cloud.get_subnet(self.subnet_name) r = self.cloud.get_subnet(self.subnet_name)
self.assertIsNotNone(r) self.assertIsNotNone(r)
self.assertDictEqual(self.mock_subnet_rep, r) self._compare_subnets(self.mock_subnet_rep, r)
self.assert_calls() self.assert_calls()
def test_get_subnet_by_id(self): def test_get_subnet_by_id(self):
@ -89,7 +101,7 @@ class TestSubnet(base.TestCase):
]) ])
r = self.cloud.get_subnet_by_id(self.subnet_id) r = self.cloud.get_subnet_by_id(self.subnet_id)
self.assertIsNotNone(r) self.assertIsNotNone(r)
self.assertDictEqual(self.mock_subnet_rep, r) self._compare_subnets(self.mock_subnet_rep, r)
self.assert_calls() self.assert_calls()
def test_create_subnet(self): def test_create_subnet(self):
@ -103,7 +115,14 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [self.mock_network_rep]}), json={'networks': [self.mock_network_rep]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -123,7 +142,7 @@ class TestSubnet(base.TestCase):
allocation_pools=pool, allocation_pools=pool,
dns_nameservers=dns, dns_nameservers=dns,
host_routes=routes) host_routes=routes)
self.assertDictEqual(mock_subnet_rep, subnet) self._compare_subnets(mock_subnet_rep, subnet)
self.assert_calls() self.assert_calls()
def test_create_subnet_string_ip_version(self): def test_create_subnet_string_ip_version(self):
@ -131,7 +150,14 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [self.mock_network_rep]}), json={'networks': [self.mock_network_rep]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -146,7 +172,7 @@ class TestSubnet(base.TestCase):
]) ])
subnet = self.cloud.create_subnet( subnet = self.cloud.create_subnet(
self.network_name, self.subnet_cidr, ip_version='4') self.network_name, self.subnet_cidr, ip_version='4')
self.assertDictEqual(self.mock_subnet_rep, subnet) self._compare_subnets(self.mock_subnet_rep, subnet)
self.assert_calls() self.assert_calls()
def test_create_subnet_bad_ip_version(self): def test_create_subnet_bad_ip_version(self):
@ -154,8 +180,15 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
json={'networks': [self.mock_network_rep]}) append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [self.mock_network_rep]}),
]) ])
with testtools.ExpectedException( with testtools.ExpectedException(
exc.OpenStackCloudException, exc.OpenStackCloudException,
@ -175,7 +208,14 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [self.mock_network_rep]}), json={'networks': [self.mock_network_rep]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -195,7 +235,7 @@ class TestSubnet(base.TestCase):
allocation_pools=pool, allocation_pools=pool,
dns_nameservers=dns, dns_nameservers=dns,
disable_gateway_ip=True) disable_gateway_ip=True)
self.assertDictEqual(mock_subnet_rep, subnet) self._compare_subnets(mock_subnet_rep, subnet)
self.assert_calls() self.assert_calls()
def test_create_subnet_with_gateway_ip(self): def test_create_subnet_with_gateway_ip(self):
@ -209,7 +249,14 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [self.mock_network_rep]}), json={'networks': [self.mock_network_rep]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -229,14 +276,21 @@ class TestSubnet(base.TestCase):
allocation_pools=pool, allocation_pools=pool,
dns_nameservers=dns, dns_nameservers=dns,
gateway_ip=gateway) gateway_ip=gateway)
self.assertDictEqual(mock_subnet_rep, subnet) self._compare_subnets(mock_subnet_rep, subnet)
self.assert_calls() self.assert_calls()
def test_create_subnet_conflict_gw_ops(self): def test_create_subnet_conflict_gw_ops(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', 'kooky']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=kooky']),
json={'networks': [self.mock_network_rep]}) json={'networks': [self.mock_network_rep]})
]) ])
gateway = '192.168.200.3' gateway = '192.168.200.3'
@ -250,8 +304,15 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
json={'networks': [self.mock_network_rep]}) append=['v2.0', 'networks', 'duck']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=duck']),
json={'networks': [self.mock_network_rep]}),
]) ])
self.assertRaises(exc.OpenStackCloudException, self.assertRaises(exc.OpenStackCloudException,
self.cloud.create_subnet, self.cloud.create_subnet,
@ -264,8 +325,15 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
json={'networks': [net1, net2]}) append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [net1, net2]}),
]) ])
self.assertRaises(exc.OpenStackCloudException, self.assertRaises(exc.OpenStackCloudException,
self.cloud.create_subnet, self.cloud.create_subnet,
@ -289,7 +357,14 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'networks']), 'network', 'public',
append=['v2.0', 'networks', self.network_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'networks'],
qs_elements=['name=%s' % self.network_name]),
json={'networks': [self.mock_network_rep]}), json={'networks': [self.mock_network_rep]}),
dict(method='POST', dict(method='POST',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -312,14 +387,25 @@ class TestSubnet(base.TestCase):
use_default_subnetpool=True, use_default_subnetpool=True,
prefixlen=self.prefix_length, prefixlen=self.prefix_length,
host_routes=routes) host_routes=routes)
self.assertDictEqual(mock_subnet_rep, subnet) mock_subnet_rep.update(
{
'prefixlen': self.prefix_length,
'use_default_subnetpool': True
})
self._compare_subnets(mock_subnet_rep, subnet)
self.assert_calls() self.assert_calls()
def test_delete_subnet(self): def test_delete_subnet(self):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
append=['v2.0', 'subnets', self.subnet_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets'],
qs_elements=['name=%s' % self.subnet_name]),
json={'subnets': [self.mock_subnet_rep]}), json={'subnets': [self.mock_subnet_rep]}),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
@ -334,7 +420,13 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
append=['v2.0', 'subnets', 'goofy']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets'],
qs_elements=['name=goofy']),
json={'subnets': []}) json={'subnets': []})
]) ])
self.assertFalse(self.cloud.delete_subnet('goofy')) self.assertFalse(self.cloud.delete_subnet('goofy'))
@ -346,7 +438,13 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
append=['v2.0', 'subnets', self.subnet_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets'],
qs_elements=['name=%s' % self.subnet_name]),
json={'subnets': [subnet1, subnet2]}) json={'subnets': [subnet1, subnet2]})
]) ])
self.assertRaises(exc.OpenStackCloudException, self.assertRaises(exc.OpenStackCloudException,
@ -354,14 +452,14 @@ class TestSubnet(base.TestCase):
self.subnet_name) self.subnet_name)
self.assert_calls() self.assert_calls()
def test_delete_subnet_multiple_using_id(self): def test_delete_subnet_using_id(self):
subnet1 = dict(id='123', name=self.subnet_name) subnet1 = dict(id='123', name=self.subnet_name)
subnet2 = dict(id='456', name=self.subnet_name)
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public', append=['v2.0', 'subnets',
json={'subnets': [subnet1, subnet2]}), subnet1['id']]),
json=subnet1),
dict(method='DELETE', dict(method='DELETE',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', 'network', 'public',
@ -377,8 +475,9 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
json={'subnets': [self.mock_subnet_rep]}), append=['v2.0', 'subnets', self.subnet_id]),
json=self.mock_subnet_rep),
dict(method='PUT', dict(method='PUT',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', 'network', 'public',
@ -388,7 +487,7 @@ class TestSubnet(base.TestCase):
json={'subnet': {'name': 'goofy'}})) json={'subnet': {'name': 'goofy'}}))
]) ])
subnet = self.cloud.update_subnet(self.subnet_id, subnet_name='goofy') subnet = self.cloud.update_subnet(self.subnet_id, subnet_name='goofy')
self.assertDictEqual(expected_subnet, subnet) self._compare_subnets(expected_subnet, subnet)
self.assert_calls() self.assert_calls()
def test_update_subnet_gateway_ip(self): def test_update_subnet_gateway_ip(self):
@ -398,8 +497,9 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
json={'subnets': [self.mock_subnet_rep]}), append=['v2.0', 'subnets', self.subnet_id]),
json=self.mock_subnet_rep),
dict(method='PUT', dict(method='PUT',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', 'network', 'public',
@ -409,7 +509,7 @@ class TestSubnet(base.TestCase):
json={'subnet': {'gateway_ip': gateway}})) json={'subnet': {'gateway_ip': gateway}}))
]) ])
subnet = self.cloud.update_subnet(self.subnet_id, gateway_ip=gateway) subnet = self.cloud.update_subnet(self.subnet_id, gateway_ip=gateway)
self.assertDictEqual(expected_subnet, subnet) self._compare_subnets(expected_subnet, subnet)
self.assert_calls() self.assert_calls()
def test_update_subnet_disable_gateway_ip(self): def test_update_subnet_disable_gateway_ip(self):
@ -418,8 +518,9 @@ class TestSubnet(base.TestCase):
self.register_uris([ self.register_uris([
dict(method='GET', dict(method='GET',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'subnets']), 'network', 'public',
json={'subnets': [self.mock_subnet_rep]}), append=['v2.0', 'subnets', self.subnet_id]),
json=self.mock_subnet_rep),
dict(method='PUT', dict(method='PUT',
uri=self.get_mock_url( uri=self.get_mock_url(
'network', 'public', 'network', 'public',
@ -430,7 +531,7 @@ class TestSubnet(base.TestCase):
]) ])
subnet = self.cloud.update_subnet(self.subnet_id, subnet = self.cloud.update_subnet(self.subnet_id,
disable_gateway_ip=True) disable_gateway_ip=True)
self.assertDictEqual(expected_subnet, subnet) self._compare_subnets(expected_subnet, subnet)
self.assert_calls() self.assert_calls()
def test_update_subnet_conflict_gw_ops(self): def test_update_subnet_conflict_gw_ops(self):

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
Networking functions of the cloud layer return now resource objects
`openstack.resource`. While those still implement Munch interface and are
accessible as dictionary modification of an instance might be causing
issues (i.e. forbidden).