Makes the compute and volume daemon workers use a common base class called Service. Adds a NetworkService in preparation for splitting out networking code. General cleanup and standardizarion of naming.

This commit is contained in:
Vishvananda Ishaya 2010-07-27 22:42:40 +00:00 committed by Tarmac
commit c1e3a87253
19 changed files with 321 additions and 328 deletions

View File

@ -19,81 +19,14 @@
"""
Twistd daemon for the nova compute nodes.
Receives messages via AMQP, manages pool of worker threads
for async tasks.
"""
import logging
import os
import sys
# NOTE(termie): kludge so that we can run this from the bin directory in the
# checkout without having to screw with paths
NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova')
if os.path.exists(NOVA_PATH):
sys.path.insert(0, os.path.dirname(NOVA_PATH))
from twisted.internet import task
from twisted.application import service
from nova import flags
from nova import rpc
from nova import twistd
from nova.compute import node
from nova.objectstore import image # For the images_path flag
from nova.compute import service
FLAGS = flags.FLAGS
# NOTE(termie): This file will necessarily be re-imported under different
# context when the twistd.serve() call is made below so any
# flags we define here will have to be conditionally defined,
# flags defined by imported modules are safe.
if 'compute_report_state_interval' not in FLAGS:
flags.DEFINE_integer('compute_report_state_interval', 10,
'seconds between nodes reporting state to cloud',
lower_bound=1)
logging.getLogger().setLevel(logging.DEBUG)
def main():
logging.warn('Starting compute node')
n = node.Node()
d = n.adopt_instances()
d.addCallback(lambda x: logging.info('Adopted %d instances', x))
conn = rpc.Connection.instance()
consumer_all = rpc.AdapterConsumer(
connection=conn,
topic='%s' % FLAGS.compute_topic,
proxy=n)
consumer_node = rpc.AdapterConsumer(
connection=conn,
topic='%s.%s' % (FLAGS.compute_topic, FLAGS.node_name),
proxy=n)
bin_name = os.path.basename(__file__)
pulse = task.LoopingCall(n.report_state, FLAGS.node_name, bin_name)
pulse.start(interval=FLAGS.compute_report_state_interval, now=False)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals below
application = service.Application(bin_name)
n.setServiceParent(application)
return application
# NOTE(termie): When this script is executed from the commandline what it will
# actually do is tell the twistd application runner that it
# should run this file as a twistd application (see below).
if __name__ == '__main__':
twistd.serve(__file__)
# NOTE(termie): When this script is loaded by the twistd application runner
# this code path will be executed and twistd will expect a
# variable named 'application' to be available, it will then
# handle starting it and stopping it.
if __name__ == '__builtin__':
application = main()
application = service.ComputeService.create()

