Dean Troyer 589a65c3fe Fix Nova-net network commands
In cleaning up functional tests for nova-net, I discovered some
problems in network create:
* --subnet option is required in network create command
* Switch API to use /os-networks rather than /os-tenant-networks as this
  is what we were actually using via novaclient
* Fix functional tests for nova-net
* Normalize some private function names in network/v2/network.py

Change-Id: I426b864406756d58d140575a3a45ee9aee67ce84
2017-04-27 20:36:00 +00:00

503 lines
12 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Compute v2 API Library"""
from keystoneauth1 import exceptions as ksa_exceptions
from osc_lib.api import api
from osc_lib import exceptions
from osc_lib.i18n import _
# TODO(dtroyer): Migrate this to osc-lib
class InvalidValue(Exception):
    """Signal that an argument value cannot be used

    Covers values of the wrong type, values that are out of range, etc.
    """

    # Class-level default text; nothing inside this class reads it
    message = "Supplied value is not valid"
class APIv2(api.BaseAPI):
    """Compute v2 API"""

    def __init__(self, **kwargs):
        # No compute-specific setup is done here; every keyword
        # argument is handed to the base class unchanged
        super(APIv2, self).__init__(**kwargs)
# Overrides
def _check_integer(self, value, msg=None):
    """Attempt to convert value to an integer

    Raises InvalidValue on failure

    :param value:
        Convert this to an integer.  None is converted to 0 (zero).
    :param msg:
        An alternate message for the exception; it may include one
        ``%s`` substitution to receive the attempted value.  A message
        that cannot take the substitution is used verbatim.
    :returns: value converted to an integer
    """
    if value is None:
        return 0

    try:
        return int(value)
    except (TypeError, ValueError):
        if not msg:
            msg = "%s is not an integer"
        try:
            # Wrap value in a tuple so that a tuple (or dict) value is
            # formatted as a single item instead of being unpacked
            msg = msg % (value,)
        except (TypeError, ValueError):
            # The supplied message has no usable substitution
            pass
        raise InvalidValue(msg)
# TODO(dtroyer): Override find() until these fixes get into an osc-lib
# minimum release
def find(
    self,
    path,
    value=None,
    attr=None,
):
    """Find a single resource by name or ID

    The resource is first requested directly, using value as the last
    element of the URL.  If that request raises NotFound or BadRequest,
    a secondary search is made with find_one() for a resource whose
    attribute attr equals value.

    :param string path:
        The API-specific portion of the URL path
    :param string value:
        search expression (required, really)
    :param string attr:
        name of attribute for secondary search; when None no
        secondary search is attempted
    :returns:
        the decoded response; a dict response has its enclosing
        dict stripped off
    :raises exceptions.NotFound:
        when the direct request fails and the secondary search is
        either not possible or raises NotFound
    """
    try:
        ret = self._request('GET', "/%s/%s" % (path, value)).json()
    except (
        ksa_exceptions.NotFound,
        ksa_exceptions.BadRequest,
    ):
        # Without an attribute name there is nothing to search on;
        # {None: value} cannot be expanded into keyword arguments
        if attr is not None:
            try:
                return self.find_one(path, **{attr: value})
            except ksa_exceptions.NotFound:
                pass
        msg = _("%s not found") % value
        raise exceptions.NotFound(msg)

    if isinstance(ret, dict):
        # strip off the enclosing dict
        key = list(ret.keys())[0]
        ret = ret[key]
    return ret
# Floating IPs
def floating_ip_create(self, pool=None):
    """Allocate a new floating IP

    https://developer.openstack.org/api-ref/compute/#create-allocate-floating-ip-address

    :param pool: Name of floating IP pool
    :returns: The 'floating_ip' entry of the create response
    :raises exceptions.NotFound:
        when the create call raises NotFound or BadRequest
    """
    body = {'pool': pool}
    try:
        response = self.create("/os-floating-ips", json=body)
        return response['floating_ip']
    except (ksa_exceptions.NotFound, ksa_exceptions.BadRequest):
        # Both failures are reported to the caller as a missing pool
        raise exceptions.NotFound(_("%s not found") % pool)
def floating_ip_delete(self, floating_ip_id=None):
    """Delete a floating IP

    https://developer.openstack.org/api-ref/compute/#delete-deallocate-floating-ip-address

    :param string floating_ip_id:
        Floating IP ID; when None nothing is deleted
    :returns:
        the result of the delete call, or None when no ID was given
    """
    if floating_ip_id is None:
        return None

    url = "/os-floating-ips"
    return self.delete('/%s/%s' % (url, floating_ip_id))
def floating_ip_find(self, floating_ip=None):
    """Look up a floating IP

    The lookup is delegated to find() with 'ip' as the attribute
    for the secondary search.

    https://developer.openstack.org/api-ref/compute/#list-floating-ip-addresses

    :param string floating_ip:
        Floating IP address
    :returns: A dict of the floating IP attributes
    """
    return self.find("/os-floating-ips", value=floating_ip, attr='ip')
def floating_ip_list(self):
    """List floating IPs

    https://developer.openstack.org/api-ref/compute/#show-floating-ip-address-details

    :returns:
        the "floating_ips" entry of the list response
    """
    response = self.list("/os-floating-ips")
    return response["floating_ips"]
# Floating IP Pools
def floating_ip_pool_list(self):
    """List floating IP pools

    https://developer.openstack.org/api-ref/compute/?expanded=#list-floating-ip-pools

    :returns:
        the "floating_ip_pools" entry of the list response
    """
    response = self.list("/os-floating-ip-pools")
    return response["floating_ip_pools"]
# Networks
def network_create(self, name=None, subnet=None, share_subnet=None):
    """Create a new network

    https://developer.openstack.org/api-ref/compute/#create-network

    :param string name:
        Network label (required)
    :param string subnet:
        Subnet for IPv4 fixed addresses in CIDR notation (required)
    :param bool share_subnet:
        Shared subnet between projects, True or False; left out of
        the request when None
    :returns: A dict of the network attributes
    """
    body = {'label': name, 'cidr': subnet}
    if share_subnet is not None:
        body['share_address'] = share_subnet

    response = self.create("/os-networks", json={'network': body})
    return response['network']
def network_delete(self, network=None):
    """Delete a network

    https://developer.openstack.org/api-ref/compute/#delete-network

    :param string network:
        Network name or ID
    :returns:
        the result of the delete call, or None when the network
        that was found has an ID of None
    """
    url = "/os-networks"
    found = self.find(url, attr='label', value=network)
    network_id = found['id']
    if network_id is None:
        return None
    return self.delete('/%s/%s' % (url, network_id))
def network_find(self, network=None):
    """Look up a network by name or ID

    The lookup is delegated to find() with 'label' as the attribute
    for the secondary search.

    https://developer.openstack.org/api-ref/compute/#show-network-details

    :param string network:
        Network name or ID
    :returns: A dict of the network attributes
    """
    return self.find("/os-networks", value=network, attr='label')
def network_list(self):
    """List networks

    https://developer.openstack.org/api-ref/compute/#list-networks

    :returns:
        the "networks" entry of the list response
    """
    response = self.list("/os-networks")
    return response["networks"]
# Security Groups
def security_group_create(self, name=None, description=None):
    """Create a new security group

    https://developer.openstack.org/api-ref/compute/#create-security-group

    :param string name:
        Security group name
    :param string description:
        Security group description
    :returns: The 'security_group' entry of the create response
    """
    body = {
        'security_group': {
            'name': name,
            'description': description,
        },
    }
    response = self.create("/os-security-groups", json=body)
    return response['security_group']
def security_group_delete(self, security_group=None):
    """Delete a security group

    https://developer.openstack.org/api-ref/compute/#delete-security-group

    :param string security_group:
        Security group name or ID
    :returns:
        the result of the delete call, or None when the security
        group that was found has an ID of None
    """
    url = "/os-security-groups"
    found = self.find(url, attr='name', value=security_group)
    group_id = found['id']
    if group_id is None:
        return None
    return self.delete('/%s/%s' % (url, group_id))
def security_group_find(self, security_group=None):
    """Look up a security group by name or ID

    The lookup is delegated to find() with 'name' as the attribute
    for the secondary search.

    https://developer.openstack.org/api-ref/compute/#show-security-group-details

    :param string security_group:
        Security group name or ID
    :returns: A dict of the security group attributes
    """
    return self.find("/os-security-groups", value=security_group, attr='name')
def security_group_list(self, limit=None, marker=None, search_opts=None):
    """List security groups

    https://developer.openstack.org/api-ref/compute/#list-security-groups

    :param integer limit:
        query return count limit
    :param string marker:
        query marker; sent to the server as 'offset'
    :param search_opts:
        (undocumented) Search filter dict; entries with a false
        value are dropped
        all_tenants: True|False - return all projects
    :returns:
        the "security_groups" entry of the list response
    """
    supplied = {} if search_opts is None else search_opts
    query = {key: val for key, val in supplied.items() if val}

    # limit and marker are only sent when they hold a true value
    if limit:
        query['limit'] = limit
    if marker:
        query['offset'] = marker

    response = self.list("/os-security-groups", **query)
    return response["security_groups"]
def security_group_set(
    self,
    security_group=None,
    **params
):
    """Update a security group

    https://developer.openstack.org/api-ref/compute/#update-security-group

    :param string security_group:
        Security group name or ID
    :param params:
        Attributes to change, passed as keyword arguments.  Only
        attributes already present on the security group are
        updated; any others are ignored.
    :returns:
        The 'security_group' entry of the update response, or None
        when no attributes were supplied

    TODO(dtroyer): Create an update method in osc-lib
    """
    # Short-circuit no-op: **params is an empty dict, never None,
    # when the caller supplies no attributes
    if not params:
        return None

    url = "/os-security-groups"
    security_group = self.find(
        url,
        attr='name',
        value=security_group,
    )
    if security_group is None:
        return None

    for (k, v) in params.items():
        # Only set a value if it is already present
        if k in security_group:
            security_group[k] = v
    return self._request(
        "PUT",
        "/%s/%s" % (url, security_group['id']),
        json={'security_group': security_group},
    ).json()['security_group']
# Security Group Rules
def security_group_rule_create(
    self,
    security_group_id=None,
    ip_protocol=None,
    from_port=None,
    to_port=None,
    remote_ip=None,
    remote_group=None,
):
    """Create a new security group rule

    https://developer.openstack.org/api-ref/compute/#create-security-group-rule

    :param string security_group_id:
        Security group ID
    :param ip_protocol:
        IP protocol, 'tcp', 'udp' or 'icmp' (compared without
        regard to case, sent to the server as given)
    :param from_port:
        Source port
    :param to_port:
        Destination port
    :param remote_ip:
        Source IP address in CIDR notation
    :param remote_group:
        Remote security group
    :returns: The 'security_group_rule' entry of the create response
    :raises InvalidValue:
        when ip_protocol is not one of the accepted protocols
        (including when it is omitted), or when a port cannot be
        converted to an integer
    """
    url = "/os-security-group-rules"

    # str() lets None and other non-string values fall through to the
    # InvalidValue below instead of failing on a missing .lower()
    if str(ip_protocol).lower() not in ['icmp', 'tcp', 'udp']:
        raise InvalidValue(
            "%s is not one of 'icmp', 'tcp', or 'udp'" % (ip_protocol,)
        )

    params = {
        'parent_group_id': security_group_id,
        'ip_protocol': ip_protocol,
        'from_port': self._check_integer(from_port),
        'to_port': self._check_integer(to_port),
        'cidr': remote_ip,
        'group_id': remote_group,
    }

    return self.create(
        url,
        json={'security_group_rule': params},
    )['security_group_rule']
def security_group_rule_delete(self, security_group_rule_id=None):
    """Delete a security group rule

    https://developer.openstack.org/api-ref/compute/#delete-security-group-rule

    :param string security_group_rule_id:
        Security group rule ID; when None nothing is deleted
    :returns:
        the result of the delete call, or None when no ID was given
    """
    if security_group_rule_id is None:
        return None

    url = "/os-security-group-rules"
    return self.delete('/%s/%s' % (url, security_group_rule_id))