move all tests to nova/tests/unit

As part of the split of functional and unit tests we need to isolate
the unit tests into a separate directory for having multiple test
targets in a sane way.

Part of bp:functional-tests-for-nova

Change-Id: Id42ba373c1bda6a312b673ab2b489ca56da8c628
changes/34/133234/7
Sean Dague 8 years ago
parent 5c8bbaafef
commit 89cd6a0c49
  1. 4
      nova/test.py
  2. 49
      nova/tests/__init__.py
  3. 635
      nova/tests/api/ec2/test_api.py
  4. 1096
      nova/tests/api/ec2/test_cinder_cloud.py
  5. 3255
      nova/tests/api/ec2/test_cloud.py
  6. 277
      nova/tests/api/ec2/test_ec2_validate.py
  7. 734
      nova/tests/api/openstack/compute/contrib/test_admin_actions.py
  8. 111
      nova/tests/api/openstack/compute/contrib/test_admin_password.py
  9. 670
      nova/tests/api/openstack/compute/contrib/test_aggregates.py
  10. 455
      nova/tests/api/openstack/compute/contrib/test_attach_interfaces.py
  11. 512
      nova/tests/api/openstack/compute/contrib/test_availability_zone.py
  12. 159
      nova/tests/api/openstack/compute/contrib/test_baremetal_nodes.py
  13. 359
      nova/tests/api/openstack/compute/contrib/test_block_device_mapping.py
  14. 421
      nova/tests/api/openstack/compute/contrib/test_block_device_mapping_v1.py
  15. 698
      nova/tests/api/openstack/compute/contrib/test_cells.py
  16. 140
      nova/tests/api/openstack/compute/contrib/test_certificates.py
  17. 210
      nova/tests/api/openstack/compute/contrib/test_cloudpipe.py
  18. 99
      nova/tests/api/openstack/compute/contrib/test_cloudpipe_update.py
  19. 260
      nova/tests/api/openstack/compute/contrib/test_config_drive.py
  20. 103
      nova/tests/api/openstack/compute/contrib/test_console_auth_tokens.py
  21. 171
      nova/tests/api/openstack/compute/contrib/test_console_output.py
  22. 587
      nova/tests/api/openstack/compute/contrib/test_consoles.py
  23. 387
      nova/tests/api/openstack/compute/contrib/test_createserverext.py
  24. 147
      nova/tests/api/openstack/compute/contrib/test_deferred_delete.py
  25. 449
      nova/tests/api/openstack/compute/contrib/test_disk_config.py
  26. 268
      nova/tests/api/openstack/compute/contrib/test_evacuate.py
  27. 184
      nova/tests/api/openstack/compute/contrib/test_extended_availability_zone.py
  28. 114
      nova/tests/api/openstack/compute/contrib/test_extended_evacuate_find_host.py
  29. 101
      nova/tests/api/openstack/compute/contrib/test_extended_hypervisors.py
  30. 189
      nova/tests/api/openstack/compute/contrib/test_extended_ips.py
  31. 196
      nova/tests/api/openstack/compute/contrib/test_extended_ips_mac.py
  32. 148
      nova/tests/api/openstack/compute/contrib/test_extended_server_attributes.py
  33. 148
      nova/tests/api/openstack/compute/contrib/test_extended_status.py
  34. 123
      nova/tests/api/openstack/compute/contrib/test_extended_virtual_interfaces_net.py
  35. 124
      nova/tests/api/openstack/compute/contrib/test_extended_volumes.py
  36. 256
      nova/tests/api/openstack/compute/contrib/test_fixed_ips.py
  37. 402
      nova/tests/api/openstack/compute/contrib/test_flavor_access.py
  38. 127
      nova/tests/api/openstack/compute/contrib/test_flavor_disabled.py
  39. 465
      nova/tests/api/openstack/compute/contrib/test_flavor_manage.py
  40. 127
      nova/tests/api/openstack/compute/contrib/test_flavor_rxtx.py
  41. 126
      nova/tests/api/openstack/compute/contrib/test_flavor_swap.py
  42. 127
      nova/tests/api/openstack/compute/contrib/test_flavorextradata.py
  43. 403
      nova/tests/api/openstack/compute/contrib/test_flavors_extra_specs.py
  44. 412
      nova/tests/api/openstack/compute/contrib/test_floating_ip_dns.py
  45. 83
      nova/tests/api/openstack/compute/contrib/test_floating_ip_pools.py
  46. 853
      nova/tests/api/openstack/compute/contrib/test_floating_ips.py
  47. 139
      nova/tests/api/openstack/compute/contrib/test_floating_ips_bulk.py
  48. 106
      nova/tests/api/openstack/compute/contrib/test_fping.py
  49. 172
      nova/tests/api/openstack/compute/contrib/test_hide_server_addresses.py
  50. 471
      nova/tests/api/openstack/compute/contrib/test_hosts.py
  51. 92
      nova/tests/api/openstack/compute/contrib/test_hypervisor_status.py
  52. 596
      nova/tests/api/openstack/compute/contrib/test_hypervisors.py
  53. 138
      nova/tests/api/openstack/compute/contrib/test_image_size.py
  54. 327
      nova/tests/api/openstack/compute/contrib/test_instance_actions.py
  55. 210
      nova/tests/api/openstack/compute/contrib/test_instance_usage_audit_log.py
  56. 497
      nova/tests/api/openstack/compute/contrib/test_keypairs.py
  57. 231
      nova/tests/api/openstack/compute/contrib/test_migrate_server.py
  58. 204
      nova/tests/api/openstack/compute/contrib/test_multinic.py
  59. 610
      nova/tests/api/openstack/compute/contrib/test_networks.py
  60. 918
      nova/tests/api/openstack/compute/contrib/test_neutron_security_groups.py
  61. 222
      nova/tests/api/openstack/compute/contrib/test_quota_classes.py
  62. 648
      nova/tests/api/openstack/compute/contrib/test_quotas.py
  63. 270
      nova/tests/api/openstack/compute/contrib/test_rescue.py
  64. 220
      nova/tests/api/openstack/compute/contrib/test_scheduler_hints.py
  65. 515
      nova/tests/api/openstack/compute/contrib/test_security_group_default_rules.py
  66. 1767
      nova/tests/api/openstack/compute/contrib/test_security_groups.py
  67. 132
      nova/tests/api/openstack/compute/contrib/test_server_diagnostics.py
  68. 188
      nova/tests/api/openstack/compute/contrib/test_server_group_quotas.py
  69. 521
      nova/tests/api/openstack/compute/contrib/test_server_groups.py
  70. 94
      nova/tests/api/openstack/compute/contrib/test_server_password.py
  71. 183
      nova/tests/api/openstack/compute/contrib/test_server_start_stop.py
  72. 159
      nova/tests/api/openstack/compute/contrib/test_server_usage.py
  73. 576
      nova/tests/api/openstack/compute/contrib/test_services.py
  74. 148
      nova/tests/api/openstack/compute/contrib/test_shelve.py
  75. 539
      nova/tests/api/openstack/compute/contrib/test_simple_tenant_usage.py
  76. 209
      nova/tests/api/openstack/compute/contrib/test_snapshots.py
  77. 76
      nova/tests/api/openstack/compute/contrib/test_tenant_networks.py
  78. 127
      nova/tests/api/openstack/compute/contrib/test_virtual_interfaces.py
  79. 1083
      nova/tests/api/openstack/compute/contrib/test_volumes.py
  80. 263
      nova/tests/api/openstack/compute/plugins/v3/admin_only_action_common.py
  81. 383
      nova/tests/api/openstack/compute/plugins/v3/test_access_ips.py
  82. 95
      nova/tests/api/openstack/compute/plugins/v3/test_console_auth_tokens.py
  83. 270
      nova/tests/api/openstack/compute/plugins/v3/test_consoles.py
  84. 261
      nova/tests/api/openstack/compute/plugins/v3/test_create_backup.py
  85. 387
      nova/tests/api/openstack/compute/plugins/v3/test_extended_volumes.py
  86. 98
      nova/tests/api/openstack/compute/plugins/v3/test_extension_info.py
  87. 57
      nova/tests/api/openstack/compute/plugins/v3/test_lock_server.py
  88. 547
      nova/tests/api/openstack/compute/plugins/v3/test_multiple_create.py
  89. 60
      nova/tests/api/openstack/compute/plugins/v3/test_pause_server.py
  90. 236
      nova/tests/api/openstack/compute/plugins/v3/test_pci.py
  91. 1131
      nova/tests/api/openstack/compute/plugins/v3/test_server_actions.py
  92. 80
      nova/tests/api/openstack/compute/plugins/v3/test_server_password.py
  93. 3352
      nova/tests/api/openstack/compute/plugins/v3/test_servers.py
  94. 453
      nova/tests/api/openstack/compute/plugins/v3/test_services.py
  95. 48
      nova/tests/api/openstack/compute/plugins/v3/test_suspend_server.py
  96. 195
      nova/tests/api/openstack/compute/plugins/v3/test_user_data.py
  97. 186
      nova/tests/api/openstack/compute/test_api.py
  98. 61
      nova/tests/api/openstack/compute/test_auth.py
  99. 293
      nova/tests/api/openstack/compute/test_consoles.py
  100. 747
      nova/tests/api/openstack/compute/test_extensions.py
  101. Some files were not shown because too many files have changed in this diff Show More

