refactor daemons to use common base class in preparation for network refactor

This commit is contained in:
Vishvananda Ishaya
2010-07-21 14:42:22 -05:00
parent 3c84af7cdc
commit 5b5b2cfe3a
14 changed files with 182 additions and 240 deletions

View File

@@ -19,80 +19,14 @@
"""
Twistd daemon for the nova compute nodes.
Receives messages via AMQP, manages pool of worker threads
for async tasks.
"""
import logging
import os
import sys
# NOTE(termie): kludge so that we can run this from the bin directory in the
# checkout without having to screw with paths
NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova')
if os.path.exists(NOVA_PATH):
sys.path.insert(0, os.path.dirname(NOVA_PATH))
from twisted.internet import task
from twisted.application import service
from nova import flags
from nova import rpc
from nova import twistd
from nova.compute import node
FLAGS = flags.FLAGS
# NOTE(termie): This file will necessarily be re-imported under different
# context when the twistd.serve() call is made below so any
# flags we define here will have to be conditionally defined,
# flags defined by imported modules are safe.
if 'compute_report_state_interval' not in FLAGS:
flags.DEFINE_integer('compute_report_state_interval', 10,
'seconds between nodes reporting state to cloud',
lower_bound=1)
logging.getLogger().setLevel(logging.DEBUG)
def main():
logging.warn('Starting compute node')
n = node.Node()
d = n.adopt_instances()
d.addCallback(lambda x: logging.info('Adopted %d instances', x))
conn = rpc.Connection.instance()
consumer_all = rpc.AdapterConsumer(
connection=conn,
topic='%s' % FLAGS.compute_topic,
proxy=n)
consumer_node = rpc.AdapterConsumer(
connection=conn,
topic='%s.%s' % (FLAGS.compute_topic, FLAGS.node_name),
proxy=n)
bin_name = os.path.basename(__file__)
pulse = task.LoopingCall(n.report_state, FLAGS.node_name, bin_name)
pulse.start(interval=FLAGS.compute_report_state_interval, now=False)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals below
application = service.Application(bin_name)
n.setServiceParent(application)
return application
# NOTE(termie): When this script is executed from the commandline what it will
# actually do is tell the twistd application runner that it
# should run this file as a twistd application (see below).
if __name__ == '__main__':
twistd.serve(__file__)
# NOTE(termie): When this script is loaded by the twistd application runner
# this code path will be executed and twistd will expect a
# variable named 'application' to be available, it will then
# handle starting it and stopping it.
if __name__ == '__builtin__':
application = main()
application = node.ComputeNode.create()

32
bin/nova-network Normal file
View File

