diff --git a/bin/nova-scheduler b/bin/nova-scheduler
new file mode 100755
index 00000000..38a8f213
--- /dev/null
+++ b/bin/nova-scheduler
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+  Twistd daemon for the nova scheduler nodes.
+"""
+
+import os
+import sys
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+                                   os.pardir,
+                                   os.pardir))
+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
+    sys.path.insert(0, possible_topdir)
+
+from nova import service
+from nova import twistd
+
+
+if __name__ == '__main__':
+    twistd.serve(__file__)
+
+if __name__ == '__builtin__':
+    application = service.Service.create()
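The double `__name__` check at the bottom is the stock nova twistd pattern: run directly, the script hands itself to `twistd.serve`; twistd then re-executes the same file with `__name__` set to `'__builtin__'` and picks up the module-level `application` object. A self-contained demo of that mechanism (the `SCRIPT` body and strings below are illustrative only, not nova code):

```python
# Standalone demo of the __name__ == '__builtin__' trick used above.
# (Illustrative only; nova.twistd also handles flags, daemonizing, etc.)
SCRIPT = """
if __name__ == '__main__':
    print 'direct run: hand off to twistd.serve(__file__)'
if __name__ == '__builtin__':
    application = 'the service object twistd will run'
"""

namespace = {'__name__': '__builtin__'}
exec SCRIPT in namespace          # simulates twistd re-executing the file
print namespace['application']    # twistd looks up this `application`
```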
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py
index 1618b784..4b39c6d1 100644
--- a/nova/endpoint/cloud.py
+++ b/nova/endpoint/cloud.py
@@ -23,6 +23,7 @@ datastore.
 """
 
 import base64
+import datetime
 import logging
 import os
 import time
@@ -299,9 +300,11 @@ class CloudController(object):
         vol['attach_status'] = "detached"
         volume_ref = db.volume_create(context, vol)
-        rpc.cast(FLAGS.volume_topic, {"method": "create_volume",
-                                      "args": {"context": None,
-                                               "volume_id": volume_ref['id']}})
+        rpc.cast(FLAGS.scheduler_topic,
+                 {"method": "create_volume",
+                  "args": {"context": None,
+                           "topic": FLAGS.volume_topic,
+                           "volume_id": volume_ref['id']}})
         return {'volumeSet': [self._format_volume(context, volume_ref)]}
 
@@ -310,6 +313,8 @@ class CloudController(object):
     def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
         volume_ref = db.volume_get_by_str(context, volume_id)
         # TODO(vish): abstract status checking?
+        if volume_ref['status'] != "available":
+            raise exception.ApiError("Volume status must be available")
         if volume_ref['attach_status'] == "attached":
             raise exception.ApiError("Volume is already attached")
         instance_ref = db.instance_get_by_str(context, instance_id)
@@ -332,10 +337,10 @@ class CloudController(object):
         volume_ref = db.volume_get_by_str(context, volume_id)
         instance_ref = db.volume_get_instance(context, volume_ref['id'])
         if not instance_ref:
-            raise exception.Error("Volume isn't attached to anything!")
+            raise exception.ApiError("Volume isn't attached to anything!")
         # TODO(vish): abstract status checking?
         if volume_ref['status'] == "available":
-            raise exception.Error("Volume is already detached")
+            raise exception.ApiError("Volume is already detached")
         try:
             host = instance_ref['host']
             rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host),
@@ -379,7 +384,7 @@ class CloudController(object):
             instances = db.instance_get_by_reservation(context,
                                                        reservation_id)
         else:
-            if not context.user.is_admin():
+            if context.user.is_admin():
                 instances = db.instance_get_all(context)
             else:
                 instances = db.instance_get_by_project(context,
@@ -571,6 +576,7 @@ class CloudController(object):
         reservation_id = utils.generate_uid('r')
         base_options = {}
+        base_options['state_description'] = 'scheduling'
         base_options['image_id'] = image_id
         base_options['kernel_id'] = kernel_id
         base_options['ramdisk_id'] = ramdisk_id
@@ -609,11 +615,12 @@ class CloudController(object):
                           "args": {"context": None,
                                    "address": address}})
 
-            rpc.cast(FLAGS.compute_topic,
-                     {"method": "run_instance",
-                      "args": {"context": None,
-                               "instance_id": inst_id}})
-            logging.debug("Casting to node for %s/%s's instance %s" %
+            rpc.cast(FLAGS.scheduler_topic,
+                     {"method": "run_instance",
+                      "args": {"context": None,
+                               "topic": FLAGS.compute_topic,
+                               "instance_id": inst_id}})
+            logging.debug("Casting to scheduler for %s/%s's instance %s" %
                           (context.project.name, context.user.name, inst_id))
         defer.returnValue(self._format_run_instances(context,
                                                      reservation_id))
@@ -632,6 +639,10 @@ class CloudController(object):
                               % id_str)
                 continue
 
+            now = datetime.datetime.utcnow()
+            db.instance_update(context,
+                               instance_ref['id'],
+                               {'terminated_at': now})
             # FIXME(ja): where should network deallocate occur?
             address = db.instance_get_floating_address(context,
                                                        instance_ref['id'])
@@ -653,7 +664,7 @@ class CloudController(object):
                 # NOTE(vish): Currently, nothing needs to be done on the
                 #             network node until release. If this changes,
                 #             we will need to cast here.
-                self.network.deallocate_fixed_ip(context, address)
+                self.network_manager.deallocate_fixed_ip(context, address)
 
             host = instance_ref['host']
             if host:
@@ -681,6 +692,10 @@ class CloudController(object):
     def delete_volume(self, context, volume_id, **kwargs):
         # TODO: return error if not authorized
         volume_ref = db.volume_get_by_str(context, volume_id)
+        if volume_ref['status'] != "available":
+            raise exception.ApiError("Volume status must be available")
+        now = datetime.datetime.utcnow()
+        db.volume_update(context, volume_ref['id'], {'terminated_at': now})
         host = volume_ref['host']
         rpc.cast(db.queue_get_for(context, FLAGS.volume_topic, host),
                  {"method": "delete_volume",
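Both casts now go through `FLAGS.scheduler_topic` with an extra `topic` argument, but `nova/scheduler/manager.py` itself is not part of this diff. A minimal sketch of the dispatch it presumably implements, inferred from the unit tests in `scheduler_unittest.py` below; treat every detail here as an assumption rather than the shipped manager:

```python
# Hedged sketch of nova/scheduler/manager.py (not included in this diff).
# Behavior inferred from SchedulerTestCase: an unknown method is scheduled
# via the driver, then re-cast to "<topic>.<chosen host>".
import functools

from nova import flags
from nova import rpc
from nova import utils

FLAGS = flags.FLAGS


class SchedulerManager(object):
    """Proxies method calls to the host picked by the scheduler driver."""

    def __init__(self, scheduler_driver=None, *args, **kwargs):
        if not scheduler_driver:
            scheduler_driver = FLAGS.scheduler_driver  # flag assumed to
        self.driver = utils.import_class(scheduler_driver)()  # exist

    def __getattr__(self, key):
        # Any unknown attribute becomes a scheduling call for that method.
        return functools.partial(self._schedule, key)

    def _schedule(self, method, context, topic, **kwargs):
        """Pick a host for `method`, then cast the method to that host."""
        try:
            # A driver may implement schedule_<method> for specific calls...
            real_meth = getattr(self.driver, 'schedule_%s' % method)
        except AttributeError:
            # ...otherwise fall back to its generic schedule().
            real_meth = self.driver.schedule
        host = real_meth(context, topic, **kwargs)
        kwargs['context'] = None
        rpc.cast('%s.%s' % (topic, host),
                 {'method': method, 'args': kwargs})
```

With this shape, `scheduler.noexist(None, 'topic', num=7)` lands on `rpc.cast('topic.fallback_host', ...)`, exactly what `test_fallback` expects.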
diff --git a/nova/flags.py b/nova/flags.py
index 7b0c95a3..ed0baee6 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -171,6 +171,7 @@ DEFINE_string('connection_type', 'libvirt', 'libvirt, xenapi or fake')
 DEFINE_integer('s3_port', 3333, 's3 port')
 DEFINE_string('s3_host', '127.0.0.1', 's3 host')
 DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on')
+DEFINE_string('scheduler_topic', 'scheduler',
+              'the topic scheduler nodes listen on')
 DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on')
 DEFINE_string('network_topic', 'network', 'the topic network nodes listen on')
@@ -213,6 +214,8 @@ DEFINE_string('network_manager', 'nova.network.manager.VlanManager',
               'Manager for network')
 DEFINE_string('volume_manager', 'nova.volume.manager.AOEManager',
               'Manager for volume')
+DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
+              'Manager for scheduler')
 
 DEFINE_string('host', socket.gethostname(), 'name of this node')
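The `*_topic` / `*_manager` pairing follows the pattern the other nova services use: a generic service consumes RPC messages on the topic and hands each one to the manager class named by the flag. A rough sketch of that wiring under those assumptions (`load_scheduler_manager` is an illustrative name, not a real nova function):

```python
# Hedged sketch: how the scheduler_topic/scheduler_manager flags are
# presumably consumed, following the compute/volume/network pattern.
from nova import flags
from nova import utils

FLAGS = flags.FLAGS


def load_scheduler_manager():
    """Instantiate the manager class named by --scheduler_manager."""
    manager_class = utils.import_class(FLAGS.scheduler_manager)
    return manager_class()  # e.g. nova.scheduler.manager.SchedulerManager

# The service listening on FLAGS.scheduler_topic ('scheduler') then calls
# getattr(manager, message['method'])(**message['args']) per RPC message.
```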
diff --git a/nova/scheduler/__init__.py b/nova/scheduler/__init__.py
new file mode 100644
index 00000000..8359a7ae
--- /dev/null
+++ b/nova/scheduler/__init__.py
@@ -0,0 +1,25 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`nova.scheduler` -- Scheduler Nodes
+========================================
+
+.. automodule:: nova.scheduler
+   :platform: Unix
+   :synopsis: Module that picks a compute node to run a VM instance.
+.. moduleauthor:: Chris Behrens
+"""
diff --git a/nova/scheduler/simple.py b/nova/scheduler/simple.py
new file mode 100644
index 00000000..fdaff74d
--- /dev/null
+++ b/nova/scheduler/simple.py
@@ -0,0 +1,90 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Simple Scheduler
+"""
+
+import datetime
+
+from nova import db
+from nova import flags
+from nova.scheduler import driver
+from nova.scheduler import chance
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer("max_cores", 16,
+                     "maximum number of instance cores to allow per host")
+flags.DEFINE_integer("max_gigabytes", 10000,
+                     "maximum number of volume gigabytes to allow per host")
+flags.DEFINE_integer("max_networks", 1000,
+                     "maximum number of networks to allow per host")
+
+
+class SimpleScheduler(chance.ChanceScheduler):
+    """Implements a naive scheduler that tries to find the least loaded host."""
+
+    def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
+        """Picks a host that is up and has the fewest running instances."""
+        instance_ref = db.instance_get(context, instance_id)
+        results = db.service_get_all_compute_sorted(context)
+        for result in results:
+            (service, instance_cores) = result
+            if instance_cores + instance_ref['vcpus'] > FLAGS.max_cores:
+                raise driver.NoValidHost("All hosts have too many cores")
+            if self.service_is_up(service):
+                # NOTE(vish): this probably belongs in the manager, if we
+                #             can generalize this somehow
+                now = datetime.datetime.utcnow()
+                db.instance_update(context,
+                                   instance_id,
+                                   {'host': service['host'],
+                                    'scheduled_at': now})
+                return service['host']
+        raise driver.NoValidHost("No hosts found")
+
+    def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
+        """Picks a host that is up and has the fewest volumes."""
+        volume_ref = db.volume_get(context, volume_id)
+        results = db.service_get_all_volume_sorted(context)
+        for result in results:
+            (service, volume_gigabytes) = result
+            if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
+                raise driver.NoValidHost("All hosts have too many gigabytes")
+            if self.service_is_up(service):
+                # NOTE(vish): this probably belongs in the manager, if we
+                #             can generalize this somehow
+                now = datetime.datetime.utcnow()
+                db.volume_update(context,
+                                 volume_id,
+                                 {'host': service['host'],
+                                  'scheduled_at': now})
+                return service['host']
+        raise driver.NoValidHost("No hosts found")
+
+    def schedule_set_network_host(self, context, *_args, **_kwargs):
+        """Picks a host that is up and has the fewest networks."""
+        results = db.service_get_all_network_sorted(context)
+        for result in results:
+            (service, instance_count) = result
+            if instance_count >= FLAGS.max_networks:
+                raise driver.NoValidHost("All hosts have too many networks")
+            if self.service_is_up(service):
+                return service['host']
+        raise driver.NoValidHost("No hosts found")
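`SimpleScheduler` subclasses `chance.ChanceScheduler` and raises `driver.NoValidHost`, but `nova/scheduler/driver.py` and `nova/scheduler/chance.py` are not in this diff. A sketch of the shapes simple.py visibly relies on, inferred from its usage; the liveness window, the `service_down_time` flag, and the `service_get_all_by_topic` call are assumptions:

```python
# Hedged sketch of nova/scheduler/driver.py and chance.py (not in this
# diff); only what SimpleScheduler depends on is modeled here.
import datetime
import random

from nova import db
from nova import flags

FLAGS = flags.FLAGS
flags.DEFINE_integer('service_down_time', 60,
                     'seconds without a heartbeat before a service is down')


class NoValidHost(Exception):
    """No host satisfied the scheduling constraints."""
    pass


class Scheduler(object):
    """Base class: liveness checks plus a generic schedule() hook."""

    @staticmethod
    def service_is_up(service):
        """True if the service row has checked in recently (assumed)."""
        elapsed = datetime.datetime.utcnow() - service['updated_at']
        return elapsed < datetime.timedelta(seconds=FLAGS.service_down_time)

    def hosts_up(self, context, topic):
        """Hosts for `topic` whose service row looks alive."""
        services = db.service_get_all_by_topic(context, topic)
        return [service['host']
                for service in services
                if self.service_is_up(service)]

    def schedule(self, context, topic, *args, **kwargs):
        """Generic fallback; concrete drivers must provide one."""
        raise NotImplementedError("Driver must implement schedule")


class ChanceScheduler(Scheduler):
    """Ignores load entirely: any live host will do."""

    def schedule(self, context, topic, *args, **kwargs):
        hosts = self.hosts_up(context, topic)
        if not hosts:
            raise NoValidHost("No hosts found")
        return random.choice(hosts)
```

Because the `schedule_*` methods shadow the generic hook per method name, SimpleScheduler only overrides the calls it cares about and inherits chance behavior for everything else.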
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index 59cf2a9b..f5c0f1c0 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -84,21 +84,21 @@ class ComputeTestCase(test.TrialTestCase):
 
     @defer.inlineCallbacks
     def test_run_terminate_timestamps(self):
-        """Make sure it is possible to run and terminate instance"""
+        """Make sure timestamps are set for launched and destroyed"""
         instance_id = self._create_instance()
         instance_ref = db.instance_get(self.context, instance_id)
         self.assertEqual(instance_ref['launched_at'], None)
-        self.assertEqual(instance_ref['terminated_at'], None)
+        self.assertEqual(instance_ref['deleted_at'], None)
         launch = datetime.datetime.utcnow()
         yield self.compute.run_instance(self.context, instance_id)
         instance_ref = db.instance_get(self.context, instance_id)
         self.assert_(instance_ref['launched_at'] > launch)
-        self.assertEqual(instance_ref['terminated_at'], None)
+        self.assertEqual(instance_ref['deleted_at'], None)
         terminate = datetime.datetime.utcnow()
         yield self.compute.terminate_instance(self.context, instance_id)
         instance_ref = db.instance_get({'deleted': True}, instance_id)
         self.assert_(instance_ref['launched_at'] < terminate)
-        self.assert_(instance_ref['terminated_at'] > terminate)
+        self.assert_(instance_ref['deleted_at'] > terminate)
 
     @defer.inlineCallbacks
     def test_reboot(self):
+""" +Tests For Scheduler +""" + +from nova import db +from nova import flags +from nova import service +from nova import test +from nova import rpc +from nova import utils +from nova.auth import manager as auth_manager +from nova.scheduler import manager +from nova.scheduler import driver + + +FLAGS = flags.FLAGS +flags.DECLARE('max_cores', 'nova.scheduler.simple') + +class TestDriver(driver.Scheduler): + """Scheduler Driver for Tests""" + def schedule(context, topic, *args, **kwargs): + return 'fallback_host' + + def schedule_named_method(context, topic, num): + return 'named_host' + +class SchedulerTestCase(test.TrialTestCase): + """Test case for scheduler""" + def setUp(self): # pylint: disable=C0103 + super(SchedulerTestCase, self).setUp() + self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver') + + def test_fallback(self): + scheduler = manager.SchedulerManager() + self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) + rpc.cast('topic.fallback_host', + {'method': 'noexist', + 'args': {'context': None, + 'num': 7}}) + self.mox.ReplayAll() + scheduler.noexist(None, 'topic', num=7) + + def test_named_method(self): + scheduler = manager.SchedulerManager() + self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True) + rpc.cast('topic.named_host', + {'method': 'named_method', + 'args': {'context': None, + 'num': 7}}) + self.mox.ReplayAll() + scheduler.named_method(None, 'topic', num=7) + + +class SimpleDriverTestCase(test.TrialTestCase): + """Test case for simple driver""" + def setUp(self): # pylint: disable-msg=C0103 + super(SimpleDriverTestCase, self).setUp() + self.flags(connection_type='fake', + max_cores=4, + max_gigabytes=4, + volume_driver='nova.volume.driver.FakeAOEDriver', + scheduler_driver='nova.scheduler.simple.SimpleScheduler') + self.scheduler = manager.SchedulerManager() + self.context = None + self.manager = auth_manager.AuthManager() + self.user = self.manager.create_user('fake', 'fake', 'fake') + self.project = self.manager.create_project('fake', 'fake', 'fake') + self.context = None + + def tearDown(self): # pylint: disable-msg=C0103 + self.manager.delete_user(self.user) + self.manager.delete_project(self.project) + + def _create_instance(self): + """Create a test instance""" + inst = {} + inst['image_id'] = 'ami-test' + inst['reservation_id'] = 'r-fakeres' + inst['user_id'] = self.user.id + inst['project_id'] = self.project.id + inst['instance_type'] = 'm1.tiny' + inst['mac_address'] = utils.generate_mac() + inst['ami_launch_index'] = 0 + inst['vcpus'] = 1 + return db.instance_create(self.context, inst)['id'] + + def _create_volume(self): + """Create a test volume""" + vol = {} + vol['image_id'] = 'ami-test' + vol['reservation_id'] = 'r-fakeres' + vol['size'] = 1 + return db.volume_create(self.context, vol)['id'] + + def test_hosts_are_up(self): + """Ensures driver can find the hosts that are up""" + # NOTE(vish): constructing service without create method + # because we are going to use it without queue + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2 = service.Service('host2', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + hosts = self.scheduler.driver.hosts_up(self.context, 'compute') + self.assertEqual(len(hosts), 2) + compute1.kill() + compute2.kill() + + def test_least_busy_host_gets_instance(self): + """Ensures the host with less cores gets the next one""" + compute1 = service.Service('host1', + 'nova-compute', + 'compute', + FLAGS.compute_manager) + compute2 = 
diff --git a/run_tests.py b/run_tests.py
index 73bf57f9..4121f4c0 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -60,6 +60,7 @@ from nova.tests.objectstore_unittest import *
 from nova.tests.process_unittest import *
 from nova.tests.quota_unittest import *
 from nova.tests.rpc_unittest import *
+from nova.tests.scheduler_unittest import *
 from nova.tests.service_unittest import *
 from nova.tests.validator_unittest import *
 from nova.tests.volume_unittest import *