fix reviewer's comment

This commit is contained in:
Kei Masumoto
2011-07-12 17:59:24 +09:00
19 changed files with 512 additions and 22 deletions

View File

@@ -50,4 +50,5 @@
<ilyaalekseyev@acm.org> <ialekseev@griddynamics.com>
<ilyaalekseyev@acm.org> <ilya@oscloud.ru>
<reldan@oscloud.ru> <enugaev@griddynamics.com>
<kshileev@gmail.com> <kshileev@griddynamics.com>
<kshileev@gmail.com> <kshileev@griddynamics.com>
<nsokolov@griddynamics.com> <nsokolov@griddynamics.net>

View File

@@ -20,6 +20,7 @@ Dan Prince <dan.prince@rackspace.com>
Dave Walker <DaveWalker@ubuntu.com>
David Pravec <David.Pravec@danix.org>
Dean Troyer <dtroyer@gmail.com>
Devendra Modium <dmodium@isi.edu>
Devin Carlen <devin.carlen@gmail.com>
Ed Leafe <ed@leafe.com>
Eldar Nugaev <reldan@oscloud.ru>
@@ -43,6 +44,7 @@ John Dewey <john@dewey.ws>
John Tran <jtran@attinteractive.com>
Jonathan Bryce <jbryce@jbryce.com>
Jordan Rinke <jordan@openstack.org>
Joseph Suh <jsuh@isi.edu>
Josh Durgin <joshd@hq.newdream.net>
Josh Kearney <josh@jk0.org>
Josh Kleinpeter <josh@kleinpeter.org>
@@ -68,6 +70,7 @@ MORITA Kazutaka <morita.kazutaka@gmail.com>
Muneyuki Noguchi <noguchimn@nttdata.co.jp>
Nachi Ueno <ueno.nachi@lab.ntt.co.jp>
Naveed Massjouni <naveedm9@gmail.com>
Nikolay Sokolov <nsokolov@griddynamics.com>
Nirmal Ranganathan <nirmal.ranganathan@rackspace.com>
Paul Voccio <paul@openstack.org>
Renuka Apte <renuka.apte@citrix.com>

View File