@@ -0,0 +1,32 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Twistd daemon for the nova network nodes.
"""
from nova import twistd
from nova.network import node
if __name__ == '__main__':
twistd.serve(__file__)
if __name__ == '__builtin__':
application = node.NetworkNode.create()

View File

@@ -18,77 +18,15 @@
# under the License.
"""
Tornado Storage daemon manages AoE volumes via AMQP messaging.
Twistd daemon for the nova volume nodes.
"""
import logging
import os
import sys
# NOTE(termie): kludge so that we can run this from the bin directory in the
# checkout without having to screw with paths
NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova')
if os.path.exists(NOVA_PATH):
sys.path.insert(0, os.path.dirname(NOVA_PATH))
from twisted.internet import task
from twisted.application import service
from nova import flags
from nova import rpc
from nova import twistd
from nova.volume import storage
from nova.volume import node
FLAGS = flags.FLAGS
# NOTE(termie): This file will necessarily be re-imported under different
# context when the twistd.serve() call is made below so any
# flags we define here will have to be conditionally defined,
# flags defined by imported modules are safe.
if 'volume_report_state_interval' not in FLAGS:
flags.DEFINE_integer('volume_report_state_interval', 10,
'seconds between nodes reporting state to cloud',
lower_bound=1)
def main():
logging.warn('Starting volume node')
bs = storage.BlockStore()
conn = rpc.Connection.instance()
consumer_all = rpc.AdapterConsumer(
connection=conn,
topic='%s' % FLAGS.storage_topic,
proxy=bs)
consumer_node = rpc.AdapterConsumer(
connection=conn,
topic='%s.%s' % (FLAGS.storage_topic, FLAGS.node_name),
proxy=bs)
bin_name = os.path.basename(__file__)
pulse = task.LoopingCall(bs.report_state, FLAGS.node_name, bin_name)
pulse.start(interval=FLAGS.volume_report_state_interval, now=False)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals below
application = service.Application(bin_name)
bs.setServiceParent(application)
return application
# NOTE(termie): When this script is executed from the commandline what it will
# actually do is tell the twistd application runner that it
# should run this file as a twistd application (see below).
if __name__ == '__main__':
twistd.serve(__file__)
# NOTE(termie): When this script is loaded by the twistd application runner
# this code path will be executed and twistd will expect a
# variable named 'application' to be available, it will then
# handle starting it and stopping it.
if __name__ == '__builtin__':
application = main()
application = node.VolumeNode.create()

View File

@@ -38,9 +38,9 @@ from nova.auth import rbac
from nova.auth import users
from nova.compute import model
from nova.compute import network
from nova.compute import node
from nova.compute import computenode
from nova.endpoint import images
from nova.volume import storage
from nova.volume import volumenode
FLAGS = flags.FLAGS
@@ -76,7 +76,7 @@ class CloudController(object):
def volumes(self):
""" returns a list of all volumes """
for volume_id in datastore.Redis.instance().smembers("volumes"):
volume = storage.get_volume(volume_id)
volume = volumenode.get_volume(volume_id)
yield volume
def __str__(self):
@@ -103,7 +103,7 @@ class CloudController(object):
result = {}
for instance in self.instdir.all:
if instance['project_id'] == project_id:
line = '%s slots=%d' % (instance['private_dns_name'], node.INSTANCE_TYPES[instance['instance_type']]['vcpus'])
line = '%s slots=%d' % (instance['private_dns_name'], computenode.INSTANCE_TYPES[instance['instance_type']]['vcpus'])
if instance['key_name'] in result:
result[instance['key_name']].append(line)
else:
@@ -296,8 +296,8 @@ class CloudController(object):
@rbac.allow('projectmanager', 'sysadmin')
def create_volume(self, context, size, **kwargs):
# TODO(vish): refactor this to create the volume object here and tell storage to create it
res = rpc.call(FLAGS.storage_topic, {"method": "create_volume",
# TODO(vish): refactor this to create the volume object here and tell volumenode to create it
res = rpc.call(FLAGS.volume_topic, {"method": "create_volume",
"args" : {"size": size,
"user_id": context.user.id,
"project_id": context.project.id}})
@@ -331,7 +331,7 @@ class CloudController(object):
raise exception.NotFound('Instance %s could not be found' % instance_id)
def _get_volume(self, context, volume_id):
volume = storage.get_volume(volume_id)
volume = volumenode.get_volume(volume_id)
if context.user.is_admin() or volume['project_id'] == context.project.id:
return volume
raise exception.NotFound('Volume %s could not be found' % volume_id)
@@ -628,8 +628,8 @@ class CloudController(object):
def delete_volume(self, context, volume_id, **kwargs):
# TODO: return error if not authorized
volume = self._get_volume(context, volume_id)
storage_node = volume['node_name']
rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node),
volume_node = volume['node_name']
rpc.cast('%s.%s' % (FLAGS.volume_topic, volume_node),
{"method": "delete_volume",
"args" : {"volume_id": volume_id}})
return defer.succeed(True)

View File

@@ -39,7 +39,6 @@ from nova.compute import model
from nova.compute import network
from nova.endpoint import images
from nova.endpoint import wsgi
from nova.volume import storage
FLAGS = flags.FLAGS

View File

@@ -40,7 +40,9 @@ DEFINE_integer('s3_port', 3333, 's3 port')
DEFINE_string('s3_host', '127.0.0.1', 's3 host')
#DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on')
DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on')
DEFINE_string('storage_topic', 'storage', 'the topic storage nodes listen on')
DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on')
DEFINE_string('network_topic', 'network', 'the topic network nodes listen on')
DEFINE_bool('fake_libvirt', False,
'whether to use a fake libvirt or not')
DEFINE_bool('verbose', False, 'show debug output')

103
nova/node.py Normal file
View File

@@ -0,0 +1,103 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Generic Node baseclass for all workers that run on hosts
"""
import inspect
import logging
import os
from twisted.internet import defer
from twisted.internet import task
from twisted.application import service
from nova import datastore
from nova import flags
from nova import rpc
from nova.compute import model
FLAGS = flags.FLAGS
flags.DEFINE_integer('report_interval', 10,
'seconds between nodes reporting state to cloud',
lower_bound=1)
class Node(object, service.Service):
"""Base class for workers that run on hosts"""
@classmethod
def create(cls,
report_interval=None, # defaults to flag
bin_name=None, # defaults to basename of executable
topic=None): # defaults to basename - "nova-" part
"""Instantiates class and passes back application object"""
if not report_interval:
# NOTE(vish): set here because if it is set to flag in the
# parameter list, it wrongly uses the default
report_interval = FLAGS.report_interval
# NOTE(vish): magic to automatically determine bin_name and topic
if not bin_name:
bin_name = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = bin_name.rpartition("nova-")[2]
logging.warn("Starting %s node" % topic)
node_instance = cls()
conn = rpc.Connection.instance()
consumer_all = rpc.AdapterConsumer(
connection=conn,
topic='%s' % topic,
proxy=node_instance)
consumer_node = rpc.AdapterConsumer(
connection=conn,
topic='%s.%s' % (topic, FLAGS.node_name),
proxy=node_instance)
pulse = task.LoopingCall(node_instance.report_state,
FLAGS.node_name,
bin_name)
pulse.start(interval=report_interval, now=False)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals below
application = service.Application(bin_name)
node_instance.setServiceParent(application)
return application
@defer.inlineCallbacks
def report_state(self, nodename, daemon):
# TODO(termie): make this pattern be more elegant. -todd
try:
record = model.Daemon(nodename, daemon)
record.heartbeat()
if getattr(self, "model_disconnected", False):
self.model_disconnected = False
logging.error("Recovered model server connection!")
except datastore.ConnectionError, ex:
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
yield