32
bin/nova-network Executable file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Twistd daemon for the nova network nodes.
"""
from nova import twistd
from nova.network import service
if __name__ == '__main__':
twistd.serve(__file__)
if __name__ == '__builtin__':
application = service.NetworkService.create()

View File

@ -18,77 +18,15 @@
# under the License.
"""
Tornado Storage daemon manages AoE volumes via AMQP messaging.
Twistd daemon for the nova volume nodes.
"""
import logging
import os
import sys
# NOTE(termie): kludge so that we can run this from the bin directory in the
# checkout without having to screw with paths
NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova')
if os.path.exists(NOVA_PATH):
sys.path.insert(0, os.path.dirname(NOVA_PATH))
from twisted.internet import task
from twisted.application import service
from nova import flags
from nova import rpc
from nova import twistd
from nova.volume import storage
from nova.volume import service
FLAGS = flags.FLAGS
# NOTE(termie): This file will necessarily be re-imported under different
# context when the twistd.serve() call is made below so any
# flags we define here will have to be conditionally defined,
# flags defined by imported modules are safe.
if 'volume_report_state_interval' not in FLAGS:
flags.DEFINE_integer('volume_report_state_interval', 10,
'seconds between nodes reporting state to cloud',
lower_bound=1)
def main():
logging.warn('Starting volume node')
bs = storage.BlockStore()
conn = rpc.Connection.instance()
consumer_all = rpc.AdapterConsumer(
connection=conn,
topic='%s' % FLAGS.storage_topic,
proxy=bs)
consumer_node = rpc.AdapterConsumer(
connection=conn,
topic='%s.%s' % (FLAGS.storage_topic, FLAGS.node_name),
proxy=bs)
bin_name = os.path.basename(__file__)
pulse = task.LoopingCall(bs.report_state, FLAGS.node_name, bin_name)
pulse.start(interval=FLAGS.volume_report_state_interval, now=False)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals below
application = service.Application(bin_name)
bs.setServiceParent(application)
return application
# NOTE(termie): When this script is executed from the commandline what it will
# actually do is tell the twistd application runner that it
# should run this file as a twistd application (see below).
if __name__ == '__main__':
twistd.serve(__file__)
# NOTE(termie): When this script is loaded by the twistd application runner
# this code path will be executed and twistd will expect a
# variable named 'application' to be available, it will then
# handle starting it and stopping it.
if __name__ == '__builtin__':
application = main()
application = service.VolumeService.create()

View File

@ -17,9 +17,9 @@
# under the License.
"""
Compute Node:
Compute Service:
Runs on each compute node, managing the
Runs on each compute host, managing the
hypervisor using libvirt.
"""
@ -34,7 +34,6 @@ import sys
import time
from twisted.internet import defer
from twisted.internet import task
from twisted.application import service
try:
@ -46,13 +45,14 @@ from nova import exception
from nova import fakevirt
from nova import flags
from nova import process
from nova import service
from nova import utils
from nova.auth import signer, manager
from nova.compute import disk
from nova.compute import model
from nova.compute import network
from nova.objectstore import image # for image_path flag
from nova.volume import storage
from nova.volume import service as volume_service
FLAGS = flags.FLAGS
@ -81,13 +81,13 @@ def _image_url(path):
return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path)
class Node(object, service.Service):
class ComputeService(service.Service):
"""
Manages the running instances.
"""
def __init__(self):
""" load configuration options for this node and connect to libvirt """
super(Node, self).__init__()
super(ComputeService, self).__init__()
self._instances = {}
self._conn = self._get_connection()
self.instdir = model.InstanceDirectory()
@ -224,7 +224,7 @@ class Node(object, service.Service):
@exception.wrap_exception
def attach_volume(self, instance_id = None,
volume_id = None, mountpoint = None):
volume = storage.get_volume(volume_id)
volume = volume_service.get_volume(volume_id)
yield self._init_aoe()
yield process.simple_execute(
"sudo virsh attach-disk %s /dev/etherd/%s %s" %
@ -245,7 +245,7 @@ class Node(object, service.Service):
""" detach a volume from an instance """
# despite the documentation, virsh detach-disk just wants the device
# name without the leading /dev/
volume = storage.get_volume(volume_id)
volume = volume_service.get_volume(volume_id)
target = volume['mountpoint'].rpartition('/dev/')[2]
yield process.simple_execute(
"sudo virsh detach-disk %s %s " % (instance_id, target))

View File

@ -23,7 +23,6 @@ datastore.
"""
import base64
import json
import logging
import os
import time
@ -38,9 +37,9 @@ from nova.auth import rbac
from nova.auth import manager
from nova.compute import model
from nova.compute import network
from nova.compute import node
from nova.compute import service as compute_service
from nova.endpoint import images
from nova.volume import storage
from nova.volume import service as volume_service
FLAGS = flags.FLAGS
@ -76,7 +75,7 @@ class CloudController(object):
def volumes(self):
""" returns a list of all volumes """
for volume_id in datastore.Redis.instance().smembers("volumes"):
volume = storage.get_volume(volume_id)
volume = volume_service.get_volume(volume_id)
yield volume
def __str__(self):
@ -103,7 +102,7 @@ class CloudController(object):
result = {}
for instance in self.instdir.all:
if instance['project_id'] == project_id:
line = '%s slots=%d' % (instance['private_dns_name'], node.INSTANCE_TYPES[instance['instance_type']]['vcpus'])
line = '%s slots=%d' % (instance['private_dns_name'], compute_service.INSTANCE_TYPES[instance['instance_type']]['vcpus'])
if instance['key_name'] in result:
result[instance['key_name']].append(line)
else:
@ -296,8 +295,8 @@ class CloudController(object):
@rbac.allow('projectmanager', 'sysadmin')
def create_volume(self, context, size, **kwargs):
# TODO(vish): refactor this to create the volume object here and tell storage to create it
res = rpc.call(FLAGS.storage_topic, {"method": "create_volume",
# TODO(vish): refactor this to create the volume object here and tell service to create it
res = rpc.call(FLAGS.volume_topic, {"method": "create_volume",
"args" : {"size": size,
"user_id": context.user.id,
"project_id": context.project.id}})
@ -331,7 +330,7 @@ class CloudController(object):
raise exception.NotFound('Instance %s could not be found' % instance_id)
def _get_volume(self, context, volume_id):
volume = storage.get_volume(volume_id)
volume = volume_service.get_volume(volume_id)
if context.user.is_admin() or volume['project_id'] == context.project.id:
return volume
raise exception.NotFound('Volume %s could not be found' % volume_id)
@ -578,7 +577,7 @@ class CloudController(object):
"args": {"instance_id" : inst.instance_id}})
logging.debug("Casting to node for %s's instance with IP of %s" %
(context.user.name, inst['private_dns_name']))
# TODO: Make the NetworkComputeNode figure out the network name from ip.
# TODO: Make Network figure out the network name from ip.
return defer.succeed(self._format_instances(
context, reservation_id))
@ -628,8 +627,8 @@ class CloudController(object):
def delete_volume(self, context, volume_id, **kwargs):
# TODO: return error if not authorized
volume = self._get_volume(context, volume_id)
storage_node = volume['node_name']
rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node),
volume_node = volume['node_name']
rpc.cast('%s.%s' % (FLAGS.volume_topic, volume_node),
{"method": "delete_volume",
"args" : {"volume_id": volume_id}})
return defer.succeed(True)