@ -54,8 +54,8 @@ from nova.openstack.common import log as nova_logging
from nova import paths
from nova import rpc
from nova import service
from nova.tests import conf_fixture
from nova.tests import policy_fixture
from nova.tests.unit import conf_fixture
from nova.tests.unit import policy_fixture
from nova import utils

@ -1,49 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nova.tests` -- Nova Unittests
=====================================================
.. automodule:: nova.tests
:platform: Unix
"""
# TODO(mikal): move eventlet imports to nova.__init__ once we move to PBR
import os
import sys
import traceback
# NOTE(mikal): All of this is because if dnspython is present in your
# environment then eventlet monkeypatches socket.getaddrinfo() with an
# implementation which doesn't work for IPv6. What we're checking here is
# that the magic environment variable was set when the import happened.
# NOTE(dims): Prevent this code from kicking in under docs generation
# as it leads to spurious errors/warning.
stack = traceback.extract_stack()
if ('eventlet' in sys.modules and
os.environ.get('EVENTLET_NO_GREENDNS', '').lower() != 'yes' and
(len(stack) < 2 or 'sphinx' not in stack[-2][0])):
raise ImportError('eventlet imported before nova/cmd/__init__ '
'(env var set to %s)'
% os.environ.get('EVENTLET_NO_GREENDNS'))
os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
import eventlet
eventlet.monkey_patch(os=False)

@ -1,635 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the API endpoint."""
import random
import re
import StringIO
import boto
import boto.connection
from boto.ec2 import regioninfo
from boto import exception as boto_exc
# newer versions of boto use their own wrapper on top of httplib.HTTPResponse
if hasattr(boto.connection, 'HTTPResponse'):
httplib = boto.connection
else:
import httplib
import fixtures
import webob
from nova.api import auth
from nova.api import ec2
from nova.api.ec2 import ec2utils
from nova import block_device
from nova import context
from nova import exception
from nova.openstack.common import versionutils
from nova import test
from nova.tests import matchers
class FakeHttplibSocket(object):
"""a fake socket implementation for httplib.HTTPResponse, trivial."""
def __init__(self, response_string):
self.response_string = response_string
self._buffer = StringIO.StringIO(response_string)
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer."""
return self._buffer
class FakeHttplibConnection(object):
"""A fake httplib.HTTPConnection for boto to use
requests made via this connection actually get translated and routed into
our WSGI app, we then wait for the response and turn it back into
the HTTPResponse that boto expects.
"""
def __init__(self, app, host, is_secure=False):
self.app = app
self.host = host
def request(self, method, path, data, headers):
req = webob.Request.blank(path)
req.method = method
req.body = data
req.headers = headers
req.headers['Accept'] = 'text/html'
req.host = self.host
# Call the WSGI app, get the HTTP response
resp = str(req.get_response(self.app))
# For some reason, the response doesn't have "HTTP/1.0 " prepended; I
# guess that's a function the web server usually provides.
resp = "HTTP/1.0 %s" % resp
self.sock = FakeHttplibSocket(resp)
self.http_response = httplib.HTTPResponse(self.sock)
# NOTE(vish): boto is accessing private variables for some reason
self._HTTPConnection__response = self.http_response
self.http_response.begin()
def getresponse(self):
return self.http_response
def getresponsebody(self):
return self.sock.response_string
def close(self):
"""Required for compatibility with boto/tornado."""
pass
class XmlConversionTestCase(test.NoDBTestCase):
"""Unit test api xml conversion."""
def test_number_conversion(self):
conv = ec2utils._try_convert
self.assertIsNone(conv('None'))
self.assertEqual(conv('True'), True)
self.assertEqual(conv('TRUE'), True)
self.assertEqual(conv('true'), True)
self.assertEqual(conv('False'), False)
self.assertEqual(conv('FALSE'), False)
self.assertEqual(conv('false'), False)
self.assertEqual(conv('0'), 0)
self.assertEqual(conv('42'), 42)
self.assertEqual(conv('3.14'), 3.14)
self.assertEqual(conv('-57.12'), -57.12)
self.assertEqual(conv('0x57'), 0x57)
self.assertEqual(conv('-0x57'), -0x57)
self.assertEqual(conv('-'), '-')
self.assertEqual(conv('-0'), 0)
self.assertEqual(conv('0.0'), 0.0)
self.assertEqual(conv('1e-8'), 0.0)
self.assertEqual(conv('-1e-8'), 0.0)
self.assertEqual(conv('0xDD8G'), '0xDD8G')
self.assertEqual(conv('0XDD8G'), '0XDD8G')
self.assertEqual(conv('-stringy'), '-stringy')
self.assertEqual(conv('stringy'), 'stringy')
self.assertEqual(conv('add'), 'add')
self.assertEqual(conv('remove'), 'remove')
self.assertEqual(conv(''), '')
class Ec2utilsTestCase(test.NoDBTestCase):
def test_ec2_id_to_id(self):
self.assertEqual(ec2utils.ec2_id_to_id('i-0000001e'), 30)
self.assertEqual(ec2utils.ec2_id_to_id('ami-1d'), 29)
self.assertEqual(ec2utils.ec2_id_to_id('snap-0000001c'), 28)
self.assertEqual(ec2utils.ec2_id_to_id('vol-0000001b'), 27)
def test_bad_ec2_id(self):
self.assertRaises(exception.InvalidEc2Id,
ec2utils.ec2_id_to_id,
'badone')
def test_id_to_ec2_id(self):
self.assertEqual(ec2utils.id_to_ec2_id(30), 'i-0000001e')
self.assertEqual(ec2utils.id_to_ec2_id(29, 'ami-%08x'), 'ami-0000001d')
self.assertEqual(ec2utils.id_to_ec2_snap_id(28), 'snap-0000001c')
self.assertEqual(ec2utils.id_to_ec2_vol_id(27), 'vol-0000001b')
def test_dict_from_dotted_str(self):
in_str = [('BlockDeviceMapping.1.DeviceName', '/dev/sda1'),
('BlockDeviceMapping.1.Ebs.SnapshotId', 'snap-0000001c'),
('BlockDeviceMapping.1.Ebs.VolumeSize', '80'),
('BlockDeviceMapping.1.Ebs.DeleteOnTermination', 'false'),
('BlockDeviceMapping.2.DeviceName', '/dev/sdc'),
('BlockDeviceMapping.2.VirtualName', 'ephemeral0')]
expected_dict = {
'block_device_mapping': {
'1': {'device_name': '/dev/sda1',
'ebs': {'snapshot_id': 'snap-0000001c',
'volume_size': 80,
'delete_on_termination': False}},
'2': {'device_name': '/dev/sdc',
'virtual_name': 'ephemeral0'}}}
out_dict = ec2utils.dict_from_dotted_str(in_str)
self.assertThat(out_dict, matchers.DictMatches(expected_dict))
def test_properties_root_defice_name(self):
mappings = [{"device": "/dev/sda1", "virtual": "root"}]
properties0 = {'mappings': mappings}
properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings}
root_device_name = block_device.properties_root_device_name(
properties0)
self.assertEqual(root_device_name, '/dev/sda1')
root_device_name = block_device.properties_root_device_name(
properties1)
self.assertEqual(root_device_name, '/dev/sdb')
def test_regex_from_ec2_regex(self):
def _test_re(ec2_regex, expected, literal, match=True):
regex = ec2utils.regex_from_ec2_regex(ec2_regex)
self.assertEqual(regex, expected)
if match:
self.assertIsNotNone(re.match(regex, literal))
else:
self.assertIsNone(re.match(regex, literal))
# wildcards
_test_re('foo', '\Afoo\Z(?s)', 'foo')
_test_re('foo', '\Afoo\Z(?s)', 'baz', match=False)
_test_re('foo?bar', '\Afoo.bar\Z(?s)', 'foo bar')
_test_re('foo?bar', '\Afoo.bar\Z(?s)', 'foo bar', match=False)
_test_re('foo*bar', '\Afoo.*bar\Z(?s)', 'foo QUUX bar')
# backslashes and escaped wildcards
_test_re('foo\\', '\Afoo\\\\\Z(?s)', 'foo\\')
_test_re('foo*bar', '\Afoo.*bar\Z(?s)', 'zork QUUX bar', match=False)
_test_re('foo\\?bar', '\Afoo[?]bar\Z(?s)', 'foo?bar')
_test_re('foo\\?bar', '\Afoo[?]bar\Z(?s)', 'foo bar', match=False)
_test_re('foo\\*bar', '\Afoo[*]bar\Z(?s)', 'foo*bar')
_test_re('foo\\*bar', '\Afoo[*]bar\Z(?s)', 'foo bar', match=False)
# analog to the example given in the EC2 API docs
ec2_regex = '\*nova\?\\end'
expected = r'\A[*]nova[?]\\end\Z(?s)'
literal = r'*nova?\end'
_test_re(ec2_regex, expected, literal)
def test_mapping_prepend_dev(self):
mappings = [
{'virtual': 'ami',
'device': 'sda1'},
{'virtual': 'root',
'device': '/dev/sda1'},
{'virtual': 'swap',
'device': 'sdb1'},
{'virtual': 'swap',
'device': '/dev/sdb2'},
{'virtual': 'ephemeral0',
'device': 'sdc1'},
{'virtual': 'ephemeral1',
'device': '/dev/sdc1'}]
expected_result = [
{'virtual': 'ami',
'device': 'sda1'},
{'virtual': 'root',
'device': '/dev/sda1'},
{'virtual': 'swap',
'device': '/dev/sdb1'},
{'virtual': 'swap',
'device': '/dev/sdb2'},
{'virtual': 'ephemeral0',
'device': '/dev/sdc1'},
{'virtual': 'ephemeral1',
'device': '/dev/sdc1'}]
self.assertThat(block_device.mappings_prepend_dev(mappings),
matchers.DictListMatches(expected_result))
class ApiEc2TestCase(test.TestCase):
"""Unit test for the cloud controller on an EC2 API."""
def setUp(self):
super(ApiEc2TestCase, self).setUp()
self.host = '127.0.0.1'
# NOTE(vish): skipping the Authorizer
roles = ['sysadmin', 'netadmin']
ctxt = context.RequestContext('fake', 'fake', roles=roles)
self.app = auth.InjectContext(ctxt, ec2.FaultWrapper(
ec2.RequestLogging(ec2.Requestify(ec2.Authorizer(ec2.Executor()
), 'nova.api.ec2.cloud.CloudController'))))
self.useFixture(fixtures.FakeLogger('boto'))
def expect_http(self, host=None, is_secure=False, api_version=None):
"""Returns a new EC2 connection."""
self.ec2 = boto.connect_ec2(
aws_access_key_id='fake',
aws_secret_access_key='fake',
is_secure=False,
region=regioninfo.RegionInfo(None, 'test', self.host),
port=8773,
path='/services/Cloud')
if api_version:
self.ec2.APIVersion = api_version
self.mox.StubOutWithMock(self.ec2, 'new_http_connection')
self.http = FakeHttplibConnection(
self.app, '%s:8773' % (self.host), False)
# pylint: disable=E1103
if versionutils.is_compatible('2.14', boto.Version, same_major=False):
self.ec2.new_http_connection(host or self.host, 8773,
is_secure).AndReturn(self.http)
elif versionutils.is_compatible('2', boto.Version, same_major=False):
self.ec2.new_http_connection(host or '%s:8773' % (self.host),
is_secure).AndReturn(self.http)
else:
self.ec2.new_http_connection(host, is_secure).AndReturn(self.http)
return self.http
def test_xmlns_version_matches_request_version(self):
self.expect_http(api_version='2010-10-30')
self.mox.ReplayAll()
# Any request should be fine
self.ec2.get_all_instances()
self.assertTrue(self.ec2.APIVersion in self.http.getresponsebody(),
'The version in the xmlns of the response does '
'not match the API version given in the request.')
def test_describe_instances(self):
"""Test that, after creating a user and a project, the describe
instances call to the API works properly.
"""
self.expect_http()
self.mox.ReplayAll()
self.assertEqual(self.ec2.get_all_instances(), [])
def test_terminate_invalid_instance(self):
# Attempt to terminate an invalid instance.
self.expect_http()
self.mox.ReplayAll()
self.assertRaises(boto_exc.EC2ResponseError,
self.ec2.terminate_instances, "i-00000005")
def test_get_all_key_pairs(self):
"""Test that, after creating a user and project and generating
a key pair, that the API call to list key pairs works properly.
"""
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
self.expect_http()
self.mox.ReplayAll()
self.ec2.create_key_pair(keyname)
rv = self.ec2.get_all_key_pairs()
results = [k for k in rv if k.name == keyname]
self.assertEqual(len(results), 1)
def test_create_duplicate_key_pair(self):
"""Test that, after successfully generating a keypair,
requesting a second keypair with the same name fails sanely.
"""
self.expect_http()
self.mox.ReplayAll()
self.ec2.create_key_pair('test')
try:
self.ec2.create_key_pair('test')
except boto_exc.EC2ResponseError as e:
if e.code == 'InvalidKeyPair.Duplicate':
pass
else:
self.assertEqual('InvalidKeyPair.Duplicate', e.code)
else:
self.fail('Exception not raised.')
def test_get_all_security_groups(self):
# Test that we can retrieve security groups.
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
self.assertEqual(len(rv), 1)
self.assertEqual(rv[0].name, 'default')
def test_create_delete_security_group(self):
# Test that we can create a security group.
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
self.ec2.create_security_group(security_group_name, 'test group')
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
self.assertEqual(len(rv), 2)
self.assertIn(security_group_name, [group.name for group in rv])
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
def test_group_name_valid_chars_security_group(self):
"""Test that we sanely handle invalid security group names.
EC2 API Spec states we should only accept alphanumeric characters,
spaces, dashes, and underscores. Amazon implementation
accepts more characters - so, [:print:] is ok.
"""
bad_strict_ec2 = "aa \t\x01\x02\x7f"
bad_amazon_ec2 = "aa #^% -=99"
test_raise = [
(True, bad_amazon_ec2, "test desc"),
(True, "test name", bad_amazon_ec2),
(False, bad_strict_ec2, "test desc"),
]
for t in test_raise:
self.expect_http()
self.mox.ReplayAll()
self.flags(ec2_strict_validation=t[0])
self.assertRaises(boto_exc.EC2ResponseError,
self.ec2.create_security_group,
t[1],
t[2])
test_accept = [
(False, bad_amazon_ec2, "test desc"),
(False, "test name", bad_amazon_ec2),
]
for t in test_accept:
self.expect_http()
self.mox.ReplayAll()
self.flags(ec2_strict_validation=t[0])
self.ec2.create_security_group(t[1], t[2])
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(t[1])
def test_group_name_valid_length_security_group(self):
"""Test that we sanely handle invalid security group names.
API Spec states that the length should not exceed 255 char.
"""
self.expect_http()
self.mox.ReplayAll()
# Test block group_name > 255 chars
security_group_name = "".join(random.choice("poiuytrewqasdfghjklmnbvc")
for x in range(random.randint(256, 266)))
self.assertRaises(boto_exc.EC2ResponseError,
self.ec2.create_security_group,
security_group_name,
'test group')
def test_authorize_revoke_security_group_cidr(self):
"""Test that we can add and remove CIDR based rules
to a security group
"""
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize('tcp', 80, 81, '0.0.0.0/0')
group.authorize('icmp', -1, -1, '0.0.0.0/0')
group.authorize('udp', 80, 81, '0.0.0.0/0')
group.authorize('tcp', 1, 65535, '0.0.0.0/0')
group.authorize('udp', 1, 65535, '0.0.0.0/0')
group.authorize('icmp', 1, 0, '0.0.0.0/0')
group.authorize('icmp', 0, 1, '0.0.0.0/0')
group.authorize('icmp', 0, 0, '0.0.0.0/0')
def _assert(message, *args):
try:
group.authorize(*args)
except boto_exc.EC2ResponseError as e:
self.assertEqual(e.status, 400, 'Expected status to be 400')
self.assertIn(message, e.error_message)
else:
raise self.failureException, 'EC2ResponseError not raised'
# Invalid CIDR address
_assert('Invalid CIDR', 'tcp', 80, 81, '0.0.0.0/0444')
# Missing ports
_assert('Not enough parameters', 'tcp', '0.0.0.0/0')
# from port cannot be greater than to port
_assert('Invalid port range', 'tcp', 100, 1, '0.0.0.0/0')
# For tcp, negative values are not allowed
_assert('Invalid port range', 'tcp', -1, 1, '0.0.0.0/0')
# For tcp, valid port range 1-65535
_assert('Invalid port range', 'tcp', 1, 65599, '0.0.0.0/0')
# Invalid Cidr for ICMP type
_assert('Invalid CIDR', 'icmp', -1, -1, '0.0.444.0/4')
# Invalid protocol
_assert('Invalid IP protocol', 'xyz', 1, 14, '0.0.0.0/0')
# Invalid port
_assert('Invalid input received: To and From ports must be integers',
'tcp', " ", "81", '0.0.0.0/0')
# Invalid icmp port
_assert('Invalid input received: '
'Type and Code must be integers for ICMP protocol type',
'icmp', " ", "81", '0.0.0.0/0')
# Invalid CIDR Address
_assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0')
# Invalid CIDR Address
_assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0/')
# Invalid Cidr ports
_assert('Invalid port range', 'icmp', 1, 256, '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
group = [grp for grp in rv if grp.name == security_group_name][0]
self.assertEqual(len(group.rules), 8)
self.assertEqual(int(group.rules[0].from_port), 80)
self.assertEqual(int(group.rules[0].to_port), 81)
self.assertEqual(len(group.rules[0].grants), 1)
self.assertEqual(str(group.rules[0].grants[0]), '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.revoke('tcp', 80, 81, '0.0.0.0/0')
group.revoke('icmp', -1, -1, '0.0.0.0/0')
group.revoke('udp', 80, 81, '0.0.0.0/0')
group.revoke('tcp', 1, 65535, '0.0.0.0/0')
group.revoke('udp', 1, 65535, '0.0.0.0/0')
group.revoke('icmp', 1, 0, '0.0.0.0/0')
group.revoke('icmp', 0, 1, '0.0.0.0/0')
group.revoke('icmp', 0, 0, '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
rv = self.ec2.get_all_security_groups()
self.assertEqual(len(rv), 1)
self.assertEqual(rv[0].name, 'default')
def test_authorize_revoke_security_group_cidr_v6(self):
"""Test that we can add and remove CIDR based rules
to a security group for IPv6
"""
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize('tcp', 80, 81, '::/0')
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
group = [grp for grp in rv if grp.name == security_group_name][0]
self.assertEqual(len(group.rules), 1)
self.assertEqual(int(group.rules[0].from_port), 80)
self.assertEqual(int(group.rules[0].to_port), 81)
self.assertEqual(len(group.rules[0].grants), 1)
self.assertEqual(str(group.rules[0].grants[0]), '::/0')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.revoke('tcp', 80, 81, '::/0')
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
rv = self.ec2.get_all_security_groups()
self.assertEqual(len(rv), 1)
self.assertEqual(rv[0].name, 'default')
def test_authorize_revoke_security_group_foreign_group(self):
"""Test that we can grant and revoke another security group access
to a security group
"""
self.expect_http()
self.mox.ReplayAll()
rand_string = 'sdiuisudfsdcnpaqwertasd'
security_group_name = "".join(random.choice(rand_string)
for x in range(random.randint(4, 8)))
other_security_group_name = "".join(random.choice(rand_string)
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
other_group = self.ec2.create_security_group(other_security_group_name,
'some other group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize(src_group=other_group)
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
# I don't bother checkng that we actually find it here,
# because the create/delete unit test further up should
# be good enough for that.
for group in rv:
if group.name == security_group_name:
self.assertEqual(len(group.rules), 3)
self.assertEqual(len(group.rules[0].grants), 1)
self.assertEqual(str(group.rules[0].grants[0]),
'%s-%s' % (other_security_group_name, 'fake'))
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
for group in rv:
if group.name == security_group_name:
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.revoke(src_group=other_group)
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.ec2.delete_security_group(other_security_group_name)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -1,277 +0,0 @@
# Copyright 2012 Cloudscaling, Inc.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo.config import cfg
from oslo.utils import timeutils
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.compute import utils as compute_utils
from nova import context
from nova import db
from nova import exception
from nova import test
from nova.tests import cast_as_call
from nova.tests import fake_network
from nova.tests import fake_notifier
from nova.tests.image import fake
CONF = cfg.CONF
CONF.import_opt('compute_driver', 'nova.virt.driver')
class EC2ValidateTestCase(test.TestCase):
def setUp(self):
super(EC2ValidateTestCase, self).setUp()
self.flags(compute_driver='nova.virt.fake.FakeDriver')
def dumb(*args, **kwargs):
pass
self.stubs.Set(compute_utils, 'notify_about_instance_usage', dumb)
fake_network.set_stub_network_methods(self.stubs)
# set up our cloud
self.cloud = cloud.CloudController()
# Short-circuit the conductor service
self.flags(use_local=True, group='conductor')
# Stub out the notification service so we use the no-op serializer
# and avoid lazy-load traces with the wrap_exception decorator in
# the compute service.
fake_notifier.stub_notifier(self.stubs)
self.addCleanup(fake_notifier.reset)
# set up services
self.conductor = self.start_service('conductor',
manager=CONF.conductor.manager)
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.image_service = fake.FakeImageService()
self.user_id = 'fake'
self.project_id = 'fake'
self.context = context.RequestContext(self.user_id,
self.project_id,
is_admin=True)
self.EC2_MALFORMED_IDS = ['foobar', '', 123]
self.EC2_VALID__IDS = ['i-284f3a41', 'i-001', 'i-deadbeef']
self.ec2_id_exception_map = [(x,
exception.InvalidInstanceIDMalformed)
for x in self.EC2_MALFORMED_IDS]
self.ec2_id_exception_map.extend([(x, exception.InstanceNotFound)
for x in self.EC2_VALID__IDS])
self.volume_id_exception_map = [(x,
exception.InvalidVolumeIDMalformed)
for x in self.EC2_MALFORMED_IDS]
self.volume_id_exception_map.extend([(x, exception.VolumeNotFound)
for x in self.EC2_VALID__IDS])
def fake_show(meh, context, id, **kwargs):
return {'id': id,
'container_format': 'ami',
'properties': {
'kernel_id': 'cedef40a-ed67-4d10-800e-17455edce175',
'ramdisk_id': 'cedef40a-ed67-4d10-800e-17455edce175',
'type': 'machine',
'image_state': 'available'}}
def fake_detail(self, context, **kwargs):
image = fake_show(self, context, None)
image['name'] = kwargs.get('name')
return [image]
fake.stub_out_image_service(self.stubs)
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'detail', fake_detail)
self.useFixture(cast_as_call.CastAsCall(self.stubs))
# make sure we can map ami-00000001/2 to a uuid in FakeImageService
db.s3_image_create(self.context,
'cedef40a-ed67-4d10-800e-17455edce175')
db.s3_image_create(self.context,
'76fa36fc-c930-4bf3-8c8a-ea2a2420deb6')
def tearDown(self):
super(EC2ValidateTestCase, self).tearDown()
fake.FakeImageService_reset()
# EC2_API tests (InvalidInstanceID.Malformed)
def test_console_output(self):
for ec2_id, e in self.ec2_id_exception_map:
self.assertRaises(e,
self.cloud.get_console_output,
context=self.context,
instance_id=[ec2_id])
def test_describe_instance_attribute(self):
for ec2_id, e in self.ec2_id_exception_map:
self.assertRaises(e,
self.cloud.describe_instance_attribute,
context=self.context,
instance_id=ec2_id,
attribute='kernel')
def test_instance_lifecycle(self):
lifecycle = [self.cloud.terminate_instances,
self.cloud.reboot_instances,
self.cloud.stop_instances,
self.cloud.start_instances,
]
for cmd in lifecycle:
for ec2_id, e in self.ec2_id_exception_map:
self.assertRaises(e,
cmd,
context=self.context,
instance_id=[ec2_id])
def test_create_image(self):
for ec2_id, e in self.ec2_id_exception_map:
self.assertRaises(e,
self.cloud.create_image,
context=self.context,
instance_id=ec2_id)
def test_create_snapshot(self):
for ec2_id, e in self.volume_id_exception_map:
self.assertRaises(e,
self.cloud.create_snapshot,
context=self.context,
volume_id=ec2_id)
def test_describe_volumes(self):
for ec2_id, e in self.volume_id_exception_map:
self.assertRaises(e,
self.cloud.describe_volumes,
context=self.context,
volume_id=[ec2_id])
def test_delete_volume(self):
for ec2_id, e in self.volume_id_exception_map:
self.assertRaises(e,
self.cloud.delete_volume,
context=self.context,
volume_id=ec2_id)
def test_detach_volume(self):
for ec2_id, e in self.volume_id_exception_map:
self.assertRaises(e,
self.cloud.detach_volume,
context=self.context,
volume_id=ec2_id)
class EC2TimestampValidationTestCase(test.NoDBTestCase):
"""Test case for EC2 request timestamp validation."""
def test_validate_ec2_timestamp_valid(self):
params = {'Timestamp': '2011-04-22T11:29:49Z'}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertFalse(expired)
def test_validate_ec2_timestamp_old_format(self):
params = {'Timestamp': '2011-04-22T11:29:49'}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertTrue(expired)
def test_validate_ec2_timestamp_not_set(self):
params = {}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertFalse(expired)
def test_validate_ec2_timestamp_ms_time_regex(self):
result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.123Z')
self.assertIsNotNone(result)
result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.123456Z')
self.assertIsNotNone(result)
result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.1234567Z')
self.assertIsNone(result)
result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.123')
self.assertIsNone(result)
result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49Z')
self.assertIsNone(result)
def test_validate_ec2_timestamp_aws_sdk_format(self):
params = {'Timestamp': '2011-04-22T11:29:49.123Z'}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertFalse(expired)
expired = ec2utils.is_ec2_timestamp_expired(params, expires=300)
self.assertTrue(expired)
def test_validate_ec2_timestamp_invalid_format(self):
params = {'Timestamp': '2011-04-22T11:29:49.000P'}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertTrue(expired)
def test_validate_ec2_timestamp_advanced_time(self):
# EC2 request with Timestamp in advanced time
timestamp = timeutils.utcnow() + datetime.timedelta(seconds=250)
params = {'Timestamp': timeutils.strtime(timestamp,
"%Y-%m-%dT%H:%M:%SZ")}
expired = ec2utils.is_ec2_timestamp_expired(params, expires=300)
self.assertFalse(expired)
def test_validate_ec2_timestamp_advanced_time_expired(self):
timestamp = timeutils.utcnow() + datetime.timedelta(seconds=350)
params = {'Timestamp': timeutils.strtime(timestamp,
"%Y-%m-%dT%H:%M:%SZ")}
expired = ec2utils.is_ec2_timestamp_expired(params, expires=300)
self.assertTrue(expired)
def test_validate_ec2_req_timestamp_not_expired(self):
params = {'Timestamp': timeutils.isotime()}
expired = ec2utils.is_ec2_timestamp_expired(params, expires=15)
self.assertFalse(expired)
def test_validate_ec2_req_timestamp_expired(self):
params = {'Timestamp': '2011-04-22T12:00:00Z'}
compare = ec2utils.is_ec2_timestamp_expired(params, expires=300)
self.assertTrue(compare)
def test_validate_ec2_req_expired(self):
params = {'Expires': timeutils.isotime()}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertTrue(expired)
def test_validate_ec2_req_not_expired(self):
expire = timeutils.utcnow() + datetime.timedelta(seconds=350)
params = {'Expires': timeutils.strtime(expire, "%Y-%m-%dT%H:%M:%SZ")}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertFalse(expired)
def test_validate_Expires_timestamp_invalid_format(self):
# EC2 request with invalid Expires
params = {'Expires': '2011-04-22T11:29:49'}
expired = ec2utils.is_ec2_timestamp_expired(params)
self.assertTrue(expired)
def test_validate_ec2_req_timestamp_Expires(self):
# EC2 request with both Timestamp and Expires
params = {'Timestamp': '2011-04-22T11:29:49Z',
'Expires': timeutils.isotime()}
self.assertRaises(exception.InvalidRequest,
ec2utils.is_ec2_timestamp_expired,
params)

@ -1,734 +0,0 @@
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.serialization import jsonutils
from oslo.utils import timeutils
import webob
from nova.api.openstack import common
from nova.api.openstack.compute.contrib import admin_actions as \
admin_actions_v2
from nova.api.openstack.compute.plugins.v3 import admin_actions as \
admin_actions_v21
from nova.compute import vm_states
import nova.context
from nova import exception
from nova import objects
from nova.openstack.common import uuidutils
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests import fake_instance
class CommonMixin(object):
admin_actions = None
fake_url = None
def _make_request(self, url, body):
req = webob.Request.blank(self.fake_url + url)
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.content_type = 'application/json'
return req.get_response(self.app)
def _stub_instance_get(self, uuid=None):
if uuid is None:
uuid = uuidutils.generate_uuid()
instance = fake_instance.fake_db_instance(
id=1, uuid=uuid, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
instance = objects.Instance._from_db_object(
self.context, objects.Instance(), instance)
self.compute_api.get(self.context, uuid, expected_attrs=None,
want_objects=True).AndReturn(instance)
return instance
def _stub_instance_get_failure(self, exc_info, uuid=None):
if uuid is None:
uuid = uuidutils.generate_uuid()
self.compute_api.get(self.context, uuid, expected_attrs=None,
want_objects=True).AndRaise(exc_info)
return uuid
def _test_non_existing_instance(self, action, body_map=None):
uuid = uuidutils.generate_uuid()
self._stub_instance_get_failure(
exception.InstanceNotFound(instance_id=uuid), uuid=uuid)
self.mox.ReplayAll()
res = self._make_request('/servers/%s/action' % uuid,
{action: body_map.get(action)})
self.assertEqual(404, res.status_int)
# Do these here instead of tearDown because this method is called
# more than once for the same test case
self.mox.VerifyAll()
self.mox.UnsetStubs()
def _test_action(self, action, body=None, method=None):
if method is None:
method = action
instance = self._stub_instance_get()
getattr(self.compute_api, method)(self.context, instance)
self.mox.ReplayAll()
res = self._make_request('/servers/%s/action' % instance['uuid'],
{action: None})
self.assertEqual(202, res.status_int)
# Do these here instead of tearDown because this method is called
# more than once for the same test case
self.mox.VerifyAll()
self.mox.UnsetStubs()
def _test_invalid_state(self, action, method=None, body_map=None,
compute_api_args_map=None):
if method is None:
method = action
if body_map is None:
body_map = {}
if compute_api_args_map is None:
compute_api_args_map = {}
instance = self._stub_instance_get()
args, kwargs = compute_api_args_map.get(action, ((), {}))
getattr(self.compute_api, method)(self.context, instance,
*args, **kwargs).AndRaise(
exception.InstanceInvalidState(
attr='vm_state', instance_uuid=instance['uuid'],
state='foo', method=method))
self.mox.ReplayAll()
res = self._make_request('/servers/%s/action' % instance['uuid'],
{action: body_map.get(action)})
self.assertEqual(409, res.status_int)
self.assertIn("Cannot \'%(action)s\' instance %(id)s"
% {'id': instance['uuid'], 'action': action}, res.body)
# Do these here instead of tearDown because this method is called
# more than once for the same test case
self.mox.VerifyAll()
self.mox.UnsetStubs()
def _test_locked_instance(self, action, method=None, body_map=None,
compute_api_args_map=None):
if method is None:
method = action
instance = self._stub_instance_get()
args, kwargs = (), {}
act = None
if compute_api_args_map:
args, kwargs = compute_api_args_map.get(action, ((), {}))
act = body_map.get(action)
getattr(self.compute_api, method)(self.context, instance,
*args, **kwargs).AndRaise(
exception.InstanceIsLocked(instance_uuid=instance['uuid']))
self.mox.ReplayAll()
res = self._make_request('/servers/%s/action' % instance['uuid'],
{action: act})
self.assertEqual(409, res.status_int)
self.assertIn('Instance %s is locked' % instance['uuid'], res.body)
# Do these here instead of tearDown because this method is called
# more than once for the same test case
self.mox.VerifyAll()
self.mox.UnsetStubs()
class AdminActionsTestV21(CommonMixin, test.NoDBTestCase):
admin_actions = admin_actions_v21
fake_url = '/v2/fake'
def setUp(self):
super(AdminActionsTestV21, self).setUp()
self.controller = self.admin_actions.AdminActionsController()
self.compute_api = self.controller.compute_api
self.context = nova.context.RequestContext('fake', 'fake')
def _fake_controller(*args, **kwargs):
return self.controller
self.stubs.Set(self.admin_actions, 'AdminActionsController',
_fake_controller)
self.app = self._get_app()
self.mox.StubOutWithMock(self.compute_api, 'get')
def _get_app(self):
return fakes.wsgi_app_v21(init_only=('servers',
'os-admin-actions'),
fake_auth_context=self.context)
def test_actions(self):
actions = ['resetNetwork', 'injectNetworkInfo']
method_translations = {'resetNetwork': 'reset_network',
'injectNetworkInfo': 'inject_network_info'}
for action in actions:
method = method_translations.get(action)
self.mox.StubOutWithMock(self.compute_api, method or action)
self._test_action(action, method=method)
# Re-mock this.
self.mox.StubOutWithMock(self.compute_api, 'get')
def test_actions_with_non_existed_instance(self):
actions = ['resetNetwork', 'injectNetworkInfo', 'os-resetState']
body_map = {'os-resetState': {'state': 'active'}}
for action in actions:
self._test_non_existing_instance(action,
body_map=body_map)
# Re-mock this.
self.mox.StubOutWithMock(self.compute_api, 'get')
def test_actions_with_locked_instance(self):
actions = ['resetNetwork', 'injectNetworkInfo']
method_translations = {'resetNetwork': 'reset_network',
'injectNetworkInfo': 'inject_network_info'}
for action in actions:
method = method_translations.get(action)
self.mox.StubOutWithMock(self.compute_api, method or action)
self._test_locked_instance(action, method=method)
# Re-mock this.
self.mox.StubOutWithMock(self.compute_api, 'get')
class AdminActionsTestV2(AdminActionsTestV21):
admin_actions = admin_actions_v2
def setUp(self):
super(AdminActionsTestV2, self).setUp()
self.flags(
osapi_compute_extension=[
'nova.api.openstack.compute.contrib.select_extensions'],
osapi_compute_ext_list=['Admin_actions'])
def _get_app(self):
return fakes.wsgi_app(init_only=('servers',),
fake_auth_context=self.context)
def test_actions(self):
actions = ['pause', 'unpause', 'suspend', 'resume', 'migrate',
'resetNetwork', 'injectNetworkInfo', 'lock',
'unlock']
method_translations = {'migrate': 'resize',
'resetNetwork': 'reset_network',
'injectNetworkInfo': 'inject_network_info'}
for action in actions:
method = method_translations.get(action)
self.mox.StubOutWithMock(self.compute_api, method or action)
self._test_action(action, method=method)
# Re-mock this.
self.mox.StubOutWithMock(self.compute_api, 'get')
def test_actions_raise_conflict_on_invalid_state(self):
actions = ['pause', 'unpause', 'suspend', 'resume', 'migrate',
'os-migrateLive']
method_translations = {'migrate': 'resize',
'os-migrateLive': 'live_migrate'}
body_map = {'os-migrateLive':
{'host': 'hostname',
'block_migration': False,
'disk_over_commit': False}}
args_map = {'os-migrateLive': ((False, False, 'hostname'), {})}
for action in actions:
method = method_translations.get(action)
self.mox.StubOutWithMock(self.compute_api, method or action)
self._test_invalid_state(action, method=method, body_map=body_map,
compute_api_args_map=args_map)
# Re-mock this.
self.mox.StubOutWithMock(self.compute_api, 'get')
def test_actions_with_non_existed_instance(self):
actions = ['pause', 'unpause', 'suspend', 'resume',
'resetNetwork', 'injectNetworkInfo', 'lock',
'unlock', 'os-resetState', 'migrate', 'os-migrateLive']
body_map = {'os-resetState': {'state': 'active'},
'os-migrateLive':