View File

@@ -28,7 +28,7 @@ from nova import flags
from nova import rpc
from nova import test
from nova.auth import users
from nova.compute import node
from nova.compute import computenode
from nova.endpoint import api
from nova.endpoint import cloud
@@ -54,7 +54,7 @@ class CloudTestCase(test.BaseTestCase):
self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop))
# set up a node
self.node = node.Node()
self.node = computenode.ComputeNode()
self.node_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic,
proxy=self.node)

View File

@@ -26,7 +26,7 @@ from nova import flags
from nova import test
from nova import utils
from nova.compute import model
from nova.compute import node
from nova.compute import computenode
FLAGS = flags.FLAGS
@@ -53,14 +53,14 @@ class InstanceXmlTestCase(test.TrialTestCase):
# rv = yield first_node.terminate_instance(instance_id)
class NodeConnectionTestCase(test.TrialTestCase):
class ComputeConnectionTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(NodeConnectionTestCase, self).setUp()
super(ComputeConnectionTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_storage=True,
fake_users=True)
self.node = node.Node()
self.node = computenode.ComputeNode()
def create_instance(self):
instdir = model.InstanceDirectory()

View File

@@ -1,75 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mox
import StringIO
import time
from tornado import ioloop
from twisted.internet import defer
import unittest
from xml.etree import ElementTree
from nova import cloud
from nova import exception
from nova import flags
from nova import node
from nova import rpc
from nova import test
FLAGS = flags.FLAGS
class AdminTestCase(test.BaseTestCase):
def setUp(self):
super(AdminTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_rabbit=True)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.INFO)
# set up our cloud
self.cloud = cloud.CloudController()
self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.cloud_topic,
proxy=self.cloud)
self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop))
# set up a node
self.node = node.Node()
self.node_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic,
proxy=self.node)
self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop))
def test_flush_terminated(self):
# Launch an instance
# Wait until it's running
# Terminate it
# Wait until it's terminated
# Flush terminated nodes
# ASSERT that it's gone
pass

View File

@@ -25,7 +25,6 @@ from nova import flags
from nova import test
from nova import utils
from nova.compute import model
from nova.compute import node
FLAGS = flags.FLAGS

