Remove kong. Fixes bug 1052511.
Change-Id: I5e027c9539fd2c594d24447866e79c8caad9aa40
This commit is contained in:
parent
e62b9f020e
commit
1301f8d026
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,7 +1,4 @@
|
||||
.kong-venv
|
||||
*.pyc
|
||||
etc/config.ini
|
||||
etc/storm.conf
|
||||
etc/tempest.conf
|
||||
include/swift_objects/swift_small
|
||||
include/swift_objects/swift_medium
|
||||
|
@ -1,13 +0,0 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
@ -1,54 +0,0 @@
|
||||
from kong import exceptions
|
||||
|
||||
import httplib2
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
class Client(object):
|
||||
|
||||
USER_AGENT = 'python-nova_test_client'
|
||||
|
||||
def __init__(self, host='localhost', port=80, base_url=''):
|
||||
#TODO: join these more robustly
|
||||
self.base_url = "http://%s:%s/%s" % (host, port, base_url)
|
||||
|
||||
def poll_request(self, method, url, check_response, **kwargs):
|
||||
timeout = kwargs.pop('timeout', 180)
|
||||
interval = kwargs.pop('interval', 2)
|
||||
# Start timestamp
|
||||
start_ts = int(time.time())
|
||||
|
||||
while True:
|
||||
resp, body = self.request(method, url, **kwargs)
|
||||
if (check_response(resp, body)):
|
||||
break
|
||||
if (int(time.time()) - start_ts >= timeout):
|
||||
raise exceptions.TimeoutException
|
||||
time.sleep(interval)
|
||||
|
||||
def poll_request_status(self, method, url, status=200, **kwargs):
|
||||
def check_response(resp, body):
|
||||
return resp['status'] == str(status)
|
||||
|
||||
self.poll_request(method, url, check_response, **kwargs)
|
||||
|
||||
def request(self, method, url, **kwargs):
|
||||
# Default to management_url, but can be overridden here
|
||||
# (for auth requests)
|
||||
base_url = kwargs.get('base_url', self.management_url)
|
||||
|
||||
self.http_obj = httplib2.Http()
|
||||
|
||||
params = {}
|
||||
params['headers'] = {'User-Agent': self.USER_AGENT}
|
||||
params['headers'].update(kwargs.get('headers', {}))
|
||||
if 'Content-Type' not in params.get('headers',{}):
|
||||
params['headers']['Content-Type'] = 'application/json'
|
||||
|
||||
if 'body' in kwargs:
|
||||
params['body'] = kwargs.get('body')
|
||||
|
||||
req_url = os.path.join(base_url, url.strip('/'))
|
||||
resp, body = self.http_obj.request(req_url, method, **params)
|
||||
return resp, body
|
@ -1,79 +0,0 @@
|
||||
import time
|
||||
import socket
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore")
|
||||
import paramiko
|
||||
|
||||
|
||||
class Client(object):
|
||||
|
||||
def __init__(self, host, username, password, timeout=300):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.timeout = int(timeout)
|
||||
|
||||
def _get_ssh_connection(self):
|
||||
"""Returns an ssh connection to the specified host"""
|
||||
_timeout = True
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(
|
||||
paramiko.AutoAddPolicy())
|
||||
_start_time = time.time()
|
||||
|
||||
while not self._is_timed_out(self.timeout, _start_time):
|
||||
try:
|
||||
ssh.connect(self.host, username=self.username,
|
||||
password=self.password, look_for_keys=False,
|
||||
timeout=self.timeout)
|
||||
_timeout = False
|
||||
break
|
||||
except socket.error:
|
||||
continue
|
||||
except paramiko.AuthenticationException:
|
||||
time.sleep(15)
|
||||
continue
|
||||
if _timeout:
|
||||
raise socket.error("SSH connect timed out")
|
||||
return ssh
|
||||
|
||||
def _is_timed_out(self, timeout, start_time):
|
||||
return (time.time() - timeout) > start_time
|
||||
|
||||
def connect_until_closed(self):
|
||||
"""Connect to the server and wait until connection is lost"""
|
||||
try:
|
||||
ssh = self._get_ssh_connection()
|
||||
_transport = ssh.get_transport()
|
||||
_start_time = time.time()
|
||||
_timed_out = self._is_timed_out(self.timeout, _start_time)
|
||||
while _transport.is_active() and not _timed_out:
|
||||
time.sleep(5)
|
||||
_timed_out = self._is_timed_out(self.timeout, _start_time)
|
||||
ssh.close()
|
||||
except (EOFError, paramiko.AuthenticationException, socket.error):
|
||||
return
|
||||
|
||||
def exec_command(self, cmd):
|
||||
"""Execute the specified command on the server.
|
||||
|
||||
:returns: data read from standard output of the command
|
||||
|
||||
"""
|
||||
ssh = self._get_ssh_connection()
|
||||
stdin, stdout, stderr = ssh.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
ssh.close()
|
||||
return output
|
||||
|
||||
def test_connection_auth(self):
|
||||
""" Returns true if ssh can connect to server"""
|
||||
try:
|
||||
connection = self._get_ssh_connection()
|
||||
connection.close()
|
||||
except paramiko.AuthenticationException:
|
||||
return False
|
||||
|
||||
return True
|
@ -1,15 +0,0 @@
|
||||
import datetime
|
||||
|
||||
|
||||
#TODO(bcwaldon): expand this to support more iso-compliant formats
|
||||
ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
|
||||
|
||||
|
||||
def load_isotime(time_str):
|
||||
"""Convert formatted time stjring to a datetime object."""
|
||||
return datetime.datetime.strptime(time_str, ISO_TIME_FORMAT)
|
||||
|
||||
|
||||
def dump_isotime(datetime_obj):
|
||||
"""Format a datetime object as an iso-8601 string."""
|
||||
return datetime_obj.strftime(ISO_TIME_FORMAT)
|
112
kong/config.py
112
kong/config.py
@ -1,112 +0,0 @@
|
||||
import ConfigParser
|
||||
|
||||
|
||||
class NovaConfig(object):
|
||||
"""Provides configuration information for connecting to Nova."""
|
||||
|
||||
def __init__(self, conf):
|
||||
"""Initialize a Nova-specific configuration object."""
|
||||
self.conf = conf
|
||||
|
||||
def get(self, item_name, default_value):
|
||||
try:
|
||||
return self.conf.get("nova", item_name)
|
||||
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
|
||||
return default_value
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
"""Host for the Nova HTTP API. Defaults to 127.0.0.1."""
|
||||
return self.get("host", "127.0.0.1")
|
||||
|
||||
@property
|
||||
def port(self):
|
||||
"""Port for the Nova HTTP API. Defaults to 8774."""
|
||||
return int(self.get("port", 8774))
|
||||
|
||||
@property
|
||||
def username(self):
|
||||
"""Username to use for Nova API requests. Defaults to 'admin'."""
|
||||
return self.get("user", "admin")
|
||||
|
||||
@property
|
||||
def base_url(self):
|
||||
"""Base of the HTTP API URL. Defaults to '/v1.1'."""
|
||||
return self.get("base_url", "/v1.1")
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
"""Base of the HTTP API URL. Defaults to '/v1.1'."""
|
||||
return self.get("project_id", "admin")
|
||||
|
||||
@property
|
||||
def api_key(self):
|
||||
"""API key to use when authenticating. Defaults to 'admin_key'."""
|
||||
return self.get("api_key", "admin_key")
|
||||
|
||||
@property
|
||||
def ssh_timeout(self):
|
||||
"""Timeout in seconds to use when connecting via ssh."""
|
||||
return float(self.get("ssh_timeout", 300))
|
||||
|
||||
@property
|
||||
def build_timeout(self):
|
||||
"""Timeout in seconds to use when connecting via ssh."""
|
||||
return float(self.get("build_timeout", 300))
|
||||
|
||||
|
||||
|
||||
class EnvironmentConfig(object):
|
||||
def __init__(self, conf):
|
||||
"""Initialize a Environment-specific configuration object."""
|
||||
self.conf = conf
|
||||
|
||||
def get(self, item_name, default_value):
|
||||
try:
|
||||
return self.conf.get("environment", item_name)
|
||||
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
|
||||
return default_value
|
||||
|
||||
@property
|
||||
def image_ref(self):
|
||||
"""Valid imageRef to use """
|
||||
return self.get("image_ref", 3);
|
||||
|
||||
@property
|
||||
def image_ref_alt(self):
|
||||
"""Valid imageRef to rebuild images with"""
|
||||
return self.get("image_ref_alt", 3);
|
||||
|
||||
@property
|
||||
def flavor_ref(self):
|
||||
"""Valid flavorRef to use"""
|
||||
return self.get("flavor_ref", 1);
|
||||
|
||||
@property
|
||||
def flavor_ref_alt(self):
|
||||
"""Valid flavorRef to resize images with"""
|
||||
return self.get("flavor_ref_alt", 2);
|
||||
|
||||
@property
|
||||
def multi_node(self):
|
||||
""" Does the test environment have more than one compute node """
|
||||
return self.get("multi_node", 'false') != 'false'
|
||||
|
||||
|
||||
class StackConfig(object):
|
||||
"""Provides `kong` configuration information."""
|
||||
|
||||
_path = None
|
||||
|
||||
def __init__(self, path=None):
|
||||
"""Initialize a configuration from a path."""
|
||||
self._path = path or self._path
|
||||
self._conf = self.load_config(self._path)
|
||||
self.nova = NovaConfig(self._conf)
|
||||
self.env = EnvironmentConfig(self._conf)
|
||||
|
||||
def load_config(self, path=None):
|
||||
"""Read configuration from given path and return a config object."""
|
||||
config = ConfigParser.SafeConfigParser()
|
||||
config.read(path)
|
||||
return config
|
@ -1,9 +0,0 @@
|
||||
|
||||
class TimeoutException(Exception):
|
||||
""" Exception on timeout """
|
||||
def __repr__(self):
|
||||
return "Request Timed Out"
|
||||
|
||||
|
||||
class ServerNotFound(KeyError):
|
||||
pass
|
@ -1,25 +0,0 @@
|
||||
import re
|
||||
|
||||
|
||||
class KnownIssuesFinder(object):
|
||||
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
self._pattern = re.compile('# *KNOWN-ISSUE')
|
||||
|
||||
def find_known_issues(self, package):
|
||||
for file in self._find_test_module_files(package):
|
||||
self._count_known_issues(file)
|
||||
|
||||
def _find_test_module_files(self, package):
|
||||
for name in dir(package):
|
||||
if name.startswith('test'):
|
||||
module = getattr(package, name)
|
||||
yield module.__file__
|
||||
|
||||
def _count_known_issues(self, file):
|
||||
if file.endswith('.pyc') or file.endswith('.pyo'):
|
||||
file = file[0:-1]
|
||||
for line in open(file):
|
||||
if self._pattern.search(line) is not None:
|
||||
self.count += 1
|
@ -1,34 +0,0 @@
|
||||
import json
|
||||
|
||||
import kong.common.http
|
||||
from kong import exceptions
|
||||
|
||||
|
||||
class API(kong.common.http.Client):
|
||||
"""Barebones Keystone HTTP API client."""
|
||||
|
||||
def __init__(self, service_host, service_port):
|
||||
super(API, self).__init__(service_host, service_port, 'v2.0')
|
||||
|
||||
#TODO(bcwaldon): This is a hack, we should clean up the superclass
|
||||
self.management_url = self.base_url
|
||||
|
||||
def get_token(self, user, password, tenant_id):
|
||||
headers = {'content-type': 'application/json'}
|
||||
|
||||
body = {
|
||||
"auth": {
|
||||
"passwordCredentials":{
|
||||
"username": user,
|
||||
"password": password,
|
||||
},
|
||||
"tenantId": tenant_id,
|
||||
},
|
||||
}
|
||||
|
||||
response, content = self.request('POST', '/tokens',
|
||||
headers=headers,
|
||||
body=json.dumps(body))
|
||||
|
||||
res_body = json.loads(content)
|
||||
return res_body['access']['token']['id']
|
153
kong/nova.py
153
kong/nova.py
@ -1,153 +0,0 @@
|
||||
import json
|
||||
|
||||
import kong.common.http
|
||||
from kong import exceptions
|
||||
|
||||
|
||||
class API(kong.common.http.Client):
|
||||
"""Barebones Nova HTTP API client."""
|
||||
|
||||
def __init__(self, host, port, base_url, user, api_key, project_id=''):
|
||||
"""Initialize Nova HTTP API client.
|
||||
|
||||
:param host: Hostname/IP of the Nova API to test.
|
||||
:param port: Port of the Nova API to test.
|
||||
:param base_url: Version identifier (normally /v1.0 or /v1.1)
|
||||
:param user: The username to use for tests.
|
||||
:param api_key: The API key of the user.
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
super(API, self).__init__(host, port, base_url)
|
||||
self.user = user
|
||||
self.api_key = api_key
|
||||
self.project_id = project_id
|
||||
# Default to same as base_url, but will be changed for auth
|
||||
self.management_url = self.base_url
|
||||
|
||||
def authenticate(self, user, api_key, project_id):
|
||||
"""Request and return an authentication token from Nova.
|
||||
|
||||
:param user: The username we're authenticating.
|
||||
:param api_key: The API key for the user we're authenticating.
|
||||
:returns: Authentication token (string)
|
||||
:raises: KeyError if authentication fails.
|
||||
|
||||
"""
|
||||
headers = {
|
||||
'X-Auth-User': user,
|
||||
'X-Auth-Key': api_key,
|
||||
'X-Auth-Project-Id': project_id,
|
||||
}
|
||||
resp, body = super(API, self).request('GET', '', headers=headers,
|
||||
base_url=self.base_url)
|
||||
|
||||
try:
|
||||
self.management_url = resp['x-server-management-url']
|
||||
return resp['x-auth-token']
|
||||
except KeyError:
|
||||
print "Failed to authenticate user"
|
||||
raise
|
||||
|
||||
def _wait_for_entity_status(self, url, entity_name, status, **kwargs):
|
||||
"""Poll the provided url until expected entity status is returned"""
|
||||
|
||||
def check_response(resp, body):
|
||||
try:
|
||||
data = json.loads(body)
|
||||
return data[entity_name]['status'] == status
|
||||
except (ValueError, KeyError):
|
||||
return False
|
||||
|
||||
try:
|
||||
self.poll_request('GET', url, check_response, **kwargs)
|
||||
except exceptions.TimeoutException:
|
||||
msg = "%s failed to reach status %s" % (entity_name, status)
|
||||
raise AssertionError(msg)
|
||||
|
||||
def wait_for_server_status(self, server_id, status='ACTIVE', **kwargs):
|
||||
"""Wait for the server status to be equal to the status passed in.
|
||||
|
||||
:param server_id: Server ID to query.
|
||||
:param status: The status string to look for.
|
||||
:returns: None
|
||||
:raises: AssertionError if request times out
|
||||
|
||||
"""
|
||||
url = '/servers/%s' % server_id
|
||||
return self._wait_for_entity_status(url, 'server', status, **kwargs)
|
||||
|
||||
def wait_for_image_status(self, image_id, status='ACTIVE', **kwargs):
|
||||
"""Wait for the image status to be equal to the status passed in.
|
||||
|
||||
:param image_id: Image ID to query.
|
||||
:param status: The status string to look for.
|
||||
:returns: None
|
||||
:raises: AssertionError if request times out
|
||||
|
||||
"""
|
||||
url = '/images/%s' % image_id
|
||||
return self._wait_for_entity_status(url, 'image', status, **kwargs)
|
||||
|
||||
def request(self, method, url, **kwargs):
|
||||
"""Generic HTTP request on the Nova API.
|
||||
|
||||
:param method: Request verb to use (GET, PUT, POST, etc.)
|
||||
:param url: The API resource to request.
|
||||
:param kwargs: Additional keyword arguments to pass to the request.
|
||||
:returns: HTTP response object.
|
||||
|
||||
"""
|
||||
headers = kwargs.get('headers', {})
|
||||
project_id = kwargs.get('project_id', self.project_id)
|
||||
|
||||
headers['X-Auth-Token'] = self.authenticate(self.user, self.api_key,
|
||||
project_id)
|
||||
kwargs['headers'] = headers
|
||||
return super(API, self).request(method, url, **kwargs)
|
||||
|
||||
def get_server(self, server_id):
|
||||
"""Fetch a server by id
|
||||
|
||||
:param server_id: dict of server attributes
|
||||
:returns: dict of server attributes
|
||||
:raises: ServerNotFound if server does not exist
|
||||
|
||||
"""
|
||||
resp, body = self.request('GET', '/servers/%s' % server_id)
|
||||
try:
|
||||
assert resp['status'] == '200'
|
||||
data = json.loads(body)
|
||||
return data['server']
|
||||
except (AssertionError, ValueError, TypeError, KeyError):
|
||||
raise exceptions.ServerNotFound(server_id)
|
||||
|
||||
def create_server(self, entity):
|
||||
"""Attempt to create a new server.
|
||||
|
||||
:param entity: dict of server attributes
|
||||
:returns: dict of server attributes after creation
|
||||
:raises: AssertionError if server creation fails
|
||||
|
||||
"""
|
||||
post_body = json.dumps({
|
||||
'server': entity,
|
||||
})
|
||||
|
||||
resp, body = self.request('POST', '/servers', body=post_body)
|
||||
try:
|
||||
assert resp['status'] == '202'
|
||||
data = json.loads(body)
|
||||
return data['server']
|
||||
except (AssertionError, ValueError, TypeError, KeyError):
|
||||
raise AssertionError("Failed to create server")
|
||||
|
||||
def delete_server(self, server_id):
|
||||
"""Attempt to delete a server.
|
||||
|
||||
:param server_id: server identifier
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
url = '/servers/%s' % server_id
|
||||
response, body = self.request('DELETE', url)
|
@ -1,14 +0,0 @@
|
||||
import kong.config
|
||||
import kong.nova
|
||||
|
||||
|
||||
class Manager(object):
|
||||
"""Top-level object to access OpenStack resources."""
|
||||
|
||||
def __init__(self, nova):
|
||||
self.nova = kong.nova.API(nova['host'],
|
||||
nova['port'],
|
||||
nova['ver'],
|
||||
nova['user'],
|
||||
nova['key'],
|
||||
nova['project'])
|
@ -1,299 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
import heapq
|
||||
import os
|
||||
import unittest
|
||||
import sys
|
||||
import time
|
||||
|
||||
from nose import config
|
||||
from nose import result
|
||||
from nose import core
|
||||
|
||||
|
||||
class _AnsiColorizer(object):
|
||||
"""
|
||||
A colorizer is an object that loosely wraps around a stream, allowing
|
||||
callers to write text to the stream in a particular color.
|
||||
|
||||
Colorizer classes must implement C{supported()} and C{write(text, color)}.
|
||||
"""
|
||||
_colors = dict(black=30, red=31, green=32, yellow=33,
|
||||
blue=34, magenta=35, cyan=36, white=37)
|
||||
|
||||
def __init__(self, stream):
|
||||
self.stream = stream
|
||||
|
||||
def supported(cls, stream=sys.stdout):
|
||||
"""
|
||||
A class method that returns True if the current platform supports
|
||||
coloring terminal output using this method. Returns False otherwise.
|
||||
"""
|
||||
if not stream.isatty():
|
||||
return False # auto color only on TTYs
|
||||
try:
|
||||
import curses
|
||||
except ImportError:
|
||||
return False
|
||||
else:
|
||||
try:
|
||||
try:
|
||||
return curses.tigetnum("colors") > 2
|
||||
except curses.error:
|
||||
curses.setupterm()
|
||||
return curses.tigetnum("colors") > 2
|
||||
except:
|
||||
raise
|
||||
# guess false in case of error
|
||||
return False
|
||||
supported = classmethod(supported)
|
||||
|
||||
def write(self, text, color):
|
||||
"""
|
||||
Write the given text to the stream in the given color.
|
||||
|
||||
@param text: Text to be written to the stream.
|
||||
|
||||
@param color: A string label for a color. e.g. 'red', 'white'.
|
||||
"""
|
||||
color = self._colors[color]
|
||||
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
|
||||
|
||||
|
||||
class _Win32Colorizer(object):
|
||||
"""
|
||||
See _AnsiColorizer docstring.
|
||||
"""
|
||||
def __init__(self, stream):
|
||||
from win32console import GetStdHandle, STD_OUT_HANDLE, \
|
||||
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
|
||||
FOREGROUND_INTENSITY
|
||||
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
|
||||
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
|
||||
self.stream = stream
|
||||
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
|
||||
self._colors = {
|
||||
'normal': red | green | blue,
|
||||
'red': red | bold,
|
||||
'green': green | bold,
|
||||
'blue': blue | bold,
|
||||
'yellow': red | green | bold,
|
||||
'magenta': red | blue | bold,
|
||||
'cyan': green | blue | bold,
|
||||
'white': red | green | blue | bold
|
||||
}
|
||||
|
||||
def supported(cls, stream=sys.stdout):
|
||||
try:
|
||||
import win32console
|
||||
screenBuffer = win32console.GetStdHandle(
|
||||
win32console.STD_OUT_HANDLE)
|
||||
except ImportError:
|
||||
return False
|
||||
import pywintypes
|
||||
try:
|
||||
screenBuffer.SetConsoleTextAttribute(
|
||||
win32console.FOREGROUND_RED |
|
||||
win32console.FOREGROUND_GREEN |
|
||||
win32console.FOREGROUND_BLUE)
|
||||
except pywintypes.error:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
supported = classmethod(supported)
|
||||
|
||||
def write(self, text, color):
|
||||
color = self._colors[color]
|
||||
self.screenBuffer.SetConsoleTextAttribute(color)
|
||||
self.stream.write(text)
|
||||
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
|
||||
|
||||
|
||||
class _NullColorizer(object):
|
||||
"""
|
||||
See _AnsiColorizer docstring.
|
||||
"""
|
||||
def __init__(self, stream):
|
||||
self.stream = stream
|
||||
|
||||
def supported(cls, stream=sys.stdout):
|
||||
return True
|
||||
supported = classmethod(supported)
|
||||
|
||||
def write(self, text, color):
|
||||
self.stream.write(text)
|
||||
|
||||
|
||||
def get_elapsed_time_color(elapsed_time):
|
||||
if elapsed_time > 1.0:
|
||||
return 'red'
|
||||
elif elapsed_time > 0.25:
|
||||
return 'yellow'
|
||||
else:
|
||||
return 'green'
|
||||
|
||||
|
||||
class KongTestResult(result.TextTestResult):
|
||||
def __init__(self, *args, **kw):
|
||||
self.show_elapsed = kw.pop('show_elapsed')
|
||||
result.TextTestResult.__init__(self, *args, **kw)
|
||||
self.num_slow_tests = 5
|
||||
self.slow_tests = [] # this is a fixed-sized heap
|
||||
self._last_case = None
|
||||
self.colorizer = None
|
||||
# NOTE(vish, tfukushima): reset stdout for the terminal check
|
||||
stdout = sys.__stdout__
|
||||
sys.stdout = sys.__stdout__
|
||||
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
|
||||
if colorizer.supported():
|
||||
self.colorizer = colorizer(self.stream)
|
||||
break
|
||||
sys.stdout = stdout
|
||||
|
||||
# NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
|
||||
# error results in it failing to be initialized later. Otherwise,
|
||||
# _handleElapsedTime will fail, causing the wrong error message to
|
||||
# be outputted.
|
||||
self.start_time = time.time()
|
||||
|
||||
def getDescription(self, test):
|
||||
return str(test)
|
||||
|
||||
def _handleElapsedTime(self, test):
|
||||
self.elapsed_time = time.time() - self.start_time
|
||||
item = (self.elapsed_time, test)
|
||||
# Record only the n-slowest tests using heap
|
||||
if len(self.slow_tests) >= self.num_slow_tests:
|
||||
heapq.heappushpop(self.slow_tests, item)
|
||||
else:
|
||||
heapq.heappush(self.slow_tests, item)
|
||||
|
||||
def _writeElapsedTime(self, test):
|
||||
color = get_elapsed_time_color(self.elapsed_time)
|
||||
self.colorizer.write(" %.2f" % self.elapsed_time, color)
|
||||
|
||||
def _writeResult(self, test, long_result, color, short_result, success):
|
||||
if self.showAll:
|
||||
self.colorizer.write(long_result, color)
|
||||
if self.show_elapsed and success:
|
||||
self._writeElapsedTime(test)
|
||||
self.stream.writeln()
|
||||
elif self.dots:
|
||||
self.stream.write(short_result)
|
||||
self.stream.flush()
|
||||
|
||||
# NOTE(vish, tfukushima): copied from unittest with edit to add color
|
||||
def addSuccess(self, test):
|
||||
unittest.TestResult.addSuccess(self, test)
|
||||
self._handleElapsedTime(test)
|
||||
self._writeResult(test, 'OK', 'green', '.', True)
|
||||
|
||||
# NOTE(vish, tfukushima): copied from unittest with edit to add color
|
||||
def addFailure(self, test, err):
|
||||
unittest.TestResult.addFailure(self, test, err)
|
||||
self._handleElapsedTime(test)
|
||||
self._writeResult(test, 'FAIL', 'red', 'F', False)
|
||||
|
||||
# NOTE(vish, tfukushima): copied from unittest with edit to add color
|
||||
def addError(self, test, err):
|
||||
"""Overrides normal addError to add support for errorClasses.
|
||||
If the exception is a registered class, the error will be added
|
||||
to the list for that class, not errors.
|
||||
"""
|
||||
self._handleElapsedTime(test)
|
||||
stream = getattr(self, 'stream', None)
|
||||
ec, ev, tb = err
|
||||
try:
|
||||
exc_info = self._exc_info_to_string(err, test)
|
||||
except TypeError:
|
||||
# This is for compatibility with Python 2.3.
|
||||
exc_info = self._exc_info_to_string(err)
|
||||
for cls, (storage, label, isfail) in self.errorClasses.items():
|
||||
if result.isclass(ec) and issubclass(ec, cls):
|
||||
if isfail:
|
||||
test.passwd = False
|
||||
storage.append((test, exc_info))
|
||||
# Might get patched into a streamless result
|
||||
if stream is not None:
|
||||
if self.showAll:
|
||||
message = [label]
|
||||
detail = result._exception_detail(err[1])
|
||||
if detail:
|
||||
message.append(detail)
|
||||
stream.writeln(": ".join(message))
|
||||
elif self.dots:
|
||||
stream.write(label[:1])
|
||||
return
|
||||
self.errors.append((test, exc_info))
|
||||
test.passed = False
|
||||
if stream is not None:
|
||||
self._writeResult(test, 'ERROR', 'red', 'E', False)
|
||||
|
||||
def startTest(self, test):
|
||||
unittest.TestResult.startTest(self, test)
|
||||
self.start_time = time.time()
|
||||
current_case = test.test.__class__.__name__
|
||||
|
||||
if self.showAll:
|
||||
if current_case != self._last_case:
|
||||
self.stream.writeln(current_case)
|
||||
self._last_case = current_case
|
||||
|
||||
self.stream.write(
|
||||
' %s' % str(test.test._testMethodName).ljust(60))
|
||||
self.stream.flush()
|
||||
|
||||
|
||||
class KongTestRunner(core.TextTestRunner):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.show_elapsed = kwargs.pop('show_elapsed')
|
||||
core.TextTestRunner.__init__(self, *args, **kwargs)
|
||||
|
||||
def _makeResult(self):
|
||||
return KongTestResult(self.stream,
|
||||
self.descriptions,
|
||||
self.verbosity,
|
||||
self.config,
|
||||
show_elapsed=self.show_elapsed)
|
||||
|
||||
def _writeSlowTests(self, result_):
|
||||
# Pare out 'fast' tests
|
||||
slow_tests = [item for item in result_.slow_tests
|
||||
if get_elapsed_time_color(item[0]) != 'green']
|
||||
if slow_tests:
|
||||
slow_total_time = sum(item[0] for item in slow_tests)
|
||||
self.stream.writeln("Slowest %i tests took %.2f secs:"
|
||||
% (len(slow_tests), slow_total_time))
|
||||
for elapsed_time, test in sorted(slow_tests, reverse=True):
|
||||
time_str = "%.2f" % elapsed_time
|
||||
self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
|
||||
|
||||
def run(self, test):
|
||||
result_ = core.TextTestRunner.run(self, test)
|
||||
if self.show_elapsed:
|
||||
self._writeSlowTests(result_)
|
||||
return result_
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
show_elapsed = True
|
||||
argv = []
|
||||
for x in sys.argv:
|
||||
if x.startswith('test_'):
|
||||
argv.append('nova.tests.%s' % x)
|
||||
elif x.startswith('--hide-elapsed'):
|
||||
show_elapsed = False
|
||||
else:
|
||||
argv.append(x)
|
||||
|
||||
c = config.Config(stream=sys.stdout,
|
||||
env=os.environ,
|
||||
verbosity=3,
|
||||
plugins=core.DefaultPluginManager())
|
||||
|
||||
runner = KongTestRunner(stream=c.stream,
|
||||
verbosity=c.verbosity,
|
||||
config=c,
|
||||
show_elapsed=show_elapsed)
|
||||
sys.exit(not core.run(config=c, testRunner=runner, argv=argv))
|
@ -1,99 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
function usage {
|
||||
echo "Usage: $0 [OPTION]..."
|
||||
echo "Run the Kong test suite(s)"
|
||||
echo ""
|
||||
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
|
||||
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
|
||||
echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
|
||||
echo " -p, --pep8 Just run pep8"
|
||||
echo " --nova Run all tests tagged as \"nova\"."
|
||||
echo " --swift Run all tests tagged as \"swift\"."
|
||||
echo " --glance Run all tests tagged as \"glance\"."
|
||||
echo " --auth Run all tests tagged as \"auth\"."
|
||||
echo " -h, --help Print this usage message"
|
||||
echo ""
|
||||
echo "Note: with no options specified, the script will try to run the tests in a virtual environment,"
|
||||
echo " If no virtualenv is found, the script will ask if you would like to create one. If you "
|
||||
echo " prefer to run tests NOT in a virtual environment, simply pass the -N option."
|
||||
exit
|
||||
}
|
||||
|
||||
function process_option {
|
||||
case "$1" in
|
||||
-h|--help) usage;;
|
||||
-V|--virtual-env) let always_venv=1; let never_venv=0;;
|
||||
-N|--no-virtual-env) let always_venv=0; let never_venv=1;;
|
||||
-f|--force) let force=1;;
|
||||
-p|--pep8) let just_pep8=1;;
|
||||
--nova) noseargs="$noseargs -a tags=nova";;
|
||||
--glance) noseargs="$noseargs -a tags=glance";;
|
||||
--swift) noseargs="$noseargs -a tags=swift";;
|
||||
--auth) noseargs="$noseargs -a tags=auth";;
|
||||
*) noseargs="$noseargs $1"
|
||||
esac
|
||||
}
|
||||
|
||||
base_dir=$(dirname $0)
|
||||
venv=.kong-venv
|
||||
with_venv=../tools/with_venv.sh
|
||||
always_venv=0
|
||||
never_venv=0
|
||||
force=0
|
||||
noseargs=
|
||||
wrapper=""
|
||||
just_pep8=0
|
||||
|
||||
for arg in "$@"; do
|
||||
process_option $arg
|
||||
done
|
||||
|
||||
function run_tests {
|
||||
# Just run the test suites in current environment
|
||||
${wrapper} $NOSETESTS 2> run_tests.err.log
|
||||
}
|
||||
|
||||
function run_pep8 {
|
||||
echo "Running pep8 ..."
|
||||
PEP8_EXCLUDE=vcsversion.y
|
||||
PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat --show-pep8 --show-source"
|
||||
PEP8_INCLUDE="tests tools run_tests.py"
|
||||
${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE || exit 1
|
||||
}
|
||||
NOSETESTS="env python run_tests.py $noseargs"
|
||||
function setup_venv {
|
||||
if [ $never_venv -eq 0 ]
|
||||
then
|
||||
# Remove the virtual environment if --force used
|
||||
if [ $force -eq 1 ]; then
|
||||
echo "Cleaning virtualenv..."
|
||||
rm -rf ${venv}
|
||||
fi
|
||||
if [ -e "../${venv}" ]; then
|
||||
wrapper="${with_venv}"
|
||||
else
|
||||
if [ $always_venv -eq 1 ]; then
|
||||
# Automatically install the virtualenv
|
||||
use_ve='y'
|
||||
else
|
||||
echo -e "No virtual environment found...create one? (Y/n) \c"
|
||||
read use_ve
|
||||
fi
|
||||
if [ "x$use_ve" = "xY" -o "x$use_ve" = "x" -o "x$use_ve" = "xy" ]; then
|
||||
# Install the virtualenv and run the test suite in it
|
||||
env python ../tools/install_venv.py
|
||||
wrapper=${with_venv}
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
if [ $just_pep8 -eq 1 ]; then
|
||||
run_pep8
|
||||
exit
|
||||
fi
|
||||
|
||||
cd $base_dir
|
||||
setup_venv
|
||||
run_tests || exit
|
@ -1,54 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Functional test case to check the status of gepetto and
|
||||
set information of hosts etc..
|
||||
"""
|
||||
|
||||
import os
|
||||
from kong import tests
|
||||
|
||||
|
||||
class TestSkipExamples(tests.FunctionalTest):
|
||||
@tests.skip_test("testing skipping")
|
||||
def test_absolute_skip(self):
|
||||
x = 1
|
||||
|
||||
@tests.skip_unless(os.getenv("BLAH"),
|
||||
"Skipping -- Environment variable BLAH does not exist")
|
||||
def test_skip_unless_env_blah_exists(self):
|
||||
x = 1
|
||||
|
||||
@tests.skip_unless(os.getenv("USER"),
|
||||
"Not Skipping -- Environment variable USER does not exist")
|
||||
def test_skip_unless_env_user_exists(self):
|
||||
x = 1
|
||||
|
||||
@tests.skip_if(os.getenv("USER"),
|
||||
"Skiping -- Environment variable USER exists")
|
||||
def test_skip_if_env_user_exists(self):
|
||||
x = 1
|
||||
|
||||
@tests.skip_if(os.getenv("BLAH"),
|
||||
"Not Skipping -- Environment variable BLAH exists")
|
||||
def test_skip_if_env_blah_exists(self):
|
||||
x = 1
|
||||
|
||||
def test_tags_example(self):
|
||||
pass
|
||||
test_tags_example.tags = ['kvm', 'olympus']
|
@ -1,106 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Functional test case to check RabbitMQ """
|
||||
try:
|
||||
import pika
|
||||
except ImportError:
|
||||
pika = None
|
||||
from kong import tests
|
||||
|
||||
from pprint import pprint
|
||||
#RABBITMQ_HOST = get_config("rabbitmq/host")
|
||||
#RABBITMQ_USERNAME = get_config("rabbitmq/user")
|
||||
#RABBITMQ_PASSWORD = get_config("rabbitmq/password")
|
||||
|
||||
|
||||
class TestRabbitMQ(tests.FunctionalTest):
|
||||
@tests.skip_unless(pika, "pika not available")
|
||||
def test_000_ghetto(self):
|
||||
"""
|
||||
This sets the host, user, and pass self variables so they
|
||||
are accessible by all other methods
|
||||
"""
|
||||
self.rabbitmq['host'] = self.config['rabbitmq']['host']
|
||||
self.rabbitmq['user'] = self.config['rabbitmq']['user']
|
||||
self.rabbitmq['pass'] = self.config['rabbitmq']['password']
|
||||
test_000_ghetto.tags = ['rabbitmq']
|
||||
|
||||
def _cnx(self):
|
||||
creds = pika.credentials.PlainCredentials(
|
||||
self.rabbitmq['user'], self.rabbitmq['pass'])
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(
|
||||
host=self.rabbitmq['host'],credentials=creds))
|
||||
channel = connection.channel()
|
||||
return (channel, connection)
|
||||
|
||||
@tests.skip_unless(pika, "pika not available")
|
||||
def test_001_connect(self):
|
||||
channel, connection = self._cnx()
|
||||
self.assert_(channel)
|
||||
connection.close()
|
||||
test_001_connect.tags = ['rabbitmq']
|
||||
|
||||
@tests.skip_unless(pika, "pika not available")
|
||||
def test_002_send_receive_msg(self):
|
||||
unitmsg = 'Hello from unittest'
|
||||
channel, connection = self._cnx()
|
||||
channel.queue_declare(queue='u1')
|
||||
channel.basic_publish(exchange='',
|
||||
routing_key='u1',
|
||||
body=unitmsg)
|
||||
connection.close()
|
||||
|
||||
channel, connection = self._cnx()
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
self.assertEquals(body, unitmsg)
|
||||
ch.stop_consuming()
|
||||
|
||||
channel.basic_consume(callback,
|
||||
queue='u1',
|
||||
no_ack=True)
|
||||
channel.start_consuming()
|
||||
test_002_send_receive_msg.tags = ['rabbitmq']
|
||||
|
||||
@tests.skip_unless(pika, "pika not available")
|
||||
def test_003_send_receive_msg_with_persistense(self):
|
||||
unitmsg = 'Hello from unittest with Persistense'
|
||||
channel, connection = self._cnx()
|
||||
channel.queue_declare(queue='u2', durable=True)
|
||||
prop = pika.BasicProperties(delivery_mode=2)
|
||||
channel.basic_publish(exchange='',
|
||||
routing_key='u2',
|
||||
body=unitmsg,
|
||||
properties=prop,
|
||||
)
|
||||
connection.close()
|
||||
|
||||
channel, connection = self._cnx()
|
||||
channel.queue_declare(queue='u2', durable=True)
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
self.assertEquals(body, unitmsg)
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
ch.stop_consuming()
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(callback,
|
||||
queue='u2')
|
||||
|
||||
channel.start_consuming()
|
||||
test_003_send_receive_msg_with_persistense.tags = ['rabbitmq']
|
@ -1,185 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Functional test case for OpenStack Swift """
|
||||
|
||||
import httplib2
|
||||
import os
|
||||
|
||||
from pprint import pprint
|
||||
|
||||
from kong import tests
|
||||
|
||||
SMALL_OBJ = "include/swift_objects/swift_small"
|
||||
MED_OBJ = "include/swift_objects/swift_medium"
|
||||
LRG_OBJ = "include/swift_objects/swift_large"
|
||||
|
||||
|
||||
class TestSwift(tests.FunctionalTest):
|
||||
def test_000_auth(self):
|
||||
if self.swift['auth_ssl'] == "False":
|
||||
prot = "http://"
|
||||
else:
|
||||
prot = "https://"
|
||||
|
||||
path = "%s%s:%s%s%s" % (prot, self.swift['auth_host'],
|
||||
self.swift['auth_port'],
|
||||
self.swift['auth_prefix'],
|
||||
self.swift['ver'])
|
||||
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
self.swift['auth_user'] = '%s:%s' % (self.swift['account'],
|
||||
self.swift['username'])
|
||||
headers = {'X-Auth-User': '%s' % (self.swift['auth_user']),
|
||||
'X-Auth-Key': '%s' % (self.swift['password'])}
|
||||
response, content = http.request(path, 'GET', headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertIsNotNone(response['x-auth-token'])
|
||||
self.assertIsNotNone(response['x-storage-token'])
|
||||
self.assertIsNotNone(response['x-storage-url'])
|
||||
|
||||
# TODO: there has got to be a better way to do this (jshepher)
|
||||
for k, v in response.items():
|
||||
if (k == 'x-auth-token'):
|
||||
self.swift['x-auth-token'] = v
|
||||
if (k == 'x-storage-token'):
|
||||
self.swift['x-storage-token'] = v
|
||||
|
||||
# Since we don't have DNS this is a bit of a hack, but works
|
||||
url = response['x-storage-url'].split('/')
|
||||
self.swift['storage_url'] = "%s//%s:%s/%s/%s" % (url[0],
|
||||
self.swift['auth_host'],
|
||||
self.swift['auth_port'],
|
||||
url[3],
|
||||
url[4])
|
||||
test_000_auth.tags = ['swift']
|
||||
|
||||
def test_001_create_container(self):
|
||||
path = "%s/%s/" % (self.swift['storage_url'], "test_container")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
|
||||
response, content = http.request(path, 'PUT', headers=headers)
|
||||
self.assertEqual(response.status, 201)
|
||||
test_001_create_container.tags = ['swift']
|
||||
|
||||
def test_002_list_containers(self):
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-Token': '%s' % (self.swift['x-auth-token'])}
|
||||
response, content = http.request(self.swift['storage_url'], 'GET',
|
||||
headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertLessEqual('1', response['x-account-container-count'])
|
||||
test_002_list_containers.tags = ['swift']
|
||||
|
||||
def test_010_create_small_object(self):
|
||||
md5 = self._md5sum_file(SMALL_OBJ)
|
||||
path = "%s/%s/%s" % (self.swift['storage_url'],
|
||||
"test_container",
|
||||
"swift_small")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (self.swift['x-storage-token']),
|
||||
'ETag': '%s' % (md5),
|
||||
'Content-Length': '%d' % os.path.getsize(SMALL_OBJ),
|
||||
'Content-Type': 'application/octet-stream'}
|
||||
upload = open(SMALL_OBJ, "rb")
|
||||
response, content = http.request(path, 'PUT',
|
||||
headers=headers,
|
||||
body=upload)
|
||||
self.assertEqual(response.status, 201)
|
||||
test_010_create_small_object.tags = ['swift']
|
||||
|
||||
def test_011_create_medium_object(self):
|
||||
md5 = self._md5sum_file(MED_OBJ)
|
||||
path = "%s/%s/%s" % (self.swift['storage_url'],
|
||||
"test_container",
|
||||
"swift_medium")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (self.swift['x-storage-token']),
|
||||
'ETag': '%s' % (md5),
|
||||
'Content-Length': '%d' % (os.path.getsize(MED_OBJ)),
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'Content-Encoding': 'gzip'}
|
||||
upload = ""
|
||||
for chunk in self._read_in_chunks(MED_OBJ):
|
||||
upload += chunk
|
||||
response, content = http.request(path, 'PUT',
|
||||
headers=headers,
|
||||
body=upload)
|
||||
self.assertEqual(response.status, 201)
|
||||
test_011_create_medium_object.tags = ['swift']
|
||||
|
||||
def test_013_get_small_object(self):
|
||||
path = "%s/%s/%s" % (self.swift['storage_url'],
|
||||
"test_container",
|
||||
"swift_small")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
|
||||
response, content = http.request(path, 'GET',
|
||||
headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertEqual(response['etag'], self._md5sum_file(SMALL_OBJ))
|
||||
test_013_get_small_object.tags = ['swift']
|
||||
|
||||
def test_017_delete_small_object(self):
|
||||
path = "%s/%s/%s" % (self.swift['storage_url'], "test_container",
|
||||
"swift_small")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (
|
||||
self.swift['x-storage-token'])}
|
||||
response, content = http.request(path, 'DELETE', headers=headers)
|
||||
self.assertEqual(response.status, 204)
|
||||
test_017_delete_small_object.tags = ['swift']
|
||||
|
||||
def test_018_delete_medium_object(self):
|
||||
path = "%s/%s/%s" % (self.swift['storage_url'], "test_container",
|
||||
"swift_medium")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (
|
||||
self.swift['x-storage-token'])}
|
||||
response, content = http.request(path, 'DELETE', headers=headers)
|
||||
self.assertEqual(response.status, 204)
|
||||
test_018_delete_medium_object.tags = ['swift']
|
||||
|
||||
def test_030_check_container_metadata(self):
|
||||
path = "%s/%s" % (self.swift['storage_url'], "test_container")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
|
||||
response, content = http.request(path, 'HEAD', headers=headers)
|
||||
self.assertEqual(response.status, 204)
|
||||
test_030_check_container_metadata.tags = ['swift']
|
||||
|
||||
def test_050_delete_container(self):
|
||||
path = "%s/%s" % (self.swift['storage_url'], "test_container")
|
||||
http = httplib2.Http(disable_ssl_certificate_validation=True)
|
||||
headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
|
||||
self.swift['username']),
|
||||
'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
|
||||
response, content = http.request(path, 'DELETE', headers=headers)
|
||||
self.assertEqual(response.status, 204)
|
||||
test_050_delete_container.tags = ['swift']
|
@ -1,91 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Functional test case that utilizes cURL against the API server"""
|
||||
|
||||
import httplib2
|
||||
|
||||
from pprint import pprint
|
||||
|
||||
from kong import tests
|
||||
|
||||
|
||||
class TestCleanUp(tests.FunctionalTest):
|
||||
def test_995_delete_server(self):
|
||||
path = "http://%s:%s/%s/servers/%s" % (self.nova['host'],
|
||||
self.nova['port'],
|
||||
self.nova['ver'],
|
||||
self.nova['single_server_id'])
|
||||
http = httplib2.Http()
|
||||
headers = {'X-Auth-User': '%s' % (self.nova['user']),
|
||||
'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
|
||||
response, content = http.request(path, 'DELETE', headers=headers)
|
||||
self.assertEqual(response.status, 202)
|
||||
test_995_delete_server.tags = ['nova']
|
||||
|
||||
def test_996_delete_multi_server(self):
|
||||
print "Deleting %s instances." % (len(self.nova['multi_server']))
|
||||
for k, v in self.nova['multi_server'].iteritems():
|
||||
path = "http://%s:%s/%s/servers/%s" % (self.nova['host'],
|
||||
self.nova['port'],
|
||||
self.nova['ver'],
|
||||
v)
|
||||
http = httplib2.Http()
|
||||
headers = {'X-Auth-User': '%s' % (self.nova['user']),
|
||||
'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
|
||||
response, content = http.request(path, 'DELETE', headers=headers)
|
||||
self.assertEqual(204, response.status)
|
||||
test_996_delete_multi_server.tags = ['nova']
|
||||
|
||||
def test_997_delete_kernel_from_glance(self):
|
||||
if 'apiver' in self.glance:
|
||||
path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
|
||||
self.glance['port'], self.glance['apiver'],
|
||||
self.glance['kernel_id'])
|
||||
else:
|
||||
path = "http://%s:%s/images/%s" % (self.glance['host'],
|
||||
self.glance['port'], self.glance['kernel_id'])
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(path, 'DELETE')
|
||||
self.assertEqual(200, response.status)
|
||||
test_997_delete_kernel_from_glance.tags = ['glance', 'nova']
|
||||
|
||||
def test_998_delete_initrd_from_glance(self):
|
||||
if 'apiver' in self.glance:
|
||||
path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
|
||||
self.glance['port'], self.glance['apiver'],
|
||||
self.glance['ramdisk_id'])
|
||||
else:
|
||||
path = "http://%s:%s/images/%s" % (self.glance['host'],
|
||||
self.glance['port'], self.glance['ramdisk_id'])
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(path, 'DELETE')
|
||||
self.assertEqual(200, response.status)
|
||||
test_998_delete_initrd_from_glance.tags = ['glance', 'nova']
|
||||
|
||||
def test_999_delete_image_from_glance(self):
|
||||
if 'apiver' in self.glance:
|
||||
path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
|
||||
self.glance['port'], self.glance['apiver'],
|
||||
self.glance['image_id'])
|
||||
else:
|
||||
path = "http://%s:%s/images/%s" % (self.glance['host'],
|
||||
self.glance['port'], self.glance['image_id'])
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(path, 'DELETE')
|
||||
self.assertEqual(200, response.status)
|
||||
test_999_delete_image_from_glance.tags = ['glance', 'nova']
|
@ -1,146 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010-2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ConfigParser
|
||||
from hashlib import md5
|
||||
import nose.plugins.skip
|
||||
import os
|
||||
import unittest2
|
||||
|
||||
NOVA_DATA = {}
|
||||
GLANCE_DATA = {}
|
||||
SWIFT_DATA = {}
|
||||
RABBITMQ_DATA = {}
|
||||
CONFIG_DATA = {}
|
||||
KEYSTONE_DATA = {}
|
||||
|
||||
|
||||
class skip_test(object):
|
||||
"""Decorator that skips a test."""
|
||||
def __init__(self, msg):
|
||||
self.message = msg
|
||||
|
||||
def __call__(self, func):
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
raise nose.SkipTest(self.message)
|
||||
_skipper.__name__ = func.__name__
|
||||
_skipper.__doc__ = func.__doc__
|
||||
return _skipper
|
||||
|
||||
|
||||
class skip_if(object):
|
||||
"""Decorator that skips a test."""
|
||||
def __init__(self, condition, msg):
|
||||
self.condition = condition
|
||||
self.message = msg
|
||||
|
||||
def __call__(self, func):
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
if self.condition:
|
||||
raise nose.SkipTest(self.message)
|
||||
func(*args, **kw)
|
||||
_skipper.__name__ = func.__name__
|
||||
_skipper.__doc__ = func.__doc__
|
||||
return _skipper
|
||||
|
||||
|
||||
class skip_unless(object):
|
||||
"""Decorator that skips a test."""
|
||||
def __init__(self, condition, msg):
|
||||
self.condition = condition
|
||||
self.message = msg
|
||||
|
||||
def __call__(self, func):
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
if not self.condition:
|
||||
raise nose.SkipTest(self.message)
|
||||
func(*args, **kw)
|
||||
_skipper.__name__ = func.__name__
|
||||
_skipper.__doc__ = func.__doc__
|
||||
return _skipper
|
||||
|
||||
|
||||
class FunctionalTest(unittest2.TestCase):
|
||||
def setUp(self):
|
||||
global GLANCE_DATA, NOVA_DATA, SWIFT_DATA, RABBITMQ_DATA
|
||||
global KEYSTONE_DATA, CONFIG_DATA
|
||||
# Define config dict
|
||||
self.config = CONFIG_DATA
|
||||
# Define service specific dicts
|
||||
self.glance = GLANCE_DATA
|
||||
self.nova = NOVA_DATA
|
||||
self.swift = SWIFT_DATA
|
||||
self.rabbitmq = RABBITMQ_DATA
|
||||
self.keystone = KEYSTONE_DATA
|
||||
|
||||
self._parse_defaults_file()
|
||||
|
||||
# Swift Setup
|
||||
if 'swift' in self.config:
|
||||
self.swift = self.config['swift']
|
||||
self.swift['ver'] = 'v1.0' # need to find a better way to get this
|
||||
|
||||
# Glance Setup
|
||||
self.glance = self.config['glance']
|
||||
|
||||
if 'nova' in self.config:
|
||||
self.nova = self.config['nova']
|
||||
self.nova['ver'] = self.config['nova']['apiver']
|
||||
|
||||
if 'keystone' in self.config:
|
||||
self.keystone = self.config['keystone']
|
||||
self.keystone['pass'] = self.config['keystone']['password']
|
||||
|
||||
def _md5sum_file(self, path):
|
||||
md5sum = md5()
|
||||
with open(path, 'rb') as file:
|
||||
for chunk in iter(lambda: file.read(8192), ''):
|
||||
md5sum.update(chunk)
|
||||
return md5sum.hexdigest()
|
||||
|
||||
def _read_in_chunks(self, infile, chunk_size=1024 * 64):
|
||||
file_data = open(infile, "rb")
|
||||
while True:
|
||||
# chunk = file_data.read(chunk_size).encode('base64')
|
||||
chunk = file_data.read(chunk_size)
|
||||
if chunk:
|
||||
yield chunk
|
||||
else:
|
||||
return
|
||||
file_data.close()
|
||||
|
||||
def _parse_defaults_file(self):
|
||||
cfg = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.path.pardir, os.path.pardir,
|
||||
"etc", "config.ini"))
|
||||
if os.path.exists(cfg):
|
||||
self._build_config(cfg)
|
||||
else:
|
||||
raise Exception("Cannot read %s" % cfg)
|
||||
|
||||
def _build_config(self, config_file):
|
||||
parser = ConfigParser.ConfigParser()
|
||||
parser.read(config_file)
|
||||
|
||||
for section in parser.sections():
|
||||
self.config[section] = {}
|
||||
for value in parser.options(section):
|
||||
self.config[section][value] = parser.get(section, value)
|
||||
# print "%s = %s" % (value, parser.get(section, value))
|
@ -1,185 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Functional test case against the OpenStack Nova API server"""
|
||||
|
||||
import httplib2
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from kong import keystone
|
||||
from kong import tests
|
||||
|
||||
|
||||
class TestKeystoneAuth(tests.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestKeystoneAuth, self).setUp()
|
||||
|
||||
api_version = self.keystone['apiver']
|
||||
if api_version != 'v2.0':
|
||||
raise ValueError("Must use Identity API v2.0")
|
||||
|
||||
args = (self.keystone['service_host'],
|
||||
self.keystone['service_port'],
|
||||
api_version)
|
||||
|
||||
self.base_url = "http://%s:%s/%s/tokens" % args
|
||||
|
||||
self.user = self.keystone['user']
|
||||
self.password = self.keystone['password']
|
||||
self.tenant_id = self.keystone['tenant_id']
|
||||
|
||||
def test_can_get_token(self):
|
||||
headers = {'content-type': 'application/json'}
|
||||
|
||||
body = {
|
||||
"auth": {
|
||||
"passwordCredentials":{
|
||||
"username": self.user,
|
||||
"password": self.password,
|
||||
},
|
||||
"tenantId": self.tenant_id,
|
||||
},
|
||||
}
|
||||
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers,
|
||||
body=json.dumps(body))
|
||||
|
||||
self.assertEqual(response.status, 200)
|
||||
res_body = json.loads(content)
|
||||
self.assertTrue(res_body['access']['token']['id'])
|
||||
self.assertTrue(res_body['access']['token']['tenant']['id'],
|
||||
self.tenant_id)
|
||||
self.assertTrue(res_body['access']['user']['name'],
|
||||
self.user)
|
||||
test_can_get_token.tags = ['auth']
|
||||
|
||||
def test_bad_user(self):
|
||||
headers = {'content-type': 'application/json'}
|
||||
|
||||
body = {
|
||||
"auth": {
|
||||
"passwordCredentials": {
|
||||
"username": str(uuid.uuid4()),
|
||||
"password": self.password,
|
||||
},
|
||||
"tenantId": self.tenant_id,
|
||||
},
|
||||
}
|
||||
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers,
|
||||
body=json.dumps(body))
|
||||
|
||||
self.assertEqual(response.status, 401)
|
||||
test_bad_user.tags = ['auth']
|
||||
|
||||
def test_bad_password(self):
|
||||
headers = {'content-type': 'application/json'}
|
||||
|
||||
body = {
|
||||
"auth": {
|
||||
"passwordCredentials": {
|
||||
"username": self.user,
|
||||
"password": str(uuid.uuid4()),
|
||||
},
|
||||
"tenantId": self.tenant_id,
|
||||
},
|
||||
}
|
||||
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers,
|
||||
body=json.dumps(body))
|
||||
|
||||
self.assertEqual(response.status, 401)
|
||||
test_bad_password.tags = ['auth']
|
||||
|
||||
def test_bad_tenant_id(self):
|
||||
headers = {'content-type': 'application/json'}
|
||||
|
||||
body = {
|
||||
"auth": {
|
||||
"passwordCredentials": {
|
||||
"username": self.user,
|
||||
"password": self.password,
|
||||
},
|
||||
"tenantId": str(uuid.uuid4()),
|
||||
},
|
||||
}
|
||||
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers,
|
||||
body=json.dumps(body))
|
||||
|
||||
self.assertEqual(response.status, 401)
|
||||
test_bad_tenant_id.tags = ['auth']
|
||||
|
||||
|
||||
|
||||
class TestKeystoneAuthWithNova(tests.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestKeystoneAuthWithNova, self).setUp()
|
||||
args = (self.nova['host'], self.nova['port'],
|
||||
self.nova['ver'], self.nova['project'])
|
||||
self.base_url = "http://%s:%s/%s/%s" % args
|
||||
|
||||
self.keystone_api = keystone.API(self.keystone['service_host'],
|
||||
self.keystone['service_port'])
|
||||
|
||||
def _get_token(self):
|
||||
user = self.keystone['user']
|
||||
password = self.keystone['password']
|
||||
tenant_id = self.keystone['tenant_id']
|
||||
return self.keystone_api.get_token(user, password, tenant_id)
|
||||
|
||||
def test_good_token(self):
|
||||
http = httplib2.Http()
|
||||
url = '%s/flavors' % self.base_url
|
||||
headers = {'x-auth-token': self._get_token()}
|
||||
response, content = http.request(url, 'GET', headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
test_good_token.tags = ['nova', 'auth']
|
||||
|
||||
def test_bad_token(self):
|
||||
http = httplib2.Http()
|
||||
url = '%s/flavors' % self.base_url
|
||||
headers = {'x-auth-token': str(uuid.uuid4())}
|
||||
response, content = http.request(url, 'GET', headers=headers)
|
||||
self.assertEqual(response.status, 401)
|
||||
test_bad_token.tags = ['nova', 'auth']
|
||||
|
||||
def test_no_token(self):
|
||||
http = httplib2.Http()
|
||||
url = '%s/flavors' % self.base_url
|
||||
headers = {'x-auth-token': str(uuid.uuid4())}
|
||||
response, content = http.request(url, 'GET', headers=headers)
|
||||
self.assertEqual(response.status, 401)
|
||||
test_no_token.tags = ['nova', 'auth']
|
||||
|
||||
def test_no_header(self):
|
||||
http = httplib2.Http()
|
||||
url = '%s/flavors' % self.base_url
|
||||
response, content = http.request(url, 'GET')
|
||||
self.assertEqual(response.status, 401)
|
||||
test_no_header.tags = ['nova', 'auth']
|
@ -1,101 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
from kong import openstack
|
||||
from kong import tests
|
||||
|
||||
|
||||
class FlavorsTest(tests.FunctionalTest):
|
||||
def setUp(self):
|
||||
super(FlavorsTest, self).setUp()
|
||||
self.os = openstack.Manager(self.nova)
|
||||
|
||||
def _index_flavors(self):
|
||||
url = '/flavors'
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(response['status'], '200')
|
||||
body_dict = json.loads(body)
|
||||
self.assertEqual(body_dict.keys(), ['flavors'])
|
||||
return body_dict['flavors']
|
||||
|
||||
def _show_flavor(self, flavor_id):
|
||||
url = '/flavors/%s' % flavor_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(response['status'], '200')
|
||||
body_dict = json.loads(body)
|
||||
self.assertEqual(body_dict.keys(), ['flavor'])
|
||||
return body_dict['flavor']
|
||||
|
||||
def _assert_flavor_entity_basic(self, flavor):
|
||||
actual_keys = set(flavor.keys())
|
||||
expected_keys = set(('id', 'name', 'links'))
|
||||
self.assertEqual(actual_keys, expected_keys)
|
||||
self._assert_flavor_links(flavor)
|
||||
|
||||
def _assert_flavor_entity_detailed(self, flavor):
|
||||
actual_keys = set(flavor.keys())
|
||||
expected_keys = set(('id', 'name', 'ram', 'disk', 'vcpus', 'links'))
|
||||
#NOTE(bcwaldon): We cannot expect an explicit list of keys since
|
||||
# any extension can a new attribute to a flavor entity. However, we
|
||||
# can expect a minimum set of keys defined in the spec.
|
||||
self.assertTrue(actual_keys >= expected_keys)
|
||||
self.assertEqual(type(flavor['ram']), int)
|
||||
self.assertEqual(type(flavor['disk']), int)
|
||||
self._assert_flavor_links(flavor)
|
||||
|
||||
def _assert_flavor_links(self, flavor):
|
||||
actual_links = flavor['links']
|
||||
|
||||
flavor_id = str(flavor['id'])
|
||||
mgmt_url = self.os.nova.management_url
|
||||
bmk_url = re.sub(r'v1.1\/', r'', mgmt_url)
|
||||
|
||||
self_link = os.path.join(mgmt_url, 'flavors', flavor_id)
|
||||
bookmark_link = os.path.join(bmk_url, 'flavors', flavor_id)
|
||||
|
||||
expected_links = [
|
||||
{
|
||||
'rel': 'self',
|
||||
'href': self_link,
|
||||
},
|
||||
{
|
||||
'rel': 'bookmark',
|
||||
'href': bookmark_link,
|
||||
},
|
||||
]
|
||||
|
||||
self.assertEqual(actual_links, expected_links)
|
||||
|
||||
def test_show_flavor(self):
|
||||
"""Retrieve a single flavor"""
|
||||
|
||||
flavors = self._index_flavors()
|
||||
|
||||
for flavor in flavors:
|
||||
detailed_flavor = self._show_flavor(flavor['id'])
|
||||
self._assert_flavor_entity_detailed(detailed_flavor)
|
||||
test_show_flavor.tags = ['nova', 'glance']
|
||||
|
||||
def test_index_flavors_basic(self):
|
||||
"""List all flavors"""
|
||||
|
||||
flavors = self._index_flavors()
|
||||
|
||||
for flavor in flavors:
|
||||
self._assert_flavor_entity_basic(flavor)
|
||||
test_index_flavors_basic.tags = ['nova', 'glance']
|
||||
|
||||
def test_index_flavors_detailed(self):
|
||||
"""List all flavors in detail"""
|
||||
|
||||
url = '/flavors/detail'
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(response['status'], '200')
|
||||
body_dict = json.loads(body)
|
||||
self.assertEqual(body_dict.keys(), ['flavors'])
|
||||
flavors = body_dict['flavors']
|
||||
|
||||
for flavor in flavors:
|
||||
self._assert_flavor_entity_detailed(flavor)
|
||||
test_index_flavors_detailed.tags = ['nova', 'glance']
|
@ -1,206 +0,0 @@
|
||||
import httplib2
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
from kong import openstack
|
||||
from kong import tests
|
||||
|
||||
|
||||
class TestImagesThroughCompute(tests.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImagesThroughCompute, self).setUp()
|
||||
self.os = openstack.Manager(self.nova)
|
||||
|
||||
def _assert_image_links(self, image):
|
||||
image_id = str(image['id'])
|
||||
|
||||
mgmt_url = self.os.nova.management_url
|
||||
bmk_url = re.sub(r'v1.1\/', r'', mgmt_url)
|
||||
self_link = {'rel': 'self',
|
||||
'href': os.path.join(mgmt_url, 'images', image_id)}
|
||||
bookmark_link = {'rel': 'bookmark',
|
||||
'href': os.path.join(bmk_url, 'images', image_id)}
|
||||
|
||||
self.assertIn(bookmark_link, image['links'])
|
||||
self.assertIn(self_link, image['links'])
|
||||
|
||||
def _assert_image_entity_basic(self, image):
|
||||
actual_keys = set(image.keys())
|
||||
expected_keys = set((
|
||||
'id',
|
||||
'name',
|
||||
'links',
|
||||
))
|
||||
self.assertEqual(actual_keys, expected_keys)
|
||||
|
||||
self._assert_image_links(image)
|
||||
|
||||
def _assert_image_entity_detailed(self, image):
|
||||
keys = image.keys()
|
||||
if 'server' in keys:
|
||||
keys.remove('server')
|
||||
actual_keys = set(keys)
|
||||
expected_keys = set((
|
||||
'created',
|
||||
'id',
|
||||
'links',
|
||||
'metadata',
|
||||
'minDisk',
|
||||
'minRam',
|
||||
'name',
|
||||
'progress',
|
||||
'status',
|
||||
'updated',
|
||||
))
|
||||
self.assertEqual(actual_keys, expected_keys)
|
||||
|
||||
self._assert_image_links(image)
|
||||
|
||||
def test_index(self):
|
||||
"""List all images"""
|
||||
|
||||
response, body = self.os.nova.request('GET', '/images')
|
||||
|
||||
self.assertEqual(response['status'], '200')
|
||||
resp_body = json.loads(body)
|
||||
self.assertEqual(resp_body.keys(), ['images'])
|
||||
|
||||
for image in resp_body['images']:
|
||||
self._assert_image_entity_basic(image)
|
||||
test_index.tags = ['nova', 'glance']
|
||||
|
||||
def test_detail(self):
|
||||
"""List all images in detail"""
|
||||
|
||||
response, body = self.os.nova.request('GET', '/images/detail')
|
||||
self.assertEqual(response['status'], '200')
|
||||
resp_body = json.loads(body)
|
||||
self.assertEqual(resp_body.keys(), ['images'])
|
||||
|
||||
for image in resp_body['images']:
|
||||
self._assert_image_entity_detailed(image)
|
||||
test_detail.tags = ['nova', 'glance']
|
||||
|
||||
|
||||
class TestGlanceAPI(tests.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestGlanceAPI, self).setUp()
|
||||
self.base_url = "http://%s:%s/%s/images" % (self.glance['host'],
|
||||
self.glance['port'],
|
||||
self.glance['apiver'])
|
||||
|
||||
def test_upload_ami_style_image(self):
|
||||
"""Uploads a three-part ami-style image"""
|
||||
aki_location = self.config['environment']['aki_location']
|
||||
headers = {'x-image-meta-is-public': 'true',
|
||||
'x-image-meta-name': 'test-kernel',
|
||||
'x-image-meta-disk-format': 'aki',
|
||||
'x-image-meta-container-format': 'aki',
|
||||
'Content-Length': '%d' % os.path.getsize(aki_location),
|
||||
'Content-Type': 'application/octet-stream'}
|
||||
image_file = open(aki_location, "rb")
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers,body=image_file)
|
||||
image_file.close()
|
||||
self.assertEqual(201, response.status)
|
||||
data = json.loads(content)
|
||||
self.assertEqual(data['image']['name'], "test-kernel")
|
||||
self.assertEqual(data['image']['checksum'],
|
||||
self._md5sum_file(aki_location))
|
||||
kernel_id = data['image']['id']
|
||||
|
||||
ari_location = self.config['environment'].get('ari_location')
|
||||
if ari_location:
|
||||
headers = {'x-image-meta-is-public': 'true',
|
||||
'x-image-meta-name': 'test-ramdisk',
|
||||
'x-image-meta-disk-format': 'ari',
|
||||
'x-image-meta-container-format': 'ari',
|
||||
'Content-Length': '%d' % os.path.getsize(ari_location),
|
||||
'Content-Type': 'application/octet-stream'}
|
||||
image_file = open(ari_location, "rb")
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers, body=image_file)
|
||||
image_file.close()
|
||||
self.assertEqual(201, response.status)
|
||||
data = json.loads(content)
|
||||
self.assertEqual(data['image']['name'], "test-ramdisk")
|
||||
self.assertEqual(data['image']['checksum'],
|
||||
self._md5sum_file(ari_location))
|
||||
ramdisk_id = data['image']['id']
|
||||
else:
|
||||
ramdisk_id = None
|
||||
|
||||
ami_location = self.config['environment']['ami_location']
|
||||
upload_data = ""
|
||||
for chunk in self._read_in_chunks(ami_location):
|
||||
upload_data += chunk
|
||||
headers = {'x-image-meta-is-public': 'true',
|
||||
'x-image-meta-name': 'test-image',
|
||||
'x-image-meta-disk-format': 'ami',
|
||||
'x-image-meta-container-format': 'ami',
|
||||
'x-image-meta-property-kernel_id': str(kernel_id),
|
||||
'Content-Length': '%d' % os.path.getsize(ami_location),
|
||||
'Content-Type': 'application/octet-stream'}
|
||||
|
||||
if ari_location:
|
||||
headers['x-image-meta-property-ramdisk_id'] = str(ramdisk_id)
|
||||
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(self.base_url, 'POST',
|
||||
headers=headers, body=upload_data)
|
||||
self.assertEqual(201, response.status)
|
||||
data = json.loads(content)
|
||||
self.assertEqual(data['image']['name'], "test-image")
|
||||
self.assertEqual(data['image']['checksum'],
|
||||
self._md5sum_file(ami_location))
|
||||
machine_id = data['image']['id']
|
||||
|
||||
# now ensure we can modify the image properties
|
||||
headers = {'X-Image-Meta-Property-distro': 'Ubuntu',
|
||||
'X-Image-Meta-Property-arch': 'x86_64',
|
||||
'X-Image-Meta-Property-kernel_id': str(kernel_id)}
|
||||
if ari_location:
|
||||
headers['X-Image-Meta-Property-ramdisk_id'] = str(ramdisk_id)
|
||||
|
||||
http = httplib2.Http()
|
||||
url = '%s/%s' % (self.base_url, machine_id)
|
||||
response, content = http.request(url, 'PUT', headers=headers)
|
||||
self.assertEqual(response.status, 200)
|
||||
data = json.loads(content)
|
||||
properties = data['image']['properties']
|
||||
self.assertEqual(properties['arch'], "x86_64")
|
||||
self.assertEqual(properties['distro'], "Ubuntu")
|
||||
self.assertEqual(properties['kernel_id'], str(kernel_id))
|
||||
if ari_location:
|
||||
self.assertEqual(properties['ramdisk_id'], str(ramdisk_id))
|
||||
|
||||
# list the metadata to ensure the new values stuck
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(url, 'HEAD')
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertEqual(response['x-image-meta-name'], "test-image")
|
||||
self.assertEqual(response['x-image-meta-checksum'],
|
||||
self._md5sum_file(ami_location))
|
||||
self.assertEqual(response['x-image-meta-container_format'], "ami")
|
||||
self.assertEqual(response['x-image-meta-disk_format'], "ami")
|
||||
self.assertEqual(response['x-image-meta-property-arch'], "x86_64")
|
||||
self.assertEqual(response['x-image-meta-property-distro'], "Ubuntu")
|
||||
self.assertEqual(response['x-image-meta-property-kernel_id'],
|
||||
str(kernel_id))
|
||||
if ari_location:
|
||||
self.assertEqual(response['x-image-meta-property-ramdisk_id'],
|
||||
str(ramdisk_id))
|
||||
|
||||
# delete images for which we have non-None ids
|
||||
delete_ids = filter(lambda x: x, (kernel_id, ramdisk_id, machine_id))
|
||||
for image_id in delete_ids:
|
||||
http = httplib2.Http()
|
||||
url = '%s/%s' % (self.base_url, image_id)
|
||||
response, content = http.request(url, 'DELETE')
|
||||
|
||||
test_upload_ami_style_image.tags = ['glance']
|
@ -1,385 +0,0 @@
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
from kong import exceptions
|
||||
from kong import openstack
|
||||
from kong import tests
|
||||
from kong.common import ssh
|
||||
|
||||
import unittest2 as unittest
|
||||
|
||||
|
||||
class ServerActionsTest(tests.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(ServerActionsTest, self).setUp()
|
||||
self.os = openstack.Manager(self.nova)
|
||||
|
||||
self.multi_node = self.nova['multi_node'] == 'yes'
|
||||
self.image_ref = self.glance['image_id']
|
||||
self.image_ref_alt = self.glance['image_id_alt']
|
||||
self.flavor_ref = self.nova['flavor_ref']
|
||||
self.flavor_ref_alt = self.nova['flavor_ref_alt']
|
||||
self.ssh_timeout = int(self.nova['ssh_timeout'])
|
||||
self.build_timeout = int(self.nova['build_timeout'])
|
||||
|
||||
self.server_password = 'testpwd'
|
||||
self.server_name = 'stacktester1'
|
||||
|
||||
expected_server = {
|
||||
'name': self.server_name,
|
||||
'imageRef': self.image_ref,
|
||||
'flavorRef': self.flavor_ref,
|
||||
'adminPass': self.server_password,
|
||||
}
|
||||
|
||||
created_server = self.os.nova.create_server(expected_server)
|
||||
|
||||
self.server_id = created_server['id']
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
self.access_ip = server['addresses']['public'][0]['addr']
|
||||
|
||||
# Ensure server came up
|
||||
if self.ssh_timeout:
|
||||
self._assert_ssh_password()
|
||||
|
||||
def tearDown(self):
|
||||
self.os.nova.delete_server(self.server_id)
|
||||
|
||||
def _get_ssh_client(self, password):
|
||||
return ssh.Client(self.access_ip, 'root', password, self.ssh_timeout)
|
||||
|
||||
def _assert_ssh_password(self, password=None):
|
||||
_password = password or self.server_password
|
||||
client = self._get_ssh_client(_password)
|
||||
self.assertTrue(client.test_connection_auth())
|
||||
|
||||
def _wait_for_server_status(self, server_id, status):
|
||||
try:
|
||||
self.os.nova.wait_for_server_status(server_id, status,
|
||||
timeout=self.build_timeout)
|
||||
except exceptions.TimeoutException:
|
||||
self.fail("Server failed to change status to %s" % status)
|
||||
|
||||
def _get_boot_time(self):
|
||||
"""Return the time the server was started"""
|
||||
output = self._read_file("/proc/uptime")
|
||||
uptime = float(output.split().pop(0))
|
||||
return time.time() - uptime
|
||||
|
||||
def _write_file(self, filename, contents, password=None):
|
||||
command = "echo -n %s > %s" % (contents, filename)
|
||||
return self._exec_command(command, password)
|
||||
|
||||
def _read_file(self, filename, password=None):
|
||||
command = "cat %s" % filename
|
||||
return self._exec_command(command, password)
|
||||
|
||||
def _exec_command(self, command, password=None):
|
||||
if password is None:
|
||||
password = self.server_password
|
||||
client = self._get_ssh_client(password)
|
||||
return client.exec_command(command)
|
||||
|
||||
def test_reboot_server(self):
|
||||
"""Reboot a server SOFT and HARD"""
|
||||
# SSH and get the uptime
|
||||
if self.ssh_timeout:
|
||||
initial_time_started = self._get_boot_time()
|
||||
else:
|
||||
intitial_time_started = 0
|
||||
|
||||
# Make reboot request
|
||||
post_body = json.dumps({'reboot': {'type': 'SOFT'}})
|
||||
url = "/servers/%s/action" % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
self.assertEqual(response['status'], '202')
|
||||
|
||||
# Assert status transition
|
||||
self._wait_for_server_status(self.server_id, 'REBOOT')
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# SSH and verify uptime is less than before
|
||||
if self.ssh_timeout:
|
||||
post_reboot_time_started = self._get_boot_time()
|
||||
self.assertTrue(initial_time_started < post_reboot_time_started)
|
||||
|
||||
# SSH and get the uptime for the next reboot
|
||||
initial_time_started = post_reboot_time_started
|
||||
|
||||
# Make reboot request
|
||||
post_body = json.dumps({'reboot': {'type': 'HARD'}})
|
||||
url = "/servers/%s/action" % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
self.assertEqual(response['status'], '202')
|
||||
|
||||
# Assert status transition
|
||||
# KNOWN-ISSUE 884906
|
||||
#self._wait_for_server_status(self.server_id, 'HARD_REBOOT')
|
||||
self._wait_for_server_status(self.server_id, 'REBOOT')
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# SSH and verify uptime is less than before
|
||||
if self.ssh_timeout:
|
||||
post_reboot_time_started = self._get_boot_time()
|
||||
self.assertTrue(initial_time_started < post_reboot_time_started)
|
||||
test_reboot_server.tags = ['nova']
|
||||
|
||||
def test_change_password(self):
|
||||
"""Change root password of a server"""
|
||||
# Change server password
|
||||
post_body = json.dumps({'changePassword': {'adminPass': 'test123'}})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# Assert status transition
|
||||
self.assertEqual('202', response['status'])
|
||||
self._wait_for_server_status(self.server_id, 'PASSWORD')
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# SSH into server using new password
|
||||
if self.ssh_timeout:
|
||||
self._assert_ssh_password('test123')
|
||||
test_change_password.tags = ['nova']
|
||||
|
||||
def test_rebuild(self):
|
||||
"""Rebuild a server"""
|
||||
|
||||
FILENAME = '/tmp/testfile'
|
||||
CONTENTS = 'WORDS'
|
||||
|
||||
# write file to server
|
||||
if self.ssh_timeout:
|
||||
self._write_file(FILENAME, CONTENTS)
|
||||
self.assertEqual(self._read_file(FILENAME), CONTENTS)
|
||||
|
||||
# Make rebuild request
|
||||
post_body = json.dumps({'rebuild': {'imageRef': self.image_ref_alt}})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# check output
|
||||
self.assertEqual('202', response['status'])
|
||||
rebuilt_server = json.loads(body)['server']
|
||||
generated_password = rebuilt_server['adminPass']
|
||||
|
||||
# Ensure correct status transition
|
||||
self._wait_for_server_status(self.server_id, 'REBUILD')
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# Treats an issue where we ssh'd in too soon after rebuild
|
||||
#TODO(bcwaldon): fix the Xen driver so we don't have to sleep here
|
||||
time.sleep(30)
|
||||
|
||||
# Check that the instance's imageRef matches the new imageRef
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
ref_match = self.image_ref_alt == server['image']['links'][0]['href']
|
||||
id_match = self.image_ref_alt == server['image']['id']
|
||||
self.assertTrue(ref_match or id_match)
|
||||
|
||||
# SSH into the server to ensure it came back up
|
||||
if self.ssh_timeout:
|
||||
self._assert_ssh_password(generated_password)
|
||||
|
||||
# make sure file is gone
|
||||
self.assertEqual(self._read_file(FILENAME, generated_password), '')
|
||||
|
||||
# test again with a specified password
|
||||
self._write_file(FILENAME, CONTENTS, generated_password)
|
||||
_contents = self._read_file(FILENAME, generated_password)
|
||||
self.assertEqual(_contents, CONTENTS)
|
||||
|
||||
specified_password = 'some_password'
|
||||
|
||||
# Make rebuild request
|
||||
post_body = json.dumps({
|
||||
'rebuild': {
|
||||
'imageRef': self.image_ref,
|
||||
'adminPass': specified_password,
|
||||
}
|
||||
})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# check output
|
||||
self.assertEqual('202', response['status'])
|
||||
rebuilt_server = json.loads(body)['server']
|
||||
self.assertEqual(rebuilt_server['adminPass'], specified_password)
|
||||
|
||||
# Ensure correct status transition
|
||||
self._wait_for_server_status(self.server_id, 'REBUILD')
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# Treats an issue where we ssh'd in too soon after rebuild
|
||||
time.sleep(30)
|
||||
|
||||
# Check that the instance's imageRef matches the new imageRef
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
ref_match = self.image_ref == server['image']['links'][0]['href']
|
||||
id_match = self.image_ref == server['image']['id']
|
||||
self.assertTrue(ref_match or id_match)
|
||||
|
||||
# make sure file is gone
|
||||
if self.ssh_timeout:
|
||||
self.assertEqual(self._read_file(FILENAME, specified_password), '')
|
||||
test_rebuild.tags = ['nova']
|
||||
|
||||
def test_resize(self):
|
||||
"""Resize a server"""
|
||||
|
||||
# Make resize request
|
||||
post_body = json.dumps({'resize': {'flavorRef': self.flavor_ref_alt}})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# Wait for status transition
|
||||
self.assertEqual('202', response['status'])
|
||||
self._wait_for_server_status(self.server_id, 'VERIFY_RESIZE')
|
||||
|
||||
# Ensure API reports new flavor
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
self.assertEqual(self.flavor_ref_alt, server['flavor']['id'])
|
||||
|
||||
#SSH into the server to ensure it came back up
|
||||
if self.ssh_timeout:
|
||||
self._assert_ssh_password()
|
||||
|
||||
# Make confirmResize request
|
||||
post_body = json.dumps({'confirmResize': 'null'})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# Wait for status transition
|
||||
self.assertEqual('204', response['status'])
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# Ensure API still reports new flavor
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
self.assertEqual(self.flavor_ref_alt, server['flavor']['id'])
|
||||
test_resize.tags = ['nova']
|
||||
|
||||
def test_resize_revert(self):
|
||||
"""Resize a server, then revert"""
|
||||
# Make resize request
|
||||
post_body = json.dumps({'resize': {'flavorRef': self.flavor_ref_alt}})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# Wait for status transition
|
||||
self.assertEqual('202', response['status'])
|
||||
self._wait_for_server_status(self.server_id, 'VERIFY_RESIZE')
|
||||
|
||||
# SSH into the server to ensure it came back up
|
||||
if self.ssh_timeout:
|
||||
self._assert_ssh_password()
|
||||
|
||||
# Ensure API reports new flavor
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
self.assertEqual(self.flavor_ref_alt, server['flavor']['id'])
|
||||
|
||||
# Make revertResize request
|
||||
post_body = json.dumps({'revertResize': 'null'})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
|
||||
# Assert status transition
|
||||
self.assertEqual('202', response['status'])
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# Ensure flavor ref was reverted to original
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
self.assertEqual(self.flavor_ref, server['flavor']['id'])
|
||||
test_resize_revert.tags = ['nova']
|
||||
|
||||
|
||||
class SnapshotTests(tests.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(SnapshotTests, self).setUp()
|
||||
self.os = openstack.Manager(self.nova)
|
||||
|
||||
self.image_ref = self.glance['image_id']
|
||||
self.flavor_ref = self.nova['flavor_ref']
|
||||
self.ssh_timeout = int(self.nova['ssh_timeout'])
|
||||
self.build_timeout = int(self.nova['build_timeout'])
|
||||
|
||||
self.server_name = 'stacktester1'
|
||||
|
||||
expected_server = {
|
||||
'name': self.server_name,
|
||||
'imageRef': self.image_ref,
|
||||
'flavorRef': self.flavor_ref,
|
||||
}
|
||||
|
||||
created_server = self.os.nova.create_server(expected_server)
|
||||
self.server_id = created_server['id']
|
||||
|
||||
def tearDown(self):
|
||||
self.os.nova.delete_server(self.server_id)
|
||||
|
||||
def _wait_for_server_status(self, server_id, status):
|
||||
try:
|
||||
self.os.nova.wait_for_server_status(server_id, status,
|
||||
timeout=self.build_timeout)
|
||||
except exceptions.TimeoutException:
|
||||
self.fail("Server failed to change status to %s" % status)
|
||||
|
||||
def test_snapshot(self):
|
||||
"""Create image from an existing server"""
|
||||
|
||||
# Wait for server to come up before running this test
|
||||
self._wait_for_server_status(self.server_id, 'ACTIVE')
|
||||
|
||||
# Create snapshot
|
||||
image_data = {'name': 'testserver_snapshot'}
|
||||
req_body = json.dumps({'createImage': image_data})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=req_body)
|
||||
self.assertEqual(response['status'], '202')
|
||||
image_ref = response['location']
|
||||
snapshot_id = image_ref.rsplit('/', 1)[1]
|
||||
|
||||
# Get snapshot and check its attributes
|
||||
resp, body = self.os.nova.request('GET', '/images/%s' % snapshot_id)
|
||||
snapshot = json.loads(body)['image']
|
||||
self.assertEqual(snapshot['name'], image_data['name'])
|
||||
server_ref = snapshot['server']['links'][0]['href']
|
||||
self.assertTrue(server_ref.endswith('/%s' % self.server_id))
|
||||
|
||||
# Wait for first snapshot to start saving before making second snapshot
|
||||
self.os.nova.wait_for_image_status(snapshot['id'], 'SAVING')
|
||||
|
||||
# Create second snapshot
|
||||
image_data = {'name': 'testserver_snapshot2'}
|
||||
req_body = json.dumps({'createImage': image_data})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=req_body)
|
||||
self.assertEqual(response['status'], '409')
|
||||
|
||||
# Ensure both snapshots succeed
|
||||
self.os.nova.wait_for_image_status(snapshot['id'], 'ACTIVE')
|
||||
|
||||
# Cleaning up
|
||||
self.os.nova.request('DELETE', '/images/%s' % snapshot_id)
|
||||
test_snapshot.tags = ['nova']
|
||||
|
||||
def test_snapshot_while_building(self):
|
||||
"""Ensure inability to snapshot server in BUILD state"""
|
||||
|
||||
# Ensure server is in BUILD state
|
||||
url = '/servers/%s' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
server = json.loads(body)['server']
|
||||
self.assertEqual(server['status'], 'BUILD', 'Server built too quickly')
|
||||
|
||||
# Create snapshot
|
||||
req_body = json.dumps({'createImage': {'name': 'testserver_snapshot'}})
|
||||
url = '/servers/%s/action' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=req_body)
|
||||
|
||||
# KNOWN-ISSUE 885232
|
||||
self.assertEqual(response['status'], '409')
|
||||
test_snapshot_while_building.tags = ['nova']
|
@ -1,501 +0,0 @@
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
|
||||
from kong import openstack
|
||||
from kong import exceptions
|
||||
from kong import tests
|
||||
from kong.common import ssh
|
||||
from kong.common import utils
|
||||
|
||||
|
||||
class ServersTest(tests.FunctionalTest):
|
||||
def setUp(self):
|
||||
super(ServersTest, self).setUp()
|
||||
self.os = openstack.Manager(self.nova)
|
||||
self.image_ref = self.glance['image_id']
|
||||
self.flavor_ref = self.nova['flavor_ref']
|
||||
self.ssh_timeout = self.nova['ssh_timeout']
|
||||
self.build_timeout = self.nova['build_timeout']
|
||||
|
||||
def tearDown(self):
|
||||
if getattr(self, 'server_id', False):
|
||||
self.os.nova.delete_server(self.server_id)
|
||||
|
||||
def _assert_created_server_entity(self, created_server):
|
||||
actual_keys = set(created_server.keys())
|
||||
expected_keys = set((
|
||||
'id',
|
||||
'adminPass',
|
||||
'links',
|
||||
))
|
||||
print actual_keys
|
||||
print expected_keys
|
||||
self.assertTrue(expected_keys <= actual_keys)
|
||||
self._assert_server_links(created_server)
|
||||
|
||||
def _assert_server_entity(self, server):
|
||||
actual_keys = set(server.keys())
|
||||
expected_keys = set((
|
||||
'accessIPv4',
|
||||
'accessIPv6',
|
||||
'addresses',
|
||||
'created',
|
||||
'flavor',
|
||||
'hostId',
|
||||
'id',
|
||||
'image',
|
||||
'links',
|
||||
'metadata',
|
||||
'name',
|
||||
'progress',
|
||||
'status',
|
||||
'updated',
|
||||
))
|
||||
self.assertTrue(expected_keys <= actual_keys)
|
||||
self._assert_server_links(server)
|
||||
|
||||
def _assert_server_links(self, server):
|
||||
server_id = str(server['id'])
|
||||
host = self.nova['host']
|
||||
port = self.nova['port']
|
||||
api_url = '%s:%s' % (host, port)
|
||||
base_url = os.path.join(api_url, self.nova['apiver'])
|
||||
|
||||
self_link = 'http://' + os.path.join(base_url,
|
||||
self.os.nova.project_id,
|
||||
'servers', server_id)
|
||||
bookmark_link = 'http://' + os.path.join(api_url,
|
||||
self.os.nova.project_id,
|
||||
'servers', server_id)
|
||||
|
||||
expected_links = [
|
||||
{
|
||||
'rel': 'self',
|
||||
'href': self_link,
|
||||
},
|
||||
{
|
||||
'rel': 'bookmark',
|
||||
'href': bookmark_link,
|
||||
},
|
||||
]
|
||||
|
||||
self.assertEqual(server['links'], expected_links)
|
||||
|
||||
def test_build_update_delete(self):
|
||||
"""Build and delete a server"""
|
||||
|
||||
server_password = 'testpwd'
|
||||
|
||||
expected_server = {
|
||||
'name': 'testserver',
|
||||
'imageRef': self.image_ref,
|
||||
'flavorRef': self.flavor_ref,
|
||||
'metadata': {'testEntry': 'testValue'},
|
||||
}
|
||||
|
||||
post_body = json.dumps({'server': expected_server})
|
||||
response, body = self.os.nova.request('POST',
|
||||
'/servers',
|
||||
body=post_body)
|
||||
|
||||
# Ensure attributes were returned
|
||||
self.assertEqual(response.status, 202)
|
||||
_body = json.loads(body)
|
||||
self.assertEqual(_body.keys(), ['server'])
|
||||
created_server = _body['server']
|
||||
admin_pass = created_server['adminPass']
|
||||
self._assert_created_server_entity(created_server)
|
||||
self.server_id = created_server['id']
|
||||
|
||||
# Get server again and ensure attributes stuck
|
||||
server = self.os.nova.get_server(self.server_id)
|
||||
self._assert_server_entity(server)
|
||||
self.assertEqual(server['name'], expected_server['name'])
|
||||
self.assertEqual(server['accessIPv4'], '')
|
||||
self.assertEqual(server['accessIPv6'], '')
|
||||
self.assertEqual(server['metadata'], {'testEntry': 'testValue'})
|
||||
|
||||
# Parse last-updated time
|
||||
update_time = utils.load_isotime(server['created'])
|
||||
|
||||
# Ensure server not returned with future changes-since
|
||||
future_time = utils.dump_isotime(update_time + datetime.timedelta(100))
|
||||
params = 'changes-since=%s' % future_time
|
||||
response, body = self.os.nova.request('GET', '/servers?%s' % params)
|
||||
servers = json.loads(body)['servers']
|
||||
self.assertTrue(len(servers) == 0)
|
||||
|
||||
# Ensure server is returned with past changes-since
|
||||
future_time = utils.dump_isotime(update_time - datetime.timedelta(1))
|
||||
params = 'changes-since=%s' % future_time
|
||||
response, body = self.os.nova.request('GET', '/servers?%s' % params)
|
||||
servers = json.loads(body)['servers']
|
||||
server_ids = map(lambda x: x['id'], servers)
|
||||
self.assertTrue(self.server_id in server_ids)
|
||||
|
||||
# Update name
|
||||
new_name = 'testserver2'
|
||||
new_server = {'name': new_name}
|
||||
put_body = json.dumps({'server': new_server})
|
||||
url = '/servers/%s' % self.server_id
|
||||
resp, body = self.os.nova.request('PUT', url, body=put_body)
|
||||
|
||||
# Output from update should be a full server
|
||||
self.assertEqual(resp.status, 200)
|
||||
data = json.loads(body)
|
||||
self.assertEqual(data.keys(), ['server'])
|
||||
self._assert_server_entity(data['server'])
|
||||
self.assertEqual(new_name, data['server']['name'])
|
||||
|
||||
# Check that name was changed
|
||||
updated_server = self.os.nova.get_server(self.server_id)
|
||||
self._assert_server_entity(updated_server)
|
||||
self.assertEqual(new_name, updated_server['name'])
|
||||
|
||||
# Update accessIPv4
|
||||
new_server = {'accessIPv4': '192.168.0.200'}
|
||||
put_body = json.dumps({'server': new_server})
|
||||
url = '/servers/%s' % self.server_id
|
||||
resp, body = self.os.nova.request('PUT', url, body=put_body)
|
||||
|
||||
# Output from update should be a full server
|
||||
self.assertEqual(resp.status, 200)
|
||||
data = json.loads(body)
|
||||
self.assertEqual(data.keys(), ['server'])
|
||||
self._assert_server_entity(data['server'])
|
||||
self.assertEqual('192.168.0.200', data['server']['accessIPv4'])
|
||||
|
||||
# Check that accessIPv4 was changed
|
||||
updated_server = self.os.nova.get_server(self.server_id)
|
||||
self._assert_server_entity(updated_server)
|
||||
self.assertEqual('192.168.0.200', updated_server['accessIPv4'])
|
||||
|
||||
# Update accessIPv6
|
||||
new_server = {'accessIPv6': 'feed::beef'}
|
||||
put_body = json.dumps({'server': new_server})
|
||||
url = '/servers/%s' % self.server_id
|
||||
resp, body = self.os.nova.request('PUT', url, body=put_body)
|
||||
|
||||
# Output from update should be a full server
|
||||
self.assertEqual(resp.status, 200)
|
||||
data = json.loads(body)
|
||||
self.assertEqual(data.keys(), ['server'])
|
||||
self._assert_server_entity(data['server'])
|
||||
self.assertEqual('feed::beef', data['server']['accessIPv6'])
|
||||
|
||||
# Check that accessIPv6 was changed
|
||||
updated_server = self.os.nova.get_server(self.server_id)
|
||||
self._assert_server_entity(updated_server)
|
||||
self.assertEqual('feed::beef', updated_server['accessIPv6'])
|
||||
|
||||
# Check metadata subresource
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(200, response.status)
|
||||
|
||||
result = json.loads(body)
|
||||
expected = {'metadata': {'testEntry': 'testValue'}}
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# Ensure metadata container can be modified
|
||||
expected = {
|
||||
'metadata': {
|
||||
'new_meta1': 'new_value1',
|
||||
'new_meta2': 'new_value2',
|
||||
},
|
||||
}
|
||||
post_body = json.dumps(expected)
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
response, body = self.os.nova.request('POST', url, body=post_body)
|
||||
self.assertEqual(200, response.status)
|
||||
result = json.loads(body)
|
||||
expected['metadata']['testEntry'] = 'testValue'
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# Ensure values stick
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(200, response.status)
|
||||
result = json.loads(body)
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# Ensure metadata container can be overwritten
|
||||
expected = {
|
||||
'metadata': {
|
||||
'new_meta3': 'new_value3',
|
||||
'new_meta4': 'new_value4',
|
||||
},
|
||||
}
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
post_body = json.dumps(expected)
|
||||
response, body = self.os.nova.request('PUT', url, body=post_body)
|
||||
self.assertEqual(200, response.status)
|
||||
result = json.loads(body)
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# Ensure values stick
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(200, response.status)
|
||||
result = json.loads(body)
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# Set specific key
|
||||
expected_meta = {'meta': {'new_meta5': 'new_value5'}}
|
||||
put_body = json.dumps(expected_meta)
|
||||
url = '/servers/%s/metadata/new_meta5' % self.server_id
|
||||
response, body = self.os.nova.request('PUT', url, body=put_body)
|
||||
self.assertEqual(200, response.status)
|
||||
result = json.loads(body)
|
||||
self.assertDictEqual(expected_meta, result)
|
||||
|
||||
# Ensure value sticks
|
||||
expected_metadata = {
|
||||
'metadata': {
|
||||
'new_meta3': 'new_value3',
|
||||
'new_meta4': 'new_value4',
|
||||
'new_meta5': 'new_value5',
|
||||
},
|
||||
}
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
result = json.loads(body)
|
||||
self.assertDictEqual(expected_metadata, result)
|
||||
|
||||
# Update existing key
|
||||
expected_meta = {'meta': {'new_meta4': 'new_value6'}}
|
||||
put_body = json.dumps(expected_meta)
|
||||
url = '/servers/%s/metadata/new_meta4' % self.server_id
|
||||
response, body = self.os.nova.request('PUT', url, body=put_body)
|
||||
self.assertEqual(200, response.status)
|
||||
result = json.loads(body)
|
||||
self.assertEqual(expected_meta, result)
|
||||
|
||||
# Ensure value sticks
|
||||
expected_metadata = {
|
||||
'metadata': {
|
||||
'new_meta3': 'new_value3',
|
||||
'new_meta4': 'new_value6',
|
||||
'new_meta5': 'new_value5',
|
||||
},
|
||||
}
|
||||
url = '/servers/%s/metadata' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
result = json.loads(body)
|
||||
self.assertDictEqual(expected_metadata, result)
|
||||
|
||||
# Delete a certain key
|
||||
url = '/servers/%s/metadata/new_meta3' % self.server_id
|
||||
response, body = self.os.nova.request('DELETE', url)
|
||||
self.assertEquals(204, response.status)
|
||||
|
||||
# Make sure the key is gone
|
||||
url = '/servers/%s/metadata/new_meta3' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEquals(404, response.status)
|
||||
|
||||
# Delete a nonexistant key
|
||||
url = '/servers/%s/metadata/new_meta3' % self.server_id
|
||||
response, body = self.os.nova.request('DELETE', url)
|
||||
self.assertEquals(404, response.status)
|
||||
|
||||
# Wait for instance to boot
|
||||
self.os.nova.wait_for_server_status(self.server_id,
|
||||
'ACTIVE',
|
||||
timeout=self.build_timeout)
|
||||
|
||||
# Look for 'addresses' attribute on server
|
||||
url = '/servers/%s' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(response.status, 200)
|
||||
body = json.loads(body)
|
||||
self.assertTrue('addresses' in body['server'].keys())
|
||||
server_addresses = body['server']['addresses']
|
||||
|
||||
# Addresses should be available from subresource
|
||||
url = '/servers/%s/ips' % self.server_id
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(response.status, 200)
|
||||
body = json.loads(body)
|
||||
self.assertEqual(body.keys(), ['addresses'])
|
||||
ips_addresses = body['addresses']
|
||||
|
||||
# Ensure both resources return identical information
|
||||
self.assertEqual(server_addresses, ips_addresses)
|
||||
|
||||
# Validate entities within network containers
|
||||
for (network, network_data) in ips_addresses.items():
|
||||
url = '/servers/%s/ips/%s' % (self.server_id, network)
|
||||
response, body = self.os.nova.request('GET', url)
|
||||
self.assertEqual(response.status, 200)
|
||||
body = json.loads(body)
|
||||
self.assertEqual(body.keys(), [network])
|
||||
self.assertEqual(body[network], network_data)
|
||||
|
||||
# Check each IP entity
|
||||
for ip_data in network_data:
|
||||
self.assertEqual(set(ip_data.keys()), set(['addr', 'version']))
|
||||
|
||||
# Find IP of server
|
||||
try:
|
||||
(_, network) = server_addresses.items()[0]
|
||||
ip = network[0]['addr']
|
||||
except KeyError:
|
||||
self.fail("Failed to retrieve IP address from server entity")
|
||||
|
||||
# Assert password works
|
||||
if int(self.nova['ssh_timeout']) > 0:
|
||||
client = ssh.Client(ip, 'root', admin_pass, self.ssh_timeout)
|
||||
self.assertTrue(client.test_connection_auth())
|
||||
|
||||
self.os.nova.delete_server(self.server_id)
|
||||
|
||||
# Poll server until deleted
|
||||
try:
|
||||
url = '/servers/%s' % self.server_id
|
||||
self.os.nova.poll_request_status('GET', url, 404)
|
||||
except exceptions.TimeoutException:
|
||||
self.fail("Server deletion timed out")
|
||||
test_build_update_delete.tags = ['nova']
|
||||
|
||||
def test_build_with_password_and_file(self):
|
||||
"""Build a server with a custom password and an injected file"""
|
||||
|
||||
file_contents = 'testing'
|
||||
|
||||
expected_server = {
|
||||
'name': 'testserver',
|
||||
'metadata': {
|
||||
'key1': 'value1',
|
||||
'key2': 'value2',
|
||||
},
|
||||
'personality': [
|
||||
{
|
||||
'path': '/etc/test.txt',
|
||||
'contents': base64.b64encode(file_contents),
|
||||
},
|
||||
],
|
||||
'imageRef': self.image_ref,
|
||||
'flavorRef': self.flavor_ref,
|
||||
'adminPass': 'secrete',
|
||||
}
|
||||
|
||||
post_body = json.dumps({'server': expected_server})
|
||||
response, body = self.os.nova.request('POST',
|
||||
'/servers',
|
||||
body=post_body)
|
||||
|
||||
self.assertEqual(response.status, 202)
|
||||
|
||||
_body = json.loads(body)
|
||||
self.assertEqual(_body.keys(), ['server'])
|
||||
created_server = _body['server']
|
||||
self.server_id = _body['server']['id']
|
||||
|
||||
admin_pass = created_server['adminPass']
|
||||
self.assertEqual(expected_server['adminPass'], admin_pass)
|
||||
self._assert_created_server_entity(created_server)
|
||||
self.assertEqual(expected_server['metadata'], {
|
||||
'key1': 'value1',
|
||||
'key2': 'value2',
|
||||
})
|
||||
|
||||
self.os.nova.wait_for_server_status(created_server['id'],
|
||||
'ACTIVE',
|
||||
timeout=self.build_timeout)
|
||||
|
||||
server = self.os.nova.get_server(created_server['id'])
|
||||
|
||||
# Find IP of server
|
||||
try:
|
||||
(_, network) = server['addresses'].popitem()
|
||||
ip = network[0]['addr']
|
||||
except KeyError:
|
||||
self.fail("Failed to retrieve IP address from server entity")
|
||||
|
||||
# Assert injected file is on instance, also verifying password works
|
||||
if int(self.nova['ssh_timeout']) > 0:
|
||||
client = ssh.Client(ip, 'root', admin_pass, self.ssh_timeout)
|
||||
injected_file = client.exec_command('cat /etc/test.txt')
|
||||
self.assertEqual(injected_file, file_contents)
|
||||
test_build_with_password_and_file.tags = ['nova']
|
||||
|
||||
def test_delete_while_building(self):
|
||||
"""Delete a server while building"""
|
||||
|
||||
# Make create server request
|
||||
server = {
|
||||
'name' : 'testserver',
|
||||
'imageRef' : self.image_ref,
|
||||
'flavorRef' : self.flavor_ref,
|
||||
}
|
||||
created_server = self.os.nova.create_server(server)
|
||||
|
||||
# Server should immediately be accessible, but in have building status
|
||||
server = self.os.nova.get_server(created_server['id'])
|
||||
self.assertEqual(server['status'], 'BUILD')
|
||||
|
||||
self.os.nova.delete_server(created_server['id'])
|
||||
|
||||
# Poll server until deleted
|
||||
try:
|
||||
url = '/servers/%s' % created_server['id']
|
||||
self.os.nova.poll_request_status('GET', url, 404)
|
||||
except exceptions.TimeoutException:
|
||||
self.fail("Server deletion timed out")
|
||||
test_delete_while_building.tags = ['nova']
|
||||
|
||||
def test_create_with_invalid_image(self):
|
||||
"""Create a server with an unknown image"""
|
||||
|
||||
post_body = json.dumps({
|
||||
'server' : {
|
||||
'name' : 'testserver',
|
||||
'imageRef' : -1,
|
||||
'flavorRef' : self.flavor_ref,
|
||||
}
|
||||
})
|
||||
|
||||
resp, body = self.os.nova.request('POST', '/servers', body=post_body)
|
||||
|
||||
self.assertEqual(400, resp.status)
|
||||
|
||||
fault = json.loads(body)
|
||||
expected_fault = {
|
||||
"badRequest": {
|
||||
"message": "Cannot find requested image",
|
||||
"code": 400,
|
||||
},
|
||||
}
|
||||
# KNOWN-ISSUE - The error message is confusing and should be improved
|
||||
#self.assertEqual(fault, expected_fault)
|
||||
test_create_with_invalid_image.tags = ['nova']
|
||||
|
||||
def test_create_with_invalid_flavor(self):
|
||||
"""Create a server with an unknown flavor"""
|
||||
|
||||
post_body = json.dumps({
|
||||
'server' : {
|
||||
'name' : 'testserver',
|
||||
'imageRef' : self.image_ref,
|
||||
'flavorRef' : -1,
|
||||
}
|
||||
})
|
||||
|
||||
resp, body = self.os.nova.request('POST', '/servers', body=post_body)
|
||||
|
||||
self.assertEqual(400, resp.status)
|
||||
|
||||
fault = json.loads(body)
|
||||
expected_fault = {
|
||||
"badRequest": {
|
||||
"message": "Cannot find requested flavor",
|
||||
"code": 400,
|
||||
},
|
||||
}
|
||||
# KNOWN-ISSUE lp804084
|
||||
#self.assertEqual(fault, expected_fault)
|
||||
test_create_with_invalid_flavor.tags = ['nova']
|
@ -43,7 +43,7 @@ function run_tests {
|
||||
|
||||
function run_pep8 {
|
||||
echo "Running pep8 ..."
|
||||
PEP8_EXCLUDE="kong,etc,include,tools"
|
||||
PEP8_EXCLUDE="etc,include,tools"
|
||||
PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat"
|
||||
PEP8_INCLUDE="."
|
||||
pep8 $PEP8_OPTIONS $PEP8_INCLUDE
|
||||
|
2
tox.ini
2
tox.ini
@ -15,4 +15,4 @@ commands = nosetests {posargs}
|
||||
|
||||
[testenv:pep8]
|
||||
deps = pep8==1.3.3
|
||||
commands = pep8 --ignore=E121,E122,E125,E126 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack,kong .
|
||||
commands = pep8 --ignore=E121,E122,E125,E126 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack .
|
||||
|
Loading…
Reference in New Issue
Block a user