Merging branch lp:~netstack/quantum/quantum-unit-tests

Provides functionality as specified by blueprint https://blueprints.launchpad.net/quantum/+spec/api-spec-unit-tests
This commit is contained in:
Salvatore Orlando 2011-07-22 08:01:43 +01:00
commit f5bd5b40f4
22 changed files with 1232 additions and 384 deletions

47
TESTING Normal file
View File

@ -0,0 +1,47 @@
Testing Quantum
=============================================================
Overview
There are two types of tests in quantum: functional and unit. Their
respective directories are located in the tests/ directory.
The functional tests are intended to be used when the service is running.
Their goal is to make sure the service is working end to end and also to
test any plugin for conformance and functionality. Also note that the
functional tests expect quantum to be running on the local machine. If it
isn't you will have to change the tests to point to your quuntum instance.
The unit tests can be run without the service running. They are designed
to test the various pieces of the quantum tree to make sure any new
changes don't break existing functionality.
Running tests
All tests can be run via the run_tests.sh script, which will allow you to
run them in the standard environment or create a virtual environment to
run them in. All of the functional tests will fail if the service isn't
running. One current TODO item is to be able to specify whether you want
to run the functional or unit tests via run_tests.sh.
After running all of the tests, run_test.sh will report any pep8 errors
found in the tree.
Adding more tests
Quantum is a pretty new code base at this point and there is plenty of
areas that need tests. The current blueprint and branch for adding tests
is located at:
https://code.launchpad.net/~netstack/quantum/quantum-unit-tests
Also, there is a wiki page tracking the status of the test effort:
http://wiki.openstack.org/QuantumUnitTestStatus
Development process
It is expected that any new changes that are proposed for merge come with
unit tests for that feature or code area. Ideally any bugs fixes that are
submitted also have unit tests to prove that they stay fixed! :) In
addition, before proposing for merge, all of the current unit tests should
be passing.

View File

@ -41,14 +41,14 @@ class APIRouterV01(wsgi.Router):
Routes requests on the Quantum API to the appropriate controller
"""
def __init__(self, ext_mgr=None):
def __init__(self, options=None):
mapper = routes.Mapper()
self._setup_routes(mapper)
self._setup_routes(mapper, options)
super(APIRouterV01, self).__init__(mapper)
def _setup_routes(self, mapper):
def _setup_routes(self, mapper, options):
# Loads the quantum plugin
plugin = manager.QuantumManager().get_plugin()
plugin = manager.QuantumManager(options).get_plugin()
uri_prefix = '/tenants/{tenant_id}/'
mapper.resource('network', 'networks',
controller=networks.Controller(plugin),

View File

@ -29,7 +29,7 @@ class Controller(common.QuantumController):
""" Network API controller for Quantum API """
_network_ops_param_list = [{
'param-name': 'network-name',
'param-name': 'net-name',
'required': True}, ]
_serialization_metadata = {
@ -80,7 +80,7 @@ class Controller(common.QuantumController):
return faults.Fault(e)
network = self._plugin.\
create_network(tenant_id,
request_params['network-name'])
request_params['net-name'])
builder = networks_view.get_view_builder(request)
result = builder.build(network)
return dict(networks=result)
@ -94,12 +94,9 @@ class Controller(common.QuantumController):
except exc.HTTPError as e:
return faults.Fault(e)
try:
network = self._plugin.rename_network(tenant_id,
id, request_params['network-name'])
builder = networks_view.get_view_builder(request)
result = builder.build(network, True)
return dict(networks=result)
self._plugin.rename_network(tenant_id, id,
request_params['net-name'])
return exc.HTTPAccepted()
except exception.NetworkNotFound as e:
return faults.Fault(faults.NetworkNotFound(e))

View File

@ -132,18 +132,16 @@ class Controller(common.QuantumController):
def get_resource(self, request, tenant_id, network_id, id):
try:
result = self._plugin.get_interface_details(
tenant_id, network_id, id)
result = self._plugin.get_port_details(
tenant_id, network_id, id).get('attachment-id',
None)
return dict(attachment=result)
except exception.NetworkNotFound as e:
return faults.Fault(faults.NetworkNotFound(e))
except exception.PortNotFound as e:
return faults.Fault(faults.PortNotFound(e))
#TODO - Complete implementation of these APIs
def attach_resource(self, request, tenant_id, network_id, id):
content_type = request.best_match_content_type()
print "Content type:%s" % content_type
try:
request_params = \
self._parse_request_params(request,
@ -164,11 +162,10 @@ class Controller(common.QuantumController):
except exception.AlreadyAttached as e:
return faults.Fault(faults.AlreadyAttached(e))
#TODO - Complete implementation of these APIs
def detach_resource(self, request, tenant_id, network_id, id):
try:
self._plugin.unplug_interface(tenant_id,
network_id, id)
network_id, id)
return exc.HTTPAccepted()
except exception.NetworkNotFound as e:
return faults.Fault(faults.NetworkNotFound(e))

View File

@ -33,7 +33,6 @@ class ViewBuilder(object):
def build(self, network_data, is_detail=False):
"""Generic method used to generate a network entity."""
print "NETWORK-DATA:%s" % network_data
if is_detail:
network = self._build_detail(network_data)
else:

View File

@ -31,7 +31,6 @@ class ViewBuilder(object):
def build(self, port_data, is_detail=False):
"""Generic method used to generate a port entity."""
print "PORT-DATA:%s" % port_data
if is_detail:
port = self._build_detail(port_data)
else:
@ -45,5 +44,4 @@ class ViewBuilder(object):
def _build_detail(self, port_data):
"""Return a simple model of a server."""
return dict(port=dict(id=port_data['port-id'],
attachment=port_data['attachment'],
state=port_data['port-state']))

View File

@ -300,7 +300,8 @@ def api_plug_iface(client, *args):
LOG.error("Failed to plug iface \"%s\" to port \"%s\": %s" % (vid,
pid, output))
return
print "Plugged interface \"%s\" to port:%s on network:%s" % (vid, pid, nid)
print "Plugged interface \"%s\" to port:%s on network:%s" % (vid, pid,
nid)
def unplug_iface(manager, *args):

View File

@ -190,7 +190,7 @@ def parse_isotime(timestr):
return datetime.datetime.strptime(timestr, TIME_FORMAT)
def getPluginFromConfig(file="config.ini"):
def get_plugin_from_config(file="config.ini"):
Config = ConfigParser.ConfigParser()
Config.read(file)
return Config.get("PLUGIN", "provider")

View File

@ -299,7 +299,6 @@ class Router(object):
Route the incoming request to a controller based on self.map.
If no match, return a 404.
"""
LOG.debug("HERE - wsgi.Router.__call__")
return self._router
@staticmethod
@ -337,10 +336,6 @@ class Controller(object):
arg_dict = req.environ['wsgiorg.routing_args'][1]
action = arg_dict['action']
method = getattr(self, action)
LOG.debug("ARG_DICT:%s", arg_dict)
LOG.debug("Action:%s", action)
LOG.debug("Method:%s", method)
LOG.debug("%s %s" % (req.method, req.url))
del arg_dict['controller']
del arg_dict['action']
if 'format' in arg_dict:

View File