View File

@@ -21,22 +21,22 @@ import logging
from nova import exception
from nova import flags
from nova import test
from nova.compute import node
from nova.volume import storage
from nova.compute import computenode
from nova.volume import volumenode
FLAGS = flags.FLAGS
class StorageTestCase(test.TrialTestCase):
class VolumeTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(StorageTestCase, self).setUp()
self.mynode = node.Node()
super(VolumeTestCase, self).setUp()
self.mynode = computenode.ComputeNode()
self.mystorage = None
self.flags(fake_libvirt=True,
fake_storage=True)
self.mystorage = storage.BlockStore()
self.mystorage = volumenode.VolumeNode()
def test_run_create_volume(self):
vol_size = '0'
@@ -45,11 +45,11 @@ class StorageTestCase(test.TrialTestCase):
volume_id = self.mystorage.create_volume(vol_size, user_id, project_id)
# TODO(termie): get_volume returns differently than create_volume
self.assertEqual(volume_id,
storage.get_volume(volume_id)['volume_id'])
volumenode.get_volume(volume_id)['volume_id'])
rv = self.mystorage.delete_volume(volume_id)
self.assertRaises(exception.Error,
storage.get_volume,
volumenode.get_volume,
volume_id)
def test_too_big_volume(self):
@@ -70,7 +70,7 @@ class StorageTestCase(test.TrialTestCase):
for i in xrange(total_slots):
vid = self.mystorage.create_volume(vol_size, user_id, project_id)
vols.append(vid)
self.assertRaises(storage.NoMoreVolumes,
self.assertRaises(volumenode.NoMoreVolumes,
self.mystorage.create_volume,
vol_size, user_id, project_id)
for id in vols:
@@ -85,7 +85,7 @@ class StorageTestCase(test.TrialTestCase):
mountpoint = "/dev/sdf"
volume_id = self.mystorage.create_volume(vol_size, user_id, project_id)
volume_obj = storage.get_volume(volume_id)
volume_obj = volumenode.get_volume(volume_id)
volume_obj.start_attach(instance_id, mountpoint)
rv = yield self.mynode.attach_volume(volume_id,
instance_id,
@@ -100,12 +100,12 @@ class StorageTestCase(test.TrialTestCase):
volume_id)
rv = yield self.mystorage.detach_volume(volume_id)
volume_obj = storage.get_volume(volume_id)
volume_obj = volumenode.get_volume(volume_id)
self.assertEqual(volume_obj['status'], "available")
rv = self.mystorage.delete_volume(volume_id)
self.assertRaises(exception.Error,
storage.get_volume,
volumenode.get_volume,
volume_id)
def test_multi_node(self):

View File

@@ -32,7 +32,6 @@ from twisted.python import log
from twisted.python import reflect
from twisted.python import runtime
from twisted.python import usage
import UserDict
from nova import flags
@@ -161,6 +160,13 @@ def WrapTwistedOptions(wrapped):
except (AttributeError, KeyError):
self._data[key] = value
def get(self, key, default):
key = key.replace('-', '_')
try:
return getattr(FLAGS, key)
except (AttributeError, KeyError):
self._data.get(key, default)
return TwistedOptionsToFlags
@@ -210,8 +216,12 @@ def serve(filename):
elif FLAGS.pidfile.endswith('twistd.pid'):
FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name)
print FLAGS.logfile
if not FLAGS.logfile:
FLAGS.logfile = '%s.log' % name
elif FLAGS.logfile.endswith('twistd.log'):
FLAGS.logfile = FLAGS.logfile.replace('twistd.log', '%s.log' % name)
print FLAGS.logfile
action = 'start'
if len(argv) > 1:

View File

@@ -52,14 +52,14 @@ from nova import twistd
from nova.tests.access_unittest import *
from nova.tests.api_unittest import *
from nova.tests.cloud_unittest import *
from nova.tests.compute_unittest import *
from nova.tests.model_unittest import *
from nova.tests.network_unittest import *
from nova.tests.node_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
from nova.tests.storage_unittest import *
from nova.tests.users_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.volume_unittest import *
FLAGS = flags.FLAGS