Nolan Brubaker b9bbfc92bf Add Neutron API test
Query the Neutron API for networks.

Added the get_objects helper method since simply listing resources
appears to be a fairly common operation in these tests. Will refactor
existing tests to conform to this standard in a separate patch.

Change-Id: I78fab1083e90bff8db46842b87f73556df153541
2017-05-31 14:12:10 -04:00

382 lines
12 KiB
Python
Executable File

#!/usr/bin/env python
# Copyright 2017, Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# (c) 2017, Nolan Brubaker <nolan.brubaker@rackspace.com>
# Necessary for accurate failure rate calculation
from __future__ import division
import argparse
import datetime
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client as key_client
import logging
import os
from openstack import connection
from openstack import profile
import signal
import sys
import time
import glanceclient
import tempfile
logger = logging.getLogger(__name__)
class ServiceTest(object):
    """Base class for one repeatable check against an OpenStack service.

    Subclasses provide ``service_name`` (used to build the log file name)
    and ``description`` (shown when listing tests), and override ``run``.
    ``pre_test`` and ``post_test`` may be left unimplemented; the test loop
    ignores the resulting NotImplementedError for those two hooks.
    """

    def __init__(self):
        # Build the SDK connection up front; it is stored on self.conn.
        self.get_connection()

    def pre_test(self, *args, **kwargs):
        """Any actions that need to be taken before starting the timer

        These actions will run inside the test loop, but before marking a
        start time.

        This might include creating a local resource, such as a file to
        upload to Glance, Cinder, or Swift.
        """
        raise NotImplementedError

    def run(self):
        """Run the main test, within the timing window.

        This test run should actually create and query a resource.
        """
        raise NotImplementedError

    def post_test(self):
        """Any post-test clean up work that needs to be done and not timed."""
        raise NotImplementedError

    def configure_logger(self, logger, console_logging=False):
        """Configure a file log, and optionally a stream log, for this test

        Records are appended to `/var/log/{service_name}_rolling.log`,
        where `service_name` is the test's `service_name` property.

        :param: logger - logger to be configured for the test.
        :param: console_logging - flag controlling whether or not a console
                                  logger is used
        """
        logger.setLevel(logging.INFO)

        filename = '/var/log/{}_rolling.log'.format(self.service_name)
        logfile = logging.FileHandler(filename, 'a')
        logfile.setLevel(logging.INFO)

        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s')
        # Make sure we're using UTC for everything.
        formatter.converter = time.gmtime

        logfile.setFormatter(formatter)
        logger.addHandler(logfile)

        if console_logging:
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            logger.addHandler(console)

    def _admin_credentials(self):
        """Return the admin auth keyword arguments shared by every client

        The auth URL and password are read from the OS_AUTH_URL and
        OS_PASSWORD environment variables; the rest is fixed.

        :raises: KeyError - if either environment variable is not set
        """
        return {
            'auth_url': os.environ['OS_AUTH_URL'],
            'username': 'admin',
            'password': os.environ['OS_PASSWORD'],
            'project_name': 'admin',
            'user_domain_id': 'default',
            'project_domain_id': 'default',
        }

    # This is useful to a lot of tests, so implement it here for re-use
    def get_session(self):
        """Get a new keystoneauth session using the admin credentials"""
        auth = v3.Password(**self._admin_credentials())
        return session.Session(auth=auth)

    def get_keystone_client(self, session):
        """Get a Keystone v3 client bound to the given session"""
        return key_client.Client(session=session)

    def get_connection(self):
        """Get an OpenStackSDK connection

        The connection is also stored on `self.conn` for later use by
        `get_objects`.
        """
        credentials = self._admin_credentials()

        prof = profile.Profile()
        prof.set_interface(profile.Profile.ALL, 'admin')

        self.conn = connection.Connection(profile=prof, **credentials)
        return self.conn

    def get_objects(self, service, name):
        """Retrieve some sort of object from OpenStack APIs

        This applies to high level concepts like 'flavors', 'networks',
        'subnets', etc.

        :params: service - an openstack service corresponding to the OpenStack
                           SDK module used, such as 'compute', 'network', etc.
        :param: name - name of a type of object, such as a 'network',
                       'server', 'volume', etc owned by an OpenStack service
        :returns: a list holding every item yielded by
                  `self.conn.<service>.<name>()`
        """
        lister = getattr(getattr(self.conn, service), name)
        # Materialise the iterable so the whole listing is actually fetched.
        return list(lister())