View File

@ -39,7 +39,6 @@ from nova.compute import model
from nova.compute import network
from nova.endpoint import images
from nova.endpoint import wsgi
from nova.volume import storage
FLAGS = flags.FLAGS

View File

@ -40,7 +40,9 @@ DEFINE_integer('s3_port', 3333, 's3 port')
DEFINE_string('s3_host', '127.0.0.1', 's3 host')
#DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on')
DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on')
DEFINE_string('storage_topic', 'storage', 'the topic storage nodes listen on')
DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on')
DEFINE_string('network_topic', 'network', 'the topic network nodes listen on')
DEFINE_bool('fake_libvirt', False,
'whether to use a fake libvirt or not')
DEFINE_bool('verbose', False, 'show debug output')

32
nova/network/__init__.py Normal file
View File

@ -0,0 +1,32 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nova.network` -- Network Nodes
=====================================================
.. automodule:: nova.network
:platform: Unix
:synopsis: Network is responsible for managing networking
.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""

35
nova/network/service.py Normal file
View File

@ -0,0 +1,35 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Network Nodes are responsible for allocating ips and setting up network
"""
import logging
from nova import flags
from nova import service
FLAGS = flags.FLAGS
class NetworkService(service.Service):
"""Allocates ips and sets up networks"""
def __init__(self):
logging.debug("Network node working")

103
nova/service.py Normal file
View File

@ -0,0 +1,103 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Generic Node baseclass for all workers that run on hosts
"""
import inspect
import logging
import os
from twisted.internet import defer
from twisted.internet import task
from twisted.application import service
from nova import datastore
from nova import flags
from nova import rpc
from nova.compute import model
FLAGS = flags.FLAGS
flags.DEFINE_integer('report_interval', 10,
'seconds between nodes reporting state to cloud',
lower_bound=1)
class Service(object, service.Service):
"""Base class for workers that run on hosts"""
@classmethod
def create(cls,
report_interval=None, # defaults to flag
bin_name=None, # defaults to basename of executable
topic=None): # defaults to basename - "nova-" part
"""Instantiates class and passes back application object"""
if not report_interval:
# NOTE(vish): set here because if it is set to flag in the
# parameter list, it wrongly uses the default
report_interval = FLAGS.report_interval
# NOTE(vish): magic to automatically determine bin_name and topic
if not bin_name:
bin_name = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = bin_name.rpartition("nova-")[2]
logging.warn("Starting %s node" % topic)
node_instance = cls()
conn = rpc.Connection.instance()
consumer_all = rpc.AdapterConsumer(
connection=conn,
topic='%s' % topic,
proxy=node_instance)
consumer_node = rpc.AdapterConsumer(
connection=conn,
topic='%s.%s' % (topic, FLAGS.node_name),
proxy=node_instance)
pulse = task.LoopingCall(node_instance.report_state,
FLAGS.node_name,
bin_name)
pulse.start(interval=report_interval, now=False)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals below
application = service.Application(bin_name)
node_instance.setServiceParent(application)
return application
@defer.inlineCallbacks
def report_state(self, nodename, daemon):
# TODO(termie): make this pattern be more elegant. -todd
try:
record = model.Daemon(nodename, daemon)
record.heartbeat()
if getattr(self, "model_disconnected", False):
self.model_disconnected = False
logging.error("Recovered model server connection!")
except datastore.ConnectionError, ex:
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
yield

View File

@ -156,9 +156,9 @@ class BaseTestCase(TrialTestCase):
Example (callback chain, ugly):
d = self.node.terminate_instance(instance_id) # a Deferred instance
d = self.compute.terminate_instance(instance_id) # a Deferred instance
def _describe(_):
d_desc = self.node.describe_instances() # another Deferred instance
d_desc = self.compute.describe_instances() # another Deferred instance
return d_desc
def _checkDescribe(rv):
self.assertEqual(rv, [])
@ -169,8 +169,8 @@ class BaseTestCase(TrialTestCase):
Example (inline callbacks! yay!):
yield self.node.terminate_instance(instance_id)
rv = yield self.node.describe_instances()
yield self.compute.terminate_instance(instance_id)
rv = yield self.compute.describe_instances()
self.assertEqual(rv, [])
If the test fits the Inline Callbacks pattern we will automatically

View File

@ -28,7 +28,7 @@ from nova import flags
from nova import rpc
from nova import test
from nova.auth import manager
from nova.compute import node
from nova.compute import service
from nova.endpoint import api
from nova.endpoint import cloud
@ -52,12 +52,12 @@ class CloudTestCase(test.BaseTestCase):
proxy=self.cloud)
self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop))
# set up a node
self.node = node.Node()
self.node_consumer = rpc.AdapterConsumer(connection=self.conn,
# set up a service
self.compute = service.ComputeService()
self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic,
proxy=self.node)
self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop))
proxy=self.compute)
self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop))
try:
manager.AuthManager().create_user('admin', 'admin', 'admin')
@ -75,11 +75,11 @@ class CloudTestCase(test.BaseTestCase):
logging.debug("Can't test instances without a real virtual env.")
return
instance_id = 'foo'
inst = yield self.node.run_instance(instance_id)
inst = yield self.compute.run_instance(instance_id)
output = yield self.cloud.get_console_output(self.context, [instance_id])
logging.debug(output)
self.assert_(output)
rv = yield self.node.terminate_instance(instance_id)
rv = yield self.compute.terminate_instance(instance_id)
def test_run_instances(self):
if FLAGS.fake_libvirt:
@ -111,7 +111,7 @@ class CloudTestCase(test.BaseTestCase):
# for instance in reservations[res_id]:
for instance in reservations[reservations.keys()[0]]:
logging.debug("Terminating instance %s" % instance['instance_id'])
rv = yield self.node.terminate_instance(instance['instance_id'])
rv = yield self.compute.terminate_instance(instance['instance_id'])
def test_instance_update_state(self):
def instance(num):

View File

@ -26,7 +26,7 @@ from nova import flags
from nova import test
from nova import utils
from nova.compute import model
from nova.compute import node
from nova.compute import service
FLAGS = flags.FLAGS
@ -53,13 +53,13 @@ class InstanceXmlTestCase(test.TrialTestCase):
# rv = yield first_node.terminate_instance(instance_id)
class NodeConnectionTestCase(test.TrialTestCase):
class ComputeConnectionTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(NodeConnectionTestCase, self).setUp()
super(ComputeConnectionTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_storage=True)
self.node = node.Node()
self.compute = service.ComputeService()
def create_instance(self):
instdir = model.InstanceDirectory()
@ -80,48 +80,48 @@ class NodeConnectionTestCase(test.TrialTestCase):
def test_run_describe_terminate(self):
instance_id = self.create_instance()
rv = yield self.node.run_instance(instance_id)
rv = yield self.compute.run_instance(instance_id)
rv = yield self.node.describe_instances()
rv = yield self.compute.describe_instances()
logging.info("Running instances: %s", rv)
self.assertEqual(rv[instance_id].name, instance_id)
rv = yield self.node.terminate_instance(instance_id)
rv = yield self.compute.terminate_instance(instance_id)
rv = yield self.node.describe_instances()
rv = yield self.compute.describe_instances()
logging.info("After terminating instances: %s", rv)
self.assertEqual(rv, {})
@defer.inlineCallbacks
def test_reboot(self):
instance_id = self.create_instance()
rv = yield self.node.run_instance(instance_id)
rv = yield self.compute.run_instance(instance_id)
rv = yield self.node.describe_instances()
rv = yield self.compute.describe_instances()
self.assertEqual(rv[instance_id].name, instance_id)
yield self.node.reboot_instance(instance_id)
yield self.compute.reboot_instance(instance_id)
rv = yield self.node.describe_instances()
rv = yield self.compute.describe_instances()
self.assertEqual(rv[instance_id].name, instance_id)
rv = yield self.node.terminate_instance(instance_id)
rv = yield self.compute.terminate_instance(instance_id)
@defer.inlineCallbacks
def test_console_output(self):
instance_id = self.create_instance()
rv = yield self.node.run_instance(instance_id)
rv = yield self.compute.run_instance(instance_id)
console = yield self.node.get_console_output(instance_id)
console = yield self.compute.get_console_output(instance_id)
self.assert_(console)
rv = yield self.node.terminate_instance(instance_id)
rv = yield self.compute.terminate_instance(instance_id)
@defer.inlineCallbacks
def test_run_instance_existing(self):
instance_id = self.create_instance()
rv = yield self.node.run_instance(instance_id)
rv = yield self.compute.run_instance(instance_id)
rv = yield self.node.describe_instances()
rv = yield self.compute.describe_instances()
self.assertEqual(rv[instance_id].name, instance_id)
self.assertRaises(exception.Error, self.node.run_instance, instance_id)
rv = yield self.node.terminate_instance(instance_id)
self.assertRaises(exception.Error, self.compute.run_instance, instance_id)
rv = yield self.compute.terminate_instance(instance_id)

View File

@ -1,75 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mox
import StringIO
import time
from tornado import ioloop
from twisted.internet import defer
import unittest
from xml.etree import ElementTree
from nova import cloud
from nova import exception
from nova import flags
from nova import node
from nova import rpc
from nova import test
FLAGS = flags.FLAGS
class AdminTestCase(test.BaseTestCase):
def setUp(self):
super(AdminTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_rabbit=True)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.INFO)
# set up our cloud
self.cloud = cloud.CloudController()
self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.cloud_topic,
proxy=self.cloud)
self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop))
# set up a node
self.node = node.Node()
self.node_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic,
proxy=self.node)
self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop))
def test_flush_terminated(self):
# Launch an instance
# Wait until it's running
# Terminate it
# Wait until it's terminated
# Flush terminated nodes
# ASSERT that it's gone
pass

View File

@ -25,7 +25,6 @@ from nova import flags
from nova import test
from nova import utils
from nova.compute import model
from nova.compute import node
FLAGS = flags.FLAGS

View File

@ -18,38 +18,38 @@
import logging
from nova import compute
from nova import exception
from nova import flags
from nova import test
from nova.compute import node
from nova.volume import storage
from nova.volume import service as volume_service
FLAGS = flags.FLAGS
class StorageTestCase(test.TrialTestCase):
class VolumeTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(StorageTestCase, self).setUp()
self.mynode = node.Node()
self.mystorage = None
super(VolumeTestCase, self).setUp()
self.compute = compute.service.ComputeService()
self.volume = None
self.flags(fake_libvirt=True,
fake_storage=True)
self.mystorage = storage.BlockStore()
self.volume = volume_service.VolumeService()
def test_run_create_volume(self):
vol_size = '0'
user_id = 'fake'
project_id = 'fake'
volume_id = self.mystorage.create_volume(vol_size, user_id, project_id)
volume_id = self.volume.create_volume(vol_size, user_id, project_id)
# TODO(termie): get_volume returns differently than create_volume
self.assertEqual(volume_id,
storage.get_volume(volume_id)['volume_id'])
volume_service.get_volume(volume_id)['volume_id'])
rv = self.mystorage.delete_volume(volume_id)
rv = self.volume.delete_volume(volume_id)
self.assertRaises(exception.Error,
storage.get_volume,
volume_service.get_volume,
volume_id)
def test_too_big_volume(self):
@ -57,7 +57,7 @@ class StorageTestCase(test.TrialTestCase):
user_id = 'fake'
project_id = 'fake'
self.assertRaises(TypeError,
self.mystorage.create_volume,
self.volume.create_volume,
vol_size, user_id, project_id)
def test_too_many_volumes(self):
@ -68,26 +68,26 @@ class StorageTestCase(test.TrialTestCase):
total_slots = FLAGS.slots_per_shelf * num_shelves
vols = []
for i in xrange(total_slots):
vid = self.mystorage.create_volume(vol_size, user_id, project_id)
vid = self.volume.create_volume(vol_size, user_id, project_id)
vols.append(vid)
self.assertRaises(storage.NoMoreVolumes,
self.mystorage.create_volume,
self.assertRaises(volume_service.NoMoreVolumes,
self.volume.create_volume,
vol_size, user_id, project_id)
for id in vols:
self.mystorage.delete_volume(id)
self.volume.delete_volume(id)
def test_run_attach_detach_volume(self):
# Create one volume and one node to test with
# Create one volume and one compute to test with
instance_id = "storage-test"
vol_size = "5"
user_id = "fake"
project_id = 'fake'
mountpoint = "/dev/sdf"
volume_id = self.mystorage.create_volume(vol_size, user_id, project_id)
volume_id = self.volume.create_volume(vol_size, user_id, project_id)
volume_obj = storage.get_volume(volume_id)
volume_obj = volume_service.get_volume(volume_id)
volume_obj.start_attach(instance_id, mountpoint)
rv = yield self.mynode.attach_volume(volume_id,
rv = yield self.compute.attach_volume(volume_id,
instance_id,
mountpoint)
self.assertEqual(volume_obj['status'], "in-use")
@ -96,16 +96,16 @@ class StorageTestCase(test.TrialTestCase):
self.assertEqual(volume_obj['mountpoint'], mountpoint)
self.assertRaises(exception.Error,
self.mystorage.delete_volume,
self.volume.delete_volume,
volume_id)
rv = yield self.mystorage.detach_volume(volume_id)
volume_obj = storage.get_volume(volume_id)
rv = yield self.volume.detach_volume(volume_id)
volume_obj = volume_service.get_volume(volume_id)
self.assertEqual(volume_obj['status'], "available")
rv = self.mystorage.delete_volume(volume_id)
rv = self.volume.delete_volume(volume_id)
self.assertRaises(exception.Error,
storage.get_volume,
volume_service.get_volume,
volume_id)
def test_multi_node(self):

View File

@ -22,7 +22,6 @@ manage pid files and support syslogging.
"""
import logging
import logging.handlers
import os
import signal
import sys
@ -32,7 +31,6 @@ from twisted.python import log
from twisted.python import reflect
from twisted.python import runtime
from twisted.python import usage
import UserDict
from nova import flags
@ -161,6 +159,13 @@ def WrapTwistedOptions(wrapped):
except (AttributeError, KeyError):
self._data[key] = value
def get(self, key, default):
key = key.replace('-', '_')
try:
return getattr(FLAGS, key)
except (AttributeError, KeyError):
self._data.get(key, default)
return TwistedOptionsToFlags
@ -209,9 +214,14 @@ def serve(filename):
FLAGS.pidfile = '%s.pid' % name
elif FLAGS.pidfile.endswith('twistd.pid'):
FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name)
if not FLAGS.logfile:
FLAGS.logfile = '%s.log' % name
elif FLAGS.logfile.endswith('twistd.log'):
FLAGS.logfile = FLAGS.logfile.replace('twistd.log', '%s.log' % name)
if not FLAGS.prefix:
FLAGS.prefix = name
elif FLAGS.prefix.endswith('twisted'):
FLAGS.prefix = FLAGS.prefix.replace('twisted', name)
action = 'start'
if len(argv) > 1:
@ -228,8 +238,16 @@ def serve(filename):
print 'usage: %s [options] [start|stop|restart]' % argv[0]
sys.exit(1)
formatter = logging.Formatter(
name + '(%(name)s): %(levelname)s %(message)s')
class NoNewlineFormatter(logging.Formatter):
"""Strips newlines from default formatter"""
def format(self, record):
"""Grabs default formatter's output and strips newlines"""
data = logging.Formatter.format(self, record)
return data.replace("\n", "--")
# NOTE(vish): syslog-ng doesn't handle newlines from trackbacks very well
formatter = NoNewlineFormatter(
'(%(name)s): %(levelname)s %(message)s')
handler = logging.StreamHandler(log.StdioOnnaStick())
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
@ -239,11 +257,6 @@ def serve(filename):
else:
logging.getLogger().setLevel(logging.WARNING)
if FLAGS.syslog:
syslog = logging.handlers.SysLogHandler(address='/dev/log')
syslog.setFormatter(formatter)
logging.getLogger().addHandler(syslog)
logging.debug("Full set of FLAGS:")
for flag in FLAGS:
logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))

View File

@ -29,16 +29,15 @@ import shutil
import socket
import tempfile
from twisted.application import service
from twisted.internet import defer
from nova import datastore
from nova import exception
from nova import flags
from nova import process
from nova import service
from nova import utils
from nova import validate
from nova.compute import model
FLAGS = flags.FLAGS
@ -50,13 +49,13 @@ flags.DEFINE_string('aoe_eth_dev', 'eth0',
'Which device to export the volumes on')
flags.DEFINE_string('storage_name',
socket.gethostname(),
'name of this node')
'name of this service')
flags.DEFINE_integer('first_shelf_id',
utils.last_octet(utils.get_my_ip()) * 10,
'AoE starting shelf_id for this node')
'AoE starting shelf_id for this service')
flags.DEFINE_integer('last_shelf_id',
utils.last_octet(utils.get_my_ip()) * 10 + 9,
'AoE starting shelf_id for this node')
'AoE starting shelf_id for this service')
flags.DEFINE_string('aoe_export_dir',
'/var/lib/vblade-persist/vblades',
'AoE directory where exports are created')
@ -65,7 +64,7 @@ flags.DEFINE_integer('slots_per_shelf',
'Number of AoE slots per shelf')
flags.DEFINE_string('storage_availability_zone',
'nova',
'availability zone of this node')
'availability zone of this service')
flags.DEFINE_boolean('fake_storage', False,
'Should we make real storage volumes to attach?')
@ -82,14 +81,14 @@ def get_volume(volume_id):
return volume_class(volume_id=volume_id)
raise exception.Error("Volume does not exist")
class BlockStore(object, service.Service):
class VolumeService(service.Service):
"""
There is one BlockStore running on each volume node.
However, each BlockStore can report on the state of
There is one VolumeNode running on each host.
However, each VolumeNode can report on the state of
*all* volumes in the cluster.
"""
def __init__(self):
super(BlockStore, self).__init__()
super(VolumeService, self).__init__()
self.volume_class = Volume
if FLAGS.fake_storage:
FLAGS.aoe_export_dir = tempfile.mkdtemp()
@ -104,22 +103,6 @@ class BlockStore(object, service.Service):
except Exception, err:
pass
@defer.inlineCallbacks
def report_state(self, nodename, daemon):
# TODO(termie): make this pattern be more elegant. -todd
try:
record = model.Daemon(nodename, daemon)
record.heartbeat()
if getattr(self, "model_disconnected", False):
self.model_disconnected = False
logging.error("Recovered model server connection!")
except model.ConnectionError, ex:
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
yield
@validate.rangetest(size=(0, 1000))
def create_volume(self, size, user_id, project_id):
"""

View File

@ -50,16 +50,16 @@ from nova import flags
from nova import twistd
from nova.tests.access_unittest import *
from nova.tests.auth_unittest import *
from nova.tests.api_unittest import *
from nova.tests.cloud_unittest import *
from nova.tests.compute_unittest import *
from nova.tests.model_unittest import *
from nova.tests.network_unittest import *
from nova.tests.node_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
from nova.tests.storage_unittest import *
from nova.tests.auth_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.volume_unittest import *
FLAGS = flags.FLAGS