Moved client code from main melange repo.

Implements blueprint melange-client-repo
Changed cli tests to not depend on server-side models.
YAMLed cli output.
We now let templatization of the command line output with bottlepy's templating

Change-Id: I035d9ec8834da4a912475b29419a1d5c91202316
This commit is contained in:
Rajaram Mallya and Gavri Fernandez 2012-01-04 15:17:37 +05:30 committed by Zangetsu
parent f0e6fa2aa9
commit 484c7313ec
32 changed files with 3051 additions and 0 deletions

13
.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
*.pyc
*.DS_Store
local_settings.py
keeper
build/*
build-stamp
melange.egg-info
.melange-venv
.venv
*.sqlite
*.log
tags
*~

11
README Normal file
View File

@ -0,0 +1,11 @@
To run unit tests:
melange_client_dir> ./run_tests.sh melange_client.tests.unit
To run functional tests:
1. Start the melange server
2. Update the configuration values in
melange_client/tests/functional/tests.conf
to point to the melange server
3. Run the tests:
melange_client_dir> ./run_tests.sh melange_client.tests.functional

252
bin/melange Executable file
View File

@ -0,0 +1,252 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""CLI interface for common Melange client opertaions.
Simple cli for creating ip blocks, adding policies and rules for ip address
allocations from these blocks.
"""
import optparse
import os
from os import environ as env
import sys
import yaml
# If ../melange_client/__init__.py exists, add ../ to Python search path, so
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir,
'melange_client',
'__init__.py')):
sys.path.insert(0, possible_topdir)
import melange_client
from melange_client import client as base_client
from melange_client import exception
from melange_client import inspector
from melange_client import ipam_client
from melange_client import template
def create_options(parser):
"""Sets up the CLI and config-file options.
:param parser: The option parser
:returns: None
"""
parser.add_option('-v', '--verbose', default=False, action="store_true",
help="Print more verbose output")
parser.add_option('-H', '--host', metavar="ADDRESS", default="0.0.0.0",
help="Address of Melange API host. "
"Default: %default")
parser.add_option('-p', '--port', dest="port", metavar="PORT",
type=int, default=9898,
help="Port the Melange API host listens on. "
"Default: %default")
parser.add_option('-t', '--tenant', dest="tenant", metavar="TENANT",
type=str, default=env.get('MELANGE_TENANT', None),
help="tenant id in case of tenant resources")
parser.add_option('--auth-token', dest="auth_token",
metavar="MELANGE_AUTH_TOKEN",
default=env.get('MELANGE_AUTH_TOKEN', None),
type=str, help="Auth token received from keystone")
parser.add_option('-u', '--username', dest="username",
metavar="MELANGE_USERNAME",
default=env.get('MELANGE_USERNAME', None),
type=str, help="Melange user name")
parser.add_option('-k', '--api-key', dest="api_key",
metavar="MELANGE_API_KEY",
default=env.get('MELANGE_API_KEY', None),
type=str, help="Melange access key")
parser.add_option('-a', '--auth-url', dest="auth_url",
metavar="MELANGE_AUTH_URL", type=str,
default=env.get('MELANGE_AUTH_URL', None),
help="Url of keystone service")
parser.add_option('--timeout', dest="timeout",
metavar="MELANGE_TIME_OUT", type=int,
default=env.get('MELANGE_TIME_OUT', None),
help="timeout for melange client operations")
def parse_options(parser, cli_args):
"""Parses CLI options.
Returns parsed CLI options, command to run and its arguments, merged
with any same-named options found in a configuration file
:param parser: The option parser
:returns: (options, args)
"""
(options, args) = parser.parse_args(cli_args)
if not args:
parser.print_usage()
sys.exit(2)
return (options, args)
def usage():
usage = """
%prog category action [args] [options]
Available categories:
"""
return usage + client_category_usage()
def client_category_usage():
usage = "\n"
for category in client_categories:
usage = usage + ("\t%s\n" % category)
return usage
client_categories = ['ip_block', 'subnet', 'policy', 'unusable_ip_range',
'unusable_ip_octet', 'allocated_ip', 'ip_address',
'ip_route', 'interface', 'mac_address_range',
'allowed_ip']
def lookup_methods(name, hash):
result = hash.get(name, None)
if not result:
print "The second parameter should be one of the following:"
print_keys(hash)
sys.exit(2)
return result
def lookup_client_categories(category, factory):
category_class = getattr(factory, category, None)
if not category_class:
print "The first parameter should be one of the following:"
print client_category_usage()
sys.exit(2)
return category_class
def print_keys(hash):
for k, _v in hash.iteritems():
print "\t%s" % k
def auth_client(options):
if options.auth_url or options.auth_token:
return base_client.AuthorizationClient(options.auth_url,
options.username,
options.api_key,
options.auth_token)
def view(data, template_name):
data = data or {}
try:
view_path = os.path.join(melange_client.melange_root_path(), 'views')
return template.template(template_name,
template_lookup=[view_path], **data)
except exception.TemplateNotFoundError:
return yaml.safe_dump(data, indent=4, default_flow_style=False)
def args_to_dict(args):
try:
return dict(arg.split("=") for arg in args)
except ValueError:
raise exception.MelangeClientError("Action arguments should be "
"of the form of field=value")
def main():
oparser = optparse.OptionParser(version='%%prog 0.1',
usage=usage())
create_options(oparser)
(options, args) = parse_options(oparser, sys.argv[1:])
script_name = os.path.basename(sys.argv[0])
category = args.pop(0)
http_client = base_client.HTTPClient(options.host,
options.port,
options.timeout)
factory = ipam_client.Factory(options.host,
options.port,
timeout=options.timeout,
auth_url=options.auth_url,
username=options.username,
api_key=options.api_key,
auth_token=options.auth_token,
tenant_id=options.tenant)
client = lookup_client_categories(category, factory)
client_actions = inspector.ClassInspector(client).methods()
if len(args) < 1:
print _("Usage: %s category action [<args>]") % script_name
print _("Available actions for %s category:") % category
print_keys(client_actions)
sys.exit(2)
if client.TENANT_ID_REQUIRED and not options.tenant:
print _("Please provide a tenant id for this action."
"You can use option '-t' to provide the tenant id.")
sys.exit(2)
action = args.pop(0)
fn = lookup_methods(action, client_actions)
def get_response(fn, args):
try:
response = fn(**args_to_dict(args))
return response
except TypeError:
print _("Possible wrong number of arguments supplied")
print _("Usage: %s %s %s") % (script_name,
category,
inspector.MethodInspector(fn))
if options.verbose:
raise
sys.exit(2)
try:
response = get_response(fn, args)
template_name = category + "_" + action + ".tpl"
print view(response, template_name=template_name)
except exception.MelangeServiceResponseError as server_error:
print _("The server returned an error:")
print server_error
sys.exit(1)
except exception.MelangeClientError as client_error:
print client_error
sys.exit(2)
except Exception:
if options.verbose:
raise
else:
print _("Command failed, please check log for more info")
sys.exit(2)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,33 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gettext
import os
gettext.install('melange', unicode=1)
def melange_root_path():
return os.path.dirname(__file__)
def melange_bin_path(filename="."):
return os.path.join(melange_root_path(), "..", "bin", filename)
def melange_etc_path(filename="."):
return os.path.join(melange_root_path(), "..", "etc", "melange", filename)

86
melange_client/client.py Normal file
View File

@ -0,0 +1,86 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import httplib2
import json
import socket
import urllib
import urlparse
from melange_client import exception
class HTTPClient(object):
def __init__(self, host='localhost', port=8080, use_ssl=False, timeout=60):
self.host = host
self.port = port
self.use_ssl = use_ssl
self.timeout = timeout
def _get_connection(self):
if self.use_ssl:
return httplib.HTTPSConnection(self.host,
self.port,
timeout=self.timeout)
else:
return httplib.HTTPConnection(self.host,
self.port,
timeout=self.timeout)
def do_request(self, method, path, body=None, headers=None, params=None):
params = params or {}
headers = headers or {}
url = path + '?' + urllib.urlencode(params)
try:
connection = self._get_connection()
connection.request(method, url, body, headers)
response = connection.getresponse()
if response.status >= 400:
raise exception.MelangeServiceResponseError(response.read())
return response
except (socket.error, IOError) as error:
raise exception.ClientConnectionError(
_("Error while communicating with server. "
"Got error: %s") % error)
class AuthorizationClient(httplib2.Http):
def __init__(self, url, username, access_key, auth_token=None):
super(AuthorizationClient, self).__init__()
self.url = urlparse.urljoin(url, "/v2.0/tokens")
self.username = username
self.access_key = access_key
self.auth_token = auth_token
def get_token(self):
if self.auth_token:
return self.auth_token
headers = {'content-type': 'application/json'}
request_body = json.dumps({"passwordCredentials":
{"username": self.username,
'password': self.access_key}})
res, body = self.request(self.url, "POST", headers=headers,
body=request_body)
if int(res.status) >= 400:
raise Exception(_("Error occured while retrieving token : %s")
% body)
return json.loads(body)['auth']['token']['id']

View File

@ -0,0 +1,41 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common import exception as openstack_exception
ClientConnectionError = openstack_exception.ClientConnectionError
class MelangeClientError(Exception):
def __init__(self, message):
super(MelangeClientError, self).__init__(message)
class TemplateNotFoundError(Exception):
def __init__(self, message=None):
message = message or "template not found"
super(TemplateNotFoundError, self).__init__(message)
class MelangeServiceResponseError(Exception):
def __init__(self, error):
super(MelangeServiceResponseError, self).__init__(error)

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
class MethodInspector(object):
def __init__(self, func):
self._func = func
def required_args(self):
return self.args()[0:self.required_args_count()]
def optional_args(self):
keys = self.args()[self.required_args_count(): len(self.args())]
return zip(keys, self.defaults())
def defaults(self):
return self.argspec().defaults or ()
def required_args_count(self):
return len(self.args()) - len(self.defaults())
def args(self):
args = self.argspec().args
if inspect.ismethod(self._func):
args.pop(0)
return args
def argspec(self):
return inspect.getargspec(self._func)
def __str__(self):
optionals = ["[{0}=<{0}>]".format(k) for k, v in self.optional_args()]
required = ["{0}=<{0}>".format(arg) for arg in self.required_args()]
args_str = ' '.join(required + optionals)
return "%s %s" % (self._func.__name__, args_str)
class ClassInspector(object):
def __init__(self, obj):
self.obj = obj
def methods(self):
"""Gets callable public methods.
Get all callable methods of an object that don't start with underscore
returns a dictionary of the form dict(method_name, method)
"""
def is_public_method(attr):
return (callable(getattr(self.obj, attr))
and not attr.startswith('_'))
return dict((attr, getattr(self.obj, attr)) for attr in dir(self.obj)
if is_public_method(attr))

View File

@ -0,0 +1,384 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import sys
import urlparse
from melange_client import client
from melange_client import utils
class Factory(object):
def __init__(self, host, port, timeout=None, auth_url=None, username=None,
api_key=None, auth_token=None, tenant_id=None):
self.host = host
self.port = port
self.timeout = timeout
self.auth_url = auth_url
self.username = username
self.api_key = api_key
self.auth_token = auth_token
self.tenant_id = tenant_id
def _auth_client(self):
if self.auth_url or self.auth_token:
return client.AuthorizationClient(self.auth_url,
self.username,
self.api_key,
self.auth_token)
def _client(self):
return client.HTTPClient(self.host,
self.port,
self.timeout)
def __getattr__(self, item):
class_name = utils.camelize(item) + "Client"
cls = getattr(sys.modules[__name__], class_name, None)
if cls is None:
raise AttributeError("%s has no attribute %s" %
(self.__class__.__name__, item))
return cls(self._client(), self._auth_client(), self.tenant_id)
class Resource(object):
def __init__(self, path, name, client, auth_client, tenant_id=None):
if tenant_id:
path = "tenants/{0}/{1}".format(tenant_id, path)
self.path = urlparse.urljoin("/v0.1/ipam/", path)
self.name = name
self.client = client
self.auth_client = auth_client
def create(self, **kwargs):
return self.request("POST",
self.path,
body=json.dumps({self.name: kwargs}))
def update(self, id, **kwargs):
return self.request("PUT",
self._member_path(id),
body=json.dumps(
{self.name: utils.remove_nones(kwargs)}))
def all(self, **params):
return self.request("GET",
self.path,
params=utils.remove_nones(params))
def find(self, id):
return self.request("GET", self._member_path(id))
def delete(self, id):
return self.request("DELETE", self._member_path(id))
def _member_path(self, id):
return "{0}/{1}".format(self.path, id)
def request(self, method, path, **kwargs):
kwargs['headers'] = {'Content-Type': "application/json"}
if self.auth_client:
kwargs['headers']['X-AUTH-TOKEN'] = self.auth_client.get_token()
result = self.client.do_request(method, path, **kwargs).read()
if result:
return json.loads(result)
class BaseClient(object):
TENANT_ID_REQUIRED = True
def __init__(self, client, auth_client, tenant_id):
self.client = client
self.auth_client = auth_client
self.tenant_id = tenant_id
class IpBlockClient(BaseClient):
def __init__(self, client, auth_client, tenant_id):
self.resource = Resource("ip_blocks",
"ip_block",
client,
auth_client,
tenant_id)
def create(self, type, cidr, network_id=None, policy_id=None):
return self.resource.create(type=type,
cidr=cidr,
network_id=network_id,
policy_id=policy_id)
def list(self):
return self.resource.all()
def show(self, id):
return self.resource.find(id)
def update(self, id, network_id=None, policy_id=None):
return self.resource.update(id,
network_id=network_id,
policy_id=policy_id)
def delete(self, id):
return self.resource.delete(id)
class SubnetClient(BaseClient):
def _resource(self, parent_id):
return Resource("ip_blocks/{0}/subnets".format(parent_id),
"subnet",
self.client,
self.auth_client,
self.tenant_id)
def create(self, parent_id, cidr, network_id=None):
return self._resource(parent_id).create(cidr=cidr,
network_id=network_id)
def list(self, parent_id):
return self._resource(parent_id).all()
class PolicyClient(BaseClient):
def __init__(self, client, auth_client, tenant_id):
self.resource = Resource("policies",
"policy",
client,
auth_client,
tenant_id)
def create(self, name, desc=None):
return self.resource.create(name=name, description=desc)
def update(self, id, name, desc=None):
return self.resource.update(id, name=name, description=desc)
def list(self):
return self.resource.all()
def show(self, id):
return self.resource.find(id)
def delete(self, id):
return self.resource.delete(id)
class UnusableIpRangeClient(BaseClient):
def _resource(self, policy_id):
return Resource("policies/{0}/unusable_ip_ranges".format(policy_id),
"ip_range",
self.client,
self.auth_client,
self.tenant_id)
def create(self, policy_id, offset, length):
return self._resource(policy_id).create(offset=offset, length=length)
def update(self, policy_id, id, offset=None, length=None):
return self._resource(policy_id).update(id,
offset=offset,
length=length)
def list(self, policy_id):
return self._resource(policy_id).all()
def show(self, policy_id, id):
return self._resource(policy_id).find(id)
def delete(self, policy_id, id):
return self._resource(policy_id).delete(id)
class UnusableIpOctetClient(BaseClient):
def _resource(self, policy_id):
return Resource("policies/{0}/unusable_ip_octets".format(policy_id),
"ip_octet",
self.client,
self.auth_client,
self.tenant_id)
def create(self, policy_id, octet):
return self._resource(policy_id).create(octet=octet)
def update(self, policy_id, id, octet=None):
return self._resource(policy_id).update(id, octet=octet)
def list(self, policy_id):
return self._resource(policy_id).all()
def show(self, policy_id, id):
return self._resource(policy_id).find(id)
def delete(self, policy_id, id):
return self._resource(policy_id).delete(id)
class AllocatedIpClient(BaseClient):
TENANT_ID_REQUIRED = False
def __init__(self, client, auth_client, tenant_id=None):
self._resource = Resource("allocated_ip_addresses",
"allocated_ip_addresses",
client,
auth_client,
tenant_id)
def list(self, used_by_device=None):
return self._resource.all(used_by_device=used_by_device)
class IpAddressClient(BaseClient):
def _resource(self, ip_block_id):
path = "ip_blocks/{0}/ip_addresses".format(ip_block_id)
return Resource(path,
"ip_address",
self.client,
self.auth_client,
self.tenant_id)
def create(self, ip_block_id, address=None, interface_id=None,
used_by_tenant=None, used_by_device=None):
resource = self._resource(ip_block_id)
return resource.create(address=address,
interface_id=interface_id,
used_by_device=used_by_device,
tenant_id=used_by_tenant)
def list(self, ip_block_id):
return self._resource(ip_block_id).all()
def show(self, ip_block_id, address):
return self._resource(ip_block_id).find(address)
def delete(self, ip_block_id, address):
return self._resource(ip_block_id).delete(address)
class IpRouteClient(BaseClient):
def _resource(self, ip_block_id):
path = "ip_blocks/{0}/ip_routes".format(ip_block_id)
return Resource(path,
"ip_route",
self.client,
self.auth_client,
self.tenant_id)
def create(self, ip_block_id, destination, gateway, netmask=None):
resource = self._resource(ip_block_id)
return resource.create(destination=destination,
gateway=gateway,
netmask=netmask)
def list(self, ip_block_id):
return self._resource(ip_block_id).all()
def show(self, ip_block_id, id):
return self._resource(ip_block_id).find(id)
def delete(self, ip_block_id, id):
return self._resource(ip_block_id).delete(id)
class InterfaceClient(BaseClient):
TENANT_ID_REQUIRED = False
def __init__(self, client, auth_client, tenant_id=None):
self._resource = Resource("interfaces",
"interface",
client,
auth_client,
tenant_id)
def create(self, vif_id, tenant_id, device_id=None, network_id=None):
request_params = dict(id=vif_id,
tenant_id=tenant_id,
device_id=device_id)
if network_id:
request_params['network'] = dict(id=network_id)
return self._resource.create(**request_params)
def show(self, vif_id):
return self._resource.find(vif_id)
def delete(self, vif_id):
return self._resource.delete(vif_id)
class MacAddressRangeClient(BaseClient):
TENANT_ID_REQUIRED = False
def __init__(self, client, auth_client, tenant_id=None):
self._resource = Resource("mac_address_ranges",
"mac_address_range",
client,
auth_client,
tenant_id)
def create(self, cidr):
return self._resource.create(cidr=cidr)
def show(self, id):
return self._resource.find(id)
def list(self):
return self._resource.all()
def delete(self, id):
return self._resource.delete(id)
class AllowedIpClient(BaseClient):
def __init__(self, client, auth_client, tenant_id=None):
self.client = client
self.auth_client = auth_client
self.tenant_id = tenant_id
def _resource(self, interface_id):
return Resource("interfaces/{0}/allowed_ips".format(interface_id),
"allowed_ip",
self.client,
self.auth_client,
self.tenant_id)
def create(self, interface_id, network_id, ip_address):
return self._resource(interface_id).create(network_id=network_id,
ip_address=ip_address)
def show(self, interface_id, ip_address):
return self._resource(interface_id).find(ip_address)
def list(self, interface_id):
return self._resource(interface_id).all()
def delete(self, interface_id, ip_address):
return self._resource(interface_id).delete(ip_address)

314
melange_client/template.py Normal file
View File

@ -0,0 +1,314 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Template library copied from bottle: http://bottlepy.org/
#
# Copyright (c) 2011, Marcel Hellkamp.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import cgi
import re
import os
import functools
import time
import tokenize
import mimetypes
import datetime
from melange_client import exception
TEMPLATES = {}
DEBUG = False
TEMPLATE_PATH = ['./', './views/']
class BaseTemplate(object):
""" Base class and minimal API for template adapters """
extentions = ['tpl', 'html', 'thtml', 'stpl']
settings = {} # used in prepare()
defaults = {} # used in render()
def __init__(self, source=None, name=None, lookup=None, encoding='utf8',
**settings):
""" Create a new template.
If the source parameter (str or buffer) is missing, the name argument
is used to guess a template filename. Subclasses can assume that
self.source and/or self.filename are set. Both are strings.
The lookup, encoding and settings parameters are stored as instance
variables.
The lookup parameter stores a list containing directory paths.
The encoding parameter should be used to decode byte strings or files.
The settings parameter contains a dict for engine-specific settings.
"""
lookup = lookup or []
self.name = name
self.source = source.read() if hasattr(source, 'read') else source
self.filename = source.filename \
if hasattr(source, 'filename') \
else None
self.lookup = [os.path.abspath(path) for path in lookup]
self.encoding = encoding
self.settings = self.settings.copy() # Copy from class variable
self.settings.update(settings) # Apply
if not self.source and self.name:
self.filename = self.search(self.name, self.lookup)
if not self.filename:
raise exception.TemplateNotFoundError(
message='Template %s not found' % repr(name))
if not self.source and not self.filename:
raise Exception('No template specified', (0, 0), None)
self.prepare(**self.settings)
@classmethod
def search(cls, name, lookup=None):
""" Search name in all directories specified in lookup.
First without, then with common extensions. Return first hit. """
lookup = lookup or []
if os.path.isfile(name):
return name
for spath in lookup:
fname = os.path.join(spath, name)
if os.path.isfile(fname):
return fname
for ext in cls.extentions:
if os.path.isfile('%s.%s' % (fname, ext)):
return '%s.%s' % (fname, ext)
@classmethod
def global_config(cls, key, *args):
'''This reads or sets the global settings stored in class.settings.'''
if args:
cls.settings[key] = args[0]
else:
return cls.settings[key]
def prepare(self, **options):
"""Run preparations (parsing, caching, ...).
It should be possible to call this again to refresh a template or to
update settings.
"""
raise NotImplementedError
def render(self, **args):
"""Render the template with the specified local variables and return
a single byte or unicode string. If it is a byte string, the encoding
must match self.encoding. This method must be thread-safe!
"""
raise NotImplementedError
class SimpleTemplate(BaseTemplate):
blocks = ('if', 'elif', 'else', 'try', 'except', 'finally', 'for', 'while',
'with', 'def', 'class')
dedent_blocks = ('elif', 'else', 'except', 'finally')
cache = None
code = None
compiled = None
_str = None
_escape = None
def prepare(self, escape_func=cgi.escape, noescape=False):
self.cache = {}
if self.source:
self.code = self.translate(self.source)
self.compiled = compile(self.code, '<string>', 'exec')
else:
self.code = self.translate(open(self.filename).read())
self.compiled = compile(self.code, self.filename, 'exec')
enc = self.encoding
touni = functools.partial(unicode, encoding=self.encoding)
self._str = lambda x: touni(x, enc)
self._escape = lambda x: escape_func(touni(x))
if noescape:
self._str, self._escape = self._escape, self._str
def translate(self, template):
stack = [] # Current Code indentation
lineno = 0 # Current line of code
ptrbuffer = [] # Buffer for printable strings and token tuples
codebuffer = [] # Buffer for generated python code
functools.partial(unicode, encoding=self.encoding)
multiline = dedent = False
def yield_tokens(line):
for i, part in enumerate(re.split(r'\{\{(.*?)\}\}', line)):
if i % 2:
if part.startswith('!'):
yield 'RAW', part[1:]
else:
yield 'CMD', part
else:
yield 'TXT', part
def split_comment(codeline):
""" Removes comments from a line of code. """
line = codeline.splitlines()[0]
try:
tokens = list(tokenize.generate_tokens(iter(line).next))
except tokenize.TokenError:
return line.rsplit('#', 1) if '#' in line else (line, '')
for token in tokens:
if token[0] == tokenize.COMMENT:
start, end = token[2][1], token[3][1]
return (
codeline[:start] + codeline[end:],
codeline[start:end])
return line, ''
def flush():
"""Flush the ptrbuffer"""
if not ptrbuffer:
return
cline = ''
for line in ptrbuffer:
for token, value in line:
if token == 'TXT':
cline += repr(value)
elif token == 'RAW':
cline += '_str(%s)' % value
elif token == 'CMD':
cline += '_escape(%s)' % value
cline += ', '
cline = cline[:-2] + '\\\n'
cline = cline[:-2]
if cline[:-1].endswith('\\\\\\\\\\n'):
cline = cline[:-7] + cline[-1] # 'nobr\\\\\n' --> 'nobr'
cline = '_printlist([' + cline + '])'
del ptrbuffer[:] # Do this before calling code() again
code(cline)
def code(stmt):
for line in stmt.splitlines():
codebuffer.append(' ' * len(stack) + line.strip())
for line in template.splitlines(True):
lineno += 1
line = line if isinstance(line, unicode)\
else unicode(line, encoding=self.encoding)
if lineno <= 2:
m = re.search(r"%.*coding[:=]\s*([-\w\.]+)", line)
if m:
self.encoding = m.group(1)
if m:
line = line.replace('coding', 'coding (removed)')
if line.strip()[:2].count('%') == 1:
line = line.split('%', 1)[1].lstrip() # Rest of line after %
cline = split_comment(line)[0].strip()
cmd = re.split(r'[^a-zA-Z0-9_]', cline)[0]
flush() # encodig (TODO: why?)
if cmd in self.blocks or multiline:
cmd = multiline or cmd
dedent = cmd in self.dedent_blocks # "else:"
if dedent and not multiline:
cmd = stack.pop()
code(line)
oneline = not cline.endswith(':') # "if 1: pass"
multiline = cmd if cline.endswith('\\') else False
if not oneline and not multiline:
stack.append(cmd)
elif cmd == 'end' and stack:
code('#end(%s) %s' % (stack.pop(), line.strip()[3:]))
elif cmd == 'include':
p = cline.split(None, 2)[1:]
if len(p) == 2:
code("_=_include(%s, _stdout, %s)" %
(repr(p[0]), p[1]))
elif p:
code("_=_include(%s, _stdout)" % repr(p[0]))
else: # Empty %include -> reverse of %rebase
code("_printlist(_base)")
elif cmd == 'rebase':
p = cline.split(None, 2)[1:]
if len(p) == 2:
code("globals()['_rebase']=(%s, dict(%s))" % (
repr(p[0]), p[1]))
elif p:
code("globals()['_rebase']=(%s, {})" % repr(p[0]))
else:
code(line)
else: # Line starting with text (not '%') or '%%' (escaped)
if line.strip().startswith('%%'):
line = line.replace('%%', '%', 1)
ptrbuffer.append(yield_tokens(line))
flush()
return '\n'.join(codebuffer) + '\n'
def subtemplate(self, _name, _stdout, **args):
if _name not in self.cache:
self.cache[_name] = self.__class__(name=_name, lookup=self.lookup)
return self.cache[_name].execute(_stdout, **args)
def execute(self, _stdout, **args):
env = self.defaults.copy()
env.update({'_stdout': _stdout, '_printlist': _stdout.extend,
'_include': self.subtemplate, '_str': self._str,
'_escape': self._escape})
env.update(args)
eval(self.compiled, env)
if '_rebase' in env:
subtpl, rargs = env['_rebase']
subtpl = self.__class__(name=subtpl, lookup=self.lookup)
rargs['_base'] = _stdout[:] # copy stdout
del _stdout[:] # clear stdout
return subtpl.execute(_stdout, **rargs)
return env
def render(self, **args):
""" Render the template using keyword arguments as local variables. """
stdout = []
self.execute(stdout, **args)
return ''.join(stdout)
def template(tpl, template_adapter=SimpleTemplate, **kwargs):
'''
Get a rendered template as a string iterator.
You can use a name, a filename or a template string as first parameter.
'''
if tpl not in TEMPLATES or DEBUG:
settings = kwargs.get('template_settings', {})
lookup = kwargs.get('template_lookup', TEMPLATE_PATH)
if isinstance(tpl, template_adapter):
TEMPLATES[tpl] = tpl
if settings:
TEMPLATES[tpl].prepare(**settings)
elif "\n" in tpl or "{" in tpl or "%" in tpl or '$' in tpl:
TEMPLATES[tpl] = template_adapter(source=tpl, lookup=lookup,
**settings)
else:
TEMPLATES[tpl] = template_adapter(name=tpl, lookup=lookup,
**settings)
return TEMPLATES[tpl].render(**kwargs)

View File

@ -0,0 +1,42 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mox
class BaseTest(unittest.TestCase):
def setUp(self):
self.mock = mox.Mox()
super(BaseTest, self).setUp()
def assertUnorderedEqual(self, expected, actual):
self.assertEqual(sorted(expected), sorted(actual))
def assertRaisesExcMessage(self, exception, message,
func, *args, **kwargs):
"""This is similar to assertRaisesRegexp in python 2.7"""
try:
func(*args, **kwargs)
self.fail("Expected {0} to raise {1}".format(func,
repr(exception)))
except exception as error:
self.assertIn(message, str(error))

View File

@ -0,0 +1,34 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
import melange_client
from melange_client import utils
def run(command, **kwargs):
config = ConfigParser.ConfigParser()
config.read(os.path.join(melange_client.melange_root_path(),
"tests/functional/tests.conf"))
full_command = "{0} --host={1} --port={2} {3} -v ".format(
melange_client.melange_bin_path('melange'),
config.get('DEFAULT', 'server_name'),
config.get('DEFAULT', 'server_port'),
command)
return utils.execute(full_command, **kwargs)

View File

@ -0,0 +1,67 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import yaml
from melange_client.tests import functional
def create_policy(tenant_id="123"):
create_res = functional.run("policy create name=policy_name "
"desc=policy_desc "
"-t %s" % tenant_id)
return yaml.load(create_res['out'])['policy']
def create_ip(address="10.1.1.1", used_by_tenant=None,
used_by_device="device", interface_id=None,
block_id=None, tenant_id=None):
used_by_tenant = used_by_tenant or tenant_id
interface_id = interface_id or uuid.uuid4()
block_id = block_id or create_block(cidr="10.1.1.1/24",
tenant_id=tenant_id)['id']
create_res = functional.run("ip_address create ip_block_id=%s address=%s "
"interface_id=%s used_by_tenant=%s "
"used_by_device=%s -t %s " % (block_id, address, interface_id,
used_by_tenant, used_by_device,
tenant_id))
return model("ip_address", create_res)
def create_interface(vif_id=None, device_id="device", network_id=None,
tenant_id=None):
vif_id = vif_id or uuid.uuid4()
network_id = network_id or uuid.uuid4()
tenant_id = tenant_id or uuid.uuid4()
create_res = functional.run("interface create vif_id=%s tenant_id=%s "
"device_id=%s network_id=%s" % (vif_id, tenant_id,
device_id, network_id))
return model("interface", create_res)
def create_block(tenant_id="1234", cidr="10.1.1.0/29", network_id=None):
network_id = network_id or uuid.uuid4()
create_res = functional.run("ip_block create type=private "
"cidr=%(cidr)s network_id=%(network_id)s "
"-t %(tenant_id)s" % locals())
return model('ip_block', create_res)
def model(name, res):
return yaml.load(res['out'])[name]

View File

@ -0,0 +1,56 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
class TemplateTestHelper:
def assertTableEquals(self, expected_row_maps, actual_contents):
actual_lines = [line for line in actual_contents.splitlines() if line]
actual_cells = [line.split("\t") for line in actual_lines]
self.assertColumnCellWidthsAreEqual(actual_cells)
actual_header = actual_lines.pop(0)
self.assertHeader(expected_row_maps[0].keys(), actual_header)
def cells_to_map(cells_in_a_line):
return dict((key.strip(), cell.strip()) for key, cell
in itertools.izip(actual_cells[0], cells_in_a_line))
actual_row_maps = [cells_to_map(cells_in_a_line) for cells_in_a_line
in actual_cells[1:]]
self.assertUnorderedEqual(expected_row_maps, actual_row_maps)
def assertColumnCellWidthsAreEqual(self, cells):
cell_lengths = lambda lst: map(lambda cell: len(cell), lst)
for column_values in zip(*cells):
self.assertElementsAreSame(cell_lengths(column_values),
"column with first element '%s' has "
"dissimilar widths" % column_values[0])
def assertElementsAreSame(self, lst, error_msg="List elements differ"):
self.assertEqual(len(set(lst)), 1, error_msg)
def assertHeader(self, expected_header_columns, actual_header):
actual_header_columns = [header_column.strip() for header_column
in actual_header.split("\t")]
self.assertUnorderedEqual(expected_header_columns,
actual_header_columns)

View File

@ -0,0 +1,499 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import uuid
import yaml
from melange_client import tests
from melange_client import utils
from melange_client.tests import functional
from melange_client.tests.functional import template_test_helper
from melange_client.tests.functional import factory
class TestBaseCLI(tests.BaseTest, template_test_helper.TemplateTestHelper):
def setUp(self):
self.maxDiff = None
self.tenant_id = str(uuid.uuid4())
def command(self, command_name, is_tenanted=True, raise_error=True,
**kwargs):
parameters = ["%s=%s" % (key, value)
for key, value in kwargs.iteritems()]
cmd_args = " ".join(parameters)
if is_tenanted:
return functional.run("{0} -t {1} {2} ".format(command_name,
self.tenant_id,
cmd_args),
raise_error=raise_error)
else:
return functional.run("{0} {1} ".format(command_name, cmd_args),
raise_error=raise_error)
def assertShows(self, res_name, expected_resource, is_tenanted=True,
command_name=None, parameters=""):
command_name = command_name or res_name
parameters = parameters or "id=%s" % expected_resource['id']
tenant_option = "-t %s" % self.tenant_id if is_tenanted else ""
show_command = ("%s show %s %s"
% (command_name, parameters, tenant_option))
show_res = functional.run(show_command)
shown_resource = factory.model(res_name, show_res)
for key, expected_resource_field in expected_resource.iteritems():
self.assertEqual(expected_resource_field, shown_resource[key])
def assertResourceNotFound(self, res_name, expected_resource,
is_tenanted=True, command_name=None,
parameters=""):
command_name = command_name or res_name
parameters = parameters or "id=%s" % expected_resource['id']
tenant_option = "-t %s" % self.tenant_id if is_tenanted else ""
show_res = functional.run("%s show %s %s " % (command_name,
parameters,
tenant_option),
raise_error=False)
self.assertTrue("%s Not Found" % utils.camelize(res_name)
in show_res['out'])
class TestIpBlockCLI(TestBaseCLI):
def test_crud(self):
create_res = functional.run("ip_block create type=private "
"cidr=10.1.1.0/29 network_id=%s "
"-t %s" % (uuid.uuid4(), self.tenant_id))
ip_block = factory.model('ip_block', create_res)
self.assertEqual(0, create_res['exitcode'])
self.assertShows('ip_block', ip_block)
update_res = functional.run("ip_block update id=%s network_id=%s "
"-t %s" % (ip_block['id'],
uuid.uuid4(),
self.tenant_id))
updated_block = factory.model('ip_block', update_res)
self.assertEqual(0, update_res['exitcode'])
self.assertShows('ip_block', updated_block)
deleted_res = functional.run("ip_block delete "
"id=%s -t %s" % (ip_block['id'],
self.tenant_id))
self.assertEqual(0, deleted_res['exitcode'])
self.assertResourceNotFound('ip_block', ip_block)
def test_list(self):
block1 = factory.create_block(self.tenant_id)
block2 = factory.create_block(self.tenant_id)
list_res = functional.run("ip_block list -t %s" % self.tenant_id)
self.assertEqual(0, list_res['exitcode'])
self.assertEqual(sorted([block1, block2]),
sorted(factory.model('ip_blocks', list_res)))
def test_list_without_tenant_id_should_error_out(self):
self.assertRaises(RuntimeError,
functional.run,
"ip_block list")
class TestSubnetCLI(TestBaseCLI):
def test_create_and_list(self):
block = factory.create_block(cidr="10.0.0.0/8", tenant_id="123")
subnet_res_1 = functional.run("subnet create parent_id={0} "
"cidr=10.0.1.0/29 -t 123".format(block['id']))
self.assertEqual(0, subnet_res_1['exitcode'])
subnet_res_2 = functional.run("subnet create parent_id={0} "
"cidr=10.0.0.0/29 -t 123".format(block['id']))
self.assertEqual(0, subnet_res_2['exitcode'])
subnet_list_res = functional.run("subnet list parent_id={0} "
"-t 123".format(block['id']))
self.assertEqual(sorted([factory.model('subnet', subnet_res_1),
factory.model('subnet', subnet_res_2)]),
sorted(factory.model('subnets', subnet_list_res)))
class TestPolicyCLI(TestBaseCLI):
def test_list(self):
policy1 = factory.create_policy(self.tenant_id)
policy2 = factory.create_policy(self.tenant_id)
list_res = functional.run("policy list -t %s" % self.tenant_id)
self.assertEqual(sorted([policy1, policy2]),
sorted(yaml.load(list_res['out'])['policies']))
def test_crud(self):
create_res = self.command("policy create",
name="policy_name",
desc="policy_desc")
self.assertEqual(0, create_res['exitcode'])
policy = factory.model('policy', create_res)
self.assertShows('policy', policy)
update_res = self.command("policy update",
id=policy['id'],
name="new_name")
self.assertEqual(0, update_res['exitcode'])
updated_policy = factory.model('policy', update_res)
self.assertShows('policy', updated_policy)
delete_res = self.command("policy delete", id=policy['id'])
self.assertEqual(0, delete_res['exitcode'])
self.assertResourceNotFound('policy', policy)
class TestUnusableIpRangesCLI(TestBaseCLI):
def test_create(self):
policy = factory.create_policy(tenant_id=self.tenant_id)
create_res = self.command('unusable_ip_range create',
policy_id=policy['id'],
offset=1,
length=2)
ip_range = factory.model('ip_range', create_res)
self.assertShows('ip_range',
ip_range,
command_name="unusable_ip_range",
parameters="id=%s policy_id=%s" % (ip_range['id'],
policy['id']))
update_res = self.command('unusable_ip_range update',
policy_id=policy['id'],
id=ip_range['id'],
offset=10,
length=122)
updated_ip_range = factory.model('ip_range', update_res)
self.assertShows('ip_range',
updated_ip_range,
command_name="unusable_ip_range",
parameters="id=%s policy_id=%s" % (ip_range['id'],
policy['id']))
another_create_res = self.command('unusable_ip_range create',
policy_id=policy['id'],
offset=1,
length=2)
another_ip_range = factory.model('ip_range', another_create_res)
list_res = functional.run("unusable_ip_range list"
" policy_id={0} -t {1}".format(policy['id'],
self.tenant_id))
self.assertEqual(sorted([updated_ip_range, another_ip_range]),
sorted(yaml.load(list_res['out'])['ip_ranges']))
self.command("unusable_ip_range delete",
policy_id=policy['id'],
id=ip_range['id'])
parameters = ("policy_id=%s id=%s" % (policy['id'],
ip_range['id']))
self.assertResourceNotFound("ip_range",
ip_range,
command_name="unusable_ip_range",
parameters=parameters)
class TestUnusableIpOctetsCLI(TestBaseCLI):
def test_crud(self):
policy = factory.create_policy(tenant_id=self.tenant_id)
create_res = self.command('unusable_ip_octet create',
policy_id=policy['id'],
octet=255)
ip_octet = factory.model('ip_octet', create_res)
self.assertShows('ip_octet',
ip_octet,
command_name="unusable_ip_octet",
parameters="id=%s policy_id=%s" % (ip_octet['id'],
policy['id']))
update_res = self.command('unusable_ip_octet update',
policy_id=policy['id'],
id=ip_octet['id'],
octet=200)
updated_ip_octet = factory.model('ip_octet', update_res)
self.assertShows('ip_octet', updated_ip_octet,
command_name="unusable_ip_octet",
parameters="id=%s policy_id=%s" % (ip_octet['id'],
policy['id']))
another_create_res = self.command('unusable_ip_octet create',
policy_id=policy['id'],
octet=200)
another_ip_octet = factory.model('ip_octet', another_create_res)
list_res = functional.run("unusable_ip_octet list policy_id={0}"
" -t {1}".format(policy['id'],
self.tenant_id))
self.assertEqual(sorted([updated_ip_octet, another_ip_octet]),
sorted(yaml.load(list_res['out'])['ip_octets']))
self.command("unusable_ip_octet delete",
policy_id=policy['id'],
id=ip_octet['id'])
parameters = "policy_id=%s id=%s" % (policy['id'],
ip_octet['id'])
self.assertResourceNotFound("ip_octet",
ip_octet,
command_name='unusable_ip_octet',
parameters=parameters)
class TestAllocatedIpAddressCLI(TestBaseCLI):
def test_list(self):
device1_id, device2_id = uuid.uuid4(), uuid.uuid4()
block = factory.create_block(cidr="30.1.1.1/24",
tenant_id=self.tenant_id)
ip1 = factory.create_ip(block_id=block['id'],
address="30.1.1.2",
used_by_device=device1_id,
tenant_id=self.tenant_id)
ip2 = factory.create_ip(block_id=block['id'],
address="30.1.1.3",
used_by_device=device2_id,
tenant_id=self.tenant_id)
list_res = self.command("allocated_ip list",
is_tenanted=False,
used_by_device=device1_id)
self.assertEqual([ip1], yaml.load(list_res['out'])['ip_addresses'])
def test_list_with_tenant(self):
device1_id, device2_id = uuid.uuid4(), uuid.uuid4()
tenant1_id, tenant2_id = uuid.uuid4(), uuid.uuid4()
block = factory.create_block(cidr="30.1.1.1/24",
tenant_id=self.tenant_id)
tenant1_ip1 = factory.create_ip(block_id=block['id'],
address="30.1.1.2",
used_by_device=device1_id,
tenant_id=self.tenant_id,
used_by_tenant=tenant1_id,)
tenant1_ip2 = factory.create_ip(block_id=block['id'],
address="30.1.1.3",
used_by_device=device1_id,
tenant_id=self.tenant_id,
used_by_tenant=tenant1_id,)
tenant2_ip1 = factory.create_ip(block_id=block['id'],
address="30.1.1.4",
used_by_device=device2_id,
tenant_id=self.tenant_id,
used_by_tenant=tenant2_id)
list_res = functional.run("allocated_ip list -t %s" % tenant1_id)
self.assertEqual(sorted([tenant1_ip1, tenant1_ip2]),
sorted(yaml.load(list_res['out'])['ip_addresses']))
class TestIpAddressCLI(TestBaseCLI):
def test_crud(self):
block = factory.create_block(cidr="10.1.1.0/24",
tenant_id=self.tenant_id)
ip = factory.create_ip(block_id=block['id'],
address="10.1.1.2",
tenant_id=self.tenant_id)
self._assert_ip_shows(ip)
another_ip = factory.create_ip(block_id=block['id'],
address="10.1.1.3",
tenant_id=self.tenant_id)
list_res = self.command("ip_address list",
ip_block_id=block['id'])
self.assertEqual(sorted([ip, another_ip]),
sorted(yaml.load(list_res['out'])['ip_addresses']))
self.command("ip_address delete",
ip_block_id=block['id'],
address="10.1.1.2")
# TODO: fix bug 911255 on show of deallocated addresses
# show_res = functional.run("ip_address show ip_block_id=%s address=%s"
# "-t %s" % (block['id'], "10.1.1.2", self.tenant_id),
# raise_error=False)
# self.assertTrue("IpAddress Not Found" in show_res['out'])
def _assert_ip_shows(self, expected_resource):
show_command = ("ip_address show ip_block_id=%s address=%s "
"-t %s" % (expected_resource['ip_block_id'],
expected_resource['address'],
self.tenant_id))
show_res = functional.run(show_command)
shown_resource = factory.model('ip_address', show_res)
for key, expected_resource_field in expected_resource.iteritems():
self.assertEqual(expected_resource_field, shown_resource[key])
class TestIpRoutesCLI(TestBaseCLI):
def test_crud(self):
block = factory.create_block(cidr="77.1.1.0/24",
tenant_id=self.tenant_id)
create_res = self.command("ip_route create",
ip_block_id=block['id'],
destination="10.1.1.2",
gateway="10.1.1.1",
netmask="255.255.255.0")
route = factory.model("ip_route", create_res)
self.assertShows("ip_route",
route,
parameters="ip_block_id=%s id=%s" % (block['id'],
route['id']))
another_create_res = self.command("ip_route create",
ip_block_id=block['id'],
destination="20.1.1.2",
gateway="20.1.1.1",
netmask="255.255.255.0")
another_route = factory.model("ip_route", another_create_res)
list_res = self.command("ip_route list",
ip_block_id=block['id'])
self.assertTableEquals([route, another_route], list_res['out'])
self.command("ip_route delete",
ip_block_id=block['id'],
id=route['id'])
self.assertResourceNotFound("ip_route",
route,
parameters="ip_block_id=%s id=%s"
% (block['id'], route['id']))
class TestInterfaceCLI(TestBaseCLI):
def test_crud(self):
iface = factory.create_interface(tenant_id=self.tenant_id)
self.assertShows("interface",
iface,
parameters="vif_id=%s" % iface['id'])
self.command("interface delete", is_tenanted=False, vif_id=iface['id'])
self.assertResourceNotFound("interface",
iface,
parameters="vif_id=%s" % iface['id'])
class TestMacAddressRangeCLI(TestBaseCLI):
def test_crud(self):
create_res = self.command("mac_address_range create",
is_tenanted=False,
cidr="ab-bc-cd-12-23-34/2")
rng = factory.model('mac_address_range', create_res)
self.assertShows('mac_address_range', rng, is_tenanted=False)
another_create_res = self.command("mac_address_range create",
is_tenanted=False,
cidr="bc-ab-dc-12-23-34/2")
another_rng = factory.model('mac_address_range', another_create_res)
list_res = self.command("mac_address_range list", is_tenanted=False)
rng_list = yaml.load(list_res['out'])['mac_address_ranges']
self.assertTrue(rng in rng_list)
self.assertTrue(another_rng in rng_list)
self.command("mac_address_range delete",
is_tenanted=False,
id=rng['id'])
self.assertResourceNotFound('mac_address_range',
rng,
is_tenanted=False)
class TestAllowedIpCLI(TestBaseCLI):
def test_crud(self):
network_id = uuid.uuid4()
iface = factory.create_interface(network_id=network_id,
tenant_id=self.tenant_id)
block = factory.create_block(cidr="20.1.1.0/24",
network_id=network_id,
tenant_id=self.tenant_id)
ip = factory.create_ip(address="20.1.1.2",
block_id=block['id'],
used_by_tenant=self.tenant_id,
tenant_id=self.tenant_id)
another_ip = factory.create_ip(address="20.1.1.3",
block_id=block['id'],
used_by_tenant=self.tenant_id,
tenant_id=self.tenant_id)
ip_res = self.command("allowed_ip create",
interface_id=iface['id'],
network_id=network_id,
ip_address=ip['address'])
allowed_ip = factory.model("ip_address", ip_res)
self.assertShows("ip_address",
allowed_ip,
parameters="interface_id=%s ip_address=%s" %
(iface['id'], ip['address']),
command_name="allowed_ip")
another_ip_res = self.command("allowed_ip create",
interface_id=iface['id'],
network_id=network_id,
ip_address=another_ip['address'])
another_allowed_ip = factory.model('ip_address', another_ip_res)
list_res = self.command("allowed_ip list", interface_id=iface['id'])
actual_allowed_ips = yaml.load(list_res['out'])['ip_addresses']
self.assertTrue(allowed_ip in actual_allowed_ips)
self.assertTrue(another_allowed_ip in actual_allowed_ips)
self.command("allowed_ip delete",
interface_id=iface['id'],
ip_address=ip['address'])
show_res = self.command("allowed_ip show",
interface_id=iface['id'],
ip_address=ip['address'],
raise_error=False)
expected_error = ("Ip Address %s hasnt been allowed on interface %s" %
(ip['address'], iface['id']))
self.assertTrue(expected_error in show_res['out'])
class TestMelangeCLI(TestBaseCLI):
def test_raises_error_for_non_keyword_arguments(self):
res = functional.run("allowed_ip delete interface_id123 -t RAX",
raise_error=False)
self.assertEqual(res['exitcode'], 2)
self.assertIn("Action arguments should be of the form of field=value",
res['out'])

View File

@ -0,0 +1,3 @@
[DEFAULT]
server_name=localhost
server_port=9898

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from melange_client import tests
from melange_client.views.helpers import table
class TestTable(tests.BaseTest):
def test_padded_keys(self):
data = [{'k1': 'v1', 'k2':'v12345', 'k1234':'v3'},
{'k2': 'v22', 'k5': 'v11', 'k1234':'v11'},
]
actual_elem_pads = table.padded_keys(data)
expected_elem_pads = {
'k1': len('v1'),
'k2': len('v12345'),
'k1234': len('k1234'),
'k5': len('v11'),
}
self.assertEqual(expected_elem_pads, actual_elem_pads)
def test_row_view(self):
data = {'k1': 2, 'k2': 3, 'k1234': 6, 9: 4}
row = table.row_view(sorted(data.iteritems()))
self.assertEqual("9 \tk1\tk1234 \tk2 ", row)

View File

@ -0,0 +1,86 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import urlparse
import httplib2
import mox
from melange_client import client
from melange_client import tests
class TestAuthorizationClient(tests.BaseTest):
def test_get_token_doesnt_call_auth_service_when_token_is_given(self):
url = "http://localhost:5001"
auth_client = client.AuthorizationClient(url,
"username",
"access_key",
"auth_token")
self.mock.StubOutWithMock(auth_client, "request")
self.assertEqual(auth_client.get_token(), "auth_token")
def test_get_token_calls_auth_service_when_token_is_not_given(self):
url = "http://localhost:5001"
auth_client = client.AuthorizationClient(url,
"username",
"access_key",
auth_token=None)
self.mock.StubOutWithMock(auth_client, "request")
request_body = json.dumps({
"passwordCredentials": {
"username": "username",
'password': "access_key"},
})
response_body = json.dumps({'auth': {'token': {'id': "auth_token"}}})
res = httplib2.Response(dict(status='200'))
auth_client.request(urlparse.urljoin(url, "/v2.0/tokens"),
"POST",
headers=mox.IgnoreArg(),
body=request_body).AndReturn((res,
response_body))
self.mock.ReplayAll()
self.assertEqual(auth_client.get_token(), "auth_token")
def test_raises_error_when_retreiveing_token_fails(self):
url = "http://localhost:5001"
auth_client = client.AuthorizationClient(url,
None,
"access_key",
auth_token=None)
self.mock.StubOutWithMock(auth_client, "request")
res = httplib2.Response(dict(status='401'))
response_body = "Failed to get token"
auth_client.request(urlparse.urljoin(url, "/v2.0/tokens"),
"POST",
headers=mox.IgnoreArg(),
body=mox.IgnoreArg()).AndReturn((res,
response_body))
self.mock.ReplayAll()
expected_error_msg = ("Error occured while retrieving token :"
" Failed to get token")
self.assertRaisesExcMessage(Exception,
expected_error_msg,
auth_client.get_token)

View File

@ -0,0 +1,79 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from melange_client import tests
from melange_client import inspector
class TestMethodInspector(tests.BaseTest):
def test_method_without_optional_args(self):
def foo(bar):
pass
method = inspector.MethodInspector(foo)
self.assertEqual(method.required_args(), ['bar'])
self.assertEqual(method.optional_args(), [])
def test_method_with_optional_args(self):
def foo(bar, baz=1):
pass
method = inspector.MethodInspector(foo)
self.assertEqual(method.required_args(), ['bar'])
self.assertEqual(method.optional_args(), [('baz', 1)])
def test_instance_method_with_optional_args(self):
class Foo():
def bar(self, baz, qux=2):
pass
method = inspector.MethodInspector(Foo().bar)
self.assertEqual(method.required_args(), ['baz'])
self.assertEqual(method.optional_args(), [('qux', 2)])
def test_method_without_args(self):
def foo():
pass
method = inspector.MethodInspector(foo)
self.assertEqual(method.required_args(), [])
self.assertEqual(method.optional_args(), [])
def test_instance_method_without_args(self):
class Foo():
def bar(self):
pass
method = inspector.MethodInspector(Foo().bar)
self.assertEqual(method.required_args(), [])
self.assertEqual(method.optional_args(), [])
def test_method_str(self):
class Foo():
def bar(self, baz, qux=None):
pass
method = inspector.MethodInspector(Foo().bar)
self.assertEqual(str(method), "bar baz=<baz> [qux=<qux>]")

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from melange_client import ipam_client
from melange_client import tests
class TestFactory(tests.BaseTest):
def test_factory_gives_client(self):
factory = ipam_client.Factory("host", "8080")
self.assertEquals(ipam_client.IpBlockClient, type(factory.ip_block))
def test_factory_raises_attribute_error_for_non_existent_client(self):
factory = ipam_client.Factory("host", "8080")
self.assertRaisesExcMessage(AttributeError,
"Factory has no attribute "
"non_existent_client",
lambda: factory.non_existent_client)

View File

@ -0,0 +1,37 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from melange_client import tests
from melange_client import utils
class TestUtils(tests.BaseTest):
def test_remove_nones(self):
self.assertEquals(dict(a=1, c=3),
utils.remove_nones(dict(a=1, b=None, c=3)))
self.assertEquals(dict(),
utils.remove_nones(dict(a=None, b=None)))
self.assertEquals(dict(a=1, b=2, c=3),
utils.remove_nones(dict(a=1, b=2, c=3)))
def test_camelize(self):
self.assertEquals("AaBbCc", utils.camelize("aa_bb_cc"))
self.assertEquals("Aa", utils.camelize("aa"))
self.assertEquals("AaBbCc", utils.camelize("AaBbCc"))

65
melange_client/utils.py Normal file
View File

@ -0,0 +1,65 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import os
import subprocess
import melange_client
def camelize(string):
return re.sub(r"(?:^|_)(.)", lambda x: x.group(0)[-1].upper(), string)
def remove_nones(hash):
return dict((key, value)
for key, value in hash.iteritems() if value is not None)
def execute(cmd, raise_error=True):
"""Executes a command in a subprocess.
Returns a tuple of (exitcode, out, err), where out is the string output
from stdout and err is the string output from stderr when
executing the command.
:param cmd: Command string to execute
:param raise_error: If returncode is not 0 (success), then
raise a RuntimeError? Default: True)
"""
env = os.environ.copy()
# Make sure that we use the programs in the
# current source directory's bin/ directory.
env['PATH'] = melange_client.melange_bin_path() + ':' + env['PATH']
process = subprocess.Popen(cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env)
(out, err) = process.communicate()
exitcode = process.returncode
if process.returncode != 0 and raise_error:
msg = "Command %(cmd)s did not succeed. Returned an exit "\
"code of %(exitcode)d."\
"\n\nSTDOUT: %(out)s"\
"\n\nSTDERR: %(err)s" % locals()
raise RuntimeError(msg)
return {'exitcode': exitcode, 'out': out, 'err': err}

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,29 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def row_view(cells):
return "\t".join(map((lambda (key, value): str(key).ljust(value)),
cells))
def padded_keys(data):
w = {}
for rows in data:
for key in rows:
w[key] = max(len(str(rows[key])), w.get(key, len(key)))
return w

View File

@ -0,0 +1,4 @@
%from melange_client.views.helpers import table
{{table.row_view(table.padded_keys(ip_routes).iteritems())}}
%for route in ip_routes:
{{table.row_view(map(lambda (key,value): (route[key], value), table.padded_keys(ip_routes).iteritems()))}}

367
run_tests.py Normal file
View File

@ -0,0 +1,367 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Unittest runner for Nova.
To run all tests
python run_tests.py
To run a single test:
python run_tests.py test_compute:ComputeTestCase.test_run_terminate
To run a single test module:
python run_tests.py test_compute
or
python run_tests.py api.test_wsgi
"""
import gettext
import heapq
import logging
import os
import unittest
import sys
import time
gettext.install('melange', unicode=1)
from nose import config
from nose import core
from nose import result
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
raise
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
from win32console import GetStdHandle, STD_OUT_HANDLE, \
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
FOREGROUND_INTENSITY
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold
}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
def get_elapsed_time_color(elapsed_time):
if elapsed_time > 1.0:
return 'red'
elif elapsed_time > 0.25:
return 'yellow'
else:
return 'green'
class MelangeTestResult(result.TextTestResult):
def __init__(self, *args, **kw):
self.show_elapsed = kw.pop('show_elapsed')
result.TextTestResult.__init__(self, *args, **kw)
self.num_slow_tests = 5
self.slow_tests = [] # this is a fixed-sized heap
self._last_case = None
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
# NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
# error results in it failing to be initialized later. Otherwise,
# _handleElapsedTime will fail, causing the wrong error message to
# be outputted.
self.start_time = time.time()
def getDescription(self, test):
return str(test)
def _handleElapsedTime(self, test):
self.elapsed_time = time.time() - self.start_time
item = (self.elapsed_time, test)
# Record only the n-slowest tests using heap
if len(self.slow_tests) >= self.num_slow_tests:
heapq.heappushpop(self.slow_tests, item)
else:
heapq.heappush(self.slow_tests, item)
def _writeElapsedTime(self, test):
color = get_elapsed_time_color(self.elapsed_time)
self.colorizer.write(" %.2f" % self.elapsed_time, color)
def _writeResult(self, test, long_result, color, short_result, success):
if self.showAll:
self.colorizer.write(long_result, color)
if self.show_elapsed and success:
self._writeElapsedTime(test)
self.stream.writeln()
elif self.dots:
self.stream.write(short_result)
self.stream.flush()
# NOTE(vish): copied from unittest with edit to add color
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
self._handleElapsedTime(test)
self._writeResult(test, 'OK', 'green', '.', True)
# NOTE(vish): copied from unittest with edit to add color
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
self._handleElapsedTime(test)
self._writeResult(test, 'FAIL', 'red', 'F', False)
# NOTE(vish): copied from nose with edit to add color
def addError(self, test, err):
"""Overrides normal addError to add support for
errorClasses. If the exception is a registered class, the
error will be added to the list for that class, not errors.
"""
self._handleElapsedTime(test)
stream = getattr(self, 'stream', None)
ec, ev, tb = err
try:
exc_info = self._exc_info_to_string(err, test)
except TypeError:
# 2.3 compat
exc_info = self._exc_info_to_string(err)
for cls, (storage, label, isfail) in self.errorClasses.items():
if result.isclass(ec) and issubclass(ec, cls):
if isfail:
test.passed = False
storage.append((test, exc_info))
# Might get patched into a streamless result
if stream is not None:
if self.showAll:
message = [label]
detail = result._exception_detail(err[1])
if detail:
message.append(detail)
stream.writeln(": ".join(message))
elif self.dots:
stream.write(label[:1])
return
self.errors.append((test, exc_info))
test.passed = False
if stream is not None:
self._writeResult(test, 'ERROR', 'red', 'E', False)
def startTest(self, test):
unittest.TestResult.startTest(self, test)
self.start_time = time.time()
current_case = test.test.__class__.__name__
if self.showAll:
if current_case != self._last_case:
self.stream.writeln(current_case)
self._last_case = current_case
self.stream.write(
' %s' % str(test.test._testMethodName).ljust(60))
self.stream.flush()
class MelangeTestRunner(core.TextTestRunner):
def __init__(self, *args, **kwargs):
self.show_elapsed = kwargs.pop('show_elapsed')
core.TextTestRunner.__init__(self, *args, **kwargs)
def _makeResult(self):
return MelangeTestResult(self.stream,
self.descriptions,
self.verbosity,
self.config,
show_elapsed=self.show_elapsed)
def _writeSlowTests(self, result_):
# Pare out 'fast' tests
slow_tests = [item for item in result_.slow_tests
if get_elapsed_time_color(item[0]) != 'green']
if slow_tests:
slow_total_time = sum(item[0] for item in slow_tests)
self.stream.writeln("Slowest %i tests took %.2f secs:"
% (len(slow_tests), slow_total_time))
for elapsed_time, test in sorted(slow_tests, reverse=True):
time_str = "%.2f" % elapsed_time
self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
def run(self, test):
result_ = core.TextTestRunner.run(self, test)
if self.show_elapsed:
self._writeSlowTests(result_)
return result_
if __name__ == '__main__':
logger = logging.getLogger()
hdlr = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
# If any argument looks like a test name but doesn't have "melange.tests" in
# front of it, automatically add that so we don't have to type as much
show_elapsed = True
argv = []
for x in sys.argv:
if x.startswith('test_'):
argv.append('melange.tests.%s' % x)
elif x.startswith('--hide-elapsed'):
show_elapsed = False
else:
argv.append(x)
testdir = os.path.abspath(os.path.join("melange_client", "tests"))
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=testdir,
plugins=core.DefaultPluginManager())
runner = MelangeTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c,
show_elapsed=show_elapsed)
sys.exit(not core.run(config=c, testRunner=runner, argv=argv))

168
run_tests.sh Executable file
View File

@ -0,0 +1,168 @@
#!/bin/bash
set -eu
function usage {
echo "Usage: $0 [OPTION]..."
echo "Run Melange's test suite(s)"
echo ""
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
echo " -r, --recreate-db Recreate the test database (deprecated, as this is now the default)."
echo " -n, --no-recreate-db Don't recreate the test database."
echo " -x, --stop Stop running tests after the first error or failure."
echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
echo " -p, --pep8 Just run pep8"
echo " -P, --no-pep8 Don't run pep8"
echo " -c, --coverage Generate coverage report"
echo " -h, --help Print this usage message"
echo " --hide-elapsed Don't print the elapsed time for each test along with slow test list"
echo ""
echo "Note: with no options specified, the script will try to run the tests in a virtual environment,"
echo " If no virtualenv is found, the script will ask if you would like to create one. If you "
echo " prefer to run tests NOT in a virtual environment, simply pass the -N option."
exit
}
function process_option {
case "$1" in
-h|--help) usage;;
-V|--virtual-env) always_venv=1; never_venv=0;;
-N|--no-virtual-env) always_venv=0; never_venv=1;;
-r|--recreate-db) recreate_db=1;;
-n|--no-recreate-db) recreate_db=0;;
-f|--force) force=1;;
-p|--pep8) just_pep8=1;;
-P|--no-pep8) no_pep8=1;;
-c|--coverage) coverage=1;;
-*) noseopts="$noseopts $1";;
*) noseargs="$noseargs $1"
esac
}
venv=.venv
with_venv=tools/with_venv.sh
always_venv=0
never_venv=0
force=0
noseargs=
noseopts=
wrapper=""
just_pep8=0
no_pep8=0
coverage=0
recreate_db=1
for arg in "$@"; do
process_option $arg
done
# If enabled, tell nose to collect coverage data
if [ $coverage -eq 1 ]; then
noseopts="$noseopts --with-coverage --cover-package=melange"
fi
function run_tests {
# Just run the test suites in current environment
${wrapper} $NOSETESTS 2> run_tests.log
# If we get some short import error right away, print the error log directly
RESULT=$?
if [ "$RESULT" -ne "0" ];
then
ERRSIZE=`wc -l run_tests.log | awk '{print \$1}'`
if [ "$ERRSIZE" -lt "40" ];
then
cat run_tests.log
fi
fi
return $RESULT
}
function run_pep8 {
echo "Running pep8 ..."
# Opt-out files from pep8
ignore_scripts="*.sh:*melange-debug:*clean-vlans"
ignore_files="*eventlet-patch:*pip-requires"
GLOBIGNORE="$ignore_scripts:$ignore_files"
srcfiles=`find bin -type f ! -name "melange.conf*"`
srcfiles+=" `find tools/*`"
srcfiles+=" bin melange_client"
# Just run PEP8 in current environment
#
# NOTE(sirp): W602 (deprecated 3-arg raise) is being ignored for the
# following reasons:
#
# 1. It's needed to preserve traceback information when re-raising
# exceptions; this is needed b/c Eventlet will clear exceptions when
# switching contexts.
#
# 2. There doesn't appear to be an alternative, "pep8-tool" compatible way of doing this
# in Python 2 (in Python 3 `with_traceback` could be used).
#
# 3. Can find no corroborating evidence that this is deprecated in Python 2
# other than what the PEP8 tool claims. It is deprecated in Python 3, so,
# perhaps the mistake was thinking that the deprecation applied to Python 2
# as well.
${wrapper} pep8 --repeat --show-pep8 --show-source \
--ignore=E202,W602 \
--exclude=vcsversion.py ${srcfiles}
}
NOSETESTS="python run_tests.py $noseopts $noseargs"
if [ $never_venv -eq 0 ]
then
# Remove the virtual environment if --force used
if [ $force -eq 1 ]; then
echo "Cleaning virtualenv..."
rm -rf ${venv}
fi
if [ -e ${venv} ]; then
wrapper="${with_venv}"
else
if [ $always_venv -eq 1 ]; then
# Automatically install the virtualenv
python tools/install_venv.py
wrapper="${with_venv}"
else
echo -e "No virtual environment found...create one? (Y/n) \c"
read use_ve
if [ "x$use_ve" = "xY" -o "x$use_ve" = "x" -o "x$use_ve" = "xy" ]; then
# Install the virtualenv and run the test suite in it
python tools/install_venv.py
wrapper=${with_venv}
fi
fi
fi
fi
# Delete old coverage data from previous runs
if [ $coverage -eq 1 ]; then
${wrapper} coverage erase
fi
if [ $just_pep8 -eq 1 ]; then
run_pep8
exit
fi
if [ $recreate_db -eq 1 ]; then
rm -f tests.sqlite
fi
run_tests
# NOTE(sirp): we only want to run pep8 when we're running the full-test suite,
# not when we're running tests individually. To handle this, we need to
# distinguish between options (noseopts), which begin with a '-', and
# arguments (noseargs).
if [ -z "$noseargs" ]; then
if [ $no_pep8 -eq 0 ]; then
run_pep8
fi
fi
if [ $coverage -eq 1 ]; then
echo "Generating coverage report in covhtml/"
${wrapper} coverage html -d covhtml -i
fi

145
tools/install_venv.py Normal file
View File

@ -0,0 +1,145 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2010 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Installation script for Melange client's development virtualenv.
"""
import os
import subprocess
import sys
ROOT = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
VENV = os.path.join(ROOT, '.venv')
PIP_REQUIRES = os.path.join(ROOT, 'tools', 'pip-requires')
PY_VERSION = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
def die(message, *args):
print >> sys.stderr, message % args
sys.exit(1)
def check_python_version():
if sys.version_info < (2, 6):
die("Need Python Version >= 2.6")
def run_command(cmd, redirect_output=True, check_exit_code=True):
"""
Runs a command in an out-of-process shell, returning the
output of that command. Working directory is ROOT.
"""
if redirect_output:
stdout = subprocess.PIPE
else:
stdout = None
proc = subprocess.Popen(cmd, cwd=ROOT, stdout=stdout)
output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0:
die('Command "%s" failed.\n%s', ' '.join(cmd), output)
return output
HAS_EASY_INSTALL = bool(run_command(['which', 'easy_install'],
check_exit_code=False).strip())
HAS_VIRTUALENV = bool(run_command(['which', 'virtualenv'],
check_exit_code=False).strip())
def check_dependencies():
"""Make sure virtualenv is in the path."""
if not HAS_VIRTUALENV:
print 'not found.'
# Try installing it via easy_install...
if HAS_EASY_INSTALL:
print 'Installing virtualenv via easy_install...',
if not (run_command(['which', 'easy_install']) and
run_command(['easy_install', 'virtualenv'])):
die('ERROR: virtualenv not found.\n\nMelange client'
' development requires virtualenv, please install it'
' using your favorite package management tool')
print 'done.'
print 'done.'
def create_virtualenv(venv=VENV):
"""Creates the virtual environment and installs PIP only into the
virtual environment
"""
print 'Creating venv...',
run_command(['virtualenv', '-q', '--no-site-packages', VENV])
print 'done.'
print 'Installing pip in virtualenv...',
if not run_command(['tools/with_venv.sh', 'easy_install', 'pip']).strip():
die("Failed to install pip.")
print 'done.'
def install_dependencies(venv=VENV):
print 'Installing dependencies with pip (this can take a while)...'
# Install greenlet by hand - just listing it in the requires file does not
# get it in stalled in the right order
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv,
'greenlet'], redirect_output=False)
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, '-r',
PIP_REQUIRES], redirect_output=False)
# Tell the virtual env how to "import melange"
pthfile = os.path.join(venv, "lib", PY_VERSION, "site-packages",
"melange.pth")
f = open(pthfile, 'w')
f.write("%s\n" % ROOT)
def print_help():
help = """
Melange client development environment setup is complete.
Melange client development uses virtualenv to track and manage Python
dependencies while in development and testing.
To activate the Melange client virtualenv for the extent of your current
shell session you can run:
$ source .venv/bin/activate
Or, if you prefer, you can run commands in the virtualenv on a case by case
basis by running:
$ tools/with_venv.sh <your command>
Also, make test will automatically use the virtualenv.
"""
print help
def main(argv):
check_python_version()
check_dependencies()
create_virtualenv()
install_dependencies()
print_help()
if __name__ == '__main__':
main(sys.argv)

10
tools/pip-requires Normal file
View File

@ -0,0 +1,10 @@
pep8
pylint
mox
nose
sphinx
coverage
nosexcover
httplib2
pyyaml
-e git+https://github.com/jkoelker/openstack-common.git@melange_compat#egg=openstack.common

4
tools/with_venv.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
TOOLS=`dirname $0`
VENV=$TOOLS/../.venv
source $VENV/bin/activate && $@