class KeystoneTest(ServiceTest):
    """Check Keystone by authenticating and listing projects."""

    service_name = 'keystone'
    description = 'Obtain a token then a project list to validate it worked'

    def run(self):
        """List projects through a freshly built session.

        A new session and client are created on every call rather than
        cached, using the helpers inherited from ServiceTest.

        :returns: a short status message for the log
        """
        sess = self.get_session()
        keystone = self.get_keystone_client(sess)
        keystone.projects.list()
        return "Project list retrieved"
class GlanceTest(ServiceTest):
    """Check Glance by uploading a throwaway image and deleting it."""

    service_name = 'glance'
    description = 'Upload and delete a 1MB file'

    def pre_test(self):
        """Create the 1MB payload outside of the timing window."""
        # make a bogus file to give to glance.
        self.temp_file = tempfile.TemporaryFile()
        self.temp_file.write(os.urandom(1024 * 1024))
        # Rewind so the upload reads from the beginning of the payload.
        self.temp_file.seek(0)

    def run(self):
        """Create an image, upload the payload to it, then delete it.

        :returns: a short status message for the log
        """
        try:
            sess = self.get_session()
            keystone = self.get_keystone_client(sess)
            endpoint = self.get_glance_endpoint(keystone)
            glance = glanceclient.Client(version='2',
                                         endpoint=endpoint, session=sess)

            image = glance.images.create(name="Rolling test",
                                         disk_format="raw",
                                         container_format="bare")
            glance.images.upload(image.id, self.temp_file)
            glance.images.delete(image.id)
        finally:
            # Close the payload even when an API call fails, so failed
            # attempts do not leave temporary files open.
            self.temp_file.close()

        return "Image created and deleted."

    def get_glance_endpoint(self, keystone):
        """Get the glance admin endpoint

        Looks up the service named 'glance' and returns the URL of the
        first endpoint registered for it with the 'admin' interface.

        NOTE(review): the stated reason for using this endpoint is to avoid
        setting up SSL handling, which presumably relies on the admin
        endpoint being plain HTTP in the target deployment - confirm.

        :param: keystone - a Keystone client
        :returns: the endpoint URL
        :raises: IndexError - if no admin endpoint is registered
        """
        service = keystone.services.find(name='glance')
        admin_endpoints = keystone.endpoints.list(service=service,
                                                  interface='admin')
        # The glance client wants the URL, not the keystone object
        return admin_endpoints[0].url
class NovaTest(ServiceTest):
    """Check Nova by listing flavors."""

    service_name = 'nova'
    description = 'Query for a list of flavors'

    def run(self):
        """Open a new connection and list the compute flavors.

        :returns: a status message saying whether any flavors came back
        """
        compute = self.get_connection().compute
        # flavors() hands back a generator, so it has to be consumed
        # before we can tell whether anything was returned.
        found = list(compute.flavors())
        if not found:
            return 'API reached, no flavors found'
        return 'Flavor list received'
class NeutronTest(ServiceTest):
    """Check Neutron by listing networks."""

    service_name = 'neutron'
    description = 'Query for a list of networks'

    def run(self):
        """List networks over the connection created at construction time.

        :returns: a status message saying whether any networks came back
        """
        if self.get_objects('network', 'networks'):
            return 'Network list received'
        return 'API reached, no networks found'
