Merging the brand new Quantum-client-library feature.

Thanks to lp:tylesmit for this contribution to the Quantum project.

All, client, CLI, dashboard, Quantum API client, should now start
using this client library as our defacto "SDK" to program against
Quantum's Cloud Networking fabric.


added:
  quantum/client.py
  tests/unit/api.py
modified:
  quantum/cli.py
This commit is contained in:
Somik Behera 2011-08-02 12:40:57 -07:00
commit f1992c2c85
3 changed files with 965 additions and 131 deletions

View File

@ -26,58 +26,9 @@ import urllib
from manager import QuantumManager
from optparse import OptionParser
from quantum.common.wsgi import Serializer
from client import Client
FORMAT = "json"
CONTENT_TYPE = "application/" + FORMAT
### --- Miniclient (taking from the test directory)
### TODO(bgh): move this to a library within quantum
class MiniClient(object):
"""A base client class - derived from Glance.BaseClient"""
action_prefix = '/v0.1/tenants/{tenant_id}'
def __init__(self, host, port, use_ssl):
self.host = host
self.port = port
self.use_ssl = use_ssl
self.connection = None
def get_connection_type(self):
if self.use_ssl:
return httplib.HTTPSConnection
else:
return httplib.HTTPConnection
def do_request(self, tenant, method, action, body=None,
headers=None, params=None):
action = MiniClient.action_prefix + action
action = action.replace('{tenant_id}', tenant)
if type(params) is dict:
action += '?' + urllib.urlencode(params)
try:
connection_type = self.get_connection_type()
headers = headers or {}
# Open connection and send request
c = connection_type(self.host, self.port)
c.request(method, action, body, headers)
res = c.getresponse()
status_code = self.get_status_code(res)
if status_code in (httplib.OK, httplib.CREATED,
httplib.ACCEPTED, httplib.NO_CONTENT):
return res
else:
raise Exception("Server returned error: %s" % res.read())
except (socket.error, IOError), e:
raise Exception("Unable to connect to server. Got error: %s" % e)
def get_status_code(self, response):
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status
### -- end of miniclient
### -- Core CLI functions
@ -94,11 +45,10 @@ def list_nets(manager, *args):
def api_list_nets(client, *args):
tenant_id = args[0]
res = client.do_request(tenant_id, 'GET', "/networks." + FORMAT)
resdict = json.loads(res.read())
LOG.debug(resdict)
res = client.list_networks()
LOG.debug(res)
print "Virtual Networks on Tenant:%s\n" % tenant_id
for n in resdict["networks"]:
for n in res["networks"]:
net_id = n["id"]
print "\tNetwork ID:%s\n" % (net_id)
# TODO(bgh): we should make this call pass back the name too
@ -115,13 +65,11 @@ def create_net(manager, *args):
def api_create_net(client, *args):
tid, name = args
data = {'network': {'net-name': '%s' % name}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tid, 'POST', "/networks." + FORMAT, body=body)
rd = json.loads(res.read())
LOG.debug(rd)
res = client.create_network(data)
LOG.debug(res)
nid = None
try:
nid = rd["networks"]["network"]["id"]
nid = res["networks"]["network"]["id"]
except Exception, e:
print "Failed to create network"
# TODO(bgh): grab error details from ws request result
@ -137,14 +85,12 @@ def delete_net(manager, *args):
def api_delete_net(client, *args):
tid, nid = args
res = client.do_request(tid, 'DELETE', "/networks/" + nid + "." + FORMAT)
status = res.status
if status != 202:
print "Failed to delete network"
output = res.read()
print output
else:
try:
res = client.delete_network(nid)
print "Deleted Virtual Network with ID:%s" % nid
except Exception, e:
print "Failed to delete network"
LOG.error("Failed to delete network: %s" % e)
def detail_net(manager, *args):
@ -157,23 +103,25 @@ def detail_net(manager, *args):
def api_detail_net(client, *args):
tid, nid = args
res = client.do_request(tid, 'GET',
"/networks/%s/ports.%s" % (nid, FORMAT))
output = res.read()
if res.status != 200:
LOG.error("Failed to list ports: %s" % output)
try:
res = client.list_network_details(nid)["networks"]["network"]
except Exception, e:
LOG.error("Failed to get network details: %s" % e)
return
rd = json.loads(output)
LOG.debug(rd)
try:
ports = client.list_ports(nid)
except Exception, e:
LOG.error("Failed to list ports: %s" % e)
return
print "Network %s (%s)" % (res['name'], res['id'])
print "Remote Interfaces on Virtual Network:%s\n" % nid
for port in rd["ports"]:
for port in ports["ports"]:
pid = port["id"]
res = client.do_request(tid, 'GET',
"/networks/%s/ports/%s/attachment.%s" % (nid, pid, FORMAT))
output = res.read()
rd = json.loads(output)
LOG.debug(rd)
remote_iface = rd["attachment"]
res = client.list_port_attachments(nid, pid)
LOG.debug(res)
remote_iface = res["attachment"]
print "\tRemote interface:%s" % remote_iface
@ -186,11 +134,12 @@ def rename_net(manager, *args):
def api_rename_net(client, *args):
tid, nid, name = args
data = {'network': {'net-name': '%s' % name}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tid, 'PUT', "/networks/%s.%s" % (nid, FORMAT),
body=body)
resdict = json.loads(res.read())
LOG.debug(resdict)
try:
res = client.update_network(nid, data)
except Exception, e:
LOG.error("Failed to rename network %s: %s" % (nid, e))
return
LOG.debug(res)
print "Renamed Virtual Network with ID:%s" % nid
@ -204,16 +153,15 @@ def list_ports(manager, *args):
def api_list_ports(client, *args):
tid, nid = args
res = client.do_request(tid, 'GET',
"/networks/%s/ports.%s" % (nid, FORMAT))
output = res.read()
if res.status != 200:
LOG.error("Failed to list ports: %s" % output)
try:
ports = client.list_ports(nid)
except Exception, e:
LOG.error("Failed to list ports: %s" % e)
return
rd = json.loads(output)
LOG.debug(rd)
LOG.debug(ports)
print "Ports on Virtual Network:%s\n" % nid
for port in rd["ports"]:
for port in ports["ports"]:
print "\tVirtual Port:%s" % port["id"]
@ -226,14 +174,12 @@ def create_port(manager, *args):
def api_create_port(client, *args):
tid, nid = args
res = client.do_request(tid, 'POST',
"/networks/%s/ports.%s" % (nid, FORMAT))
output = res.read()
if res.status != 200:
LOG.error("Failed to create port: %s" % output)
try:
res = client.create_port(nid)
except Exception, e:
LOG.error("Failed to create port: %s" % e)
return
rd = json.loads(output)
new_port = rd["ports"]["port"]["id"]
new_port = res["ports"]["port"]["id"]
print "Created Virtual Port:%s " \
"on Virtual Network:%s" % (new_port, nid)
@ -247,14 +193,15 @@ def delete_port(manager, *args):
def api_delete_port(client, *args):
tid, nid, pid = args
res = client.do_request(tid, 'DELETE',
"/networks/%s/ports/%s.%s" % (nid, pid, FORMAT))
output = res.read()
if res.status != 202:
LOG.error("Failed to delete port: %s" % output)
try:
res = client.delete_port(nid, pid)
except Exception, e:
LOG.error("Failed to delete port: %s" % e)
return
LOG.info("Deleted Virtual Port:%s " \
"on Virtual Network:%s" % (pid, nid))
print "Deleted Virtual Port:%s " \
"on Virtual Network:%s" % (pid, nid)
def detail_port(manager, *args):
@ -266,14 +213,12 @@ def detail_port(manager, *args):
def api_detail_port(client, *args):
tid, nid, pid = args
res = client.do_request(tid, 'GET',
"/networks/%s/ports/%s.%s" % (nid, pid, FORMAT))
output = res.read()
if res.status != 200:
LOG.error("Failed to get port details: %s" % output)
try:
port = client.list_port_details(nid, pid)["ports"]["port"]
except Exception, e:
LOG.error("Failed to get port details: %s" % e)
return
rd = json.loads(output)
port = rd["ports"]["port"]
id = port["id"]
attachment = port["attachment"]
LOG.debug(port)
@ -290,18 +235,15 @@ def plug_iface(manager, *args):
def api_plug_iface(client, *args):
tid, nid, pid, vid = args
data = {'port': {'attachment-id': '%s' % vid}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tid, 'PUT',
"/networks/%s/ports/%s/attachment.%s" % (nid, pid, FORMAT), body=body)
output = res.read()
LOG.debug(output)
if res.status != 202:
try:
data = {'port': {'attachment-id': '%s' % vid}}
res = client.attach_resource(nid, pid, data)
except Exception, e:
LOG.error("Failed to plug iface \"%s\" to port \"%s\": %s" % (vid,
pid, output))
return
print "Plugged interface \"%s\" to port:%s on network:%s" % (vid, pid,
nid)
LOG.debug(res)
print "Plugged interface \"%s\" to port:%s on network:%s" % (vid, pid, nid)
def unplug_iface(manager, *args):
@ -313,16 +255,12 @@ def unplug_iface(manager, *args):
def api_unplug_iface(client, *args):
tid, nid, pid = args
data = {'port': {'attachment-id': ''}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tid, 'DELETE',
"/networks/%s/ports/%s/attachment.%s" % (nid, pid, FORMAT), body=body)
output = res.read()
LOG.debug(output)
if res.status != 202:
LOG.error("Failed to unplug iface from port \"%s\": %s" % \
(pid, output))
try:
res = client.detach_resource(nid, pid)
except Exception, e:
LOG.error("Failed to unplug iface from port \"%s\": %s" % (pid, e))
return
LOG.debug(res)
print "Unplugged interface from port:%s on network:%s" % (pid, nid)
@ -440,7 +378,8 @@ if __name__ == "__main__":
sys.exit(1)
LOG.debug("Executing command \"%s\" with args: %s" % (cmd, args))
if not options.load_plugin:
client = MiniClient(options.host, options.port, options.ssl)
client = Client(options.host, options.port, options.ssl,
args[0], FORMAT)
if "api_func" not in commands[cmd]:
LOG.error("API version of \"%s\" is not yet implemented" % cmd)
sys.exit(1)

