merging with lp:quantum

This commit is contained in:
rohitagarwalla
2011-08-23 12:36:06 -07:00
12 changed files with 598 additions and 152 deletions

View File

@@ -240,7 +240,7 @@ def api_plug_iface(client, *args):
res = client.attach_resource(nid, pid, data)
except Exception, e:
LOG.error("Failed to plug iface \"%s\" to port \"%s\": %s" % (vid,
pid, output))
pid, e))
return
LOG.debug(res)
print "Plugged interface \"%s\" to port:%s on network:%s" % (vid, pid, nid)
@@ -386,6 +386,6 @@ if __name__ == "__main__":
commands[cmd]["api_func"](client, *args)
else:
quantum = QuantumManager()
manager = quantum.get_manager()
manager = quantum.get_plugin()
commands[cmd]["func"](manager, *args)
sys.exit(0)

View File

@@ -20,16 +20,32 @@ import httplib
import socket
import urllib
from quantum.common.wsgi import Serializer
from quantum.common import exceptions
EXCEPTIONS = {
400: exceptions.BadInputError,
401: exceptions.NotAuthorized,
420: exceptions.NetworkNotFound,
421: exceptions.NetworkInUse,
422: exceptions.NetworkNameExists,
430: exceptions.PortNotFound,
431: exceptions.StateInvalid,
432: exceptions.PortInUse,
440: exceptions.AlreadyAttached,
441: exceptions.AttachmentNotReady,
}
class api_call(object):
class ApiCall(object):
"""A Decorator to add support for format and tenant overriding"""
def __init__(self, f):
self.f = f
def __init__(self, function):
self.function = function
def __get__(self, instance, owner):
def with_params(*args, **kwargs):
# Temporarily set format and tenant for this request
"""
Temporarily sets the format and tenant for this request
"""
(format, tenant) = (instance.format, instance.tenant)
if 'format' in kwargs:
@@ -37,7 +53,7 @@ class api_call(object):
if 'tenant' in kwargs:
instance.tenant = kwargs['tenant']
ret = self.f(instance, *args)
ret = self.function(instance, *args)
(instance.format, instance.tenant) = (format, tenant)
return ret
return with_params
@@ -49,7 +65,7 @@ class Client(object):
action_prefix = '/v0.1/tenants/{tenant_id}'
"""Action query strings"""
# Action query strings
networks_path = "/networks"
network_path = "/networks/%s"
ports_path = "/networks/%s/ports"
@@ -133,9 +149,9 @@ class Client(object):
certs = dict((x, certs[x]) for x in certs if certs[x] != None)
if self.use_ssl and len(certs):
c = connection_type(self.host, self.port, **certs)
conn = connection_type(self.host, self.port, **certs)
else:
c = connection_type(self.host, self.port)
conn = connection_type(self.host, self.port)
if self.logger:
self.logger.debug("Quantum Client Request:\n" \
@@ -143,8 +159,8 @@ class Client(object):
if body:
self.logger.debug(body)
c.request(method, action, body, headers)
res = c.getresponse()
conn.request(method, action, body, headers)
res = conn.getresponse()
status_code = self.get_status_code(res)
data = res.read()
@@ -158,6 +174,8 @@ class Client(object):
httplib.NO_CONTENT):
return self.deserialize(data, status_code)
else:
if res.status in EXCEPTIONS:
raise EXCEPTIONS[res.status]()
raise Exception("Server returned error: %s" % res.read())
except (socket.error, IOError), e:
@@ -175,6 +193,10 @@ class Client(object):
return response.status
def serialize(self, data):
"""
Serializes a dictionary with a single key (which can contain any
structure) into either xml or json
"""
if data is None:
return None
elif type(data) is dict:
@@ -184,65 +206,72 @@ class Client(object):
% type(data))
def deserialize(self, data, status_code):
"""
Deserializes a an xml or json string into a dictionary
"""
if status_code == 202:
return data
return Serializer().deserialize(data, self.content_type())
def content_type(self, format=None):
"""
Returns the mime-type for either 'xml' or 'json'. Defaults to the
currently set format
"""
if not format:
format = self.format
return "application/%s" % (format)
@api_call
@ApiCall
def list_networks(self):
"""
Fetches a list of all networks for a tenant
"""
return self.do_request("GET", self.networks_path)
@api_call
@ApiCall
def show_network_details(self, network):
"""
Fetches the details of a certain network
"""
return self.do_request("GET", self.network_path % (network))
@api_call
@ApiCall
def create_network(self, body=None):
"""
Creates a new network
"""
return self.do_request("POST", self.networks_path, body=body)
@api_call
@ApiCall
def update_network(self, network, body=None):
"""
Updates a network
"""
return self.do_request("PUT", self.network_path % (network), body=body)
@api_call
@ApiCall
def delete_network(self, network):
"""
Deletes the specified network
"""
return self.do_request("DELETE", self.network_path % (network))
@api_call
@ApiCall
def list_ports(self, network):
"""
Fetches a list of ports on a given network
"""
return self.do_request("GET", self.ports_path % (network))
@api_call
@ApiCall
def show_port_details(self, network, port):
"""
Fetches the details of a certain port
"""
return self.do_request("GET", self.port_path % (network, port))
@api_call
@ApiCall
def create_port(self, network, body=None):
"""
Creates a new port on a given network
@@ -250,14 +279,14 @@ class Client(object):
body = self.serialize(body)
return self.do_request("POST", self.ports_path % (network), body=body)
@api_call
@ApiCall
def delete_port(self, network, port):
"""
Deletes the specified port from a network
"""
return self.do_request("DELETE", self.port_path % (network, port))
@api_call
@ApiCall
def set_port_state(self, network, port, body=None):
"""
Sets the state of the specified port
@@ -265,14 +294,14 @@ class Client(object):
return self.do_request("PUT",
self.port_path % (network, port), body=body)
@api_call
@ApiCall
def show_port_attachment(self, network, port):
"""
Fetches the attachment-id associated with the specified port
"""
return self.do_request("GET", self.attachment_path % (network, port))
@api_call
@ApiCall
def attach_resource(self, network, port, body=None):
"""
Sets the attachment-id of the specified port
@@ -280,7 +309,7 @@ class Client(object):
return self.do_request("PUT",
self.attachment_path % (network, port), body=body)
@api_call
@ApiCall
def detach_resource(self, network, port):
"""
Removes the attachment-id of the specified port