@ -19,7 +19,8 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, exc
import models
from quantum.db import models
_ENGINE = None
_MAKER = None
@ -42,6 +43,13 @@ def configure_db(options):
register_models()
def clear_db():
global _ENGINE
assert _ENGINE
for table in reversed(BASE.metadata.sorted_tables):
_ENGINE.execute(table.delete())
def get_session(autocommit=True, expire_on_commit=False):
"""Helper method to grab session"""
global _MAKER, _ENGINE
@ -74,7 +82,8 @@ def network_create(tenant_id, name):
net = session.query(models.Network).\
filter_by(tenant_id=tenant_id, name=name).\
one()
raise Exception("Network with name \"%s\" already exists" % name)
raise Exception("Network with name %(name)s already " \
"exists for tenant %(tenant_id)s" % locals())
except exc.NoResultFound:
with session.begin():
net = models.Network(tenant_id, name)
@ -128,10 +137,11 @@ def network_destroy(net_id):
raise Exception("No network found with id = %s" % net_id)
def port_create(net_id):
def port_create(net_id, state=None):
session = get_session()
with session.begin():
port = models.Port(net_id)
port['state'] = state or 'DOWN'
session.add(port)
session.flush()
return port
@ -154,6 +164,16 @@ def port_get(port_id):
raise Exception("No port found with id = %s " % port_id)
def port_set_state(port_id, new_state):
port = port_get(port_id)
if port:
session = get_session()
port.state = new_state
session.merge(port)
session.flush()
return port
def port_set_attachment(port_id, new_interface_id):
session = get_session()
ports = []
@ -175,6 +195,14 @@ def port_set_attachment(port_id, new_interface_id):
% (new_interface_id))
def port_unset_attachment(port_id):
session = get_session()
port = port_get(port_id)
port.interface_id = None
session.merge(port)
session.flush
def port_destroy(port_id):
session = get_session()
try:

View File

@ -19,14 +19,49 @@
import uuid
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy import Column, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relation
from sqlalchemy.orm import relation, object_mapper
BASE = declarative_base()
class Port(BASE):
class QuantumBase(object):
"""Base class for Quantum Models."""
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
def __iter__(self):
self._i = iter(object_mapper(self).columns)
return self
def next(self):
n = self._i.next().name
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict"""
for k, v in values.iteritems():
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins."""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
class Port(BASE, QuantumBase):
"""Represents a port on a quantum network"""
__tablename__ = 'ports'
@ -34,17 +69,20 @@ class Port(BASE):
network_id = Column(String(255), ForeignKey("networks.uuid"),
nullable=False)
interface_id = Column(String(255))
# Port state - Hardcoding string value at the moment
state = Column(String(8))
def __init__(self, network_id):
self.uuid = uuid.uuid4()
self.uuid = str(uuid.uuid4())
self.network_id = network_id
self.state = "DOWN"
def __repr__(self):
return "<Port(%s,%s,%s)>" % (self.uuid, self.network_id,
self.interface_id)
return "<Port(%s,%s,%s,%s)>" % (self.uuid, self.network_id,
self.state, self.interface_id)
class Network(BASE):
class Network(BASE, QuantumBase):
"""Represents a quantum network"""
__tablename__ = 'networks'
@ -54,7 +92,7 @@ class Network(BASE):
ports = relation(Port, order_by=Port.uuid, backref="network")
def __init__(self, tenant_id, name):
self.uuid = uuid.uuid4()
self.uuid = str(uuid.uuid4())
self.tenant_id = tenant_id
self.name = name

View File