270
quantum/client.py Normal file
View File

@ -0,0 +1,270 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Citrix Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Tyler Smith, Cisco Systems
import httplib
import socket
import urllib
from quantum.common.wsgi import Serializer
class api_call(object):
"""A Decorator to add support for format and tenant overriding"""
def __init__(self, f):
self.f = f
def __get__(self, instance, owner):
def with_params(*args, **kwargs):
# Temporarily set format and tenant for this request
(format, tenant) = (instance.format, instance.tenant)
if 'format' in kwargs:
instance.format = kwargs['format']
if 'tenant' in kwargs:
instance.tenant = kwargs['tenant']
ret = self.f(instance, *args)
(instance.format, instance.tenant) = (format, tenant)
return ret
return with_params
class Client(object):
"""A base client class - derived from Glance.BaseClient"""
action_prefix = '/v0.1/tenants/{tenant_id}'
"""Action query strings"""
networks_path = "/networks"
network_path = "/networks/%s"
ports_path = "/networks/%s/ports"
port_path = "/networks/%s/ports/%s"
attachment_path = "/networks/%s/ports/%s/attachment"
def __init__(self, host="127.0.0.1", port=9696, use_ssl=False, tenant=None,
format="xml", testingStub=None, key_file=None, cert_file=None):
"""
Creates a new client to some service.
:param host: The host where service resides
:param port: The port where service resides
:param use_ssl: True to use SSL, False to use HTTP
:param tenant: The tenant ID to make requests with
:param format: The format to query the server with
:param testingStub: A class that stubs basic server methods for tests
:param key_file: The SSL key file to use if use_ssl is true
:param cert_file: The SSL cert file to use if use_ssl is true
"""
self.host = host
self.port = port
self.use_ssl = use_ssl
self.tenant = tenant
self.format = format
self.connection = None
self.testingStub = testingStub
self.key_file = key_file
self.cert_file = cert_file
def get_connection_type(self):
"""
Returns the proper connection type
"""
if self.testingStub:
return self.testingStub
if self.use_ssl:
return httplib.HTTPSConnection
else:
return httplib.HTTPConnection
def do_request(self, method, action, body=None,
headers=None, params=None):
"""
Connects to the server and issues a request.
Returns the result data, or raises an appropriate exception if
HTTP status code is not 2xx
:param method: HTTP method ("GET", "POST", "PUT", etc...)
:param body: string of data to send, or None (default)
:param headers: mapping of key/value pairs to add as headers
:param params: dictionary of key/value pairs to add to append
to action
"""
# Ensure we have a tenant id
if not self.tenant:
raise Exception("Tenant ID not set")
# Add format and tenant_id
action += ".%s" % self.format
action = Client.action_prefix + action
action = action.replace('{tenant_id}', self.tenant)
if type(params) is dict:
action += '?' + urllib.urlencode(params)
try:
connection_type = self.get_connection_type()
headers = headers or {"Content-Type":
"application/%s" % self.format}
# Open connection and send request, handling SSL certs
certs = {'key_file': self.key_file, 'cert_file': self.cert_file}
certs = dict((x, certs[x]) for x in certs if certs[x] != None)
if self.use_ssl and len(certs):
c = connection_type(self.host, self.port, **certs)
else:
c = connection_type(self.host, self.port)
c.request(method, action, body, headers)
res = c.getresponse()
status_code = self.get_status_code(res)
if status_code in (httplib.OK,
httplib.CREATED,
httplib.ACCEPTED,
httplib.NO_CONTENT):
return self.deserialize(res)
else:
raise Exception("Server returned error: %s" % res.read())
except (socket.error, IOError), e:
raise Exception("Unable to connect to "
"server. Got error: %s" % e)
def get_status_code(self, response):
"""
Returns the integer status code from the response, which
can be either a Webob.Response (used in testing) or httplib.Response
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status
def serialize(self, data):
if type(data) is dict:
return Serializer().serialize(data, self.content_type())
def deserialize(self, data):
if self.get_status_code(data) == 202:
return data.read()
return Serializer().deserialize(data.read(), self.content_type())
def content_type(self, format=None):
if not format:
format = self.format
return "application/%s" % (format)
@api_call
def list_networks(self):
"""
Queries the server for a list of networks
"""
return self.do_request("GET", self.networks_path)
@api_call
def list_network_details(self, network):
"""
Queries the server for the details of a certain network
"""
return self.do_request("GET", self.network_path % (network))
@api_call
def create_network(self, body=None):
"""
Creates a new network on the server
"""
body = self.serialize(body)
return self.do_request("POST", self.networks_path, body=body)
@api_call
def update_network(self, network, body=None):
"""
Updates a network on the server
"""
body = self.serialize(body)
return self.do_request("PUT", self.network_path % (network), body=body)
@api_call
def delete_network(self, network):
"""
Deletes a network on the server
"""
return self.do_request("DELETE", self.network_path % (network))
@api_call
def list_ports(self, network):
"""
Queries the server for a list of ports on a given network
"""
return self.do_request("GET", self.ports_path % (network))
@api_call
def list_port_details(self, network, port):
"""
Queries the server for a list of ports on a given network
"""
return self.do_request("GET", self.port_path % (network, port))
@api_call
def create_port(self, network):
"""
Creates a new port on a network on the server
"""
return self.do_request("POST", self.ports_path % (network))
@api_call
def delete_port(self, network, port):
"""
Deletes a port from a network on the server
"""
return self.do_request("DELETE", self.port_path % (network, port))
@api_call
def set_port_state(self, network, port, body=None):
"""
Sets the state of a port on the server
"""
body = self.serialize(body)
return self.do_request("PUT",
self.port_path % (network, port), body=body)
@api_call
def list_port_attachments(self, network, port):
"""
Deletes a port from a network on the server
"""
return self.do_request("GET", self.attachment_path % (network, port))
@api_call
def attach_resource(self, network, port, body=None):
"""
Deletes a port from a network on the server
"""
body = self.serialize(body)
return self.do_request("PUT",
self.attachment_path % (network, port), body=body)
@api_call
def detach_resource(self, network, port):
"""
Deletes a port from a network on the server
"""
return self.do_request("DELETE",
self.attachment_path % (network, port))

625
tests/unit/api.py Normal file
View File

@ -0,0 +1,625 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Tyler Smith, Cisco Systems
import logging
import unittest
import re
from quantum.common.wsgi import Serializer
from quantum.client import Client
LOG = logging.getLogger('quantum.tests.test_api')
# Set a couple tenants to use for testing
TENANT_1 = 'totore'
TENANT_2 = 'totore2'
class ServerStub():
"""This class stubs a basic server for the API client to talk to"""
class Response(object):
"""This class stubs a basic response to send the API client"""
def __init__(self, content=None, status=None):
self.content = content
self.status = status
def read(self):
return self.content
def status(self):
return status
# To test error codes, set the host to 10.0.0.1, and the port to the code
def __init__(self, host, port=9696, key_file="", cert_file=""):
self.host = host
self.port = port
self.key_file = key_file
self.cert_file = cert_file
def request(self, method, action, body, headers):
self.method = method
self.action = action
self.body = body
def status(self, status=None):
return status or 200
def getresponse(self):
res = self.Response(status=self.status())
# If the host is 10.0.0.1, return the port as an error code
if self.host == "10.0.0.1":
res.status = self.port
return res
# Extract important information from the action string to assure sanity
match = re.search('tenants/(.+?)/(.+)\.(json|xml)$', self.action)
tenant = match.group(1)
path = match.group(2)
format = match.group(3)
data = {'data': {'method': self.method, 'action': self.action,
'body': self.body, 'tenant': tenant, 'path': path,
'format': format, 'key_file': self.key_file,
'cert_file': self.cert_file}}
# Serialize it to the proper format so the API client can handle it
if data['data']['format'] == 'json':
res.content = Serializer().serialize(data, "application/json")
else:
res.content = Serializer().serialize(data, "application/xml")
return res
class APITest(unittest.TestCase):
def setUp(self):
""" Setups a test environment for the API client """
HOST = '127.0.0.1'
PORT = 9696
USE_SSL = False
self.client = Client(HOST, PORT, USE_SSL, TENANT_1, 'json', ServerStub)
def _assert_sanity(self, call, status, method, path, data=[], params={}):
""" Perform common assertions to test the sanity of client requests """
# Handle an error case first
if status != 200:
(self.client.host, self.client.port) = ("10.0.0.1", status)
self.assertRaises(Exception, call, *data, **params)
return
# Make the call, then get the data from the root node and assert it
data = call(*data, **params)['data']
self.assertEqual(data['method'], method)
self.assertEqual(data['format'], params['format'])
self.assertEqual(data['tenant'], params['tenant'])
self.assertEqual(data['path'], path)
return data
def _test_list_networks(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_networks - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_networks,
status,
"GET",
"networks",
data=[],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_networks - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_list_network_details(self,
tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_network_details - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_network_details,
status,
"GET",
"networks/001",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_network_details - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_create_network(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_create_network - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.create_network,
status,
"POST",
"networks",
data=[{'network': {'net-name': 'testNetwork'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_create_network - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_update_network(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_update_network - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.update_network,
status,
"PUT",
"networks/001",
data=["001",
{'network': {'net-name': 'newName'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_update_network - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_delete_network(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_delete_network - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.delete_network,
status,
"DELETE",
"networks/001",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_delete_network - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_list_ports(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_ports - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_ports,
status,
"GET",
"networks/001/ports",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_ports - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_list_port_details(self,
tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_port_details - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_port_details,
status,
"GET",
"networks/001/ports/001",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_port_details - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_create_port(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_create_port - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.create_port,
status,
"POST",
"networks/001/ports",
data=["001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_create_port - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_delete_port(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_delete_port - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.delete_port,
status,
"DELETE",
"networks/001/ports/001",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_delete_port - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_set_port_state(self, tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_set_port_state - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.set_port_state,
status,
"PUT",
"networks/001/ports/001",
data=["001", "001",
{'port': {'state': 'ACTIVE'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_set_port_state - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_list_port_attachments(self,
tenant=TENANT_1, format='json', status=200):
LOG.debug("_test_list_port_attachments - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.list_port_attachments,
status,
"GET",
"networks/001/ports/001/attachment",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_list_port_attachments - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_attach_resource(self, tenant=TENANT_1,
format='json', status=200):
LOG.debug("_test_attach_resource - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.attach_resource,
status,
"PUT",
"networks/001/ports/001/attachment",
data=["001", "001",
{'resource': {'id': '1234'}}],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_attach_resource - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_detach_resource(self, tenant=TENANT_1,
format='json', status=200):
LOG.debug("_test_detach_resource - tenant:%s "\
"- format:%s - START", format, tenant)
self._assert_sanity(self.client.detach_resource,
status,
"DELETE",
"networks/001/ports/001/attachment",
data=["001", "001"],
params={'tenant': tenant, 'format': format})
LOG.debug("_test_detach_resource - tenant:%s "\
"- format:%s - END", format, tenant)
def _test_ssl_certificates(self, tenant=TENANT_1,
format='json', status=200):
LOG.debug("_test_ssl_certificates - tenant:%s "\
"- format:%s - START", format, tenant)
# Set SSL, and our cert file
self.client.use_ssl = True
cert_file = "/fake.cert"
self.client.key_file = self.client.cert_file = cert_file
data = self._assert_sanity(self.client.list_networks,
status,
"GET",
"networks",
data=[],
params={'tenant': tenant, 'format': format})
self.assertEquals(data["key_file"], cert_file)
self.assertEquals(data["cert_file"], cert_file)
LOG.debug("_test_ssl_certificates - tenant:%s "\
"- format:%s - END", format, tenant)
def test_list_networks_json(self):
self._test_list_networks(format='json')
def test_list_networks_xml(self):
self._test_list_networks(format='xml')
def test_list_networks_alt_tenant(self):
self._test_list_networks(tenant=TENANT_2)
def test_list_networks_error_470(self):
self._test_list_networks(status=470)
def test_list_networks_error_401(self):
self._test_list_networks(status=401)
def test_list_network_details_json(self):
self._test_list_network_details(format='json')
def test_list_network_details_xml(self):
self._test_list_network_details(format='xml')
def test_list_network_details_alt_tenant(self):
self._test_list_network_details(tenant=TENANT_2)
def test_list_network_details_error_470(self):
self._test_list_network_details(status=470)
def test_list_network_details_error_401(self):
self._test_list_network_details(status=401)
def test_list_network_details_error_420(self):
self._test_list_network_details(status=420)
def test_create_network_json(self):
self._test_create_network(format='json')
def test_create_network_xml(self):
self._test_create_network(format='xml')
def test_create_network_alt_tenant(self):
self._test_create_network(tenant=TENANT_2)
def test_create_network_error_470(self):
self._test_create_network(status=470)
def test_create_network_error_401(self):
self._test_create_network(status=401)
def test_create_network_error_400(self):
self._test_create_network(status=400)
def test_create_network_error_422(self):
self._test_create_network(status=422)
def test_update_network_json(self):
self._test_update_network(format='json')
def test_update_network_xml(self):
self._test_update_network(format='xml')
def test_update_network_alt_tenant(self):
self._test_update_network(tenant=TENANT_2)
def test_update_network_error_470(self):
self._test_update_network(status=470)
def test_update_network_error_401(self):
self._test_update_network(status=401)
def test_update_network_error_400(self):
self._test_update_network(status=400)
def test_update_network_error_420(self):
self._test_update_network(status=420)
def test_update_network_error_422(self):
self._test_update_network(status=422)
def test_delete_network_json(self):
self._test_delete_network(format='json')
def test_delete_network_xml(self):
self._test_delete_network(format='xml')
def test_delete_network_alt_tenant(self):
self._test_delete_network(tenant=TENANT_2)
def test_delete_network_error_470(self):
self._test_delete_network(status=470)
def test_delete_network_error_401(self):
self._test_delete_network(status=401)
def test_delete_network_error_420(self):
self._test_delete_network(status=420)
def test_delete_network_error_421(self):
self._test_delete_network(status=421)
def test_list_ports_json(self):
self._test_list_ports(format='json')
def test_list_ports_xml(self):
self._test_list_ports(format='xml')
def test_list_ports_alt_tenant(self):
self._test_list_ports(tenant=TENANT_2)
def test_list_ports_error_470(self):
self._test_list_ports(status=470)
def test_list_ports_error_401(self):
self._test_list_ports(status=401)
def test_list_ports_error_420(self):
self._test_list_ports(status=420)
def test_list_port_details_json(self):
self._test_list_ports(format='json')
def test_list_port_details_xml(self):
self._test_list_ports(format='xml')
def test_list_port_details_alt_tenant(self):
self._test_list_ports(tenant=TENANT_2)
def test_list_port_details_error_470(self):
self._test_list_port_details(status=470)
def test_list_port_details_error_401(self):
self._test_list_ports(status=401)
def test_list_port_details_error_420(self):
self._test_list_ports(status=420)
def test_list_port_details_error_430(self):
self._test_list_ports(status=430)
def test_create_port_json(self):
self._test_create_port(format='json')
def test_create_port_xml(self):
self._test_create_port(format='xml')
def test_create_port_alt_tenant(self):
self._test_create_port(tenant=TENANT_2)
def test_create_port_error_470(self):
self._test_create_port(status=470)
def test_create_port_error_401(self):
self._test_create_port(status=401)
def test_create_port_error_400(self):
self._test_create_port(status=400)
def test_create_port_error_420(self):
self._test_create_port(status=420)
def test_create_port_error_430(self):
self._test_create_port(status=430)
def test_create_port_error_431(self):
self._test_create_port(status=431)
def test_delete_port_json(self):
self._test_delete_port(format='json')
def test_delete_port_xml(self):
self._test_delete_port(format='xml')
def test_delete_port_alt_tenant(self):
self._test_delete_port(tenant=TENANT_2)
def test_delete_port_error_470(self):
self._test_delete_port(status=470)
def test_delete_port_error_401(self):
self._test_delete_port(status=401)
def test_delete_port_error_420(self):
self._test_delete_port(status=420)
def test_delete_port_error_430(self):
self._test_delete_port(status=430)
def test_delete_port_error_432(self):
self._test_delete_port(status=432)
def test_set_port_state_json(self):
self._test_set_port_state(format='json')
def test_set_port_state_xml(self):
self._test_set_port_state(format='xml')
def test_set_port_state_alt_tenant(self):
self._test_set_port_state(tenant=TENANT_2)
def test_set_port_state_error_470(self):
self._test_set_port_state(status=470)
def test_set_port_state_error_401(self):
self._test_set_port_state(status=401)
def test_set_port_state_error_400(self):
self._test_set_port_state(status=400)
def test_set_port_state_error_420(self):
self._test_set_port_state(status=420)
def test_set_port_state_error_430(self):
self._test_set_port_state(status=430)
def test_set_port_state_error_431(self):
self._test_set_port_state(status=431)
def test_list_port_attachments_json(self):
self._test_list_port_attachments(format='json')
def test_list_port_attachments_xml(self):
self._test_list_port_attachments(format='xml')
def test_list_port_attachments_alt_tenant(self):
self._test_list_port_attachments(tenant=TENANT_2)
def test_list_port_attachments_error_470(self):
self._test_list_port_attachments(status=470)
def test_list_port_attachments_error_401(self):
self._test_list_port_attachments(status=401)
def test_list_port_attachments_error_400(self):
self._test_list_port_attachments(status=400)
def test_list_port_attachments_error_420(self):
self._test_list_port_attachments(status=420)
def test_list_port_attachments_error_430(self):
self._test_list_port_attachments(status=430)
def test_attach_resource_json(self):
self._test_attach_resource(format='json')
def test_attach_resource_xml(self):
self._test_attach_resource(format='xml')
def test_attach_resource_alt_tenant(self):
self._test_attach_resource(tenant=TENANT_2)
def test_attach_resource_error_470(self):
self._test_attach_resource(status=470)
def test_attach_resource_error_401(self):
self._test_attach_resource(status=401)
def test_attach_resource_error_400(self):
self._test_attach_resource(status=400)
def test_attach_resource_error_420(self):
self._test_attach_resource(status=420)
def test_attach_resource_error_430(self):
self._test_attach_resource(status=430)
def test_attach_resource_error_432(self):
self._test_attach_resource(status=432)
def test_attach_resource_error_440(self):
self._test_attach_resource(status=440)
def test_detach_resource_json(self):
self._test_detach_resource(format='json')
def test_detach_resource_xml(self):
self._test_detach_resource(format='xml')
def test_detach_resource_alt_tenant(self):
self._test_detach_resource(tenant=TENANT_2)
def test_detach_resource_error_470(self):
self._test_detach_resource(status=470)
def test_detach_resource_error_401(self):
self._test_detach_resource(status=401)
def test_detach_resource_error_420(self):
self._test_detach_resource(status=420)
def test_detach_resource_error_430(self):
self._test_detach_resource(status=430)
def test_ssl_certificates(self):
self._test_ssl_certificates()