diff --git a/bin/nova-compute b/bin/nova-compute index 49710e1b3..67c93fcb8 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -19,80 +19,14 @@ """ Twistd daemon for the nova compute nodes. - Receives messages via AMQP, manages pool of worker threads - for async tasks. """ -import logging -import os -import sys - -# NOTE(termie): kludge so that we can run this from the bin directory in the -# checkout without having to screw with paths -NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova') -if os.path.exists(NOVA_PATH): - sys.path.insert(0, os.path.dirname(NOVA_PATH)) - -from twisted.internet import task -from twisted.application import service - -from nova import flags -from nova import rpc from nova import twistd from nova.compute import node -FLAGS = flags.FLAGS -# NOTE(termie): This file will necessarily be re-imported under different -# context when the twistd.serve() call is made below so any -# flags we define here will have to be conditionally defined, -# flags defined by imported modules are safe. -if 'compute_report_state_interval' not in FLAGS: - flags.DEFINE_integer('compute_report_state_interval', 10, - 'seconds between nodes reporting state to cloud', - lower_bound=1) -logging.getLogger().setLevel(logging.DEBUG) - -def main(): - logging.warn('Starting compute node') - n = node.Node() - d = n.adopt_instances() - d.addCallback(lambda x: logging.info('Adopted %d instances', x)) - - conn = rpc.Connection.instance() - consumer_all = rpc.AdapterConsumer( - connection=conn, - topic='%s' % FLAGS.compute_topic, - proxy=n) - - consumer_node = rpc.AdapterConsumer( - connection=conn, - topic='%s.%s' % (FLAGS.compute_topic, FLAGS.node_name), - proxy=n) - - bin_name = os.path.basename(__file__) - pulse = task.LoopingCall(n.report_state, FLAGS.node_name, bin_name) - pulse.start(interval=FLAGS.compute_report_state_interval, now=False) - - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals below - application = service.Application(bin_name) - n.setServiceParent(application) - return application - - -# NOTE(termie): When this script is executed from the commandline what it will -# actually do is tell the twistd application runner that it -# should run this file as a twistd application (see below). if __name__ == '__main__': twistd.serve(__file__) -# NOTE(termie): When this script is loaded by the twistd application runner -# this code path will be executed and twistd will expect a -# variable named 'application' to be available, it will then -# handle starting it and stopping it. if __name__ == '__builtin__': - application = main() + application = node.ComputeNode.create() diff --git a/bin/nova-network b/bin/nova-network new file mode 100644 index 000000000..c69690081 --- /dev/null +++ b/bin/nova-network @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" + Twistd daemon for the nova network nodes. +""" + +from nova import twistd +from nova.network import node + + +if __name__ == '__main__': + twistd.serve(__file__) + +if __name__ == '__builtin__': + application = node.NetworkNode.create() diff --git a/bin/nova-volume b/bin/nova-volume index 7d4b65205..cdf2782bc 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -18,77 +18,15 @@ # under the License. """ - Tornado Storage daemon manages AoE volumes via AMQP messaging. + Twistd daemon for the nova volume nodes. """ -import logging -import os -import sys - -# NOTE(termie): kludge so that we can run this from the bin directory in the -# checkout without having to screw with paths -NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova') -if os.path.exists(NOVA_PATH): - sys.path.insert(0, os.path.dirname(NOVA_PATH)) - -from twisted.internet import task -from twisted.application import service - -from nova import flags -from nova import rpc from nova import twistd -from nova.volume import storage +from nova.volume import node -FLAGS = flags.FLAGS -# NOTE(termie): This file will necessarily be re-imported under different -# context when the twistd.serve() call is made below so any -# flags we define here will have to be conditionally defined, -# flags defined by imported modules are safe. -if 'volume_report_state_interval' not in FLAGS: - flags.DEFINE_integer('volume_report_state_interval', 10, - 'seconds between nodes reporting state to cloud', - lower_bound=1) - - -def main(): - logging.warn('Starting volume node') - bs = storage.BlockStore() - - conn = rpc.Connection.instance() - consumer_all = rpc.AdapterConsumer( - connection=conn, - topic='%s' % FLAGS.storage_topic, - proxy=bs) - - consumer_node = rpc.AdapterConsumer( - connection=conn, - topic='%s.%s' % (FLAGS.storage_topic, FLAGS.node_name), - proxy=bs) - - bin_name = os.path.basename(__file__) - pulse = task.LoopingCall(bs.report_state, FLAGS.node_name, bin_name) - pulse.start(interval=FLAGS.volume_report_state_interval, now=False) - - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals below - application = service.Application(bin_name) - bs.setServiceParent(application) - return application - - -# NOTE(termie): When this script is executed from the commandline what it will -# actually do is tell the twistd application runner that it -# should run this file as a twistd application (see below). if __name__ == '__main__': twistd.serve(__file__) -# NOTE(termie): When this script is loaded by the twistd application runner -# this code path will be executed and twistd will expect a -# variable named 'application' to be available, it will then -# handle starting it and stopping it. if __name__ == '__builtin__': - application = main() + application = node.VolumeNode.create() diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index 3b7b4804b..eaa608b1e 100644 --- a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -38,9 +38,9 @@ from nova.auth import rbac from nova.auth import users from nova.compute import model from nova.compute import network -from nova.compute import node +from nova.compute import computenode from nova.endpoint import images -from nova.volume import storage +from nova.volume import volumenode FLAGS = flags.FLAGS @@ -76,7 +76,7 @@ class CloudController(object): def volumes(self): """ returns a list of all volumes """ for volume_id in datastore.Redis.instance().smembers("volumes"): - volume = storage.get_volume(volume_id) + volume = volumenode.get_volume(volume_id) yield volume def __str__(self): @@ -103,7 +103,7 @@ class CloudController(object): result = {} for instance in self.instdir.all: if instance['project_id'] == project_id: - line = '%s slots=%d' % (instance['private_dns_name'], node.INSTANCE_TYPES[instance['instance_type']]['vcpus']) + line = '%s slots=%d' % (instance['private_dns_name'], computenode.INSTANCE_TYPES[instance['instance_type']]['vcpus']) if instance['key_name'] in result: result[instance['key_name']].append(line) else: @@ -296,8 +296,8 @@ class CloudController(object): @rbac.allow('projectmanager', 'sysadmin') def create_volume(self, context, size, **kwargs): - # TODO(vish): refactor this to create the volume object here and tell storage to create it - res = rpc.call(FLAGS.storage_topic, {"method": "create_volume", + # TODO(vish): refactor this to create the volume object here and tell volumenode to create it + res = rpc.call(FLAGS.volume_topic, {"method": "create_volume", "args" : {"size": size, "user_id": context.user.id, "project_id": context.project.id}}) @@ -331,7 +331,7 @@ class CloudController(object): raise exception.NotFound('Instance %s could not be found' % instance_id) def _get_volume(self, context, volume_id): - volume = storage.get_volume(volume_id) + volume = volumenode.get_volume(volume_id) if context.user.is_admin() or volume['project_id'] == context.project.id: return volume raise exception.NotFound('Volume %s could not be found' % volume_id) @@ -628,8 +628,8 @@ class CloudController(object): def delete_volume(self, context, volume_id, **kwargs): # TODO: return error if not authorized volume = self._get_volume(context, volume_id) - storage_node = volume['node_name'] - rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + volume_node = volume['node_name'] + rpc.cast('%s.%s' % (FLAGS.volume_topic, volume_node), {"method": "delete_volume", "args" : {"volume_id": volume_id}}) return defer.succeed(True) diff --git a/nova/endpoint/rackspace.py b/nova/endpoint/rackspace.py index 9208ddab7..08e435c5d 100644 --- a/nova/endpoint/rackspace.py +++ b/nova/endpoint/rackspace.py @@ -39,7 +39,6 @@ from nova.compute import model from nova.compute import network from nova.endpoint import images from nova.endpoint import wsgi -from nova.volume import storage FLAGS = flags.FLAGS diff --git a/nova/flags.py b/nova/flags.py index 06ea1e007..ffb395f13 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -40,7 +40,9 @@ DEFINE_integer('s3_port', 3333, 's3 port') DEFINE_string('s3_host', '127.0.0.1', 's3 host') #DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on') -DEFINE_string('storage_topic', 'storage', 'the topic storage nodes listen on') +DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on') +DEFINE_string('network_topic', 'network', 'the topic network nodes listen on') + DEFINE_bool('fake_libvirt', False, 'whether to use a fake libvirt or not') DEFINE_bool('verbose', False, 'show debug output') diff --git a/nova/node.py b/nova/node.py new file mode 100644 index 000000000..852344da9 --- /dev/null +++ b/nova/node.py @@ -0,0 +1,103 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Generic Node baseclass for all workers that run on hosts +""" + +import inspect +import logging +import os + +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + +from nova import datastore +from nova import flags +from nova import rpc +from nova.compute import model + + +FLAGS = flags.FLAGS + +flags.DEFINE_integer('report_interval', 10, + 'seconds between nodes reporting state to cloud', + lower_bound=1) + +class Node(object, service.Service): + """Base class for workers that run on hosts""" + + @classmethod + def create(cls, + report_interval=None, # defaults to flag + bin_name=None, # defaults to basename of executable + topic=None): # defaults to basename - "nova-" part + """Instantiates class and passes back application object""" + if not report_interval: + # NOTE(vish): set here because if it is set to flag in the + # parameter list, it wrongly uses the default + report_interval = FLAGS.report_interval + # NOTE(vish): magic to automatically determine bin_name and topic + if not bin_name: + bin_name = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = bin_name.rpartition("nova-")[2] + logging.warn("Starting %s node" % topic) + node_instance = cls() + + conn = rpc.Connection.instance() + consumer_all = rpc.AdapterConsumer( + connection=conn, + topic='%s' % topic, + proxy=node_instance) + + consumer_node = rpc.AdapterConsumer( + connection=conn, + topic='%s.%s' % (topic, FLAGS.node_name), + proxy=node_instance) + + pulse = task.LoopingCall(node_instance.report_state, + FLAGS.node_name, + bin_name) + pulse.start(interval=report_interval, now=False) + + consumer_all.attach_to_twisted() + consumer_node.attach_to_twisted() + + # This is the parent service that twistd will be looking for when it + # parses this file, return it so that we can get it into globals below + application = service.Application(bin_name) + node_instance.setServiceParent(application) + return application + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except datastore.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index b8614fdc8..7ab2c257a 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -28,7 +28,7 @@ from nova import flags from nova import rpc from nova import test from nova.auth import users -from nova.compute import node +from nova.compute import computenode from nova.endpoint import api from nova.endpoint import cloud @@ -54,7 +54,7 @@ class CloudTestCase(test.BaseTestCase): self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) # set up a node - self.node = node.Node() + self.node = computenode.ComputeNode() self.node_consumer = rpc.AdapterConsumer(connection=self.conn, topic=FLAGS.compute_topic, proxy=self.node) diff --git a/nova/tests/node_unittest.py b/nova/tests/compute_unittest.py similarity index 95% rename from nova/tests/node_unittest.py rename to nova/tests/compute_unittest.py index 93942d79e..4c0f1afb3 100644 --- a/nova/tests/node_unittest.py +++ b/nova/tests/compute_unittest.py @@ -26,7 +26,7 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import node +from nova.compute import computenode FLAGS = flags.FLAGS @@ -53,14 +53,14 @@ class InstanceXmlTestCase(test.TrialTestCase): # rv = yield first_node.terminate_instance(instance_id) -class NodeConnectionTestCase(test.TrialTestCase): +class ComputeConnectionTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) - super(NodeConnectionTestCase, self).setUp() + super(ComputeConnectionTestCase, self).setUp() self.flags(fake_libvirt=True, fake_storage=True, fake_users=True) - self.node = node.Node() + self.node = computenode.ComputeNode() def create_instance(self): instdir = model.InstanceDirectory() diff --git a/nova/tests/future_unittest.py b/nova/tests/future_unittest.py deleted file mode 100644 index da5470ffe..000000000 --- a/nova/tests/future_unittest.py +++ /dev/null @@ -1,75 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import mox -import StringIO -import time -from tornado import ioloop -from twisted.internet import defer -import unittest -from xml.etree import ElementTree - -from nova import cloud -from nova import exception -from nova import flags -from nova import node -from nova import rpc -from nova import test - - -FLAGS = flags.FLAGS - - -class AdminTestCase(test.BaseTestCase): - def setUp(self): - super(AdminTestCase, self).setUp() - self.flags(fake_libvirt=True, - fake_rabbit=True) - - self.conn = rpc.Connection.instance() - - logging.getLogger().setLevel(logging.INFO) - - # set up our cloud - self.cloud = cloud.CloudController() - self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.cloud_topic, - proxy=self.cloud) - self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) - - # set up a node - self.node = node.Node() - self.node_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.compute_topic, - proxy=self.node) - self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) - - def test_flush_terminated(self): - # Launch an instance - - # Wait until it's running - - # Terminate it - - # Wait until it's terminated - - # Flush terminated nodes - - # ASSERT that it's gone - pass diff --git a/nova/tests/model_unittest.py b/nova/tests/model_unittest.py index 1bd7e527f..f84b6d11c 100644 --- a/nova/tests/model_unittest.py +++ b/nova/tests/model_unittest.py @@ -25,7 +25,6 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import node FLAGS = flags.FLAGS diff --git a/nova/tests/storage_unittest.py b/nova/tests/volume_unittest.py similarity index 86% rename from nova/tests/storage_unittest.py rename to nova/tests/volume_unittest.py index 60576d74f..c176453d8 100644 --- a/nova/tests/storage_unittest.py +++ b/nova/tests/volume_unittest.py @@ -21,22 +21,22 @@ import logging from nova import exception from nova import flags from nova import test -from nova.compute import node -from nova.volume import storage +from nova.compute import computenode +from nova.volume import volumenode FLAGS = flags.FLAGS -class StorageTestCase(test.TrialTestCase): +class VolumeTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) - super(StorageTestCase, self).setUp() - self.mynode = node.Node() + super(VolumeTestCase, self).setUp() + self.mynode = computenode.ComputeNode() self.mystorage = None self.flags(fake_libvirt=True, fake_storage=True) - self.mystorage = storage.BlockStore() + self.mystorage = volumenode.VolumeNode() def test_run_create_volume(self): vol_size = '0' @@ -45,11 +45,11 @@ class StorageTestCase(test.TrialTestCase): volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) # TODO(termie): get_volume returns differently than create_volume self.assertEqual(volume_id, - storage.get_volume(volume_id)['volume_id']) + volumenode.get_volume(volume_id)['volume_id']) rv = self.mystorage.delete_volume(volume_id) self.assertRaises(exception.Error, - storage.get_volume, + volumenode.get_volume, volume_id) def test_too_big_volume(self): @@ -70,7 +70,7 @@ class StorageTestCase(test.TrialTestCase): for i in xrange(total_slots): vid = self.mystorage.create_volume(vol_size, user_id, project_id) vols.append(vid) - self.assertRaises(storage.NoMoreVolumes, + self.assertRaises(volumenode.NoMoreVolumes, self.mystorage.create_volume, vol_size, user_id, project_id) for id in vols: @@ -85,7 +85,7 @@ class StorageTestCase(test.TrialTestCase): mountpoint = "/dev/sdf" volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) - volume_obj = storage.get_volume(volume_id) + volume_obj = volumenode.get_volume(volume_id) volume_obj.start_attach(instance_id, mountpoint) rv = yield self.mynode.attach_volume(volume_id, instance_id, @@ -100,12 +100,12 @@ class StorageTestCase(test.TrialTestCase): volume_id) rv = yield self.mystorage.detach_volume(volume_id) - volume_obj = storage.get_volume(volume_id) + volume_obj = volumenode.get_volume(volume_id) self.assertEqual(volume_obj['status'], "available") rv = self.mystorage.delete_volume(volume_id) self.assertRaises(exception.Error, - storage.get_volume, + volumenode.get_volume, volume_id) def test_multi_node(self): diff --git a/nova/twistd.py b/nova/twistd.py index 32a46ce03..fc7dad26a 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -32,7 +32,6 @@ from twisted.python import log from twisted.python import reflect from twisted.python import runtime from twisted.python import usage -import UserDict from nova import flags @@ -161,6 +160,13 @@ def WrapTwistedOptions(wrapped): except (AttributeError, KeyError): self._data[key] = value + def get(self, key, default): + key = key.replace('-', '_') + try: + return getattr(FLAGS, key) + except (AttributeError, KeyError): + self._data.get(key, default) + return TwistedOptionsToFlags @@ -210,8 +216,12 @@ def serve(filename): elif FLAGS.pidfile.endswith('twistd.pid'): FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name) + print FLAGS.logfile if not FLAGS.logfile: FLAGS.logfile = '%s.log' % name + elif FLAGS.logfile.endswith('twistd.log'): + FLAGS.logfile = FLAGS.logfile.replace('twistd.log', '%s.log' % name) + print FLAGS.logfile action = 'start' if len(argv) > 1: diff --git a/run_tests.py b/run_tests.py index db8a582ea..ae2874f58 100644 --- a/run_tests.py +++ b/run_tests.py @@ -52,14 +52,14 @@ from nova import twistd from nova.tests.access_unittest import * from nova.tests.api_unittest import * from nova.tests.cloud_unittest import * +from nova.tests.compute_unittest import * from nova.tests.model_unittest import * from nova.tests.network_unittest import * -from nova.tests.node_unittest import * from nova.tests.objectstore_unittest import * from nova.tests.process_unittest import * -from nova.tests.storage_unittest import * from nova.tests.users_unittest import * from nova.tests.validator_unittest import * +from nova.tests.volume_unittest import * FLAGS = flags.FLAGS