@ -25,6 +25,7 @@ class.
The caller should make sure that QuantumManager is a singleton.
"""
import gettext
import logging
import os
gettext.install('quantum', unicode=1)
@ -33,6 +34,7 @@ from common import utils
from quantum_plugin_base import QuantumPluginBase
CONFIG_FILE = "plugins.ini"
LOG = logging.getLogger('quantum.manager')
def find_config(basepath):
@ -43,20 +45,26 @@ def find_config(basepath):
class QuantumManager(object):
def __init__(self, config=None):
if config == None:
def __init__(self, options=None, config_file=None):
if config_file == None:
self.configuration_file = find_config(
os.path.abspath(os.path.dirname(__file__)))
else:
self.configuration_file = config
plugin_location = utils.getPluginFromConfig(self.configuration_file)
plugin_klass = utils.import_class(plugin_location)
self.configuration_file = config_file
# If no options have been provided, create an empty dict
if not options:
options = {}
if not 'plugin_provider' in options:
options['plugin_provider'] = \
utils.get_plugin_from_config(self.configuration_file)
LOG.debug("Plugin location:%s", options['plugin_provider'])
plugin_klass = utils.import_class(options['plugin_provider'])
if not issubclass(plugin_klass, QuantumPluginBase):
raise Exception("Configured Quantum plug-in " \
"didn't pass compatibility test")
else:
print("Successfully imported Quantum plug-in." \
"All compatibility tests passed\n")
LOG.debug("Successfully imported Quantum plug-in." \
"All compatibility tests passed")
self.plugin = plugin_klass()
def get_plugin(self):

View File

@ -1,3 +1,4 @@
[PLUGIN]
# Quantum plugin provider module
provider = quantum.plugins.SamplePlugin.FakePlugin
#provider = quantum.plugins.SamplePlugin.FakePlugin
provider = quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPlugin

View File

@ -14,8 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
# @author: Salvatore Orlando, Citrix
import logging
from quantum.common import exceptions as exc
from quantum.db import api as db
LOG = logging.getLogger('quantum.plugins.SamplePlugin')
class QuantumEchoPlugin(object):
@ -227,62 +233,41 @@ class FakePlugin(object):
client/cli/api development
"""
#static data for networks and ports
_port_dict_1 = {
1: {'port-id': 1,
'port-state': 'DOWN',
'attachment': None},
2: {'port-id': 2,
'port-state': 'UP',
'attachment': None}}
_port_dict_2 = {
1: {'port-id': 1,
'port-state': 'UP',
'attachment': 'SomeFormOfVIFID'},
2: {'port-id': 2,
'port-state': 'DOWN',
'attachment': None}}
_networks = {'001':
{
'net-id': '001',
'net-name': 'pippotest',
'net-ports': _port_dict_1},
'002':
{
'net-id': '002',
'net-name': 'cicciotest',
'net-ports': _port_dict_2}}
def __init__(self):
FakePlugin._net_counter = len(FakePlugin._networks)
db.configure_db({'sql_connection': 'sqlite:///:memory:'})
FakePlugin._net_counter = 0
def _get_network(self, tenant_id, network_id):
network = FakePlugin._networks.get(network_id)
if not network:
try:
network = db.network_get(network_id)
except:
raise exc.NetworkNotFound(net_id=network_id)
return network
def _get_port(self, tenant_id, network_id, port_id):
net = self._get_network(tenant_id, network_id)
port = net['net-ports'].get(int(port_id))
if not port:
try:
port = db.port_get(port_id)
except:
raise exc.PortNotFound(net_id=network_id, port_id=port_id)
# Port must exist and belong to the appropriate network.
if port['network_id'] != net['uuid']:
raise exc.PortNotFound(net_id=network_id, port_id=port_id)
return port
def _validate_port_state(self, port_state):
if port_state.upper() not in ('UP', 'DOWN'):
if port_state.upper() not in ('ACTIVE', 'DOWN'):
raise exc.StateInvalid(port_state=port_state)
return True
def _validate_attachment(self, tenant_id, network_id, port_id,
remote_interface_id):
network = self._get_network(tenant_id, network_id)
for port in network['net-ports'].values():
if port['attachment'] == remote_interface_id:
for port in db.port_list(network_id):
if port['interface_id'] == remote_interface_id:
raise exc.AlreadyAttached(net_id=network_id,
port_id=port_id,
att_id=port['attachment'],
att_port_id=port['port-id'])
att_id=port['interface_id'],
att_port_id=port['uuid'])
def get_all_networks(self, tenant_id):
"""
@ -290,48 +275,47 @@ class FakePlugin(object):
<network_uuid, network_name> for
the specified tenant.
"""
print("get_all_networks() called\n")
return FakePlugin._networks.values()
LOG.debug("FakePlugin.get_all_networks() called")
nets = []
for net in db.network_list(tenant_id):
net_item = {'net-id': str(net.uuid),
'net-name': net.name}
nets.append(net_item)
return nets
def get_network_details(self, tenant_id, net_id):
"""
retrieved a list of all the remote vifs that
are attached to the network
"""
print("get_network_details() called\n")
return self._get_network(tenant_id, net_id)
LOG.debug("FakePlugin.get_network_details() called")
net = self._get_network(tenant_id, net_id)
return {'net-id': str(net.uuid),
'net-name': net.name}
def create_network(self, tenant_id, net_name):
"""
Creates a new Virtual Network, and assigns it
a symbolic name.
"""
print("create_network() called\n")
FakePlugin._net_counter += 1
new_net_id = ("0" * (3 - len(str(FakePlugin._net_counter)))) + \
str(FakePlugin._net_counter)
print new_net_id
new_net_dict = {'net-id': new_net_id,
'net-name': net_name,
'net-ports': {}}
FakePlugin._networks[new_net_id] = new_net_dict
# return network_id of the created network
return new_net_dict
LOG.debug("FakePlugin.create_network() called")
new_net = db.network_create(tenant_id, net_name)
# Return uuid for newly created network as net-id.
return {'net-id': new_net['uuid']}
def delete_network(self, tenant_id, net_id):
"""
Deletes the network with the specified network identifier
belonging to the specified tenant.
"""
print("delete_network() called\n")
net = FakePlugin._networks.get(net_id)
LOG.debug("FakePlugin.delete_network() called")
net = self._get_network(tenant_id, net_id)
# Verify that no attachments are plugged into the network
if net:
if net['net-ports']:
for port in net['net-ports'].values():
if port['attachment']:
raise exc.NetworkInUse(net_id=net_id)
FakePlugin._networks.pop(net_id)
for port in db.port_list(net_id):
if port['interface_id']:
raise exc.NetworkInUse(net_id=net_id)
db.network_destroy(net_id)
return net
# Network not found
raise exc.NetworkNotFound(net_id=net_id)
@ -341,9 +325,12 @@ class FakePlugin(object):
Updates the symbolic name belonging to a particular
Virtual Network.
"""
print("rename_network() called\n")
LOG.debug("FakePlugin.rename_network() called")
try:
db.network_rename(net_id, tenant_id, new_name)
except:
raise exc.NetworkNotFound(net_id=net_id)
net = self._get_network(tenant_id, net_id)
net['net-name'] = new_name
return net
def get_all_ports(self, tenant_id, net_id):
@ -351,45 +338,49 @@ class FakePlugin(object):
Retrieves all port identifiers belonging to the
specified Virtual Network.
"""
print("get_all_ports() called\n")
network = self._get_network(tenant_id, net_id)
ports_on_net = network['net-ports'].values()
return ports_on_net
LOG.debug("FakePlugin.get_all_ports() called")
port_ids = []
ports = db.port_list(net_id)
for x in ports:
d = {'port-id': str(x.uuid)}
port_ids.append(d)
return port_ids
def get_port_details(self, tenant_id, net_id, port_id):
"""
This method allows the user to retrieve a remote interface
that is attached to this particular port.
"""
print("get_port_details() called\n")
return self._get_port(tenant_id, net_id, port_id)
LOG.debug("FakePlugin.get_port_details() called")
port = self._get_port(tenant_id, net_id, port_id)
return {'port-id': str(port.uuid),
'attachment-id': port.interface_id,
'port-state': port.state}
def create_port(self, tenant_id, net_id, port_state=None):
"""
Creates a port on the specified Virtual Network.
"""
print("create_port() called\n")
net = self._get_network(tenant_id, net_id)
# check port state
# TODO(salvatore-orlando): Validate port state in API?
self._validate_port_state(port_state)
ports = net['net-ports']
new_port_id = max(ports.keys()) + 1
new_port_dict = {'port-id': new_port_id,
'port-state': port_state,
'attachment': None}
ports[new_port_id] = new_port_dict
return new_port_dict
LOG.debug("FakePlugin.create_port() called")
# verify net_id
self._get_network(tenant_id, net_id)
port = db.port_create(net_id, port_state)
port_item = {'port-id': str(port.uuid)}
return port_item
def update_port(self, tenant_id, net_id, port_id, port_state):
def update_port(self, tenant_id, net_id, port_id, new_state):
"""
Updates the state of a port on the specified Virtual Network.
"""
print("create_port() called\n")
port = self._get_port(tenant_id, net_id, port_id)
self._validate_port_state(port_state)
port['port-state'] = port_state
return port
LOG.debug("FakePlugin.update_port() called")
#validate port and network ids
self._get_network(tenant_id, net_id)
self._get_port(tenant_id, net_id, port_id)
self._validate_port_state(new_state)
db.port_set_state(port_id, new_state)
port_item = {'port-id': port_id,
'port-state': new_state}
return port_item
def delete_port(self, tenant_id, net_id, port_id):
"""
@ -398,39 +389,42 @@ class FakePlugin(object):
the remote interface is first un-plugged and then the port
is deleted.
"""
print("delete_port() called\n")
LOG.debug("FakePlugin.delete_port() called")
net = self._get_network(tenant_id, net_id)
port = self._get_port(tenant_id, net_id, port_id)
if port['attachment']:
if port['interface_id']:
raise exc.PortInUse(net_id=net_id, port_id=port_id,
att_id=port['attachment'])
att_id=port['interface_id'])
try:
net['net-ports'].pop(int(port_id))
except KeyError:
raise exc.PortNotFound(net_id=net_id, port_id=port_id)
port = db.port_destroy(port_id)
except Exception, e:
raise Exception("Failed to delete port: %s" % str(e))
d = {}
d["port-id"] = str(port.uuid)
return d
def plug_interface(self, tenant_id, net_id, port_id, remote_interface_id):
"""
Attaches a remote interface to the specified port on the
specified Virtual Network.
"""
print("plug_interface() called\n")
LOG.debug("FakePlugin.plug_interface() called")
# Validate attachment
self._validate_attachment(tenant_id, net_id, port_id,
remote_interface_id)
port = self._get_port(tenant_id, net_id, port_id)
if port['attachment']:
if port['interface_id']:
raise exc.PortInUse(net_id=net_id, port_id=port_id,
att_id=port['attachment'])
port['attachment'] = remote_interface_id
att_id=port['interface_id'])
db.port_set_attachment(port_id, remote_interface_id)
def unplug_interface(self, tenant_id, net_id, port_id):
"""
Detaches a remote interface from the specified port on the
specified Virtual Network.
"""
print("unplug_interface() called\n")
port = self._get_port(tenant_id, net_id, port_id)
LOG.debug("FakePlugin.unplug_interface() called")
self._get_port(tenant_id, net_id, port_id)
# TODO(salvatore-orlando):
# Should unplug on port without attachment raise an Error?
port['attachment'] = None
db.port_unset_attachment(port_id)