class TestRunner(object):
    """Run a test in a loop, with the option to gracefully exit"""

    # Flipped to True by the signal handler; checked once per iteration.
    stop_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.prep_exit)
        signal.signal(signal.SIGTERM, self.prep_exit)
        self.failures = 0
        self.attempts = 0

    def prep_exit(self, signum, frame):
        """Signal handler: request a stop once the current attempt ends."""
        self.stop_now = True
        logger.info("Received signal, stopping")

    def _failure_percentage(self):
        """Return failures as a percentage of attempts (0.0 if none ran)."""
        if not self.attempts:
            return 0.0
        return (self.failures / self.attempts) * 100

    @staticmethod
    def _error_text(exc):
        """Return the most specific description available for an exception

        OpenStack API exceptions put their info in the 'details'
        attribute; 'message' only exists on Python 2 exceptions, so fall
        back to the string form of the exception when neither is set.
        """
        for attribute in ('details', 'message'):
            text = getattr(exc, attribute, None)
            if text:
                return text
        return str(exc)

    def write_summary(self):
        """Log the failure rate and print it to stdout."""
        # Display minimum of 2 digits, but don't use decimals.
        percent_str = "%2.0f" % self._failure_percentage()

        logger.info("%s%% failure rate", percent_str)

        # Output to stdout for use by other programs
        print(percent_str)

    def test_loop(self, test):
        """Main loop to execute tests

        Executes and times interactions with OpenStack services to gather
        timing data.

        Execution can be ended by sending SIGINT or SIGTERM and the running
        test will finish.

        :param: test - an object that performs some action
                       against an OpenStack service API.
        """
        # Time of the first failure in the current run of failures, or None
        # while the service is answering.
        disconnected = None
        while True:
            self.attempts += 1
            try:
                # Pause for a bit so we're not generating more data than we
                # can handle
                time.sleep(1)

                try:
                    test.pre_test()
                except NotImplementedError:
                    pass

                start_time = datetime.datetime.now()
                # Let the test function report its own errors
                msg = test.run()
                end_time = datetime.datetime.now()

                if disconnected:
                    dis_delta = end_time - disconnected
                    disconnected = None
                    logger.info("Reconnect %ss", dis_delta.total_seconds())

                delta = end_time - start_time
                logger.info("%s %s", msg, delta.total_seconds())

                try:
                    test.post_test()
                except NotImplementedError:
                    pass
            # Catch all exceptions not handled by the tests themselves,
            # since we want to keep the loop running until explicitly stopped
            except Exception as e:
                self.failures += 1
                if not disconnected:
                    disconnected = datetime.datetime.now()
                logger.error("%s", self._error_text(e))

            if self.stop_now:
                self.write_summary()
                sys.exit()
# Registry mapping the name given on the command line to the class that
# implements the test. Every class is registered under its own
# ``service_name``.
available_tests = {
    test_class.service_name: test_class
    for test_class in (KeystoneTest, GlanceTest, NovaTest, NeutronTest)
}
def args(arg_list):
    """Parse the command line arguments.

    :param: arg_list - list of argument strings, without the program name
    :returns: the parsed namespace, with a `test` string and a `console`
              boolean
    """
    summary = ('OpenStack activity simulators. Returns percentage of '
               'failed attempts at creating/deleting resources.')
    test_help = ("Name of test to execute, 'list' for a list of available"
                 " tests")
    console_help = "Log output to the console for interactive viewing"

    cli = argparse.ArgumentParser(usage='%(prog)s', description=summary)
    cli.add_argument('test', help=test_help)
    cli.add_argument('-c', '--console', action='store_true',
                     help=console_help)

    return cli.parse_args(arg_list)
def find_test(test_name):
    """Look up a test class by name, or list the available tests.

    :param: test_name - name of a registered test, or 'list' to print
                        every registered test with its description
    :returns: the test class registered under `test_name`
    :raises: SystemExit - with status 0 after printing the list, or with a
                          non-zero status and a message on stderr when the
                          name is unknown
    """
    if test_name in available_tests:
        return available_tests[test_name]

    if test_name == "list":
        # Sorted so the listing order does not depend on dict ordering.
        for key in sorted(available_tests):
            print("{} -> {}".format(key, available_tests[key].description))
        sys.exit()

    # A string argument makes sys.exit write it to stderr and exit with
    # status 1, so callers can tell a bad test name from a clean run.
    sys.exit("Test named {} not found.".format(test_name))
def _main(argv):
    """Resolve the requested test, set up its logging and run it forever.

    :param: argv - command line arguments, without the program name
    """
    options = args(argv)
    # find_test exits the process itself for 'list' or an unknown name.
    chosen_test = find_test(options.test)()
    chosen_test.configure_logger(logger, console_logging=options.console)
    TestRunner().test_loop(chosen_test)


if __name__ == "__main__":
    _main(sys.argv[1:])