@@ -24,8 +24,10 @@ Starts both the EC2 and OpenStack APIs in separate processes.
"""
import os
import signal
import sys
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(
sys.argv[0]), os.pardir, os.pardir))
if os.path.exists(os.path.join(possible_topdir, "nova", "__init__.py")):
@@ -34,17 +36,23 @@ if os.path.exists(os.path.join(possible_topdir, "nova", "__init__.py")):
import nova.service
import nova.utils
from nova import flags
FLAGS = flags.FLAGS
def main():
"""Launch EC2 and OSAPI services."""
nova.utils.Bootstrapper.bootstrap_binary(sys.argv)
ec2 = nova.service.WSGIService("ec2")
osapi = nova.service.WSGIService("osapi")
launcher = nova.service.Launcher()
launcher.launch_service(ec2)
launcher.launch_service(osapi)
for api in FLAGS.enabled_apis:
service = nova.service.WSGIService(api)
launcher.launch_service(service)
signal.signal(signal.SIGTERM, lambda *_: launcher.stop())
try:
launcher.wait()

View File

@@ -63,6 +63,19 @@ flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
def handle_flash_socket_policy(socket):
LOG.info(_("Received connection on flash socket policy port"))
fd = socket.makefile('rw')
expected_command = "<policy-file-request/>"
if expected_command in fd.read(len(expected_command) + 1):
LOG.info(_("Received valid flash socket policy request"))
fd.write('<?xml version="1.0"?><cross-domain-policy><allow-'
'access-from domain="*" to-ports="%d" /></cross-'
'domain-policy>' % (FLAGS.vncproxy_port))
fd.flush()
socket.close()
if __name__ == "__main__":
utils.default_flagfile()
FLAGS(sys.argv)
@@ -101,4 +114,6 @@ if __name__ == "__main__":
host=FLAGS.vncproxy_host,
port=FLAGS.vncproxy_port)
server.start()
server.start_tcp(handle_flash_socket_policy, 843, host=FLAGS.vncproxy_host)
server.wait()

View File

@@ -220,7 +220,7 @@ class DestinationHypervisorTooOld(Invalid):
"has been provided.")
class DestinatioinDiskExists(Invalid):
class DestinationDiskExists(Invalid):
message = _("The supplied disk path (%(path)s) already exists, "
"it is expected not to exist.")

View File

@@ -305,6 +305,8 @@ DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
DEFINE_list('enabled_apis', ['ec2', 'osapi'],
'list of APIs to enable by default')
DEFINE_string('ec2_host', '$my_ip', 'ip of api server')
DEFINE_string('ec2_dmz_host', '$my_ip', 'internal ip of api server')
DEFINE_integer('ec2_port', 8773, 'cloud controller port')

View File

@@ -51,6 +51,11 @@ def _call_scheduler(method, context, params=None):
return rpc.call(context, queue, kwargs)
def get_host_list(context):
"""Return a list of hosts associated with this zone."""
return _call_scheduler('get_host_list', context)
def get_zone_list(context):
"""Return a list of zones assoicated with this zone."""
items = _call_scheduler('get_zone_list', context)

View File

@@ -199,7 +199,8 @@ class Scheduler(object):
try:
self.mounted_on_same_shared_storage(context, instance_ref, dest)
if block_migration:
reason = "Block migration can not be used with shared storage."
reason = _("Block migration can not be used "
"with shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
except rpc.RemoteError:
if not block_migration:

View File

@@ -56,6 +56,10 @@ class SchedulerManager(manager.Manager):
"""Poll child zones periodically to get status."""
self.zone_manager.ping(context)
def get_host_list(self, context=None):
"""Get a list of hosts from the ZoneManager."""
return self.zone_manager.get_host_list()
def get_zone_list(self, context=None):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
@@ -115,7 +119,7 @@ class SchedulerManager(manager.Manager):
:returns:
example format is below.
{'resource':D, 'usage':{proj_id1:D, proj_id2:D}}
D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048
D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048,
'vcpus_used': 12, 'memory_mb': 10240,
'local_gb': 64}

View File

@@ -178,12 +178,14 @@ class ZoneAwareScheduler(driver.Scheduler):
to adjust the weights returned from the child zones. Alters
child_results in place.
"""
for zone, result in child_results:
for zone_id, result in child_results:
if not result:
continue
assert isinstance(zone_id, int)
for zone_rec in zones:
if zone_rec['api_url'] != zone:
if zone_rec['id'] != zone_id:
continue
for item in result:
@@ -196,7 +198,7 @@ class ZoneAwareScheduler(driver.Scheduler):
item['raw_weight'] = raw_weight
except KeyError:
LOG.exception(_("Bad child zone scaling values "
"for Zone: %(zone)s") % locals())
"for Zone: %(zone_id)s") % locals())
def schedule_run_instance(self, context, instance_id, request_spec,
*args, **kwargs):

View File