View File

@ -22,6 +22,7 @@ QuantumPluginBase provides the definition of minimum set of
methods that needs to be implemented by a Quantum Plug-in.
"""
import inspect
from abc import ABCMeta, abstractmethod
@ -242,7 +243,17 @@ class QuantumPluginBase(object):
"""
if cls is QuantumPluginBase:
for method in cls.__abstractmethods__:
if any(method in base.__dict__ for base in klass.__mro__):
method_ok = False
for base in klass.__mro__:
if method in base.__dict__:
fn_obj = base.__dict__[method]
if inspect.isfunction(fn_obj):
abstract_fn_obj = cls.__dict__[method]
arg_count = fn_obj.func_code.co_argcount
expected_arg_count = \
abstract_fn_obj.func_code.co_argcount
method_ok = arg_count == expected_arg_count
if method_ok:
continue
return NotImplemented
return True

View File

@ -63,6 +63,7 @@ To run a single functional test module::
"""
import gettext
import logging
import os
import unittest
import sys
@ -281,12 +282,19 @@ class QuantumTestRunner(core.TextTestRunner):
if __name__ == '__main__':
# Set up test logger.
logger = logging.getLogger()
hdlr = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
working_dir = os.path.abspath("tests")
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=working_dir)
runner = QuantumTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c)

View File

@ -2,7 +2,7 @@
function usage {
echo "Usage: $0 [OPTION]..."
echo "Run Melange's test suite(s)"
echo "Run Quantum's test suite(s)"
echo ""
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"

View File

@ -1,99 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Citrix Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import socket
import urllib
class MiniClient(object):
"""A base client class - derived from Glance.BaseClient"""
action_prefix = '/v0.1/tenants/{tenant_id}'
def __init__(self, host, port, use_ssl):
"""
Creates a new client to some service.
:param host: The host where service resides
:param port: The port where service resides
:param use_ssl: Should we use HTTPS?
"""
self.host = host
self.port = port
self.use_ssl = use_ssl
self.connection = None
def get_connection_type(self):
"""
Returns the proper connection type
"""
if self.use_ssl:
return httplib.HTTPSConnection
else:
return httplib.HTTPConnection
def do_request(self, tenant, method, action, body=None,
headers=None, params=None):
"""
Connects to the server and issues a request.
Returns the result data, or raises an appropriate exception if
HTTP status code is not 2xx
:param method: HTTP method ("GET", "POST", "PUT", etc...)
:param body: string of data to send, or None (default)
:param headers: mapping of key/value pairs to add as headers
:param params: dictionary of key/value pairs to add to append
to action
"""
action = MiniClient.action_prefix + action
action = action.replace('{tenant_id}', tenant)
if type(params) is dict:
action += '?' + urllib.urlencode(params)
try:
connection_type = self.get_connection_type()
headers = headers or {}
# Open connection and send request
c = connection_type(self.host, self.port)
c.request(method, action, body, headers)
res = c.getresponse()
status_code = self.get_status_code(res)
if status_code in (httplib.OK,
httplib.CREATED,
httplib.ACCEPTED,
httplib.NO_CONTENT):
return res
else:
raise Exception("Server returned error: %s" % res.read())
except (socket.error, IOError), e:
raise Exception("Unable to connect to "
"server. Got error: %s" % e)
def get_status_code(self, response):
"""
Returns the integer status code from the response, which
can be either a Webob.Response (used in testing) or httplib.Response
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status

View File

@ -1,136 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Citrix Systems
# Copyright 2011 Nicira Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gettext
import simplejson
import sys
import unittest
gettext.install('quantum', unicode=1)
from miniclient import MiniClient
from quantum.common.wsgi import Serializer
HOST = '127.0.0.1'
PORT = 9696
USE_SSL = False
TENANT_ID = 'totore'
FORMAT = "json"
test_network1_data = \
{'network': {'network-name': 'test1'}}
test_network2_data = \
{'network': {'network-name': 'test2'}}
def print_response(res):
content = res.read()
print "Status: %s" % res.status
print "Content: %s" % content
return content
class QuantumTest(unittest.TestCase):
def setUp(self):
self.client = MiniClient(HOST, PORT, USE_SSL)
def create_network(self, data):
content_type = "application/" + FORMAT
body = Serializer().serialize(data, content_type)
res = self.client.do_request(TENANT_ID, 'POST', "/networks." + FORMAT,
body=body)
self.assertEqual(res.status, 200, "bad response: %s" % res.read())
def test_listNetworks(self):
self.create_network(test_network1_data)
self.create_network(test_network2_data)
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
self.assertEqual(res.status, 200, "bad response: %s" % res.read())
def test_createNetwork(self):
self.create_network(test_network1_data)
def test_createPort(self):
self.create_network(test_network1_data)
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
resdict = simplejson.loads(res.read())
for n in resdict["networks"]:
net_id = n["id"]
# Step 1 - List Ports for network (should not find any)
res = self.client.do_request(TENANT_ID, 'GET',
"/networks/%s/ports.%s" % (net_id, FORMAT))
self.assertEqual(res.status, 200, "Bad response: %s" % res.read())
output = res.read()
self.assertTrue(len(output) == 0,
"Found unexpected ports: %s" % output)
# Step 2 - Create Port for network
res = self.client.do_request(TENANT_ID, 'POST',
"/networks/%s/ports.%s" % (net_id, FORMAT))
self.assertEqual(res.status, 200, "Bad response: %s" % output)
# Step 3 - List Ports for network (again); should find one
res = self.client.do_request(TENANT_ID, 'GET',
"/networks/%s/ports.%s" % (net_id, FORMAT))
output = res.read()
self.assertEqual(res.status, 200, "Bad response: %s" % output)
resdict = simplejson.loads(output)
ids = []
for p in resdict["ports"]:
ids.append(p["id"])
self.assertTrue(len(ids) == 1,
"Didn't find expected # of ports (1): %s" % ids)
def test_renameNetwork(self):
self.create_network(test_network1_data)
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
resdict = simplejson.loads(res.read())
net_id = resdict["networks"][0]["id"]
data = test_network1_data.copy()
data['network']['network-name'] = 'test_renamed'
content_type = "application/" + FORMAT
body = Serializer().serialize(data, content_type)
res = self.client.do_request(TENANT_ID, 'PUT',
"/networks/%s.%s" % (net_id, FORMAT), body=body)
resdict = simplejson.loads(res.read())
self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
"Network_rename: renamed network has a different uuid")
self.assertTrue(
resdict["networks"]["network"]["name"] == "test_renamed",
"Network rename didn't take effect")
def delete_networks(self):
# Remove all the networks created on the tenant
res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
resdict = simplejson.loads(res.read())
for n in resdict["networks"]:
net_id = n["id"]
res = self.client.do_request(TENANT_ID, 'DELETE',
"/networks/" + net_id + "." + FORMAT)
self.assertEqual(res.status, 202)
def tearDown(self):
self.delete_networks()
# Standard boilerplate to call the main() function.
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(QuantumTest)
unittest.TextTestRunner(verbosity=2).run(suite)

831
tests/unit/test_api.py Normal file
View File