View File

@@ -111,6 +111,10 @@ class AlreadyAttached(QuantumException):
"already plugged into port %(att_port_id)s")
class AttachmentNotReady(QuantumException):
message = _("The attachment %(att_id)s is not ready")
class NetworkNameExists(QuantumException):
message = _("Unable to set network name to %(net_name). " \
"Network with id %(net_id) already has this name for " \

View File

@@ -226,9 +226,22 @@ class ExtensionMiddleware(wsgi.Middleware):
for resource in self.ext_mgr.get_resources():
LOG.debug(_('Extended resource: %s'),
resource.collection)
for action, method in resource.collection_actions.iteritems():
path_prefix = ""
parent = resource.parent
conditions = dict(method=[method])
path = "/%s/%s" % (resource.collection, action)
if parent:
path_prefix = "/%s/{%s_id}" % (parent["collection_name"],
parent["member_name"])
with mapper.submapper(controller=resource.controller,
action=action,
path_prefix=path_prefix,
conditions=conditions) as submap:
submap.connect(path)
submap.connect("%s.:(format)" % path)
mapper.resource(resource.collection, resource.collection,
controller=resource.controller,
collection=resource.collection_actions,
member=resource.member_actions,
parent_resource=resource.parent)

278
quantum/common/test_lib.py Normal file
View File

@@ -0,0 +1,278 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import gettext
import os
import unittest
import sys
import logging
from nose import result
from nose import core
from nose import config
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
raise
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
from win32console import GetStdHandle, STD_OUT_HANDLE, \
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
FOREGROUND_INTENSITY
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
class QuantumTestResult(result.TextTestResult):
def __init__(self, *args, **kw):
result.TextTestResult.__init__(self, *args, **kw)
self._last_case = None
self.colorizer = None
# NOTE(vish, tfukushima): reset stdout for the terminal check
stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
def getDescription(self, test):
return str(test)
# NOTE(vish, tfukushima): copied from unittest with edit to add color
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
if self.showAll:
self.colorizer.write("OK", 'green')
self.stream.writeln()
elif self.dots:
self.stream.write('.')
self.stream.flush()
# NOTE(vish, tfukushima): copied from unittest with edit to add color
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
if self.showAll:
self.colorizer.write("FAIL", 'red')
self.stream.writeln()
elif self.dots:
self.stream.write('F')
self.stream.flush()
# NOTE(vish, tfukushima): copied from unittest with edit to add color
def addError(self, test, err):
"""Overrides normal addError to add support for errorClasses.
If the exception is a registered class, the error will be added
to the list for that class, not errors.
"""
stream = getattr(self, 'stream', None)
ec, ev, tb = err
try:
exc_info = self._exc_info_to_string(err, test)
except TypeError:
# This is for compatibility with Python 2.3.
exc_info = self._exc_info_to_string(err)
for cls, (storage, label, isfail) in self.errorClasses.items():
if result.isclass(ec) and issubclass(ec, cls):
if isfail:
test.passwd = False
storage.append((test, exc_info))
# Might get patched into a streamless result
if stream is not None:
if self.showAll:
message = [label]
detail = result._exception_details(err[1])
if detail:
message.append(detail)
stream.writeln(": ".join(message))
elif self.dots:
stream.write(label[:1])
return
self.errors.append((test, exc_info))
test.passed = False
if stream is not None:
if self.showAll:
self.colorizer.write("ERROR", 'red')
self.stream.writeln()
elif self.dots:
stream.write('E')
def startTest(self, test):
unittest.TestResult.startTest(self, test)
current_case = test.test.__class__.__name__
if self.showAll:
if current_case != self._last_case:
self.stream.writeln(current_case)
self._last_case = current_case
self.stream.write(
' %s' % str(test.test._testMethodName).ljust(60))
self.stream.flush()
class QuantumTestRunner(core.TextTestRunner):
def _makeResult(self):
return QuantumTestResult(self.stream,
self.descriptions,
self.verbosity,
self.config)
def run_tests(c):
logger = logging.getLogger()
hdlr = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
runner = QuantumTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c)
return not core.run(config=c, testRunner=runner)
# describes parameters used by different unit/functional tests
# a plugin-specific testing mechanism should import this dictionary
# and override the values in it if needed (e.g., run_tests.py in
# quantum/plugins/openvswitch/ )
test_config = {
"plugin_name": "quantum.plugins.SamplePlugin.FakePlugin",
}