@@ -115,6 +115,18 @@ class ZoneManager(object):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
def get_host_list(self):
"""Returns a list of dicts for each host that the Zone Manager
knows about. Each dict contains the host_name and the service
for that host.
"""
all_hosts = self.service_states.keys()
ret = []
for host in self.service_states:
for svc in self.service_states[host]:
ret.append({"service": svc, "host_name": host})
return ret
def get_zone_capabilities(self, context):
"""Roll up all the individual host info to generic 'service'
capabilities. Each capability is aggregated into
@@ -125,15 +137,30 @@ class ZoneManager(object):
# But it's likely to change once we understand what the Best-Match
# code will need better.
combined = {} # { <service>_<cap> : (min, max), ... }
stale_host_services = {} # { host1 : [svc1, svc2], host2 :[svc1]}
for host, host_dict in hosts_dict.iteritems():
for service_name, service_dict in host_dict.iteritems():
if not service_dict.get("enabled", True):
# Service is disabled; do no include it
continue
#Check if the service capabilities became stale
if self.host_service_caps_stale(host, service_name):
if host not in stale_host_services:
stale_host_services[host] = [] # Adding host key once
stale_host_services[host].append(service_name)
continue
for cap, value in service_dict.iteritems():
if cap == "timestamp": # Timestamp is not needed
continue
key = "%s_%s" % (service_name, cap)
min_value, max_value = combined.get(key, (value, value))
min_value = min(min_value, value)
max_value = max(max_value, value)
combined[key] = (min_value, max_value)
# Delete the expired host services
self.delete_expired_host_services(stale_host_services)
return combined
def _refresh_from_db(self, context):
@@ -172,5 +199,24 @@ class ZoneManager(object):
logging.debug(_("Received %(service_name)s service update from "
"%(host)s: %(capabilities)s") % locals())
service_caps = self.service_states.get(host, {})
capabilities["timestamp"] = utils.utcnow() # Reported time
service_caps[service_name] = capabilities
self.service_states[host] = service_caps
def host_service_caps_stale(self, host, service):
"""Check if host service capabilites are not recent enough."""
allowed_time_diff = FLAGS.periodic_interval * 3
caps = self.service_states[host][service]
if (utils.utcnow() - caps["timestamp"]) <= \
datetime.timedelta(seconds=allowed_time_diff):
return False
return True
def delete_expired_host_services(self, host_services_dict):
"""Delete all the inactive host services information."""
for host, services in host_services_dict.iteritems():
service_caps = self.service_states[host]
for service in services:
del service_caps[service]
if len(service_caps) == 0: # Delete host if no services
del self.service_states[host]

View File

@@ -122,19 +122,19 @@ def fake_decrypt_blob_returns_child_info(blob):
def fake_call_zone_method(context, method, specs, zones):
return [
('zone1', [
(1, [
dict(weight=1, blob='AAAAAAA'),
dict(weight=111, blob='BBBBBBB'),
dict(weight=112, blob='CCCCCCC'),
dict(weight=113, blob='DDDDDDD'),
]),
('zone2', [
(2, [
dict(weight=120, blob='EEEEEEE'),
dict(weight=2, blob='FFFFFFF'),
dict(weight=122, blob='GGGGGGG'),
dict(weight=123, blob='HHHHHHH'),
]),
('zone3', [
(3, [
dict(weight=130, blob='IIIIIII'),
dict(weight=131, blob='JJJJJJJ'),
dict(weight=132, blob='KKKKKKK'),

View File

@@ -67,7 +67,8 @@ class CloudTestCase(test.TestCase):
host = self.network.host
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
return {'id': 1, 'container_format': 'ami',
'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine', 'image_state': 'available'}}
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
@@ -418,7 +419,8 @@ class CloudTestCase(test.TestCase):
describe_images = self.cloud.describe_images
def fake_detail(meh, context):
return [{'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
return [{'id': 1, 'container_format': 'ami',
'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}}]
def fake_show_none(meh, context, id):
@@ -448,7 +450,8 @@ class CloudTestCase(test.TestCase):
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}, 'is_public': True}
'type': 'machine'}, 'container_format': 'ami',
'is_public': True}
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
@@ -460,7 +463,8 @@ class CloudTestCase(test.TestCase):
modify_image_attribute = self.cloud.modify_image_attribute
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
return {'id': 1, 'container_format': 'ami',
'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}, 'is_public': False}
def fake_update(meh, context, image_id, metadata, data=None):
@@ -494,6 +498,16 @@ class CloudTestCase(test.TestCase):
self.assertRaises(exception.ImageNotFound, deregister_image,
self.context, 'ami-bad001')
def test_deregister_image_wrong_container_type(self):
deregister_image = self.cloud.deregister_image
def fake_delete(self, context, id):
return None
self.stubs.Set(fake._FakeImageService, 'delete', fake_delete)
self.assertRaises(exception.NotFound, deregister_image, self.context,
'aki-00000001')
def _run_instance(self, **kwargs):
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
@@ -609,7 +623,7 @@ class CloudTestCase(test.TestCase):
def fake_show_no_state(self, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}}
'type': 'machine'}, 'container_format': 'ami'}
self.stubs.UnsetAll()
self.stubs.Set(fake._FakeImageService, 'show', fake_show_no_state)
@@ -623,7 +637,8 @@ class CloudTestCase(test.TestCase):
run_instances = self.cloud.run_instances
def fake_show_decrypt(self, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
return {'id': 1, 'container_format': 'ami',
'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine', 'image_state': 'decrypting'}}
self.stubs.UnsetAll()
@@ -638,7 +653,8 @@ class CloudTestCase(test.TestCase):
run_instances = self.cloud.run_instances
def fake_show_stat_active(self, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
return {'id': 1, 'container_format': 'ami',
'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}, 'status': 'active'}
self.stubs.Set(fake._FakeImageService, 'show', fake_show_stat_active)

View File

@@ -532,6 +532,14 @@ class ComputeTestCase(test.TestCase):
self.context, instance_id, 1)
self.compute.terminate_instance(self.context, instance_id)
def test_migrate(self):
context = self.context.elevated()
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
# Migrate simply calls resize() without a flavor_id.
self.compute_api.resize(context, instance_id, None)
self.compute.terminate_instance(context, instance_id)
def _setup_other_managers(self):
self.volume_manager = utils.import_object(FLAGS.volume_manager)
self.network_manager = utils.import_object(FLAGS.network_manager)

102
nova/tests/test_hosts.py Normal file
View File

@@ -0,0 +1,102 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import stubout
import webob.exc
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
from nova import test
from nova.api.openstack.contrib import hosts as os_hosts
from nova.scheduler import api as scheduler_api
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.hosts')
# Simulate the hosts returned by the zone manager.
HOST_LIST = [
{"host_name": "host_c1", "service": "compute"},
{"host_name": "host_c2", "service": "compute"},
{"host_name": "host_v1", "service": "volume"},
{"host_name": "host_v2", "service": "volume"}]
def stub_get_host_list(req):
return HOST_LIST
def stub_set_host_enabled(context, host, enabled):
# We'll simulate success and failure by assuming
# that 'host_c1' always succeeds, and 'host_c2'
# always fails
fail = (host == "host_c2")
status = "enabled" if (enabled ^ fail) else "disabled"
return status
class FakeRequest(object):
environ = {"nova.context": context.get_admin_context()}
class HostTestCase(test.TestCase):
"""Test Case for hosts."""
def setUp(self):
super(HostTestCase, self).setUp()
self.controller = os_hosts.HostController()
self.req = FakeRequest()
self.stubs.Set(scheduler_api, 'get_host_list', stub_get_host_list)
self.stubs.Set(self.controller.compute_api, 'set_host_enabled',
stub_set_host_enabled)
def test_list_hosts(self):
"""Verify that the compute hosts are returned."""
hosts = os_hosts._list_hosts(self.req)
self.assertEqual(hosts, HOST_LIST)
compute_hosts = os_hosts._list_hosts(self.req, "compute")
expected = [host for host in HOST_LIST
if host["service"] == "compute"]
self.assertEqual(compute_hosts, expected)
def test_disable_host(self):
dis_body = {"status": "disable"}
result_c1 = self.controller.update(self.req, "host_c1", body=dis_body)
self.assertEqual(result_c1["status"], "disabled")
result_c2 = self.controller.update(self.req, "host_c2", body=dis_body)
self.assertEqual(result_c2["status"], "enabled")
def test_enable_host(self):
en_body = {"status": "enable"}
result_c1 = self.controller.update(self.req, "host_c1", body=en_body)
self.assertEqual(result_c1["status"], "enabled")
result_c2 = self.controller.update(self.req, "host_c2", body=en_body)
self.assertEqual(result_c2["status"], "disabled")
def test_bad_status_value(self):
bad_body = {"status": "bad"}
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.update,
self.req, "host_c1", body=bad_body)
def test_bad_update_key(self):
bad_body = {"crazy": "bad"}
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.update,
self.req, "host_c1", body=bad_body)
def test_bad_host(self):
self.assertRaises(exception.HostNotFound, self.controller.update,
self.req, "bogus_host_name", body={"status": "disable"})

View File

@@ -381,6 +381,18 @@ class XenAPIVMTestCase(test.TestCase):
self.assertEquals(self.vm['HVM_boot_params'], {})
self.assertEquals(self.vm['HVM_boot_policy'], '')
def _list_vdis(self):
url = FLAGS.xenapi_connection_url
username = FLAGS.xenapi_connection_username
password = FLAGS.xenapi_connection_password
session = xenapi_conn.XenAPISession(url, username, password)
return session.call_xenapi('VDI.get_all')
def _check_vdis(self, start_list, end_list):
for vdi_ref in end_list:
if not vdi_ref in start_list:
self.fail('Found unexpected VDI:%s' % vdi_ref)
def _test_spawn(self, image_ref, kernel_id, ramdisk_id,
instance_type_id="3", os_type="linux",
architecture="x86-64", instance_id=1,
@@ -422,6 +434,36 @@ class XenAPIVMTestCase(test.TestCase):
self._test_spawn,
1, 2, 3, "4") # m1.xlarge
def test_spawn_fail_cleanup_1(self):
"""Simulates an error while downloading an image.
Verifies that VDIs created are properly cleaned up.
"""
vdi_recs_start = self._list_vdis()
FLAGS.xenapi_image_service = 'glance'
stubs.stubout_fetch_image_glance_disk(self.stubs)
self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3)
# No additional VDI should be found.
vdi_recs_end = self._list_vdis()
self._check_vdis(vdi_recs_start, vdi_recs_end)
def test_spawn_fail_cleanup_2(self):
"""Simulates an error while creating VM record.
It verifies that VDIs created are properly cleaned up.
"""
vdi_recs_start = self._list_vdis()
FLAGS.xenapi_image_service = 'glance'
stubs.stubout_create_vm(self.stubs)
self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3)
# No additional VDI should be found.
vdi_recs_end = self._list_vdis()
self._check_vdis(vdi_recs_start, vdi_recs_end)
def test_spawn_raw_objectstore(self):
FLAGS.xenapi_image_service = 'objectstore'
self._test_spawn(1, None, None)

View File

@@ -198,3 +198,178 @@ class ZoneManagerTestCase(test.TestCase):
self.assertEquals(zone_state.attempt, 3)
self.assertFalse(zone_state.is_active)
self.assertEquals(zone_state.name, None)
def test_host_service_caps_stale_no_stale_service(self):
zm = zone_manager.ZoneManager()
# services just updated capabilities
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
self.assertFalse(zm.host_service_caps_stale("host1", "svc1"))
self.assertFalse(zm.host_service_caps_stale("host1", "svc2"))
def test_host_service_caps_stale_all_stale_services(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Both services became stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
time_future = utils.utcnow() + datetime.timedelta(seconds=expiry_time)
utils.set_time_override(time_future)
self.assertTrue(zm.host_service_caps_stale("host1", "svc1"))
self.assertTrue(zm.host_service_caps_stale("host1", "svc2"))
utils.clear_time_override()
def test_host_service_caps_stale_one_stale_service(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# One service became stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
caps = zm.service_states["host1"]["svc1"]
caps["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
self.assertTrue(zm.host_service_caps_stale("host1", "svc1"))
self.assertFalse(zm.host_service_caps_stale("host1", "svc2"))
def test_delete_expired_host_services_del_one_service(self):
zm = zone_manager.ZoneManager()
# Delete one service in a host
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
stale_host_services = {"host1": ["svc1"]}
zm.delete_expired_host_services(stale_host_services)
self.assertFalse("svc1" in zm.service_states["host1"])
self.assertTrue("svc2" in zm.service_states["host1"])
def test_delete_expired_host_services_del_all_hosts(self):
zm = zone_manager.ZoneManager()
# Delete all services in a host
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
stale_host_services = {"host1": ["svc1", "svc2"]}
zm.delete_expired_host_services(stale_host_services)
self.assertFalse("host1" in zm.service_states)
def test_delete_expired_host_services_del_one_service_per_host(self):
zm = zone_manager.ZoneManager()
# Delete one service per host
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
stale_host_services = {"host1": ["svc1"], "host2": ["svc1"]}
zm.delete_expired_host_services(stale_host_services)
self.assertFalse("host1" in zm.service_states)
self.assertFalse("host2" in zm.service_states)
def test_get_zone_capabilities_one_host(self):
zm = zone_manager.ZoneManager()
# Service capabilities recent
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
def test_get_zone_capabilities_expired_host(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Service capabilities stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
time_future = utils.utcnow() + datetime.timedelta(seconds=expiry_time)
utils.set_time_override(time_future)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, {})
utils.clear_time_override()
def test_get_zone_capabilities_multiple_hosts(self):
zm = zone_manager.ZoneManager()
# Both host service capabilities recent
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 3), svc1_b=(2, 4)))
def test_get_zone_capabilities_one_stale_host(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# One host service capabilities become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
serv_caps = zm.service_states["host1"]["svc1"]
serv_caps["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(3, 3), svc1_b=(4, 4)))
def test_get_zone_capabilities_multiple_service_per_host(self):
zm = zone_manager.ZoneManager()
# Multiple services per host
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 3), svc1_b=(2, 4),
svc2_a=(5, 7), svc2_b=(6, 8)))
def test_get_zone_capabilities_one_stale_service_per_host(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Two host services among four become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
serv_caps_1 = zm.service_states["host1"]["svc2"]
serv_caps_1["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
serv_caps_2 = zm.service_states["host2"]["svc1"]
serv_caps_2["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2),
svc2_a=(7, 7), svc2_b=(8, 8)))
def test_get_zone_capabilities_three_stale_host_services(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Three host services among four become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
serv_caps_1 = zm.service_states["host1"]["svc2"]
serv_caps_1["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
serv_caps_2 = zm.service_states["host2"]["svc1"]
serv_caps_2["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
serv_caps_3 = zm.service_states["host2"]["svc2"]
serv_caps_3["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
def test_get_zone_capabilities_all_stale_host_services(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# All the host services become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
time_future = utils.utcnow() + datetime.timedelta(seconds=expiry_time)
utils.set_time_override(time_future)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, {})

View File

@@ -98,6 +98,42 @@ def stubout_is_vdi_pv(stubs):
stubs.Set(vm_utils, '_is_vdi_pv', f)
def stubout_determine_is_pv_objectstore(stubs):
"""Assumes VMs never have PV kernels"""
@classmethod
def f(cls, *args):
return False
stubs.Set(vm_utils.VMHelper, '_determine_is_pv_objectstore', f)
def stubout_lookup_image(stubs):
"""Simulates a failure in lookup image."""
def f(_1, _2, _3, _4):
raise Exception("Test Exception raised by fake lookup_image")
stubs.Set(vm_utils, 'lookup_image', f)
def stubout_fetch_image_glance_disk(stubs):
"""Simulates a failure in fetch image_glance_disk."""
@classmethod
def f(cls, *args):
raise fake.Failure("Test Exception raised by " +
"fake fetch_image_glance_disk")
stubs.Set(vm_utils.VMHelper, '_fetch_image_glance_disk', f)
def stubout_create_vm(stubs):
"""Simulates a failure in create_vm."""
@classmethod
def f(cls, *args):
raise fake.Failure("Test Exception raised by " +
"fake create_vm")
stubs.Set(vm_utils.VMHelper, 'create_vm', f)
def stubout_loopingcall_start(stubs):
def fake_start(self, interval, now=True):
self.f(*self.args, **self.kw)
@@ -120,6 +156,9 @@ class FakeSessionForVMTests(fake.SessionBase):
super(FakeSessionForVMTests, self).__init__(uri)
def host_call_plugin(self, _1, _2, plugin, method, _5):
# If the call is for 'copy_kernel_vdi' return None.
if method == 'copy_kernel_vdi':
return
sr_ref = fake.get_all('SR')[0]
vdi_ref = fake.create_vdi('', False, sr_ref, False)
vdi_rec = fake.get_record('VDI', vdi_ref)

View File

@@ -67,6 +67,7 @@ class Server(object):
self.host = host or "0.0.0.0"
self.port = port or 0
self._server = None
self._tcp_server = None
self._socket = None
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
self._logger = logging.getLogger("eventlet.wsgi.server")
@@ -106,6 +107,17 @@ class Server(object):
"""
LOG.info(_("Stopping WSGI server."))
self._server.kill()
if self._tcp_server is not None:
LOG.info(_("Stopping raw TCP server."))
self._tcp_server.kill()
def start_tcp(self, listener, port, host='0.0.0.0', key=None, backlog=128):
"""Run a raw TCP server with the given application."""
arg0 = sys.argv[0]
LOG.info(_('Starting TCP server %(arg0)s on %(host)s:%(port)s')
% locals())
socket = eventlet.listen((host, port), backlog=backlog)
self._tcp_server = self._pool.spawn_n(self._run_tcp, listener, socket)
def wait(self):
"""Block, until the server has stopped.
@@ -120,6 +132,15 @@ class Server(object):
except greenlet.GreenletExit:
LOG.info(_("WSGI server has stopped."))
def _run_tcp(self, listener, socket):
"""Start a raw TCP server in a new green thread."""
while True:
try:
new_sock, address = socket.accept()
self._pool.spawn_n(listener, new_sock)
except (SystemExit, KeyboardInterrupt):
pass
class Request(webob.Request):
pass