@ -0,0 +1,831 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 ????
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Brad Hall, Nicira Networks
# @author: Salvatore Orlando, Citrix Systems
import logging
import unittest
import tests.unit.testlib_api as testlib
from quantum import api as server
from quantum.db import api as db
from quantum.common.wsgi import Serializer
LOG = logging.getLogger('quantum.tests.test_api')
class APITest(unittest.TestCase):
def _create_network(self, format, name=None, custom_req_body=None,
expected_res_status=200):
LOG.debug("Creating network")
content_type = "application/" + format
if name:
net_name = name
else:
net_name = self.network_name
network_req = testlib.new_network_request(self.tenant_id,
net_name, format,
custom_req_body)
network_res = network_req.get_response(self.api)
self.assertEqual(network_res.status_int, expected_res_status)
if expected_res_status == 200:
network_data = Serializer().deserialize(network_res.body,
content_type)
return network_data['networks']['network']['id']
def _create_port(self, network_id, port_state, format,
custom_req_body=None, expected_res_status=200):
LOG.debug("Creating port for network %s", network_id)
content_type = "application/%s" % format
port_req = testlib.new_port_request(self.tenant_id, network_id,
port_state, format,
custom_req_body)
port_res = port_req.get_response(self.api)
self.assertEqual(port_res.status_int, expected_res_status)
if expected_res_status == 200:
port_data = Serializer().deserialize(port_res.body, content_type)
return port_data['ports']['port']['id']
def _test_create_network(self, format):
LOG.debug("_test_create_network - format:%s - START", format)
content_type = "application/%s" % format
network_id = self._create_network(format)
show_network_req = testlib.show_network_request(self.tenant_id,
network_id,
format)
show_network_res = show_network_req.get_response(self.api)
self.assertEqual(show_network_res.status_int, 200)
network_data = Serializer().deserialize(show_network_res.body,
content_type)
self.assertEqual(network_id,
network_data['networks']['network']['id'])
LOG.debug("_test_create_network - format:%s - END", format)
def _test_create_network_badrequest(self, format):
LOG.debug("_test_create_network_badrequest - format:%s - START",
format)
bad_body = {'network': {'bad-attribute': 'very-bad'}}
self._create_network(format, custom_req_body=bad_body,
expected_res_status=400)
LOG.debug("_test_create_network_badrequest - format:%s - END",
format)
def _test_list_networks(self, format):
LOG.debug("_test_list_networks - format:%s - START", format)
content_type = "application/%s" % format
self._create_network(format, "net_1")
self._create_network(format, "net_2")
list_network_req = testlib.network_list_request(self.tenant_id,
format)
list_network_res = list_network_req.get_response(self.api)
self.assertEqual(list_network_res.status_int, 200)
network_data = Serializer().deserialize(list_network_res.body,
content_type)
# Check network count: should return 2
self.assertEqual(len(network_data['networks']), 2)
LOG.debug("_test_list_networks - format:%s - END", format)
def _test_show_network(self, format):
LOG.debug("_test_show_network - format:%s - START", format)
content_type = "application/%s" % format
network_id = self._create_network(format)
show_network_req = testlib.show_network_request(self.tenant_id,
network_id,
format)
show_network_res = show_network_req.get_response(self.api)
self.assertEqual(show_network_res.status_int, 200)
network_data = Serializer().deserialize(show_network_res.body,
content_type)
self.assertEqual({'id': network_id, 'name': self.network_name},
network_data['networks']['network'])
LOG.debug("_test_show_network - format:%s - END", format)
def _test_show_network_not_found(self, format):
LOG.debug("_test_show_network_not_found - format:%s - START", format)
show_network_req = testlib.show_network_request(self.tenant_id,
"A_BAD_ID",
format)
show_network_res = show_network_req.get_response(self.api)
self.assertEqual(show_network_res.status_int, 420)
LOG.debug("_test_show_network_not_found - format:%s - END", format)
def _test_rename_network(self, format):
LOG.debug("_test_rename_network - format:%s - START", format)
content_type = "application/%s" % format
new_name = 'new_network_name'
network_id = self._create_network(format)
update_network_req = testlib.update_network_request(self.tenant_id,
network_id,
new_name,
format)
update_network_res = update_network_req.get_response(self.api)
self.assertEqual(update_network_res.status_int, 202)
show_network_req = testlib.show_network_request(self.tenant_id,
network_id,
format)
show_network_res = show_network_req.get_response(self.api)
self.assertEqual(show_network_res.status_int, 200)
network_data = Serializer().deserialize(show_network_res.body,
content_type)
self.assertEqual({'id': network_id, 'name': new_name},
network_data['networks']['network'])
LOG.debug("_test_rename_network - format:%s - END", format)
def _test_rename_network_badrequest(self, format):
LOG.debug("_test_rename_network_badrequest - format:%s - START",
format)
network_id = self._create_network(format)
bad_body = {'network': {'bad-attribute': 'very-bad'}}
update_network_req = testlib.\
update_network_request(self.tenant_id,
network_id, format,
custom_req_body=bad_body)
update_network_res = update_network_req.get_response(self.api)
self.assertEqual(update_network_res.status_int, 400)
LOG.debug("_test_rename_network_badrequest - format:%s - END",
format)
def _test_rename_network_not_found(self, format):
LOG.debug("_test_rename_network_not_found - format:%s - START",
format)
new_name = 'new_network_name'
update_network_req = testlib.update_network_request(self.tenant_id,
"A BAD ID",
new_name,
format)
update_network_res = update_network_req.get_response(self.api)
self.assertEqual(update_network_res.status_int, 420)
LOG.debug("_test_rename_network_not_found - format:%s - END",
format)
def _test_delete_network(self, format):
LOG.debug("_test_delete_network - format:%s - START", format)
content_type = "application/%s" % format
network_id = self._create_network(format)
LOG.debug("Deleting network %(network_id)s"\
" of tenant %(tenant_id)s", locals())
delete_network_req = testlib.network_delete_request(self.tenant_id,
network_id,
format)
delete_network_res = delete_network_req.get_response(self.api)
self.assertEqual(delete_network_res.status_int, 202)
list_network_req = testlib.network_list_request(self.tenant_id,
format)
list_network_res = list_network_req.get_response(self.api)
network_list_data = Serializer().deserialize(list_network_res.body,
content_type)
network_count = len(network_list_data['networks'])
self.assertEqual(network_count, 0)
LOG.debug("_test_delete_network - format:%s - END", format)
def _test_delete_network_in_use(self, format):
LOG.debug("_test_delete_network_in_use - format:%s - START", format)
content_type = "application/%s" % format
port_state = "ACTIVE"
attachment_id = "test_attachment"
network_id = self._create_network(format)
LOG.debug("Deleting network %(network_id)s"\
" of tenant %(tenant_id)s", locals())
port_id = self._create_port(network_id, port_state, format)
#plug an attachment into the port
LOG.debug("Putting attachment into port %s", port_id)
attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
port_id,
attachment_id)
attachment_res = attachment_req.get_response(self.api)
self.assertEquals(attachment_res.status_int, 202)
LOG.debug("Deleting network %(network_id)s"\
" of tenant %(tenant_id)s", locals())
delete_network_req = testlib.network_delete_request(self.tenant_id,
network_id,
format)
delete_network_res = delete_network_req.get_response(self.api)
self.assertEqual(delete_network_res.status_int, 421)
LOG.debug("_test_delete_network_in_use - format:%s - END", format)
def _test_list_ports(self, format):
LOG.debug("_test_list_ports - format:%s - START", format)
content_type = "application/%s" % format
port_state = "ACTIVE"
network_id = self._create_network(format)
self._create_port(network_id, port_state, format)
self._create_port(network_id, port_state, format)
list_port_req = testlib.port_list_request(self.tenant_id,
network_id, format)
list_port_res = list_port_req.get_response(self.api)
self.assertEqual(list_port_res.status_int, 200)
port_data = Serializer().deserialize(list_port_res.body,
content_type)
# Check port count: should return 2
self.assertEqual(len(port_data['ports']), 2)
LOG.debug("_test_list_ports - format:%s - END", format)
def _test_show_port(self, format):
LOG.debug("_test_show_port - format:%s - START", format)
content_type = "application/%s" % format
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
show_port_req = testlib.show_port_request(self.tenant_id,
network_id, port_id,
format)
show_port_res = show_port_req.get_response(self.api)
self.assertEqual(show_port_res.status_int, 200)
port_data = Serializer().deserialize(show_port_res.body,
content_type)
self.assertEqual({'id': port_id, 'state': port_state},
port_data['ports']['port'])
LOG.debug("_test_show_port - format:%s - END", format)
def _test_show_port_networknotfound(self, format):
LOG.debug("_test_show_port_networknotfound - format:%s - START",
format)
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
show_port_req = testlib.show_port_request(self.tenant_id,
"A_BAD_ID", port_id,
format)
show_port_res = show_port_req.get_response(self.api)
self.assertEqual(show_port_res.status_int, 420)
LOG.debug("_test_show_port_networknotfound - format:%s - END",
format)
def _test_show_port_portnotfound(self, format):
LOG.debug("_test_show_port_portnotfound - format:%s - START", format)
network_id = self._create_network(format)
show_port_req = testlib.show_port_request(self.tenant_id,
network_id,
"A_BAD_ID",
format)
show_port_res = show_port_req.get_response(self.api)
self.assertEqual(show_port_res.status_int, 430)
LOG.debug("_test_show_port_portnotfound - format:%s - END", format)
def _test_create_port(self, format):
LOG.debug("_test_create_port - format:%s - START", format)
content_type = "application/%s" % format
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
show_port_req = testlib.show_port_request(self.tenant_id,
network_id, port_id, format)
show_port_res = show_port_req.get_response(self.api)
self.assertEqual(show_port_res.status_int, 200)
port_data = Serializer().deserialize(show_port_res.body, content_type)
self.assertEqual(port_id, port_data['ports']['port']['id'])
LOG.debug("_test_create_port - format:%s - END", format)
def _test_create_port_networknotfound(self, format):
LOG.debug("_test_create_port_networknotfound - format:%s - START",
format)
port_state = "ACTIVE"
self._create_port("A_BAD_ID", port_state, format,
expected_res_status=420)
LOG.debug("_test_create_port_networknotfound - format:%s - END",
format)
def _test_create_port_badrequest(self, format):
LOG.debug("_test_create_port_badrequest - format:%s - START", format)
bad_body = {'bad-resource': {'bad-attribute': 'bad-value'}}
network_id = self._create_network(format)
port_state = "ACTIVE"
self._create_port(network_id, port_state, format,
custom_req_body=bad_body, expected_res_status=400)
LOG.debug("_test_create_port_badrequest - format:%s - END", format)
def _test_delete_port(self, format):
LOG.debug("_test_delete_port - format:%s - START", format)
content_type = "application/%s" % format
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
LOG.debug("Deleting port %(port_id)s for network %(network_id)s"\
" of tenant %(tenant_id)s", locals())
delete_port_req = testlib.port_delete_request(self.tenant_id,
network_id, port_id,
format)
delete_port_res = delete_port_req.get_response(self.api)
self.assertEqual(delete_port_res.status_int, 202)
list_port_req = testlib.port_list_request(self.tenant_id, network_id,
format)
list_port_res = list_port_req.get_response(self.api)
port_list_data = Serializer().deserialize(list_port_res.body,
content_type)
port_count = len(port_list_data['ports'])
self.assertEqual(port_count, 0)
LOG.debug("_test_delete_port - format:%s - END", format)
def _test_delete_port_in_use(self, format):
LOG.debug("_test_delete_port_in_use - format:%s - START", format)
content_type = "application/" + format
port_state = "ACTIVE"
attachment_id = "test_attachment"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
#plug an attachment into the port
LOG.debug("Putting attachment into port %s", port_id)
attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
port_id,
attachment_id)
attachment_res = attachment_req.get_response(self.api)
self.assertEquals(attachment_res.status_int, 202)
LOG.debug("Deleting port %(port_id)s for network %(network_id)s"\
" of tenant %(tenant_id)s", locals())
delete_port_req = testlib.port_delete_request(self.tenant_id,
network_id, port_id,
format)
delete_port_res = delete_port_req.get_response(self.api)
self.assertEqual(delete_port_res.status_int, 432)
LOG.debug("_test_delete_port_in_use - format:%s - END", format)
def _test_delete_port_with_bad_id(self, format):
LOG.debug("_test_delete_port_with_bad_id - format:%s - START",
format)
port_state = "ACTIVE"
network_id = self._create_network(format)
self._create_port(network_id, port_state, format)
# Test for portnotfound
delete_port_req = testlib.port_delete_request(self.tenant_id,
network_id, "A_BAD_ID",
format)
delete_port_res = delete_port_req.get_response(self.api)
self.assertEqual(delete_port_res.status_int, 430)
LOG.debug("_test_delete_port_with_bad_id - format:%s - END", format)
def _test_delete_port_networknotfound(self, format):
LOG.debug("_test_delete_port_networknotfound - format:%s - START",
format)
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
delete_port_req = testlib.port_delete_request(self.tenant_id,
"A_BAD_ID", port_id,
format)
delete_port_res = delete_port_req.get_response(self.api)
self.assertEqual(delete_port_res.status_int, 420)
LOG.debug("_test_delete_port_networknotfound - format:%s - END",
format)
def _test_set_port_state(self, format):
LOG.debug("_test_set_port_state - format:%s - START", format)
content_type = "application/%s" % format
port_state = 'DOWN'
new_port_state = 'ACTIVE'
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
update_port_req = testlib.update_port_request(self.tenant_id,
network_id, port_id,
new_port_state,
format)
update_port_res = update_port_req.get_response(self.api)
self.assertEqual(update_port_res.status_int, 200)
show_port_req = testlib.show_port_request(self.tenant_id,
network_id, port_id,
format)
show_port_res = show_port_req.get_response(self.api)
self.assertEqual(show_port_res.status_int, 200)
network_data = Serializer().deserialize(show_port_res.body,
content_type)
self.assertEqual({'id': port_id, 'state': new_port_state},
network_data['ports']['port'])
LOG.debug("_test_set_port_state - format:%s - END", format)
def _test_set_port_state_networknotfound(self, format):
LOG.debug("_test_set_port_state_networknotfound - format:%s - START",
format)
port_state = 'DOWN'
new_port_state = 'ACTIVE'
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
update_port_req = testlib.update_port_request(self.tenant_id,
"A_BAD_ID", port_id,
new_port_state,
format)
update_port_res = update_port_req.get_response(self.api)
self.assertEqual(update_port_res.status_int, 420)
LOG.debug("_test_set_port_state_networknotfound - format:%s - END",
format)
def _test_set_port_state_portnotfound(self, format):
LOG.debug("_test_set_port_state_portnotfound - format:%s - START",
format)
port_state = 'DOWN'
new_port_state = 'ACTIVE'
network_id = self._create_network(format)
self._create_port(network_id, port_state, format)
update_port_req = testlib.update_port_request(self.tenant_id,
network_id,
"A_BAD_ID",
new_port_state,
format)
update_port_res = update_port_req.get_response(self.api)
self.assertEqual(update_port_res.status_int, 430)
LOG.debug("_test_set_port_state_portnotfound - format:%s - END",
format)
def _test_set_port_state_stateinvalid(self, format):
LOG.debug("_test_set_port_state_stateinvalid - format:%s - START",
format)
port_state = 'DOWN'
new_port_state = 'A_BAD_STATE'
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
update_port_req = testlib.update_port_request(self.tenant_id,
network_id, port_id,
new_port_state,
format)
update_port_res = update_port_req.get_response(self.api)
self.assertEqual(update_port_res.status_int, 431)
LOG.debug("_test_set_port_state_stateinvalid - format:%s - END",
format)
def _test_show_attachment(self, format):
LOG.debug("_test_show_attachment - format:%s - START", format)
content_type = "application/%s" % format
port_state = "ACTIVE"
network_id = self._create_network(format)
interface_id = "test_interface"
port_id = self._create_port(network_id, port_state, format)
put_attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
port_id,
interface_id,
format)
put_attachment_res = put_attachment_req.get_response(self.api)
self.assertEqual(put_attachment_res.status_int, 202)
get_attachment_req = testlib.get_attachment_request(self.tenant_id,
network_id,
port_id,
format)
get_attachment_res = get_attachment_req.get_response(self.api)
attachment_data = Serializer().deserialize(get_attachment_res.body,
content_type)
self.assertEqual(attachment_data['attachment'], interface_id)
LOG.debug("_test_show_attachment - format:%s - END", format)
def _test_show_attachment_networknotfound(self, format):
LOG.debug("_test_show_attachment_networknotfound - format:%s - START",
format)
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
get_attachment_req = testlib.get_attachment_request(self.tenant_id,
"A_BAD_ID",
port_id,
format)
get_attachment_res = get_attachment_req.get_response(self.api)
self.assertEqual(get_attachment_res.status_int, 420)
LOG.debug("_test_show_attachment_networknotfound - format:%s - END",
format)
def _test_show_attachment_portnotfound(self, format):
LOG.debug("_test_show_attachment_portnotfound - format:%s - START",
format)
port_state = "ACTIVE"
network_id = self._create_network(format)
self._create_port(network_id, port_state, format)
get_attachment_req = testlib.get_attachment_request(self.tenant_id,
network_id,
"A_BAD_ID",
format)
get_attachment_res = get_attachment_req.get_response(self.api)
self.assertEqual(get_attachment_res.status_int, 430)
LOG.debug("_test_show_attachment_portnotfound - format:%s - END",
format)
def _test_put_attachment(self, format):
LOG.debug("_test_put_attachment - format:%s - START", format)
port_state = "ACTIVE"
network_id = self._create_network(format)
interface_id = "test_interface"
port_id = self._create_port(network_id, port_state, format)
put_attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
port_id,
interface_id,
format)
put_attachment_res = put_attachment_req.get_response(self.api)
self.assertEqual(put_attachment_res.status_int, 202)
LOG.debug("_test_put_attachment - format:%s - END", format)
def _test_put_attachment_networknotfound(self, format):
LOG.debug("_test_put_attachment_networknotfound - format:%s - START",
format)
port_state = 'DOWN'
interface_id = "test_interface"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
put_attachment_req = testlib.put_attachment_request(self.tenant_id,
"A_BAD_ID",
port_id,
interface_id,
format)
put_attachment_res = put_attachment_req.get_response(self.api)
self.assertEqual(put_attachment_res.status_int, 420)
LOG.debug("_test_put_attachment_networknotfound - format:%s - END",
format)
def _test_put_attachment_portnotfound(self, format):
LOG.debug("_test_put_attachment_portnotfound - format:%s - START",
format)
port_state = 'DOWN'
interface_id = "test_interface"
network_id = self._create_network(format)
self._create_port(network_id, port_state, format)
put_attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
"A_BAD_ID",
interface_id,
format)
put_attachment_res = put_attachment_req.get_response(self.api)
self.assertEqual(put_attachment_res.status_int, 430)
LOG.debug("_test_put_attachment_portnotfound - format:%s - END",
format)
def _test_delete_attachment(self, format):
LOG.debug("_test_delete_attachment - format:%s - START", format)
port_state = "ACTIVE"
network_id = self._create_network(format)
interface_id = "test_interface"
port_id = self._create_port(network_id, port_state, format)
put_attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
port_id,
interface_id,
format)
put_attachment_res = put_attachment_req.get_response(self.api)
self.assertEqual(put_attachment_res.status_int, 202)
del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
network_id,
port_id,
format)
del_attachment_res = del_attachment_req.get_response(self.api)
self.assertEqual(del_attachment_res.status_int, 202)
LOG.debug("_test_delete_attachment - format:%s - END", format)
def _test_delete_attachment_networknotfound(self, format):
LOG.debug("_test_delete_attachment_networknotfound -" \
" format:%s - START", format)
port_state = "ACTIVE"
network_id = self._create_network(format)
port_id = self._create_port(network_id, port_state, format)
del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
"A_BAD_ID",
port_id,
format)
del_attachment_res = del_attachment_req.get_response(self.api)
self.assertEqual(del_attachment_res.status_int, 420)
LOG.debug("_test_delete_attachment_networknotfound -" \
" format:%s - END", format)
def _test_delete_attachment_portnotfound(self, format):
LOG.debug("_test_delete_attachment_portnotfound - " \
" format:%s - START", format)
port_state = "ACTIVE"
network_id = self._create_network(format)
self._create_port(network_id, port_state, format)
del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
network_id,
"A_BAD_ID",
format)
del_attachment_res = del_attachment_req.get_response(self.api)
self.assertEqual(del_attachment_res.status_int, 430)
LOG.debug("_test_delete_attachment_portnotfound - " \
"format:%s - END", format)
def setUp(self):
options = {}
options['plugin_provider'] = 'quantum.plugins.SamplePlugin.FakePlugin'
self.api = server.APIRouterV01(options)
self.tenant_id = "test_tenant"
self.network_name = "test_network"
def tearDown(self):
"""Clear the test environment"""
# Remove database contents
db.clear_db()
def test_list_networks_json(self):
self._test_list_networks('json')
def test_list_networks_xml(self):
self._test_list_networks('xml')
def test_create_network_json(self):
self._test_create_network('json')
def test_create_network_xml(self):
self._test_create_network('xml')
def test_create_network_badrequest_json(self):
self._test_create_network_badrequest('json')
def test_create_network_badreqyest_xml(self):
self._test_create_network_badrequest('xml')
def test_show_network_not_found_json(self):
self._test_show_network_not_found('json')
def test_show_network_not_found_xml(self):
self._test_show_network_not_found('xml')
def test_show_network_json(self):
self._test_show_network('json')
def test_show_network_xml(self):
self._test_show_network('xml')
def test_delete_network_json(self):
self._test_delete_network('json')
def test_delete_network_xml(self):
self._test_delete_network('xml')
def test_rename_network_json(self):
self._test_rename_network('json')
def test_rename_network_xml(self):
self._test_rename_network('xml')
def test_rename_network_badrequest_json(self):
self._test_rename_network_badrequest('json')
def test_rename_network_badrequest_xml(self):
self._test_rename_network_badrequest('xml')
def test_rename_network_not_found_json(self):
self._test_rename_network_not_found('json')
def test_rename_network_not_found_xml(self):
self._test_rename_network_not_found('xml')
def test_delete_network_in_use_json(self):
self._test_delete_network_in_use('json')
def test_delete_network_in_use_xml(self):
self._test_delete_network_in_use('xml')
def test_list_ports_json(self):
self._test_list_ports('json')
def test_list_ports_xml(self):
self._test_list_ports('xml')
def test_show_port_json(self):
self._test_show_port('json')
def test_show_port_xml(self):
self._test_show_port('xml')
def test_show_port_networknotfound_json(self):
self._test_show_port_networknotfound('json')
def test_show_port_networknotfound_xml(self):
self._test_show_port_networknotfound('xml')
def test_show_port_portnotfound_json(self):
self._test_show_port_portnotfound('json')
def test_show_port_portnotfound_xml(self):
self._test_show_port_portnotfound('xml')
def test_create_port_json(self):
self._test_create_port('json')
def test_create_port_xml(self):
self._test_create_port('xml')
def test_create_port_networknotfound_json(self):
self._test_create_port_networknotfound('json')
def test_create_port_networknotfound_xml(self):
self._test_create_port_networknotfound('xml')
def test_create_port_badrequest_json(self):
self._test_create_port_badrequest('json')
def test_create_port_badrequest_xml(self):
self._test_create_port_badrequest('xml')
def test_delete_port_xml(self):
self._test_delete_port('xml')
def test_delete_port_json(self):
self._test_delete_port('json')
def test_delete_port_in_use_xml(self):
self._test_delete_port_in_use('xml')
def test_delete_port_in_use_json(self):
self._test_delete_port_in_use('json')
def test_delete_port_networknotfound_xml(self):
self._test_delete_port_networknotfound('xml')
def test_delete_port_networknotfound_json(self):
self._test_delete_port_networknotfound('json')
def test_delete_port_with_bad_id_xml(self):
self._test_delete_port_with_bad_id('xml')
def test_delete_port_with_bad_id_json(self):
self._test_delete_port_with_bad_id('json')
def test_set_port_state_xml(self):
self._test_set_port_state('xml')
def test_set_port_state_json(self):
self._test_set_port_state('json')
def test_set_port_state_networknotfound_xml(self):
self._test_set_port_state_networknotfound('xml')
def test_set_port_state_networknotfound_json(self):
self._test_set_port_state_networknotfound('json')
def test_set_port_state_portnotfound_xml(self):
self._test_set_port_state_portnotfound('xml')
def test_set_port_state_portnotfound_json(self):
self._test_set_port_state_portnotfound('json')
def test_set_port_state_stateinvalid_xml(self):
self._test_set_port_state_stateinvalid('xml')
def test_set_port_state_stateinvalid_json(self):
self._test_set_port_state_stateinvalid('json')
def test_show_attachment_xml(self):
self._test_show_attachment('xml')
def test_show_attachment_json(self):
self._test_show_attachment('json')
def test_show_attachment_networknotfound_xml(self):
self._test_show_attachment_networknotfound('xml')
def test_show_attachment_networknotfound_json(self):
self._test_show_attachment_networknotfound('json')
def test_show_attachment_portnotfound_xml(self):
self._test_show_attachment_portnotfound('xml')
def test_show_attachment_portnotfound_json(self):
self._test_show_attachment_portnotfound('json')
def test_put_attachment_xml(self):
self._test_put_attachment('xml')
def test_put_attachment_json(self):
self._test_put_attachment('json')
def test_put_attachment_networknotfound_xml(self):
self._test_put_attachment_networknotfound('xml')
def test_put_attachment_networknotfound_json(self):
self._test_put_attachment_networknotfound('json')
def test_put_attachment_portnotfound_xml(self):
self._test_put_attachment_portnotfound('xml')
def test_put_attachment_portnotfound_json(self):
self._test_put_attachment_portnotfound('json')
def test_delete_attachment_xml(self):
self._test_delete_attachment('xml')
def test_delete_attachment_json(self):
self._test_delete_attachment('json')
def test_delete_attachment_networknotfound_xml(self):
self._test_delete_attachment_networknotfound('xml')
def test_delete_attachment_networknotfound_json(self):
self._test_delete_attachment_networknotfound('json')
def test_delete_attachment_portnotfound_xml(self):
self._test_delete_attachment_portnotfound('xml')
def test_delete_attachment_portnotfound_json(self):
self._test_delete_attachment_portnotfound('json')