View File

@@ -22,7 +22,6 @@ import logging as LOG
from optparse import OptionParser
import os
import sys
import unittest
from quantum.common import exceptions as q_exc
from quantum.quantum_plugin_base import QuantumPluginBase
@@ -200,41 +199,3 @@ class OVSQuantumPlugin(QuantumPluginBase):
def get_interface_details(self, tenant_id, net_id, port_id):
res = db.port_get(port_id, net_id)
return res.interface_id
class VlanMapTest(unittest.TestCase):
def setUp(self):
self.vmap = VlanMap()
def tearDown(self):
pass
def testAddVlan(self):
vlan_id = self.vmap.acquire("foobar")
self.assertTrue(vlan_id == 2)
def testReleaseVlan(self):
vlan_id = self.vmap.acquire("foobar")
self.vmap.release("foobar")
self.assertTrue(self.vmap.get(vlan_id) == None)
if __name__ == "__main__":
usagestr = "Usage: %prog [OPTIONS] <command> [args]"
parser = OptionParser(usage=usagestr)
parser.add_option("-v", "--verbose", dest="verbose",
action="store_true", default=False, help="turn on verbose logging")
options, args = parser.parse_args()
if options.verbose:
LOG.basicConfig(level=LOG.DEBUG)
else:
LOG.basicConfig(level=LOG.WARN)
# Make sqlalchemy quieter
LOG.getLogger('sqlalchemy.engine').setLevel(LOG.WARN)
# Run the tests
suite = unittest.TestLoader().loadTestsFromTestCase(VlanMapTest)
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittest runner for quantum OVS plugin
This file should be run from the top dir in the quantum directory
To run all test::
python quantum/plugins/openvswitch/run_tests.py
To run all unit tests::
python quantum/plugins/openvswitch/run_tests.py unit
To run all functional tests::
python quantum/plugins/openvswitch/run_tests.py functional
To run a single unit test::
python quantum/plugins/openvswitch/run_tests.py \
unit.test_stores:TestSwiftBackend.test_get
To run a single functional test::
python quantum/plugins/openvswitch/run_tests.py \
functional.test_service:TestController.test_create
To run a single unit test module::
python quantum/plugins/openvswitch/run_tests.py unit.test_stores
To run a single functional test module::
python quantum/plugins/openvswitch/run_tests.py functional.test_stores
"""
import gettext
import logging
import os
import unittest
import sys
from nose import config
sys.path.append(os.getcwd())
from quantum.common.test_lib import run_tests, test_config
from quantum.plugins.openvswitch.tests.test_vlan_map import VlanMapTest
if __name__ == '__main__':
exit_status = False
# if a single test case was specified,
# we should only invoked the tests once
invoke_once = len(sys.argv) > 1
cwd = os.getcwd()
working_dir = os.path.abspath("tests")
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=working_dir)
exit_status = run_tests(c)
if invoke_once:
sys.exit(0)
os.chdir(cwd)
working_dir = os.path.abspath("quantum/plugins/openvswitch/tests")
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=working_dir)
exit_status = exit_status or run_tests(c)
sys.exit(exit_status)

View File

@@ -0,0 +1,36 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from quantum.plugins.openvswitch.ovs_quantum_plugin import VlanMap
class VlanMapTest(unittest.TestCase):
def setUp(self):
self.vmap = VlanMap()
def tearDown(self):
pass
def testAddVlan(self):
vlan_id = self.vmap.acquire("foobar")
self.assertTrue(vlan_id == 2)
def testReleaseVlan(self):
vlan_id = self.vmap.acquire("foobar")
self.vmap.release("foobar")
self.assertTrue(self.vmap.get(vlan_id) == None)

34
setup.py Normal file
View File

@@ -0,0 +1,34 @@
import os
import sys
from setuptools import setup, find_packages
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
requirements = ['httplib2','eventlet','routes','webob']
setup(
name = "Quantum",
version = "0.1",
description = "Layer 2 network as a service for Openstack",
long_description = read('README'),
url = 'http://launchpad.net/quantum',
license = 'Apache',
author = 'Netstack',
author_email = 'netstack@launchpad.net',
packages = find_packages(exclude=['tests']),
classifiers = [
'Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent',
'Programming Language :: Python',
],
namespace_packages = ["quantum"],
install_requires = requirements,
tests_require = ["nose"],
test_suite = "nose.collector",
)

View File

@@ -62,7 +62,7 @@ class ResourceExtensionTest(unittest.TestCase):
def custom_member_action(self, request, id):
return {'member_action': 'value'}
def custom_collection_action(self, request):
def custom_collection_action(self, request, **kwargs):
return {'collection': 'value'}
def test_resource_can_be_added_as_extension(self):
@@ -88,7 +88,7 @@ class ResourceExtensionTest(unittest.TestCase):
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['member_action'], "value")
def test_resource_extension_with_custom_collection_action(self):
def test_resource_extension_for_get_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "GET"}
res_ext = extensions.ResourceExtension('tweedles', controller,
@@ -99,6 +99,69 @@ class ResourceExtensionTest(unittest.TestCase):
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['collection'], "value")
def test_resource_extension_for_put_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "PUT"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
response = test_app.put("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['collection'], 'value')
def test_resource_extension_for_post_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "POST"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
response = test_app.post("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['collection'], 'value')
def test_resource_extension_for_delete_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "DELETE"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
response = test_app.delete("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['collection'], 'value')
def test_resource_ext_for_formatted_req_on_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "GET"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
response = test_app.get("/tweedles/custom_collection_action.json")
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['collection'], "value")
def test_resource_ext_for_nested_resource_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "GET"}
parent = dict(collection_name='beetles', member_name='beetle')
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections,
parent=parent)
test_app = setup_extensions_test_app(SimpleExtensionManager(res_ext))
response = test_app.get("/beetles/beetle_id"
"/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(json.loads(response.body)['collection'], "value")
def test_returns_404_for_non_existant_extension(self):
test_app = setup_extensions_test_app(SimpleExtensionManager(None))

View File

@@ -15,106 +15,45 @@
# under the License.
# @author: Dan Wendlandt, Nicira Networks, Inc.
import httplib
import logging as LOG
import json
import socket
import sys
import urllib
from quantum.manager import QuantumManager
from optparse import OptionParser
from quantum.common.wsgi import Serializer
from quantum.cli import MiniClient
import sys
from quantum.client import Client
from quantum.manager import QuantumManager
FORMAT = "json"
CONTENT_TYPE = "application/" + FORMAT
def delete_all_nets(client, tenant_id):
res = client.do_request(tenant_id, 'GET', "/networks." + FORMAT)
resdict = json.loads(res.read())
LOG.debug(resdict)
for n in resdict["networks"]:
def delete_all_nets(client):
res = client.list_networks()
for n in res["networks"]:
nid = n["id"]
res = client.do_request(tenant_id, 'GET',
"/networks/%s/ports.%s" % (nid, FORMAT))
output = res.read()
if res.status != 200:
LOG.error("Failed to list ports: %s" % output)
continue
rd = json.loads(output)
LOG.debug(rd)
for port in rd["ports"]:
pid = port["id"]
data = {'port': {'attachment-id': ''}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tenant_id, 'DELETE',
"/networks/%s/ports/%s/attachment.%s" % \
(nid, pid, FORMAT), body=body)
output = res.read()
LOG.debug(output)
if res.status != 202:
LOG.error("Failed to unplug iface from port \"%s\": %s" % (vid,
pid, output))
continue
LOG.info("Unplugged interface from port:%s on network:%s" % (pid,
nid))
res = client.do_request(tenant_id, 'DELETE',
"/networks/%s/ports/%s.%s" % (nid, pid, FORMAT))
output = res.read()
if res.status != 202:
LOG.error("Failed to delete port: %s" % output)
continue
pres = client.list_ports(nid)
for port in pres["ports"]:
pid = port['id']
client.detach_resource(nid, pid)
client.delete_port(nid, pid)
print "Deleted Virtual Port:%s " \
"on Virtual Network:%s" % (pid, nid)
res = client.do_request(tenant_id, 'DELETE',
"/networks/" + nid + "." + FORMAT)
status = res.status
if status != 202:
Log.error("Failed to delete network: %s" % nid)
output = res.read()
print output
else:
client.delete_network(nid)
print "Deleted Virtual Network with ID:%s" % nid
def create_net_with_attachments(net_name, iface_ids):
def create_net_with_attachments(client, net_name, iface_ids):
data = {'network': {'net-name': '%s' % net_name}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tenant_id, 'POST',
"/networks." + FORMAT, body=body)
rd = json.loads(res.read())
LOG.debug(rd)
nid = rd["networks"]["network"]["id"]
res = client.create_network(data)
nid = res["networks"]["network"]["id"]
print "Created a new Virtual Network %s with ID:%s" % (net_name, nid)
for iface_id in iface_ids:
res = client.do_request(tenant_id, 'POST',
"/networks/%s/ports.%s" % (nid, FORMAT))
output = res.read()
if res.status != 200:
LOG.error("Failed to create port: %s" % output)
continue
rd = json.loads(output)
new_port_id = rd["ports"]["port"]["id"]
res = client.create_port(nid)
new_port_id = res["ports"]["port"]["id"]
print "Created Virtual Port:%s " \
"on Virtual Network:%s" % (new_port_id, nid)
data = {'port': {'attachment-id': '%s' % iface_id}}
body = Serializer().serialize(data, CONTENT_TYPE)
res = client.do_request(tenant_id, 'PUT',
"/networks/%s/ports/%s/attachment.%s" %\
(nid, new_port_id, FORMAT), body=body)
output = res.read()
LOG.debug(output)
if res.status != 202:
LOG.error("Failed to plug iface \"%s\" to port \"%s\": %s" % \
(iface_id, new_port_id, output))
continue
client.attach_resource(nid, new_port_id, data)
print "Plugged interface \"%s\" to port:%s on network:%s" % \
(iface_id, new_port_id, nid)
@@ -149,7 +88,6 @@ if __name__ == "__main__":
if len(args) < 1:
parser.print_help()
help()
sys.exit(1)
nets = {}
@@ -163,12 +101,13 @@ if __name__ == "__main__":
print "nets: %s" % str(nets)
client = MiniClient(options.host, options.port, options.ssl)
client = Client(options.host, options.port, options.ssl,
format='json', tenant=tenant_id)
if options.delete:
delete_all_nets(client, tenant_id)
delete_all_nets(client)
for net_name, iface_ids in nets.items():
create_net_with_attachments(net_name, iface_ids)
create_net_with_attachments(client, net_name, iface_ids)
sys.exit(0)