Merge "fix N402 for nova/tests"

This commit is contained in:
Jenkins 2013-01-09 20:43:57 +00:00 committed by Gerrit Code Review
commit 9a5fed3b5d
73 changed files with 541 additions and 553 deletions

View File

@ -167,7 +167,7 @@ class CinderCloudTestCase(test.TestCase):
name)
def test_describe_volumes(self):
"""Makes sure describe_volumes works and filters results."""
# Makes sure describe_volumes works and filters results.
vol1 = self.cloud.create_volume(self.context,
size=1,
@ -208,7 +208,7 @@ class CinderCloudTestCase(test.TestCase):
self.cloud.delete_volume(self.context, volume_id)
def test_create_volume_from_snapshot(self):
"""Makes sure create_volume works when we specify a snapshot."""
# Makes sure create_volume works when we specify a snapshot.
availability_zone = 'zone1:host1'
vol1 = self.cloud.create_volume(self.context,
size=1,
@ -233,7 +233,7 @@ class CinderCloudTestCase(test.TestCase):
self.cloud.delete_volume(self.context, volume1_id)
def test_describe_snapshots(self):
"""Makes sure describe_snapshots works and filters results."""
# Makes sure describe_snapshots works and filters results.
availability_zone = 'zone1:host1'
vol1 = self.cloud.create_volume(self.context,
size=1,
@ -309,7 +309,7 @@ class CinderCloudTestCase(test.TestCase):
'banana')
def test_create_snapshot(self):
"""Makes sure create_snapshot works."""
# Makes sure create_snapshot works.
availability_zone = 'zone1:host1'
result = self.cloud.describe_snapshots(self.context)
vol1 = self.cloud.create_volume(self.context,
@ -330,7 +330,7 @@ class CinderCloudTestCase(test.TestCase):
self.cloud.delete_volume(self.context, vol1['volumeId'])
def test_delete_snapshot(self):
"""Makes sure delete_snapshot works."""
# Makes sure delete_snapshot works.
availability_zone = 'zone1:host1'
vol1 = self.cloud.create_volume(self.context,
size=1,
@ -707,7 +707,7 @@ class CinderCloudTestCase(test.TestCase):
self.assertEqual(vol['attach_status'], "detached")
def test_stop_start_with_volume(self):
"""Make sure run instance with block device mapping works"""
# Make sure run instance with block device mapping works.
availability_zone = 'zone1:host1'
vol1 = self.cloud.create_volume(self.context,
size=1,
@ -788,7 +788,7 @@ class CinderCloudTestCase(test.TestCase):
self._restart_compute_service()
def test_stop_with_attached_volume(self):
"""Make sure attach info is reflected to block device mapping"""
# Make sure attach info is reflected to block device mapping.
availability_zone = 'zone1:host1'
vol1 = self.cloud.create_volume(self.context,
@ -863,7 +863,7 @@ class CinderCloudTestCase(test.TestCase):
return result['snapshotId']
def test_run_with_snapshot(self):
"""Makes sure run/stop/start instance with snapshot works."""
# Makes sure run/stop/start instance with snapshot works.
availability_zone = 'zone1:host1'
vol1 = self.cloud.create_volume(self.context,
size=1,
@ -936,7 +936,7 @@ class CinderCloudTestCase(test.TestCase):
# self.cloud.delete_snapshot(self.context, snapshot_id)
def test_create_image(self):
"""Make sure that CreateImage works"""
# Make sure that CreateImage works.
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval_max=0.3)

View File

@ -187,7 +187,7 @@ class CloudTestCase(test.TestCase):
name)
def test_describe_regions(self):
"""Makes sure describe regions runs without raising an exception"""
# Makes sure describe regions runs without raising an exception.
result = self.cloud.describe_regions(self.context)
self.assertEqual(len(result['regionInfo']), 1)
self.flags(region_list=["one=test_host1", "two=test_host2"])
@ -195,7 +195,7 @@ class CloudTestCase(test.TestCase):
self.assertEqual(len(result['regionInfo']), 2)
def test_describe_addresses(self):
"""Makes sure describe addresses runs without raising an exception"""
# Makes sure describe addresses runs without raising an exception.
address = "10.10.10.10"
db.floating_ip_create(self.context,
{'address': address,
@ -207,7 +207,7 @@ class CloudTestCase(test.TestCase):
db.floating_ip_destroy(self.context, address)
def test_describe_specific_address(self):
"""Makes sure describe specific address works"""
# Makes sure describe specific address works.
addresses = ["10.10.10.10", "10.10.10.11"]
for address in addresses:
db.floating_ip_create(self.context,
@ -246,7 +246,7 @@ class CloudTestCase(test.TestCase):
self.assertEqual(result.get('return', None), 'true')
def test_associate_disassociate_address(self):
"""Verifies associate runs cleanly without raising an exception"""
# Verifies associate runs cleanly without raising an exception.
address = "10.10.10.10"
db.floating_ip_create(self.context,
{'address': address,
@ -326,7 +326,7 @@ class CloudTestCase(test.TestCase):
db.floating_ip_destroy(self.context, address)
def test_describe_security_groups(self):
"""Makes sure describe_security_groups works and filters results."""
# Makes sure describe_security_groups works and filters results.
sec = db.security_group_create(self.context,
{'project_id': self.context.project_id,
'name': 'test'})
@ -342,7 +342,7 @@ class CloudTestCase(test.TestCase):
db.security_group_destroy(self.context, sec['id'])
def test_describe_security_groups_all_tenants(self):
"""Makes sure describe_security_groups works and filters results."""
# Makes sure describe_security_groups works and filters results.
sec = db.security_group_create(self.context,
{'project_id': 'foobar',
'name': 'test'})
@ -673,7 +673,7 @@ class CloudTestCase(test.TestCase):
self.assertFalse(get_rules(self.context, group1['id']))
def test_delete_security_group_in_use_by_instance(self):
"""Ensure that a group can not be deleted if in use by an instance."""
# Ensure that a group can not be deleted if in use by an instance.
image_uuid = 'cedef40a-ed67-4d10-800e-17455edce175'
args = {'reservation_id': 'a',
'image_ref': image_uuid,
@ -699,7 +699,7 @@ class CloudTestCase(test.TestCase):
self.cloud.delete_security_group(self.context, 'testgrp')
def test_describe_availability_zones(self):
"""Makes sure describe_availability_zones works and filters results."""
# Makes sure describe_availability_zones works and filters results.
service1 = db.service_create(self.context, {'host': 'host1_zones',
'binary': "nova-compute",
'topic': 'compute',
@ -725,7 +725,7 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, service2['id'])
def test_describe_availability_zones_verbose(self):
"""Makes sure describe_availability_zones works and filters results."""
# Makes sure describe_availability_zones works and filters results.
service1 = db.service_create(self.context, {'host': 'host1_zones',
'binary': "nova-compute",
'topic': 'compute',
@ -747,7 +747,7 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, service2['id'])
def test_describe_instances(self):
"""Makes sure describe_instances works and filters results."""
# Makes sure describe_instances works and filters results.
self.flags(use_ipv6=True)
self._stub_instance_get_with_fixed_ips('get_all')
@ -812,7 +812,7 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, comp2['id'])
def test_describe_instances_all_invalid(self):
"""Makes sure describe_instances works and filters results."""
# Makes sure describe_instances works and filters results.
self.flags(use_ipv6=True)
self._stub_instance_get_with_fixed_ips('get_all')
@ -824,7 +824,7 @@ class CloudTestCase(test.TestCase):
instance_id=[instance_id])
def test_describe_instances_sorting(self):
"""Makes sure describe_instances works and is sorted as expected."""
# Makes sure describe_instances works and is sorted as expected.
self.flags(use_ipv6=True)
self._stub_instance_get_with_fixed_ips('get_all')
@ -878,7 +878,7 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, comp2['id'])
def test_describe_instance_state(self):
"""Makes sure describe_instances for instanceState works."""
# Makes sure describe_instances for instanceState works.
def test_instance_state(expected_code, expected_name,
power_state_, vm_state_, values=None):
@ -908,7 +908,7 @@ class CloudTestCase(test.TestCase):
{'shutdown_terminate': False})
def test_describe_instances_no_ipv6(self):
"""Makes sure describe_instances w/ no ipv6 works."""
# Makes sure describe_instances w/ no ipv6 works.
self.flags(use_ipv6=False)
self._stub_instance_get_with_fixed_ips('get_all')
@ -1153,7 +1153,7 @@ class CloudTestCase(test.TestCase):
# deleteOnTermination
# noDevice
def test_describe_image_mapping(self):
"""test for rootDeviceName and blockDeiceMapping"""
# test for rootDeviceName and blockDeiceMapping.
describe_images = self.cloud.describe_images
self._setUpImageSet()
@ -1645,7 +1645,7 @@ class CloudTestCase(test.TestCase):
self.compute = self.start_service('compute')
def test_stop_start_instance(self):
"""Makes sure stop/start instance works"""
# Makes sure stop/start instance works.
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval_max=0.3)
@ -1848,7 +1848,7 @@ class CloudTestCase(test.TestCase):
return result['snapshotId']
def _do_test_create_image(self, no_reboot):
"""Make sure that CreateImage works"""
"""Make sure that CreateImage works."""
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval_max=0.3)
@ -1891,7 +1891,7 @@ class CloudTestCase(test.TestCase):
connection_info='{"foo":"bar"}')
def __getattr__(self, name):
"""Properly delegate dotted lookups"""
"""Properly delegate dotted lookups."""
if name in self.__dict__['values']:
return self.values.get(name)
try:
@ -1945,11 +1945,11 @@ class CloudTestCase(test.TestCase):
self._restart_compute_service()
def test_create_image_no_reboot(self):
"""Make sure that CreateImage works"""
# Make sure that CreateImage works.
self._do_test_create_image(True)
def test_create_image_with_reboot(self):
"""Make sure that CreateImage works"""
# Make sure that CreateImage works.
self._do_test_create_image(False)
def test_create_image_instance_store(self):
@ -1980,7 +1980,7 @@ class CloudTestCase(test.TestCase):
delete_on_termination=False)
def __getattr__(self, name):
"""Properly delegate dotted lookups"""
"""Properly delegate dotted lookups."""
if name in self.__dict__['values']:
return self.values.get(name)
try:
@ -2052,7 +2052,7 @@ class CloudTestCase(test.TestCase):
]
def test_describe_instance_attribute(self):
"""Make sure that describe_instance_attribute works"""
# Make sure that describe_instance_attribute works.
self.stubs.Set(db, 'block_device_mapping_get_all_by_instance',
self._fake_bdm_get)

View File

@ -176,7 +176,7 @@ class EC2ValidateTestCase(test.TestCase):
class EC2TimestampValidationTestCase(test.TestCase):
"""Test case for EC2 request timestamp validation"""
"""Test case for EC2 request timestamp validation."""
def test_validate_ec2_timestamp_valid(self):
params = {'Timestamp': '2011-04-22T11:29:49Z'}

View File

@ -22,13 +22,13 @@ class TestFaults(test.TestCase):
"""Tests covering ec2 Fault class."""
def test_fault_exception(self):
"""Ensure the status_int is set correctly on faults"""
# Ensure the status_int is set correctly on faults.
fault = faults.Fault(webob.exc.HTTPBadRequest(
explanation='test'))
self.assertTrue(isinstance(fault.wrapped_exc,
webob.exc.HTTPBadRequest))
def test_fault_exception_status_int(self):
"""Ensure the status_int is set correctly on faults"""
# Ensure the status_int is set correctly on faults.
fault = faults.Fault(webob.exc.HTTPNotFound(explanation='test'))
self.assertEquals(fault.wrapped_exc.status_int, 404)

View File

@ -21,7 +21,7 @@ from nova.openstack.common import jsonutils
def webob_factory(url):
"""Factory for removing duplicate webob code from tests"""
"""Factory for removing duplicate webob code from tests."""
base_url = url

View File

@ -226,7 +226,7 @@ class CreateBackupTests(test.TestCase):
self.assertEqual(response.status_int, 413)
def test_create_backup_no_name(self):
"""Name is required for backups"""
# Name is required for backups.
body = {
'createBackup': {
'backup_type': 'daily',
@ -239,7 +239,7 @@ class CreateBackupTests(test.TestCase):
self.assertEqual(response.status_int, 400)
def test_create_backup_no_rotation(self):
"""Rotation is required for backup requests"""
# Rotation is required for backup requests.
body = {
'createBackup': {
'name': 'Backup 1',
@ -268,7 +268,7 @@ class CreateBackupTests(test.TestCase):
self.assertEqual(response.status_int, 400)
def test_create_backup_no_backup_type(self):
"""Backup Type (daily or weekly) is required for backup requests"""
# Backup Type (daily or weekly) is required for backup requests.
body = {
'createBackup': {
'name': 'Backup 1',
@ -288,7 +288,7 @@ class CreateBackupTests(test.TestCase):
self.assertEqual(response.status_int, 400)
def test_create_backup_rotation_is_zero(self):
"""The happy path for creating backups if rotation is zero"""
# The happy path for creating backups if rotation is zero.
body = {
'createBackup': {
'name': 'Backup 1',
@ -304,7 +304,7 @@ class CreateBackupTests(test.TestCase):
self.assertFalse('Location' in response.headers)
def test_create_backup_rotation_is_positive(self):
"""The happy path for creating backups if rotation is positive"""
# The happy path for creating backups if rotation is positive.
body = {
'createBackup': {
'name': 'Backup 1',

View File

@ -247,7 +247,7 @@ class DiskConfigTestCase(test.TestCase):
self.assertDiskConfig(server_dict, 'AUTO')
def test_update_server_invalid_disk_config(self):
"""Return BadRequest if user passes an invalid diskConfig value."""
# Return BadRequest if user passes an invalid diskConfig value.
req = fakes.HTTPRequest.blank(
'/fake/servers/%s' % MANUAL_INSTANCE_UUID)
req.method = 'PUT'

View File

@ -70,13 +70,13 @@ def stub_host_power_action(context, host, action):
def _create_instance(**kwargs):
"""Create a test instance"""
"""Create a test instance."""
ctxt = context.get_admin_context()
return db.instance_create(ctxt, _create_instance_dict(**kwargs))
def _create_instance_dict(**kwargs):
"""Create a dictionary for a test instance"""
"""Create a dictionary for a test instance."""
inst = {}
inst['image_ref'] = 'cedef40a-ed67-4d10-800e-17455edce175'
inst['reservation_id'] = 'r-fakeres'
@ -130,7 +130,7 @@ class HostTestCase(test.TestCase):
self.assertEqual(result[key], expected_value)
def test_list_hosts(self):
"""Verify that the compute hosts are returned."""
# Verify that the compute hosts are returned.
hosts = os_hosts._list_hosts(self.req)
self.assertEqual(hosts, HOST_LIST['hosts'])
@ -235,7 +235,7 @@ class HostTestCase(test.TestCase):
self.req.environ["nova.context"].is_admin = True
def test_show_host_not_exist(self):
"""A host given as an argument does not exists."""
# A host given as an argument does not exists.
self.req.environ["nova.context"].is_admin = True
dest = 'dummydest'
self.assertRaises(webob.exc.HTTPNotFound,
@ -259,7 +259,7 @@ class HostTestCase(test.TestCase):
return db.service_get(ctxt, s_ref['id'])
def test_show_no_project(self):
"""No instance are running on the given host."""
# No instance are running on the given host.
ctxt = context.get_admin_context()
s_ref = self._create_compute_service()
@ -275,7 +275,7 @@ class HostTestCase(test.TestCase):
db.service_destroy(ctxt, s_ref['id'])
def test_show_works_correctly(self):
"""show() works correctly as expected."""
# show() works correctly as expected.
ctxt = context.get_admin_context()
s_ref = self._create_compute_service()
i_ref1 = _create_instance(project_id='p-01', host=s_ref['host'])

View File

@ -99,7 +99,7 @@ class StubLateExtensionController(wsgi.Controller):
class StubExtensionManager(object):
"""Provides access to Tweedle Beetles"""
"""Provides access to Tweedle Beetles."""
name = "Tweedle Beetle Extension"
alias = "TWDLBETL"

View File

@ -350,7 +350,7 @@ class FlavorsTest(test.TestCase):
self.assertEqual(flavors, expected)
def test_get_flavor_list_filter_min_ram(self):
"""Flavor lists may be filtered by minRam."""
# Flavor lists may be filtered by minRam.
req = fakes.HTTPRequest.blank('/v2/fake/flavors?minRam=512')
flavor = self.controller.index(req)
expected = {
@ -374,13 +374,13 @@ class FlavorsTest(test.TestCase):
self.assertEqual(flavor, expected)
def test_get_flavor_list_filter_invalid_min_ram(self):
"""Ensure you cannot list flavors with invalid minRam param."""
# Ensure you cannot list flavors with invalid minRam param.
req = fakes.HTTPRequest.blank('/v2/fake/flavors?minRam=NaN')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, req)
def test_get_flavor_list_filter_min_disk(self):
"""Flavor lists may be filtered by minDisk."""
# Flavor lists may be filtered by minDisk.
req = fakes.HTTPRequest.blank('/v2/fake/flavors?minDisk=20')
flavor = self.controller.index(req)
expected = {
@ -404,7 +404,7 @@ class FlavorsTest(test.TestCase):
self.assertEqual(flavor, expected)
def test_get_flavor_list_filter_invalid_min_disk(self):
"""Ensure you cannot list flavors with invalid minDisk param."""
# Ensure you cannot list flavors with invalid minDisk param.
req = fakes.HTTPRequest.blank('/v2/fake/flavors?minDisk=NaN')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, req)

View File

@ -101,7 +101,7 @@ class LimitsControllerTest(BaseLimitTestSuite):
return request
def test_empty_index_json(self):
"""Test getting empty limit details in JSON."""
# Test getting empty limit details in JSON.
request = self._get_index_request()
response = request.get_response(self.controller)
expected = {
@ -114,7 +114,7 @@ class LimitsControllerTest(BaseLimitTestSuite):
self.assertEqual(expected, body)
def test_index_json(self):
"""Test getting limit details in JSON."""
# Test getting limit details in JSON.
request = self._get_index_request()
request = self._populate_limits(request)
self.absolute_limits = {
@ -189,7 +189,7 @@ class LimitsControllerTest(BaseLimitTestSuite):
return request
def test_index_diff_regex(self):
"""Test getting limit details in JSON."""
# Test getting limit details in JSON.
request = self._get_index_request()
request = self._populate_limits_diff_regex(request)
response = request.get_response(self.controller)
@ -308,17 +308,17 @@ class LimitMiddlewareTest(BaseLimitTestSuite):
self.__class__.__module__)
def test_limit_class(self):
"""Test that middleware selected correct limiter class."""
# Test that middleware selected correct limiter class.
assert isinstance(self.app._limiter, TestLimiter)
def test_good_request(self):
"""Test successful GET request through middleware."""
# Test successful GET request through middleware.
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
def test_limited_request_json(self):
"""Test a rate-limited (413) GET request through middleware."""
# Test a rate-limited (413) GET request through middleware.
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
@ -341,7 +341,7 @@ class LimitMiddlewareTest(BaseLimitTestSuite):
self.assertEqual(retryAfter, "60")
def test_limited_request_xml(self):
"""Test a rate-limited (413) response as XML"""
# Test a rate-limited (413) response as XML.
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
@ -371,7 +371,7 @@ class LimitTest(BaseLimitTestSuite):
"""
def test_GET_no_delay(self):
"""Test a limit handles 1 GET per second."""
# Test a limit handles 1 GET per second.
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
self.assertEqual(None, delay)
@ -379,7 +379,7 @@ class LimitTest(BaseLimitTestSuite):
self.assertEqual(0, limit.last_request)
def test_GET_delay(self):
"""Test two calls to 1 GET per second limit."""
# Test two calls to 1 GET per second limit.
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
self.assertEqual(None, delay)
@ -404,32 +404,32 @@ class ParseLimitsTest(BaseLimitTestSuite):
"""
def test_invalid(self):
"""Test that parse_limits() handles invalid input correctly."""
# Test that parse_limits() handles invalid input correctly.
self.assertRaises(ValueError, limits.Limiter.parse_limits,
';;;;;')
def test_bad_rule(self):
"""Test that parse_limits() handles bad rules correctly."""
# Test that parse_limits() handles bad rules correctly.
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'GET, *, .*, 20, minute')
def test_missing_arg(self):
"""Test that parse_limits() handles missing args correctly."""
# Test that parse_limits() handles missing args correctly.
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, 20)')
def test_bad_value(self):
"""Test that parse_limits() handles bad values correctly."""
# Test that parse_limits() handles bad values correctly.
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, foo, minute)')
def test_bad_unit(self):
"""Test that parse_limits() handles bad units correctly."""
# Test that parse_limits() handles bad units correctly.
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, 20, lightyears)')
def test_multiple_rules(self):
"""Test that parse_limits() handles multiple rules correctly."""
# Test that parse_limits() handles multiple rules correctly.
try:
l = limits.Limiter.parse_limits('(get, *, .*, 20, minute);'
'(PUT, /foo*, /foo.*, 10, hour);'
@ -493,9 +493,7 @@ class LimiterTest(BaseLimitTestSuite):
self.assertEqual(delay, (None, None))
def test_no_delay_PUT(self):
"""
Simple test to ensure no delay on a single call for a known limit.
"""
# Simple test to ensure no delay on a single call for a known limit.
delay = self.limiter.check_for_delay("PUT", "/anything")
self.assertEqual(delay, (None, None))
@ -523,9 +521,7 @@ class LimiterTest(BaseLimitTestSuite):
self.failUnlessAlmostEqual(expected, results, 8)
def test_delay_GET(self):
"""
Ensure the 11th GET will result in NO delay.
"""
# Ensure the 11th GET will result in NO delay.
expected = [None] * 11
results = list(self._check(11, "GET", "/anything"))
@ -564,9 +560,7 @@ class LimiterTest(BaseLimitTestSuite):
self.assertEqual(expected, results)
def test_multiple_delays(self):
"""
Ensure multiple requests still get a delay.
"""
# Ensure multiple requests still get a delay.
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything"))
self.assertEqual(expected, results)
@ -578,15 +572,11 @@ class LimiterTest(BaseLimitTestSuite):
self.assertEqual(expected, results)
def test_user_limit(self):
"""
Test user-specific limits.
"""
# Test user-specific limits.
self.assertEqual(self.limiter.levels['user3'], [])
def test_multiple_users(self):
"""
Tests involving multiple users.
"""
# Tests involving multiple users.
# User1
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything", "user1"))
@ -652,7 +642,7 @@ class WsgiLimiterTest(BaseLimitTestSuite):
self.assertEqual(response.status_int, 204)
def test_invalid_methods(self):
"""Only POSTs should work."""
# Only POSTs should work.
requests = []
for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
request = webob.Request.blank("/", method=method)
@ -794,12 +784,12 @@ class WsgiLimiterProxyTest(BaseLimitTestSuite):
self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")
def test_200(self):
"""Successful request test."""
# Successful request test.
delay = self.proxy.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
def test_403(self):
"""Forbidden request test."""
# Forbidden request test.
delay = self.proxy.check_for_delay("GET", "/delayed")
self.assertEqual(delay, (None, None))

View File

@ -796,7 +796,7 @@ class ServerActionsControllerTest(test.TestCase):
delete_on_termination=False)
def __getattr__(self, name):
"""Properly delegate dotted lookups"""
"""Properly delegate dotted lookups."""
if name in self.__dict__['values']:
return self.values.get(name)
try:

View File

@ -855,7 +855,7 @@ class ServersControllerTest(test.TestCase):
self.assertEqual(servers[0]['id'], server_uuid)
def test_get_servers_invalid_status(self):
"""Test getting servers by invalid status"""
# Test getting servers by invalid status.
req = fakes.HTTPRequest.blank('/v2/fake/servers?status=baloney',
use_admin_context=False)
servers = self.controller.index(req)['servers']
@ -1686,7 +1686,7 @@ class ServerStatusTest(test.TestCase):
class ServersControllerCreateTest(test.TestCase):
def setUp(self):
"""Shared implementation for tests below that create instance"""
"""Shared implementation for tests below that create instance."""
super(ServersControllerCreateTest, self).setUp()
self.flags(verbose=True,
@ -1735,7 +1735,7 @@ class ServersControllerCreateTest(test.TestCase):
return self.instance_cache_by_id[instance_id]
def rpc_call_wrapper(context, topic, msg, timeout=None):
"""Stub out the scheduler creating the instance entry"""
"""Stub out the scheduler creating the instance entry."""
if (topic == CONF.scheduler_topic and
msg['method'] == 'run_instance'):
request_spec = msg['args']['request_spec']
@ -5264,7 +5264,7 @@ class ServersAllExtensionsTestCase(test.TestCase):
self.app = compute.APIRouter()
def test_create_missing_server(self):
"""Test create with malformed body"""
# Test create with malformed body.
def fake_create(*args, **kwargs):
raise test.TestingException("Should not reach the compute API.")
@ -5281,7 +5281,7 @@ class ServersAllExtensionsTestCase(test.TestCase):
self.assertEqual(422, res.status_int)
def test_update_missing_server(self):
"""Test create with malformed body"""
# Test create with malformed body.
def fake_update(*args, **kwargs):
raise test.TestingException("Should not reach the compute API.")

View File

@ -35,7 +35,7 @@ class UrlmapTest(test.TestCase):
nova.tests.image.fake.FakeImageService_reset()
def test_path_version_v1_1(self):
"""Test URL path specifying v1.1 returns v2 content."""
# Test URL path specifying v1.1 returns v2 content.
req = webob.Request.blank('/v1.1/')
req.accept = "application/json"
res = req.get_response(fakes.wsgi_app(init_only=('versions',)))
@ -45,7 +45,7 @@ class UrlmapTest(test.TestCase):
self.assertEqual(body['version']['id'], 'v2.0')
def test_content_type_version_v1_1(self):
"""Test Content-Type specifying v1.1 returns v2 content."""
# Test Content-Type specifying v1.1 returns v2 content.
req = webob.Request.blank('/')
req.content_type = "application/json;version=1.1"
req.accept = "application/json"
@ -56,7 +56,7 @@ class UrlmapTest(test.TestCase):
self.assertEqual(body['version']['id'], 'v2.0')
def test_accept_version_v1_1(self):
"""Test Accept header specifying v1.1 returns v2 content."""
# Test Accept header specifying v1.1 returns v2 content.
req = webob.Request.blank('/')
req.accept = "application/json;version=1.1"
res = req.get_response(fakes.wsgi_app(init_only=('versions',)))
@ -66,7 +66,7 @@ class UrlmapTest(test.TestCase):
self.assertEqual(body['version']['id'], 'v2.0')
def test_path_version_v2(self):
"""Test URL path specifying v2 returns v2 content."""
# Test URL path specifying v2 returns v2 content.
req = webob.Request.blank('/v2/')
req.accept = "application/json"
res = req.get_response(fakes.wsgi_app(init_only=('versions',)))
@ -76,7 +76,7 @@ class UrlmapTest(test.TestCase):
self.assertEqual(body['version']['id'], 'v2.0')
def test_content_type_version_v2(self):
"""Test Content-Type specifying v2 returns v2 content."""
# Test Content-Type specifying v2 returns v2 content.
req = webob.Request.blank('/')
req.content_type = "application/json;version=2"
req.accept = "application/json"
@ -87,7 +87,7 @@ class UrlmapTest(test.TestCase):
self.assertEqual(body['version']['id'], 'v2.0')
def test_accept_version_v2(self):
"""Test Accept header specifying v2 returns v2 content."""
# Test Accept header specifying v2 returns v2 content.
req = webob.Request.blank('/')
req.accept = "application/json;version=2"
res = req.get_response(fakes.wsgi_app(init_only=('versions',)))
@ -97,7 +97,7 @@ class UrlmapTest(test.TestCase):
self.assertEqual(body['version']['id'], 'v2.0')
def test_path_content_type(self):
"""Test URL path specifying JSON returns JSON content."""
# Test URL path specifying JSON returns JSON content.
url = '/v2/fake/images/cedef40a-ed67-4d10-800e-17455edce175.json'
req = webob.Request.blank(url)
req.accept = "application/xml"
@ -109,7 +109,7 @@ class UrlmapTest(test.TestCase):
'cedef40a-ed67-4d10-800e-17455edce175')
def test_accept_content_type(self):
"""Test Accept header specifying JSON returns JSON content."""
# Test Accept header specifying JSON returns JSON content.
url = '/v2/fake/images/cedef40a-ed67-4d10-800e-17455edce175'
req = webob.Request.blank(url)
req.accept = "application/xml;q=0.8, application/json"

View File

@ -43,7 +43,7 @@ class LimiterTest(test.TestCase):
"""
def setUp(self):
"""Run before each test. """
"""Run before each test."""
super(LimiterTest, self).setUp()
self.tiny = range(1)
self.small = range(10)
@ -51,7 +51,7 @@ class LimiterTest(test.TestCase):
self.large = range(10000)
def test_limiter_offset_zero(self):
"""Test offset key works with 0. """
# Test offset key works with 0.
req = webob.Request.blank('/?offset=0')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -59,7 +59,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_offset_medium(self):
"""Test offset key works with a medium sized number. """
# Test offset key works with a medium sized number.
req = webob.Request.blank('/?offset=10')
self.assertEqual(common.limited(self.tiny, req), [])
self.assertEqual(common.limited(self.small, req), self.small[10:])
@ -67,7 +67,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[10:1010])
def test_limiter_offset_over_max(self):
"""Test offset key works with a number over 1000 (max_limit). """
# Test offset key works with a number over 1000 (max_limit).
req = webob.Request.blank('/?offset=1001')
self.assertEqual(common.limited(self.tiny, req), [])
self.assertEqual(common.limited(self.small, req), [])
@ -76,19 +76,19 @@ class LimiterTest(test.TestCase):
common.limited(self.large, req), self.large[1001:2001])
def test_limiter_offset_blank(self):
"""Test offset key works with a blank offset. """
# Test offset key works with a blank offset.
req = webob.Request.blank('/?offset=')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_offset_bad(self):
"""Test offset key works with a BAD offset. """
# Test offset key works with a BAD offset.
req = webob.Request.blank(u'/?offset=\u0020aa')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_nothing(self):
"""Test request with no offset or limit """
# Test request with no offset or limit.
req = webob.Request.blank('/')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -96,7 +96,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_zero(self):
"""Test limit of zero. """
# Test limit of zero.
req = webob.Request.blank('/?limit=0')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -104,7 +104,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_medium(self):
"""Test limit of 10. """
# Test limit of 10.
req = webob.Request.blank('/?limit=10')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -112,7 +112,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:10])
def test_limiter_limit_over_max(self):
"""Test limit of 3000. """
# Test limit of 3000.
req = webob.Request.blank('/?limit=3000')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -120,7 +120,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_and_offset(self):
"""Test request with both limit and offset. """
# Test request with both limit and offset.
items = range(2000)
req = webob.Request.blank('/?offset=1&limit=3')
self.assertEqual(common.limited(items, req), items[1:4])
@ -132,7 +132,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(items, req), [])
def test_limiter_custom_max_limit(self):
"""Test a max_limit other than 1000. """
# Test a max_limit other than 1000.
items = range(2000)
req = webob.Request.blank('/?offset=1&limit=3')
self.assertEqual(
@ -147,13 +147,13 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(items, req, max_limit=2000), [])
def test_limiter_negative_limit(self):
"""Test a negative limit. """
# Test a negative limit.
req = webob.Request.blank('/?limit=-3000')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_negative_offset(self):
"""Test a negative offset. """
# Test a negative offset.
req = webob.Request.blank('/?offset=-30')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
@ -167,30 +167,30 @@ class PaginationParamsTest(test.TestCase):
"""
def test_no_params(self):
"""Test no params. """
# Test no params.
req = webob.Request.blank('/')
self.assertEqual(common.get_pagination_params(req), {})
def test_valid_marker(self):
"""Test valid marker param. """
# Test valid marker param.
req = webob.Request.blank(
'/?marker=263abb28-1de6-412f-b00b-f0ee0c4333c2')
self.assertEqual(common.get_pagination_params(req),
{'marker': '263abb28-1de6-412f-b00b-f0ee0c4333c2'})
def test_valid_limit(self):
"""Test valid limit param. """
# Test valid limit param.
req = webob.Request.blank('/?limit=10')
self.assertEqual(common.get_pagination_params(req), {'limit': 10})
def test_invalid_limit(self):
"""Test invalid limit param. """
# Test invalid limit param.
req = webob.Request.blank('/?limit=-2')
self.assertRaises(
webob.exc.HTTPBadRequest, common.get_pagination_params, req)
def test_valid_limit_and_marker(self):
"""Test valid limit and marker parameters. """
# Test valid limit and marker parameters.
marker = '263abb28-1de6-412f-b00b-f0ee0c4333c2'
req = webob.Request.blank('/?limit=20&marker=%s' % marker)
self.assertEqual(common.get_pagination_params(req),

View File

@ -38,7 +38,7 @@ class TestFaults(test.TestCase):
return xml_string
def test_400_fault_json(self):
"""Test fault serialized to JSON via file-extension and/or header."""
# Test fault serialized to JSON via file-extension and/or header.
requests = [
webob.Request.blank('/.json'),
webob.Request.blank('/', headers={"Accept": "application/json"}),
@ -60,7 +60,7 @@ class TestFaults(test.TestCase):
self.assertEqual(expected, actual)
def test_413_fault_json(self):
"""Test fault serialized to JSON via file-extension and/or header."""
# Test fault serialized to JSON via file-extension and/or header.
requests = [
webob.Request.blank('/.json'),
webob.Request.blank('/', headers={"Accept": "application/json"}),
@ -85,7 +85,7 @@ class TestFaults(test.TestCase):
self.assertEqual(expected, actual)
def test_raise(self):
"""Ensure the ability to raise :class:`Fault` in WSGI-ified methods."""
# Ensure the ability to raise :class:`Fault` in WSGI-ified methods.
@webob.dec.wsgify
def raiser(req):
raise wsgi.Fault(webob.exc.HTTPNotFound(explanation='whut?'))
@ -97,7 +97,7 @@ class TestFaults(test.TestCase):
self.assertTrue('whut?' in resp.body)
def test_raise_403(self):
"""Ensure the ability to raise :class:`Fault` in WSGI-ified methods."""
# Ensure the ability to raise :class:`Fault` in WSGI-ified methods.
@webob.dec.wsgify
def raiser(req):
raise wsgi.Fault(webob.exc.HTTPForbidden(explanation='whut?'))
@ -110,12 +110,12 @@ class TestFaults(test.TestCase):
self.assertTrue('forbidden' in resp.body)
def test_fault_has_status_int(self):
"""Ensure the status_int is set correctly on faults"""
# Ensure the status_int is set correctly on faults.
fault = wsgi.Fault(webob.exc.HTTPBadRequest(explanation='what?'))
self.assertEqual(fault.status_int, 400)
def test_xml_serializer(self):
"""Ensure that a v1.1 request responds with a v1.1 xmlns"""
# Ensure that a v1.1 request responds with a v1.1 xmlns.
request = webob.Request.blank('/v1.1',
headers={"Accept": "application/xml"})

View File

@ -196,7 +196,7 @@ class XMLDeserializerTest(test.TestCase):
self.assertEqual(deserializer.deserialize(xml), as_dict)
def test_xml_empty(self):
xml = """<a></a>"""
xml = '<a></a>'
as_dict = {"body": {"a": {}}}
deserializer = wsgi.XMLDeserializer()
self.assertEqual(deserializer.deserialize(xml), as_dict)
@ -753,7 +753,7 @@ class ResourceTest(test.TestCase):
self.assertEqual(response, 'foo')
def test_resource_exception_handler_type_error(self):
"""A TypeError should be translated to a Fault/HTTP 400"""
# A TypeError should be translated to a Fault/HTTP 400.
def foo(a,):
return a

View File

@ -93,7 +93,7 @@ class TestKeystoneMiddlewareRoles(test.TestCase):
self.roles = "pawn, knight, rook"
def test_roles(self):
"""Test that the newer style role header takes precedence"""
# Test that the newer style role header takes precedence.
self.request.headers['X_ROLES'] = 'pawn,knight,rook'
self.request.headers['X_ROLE'] = 'bad'
@ -106,7 +106,7 @@ class TestKeystoneMiddlewareRoles(test.TestCase):
self.assertEqual(response.status, '200 No Roles')
def test_deprecated_role(self):
"""Test fallback to older role header"""
# Test fallback to older role header.
self.request.headers['X_ROLE'] = 'pawn,knight,rook'
response = self.request.get_response(self.middleware)
@ -118,7 +118,7 @@ class TestKeystoneMiddlewareRoles(test.TestCase):
self.assertEqual(response.status, '200 No Roles')
def test_no_role_headers(self):
"""Test with no role headers set"""
# Test with no role headers set.
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 No Roles')

View File

@ -26,7 +26,7 @@ from nova.tests.cells import fakes
class CellsManagerClassTestCase(test.TestCase):
"""Test case for CellsManager class"""
"""Test case for CellsManager class."""
def setUp(self):
super(CellsManagerClassTestCase, self).setUp()

View File

@ -31,7 +31,7 @@ CONF.import_opt('scheduler_retries', 'nova.cells.scheduler', group='cells')
class CellsSchedulerTestCase(test.TestCase):
"""Test case for CellsScheduler class"""
"""Test case for CellsScheduler class."""
def setUp(self):
super(CellsSchedulerTestCase, self).setUp()

View File

@ -19,7 +19,7 @@ from nova.compute import resource_tracker
class FakeResourceTracker(resource_tracker.ResourceTracker):
"""Version without a DB requirement"""
"""Version without a DB requirement."""
def _create(self, context, values):
self.compute_node = values

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for resource tracker claims"""
"""Tests for resource tracker claims."""
import uuid

File diff suppressed because it is too large Load Diff

View File

@ -235,7 +235,7 @@ class UsageInfoTestCase(test.TestCase):
fake_network.set_stub_network_methods(self.stubs)
def _create_instance(self, params={}):
"""Create a test instance"""
"""Create a test instance."""
inst = {}
inst['image_ref'] = 1
inst['reservation_id'] = 'r-fakeres'
@ -251,7 +251,7 @@ class UsageInfoTestCase(test.TestCase):
return db.instance_create(self.context, inst)['id']
def test_notify_usage_exists(self):
"""Ensure 'exists' notification generates appropriate usage data."""
# Ensure 'exists' notification generates appropriate usage data.
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
# Set some system metadata
@ -286,7 +286,7 @@ class UsageInfoTestCase(test.TestCase):
self.compute.terminate_instance(self.context, instance)
def test_notify_usage_exists_deleted_instance(self):
"""Ensure 'exists' notification generates appropriate usage data."""
# Ensure 'exists' notification generates appropriate usage data.
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
# Set some system metadata
@ -321,7 +321,7 @@ class UsageInfoTestCase(test.TestCase):
self.assertEquals(payload['image_ref_url'], image_ref_url)
def test_notify_usage_exists_instance_not_found(self):
"""Ensure 'exists' notification generates appropriate usage data."""
# Ensure 'exists' notification generates appropriate usage data.
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
self.compute.terminate_instance(self.context, instance)

View File

@ -14,7 +14,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for compute service with multiple compute nodes"""
"""Tests for compute service with multiple compute nodes."""
from nova import context
from nova import exception

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for compute resource tracking"""
"""Tests for compute resource tracking."""
import uuid
@ -38,7 +38,7 @@ FAKE_VIRT_VCPUS = 1
class UnsupportedVirtDriver(driver.ComputeDriver):
"""Pretend version of a lame virt driver"""
"""Pretend version of a lame virt driver."""
def __init__(self):
super(UnsupportedVirtDriver, self).__init__(None)
@ -363,7 +363,7 @@ class BaseTrackerTestCase(BaseTestCase):
def _limits(self, memory_mb=FAKE_VIRT_MEMORY_MB,
disk_gb=FAKE_VIRT_LOCAL_GB, vcpus=FAKE_VIRT_VCPUS):
"""Create limits dictionary used for oversubscribing resources"""
"""Create limits dictionary used for oversubscribing resources."""
return {
'memory_mb': memory_mb,

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for compute node stats"""
"""Tests for compute node stats."""
from nova.compute import stats
from nova.compute import task_states

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for the conductor service"""
"""Tests for the conductor service."""
import mox
@ -357,7 +357,7 @@ class _BaseTestCase(object):
class ConductorTestCase(_BaseTestCase, test.TestCase):
"""Conductor Manager Tests"""
"""Conductor Manager Tests."""
def setUp(self):
super(ConductorTestCase, self).setUp()
self.conductor = conductor_manager.ConductorManager()
@ -450,7 +450,7 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
class ConductorRPCAPITestCase(_BaseTestCase, test.TestCase):
"""Conductor RPC API Tests"""
"""Conductor RPC API Tests."""
def setUp(self):
super(ConductorRPCAPITestCase, self).setUp()
self.conductor_service = self.start_service(
@ -541,7 +541,7 @@ class ConductorRPCAPITestCase(_BaseTestCase, test.TestCase):
class ConductorAPITestCase(_BaseTestCase, test.TestCase):
"""Conductor API Tests"""
"""Conductor API Tests."""
def setUp(self):
super(ConductorAPITestCase, self).setUp()
self.conductor_service = self.start_service(
@ -641,7 +641,7 @@ class ConductorAPITestCase(_BaseTestCase, test.TestCase):
class ConductorLocalAPITestCase(ConductorAPITestCase):
"""Conductor LocalAPI Tests"""
"""Conductor LocalAPI Tests."""
def setUp(self):
super(ConductorLocalAPITestCase, self).setUp()
self.conductor = conductor_api.LocalAPI()

View File

@ -34,7 +34,7 @@ CONF.import_opt('console_driver', 'nova.console.manager')
class ConsoleTestCase(test.TestCase):
"""Test case for console proxy manager"""
"""Test case for console proxy manager."""
def setUp(self):
super(ConsoleTestCase, self).setUp()
self.flags(console_driver='nova.console.fake.FakeConsoleProxy',
@ -46,7 +46,7 @@ class ConsoleTestCase(test.TestCase):
self.host = 'test_compute_host'
def _create_instance(self):
"""Create a test instance"""
"""Create a test instance."""
inst = {}
#inst['host'] = self.host
#inst['name'] = 'instance-1234'
@ -123,7 +123,7 @@ class ConsoleTestCase(test.TestCase):
class ConsoleAPITestCase(test.TestCase):
"""Test case for console API"""
"""Test case for console API."""
def setUp(self):
super(ConsoleAPITestCase, self).setUp()

View File

@ -38,7 +38,7 @@ class ConsoleauthTestCase(test.TestCase):
self.context = context.get_admin_context()
def test_tokens_expire(self):
"""Test that tokens expire correctly."""
# Test that tokens expire correctly.
self.useFixture(test.TimeOverride())
token = 'mytok'
self.flags(console_token_ttl=1)

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Stubouts, mocks and fixtures for the test suite"""
"""Stubouts, mocks and fixtures for the test suite."""
from nova import db
from nova import exception

View File

@ -67,7 +67,7 @@ class FakeVIFDriver(object):
class FakeModel(dict):
"""Represent a model from the db"""
"""Represent a model from the db."""
def __init__(self, *args, **kwargs):
self.update(kwargs)

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of a fake volume API"""
"""Implementation of a fake volume API."""
import uuid

View File

@ -87,7 +87,7 @@ class HyperVUtils(object):
% (path, ret_val))
def _check_job_status(self, jobpath):
"""Poll WMI job state for completion"""
"""Poll WMI job state for completion."""
job_wmi_path = jobpath.replace('\\', '/')
job = wmi.WMI(moniker=job_wmi_path)

View File

@ -49,7 +49,7 @@ def serialize_obj(obj):
def serialize_args(*args, **kwargs):
"""Workaround for float string conversion issues in Python 2.6"""
"""Workaround for float string conversion issues in Python 2.6."""
return serialize_obj((args, kwargs))

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of a fake image service"""
"""Implementation of a fake image service."""
import copy
import datetime

View File

@ -35,7 +35,7 @@ CONF = cfg.CONF
class NullWriter(object):
"""Used to test ImageService.get which takes a writer object"""
"""Used to test ImageService.get which takes a writer object."""
def write(self, *arg, **kwargs):
pass
@ -134,7 +134,7 @@ class TestGlanceImageService(test.TestCase):
deleted_at=self.NOW_GLANCE_FORMAT)
def test_create_with_instance_id(self):
"""Ensure instance_id is persisted as an image-property"""
# Ensure instance_id is persisted as an image-property.
fixture = {'name': 'test image',
'is_public': False,
'properties': {'instance_id': '42', 'user_id': 'fake'}}

View File

@ -330,7 +330,7 @@ class ApiSampleTestBase(integrated_helpers._IntegratedTestBase):
class ApiSamplesTrap(ApiSampleTestBase):
"""Make sure extensions don't get added without tests"""
"""Make sure extensions don't get added without tests."""
all_extensions = True
@ -488,12 +488,12 @@ class ServersMetadataJsonTest(ServersSampleBase):
return subs
def test_metadata_put_all(self):
"""Test setting all metadata for a server"""
# Test setting all metadata for a server.
subs = {'value': 'Foo Value'}
return self._create_and_set(subs)
def test_metadata_post_all(self):
"""Test updating all metadata for a server"""
# Test updating all metadata for a server.
subs = {'value': 'Foo Value'}
uuid = self._create_and_set(subs)
subs['value'] = 'Bar Value'
@ -504,7 +504,7 @@ class ServersMetadataJsonTest(ServersSampleBase):
self._verify_response('server-metadata-all-resp', subs, response)
def test_metadata_get_all(self):
"""Test getting all metadata for a server"""
# Test getting all metadata for a server.
subs = {'value': 'Foo Value'}
uuid = self._create_and_set(subs)
response = self._do_get('servers/%s/metadata' % uuid)
@ -512,7 +512,7 @@ class ServersMetadataJsonTest(ServersSampleBase):
self._verify_response('server-metadata-all-resp', subs, response)
def test_metadata_put(self):
"""Test putting an individual metadata item for a server"""
# Test putting an individual metadata item for a server.
subs = {'value': 'Foo Value'}
uuid = self._create_and_set(subs)
subs['value'] = 'Bar Value'
@ -523,7 +523,7 @@ class ServersMetadataJsonTest(ServersSampleBase):
return self._verify_response('server-metadata-resp', subs, response)
def test_metadata_get(self):
"""Test getting an individual metadata item for a server"""
# Test getting an individual metadata item for a server.
subs = {'value': 'Foo Value'}
uuid = self._create_and_set(subs)
response = self._do_get('servers/%s/metadata/foo' % uuid)
@ -531,7 +531,7 @@ class ServersMetadataJsonTest(ServersSampleBase):
return self._verify_response('server-metadata-resp', subs, response)
def test_metadata_delete(self):
"""Test deleting an individual metadata item for a server"""
# Test deleting an individual metadata item for a server.
subs = {'value': 'Foo Value'}
uuid = self._create_and_set(subs)
response = self._do_delete('servers/%s/metadata/foo' % uuid)
@ -545,14 +545,14 @@ class ServersMetadataXmlTest(ServersMetadataJsonTest):
class ServersIpsJsonTest(ServersSampleBase):
def test_get(self):
"""Test getting a server's IP information"""
# Test getting a server's IP information.
uuid = self._post_server()
response = self._do_get('servers/%s/ips' % uuid)
subs = self._get_regexes()
return self._verify_response('server-ips-resp', subs, response)
def test_get_by_network(self):
"""Test getting a server's IP information by network id"""
# Test getting a server's IP information by network id.
uuid = self._post_server()
response = self._do_get('servers/%s/ips/private' % uuid)
subs = self._get_regexes()
@ -649,13 +649,13 @@ class FlavorsSampleAllExtensionXmlTest(FlavorsSampleXmlTest):
class ImagesSampleJsonTest(ApiSampleTestBase):
def test_images_list(self):
"""Get api sample of images get list request"""
# Get api sample of images get list request.
response = self._do_get('images')
subs = self._get_regexes()
return self._verify_response('images-list-get-resp', subs, response)
def test_image_get(self):
"""Get api sample of one single image details request"""
# Get api sample of one single image details request.
image_id = fake.get_valid_image_id()
response = self._do_get('images/%s' % image_id)
self.assertEqual(response.status, 200)
@ -664,13 +664,13 @@ class ImagesSampleJsonTest(ApiSampleTestBase):
return self._verify_response('image-get-resp', subs, response)
def test_images_details(self):
"""Get api sample of all images details request"""
# Get api sample of all images details request.
response = self._do_get('images/detail')
subs = self._get_regexes()
return self._verify_response('images-details-get-resp', subs, response)
def test_image_metadata_get(self):
"""Get api sample of a image metadata request"""
# Get api sample of a image metadata request.
image_id = fake.get_valid_image_id()
response = self._do_get('images/%s/metadata' % image_id)
subs = self._get_regexes()
@ -678,7 +678,7 @@ class ImagesSampleJsonTest(ApiSampleTestBase):
return self._verify_response('image-metadata-get-resp', subs, response)
def test_image_metadata_post(self):
"""Get api sample to update metadata of an image metadata request"""
# Get api sample to update metadata of an image metadata request.
image_id = fake.get_valid_image_id()
response = self._do_post(
'images/%s/metadata' % image_id,
@ -689,7 +689,7 @@ class ImagesSampleJsonTest(ApiSampleTestBase):
subs, response)
def test_image_metadata_put(self):
"""Get api sample of image metadata put request"""
# Get api sample of image metadata put request.
image_id = fake.get_valid_image_id()
response = self._do_put('images/%s/metadata' % image_id,
'image-metadata-put-req', {})
@ -699,7 +699,7 @@ class ImagesSampleJsonTest(ApiSampleTestBase):
subs, response)
def test_image_meta_key_get(self):
"""Get api sample of a image metadata key request"""
# Get api sample of a image metadata key request.
image_id = fake.get_valid_image_id()
key = "kernel_id"
response = self._do_get('images/%s/metadata/%s' % (image_id, key))
@ -707,7 +707,7 @@ class ImagesSampleJsonTest(ApiSampleTestBase):
return self._verify_response('image-meta-key-get', subs, response)
def test_image_meta_key_put(self):
"""Get api sample of image metadata key put request"""
# Get api sample of image metadata key put request.
image_id = fake.get_valid_image_id()
key = "auto_disk_config"
response = self._do_put('images/%s/metadata/%s' % (image_id, key),
@ -752,21 +752,21 @@ class CoverageExtJsonTests(ApiSampleTestBase):
self.stubs.Set(coverage, 'xml_report', _fake_xml_report)
def test_start_coverage(self):
"""Start coverage data collection"""
# Start coverage data collection.
subs = {}
response = self._do_post('os-coverage/action',
'coverage-start-post-req', subs)
self.assertEqual(response.status, 200)
def test_start_coverage_combine(self):
"""Start coverage data collection"""
# Start coverage data collection.
subs = {}
response = self._do_post('os-coverage/action',
'coverage-start-combine-post-req', subs)
self.assertEqual(response.status, 200)
def test_stop_coverage(self):
"""Stop coverage data collection"""
# Stop coverage data collection.
subs = {
'path': '/.*',
}
@ -778,7 +778,7 @@ class CoverageExtJsonTests(ApiSampleTestBase):
subs, response)
def test_report_coverage(self):
"""Generate a coverage report"""
# Generate a coverage report.
subs = {
'filename': 'report',
'path': '/.*/report',
@ -1044,14 +1044,14 @@ class SecurityGroupsSampleJsonTest(ServersSampleBase):
self._verify_response('security-groups-create-resp', subs, response)
def test_security_groups_list(self):
"""Get api sample of security groups get list request"""
# Get api sample of security groups get list request.
response = self._do_get('os-security-groups')
subs = self._get_regexes()
return self._verify_response('security-groups-list-get-resp',
subs, response)
def test_security_groups_get(self):
"""Get api sample of security groups get request"""
# Get api sample of security groups get request.
security_group_id = '1'
response = self._do_get('os-security-groups/%s' % security_group_id)
subs = self._get_regexes()
@ -1059,7 +1059,7 @@ class SecurityGroupsSampleJsonTest(ServersSampleBase):
subs, response)
def test_security_groups_list_server(self):
"""Get api sample of security groups for a specific server."""
# Get api sample of security groups for a specific server.
uuid = self._post_server()
response = self._do_get('servers/%s/os-security-groups' % uuid)
subs = self._get_regexes()
@ -1076,7 +1076,7 @@ class SchedulerHintsJsonTest(ApiSampleTestBase):
"Scheduler_hints")
def test_scheduler_hints_post(self):
"""Get api sample of scheduler hint post request"""
# Get api sample of scheduler hint post request.
hints = {'image_id': fake.get_valid_image_id(),
'image_near': str(uuid_lib.uuid4())
}
@ -1321,7 +1321,7 @@ class KeyPairsSampleJsonTest(ApiSampleTestBase):
return subs
def test_keypairs_post(self, public_key=None):
"""Get api sample of key pairs post request"""
"""Get api sample of key pairs post request."""
key_name = 'keypair-' + str(uuid_lib.uuid4())
response = self._do_post('os-keypairs', 'keypairs-post-req',
{'keypair_name': key_name})
@ -1335,7 +1335,7 @@ class KeyPairsSampleJsonTest(ApiSampleTestBase):
return key_name
def test_keypairs_import_key_post(self):
"""Get api sample of key pairs post to import user's key"""
# Get api sample of key pairs post to import user's key.
key_name = 'keypair-' + str(uuid_lib.uuid4())
subs = {
'keypair_name': key_name,
@ -1353,7 +1353,7 @@ class KeyPairsSampleJsonTest(ApiSampleTestBase):
self._verify_response('keypairs-import-post-resp', subs, response)
def test_keypairs_get(self):
"""Get api sample of key pairs get request"""
# Get api sample of key pairs get request.
key_name = self.test_keypairs_post()
response = self._do_get('os-keypairs')
subs = self._get_regexes()
@ -1443,11 +1443,11 @@ class CloudPipeSampleJsonTest(ApiSampleTestBase):
super(CloudPipeSampleJsonTest, self).setUp()
def get_user_data(self, project_id):
"""Stub method to generate user data for cloudpipe tests"""
"""Stub method to generate user data for cloudpipe tests."""
return "VVNFUiBEQVRB\n"
def network_api_get(self, context, network_uuid):
"""Stub to get a valid network and its information"""
"""Stub to get a valid network and its information."""
return {'vpn_public_address': '127.0.0.1',
'vpn_public_port': 22}
@ -1459,7 +1459,7 @@ class CloudPipeSampleJsonTest(ApiSampleTestBase):
return subs
def test_cloud_pipe_create(self):
"""Get api samples of cloud pipe extension creation"""
# Get api samples of cloud pipe extension creation.
self.flags(vpn_image_id=fake.get_valid_image_id())
project = {'project_id': 'cloudpipe-' + str(uuid_lib.uuid4())}
response = self._do_post('os-cloudpipe', 'cloud-pipe-create-req',
@ -1472,7 +1472,7 @@ class CloudPipeSampleJsonTest(ApiSampleTestBase):
return project
def test_cloud_pipe_list(self):
"""Get api samples of cloud pipe extension get request"""
# Get api samples of cloud pipe extension get request.
project = self.test_cloud_pipe_create()
response = self._do_get('os-cloudpipe')
self.assertEqual(response.status, 200)
@ -1565,7 +1565,7 @@ class AgentsJsonTest(ApiSampleTestBase):
fake_agent_build_destroy)
def test_agent_create(self):
"""Creates a new agent build."""
# Creates a new agent build.
project = {'url': 'xxxxxxxxxxxx',
'hypervisor': 'hypervisor',
'architecture': 'x86',
@ -1581,7 +1581,7 @@ class AgentsJsonTest(ApiSampleTestBase):
return project
def test_agent_list(self):
"""Return a list of all agent builds."""
# Return a list of all agent builds.
response = self._do_get('os-agents')
self.assertEqual(response.status, 200)
project = {'url': 'xxxxxxxxxxxx',
@ -1595,7 +1595,7 @@ class AgentsJsonTest(ApiSampleTestBase):
return self._verify_response('agents-get-resp', project, response)
def test_agent_update(self):
"""Update an existing agent build."""
# Update an existing agent build.
agent_id = 1
subs = {'version': '7.0',
'url': 'xxx://xxxx/xxx/xxx',
@ -1607,7 +1607,7 @@ class AgentsJsonTest(ApiSampleTestBase):
return self._verify_response('agent-update-put-resp', subs, response)
def test_agent_delete(self):
"""Deletes an existing agent build."""
# Deletes an existing agent build.
agent_id = 1
response = self._do_delete('os-agents/%s' % agent_id)
self.assertEqual(response.status, 200)
@ -1679,7 +1679,7 @@ class FixedIpJsonTest(ApiSampleTestBase):
self.stubs.Set(db, "fixed_ip_update", fake_fixed_ip_update)
def test_fixed_ip_reserve(self):
"""Reserve a Fixed IP"""
# Reserve a Fixed IP.
project = {'reserve': None}
response = self._do_post('os-fixed-ips/192.168.1.1/action',
'fixedip-post-req',
@ -1687,7 +1687,7 @@ class FixedIpJsonTest(ApiSampleTestBase):
self.assertEqual(response.status, 202)
def test_get_fixed_ip(self):
"""Return data about the given fixed ip."""
# Return data about the given fixed ip.
response = self._do_get('os-fixed-ips/192.168.1.1')
self.assertEqual(response.status, 200)
project = {'cidr': '192.168.1.0/24',
@ -1802,7 +1802,7 @@ class UsedLimitsSamplesJsonTest(ApiSampleTestBase):
"Used_limits")
def test_get_used_limits(self):
"""Get api sample to used limits"""
# Get api sample to used limits.
response = self._do_get('limits')
self.assertEqual(response.status, 200)
subs = self._get_regexes()
@ -1854,7 +1854,7 @@ class SimpleTenantUsageSampleJsonTest(ServersSampleBase):
"Simple_tenant_usage")
def setUp(self):
"""setUp method for simple tenant usage"""
"""setUp method for simple tenant usage."""
super(SimpleTenantUsageSampleJsonTest, self).setUp()
self._post_server()
timeutils.set_time_override(timeutils.utcnow() +
@ -1865,12 +1865,12 @@ class SimpleTenantUsageSampleJsonTest(ServersSampleBase):
}
def tearDown(self):
"""tearDown method for simple tenant usage"""
"""tearDown method for simple tenant usage."""
super(SimpleTenantUsageSampleJsonTest, self).tearDown()
timeutils.clear_time_override()
def test_get_tenants_usage(self):
"""Get api sample to get all tenants usage request"""
# Get api sample to get all tenants usage request.
response = self._do_get('os-simple-tenant-usage?%s' % (
urllib.urlencode(self.query)))
self.assertEqual(response.status, 200)
@ -1878,7 +1878,7 @@ class SimpleTenantUsageSampleJsonTest(ServersSampleBase):
self._verify_response('simple-tenant-usage-get', subs, response)
def test_get_tenant_usage_details(self):
"""Get api sample to get specific tenant usage request"""
# Get api sample to get specific tenant usage request.
tenant_id = 'openstack'
response = self._do_get('os-simple-tenant-usage/%s?%s' % (tenant_id,
urllib.urlencode(self.query)))
@ -1941,64 +1941,64 @@ class AdminActionsSamplesJsonTest(ServersSampleBase):
self.uuid = self._post_server()
def test_post_pause(self):
"""Get api samples to pause server request"""
# Get api samples to pause server request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-pause', {})
self.assertEqual(response.status, 202)
def test_post_unpause(self):
"""Get api samples to unpause server request"""
# Get api samples to unpause server request.
self.test_post_pause()
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-unpause', {})
self.assertEqual(response.status, 202)
def test_post_suspend(self):
"""Get api samples to suspend server request"""
# Get api samples to suspend server request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-suspend', {})
self.assertEqual(response.status, 202)
def test_post_resume(self):
"""Get api samples to server resume request"""
# Get api samples to server resume request.
self.test_post_suspend()
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-resume', {})
self.assertEqual(response.status, 202)
def test_post_migrate(self):
"""Get api samples to migrate server request"""
# Get api samples to migrate server request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-migrate', {})
self.assertEqual(response.status, 202)
def test_post_reset_network(self):
"""Get api samples to reset server network request"""
# Get api samples to reset server network request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-reset-network', {})
self.assertEqual(response.status, 202)
def test_post_inject_network_info(self):
"""Get api samples to inject network info request"""
# Get api samples to inject network info request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-inject-network-info', {})
self.assertEqual(response.status, 202)
def test_post_lock_server(self):
"""Get api samples to lock server request"""
# Get api samples to lock server request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-lock-server', {})
self.assertEqual(response.status, 202)
def test_post_unlock_server(self):
"""Get api samples to unlock server request"""
# Get api samples to unlock server request.
self.test_post_lock_server()
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-unlock-server', {})
self.assertEqual(response.status, 202)
def test_post_backup_server(self):
"""Get api samples to backup server request"""
# Get api samples to backup server request.
def image_details(self, context, **kwargs):
"""This stub is specifically used on the backup action."""
# NOTE(maurosr): I've added this simple stub cause backup action
@ -2013,17 +2013,17 @@ class AdminActionsSamplesJsonTest(ServersSampleBase):
self.assertEqual(response.status, 202)
def test_post_live_migrate_server(self):
"""Get api samples to server live migrate request"""
# Get api samples to server live migrate request.
def fake_live_migration_src_check(self, context, instance_ref):
"""Skip live migration scheduler checks"""
"""Skip live migration scheduler checks."""
return
def fake_live_migration_dest_check(self, context, instance_ref, dest):
"""Skip live migration scheduler checks"""
"""Skip live migration scheduler checks."""
return
def fake_live_migration_common(self, context, instance_ref, dest):
"""Skip live migration scheduler checks"""
"""Skip live migration scheduler checks."""
return
self.stubs.Set(driver.Scheduler, '_live_migration_src_check',
fake_live_migration_src_check)
@ -2050,7 +2050,7 @@ class AdminActionsSamplesJsonTest(ServersSampleBase):
self.assertEqual(response.status, 202)
def test_post_reset_state(self):
"""get api samples to server reset state request"""
# get api samples to server reset state request.
response = self._do_post('servers/%s/action' % self.uuid,
'admin-actions-reset-server-state', {})
self.assertEqual(response.status, 202)
@ -2116,20 +2116,20 @@ class QuotasSampleJsonTests(ApiSampleTestBase):
extension_name = "nova.api.openstack.compute.contrib.quotas.Quotas"
def test_show_quotas(self):
"""Get api sample to show quotas"""
# Get api sample to show quotas.
response = self._do_get('os-quota-sets/fake_tenant')
self.assertEqual(response.status, 200)
return self._verify_response('quotas-show-get-resp', {}, response)
def test_show_quotas_defaults(self):
"""Get api sample to show quotas defaults"""
# Get api sample to show quotas defaults.
response = self._do_get('os-quota-sets/fake_tenant/defaults')
self.assertEqual(response.status, 200)
return self._verify_response('quotas-show-defaults-get-resp',
{}, response)
def test_update_quotas(self):
"""Get api sample to update quotas"""
# Get api sample to update quotas.
response = self._do_put('os-quota-sets/fake_tenant',
'quotas-update-post-req',
{})
@ -2172,7 +2172,7 @@ class FlavorManageSampleJsonTests(ApiSampleTestBase):
"Flavormanage")
def _create_flavor(self):
"""Create a flavor"""
"""Create a flavor."""
subs = {
'flavor_id': 10,
'flavor_name': "test_flavor"
@ -2185,11 +2185,11 @@ class FlavorManageSampleJsonTests(ApiSampleTestBase):
return self._verify_response("flavor-create-post-resp", subs, response)
def test_create_flavor(self):
"""Get api sample to create a flavor"""
# Get api sample to create a flavor.
self._create_flavor()
def test_delete_flavor(self):
"""Get api sample to delete a flavor"""
# Get api sample to delete a flavor.
self._create_flavor()
response = self._do_delete("flavors/10")
self.assertEqual(response.status, 202)
@ -2366,7 +2366,7 @@ class FlavorDisabledSampleJsonTests(ApiSampleTestBase):
"Flavor_disabled")
def test_show_flavor(self):
"""Get api sample to show flavor_disabled attr. of a flavor"""
# Get api sample to show flavor_disabled attr. of a flavor.
flavor_id = 1
response = self._do_get('flavors/%s' % flavor_id)
self.assertEqual(response.status, 200)
@ -2376,7 +2376,7 @@ class FlavorDisabledSampleJsonTests(ApiSampleTestBase):
response)
def test_detail_flavor(self):
"""Get api sample to show details of a flavor"""
# Get api sample to show details of a flavor.
response = self._do_get('flavors/detail')
self.assertEqual(response.status, 200)
subs = self._get_regexes()
@ -2394,7 +2394,7 @@ class QuotaClassesSampleJsonTests(ApiSampleTestBase):
set_id = 'test_class'
def test_show_quota_classes(self):
"""Get api sample to show quota classes"""
# Get api sample to show quota classes.
response = self._do_get('os-quota-class-sets/%s' % self.set_id)
self.assertEqual(response.status, 200)
subs = {'set_id': self.set_id}
@ -2402,7 +2402,7 @@ class QuotaClassesSampleJsonTests(ApiSampleTestBase):
response)
def test_update_quota_classes(self):
"""Get api sample to update quota classes"""
# Get api sample to update quota classes.
response = self._do_put('os-quota-class-sets/%s' % self.set_id,
'quota-classes-update-post-req',
{})

View File

@ -36,7 +36,7 @@ class ExtensionsTest(integrated_helpers._IntegratedTestBase):
return f
def test_get_foxnsocks(self):
"""Simple check that fox-n-socks works."""
# Simple check that fox-n-socks works.
response = self.api.api_request('/foxnsocks')
foxnsocks = response.read()
LOG.debug("foxnsocks: %s" % foxnsocks)

View File

@ -25,7 +25,7 @@ LOG = logging.getLogger(__name__)
class LoginTest(integrated_helpers._IntegratedTestBase):
def test_login(self):
"""Simple check - we list flavors - so we know we're logged in."""
# Simple check - we list flavors - so we know we're logged in.
flavors = self.api.get_flavors()
for flavor in flavors:
LOG.debug(_("flavor: %s") % flavor)

View File

@ -48,13 +48,13 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self.compute = self.start_service('compute', *args, **kwargs)
def test_get_servers(self):
"""Simple check that listing servers works."""
# Simple check that listing servers works.
servers = self.api.get_servers()
for server in servers:
LOG.debug("server: %s" % server)
def test_create_server_with_error(self):
"""Create a server which will enter error state."""
# Create a server which will enter error state.
fake_network.set_stub_network_methods(self.stubs)
def throw_error(*_):
@ -75,7 +75,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._delete_server(created_server_id)
def test_create_and_delete_server(self):
"""Creates and deletes a server."""
# Creates and deletes a server.
fake_network.set_stub_network_methods(self.stubs)
# Create server
@ -140,7 +140,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._delete_server(created_server_id)
def test_deferred_delete(self):
"""Creates, deletes and waits for server to be reclaimed."""
# Creates, deletes and waits for server to be reclaimed.
self.flags(reclaim_instance_interval=1)
fake_network.set_stub_network_methods(self.stubs)
@ -183,7 +183,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._wait_for_deletion(created_server_id)
def test_deferred_delete_restore(self):
"""Creates, deletes and restores a server."""
# Creates, deletes and restores a server.
self.flags(reclaim_instance_interval=1)
fake_network.set_stub_network_methods(self.stubs)
@ -216,7 +216,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self.assertEqual('ACTIVE', found_server['status'])
def test_deferred_delete_force(self):
"""Creates, deletes and force deletes a server."""
# Creates, deletes and force deletes a server.
self.flags(reclaim_instance_interval=1)
fake_network.set_stub_network_methods(self.stubs)
@ -273,7 +273,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._wait_for_deletion(server_id)
def test_create_server_with_metadata(self):
"""Creates a server with metadata."""
# Creates a server with metadata.
fake_network.set_stub_network_methods(self.stubs)
# Build the server data gradually, checking errors along the way
@ -315,7 +315,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._delete_server(created_server_id)
def test_create_and_rebuild_server(self):
"""Rebuild a server with metadata."""
# Rebuild a server with metadata.
fake_network.set_stub_network_methods(self.stubs)
# create a server with initially has no metadata
@ -382,7 +382,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._delete_server(created_server_id)
def test_rename_server(self):
"""Test building and renaming a server."""
# Test building and renaming a server.
fake_network.set_stub_network_methods(self.stubs)
# Create a server
@ -403,7 +403,7 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self._delete_server(server_id)
def test_create_multiple_servers(self):
"""Creates multiple servers and checks for reservation_id"""
# Creates multiple servers and checks for reservation_id.
# Create 2 servers, setting 'return_reservation_id, which should
# return a reservation_id

View File

@ -40,7 +40,7 @@ class XmlTests(integrated_helpers._IntegratedTestBase):
self.assertEqual(root.nsmap.get(None), xmlutil.XMLNS_COMMON_V10)
def test_namespace_servers(self):
"""/servers should have v1.1 namespace (has changed in 1.1)."""
# /servers should have v1.1 namespace (has changed in 1.1).
headers = {}
headers['Accept'] = 'application/xml'

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for network API"""
"""Tests for network API."""
import random
@ -38,7 +38,7 @@ class ApiTestCase(test.TestCase):
'fake-project')
def _do_test_associate_floating_ip(self, orig_instance_uuid):
"""Test post-association logic"""
"""Test post-association logic."""
new_instance = {'uuid': 'new-uuid'}

View File

@ -1030,7 +1030,7 @@ class VlanNetworkTestCase(test.TestCase):
self.assertFalse(fixed['allocated'])
def test_deallocate_fixed_deleted(self):
"""Verify doesn't deallocate deleted fixed_ip from deleted network"""
# Verify doesn't deallocate deleted fixed_ip from deleted network.
def network_get(_context, network_id, project_only="allow_none"):
return networks[network_id]
@ -1094,7 +1094,7 @@ class VlanNetworkTestCase(test.TestCase):
self.network.deallocate_fixed_ip(context1, fix_addr, 'fake')
def test_fixed_ip_cleanup_fail(self):
"""Verify IP is not deallocated if the security group refresh fails."""
# Verify IP is not deallocated if the security group refresh fails.
def network_get(_context, network_id, project_only="allow_none"):
return networks[network_id]
@ -1534,11 +1534,11 @@ class CommonNetworkTestCase(test.TestCase):
class TestRPCFixedManager(network_manager.RPCAllocateFixedIP,
network_manager.NetworkManager):
"""Dummy manager that implements RPCAllocateFixedIP"""
"""Dummy manager that implements RPCAllocateFixedIP."""
class RPCAllocateTestCase(test.TestCase):
"""Tests nova.network.manager.RPCAllocateFixedIP"""
"""Tests nova.network.manager.RPCAllocateFixedIP."""
def setUp(self):
super(RPCAllocateTestCase, self).setUp()
self.rpc_fixed = TestRPCFixedManager()
@ -1566,7 +1566,7 @@ class RPCAllocateTestCase(test.TestCase):
class BackdoorPortTestCase(test.TestCase):
"""Tests nova.network.manager.get_backdoor_port"""
"""Tests nova.network.manager.get_backdoor_port."""
def setUp(self):
super(BackdoorPortTestCase, self).setUp()
self.manager = network_manager.NetworkManager()
@ -1580,7 +1580,7 @@ class BackdoorPortTestCase(test.TestCase):
class TestFloatingIPManager(network_manager.FloatingIP,
network_manager.NetworkManager):
"""Dummy manager that implements FloatingIP"""
"""Dummy manager that implements FloatingIP."""
class AllocateTestCase(test.TestCase):
@ -1624,7 +1624,7 @@ class AllocateTestCase(test.TestCase):
class FloatingIPTestCase(test.TestCase):
"""Tests nova.network.manager.FloatingIP"""
"""Tests nova.network.manager.FloatingIP."""
def setUp(self):
super(FloatingIPTestCase, self).setUp()
self.tempdir = tempfile.mkdtemp()
@ -2023,7 +2023,7 @@ class FloatingIPTestCase(test.TestCase):
self.network.delete_dns_domain(context_admin, domain2)
def test_mac_conflicts(self):
"""Make sure MAC collisions are retried"""
# Make sure MAC collisions are retried.
self.flags(create_unique_mac_address_attempts=3)
ctxt = context.RequestContext('testuser', 'testproject', is_admin=True)
macs = ['bb:bb:bb:bb:bb:bb', 'aa:aa:aa:aa:aa:aa']
@ -2055,7 +2055,7 @@ class FloatingIPTestCase(test.TestCase):
self.assertEqual(macs, [])
def test_deallocate_client_exceptions(self):
"""Ensure that FloatingIpNotFoundForAddress is wrapped"""
# Ensure that FloatingIpNotFoundForAddress is wrapped.
self.mox.StubOutWithMock(self.network.db, 'floating_ip_get_by_address')
self.network.db.floating_ip_get_by_address(
self.context, '1.2.3.4').AndRaise(
@ -2066,7 +2066,7 @@ class FloatingIPTestCase(test.TestCase):
self.context, '1.2.3.4')
def test_associate_client_exceptions(self):
"""Ensure that FloatingIpNotFoundForAddress is wrapped"""
# Ensure that FloatingIpNotFoundForAddress is wrapped.
self.mox.StubOutWithMock(self.network.db, 'floating_ip_get_by_address')
self.network.db.floating_ip_get_by_address(
self.context, '1.2.3.4').AndRaise(
@ -2077,7 +2077,7 @@ class FloatingIPTestCase(test.TestCase):
self.context, '1.2.3.4', '10.0.0.1')
def test_disassociate_client_exceptions(self):
"""Ensure that FloatingIpNotFoundForAddress is wrapped"""
# Ensure that FloatingIpNotFoundForAddress is wrapped.
self.mox.StubOutWithMock(self.network.db, 'floating_ip_get_by_address')
self.network.db.floating_ip_get_by_address(
self.context, '1.2.3.4').AndRaise(
@ -2088,7 +2088,7 @@ class FloatingIPTestCase(test.TestCase):
self.context, '1.2.3.4')
def test_get_floating_ip_client_exceptions(self):
"""Ensure that FloatingIpNotFoundForAddress is wrapped"""
# Ensure that FloatingIpNotFoundForAddress is wrapped.
self.mox.StubOutWithMock(self.network.db, 'floating_ip_get')
self.network.db.floating_ip_get(self.context, 'fake-id').AndRaise(
exception.FloatingIpNotFound(id='fake'))
@ -2123,7 +2123,7 @@ class NetworkPolicyTestCase(test.TestCase):
class InstanceDNSTestCase(test.TestCase):
"""Tests nova.network.manager instance DNS"""
"""Tests nova.network.manager instance DNS."""
def setUp(self):
super(InstanceDNSTestCase, self).setUp()
self.tempdir = tempfile.mkdtemp()
@ -2166,7 +2166,7 @@ domain2 = "example.com"
class LdapDNSTestCase(test.TestCase):
"""Tests nova.network.ldapdns.LdapDNS"""
"""Tests nova.network.ldapdns.LdapDNS."""
def setUp(self):
super(LdapDNSTestCase, self).setUp()

View File

@ -270,21 +270,21 @@ class TestQuantumv2(test.TestCase):
self._verify_nw_info(nw_inf, i)
def test_get_instance_nw_info_1(self):
"""Test to get one port in one network and subnet."""
# Test to get one port in one network and subnet.
quantumv2.get_client(mox.IgnoreArg(),
admin=True).MultipleTimes().AndReturn(
self.moxed_client)
self._get_instance_nw_info(1)
def test_get_instance_nw_info_2(self):
"""Test to get one port in each of two networks and subnets."""
# Test to get one port in each of two networks and subnets.
quantumv2.get_client(mox.IgnoreArg(),
admin=True).MultipleTimes().AndReturn(
self.moxed_client)
self._get_instance_nw_info(2)
def test_get_instance_nw_info_with_nets(self):
"""Test get instance_nw_info with networks passed in."""
# Test get instance_nw_info with networks passed in.
api = quantumapi.API()
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
api.db.instance_info_cache_update(
@ -311,7 +311,7 @@ class TestQuantumv2(test.TestCase):
self._verify_nw_info(nw_inf, 0)
def test_get_instance_nw_info_without_subnet(self):
"""Test get instance_nw_info for a port without subnet."""
# Test get instance_nw_info for a port without subnet.
api = quantumapi.API()
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
api.db.instance_info_cache_update(
@ -413,11 +413,11 @@ class TestQuantumv2(test.TestCase):
api.allocate_for_instance(self.context, self.instance, **kwargs)
def test_allocate_for_instance_1(self):
"""Allocate one port in one network env."""
# Allocate one port in one network env.
self._allocate_for_instance(1)
def test_allocate_for_instance_2(self):
"""Allocate one port in two networks env."""
# Allocate one port in two networks env.
self._allocate_for_instance(2)
def test_allocate_for_instance_with_requested_networks(self):
@ -520,11 +520,11 @@ class TestQuantumv2(test.TestCase):
api.deallocate_for_instance(self.context, self.instance)
def test_deallocate_for_instance_1(self):
"""Test to deallocate in one port env."""
# Test to deallocate in one port env.
self._deallocate_for_instance(1)
def test_deallocate_for_instance_2(self):
"""Test to deallocate in two ports env."""
# Test to deallocate in two ports env.
self._deallocate_for_instance(2)
def test_validate_networks(self):

View File

@ -108,14 +108,14 @@ class FakeHostState(host_manager.HostState):
class FakeInstance(object):
def __init__(self, context=None, params=None, type_name='m1.tiny'):
"""Create a test instance. Returns uuid"""
"""Create a test instance. Returns uuid."""
self.context = context
i = self._create_fake_instance(params, type_name=type_name)
self.uuid = i['uuid']
def _create_fake_instance(self, params=None, type_name='m1.tiny'):
"""Create a test instance"""
"""Create a test instance."""
if not params:
params = {}

View File

@ -216,7 +216,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertRaises(exception.NovaException, sched._max_attempts)
def test_retry_disabled(self):
"""Retry info should not get populated when re-scheduling is off"""
# Retry info should not get populated when re-scheduling is off.
self.flags(scheduler_max_attempts=1)
sched = fakes.FakeFilterScheduler()
@ -231,7 +231,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertFalse("retry" in filter_properties)
def test_retry_attempt_one(self):
"""Test retry logic on initial scheduling attempt"""
# Test retry logic on initial scheduling attempt.
self.flags(scheduler_max_attempts=2)
sched = fakes.FakeFilterScheduler()
@ -246,7 +246,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertEqual(1, num_attempts)
def test_retry_attempt_two(self):
"""Test retry logic when re-scheduling"""
# Test retry logic when re-scheduling.
self.flags(scheduler_max_attempts=2)
sched = fakes.FakeFilterScheduler()
@ -263,7 +263,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertEqual(2, num_attempts)
def test_retry_exceeded_max_attempts(self):
"""Test for necessary explosion when max retries is exceeded"""
# Test for necessary explosion when max retries is exceeded.
self.flags(scheduler_max_attempts=2)
sched = fakes.FakeFilterScheduler()
@ -290,7 +290,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertEqual([host, node], hosts[0])
def test_post_select_populate(self):
"""Test addition of certain filter props after a node is selected"""
# Test addition of certain filter props after a node is selected.
retry = {'hosts': [], 'num_attempts': 1}
filter_properties = {'retry': retry}
sched = fakes.FakeFilterScheduler()
@ -306,7 +306,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.assertEqual({'vcpus': 5}, host_state.limits)
def test_prep_resize_post_populates_retry(self):
"""Prep resize should add a ('host', 'node') entry to the retry dict"""
# Prep resize should add a ('host', 'node') entry to the retry dict.
sched = fakes.FakeFilterScheduler()
image = 'image'

View File

@ -38,7 +38,7 @@ class TestFilter(filters.BaseHostFilter):
class TestBogusFilter(object):
"""Class that doesn't inherit from BaseHostFilter"""
"""Class that doesn't inherit from BaseHostFilter."""
pass
@ -928,7 +928,7 @@ class HostFiltersTestCase(test.TestCase):
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_happy_day(self):
"""Test json filter more thoroughly"""
# Test json filter more thoroughly.
filt_cls = self.class_map['JsonFilter']()
raw = ['and',
'$capabilities.enabled',
@ -1246,14 +1246,14 @@ class HostFiltersTestCase(test.TestCase):
self.assertFalse(filt_cls.host_passes(host, request))
def test_retry_filter_disabled(self):
"""Test case where retry/re-scheduling is disabled"""
# Test case where retry/re-scheduling is disabled.
filt_cls = self.class_map['RetryFilter']()
host = fakes.FakeHostState('host1', 'node1', {})
filter_properties = {}
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_retry_filter_pass(self):
"""Node not previously tried"""
# Node not previously tried.
filt_cls = self.class_map['RetryFilter']()
host = fakes.FakeHostState('host1', 'nodeX', {})
retry = dict(num_attempts=2,
@ -1264,7 +1264,7 @@ class HostFiltersTestCase(test.TestCase):
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_retry_filter_fail(self):
"""Node was already tried"""
# Node was already tried.
filt_cls = self.class_map['RetryFilter']()
host = fakes.FakeHostState('host1', 'node1', {})
retry = dict(num_attempts=1,

View File

@ -38,7 +38,7 @@ class FakeFilterClass2(filters.BaseHostFilter):
class HostManagerTestCase(test.TestCase):
"""Test case for HostManager class"""
"""Test case for HostManager class."""
def setUp(self):
super(HostManagerTestCase, self).setUp()
@ -159,7 +159,7 @@ class HostManagerTestCase(test.TestCase):
self._verify_result(info, result)
def test_get_filtered_hosts_with_ignore_and_force(self):
"""Ensure ignore_hosts processed before force_hosts in host filters"""
# Ensure ignore_hosts processed before force_hosts in host filters.
fake_properties = {'force_hosts': ['fake_host3', 'fake_host1'],
'ignore_hosts': ['fake_host1']}
@ -268,7 +268,7 @@ class HostManagerTestCase(test.TestCase):
class HostStateTestCase(test.TestCase):
"""Test case for HostState class"""
"""Test case for HostState class."""
# update_from_compute_node() and consume_from_instance() are tested
# in HostManagerTestCase.test_get_all_host_states()

View File

@ -45,7 +45,7 @@ class FakeDefaultScheduler(driver.Scheduler):
class MultiDriverTestCase(test_scheduler.SchedulerTestCase):
"""Test case for multi driver"""
"""Test case for multi driver."""
driver_cls = multi.MultiScheduler

View File

@ -40,7 +40,7 @@ from nova.tests.scheduler import fakes
class SchedulerManagerTestCase(test.TestCase):
"""Test case for scheduler manager"""
"""Test case for scheduler manager."""
manager_cls = manager.SchedulerManager
driver_cls = driver.Scheduler
@ -268,7 +268,7 @@ class SchedulerManagerTestCase(test.TestCase):
class SchedulerTestCase(test.TestCase):
"""Test case for base scheduler driver class"""
"""Test case for base scheduler driver class."""
# So we can subclass this test and re-use tests if we need.
driver_cls = driver.Scheduler
@ -325,7 +325,7 @@ class SchedulerTestCase(test.TestCase):
'task_state': ''}
def test_live_migration_basic(self):
"""Test basic schedule_live_migration functionality"""
# Test basic schedule_live_migration functionality.
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_common_check')
@ -359,7 +359,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_all_checks_pass(self):
"""Test live migration when all checks pass."""
# Test live migration when all checks pass.
self.mox.StubOutWithMock(servicegroup.API, 'service_is_up')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@ -422,7 +422,7 @@ class SchedulerTestCase(test.TestCase):
self.assertEqual(result, None)
def test_live_migration_instance_not_running(self):
"""The instance given by instance_id is not running."""
# The instance given by instance_id is not running.
dest = 'fake_host2'
block_migration = False
@ -437,7 +437,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_compute_src_not_exist(self):
"""Raise exception when src compute node is does not exist."""
# Raise exception when src compute node is does not exist.
self.mox.StubOutWithMock(servicegroup.API, 'service_is_up')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@ -460,7 +460,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_compute_src_not_alive(self):
"""Raise exception when src compute node is not alive."""
# Raise exception when src compute node is not alive.
self.mox.StubOutWithMock(servicegroup.API, 'service_is_up')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@ -483,7 +483,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_compute_dest_not_alive(self):
"""Raise exception when dest compute node is not alive."""
# Raise exception when dest compute node is not alive.
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@ -508,7 +508,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_dest_check_service_same_host(self):
"""Confirms exception raises in case dest and src is same host."""
# Confirms exception raises in case dest and src is same host.
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@ -532,7 +532,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=False)
def test_live_migration_dest_check_service_lack_memory(self):
"""Confirms exception raises when dest doesn't have enough memory."""
# Confirms exception raises when dest doesn't have enough memory.
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@ -563,7 +563,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_different_hypervisor_type_raises(self):
"""Confirm live_migration to hypervisor of different type raises"""
# Confirm live_migration to hypervisor of different type raises.
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
@ -595,7 +595,7 @@ class SchedulerTestCase(test.TestCase):
disk_over_commit=disk_over_commit)
def test_live_migration_dest_hypervisor_version_older_raises(self):
"""Confirm live migration to older hypervisor raises"""
# Confirm live migration to older hypervisor raises.
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
@ -654,7 +654,7 @@ class SchedulerDriverBaseTestCase(SchedulerTestCase):
class SchedulerDriverModuleTestCase(test.TestCase):
"""Test case for scheduler driver module methods"""
"""Test case for scheduler driver module methods."""
def setUp(self):
super(SchedulerDriverModuleTestCase, self).setUp()

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the API endpoint"""
"""Unit tests for the API endpoint."""
import random
import StringIO
@ -45,13 +45,13 @@ from nova.tests import matchers
class FakeHttplibSocket(object):
"""a fake socket implementation for httplib.HTTPResponse, trivial"""
"""a fake socket implementation for httplib.HTTPResponse, trivial."""
def __init__(self, response_string):
self.response_string = response_string
self._buffer = StringIO.StringIO(response_string)
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer"""
"""Returns the socket's internal buffer."""
return self._buffer
@ -91,12 +91,12 @@ class FakeHttplibConnection(object):
return self.sock.response_string
def close(self):
"""Required for compatibility with boto/tornado"""
"""Required for compatibility with boto/tornado."""
pass
class XmlConversionTestCase(test.TestCase):
"""Unit test api xml conversion"""
"""Unit test api xml conversion."""
def test_number_conversion(self):
conv = ec2utils._try_convert
self.assertEqual(conv('None'), None)
@ -212,7 +212,7 @@ class Ec2utilsTestCase(test.TestCase):
class ApiEc2TestCase(test.TestCase):
"""Unit test for the cloud controller on an EC2 API"""
"""Unit test for the cloud controller on an EC2 API."""
def setUp(self):
super(ApiEc2TestCase, self).setUp()
self.host = '127.0.0.1'
@ -225,7 +225,7 @@ class ApiEc2TestCase(test.TestCase):
self.useFixture(fixtures.FakeLogger('boto'))
def expect_http(self, host=None, is_secure=False, api_version=None):
"""Returns a new EC2 connection"""
"""Returns a new EC2 connection."""
self.ec2 = boto.connect_ec2(
aws_access_key_id='fake',
aws_secret_access_key='fake',
@ -281,7 +281,7 @@ class ApiEc2TestCase(test.TestCase):
self.assertEqual(self.ec2.get_all_instances(), [])
def test_terminate_invalid_instance(self):
"""Attempt to terminate an invalid instance"""
# Attempt to terminate an invalid instance.
self.expect_http()
self.mox.ReplayAll()
self.assertRaises(boto_exc.EC2ResponseError,
@ -318,7 +318,7 @@ class ApiEc2TestCase(test.TestCase):
self.fail('Exception not raised.')
def test_get_all_security_groups(self):
"""Test that we can retrieve security groups"""
# Test that we can retrieve security groups.
self.expect_http()
self.mox.ReplayAll()
@ -328,7 +328,7 @@ class ApiEc2TestCase(test.TestCase):
self.assertEquals(rv[0].name, 'default')
def test_create_delete_security_group(self):
"""Test that we can create a security group"""
# Test that we can create a security group.
self.expect_http()
self.mox.ReplayAll()

View File

@ -26,7 +26,7 @@ from nova.tests import matchers
class BlockDeviceMappingEc2CloudTestCase(test.TestCase):
"""Test Case for Block Device Mapping"""
"""Test Case for Block Device Mapping."""
def fake_ec2_vol_id_to_uuid(obj, ec2_id):
if ec2_id == 'vol-87654321':

View File

@ -88,7 +88,7 @@ class FakeHTTPClient(cinder.cinder_client.client.HTTPClient):
raise cinder_exception.NotFound(code=404, message='Resource not found')
def get_volumes_5678(self, **kw):
"""Volume with image metadata"""
"""Volume with image metadata."""
volume = {'volume': _stub_volume(id='1234',
volume_image_metadata=_image_metadata)
}

View File

@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the DB API"""
"""Unit tests for the DB API."""
import datetime
import uuid as stdlib_uuid
@ -114,7 +114,7 @@ class DbApiTestCase(test.TestCase):
self.assertEqual(2, len(result))
def test_instance_get_all_by_filters_regex_unsupported_db(self):
"""Ensure that the 'LIKE' operator is used for unsupported dbs."""
# Ensure that the 'LIKE' operator is used for unsupported dbs.
self.flags(sql_connection="notdb://")
self.create_instances_with_args(display_name='test1')
self.create_instances_with_args(display_name='test.*')
@ -321,7 +321,7 @@ class DbApiTestCase(test.TestCase):
inst['uuid'], 'vm_state', [None, 'disable'], 'run')
def test_instance_update_with_instance_uuid(self):
"""test instance_update() works when an instance UUID is passed """
# test instance_update() works when an instance UUID is passed.
ctxt = context.get_admin_context()
# Create an instance with some metadata
@ -428,7 +428,7 @@ class DbApiTestCase(test.TestCase):
self.assertEquals("needscoffee", new_ref["vm_state"])
def test_instance_update_with_extra_specs(self):
"""Ensure _extra_specs are returned from _instance_update"""
# Ensure _extra_specs are returned from _instance_update.
ctxt = context.get_admin_context()
# create a flavor
@ -463,7 +463,7 @@ class DbApiTestCase(test.TestCase):
self.assertEquals(spec, new_ref['extra_specs'])
def test_instance_fault_create(self):
"""Ensure we can create an instance fault"""
# Ensure we can create an instance fault.
ctxt = context.get_admin_context()
uuid = str(stdlib_uuid.uuid4())
@ -481,7 +481,7 @@ class DbApiTestCase(test.TestCase):
self.assertEqual(404, faults[uuid][0]['code'])
def test_instance_fault_get_by_instance(self):
"""ensure we can retrieve an instance fault by instance UUID """
# ensure we can retrieve an instance fault by instance UUID.
ctxt = context.get_admin_context()
instance1 = db.instance_create(ctxt, {})
instance2 = db.instance_create(ctxt, {})
@ -530,7 +530,7 @@ class DbApiTestCase(test.TestCase):
self.assertEqual(instance_faults, expected)
def test_instance_faults_get_by_instance_uuids_no_faults(self):
"""None should be returned when no faults exist"""
# None should be returned when no faults exist.
ctxt = context.get_admin_context()
instance1 = db.instance_create(ctxt, {})
instance2 = db.instance_create(ctxt, {})

View File

@ -52,7 +52,7 @@ class FiltersTestCase(test.TestCase):
self.assertEqual(list(result), ['obj1', 'obj3'])
def test_filter_all_recursive_yields(self):
"""Test filter_all() allows generators from previous filter_all()s."""
# Test filter_all() allows generators from previous filter_all()s.
# filter_all() yields results. We want to make sure that we can
# call filter_all() with generators returned from previous calls
# to filter_all().

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for hook customization"""
"""Tests for hook customization."""
import stevedore

View File

@ -30,9 +30,9 @@ LOG = logging.getLogger(__name__)
class InstanceTypeTestCase(test.TestCase):
"""Test cases for instance type code"""
"""Test cases for instance type code."""
def _generate_name(self):
"""return a name not in the DB"""
"""return a name not in the DB."""
nonexistent_flavor = str(int(time.time()))
flavors = instance_types.get_all_types()
while nonexistent_flavor in flavors:
@ -41,7 +41,7 @@ class InstanceTypeTestCase(test.TestCase):
return nonexistent_flavor
def _generate_flavorid(self):
"""return a flavorid not in the DB"""
"""return a flavorid not in the DB."""
nonexistent_flavor = 2700
flavor_ids = [value["id"] for key, value in
instance_types.get_all_types().iteritems()]
@ -51,11 +51,11 @@ class InstanceTypeTestCase(test.TestCase):
return nonexistent_flavor
def _existing_flavor(self):
"""return first instance type name"""
"""return first instance type name."""
return instance_types.get_all_types().keys()[0]
def test_instance_type_create(self):
"""Ensure instance types can be created"""
# Ensure instance types can be created.
name = 'Instance create test'
flavor_id = '512'
@ -79,7 +79,7 @@ class InstanceTypeTestCase(test.TestCase):
'instance type was not created')
def test_instance_type_create_then_delete(self):
"""Ensure instance types can be created"""
# Ensure instance types can be created.
name = 'Small Flavor'
flavorid = 'flavor1'
@ -136,21 +136,21 @@ class InstanceTypeTestCase(test.TestCase):
self.assertEqual(inst_type['rxtx_factor'], 9.9)
def test_instance_type_create_with_special_characters(self):
"""Ensure instance types raises InvalidInput for invalid characters"""
# Ensure instance types raises InvalidInput for invalid characters.
name = "foo.bar!@#$%^-test_name"
flavorid = "flavor1"
self.assertRaises(exception.InvalidInput, instance_types.create,
name, 256, 1, 120, 100, flavorid)
def test_get_all_instance_types(self):
"""Ensures that all instance types can be retrieved"""
# Ensures that all instance types can be retrieved.
session = sql_session.get_session()
total_instance_types = session.query(models.InstanceTypes).count()
inst_types = instance_types.get_all_types()
self.assertEqual(total_instance_types, len(inst_types))
def test_invalid_create_args_should_fail(self):
"""Ensures that instance type creation fails with invalid args"""
# Ensures that instance type creation fails with invalid args.
invalid_sigs = [
(('Zero memory', 0, 1, 10, 20, 'flavor1'), {}),
(('Negative memory', -256, 1, 10, 20, 'flavor1'), {}),
@ -177,13 +177,13 @@ class InstanceTypeTestCase(test.TestCase):
instance_types.create, *args, **kwargs)
def test_non_existent_inst_type_shouldnt_delete(self):
"""Ensures that instance type creation fails with invalid args"""
# Ensures that instance type creation fails with invalid args.
self.assertRaises(exception.InstanceTypeNotFoundByName,
instance_types.destroy,
'unknown_flavor')
def test_duplicate_names_fail(self):
"""Ensures that name duplicates raise InstanceTypeCreateFailed"""
# Ensures that name duplicates raise InstanceTypeCreateFailed.
name = 'some_name'
instance_types.create(name, 256, 1, 120, 200, 'flavor1')
self.assertRaises(exception.InstanceTypeExists,
@ -191,7 +191,7 @@ class InstanceTypeTestCase(test.TestCase):
name, 256, 1, 120, 200, 'flavor2')
def test_duplicate_flavorids_fail(self):
"""Ensures that flavorid duplicates raise InstanceTypeCreateFailed"""
# Ensures that flavorid duplicates raise InstanceTypeCreateFailed.
flavorid = 'flavor1'
instance_types.create('name one', 256, 1, 120, 200, flavorid)
self.assertRaises(exception.InstanceTypeIdExists,
@ -199,12 +199,12 @@ class InstanceTypeTestCase(test.TestCase):
'name two', 256, 1, 120, 200, flavorid)
def test_will_not_destroy_with_no_name(self):
"""Ensure destroy said path of no name raises error"""
# Ensure destroy said path of no name raises error.
self.assertRaises(exception.InstanceTypeNotFoundByName,
instance_types.destroy, None)
def test_will_not_get_bad_default_instance_type(self):
"""ensures error raised on bad default instance type"""
# ensures error raised on bad default instance type.
self.flags(default_instance_type='unknown_flavor')
self.assertRaises(exception.InstanceTypeNotFound,
instance_types.get_default_instance_type)
@ -216,28 +216,28 @@ class InstanceTypeTestCase(test.TestCase):
self.assertEqual(default_instance_type, fetched)
def test_will_not_get_instance_type_by_unknown_id(self):
"""Ensure get by name returns default flavor with no name"""
# Ensure get by name returns default flavor with no name.
self.assertRaises(exception.InstanceTypeNotFound,
instance_types.get_instance_type, 10000)
def test_will_not_get_instance_type_with_bad_id(self):
"""Ensure get by name returns default flavor with bad name"""
# Ensure get by name returns default flavor with bad name.
self.assertRaises(exception.InstanceTypeNotFound,
instance_types.get_instance_type, 'asdf')
def test_instance_type_get_by_None_name_returns_default(self):
"""Ensure get by name returns default flavor with no name"""
# Ensure get by name returns default flavor with no name.
default = instance_types.get_default_instance_type()
actual = instance_types.get_instance_type_by_name(None)
self.assertEqual(default, actual)
def test_will_not_get_instance_type_with_bad_name(self):
"""Ensure get by name returns default flavor with bad name"""
# Ensure get by name returns default flavor with bad name.
self.assertRaises(exception.InstanceTypeNotFound,
instance_types.get_instance_type_by_name, 10000)
def test_will_not_get_instance_by_unknown_flavor_id(self):
"""Ensure get by flavor raises error with wrong flavorid"""
# Ensure get by flavor raises error with wrong flavorid.
self.assertRaises(exception.FlavorNotFound,
instance_types.get_instance_type_by_flavor_id,
'unknown_flavor')
@ -249,7 +249,7 @@ class InstanceTypeTestCase(test.TestCase):
self.assertEqual(default_instance_type, fetched)
def test_can_read_deleted_types_using_flavor_id(self):
"""Ensure deleted instance types can be read when querying flavor_id"""
# Ensure deleted instance types can be read when querying flavor_id.
inst_type_name = "test"
inst_type_flavor_id = "test1"
@ -280,7 +280,7 @@ class InstanceTypeTestCase(test.TestCase):
self.assertEqual("instance_type1_redo", instance_type["name"])
def test_will_list_deleted_type_for_active_instance(self):
"""Ensure deleted instance types with active instances can be read"""
# Ensure deleted instance types with active instances can be read.
ctxt = context.get_admin_context()
inst_type = instance_types.create("test", 256, 1, 120, 100, "test1")
@ -299,7 +299,7 @@ class InstanceTypeTestCase(test.TestCase):
class InstanceTypeFilteringTest(test.TestCase):
"""Test cases for the filter option available for instance_type_get_all"""
"""Test cases for the filter option available for instance_type_get_all."""
def setUp(self):
super(InstanceTypeFilteringTest, self).setUp()
self.context = context.get_admin_context()
@ -317,19 +317,19 @@ class InstanceTypeFilteringTest(test.TestCase):
self.assertFilterResults(filters, expected)
def test_min_memory_mb_filter(self):
"""Exclude tiny instance which is 512 MB"""
# Exclude tiny instance which is 512 MB.
filters = dict(min_memory_mb=513)
expected = ['m1.large', 'm1.medium', 'm1.small', 'm1.xlarge']
self.assertFilterResults(filters, expected)
def test_min_root_gb_filter(self):
"""Exclude everything but large and xlarge which have >= 80 GB"""
# Exclude everything but large and xlarge which have >= 80 GB.
filters = dict(min_root_gb=80)
expected = ['m1.large', 'm1.xlarge']
self.assertFilterResults(filters, expected)
def test_min_memory_mb_AND_root_gb_filter(self):
"""Exclude everything but large and xlarge which have >= 80 GB"""
# Exclude everything but large and xlarge which have >= 80 GB.
filters = dict(min_memory_mb=16384, min_root_gb=80)
expected = ['m1.xlarge']
self.assertFilterResults(filters, expected)

View File

@ -473,7 +473,7 @@ class CacheConcurrencyTestCase(test.TestCase):
super(CacheConcurrencyTestCase, self).tearDown()
def test_same_fname_concurrency(self):
"""Ensures that the same fname cache runs at a sequentially"""
# Ensures that the same fname cache runs at a sequentially.
backend = imagebackend.Backend(False)
wait1 = eventlet.event.Event()
done1 = eventlet.event.Event()
@ -507,7 +507,7 @@ class CacheConcurrencyTestCase(test.TestCase):
thr2.wait()
def test_different_fname_concurrency(self):
"""Ensures that two different fname caches are concurrent"""
# Ensures that two different fname caches are concurrent.
backend = imagebackend.Backend(False)
wait1 = eventlet.event.Event()
done1 = eventlet.event.Event()
@ -2043,7 +2043,7 @@ class LibvirtConnTestCase(test.TestCase):
db.instance_destroy(user_context, instance_ref['uuid'])
def test_ensure_filtering_rules_for_instance_timeout(self):
"""ensure_filtering_fules_for_instance() finishes with timeout."""
# ensure_filtering_fules_for_instance() finishes with timeout.
# Preparing mocks
def fake_none(self, *args):
return
@ -2288,7 +2288,7 @@ class LibvirtConnTestCase(test.TestCase):
self.context, instance_ref, dest_check_data)
def test_live_migration_raises_exception(self):
"""Confirms recover method is called when exceptions are raised."""
# Confirms recover method is called when exceptions are raised.
# Preparing data
self.compute = importutils.import_object(CONF.compute_manager)
instance_dict = {'host': 'fake',
@ -2936,7 +2936,7 @@ class LibvirtConnTestCase(test.TestCase):
conn._destroy(instance)
def test_available_least_handles_missing(self):
"""Ensure destroy calls managedSaveRemove for saved instance"""
# Ensure destroy calls managedSaveRemove for saved instance.
conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
def list_instances():
@ -3427,7 +3427,7 @@ class HostStateTestCase(test.TestCase):
instance_caps = [("x86_64", "kvm", "hvm"), ("i686", "kvm", "hvm")]
class FakeConnection(object):
"""Fake connection object"""
"""Fake connection object."""
def get_vcpu_total(self):
return 1
@ -3939,7 +3939,7 @@ class NWFilterTestCase(test.TestCase):
'instance_type_id': 1})
def _create_instance_type(self, params=None):
"""Create a test instance"""
"""Create a test instance."""
if not params:
params = {}
@ -4285,7 +4285,7 @@ class LibvirtDriverTestCase(test.TestCase):
fake.FakeVirtAPI(), read_only=True)
def _create_instance(self, params=None):
"""Create a test instance"""
"""Create a test instance."""
if not params:
params = {}
@ -4641,14 +4641,14 @@ class LibvirtVolumeUsageTestCase(test.TestCase):
class LibvirtNonblockingTestCase(test.TestCase):
"""Test libvirt_nonblocking option"""
"""Test libvirt_nonblocking option."""
def setUp(self):
super(LibvirtNonblockingTestCase, self).setUp()
self.flags(libvirt_nonblocking=True, libvirt_uri="test:///default")
def test_connection_to_primitive(self):
"""Test bug 962840"""
# Test bug 962840.
import nova.virt.libvirt.driver as libvirt_driver
connection = libvirt_driver.LibvirtDriver('')
jsonutils.to_primitive(connection._conn, convert_instances=True)

View File

@ -151,7 +151,7 @@ class MetadataTestCase(test.TestCase):
"%s.%s" % (self.instance['hostname'], CONF.dhcp_domain))
def test_format_instance_mapping(self):
"""Make sure that _format_instance_mappings works"""
# Make sure that _format_instance_mappings works.
ctxt = None
instance_ref0 = {'id': 0,
'uuid': 'e5fe5518-0288-4fa3-b0c4-c79764101b85',

View File

@ -77,7 +77,7 @@ def _have_mysql():
class TestMigrations(test.TestCase):
"""Test sqlalchemy-migrate migrations"""
"""Test sqlalchemy-migrate migrations."""
DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
'test_migrations.conf')
@ -211,9 +211,7 @@ class TestMigrations(test.TestCase):
self.fail("Shouldn't have connected")
def test_mysql_innodb(self):
"""
Test that table creation on mysql only builds InnoDB tables
"""
# Test that table creation on mysql only builds InnoDB tables
if not _have_mysql():
self.skipTest("mysql not available")
# add this to the global lists to make reset work with it, it's removed

View File

@ -109,7 +109,7 @@ class RootwrapTestCase(test.TestCase):
p.wait()
def test_KillFilter_no_raise(self):
"""Makes sure ValueError from bug 926412 is gone"""
# Makes sure ValueError from bug 926412 is gone.
f = filters.KillFilter("root", "")
# Providing anything other than kill should be False
usercmd = ['notkill', 999999]
@ -119,7 +119,7 @@ class RootwrapTestCase(test.TestCase):
self.assertFalse(f.match(usercmd))
def test_KillFilter_deleted_exe(self):
"""Makes sure deleted exe's are killed correctly"""
# Makes sure deleted exe's are killed correctly.
# See bug #967931.
def fake_readlink(blah):
return '/bin/commandddddd (deleted)'

View File

@ -94,11 +94,11 @@ class S3APITestCase(test.TestCase):
return True
def test_list_buckets(self):
"""Make sure we are starting with no buckets."""
# Make sure we are starting with no buckets.
self._ensure_no_buckets(self.conn.get_all_buckets())
def test_create_and_delete_bucket(self):
"""Test bucket creation and deletion."""
# Test bucket creation and deletion.
bucket_name = 'testbucket'
self.conn.create_bucket(bucket_name)
@ -107,7 +107,7 @@ class S3APITestCase(test.TestCase):
self._ensure_no_buckets(self.conn.get_all_buckets())
def test_create_bucket_and_key_and_delete_key_again(self):
"""Test key operations on buckets."""
# Test key operations on buckets.
bucket_name = 'testbucket'
key_name = 'somekey'
key_contents = 'somekey'

View File

@ -57,7 +57,7 @@ class MockEntrypoint(pkg_resources.EntryPoint):
class APITestCase(test.TestCase):
"""Test case for the plugin api extension interface"""
"""Test case for the plugin api extension interface."""
def test_add_extension(self):
def mock_load(_s):
return TestPluginClass()

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Test of Policy Engine For Nova"""
"""Test of Policy Engine For Nova."""
import os.path
import StringIO
@ -48,10 +48,10 @@ class PolicyFileTestCase(test.TestCase):
action = "example:test"
with open(tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": ""}""")
policyfile.write('{"example:test": ""}')
policy.enforce(self.context, action, self.target)
with open(tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": "!"}""")
policyfile.write('{"example:test": "!"}')
# NOTE(vish): reset stored policy cache so we don't have to
# sleep(1)
policy._POLICY_CACHE = {}

View File

@ -162,7 +162,7 @@ class PowerVMDriverTestCase(test.TestCase):
self.assertEqual(state, power_state.RUNNING)
def test_spawn_cleanup_on_fail(self):
"""Verify on a failed spawn, we get the original exception raised"""
# Verify on a failed spawn, we get the original exception raised.
# helper function
def raise_(ex):
raise ex

View File

@ -59,7 +59,7 @@ class QuotaIntegrationTestCase(test.TestCase):
orig_rpc_call = rpc.call
def rpc_call_wrapper(context, topic, msg, timeout=None):
"""Stub out the scheduler creating the instance entry"""
"""Stub out the scheduler creating the instance entry."""
if (topic == CONF.scheduler_topic and
msg['method'] == 'run_instance'):
scheduler = scheduler_driver.Scheduler
@ -79,7 +79,7 @@ class QuotaIntegrationTestCase(test.TestCase):
nova.tests.image.fake.FakeImageService_reset()
def _create_instance(self, cores=2):
"""Create a test instance"""
"""Create a test instance."""
inst = {}
inst['image_id'] = 'cedef40a-ed67-4d10-800e-17455edce175'
inst['reservation_id'] = 'r-fakeres'

View File

@ -50,7 +50,7 @@ CONF.register_opts(test_service_opts)
class FakeManager(manager.Manager):
"""Fake manager for tests"""
"""Fake manager for tests."""
def test_method(self):
return 'manager'
@ -61,7 +61,7 @@ class ExtendedService(service.Service):
class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
"""Test cases for Services."""
def test_message_gets_to_manager(self):
serv = service.Service('test',
@ -105,7 +105,7 @@ class ServiceFlagsTestCase(test.TestCase):
class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
"""Test cases for Services."""
def setUp(self):
super(ServiceTestCase, self).setUp()

View File

@ -21,7 +21,7 @@ from nova.tests import utils as test_utils
class TestUtilsTestCase(test.TestCase):
def test_get_test_admin_context(self):
"""get_test_admin_context's return value behaves like admin context"""
# get_test_admin_context's return value behaves like admin context.
ctxt = test_utils.get_test_admin_context()
# TODO(soren): This should verify the full interface context
@ -29,13 +29,13 @@ class TestUtilsTestCase(test.TestCase):
self.assertTrue(ctxt.is_admin)
def test_get_test_instance(self):
"""get_test_instance's return value looks like an instance_ref"""
# get_test_instance's return value looks like an instance_ref.
instance_ref = test_utils.get_test_instance()
ctxt = test_utils.get_test_admin_context()
db.instance_get(ctxt, instance_ref['id'])
def _test_get_test_network_info(self):
"""Does the return value match a real network_info structure"""
"""Does the return value match a real network_info structure."""
# The challenge here is to define what exactly such a structure
# must look like.
pass

View File

@ -674,7 +674,7 @@ class AuditPeriodTest(test.TestCase):
class DiffDict(test.TestCase):
"""Unit tests for diff_dict()"""
"""Unit tests for diff_dict()."""
def test_no_change(self):
old = dict(a=1, b=2, c=3)

View File

@ -23,9 +23,9 @@ from nova import version
class VersionTestCase(test.TestCase):
"""Test cases for Versions code"""
"""Test cases for Versions code."""
def setUp(self):
"""setup test with unchanging values"""
"""setup test with unchanging values."""
super(VersionTestCase, self).setUp()
self.version = version
self.version.FINAL = False
@ -37,15 +37,15 @@ class VersionTestCase(test.TestCase):
self.version.NOVA_PACKAGE = "g9ec3421"
def test_version_string_is_good(self):
"""Ensure version string works"""
# Ensure version string works.
self.assertEqual("2012.10-dev", self.version.version_string())
def test_canonical_version_string_is_good(self):
"""Ensure canonical version works"""
# Ensure canonical version works.
self.assertEqual("2012.10", self.version.canonical_version_string())
def test_final_version_strings_are_identical(self):
"""Ensure final version strings match only at release"""
# Ensure final version strings match only at release.
self.assertNotEqual(self.version.canonical_version_string(),
self.version.version_string())
self.version.FINAL = True
@ -53,7 +53,7 @@ class VersionTestCase(test.TestCase):
self.version.version_string())
def test_version_string_with_package_is_good(self):
"""Ensure uninstalled code get version string"""
# Ensure uninstalled code get version string.
self.assertEqual("2012.10-g9ec3421",
self.version.version_string_with_package())

View File

@ -265,7 +265,7 @@ class XenAPIVolumeTestCase(stubs.XenAPITestBase):
'dev/sd')
def test_attach_volume(self):
"""This shows how to test Ops classes' methods."""
# This shows how to test Ops classes' methods.
stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
instance = db.instance_create(self.context, self.instance_values)
@ -281,7 +281,7 @@ class XenAPIVolumeTestCase(stubs.XenAPITestBase):
self.assertEqual(vm_ref, vm)
def test_attach_volume_raise_exception(self):
"""This shows how to test when exceptions are raised."""
# This shows how to test when exceptions are raised.
stubs.stubout_session(self.stubs,
stubs.FakeSessionForVolumeFailedTests)
conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
@ -636,7 +636,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
self.assertTrue(instance['architecture'])
def test_spawn_empty_dns(self):
"""Test spawning with an empty dns list"""
# Test spawning with an empty dns list.
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
empty_dns=True)
@ -858,7 +858,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
str(3 * 10 * 1024))
def test_spawn_injected_files(self):
"""Test spawning with injected_files"""
# Test spawning with injected_files.
actual_injected_files = []
def fake_inject_file(self, method, args):
@ -1340,7 +1340,7 @@ class XenAPIMigrateInstance(stubs.XenAPITestBase):
network_info, image_meta, resize_instance=False)
def test_migrate_no_auto_disk_config_no_resize_down(self):
"""Resize down should fail when auto_disk_config not set"""
# Resize down should fail when auto_disk_config not set.
instance_values = self.instance_values
instance_values['root_gb'] = 40
instance_values['auto_disk_config'] = False
@ -1358,7 +1358,7 @@ class XenAPIImageTypeTestCase(test.TestCase):
"""Test ImageType class."""
def test_to_string(self):
"""Can convert from type id to type string."""
# Can convert from type id to type string.
self.assertEquals(
vm_utils.ImageType.to_string(vm_utils.ImageType.KERNEL),
vm_utils.ImageType.KERNEL_STR)
@ -1439,23 +1439,23 @@ class XenAPIDetermineIsPVTestCase(test.TestCase):
class CompareVersionTestCase(test.TestCase):
def test_less_than(self):
"""Test that cmp_version compares a as less than b"""
# Test that cmp_version compares a as less than b.
self.assertTrue(vmops.cmp_version('1.2.3.4', '1.2.3.5') < 0)
def test_greater_than(self):
"""Test that cmp_version compares a as greater than b"""
# Test that cmp_version compares a as greater than b.
self.assertTrue(vmops.cmp_version('1.2.3.5', '1.2.3.4') > 0)
def test_equal(self):
"""Test that cmp_version compares a as equal to b"""
# Test that cmp_version compares a as equal to b.
self.assertTrue(vmops.cmp_version('1.2.3.4', '1.2.3.4') == 0)
def test_non_lexical(self):
"""Test that cmp_version compares non-lexically"""
# Test that cmp_version compares non-lexically.
self.assertTrue(vmops.cmp_version('1.2.3.10', '1.2.3.4') > 0)
def test_length(self):
"""Test that cmp_version compares by length as last resort"""
# Test that cmp_version compares by length as last resort.
self.assertTrue(vmops.cmp_version('1.2.3', '1.2.3.4') < 0)
@ -1619,7 +1619,7 @@ class XenAPIAutoDiskConfigTestCase(stubs.XenAPITestBase):
@stub_vm_utils_with_vdi_attached_here
def test_instance_auto_disk_config_doesnt_pass_fail_safes(self):
"""Should not partition unless fail safes pass"""
# Should not partition unless fail safes pass.
self.instance_values['auto_disk_config'] = True
def fake_get_partitions(dev):
@ -1645,7 +1645,7 @@ class XenAPIAutoDiskConfigTestCase(stubs.XenAPITestBase):
class XenAPIGenerateLocal(stubs.XenAPITestBase):
"""Test generating of local disks, like swap and ephemeral"""
"""Test generating of local disks, like swap and ephemeral."""
def setUp(self):
super(XenAPIGenerateLocal, self).setUp()
self.flags(xenapi_connection_url='test_url',
@ -1697,7 +1697,7 @@ class XenAPIGenerateLocal(stubs.XenAPITestBase):
self.assertTrue(self.called)
def test_generate_swap(self):
"""Test swap disk generation."""
# Test swap disk generation.
instance = db.instance_create(self.context, self.instance_values)
instance = db.instance_update(self.context, instance['uuid'],
{'instance_type_id': 5})
@ -1714,7 +1714,7 @@ class XenAPIGenerateLocal(stubs.XenAPITestBase):
self.assertCalled(instance)
def test_generate_ephemeral(self):
"""Test ephemeral disk generation."""
# Test ephemeral disk generation.
instance = db.instance_create(self.context, self.instance_values)
instance = db.instance_update(self.context, instance['uuid'],
{'instance_type_id': 4})
@ -2136,7 +2136,7 @@ class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase):
class XenAPISRSelectionTestCase(stubs.XenAPITestBase):
"""Unit tests for testing we find the right SR."""
def test_safe_find_sr_raise_exception(self):
"""Ensure StorageRepositoryNotFound is raise when wrong filter."""
# Ensure StorageRepositoryNotFound is raise when wrong filter.
self.flags(sr_matching_filter='yadayadayada')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
@ -2145,7 +2145,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBase):
vm_utils.safe_find_sr, session)
def test_safe_find_sr_local_storage(self):
"""Ensure the default local-storage is found."""
# Ensure the default local-storage is found.
self.flags(sr_matching_filter='other-config:i18n-key=local-storage')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
@ -2162,7 +2162,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBase):
self.assertEqual(local_sr, expected)
def test_safe_find_sr_by_other_criteria(self):
"""Ensure the SR is found when using a different filter."""
# Ensure the SR is found when using a different filter.
self.flags(sr_matching_filter='other-config:my_fake_sr=true')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
@ -2176,7 +2176,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBase):
self.assertEqual(local_sr, expected)
def test_safe_find_sr_default(self):
"""Ensure the default SR is found regardless of other-config."""
# Ensure the default SR is found regardless of other-config.
self.flags(sr_matching_filter='default-sr:true')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
@ -2283,7 +2283,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
matchers.DictMatches(result['metadetails']))
def test_join_slave(self):
"""Ensure join_slave gets called when the request gets to master."""
# Ensure join_slave gets called when the request gets to master.
def fake_join_slave(id, compute_uuid, host, url, user, password):
fake_join_slave.called = True
self.stubs.Set(self.conn._pool, "_join_slave", fake_join_slave)
@ -2338,7 +2338,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
self.context, result, "test_host")
def test_remove_slave(self):
"""Ensure eject slave gets called."""
# Ensure eject slave gets called.
def fake_eject_slave(id, compute_uuid, host_uuid):
fake_eject_slave.called = True
self.stubs.Set(self.conn._pool, "_eject_slave", fake_eject_slave)
@ -2350,7 +2350,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
self.assertTrue(fake_eject_slave.called)
def test_remove_master_solo(self):
"""Ensure metadata are cleared after removal."""
# Ensure metadata are cleared after removal.
def fake_clear_pool(id):
fake_clear_pool.called = True
self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool)
@ -2365,7 +2365,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
matchers.DictMatches(result['metadetails']))
def test_remote_master_non_empty_pool(self):
"""Ensure AggregateError is raised if removing the master."""
# Ensure AggregateError is raised if removing the master.
aggregate = self._aggregate_setup(hosts=['host', 'host2'],
metadata=self.fake_metadata)
@ -2415,7 +2415,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
aggregate, 'fake_host')
def test_remove_host_from_aggregate_error(self):
"""Ensure we can remove a host from an aggregate even if in error."""
# Ensure we can remove a host from an aggregate even if in error.
values = _create_service_entries(self.context)
fake_zone = values.keys()[0]
aggr = self.api.create_aggregate(self.context,
@ -2453,7 +2453,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
aggregate, 'fake_host')
def test_add_aggregate_host_raise_err(self):
"""Ensure the undo operation works correctly on add."""
# Ensure the undo operation works correctly on add.
def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore):
raise exception.AggregateError(
aggregate_id='', action='', reason='')
@ -2492,7 +2492,7 @@ class MockComputeAPI(object):
class StubDependencies(object):
"""Stub dependencies for ResourcePool"""
"""Stub dependencies for ResourcePool."""
def __init__(self):
self.compute_rpcapi = MockComputeAPI()
@ -2511,7 +2511,7 @@ class StubDependencies(object):
class ResourcePoolWithStubs(StubDependencies, pool.ResourcePool):
"""A ResourcePool, use stub dependencies """
"""A ResourcePool, use stub dependencies."""
class HypervisorPoolTestCase(test.TestCase):

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Stubouts, mocks and fixtures for the test suite"""
"""Stubouts, mocks and fixtures for the test suite."""
import pickle
import random
@ -54,7 +54,7 @@ def stubout_instance_snapshot(stubs):
def stubout_session(stubs, cls, product_version=(5, 6, 2),
product_brand='XenServer', **opt_args):
"""Stubs out methods from XenAPISession"""
"""Stubs out methods from XenAPISession."""
stubs.Set(xenapi_conn.XenAPISession, '_create_session',
lambda s, url: cls(url, **opt_args))
stubs.Set(xenapi_conn.XenAPISession, '_get_product_version_and_brand',
@ -90,7 +90,7 @@ def stubout_is_vdi_pv(stubs):
def stubout_determine_is_pv_objectstore(stubs):
"""Assumes VMs stu have PV kernels"""
"""Assumes VMs stu have PV kernels."""
def f(*args):
return False
@ -158,7 +158,7 @@ def _make_fake_vdi():
class FakeSessionForVMTests(fake.SessionBase):
"""Stubs out a XenAPISession for VM tests """
"""Stubs out a XenAPISession for VM tests."""
_fake_iptables_save_output = ("# Generated by iptables-save v1.4.10 on "
"Sun Nov 6 22:49:02 2011\n"
@ -204,7 +204,7 @@ class FakeSessionForVMTests(fake.SessionBase):
class FakeSessionForFirewallTests(FakeSessionForVMTests):
"""Stubs out a XenApi Session for doing IPTable Firewall tests """
"""Stubs out a XenApi Session for doing IPTable Firewall tests."""
def __init__(self, uri, test_case=None):
super(FakeSessionForFirewallTests, self).__init__(uri)
@ -270,7 +270,7 @@ def stub_out_vm_methods(stubs):
class FakeSessionForVolumeTests(fake.SessionBase):
"""Stubs out a XenAPISession for Volume tests """
"""Stubs out a XenAPISession for Volume tests."""
def VDI_introduce(self, _1, uuid, _2, _3, _4, _5,
_6, _7, _8, _9, _10, _11):
valid_vdi = False
@ -284,7 +284,7 @@ class FakeSessionForVolumeTests(fake.SessionBase):
class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
"""Stubs out a XenAPISession for Volume tests: it injects failures """
"""Stubs out a XenAPISession for Volume tests: it injects failures."""
def VDI_introduce(self, _1, uuid, _2, _3, _4, _5,
_6, _7, _8, _9, _10, _11):
# This is for testing failure