130
tests/unit/testlib_api.py Normal file
View File

@ -0,0 +1,130 @@
import webob
from quantum.common.wsgi import Serializer
def create_request(path, body, content_type, method='GET'):
req = webob.Request.blank(path)
req.method = method
req.headers = {}
req.headers['Accept'] = content_type
req.body = body
return req
def network_list_request(tenant_id, format='xml'):
method = 'GET'
path = "/tenants/%(tenant_id)s/networks.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def show_network_request(tenant_id, network_id, format='xml'):
method = 'GET'
path = "/tenants/%(tenant_id)s/networks" \
"/%(network_id)s.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def new_network_request(tenant_id, network_name='new_name',
format='xml', custom_req_body=None):
method = 'POST'
path = "/tenants/%(tenant_id)s/networks.%(format)s" % locals()
data = custom_req_body or {'network': {'net-name': '%s' % network_name}}
content_type = "application/%s" % format
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
def update_network_request(tenant_id, network_id, network_name, format='xml',
custom_req_body=None):
method = 'PUT'
path = "/tenants/%(tenant_id)s/networks" \
"/%(network_id)s.%(format)s" % locals()
data = custom_req_body or {'network': {'net-name': '%s' % network_name}}
content_type = "application/%s" % format
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
def network_delete_request(tenant_id, network_id, format='xml'):
method = 'DELETE'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def port_list_request(tenant_id, network_id, format='xml'):
method = 'GET'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def show_port_request(tenant_id, network_id, port_id, format='xml'):
method = 'GET'
path = "/tenants/%(tenant_id)s/networks/%(network_id)s" \
"/ports/%(port_id)s.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def new_port_request(tenant_id, network_id, port_state,
format='xml', custom_req_body=None):
method = 'POST'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports.%(format)s" % locals()
data = custom_req_body or {'port': {'port-state': '%s' % port_state}}
content_type = "application/%s" % format
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
def port_delete_request(tenant_id, network_id, port_id, format='xml'):
method = 'DELETE'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports/%(port_id)s.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def update_port_request(tenant_id, network_id, port_id, port_state,
format='xml', custom_req_body=None):
method = 'PUT'
path = "/tenants/%(tenant_id)s/networks" \
"/%(network_id)s/ports/%(port_id)s.%(format)s" % locals()
data = custom_req_body or {'port': {'port-state': '%s' % port_state}}
content_type = "application/%s" % format
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
def get_attachment_request(tenant_id, network_id, port_id, format='xml'):
method = 'GET'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports/%(port_id)s/attachment.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)
def put_attachment_request(tenant_id, network_id, port_id,
attachment_id, format='xml'):
method = 'PUT'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports/%(port_id)s/attachment.%(format)s" % locals()
data = {'port': {'attachment-id': attachment_id}}
content_type = "application/%s" % format
body = Serializer().serialize(data, content_type)
return create_request(path, body, content_type, method)
def delete_attachment_request(tenant_id, network_id, port_id,
attachment_id, format='xml'):
method = 'DELETE'
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports/%(port_id)s/attachment.%(format)s" % locals()
content_type = "application/%s" % format
return create_request(path, None, content_type, method)