merged trunk

This commit is contained in:
Vishvananda Ishaya 2010-09-20 19:19:28 -07:00
commit 435a78e48c
26 changed files with 1250 additions and 47 deletions

43
bin/nova-scheduler Executable file

@ -0,0 +1,43 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Twistd daemon for the nova scheduler nodes.
"""

import os
import sys

# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir,
                                                os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
    sys.path.insert(0, possible_topdir)

from nova import service
from nova import twistd

# twistd.serve() daemonizes and then re-evaluates this script; on that
# second pass __name__ is '__builtin__' (presumably via twistd's exec
# machinery -- confirm), and the module-level 'application' below is
# what twistd actually runs.
if __name__ == '__main__':
    twistd.serve(__file__)

if __name__ == '__builtin__':
    application = service.Service.create()

@ -21,6 +21,7 @@ Root WSGI middleware for all API controllers.
"""
import routes
import webob.dec
from nova import wsgi
from nova.api import ec2
@ -32,6 +33,18 @@ class API(wsgi.Router):
def __init__(self):
    """Build the top-level router for all API endpoints.

    "/" lists available versions; /v1.0/ is handled by the Rackspace-style
    API and /ec2/ by the EC2-compatible API.
    """
    mapper = routes.Mapper()
    mapper.connect("/", controller=self.versions)
    mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API())
    mapper.connect("/ec2/{path_info:.*}", controller=ec2.API())
    super(API, self).__init__(mapper)
@webob.dec.wsgify
def versions(self, req):
    """Respond to a request for all OpenStack API versions."""
    response = {
        "versions": [
            dict(status="CURRENT", id="v1.0")]}
    # The metadata tells the serializer which dict keys become XML
    # attributes; presumably Serializer picks the output format from the
    # request environ -- confirm against nova.wsgi.Serializer.
    metadata = {
        "application/xml": {
            "attributes": dict(version=["status", "id"])}}
    return wsgi.Serializer(req.environ, metadata).to_content_type(response)

@ -31,6 +31,7 @@ from nova import flags
from nova import wsgi
from nova.api.rackspace import flavors
from nova.api.rackspace import images
from nova.api.rackspace import ratelimiting
from nova.api.rackspace import servers
from nova.api.rackspace import sharedipgroups
from nova.auth import manager
@ -40,7 +41,7 @@ class API(wsgi.Middleware):
"""WSGI entry point for all Rackspace API requests."""
def __init__(self):
app = AuthMiddleware(APIRouter())
app = AuthMiddleware(RateLimitingMiddleware(APIRouter()))
super(API, self).__init__(app)
@ -65,6 +66,72 @@ class AuthMiddleware(wsgi.Middleware):
return self.application
class RateLimitingMiddleware(wsgi.Middleware):
    """Rate limit incoming requests according to the OpenStack rate limits."""

    def __init__(self, application, service_host=None):
        """Create a rate limiting middleware that wraps the given application.

        By default, rate counters are stored in memory.  If service_host is
        specified, the middleware instead relies on the ratelimiting.WSGIApp
        at the given host+port to keep rate counters.
        """
        super(RateLimitingMiddleware, self).__init__(application)
        if not service_host:
            #TODO(gundlach): These limits were based on limitations of Cloud
            #Servers.  We should revisit them in Nova.
            self.limiter = ratelimiting.Limiter(limits={
                    'DELETE': (100, ratelimiting.PER_MINUTE),
                    'PUT': (10, ratelimiting.PER_MINUTE),
                    'POST': (10, ratelimiting.PER_MINUTE),
                    'POST servers': (50, ratelimiting.PER_DAY),
                    'GET changes-since': (3, ratelimiting.PER_MINUTE),
                })
        else:
            self.limiter = ratelimiting.WSGIAppProxy(service_host)

    @webob.dec.wsgify
    def __call__(self, req):
        """Rate limit the request.

        If the request should be rate limited, return a 413 status with a
        Retry-After header giving the time when the request would succeed.
        """
        # NOTE(review): assumes the auth layer has already set X-Auth-User;
        # a request without it raises KeyError here -- confirm this always
        # runs behind AuthMiddleware.  Also relies on `time` and `webob.exc`
        # being imported at module scope -- verify.
        username = req.headers['X-Auth-User']
        action_name = self.get_action_name(req)
        if not action_name:  # not rate limited
            return self.application
        delay = self.get_delay(action_name, username)
        if delay:
            # TODO(gundlach): Get the retry-after format correct.
            raise webob.exc.HTTPRequestEntityTooLarge(headers={
                    'Retry-After': time.time() + delay})
        return self.application

    def get_delay(self, action_name, username):
        """Return the delay for the given action and username, or None if
        the action would not be rate limited.
        """
        if action_name == 'POST servers':
            # "POST servers" is a POST, so it counts against "POST" too.
            # Attempt the "POST" first, lest we are rate limited by "POST"
            # but use up a precious "POST servers" call.
            delay = self.limiter.perform("POST", username=username)
            if delay:
                return delay
        return self.limiter.perform(action_name, username=username)

    def get_action_name(self, req):
        """Return the rate-limit bucket name for this request, or None when
        the request is not rate limited at all."""
        if req.method == 'GET' and 'changes-since' in req.GET:
            return 'GET changes-since'
        if req.method == 'POST' and req.path_info.startswith('/servers'):
            return 'POST servers'
        if req.method in ['PUT', 'POST', 'DELETE']:
            return req.method
        return None
class APIRouter(wsgi.Router):
"""
Routes requests on the Rackspace API to the appropriate controller

@ -0,0 +1,122 @@
"""Rate limiting of arbitrary actions."""
import httplib
import time
import urllib
import webob.dec
import webob.exc
# Convenience constants for the limits dictionary passed to Limiter().
PER_SECOND = 1
PER_MINUTE = 60
PER_HOUR = 60 * 60
PER_DAY = 60 * 60 * 24
class Limiter(object):
    """Leaky-bucket rate limiter for named actions.

    Each (username, action) pair has its own bucket.  A bucket holds as
    many "ccs" as there are seconds in the action's period, drains at
    1 cc/second, and each performance adds capacity/N ccs (N = allowed
    count per period).  An action is refused when the bucket would
    overflow, and the caller is told how long to wait.
    """

    def __init__(self, limits):
        """Create a rate limiter.

        limits: dict mapping action name -> (count, period_in_seconds).
        Example: dict(reboot=(10, PER_MINUTE)) allows 10 'reboot' actions
        per minute.
        """
        self.limits = limits
        self._levels = {}

    def perform(self, action_name, username='nobody'):
        """Attempt an action on behalf of username.

        action_name must be a key of the limits dict given to __init__.
        Returns None when the action may proceed; otherwise the float
        number of seconds until it would be allowed.
        """
        now = time.time()
        bucket_key = '%s:%s' % (username, action_name)
        then, level = self._levels.get(bucket_key, (now, 0))
        # Drain the bucket: it leaks 1 cc/second since the last attempt.
        level = max(0, level - (now - then))
        allowed, period = self.limits[action_name]
        capacity = period
        # Each action fills the bucket by 1/allowed-th of its capacity.
        proposed = level + capacity * 1.0 / allowed
        if proposed > capacity:
            # Overflow: report how many seconds until this action fits.
            return proposed - capacity
        self._levels[bucket_key] = (now, proposed)
        return None
# If one instance of this WSGIApps is unable to handle your load, put a
# sharding app in front that shards by username to one of many backends.

class WSGIApp(object):
    """Application that tracks rate limits in memory.  Send requests to it of
    this form:

        POST /limiter/<username>/<urlencoded action>

    and receive a 200 OK, or a 403 Forbidden with an X-Wait-Seconds header
    containing the number of seconds to wait before the action would succeed.
    """

    def __init__(self, limiter):
        """Create the WSGI application using the given Limiter instance."""
        self.limiter = limiter

    @webob.dec.wsgify
    def __call__(self, req):
        """Handle one rate-limit query per the class docstring's contract."""
        parts = req.path_info.split('/')
        # format: /limiter/<username>/<urlencoded action>
        # path_info starts with '/', so parts[0] is '' and a valid path
        # splits into exactly 4 pieces.
        if req.method != 'POST':
            raise webob.exc.HTTPMethodNotAllowed()
        if len(parts) != 4 or parts[1] != 'limiter':
            raise webob.exc.HTTPNotFound()
        username = parts[2]
        action_name = urllib.unquote(parts[3])
        delay = self.limiter.perform(action_name, username)
        if delay:
            return webob.exc.HTTPForbidden(
                    headers={'X-Wait-Seconds': "%.2f" % delay})
        else:
            return ''  # 200 OK
class WSGIAppProxy(object):
    """Limiter lookalike that proxies to a ratelimiting.WSGIApp over HTTP."""

    def __init__(self, service_host):
        """Create a proxy pointing to a ratelimiting.WSGIApp at the given
        host:port."""
        self.service_host = service_host

    def perform(self, action, username='nobody'):
        """Ask the remote limiter whether username may perform action.

        Returns None when the action may proceed, otherwise the float
        number of seconds to wait (decoded from X-Wait-Seconds).
        """
        conn = httplib.HTTPConnection(self.service_host)
        # Quote the action for the URL: real action names such as
        # 'GET changes-since' and 'POST servers' contain spaces, which
        # would corrupt the HTTP request line if sent raw.  safe='' also
        # quotes '/' so the path keeps exactly four segments; WSGIApp
        # unquotes the segment on the other end.
        conn.request('POST', '/limiter/%s/%s' % (username,
                                                 urllib.quote(action,
                                                              safe='')))
        resp = conn.getresponse()
        if resp.status == 200:
            return None  # no delay
        # NOTE(review): any non-200 is assumed to be the 403 that carries
        # X-Wait-Seconds; other statuses would make float(None) raise.
        return float(resp.getheader('X-Wait-Seconds'))

@ -0,0 +1,237 @@
import httplib
import StringIO
import time
import unittest
import webob
import nova.api.rackspace.ratelimiting as ratelimiting
class LimiterTest(unittest.TestCase):
    """Exercise ratelimiting.Limiter against the real clock.

    Timing sensitive: the delay checks use assertAlmostEqual to 2 places,
    so a heavily loaded machine could produce spurious failures.
    """

    def setUp(self):
        # One limiter with a spread of rates to play against.
        self.limits = {
                'a': (5, ratelimiting.PER_SECOND),
                'b': (5, ratelimiting.PER_MINUTE),
                'c': (5, ratelimiting.PER_HOUR),
                'd': (1, ratelimiting.PER_SECOND),
                'e': (100, ratelimiting.PER_SECOND)}
        self.rl = ratelimiting.Limiter(self.limits)

    def exhaust(self, action, times_until_exhausted, **kwargs):
        """Perform action until its limit runs out, then verify the delay.

        The first times_until_exhausted attempts must be allowed; the
        following 10 must each be delayed by roughly period/count seconds.
        """
        for i in range(times_until_exhausted):
            when = self.rl.perform(action, **kwargs)
            self.assertEqual(when, None)
        num, period = self.limits[action]
        delay = period * 1.0 / num
        # Verify that we are now thoroughly delayed
        for i in range(10):
            when = self.rl.perform(action, **kwargs)
            self.assertAlmostEqual(when, delay, 2)

    def test_second(self):
        # Per-second buckets drain fast enough to sleep through in a test.
        self.exhaust('a', 5)
        time.sleep(0.2)
        self.exhaust('a', 1)
        time.sleep(1)
        self.exhaust('a', 5)

    def test_minute(self):
        self.exhaust('b', 5)

    def test_one_per_period(self):
        # With a 1-per-second limit, allow and deny must strictly alternate.
        def allow_once_and_deny_once():
            when = self.rl.perform('d')
            self.assertEqual(when, None)
            when = self.rl.perform('d')
            self.assertAlmostEqual(when, 1, 2)
            return when
        time.sleep(allow_once_and_deny_once())
        time.sleep(allow_once_and_deny_once())
        allow_once_and_deny_once()

    def test_we_can_go_indefinitely_if_we_spread_out_requests(self):
        # 200 actions at 100/second pass as long as we pace them out.
        for i in range(200):
            when = self.rl.perform('e')
            self.assertEqual(when, None)
            time.sleep(0.01)

    def test_users_get_separate_buckets(self):
        self.exhaust('c', 5, username='alice')
        self.exhaust('c', 5, username='bob')
        self.exhaust('c', 5, username='chuck')
        # Exhausting one user's bucket must not grant headroom to another.
        self.exhaust('c', 0, username='chuck')
        self.exhaust('c', 0, username='bob')
        self.exhaust('c', 0, username='alice')
class FakeLimiter(object):
    """Scriptable stand-in for Limiter: tell it which call to expect and
    what delay to hand back."""

    def __init__(self, test):
        """Remember the TestCase so perform() can assert expectations."""
        self.test = test
        self._action = None
        self._username = None
        self._delay = None

    def mock(self, action, username, delay):
        """Arm the fake: the next perform() must match action/username
        and will return delay."""
        self._action = action
        self._username = username
        self._delay = delay

    def perform(self, action, username):
        """Assert the armed expectation, then return the canned delay."""
        self.test.assertEqual(action, self._action)
        self.test.assertEqual(username, self._username)
        return self._delay
class WSGIAppTest(unittest.TestCase):
    """Drive ratelimiting.WSGIApp through webob with a scripted limiter."""

    def setUp(self):
        self.limiter = FakeLimiter(self)
        self.app = ratelimiting.WSGIApp(self.limiter)

    def test_invalid_methods(self):
        # Only POST is allowed; the method check fires before URL parsing,
        # so these all get 405 rather than 404.
        requests = []
        for method in ['GET', 'PUT', 'DELETE']:
            req = webob.Request.blank('/limits/michael/breakdance',
                                      dict(REQUEST_METHOD=method))
            requests.append(req)
        for req in requests:
            self.assertEqual(req.get_response(self.app).status_int, 405)

    def test_invalid_urls(self):
        # Anything not of the form /limiter/<user>/<action> is a 404.
        requests = []
        for prefix in ['limit', '', 'limiter2', 'limiter/limits', 'limiter/1']:
            req = webob.Request.blank('/%s/michael/breakdance' % prefix,
                                      dict(REQUEST_METHOD='POST'))
            requests.append(req)
        for req in requests:
            self.assertEqual(req.get_response(self.app).status_int, 404)

    def verify(self, url, username, action, delay=None):
        """Make sure that POSTing to the given url causes the given username
        to perform the given action.  Make the internal rate limiter return
        delay and make sure that the WSGI app returns the correct response.
        """
        req = webob.Request.blank(url, dict(REQUEST_METHOD='POST'))
        self.limiter.mock(action, username, delay)
        resp = req.get_response(self.app)
        if not delay:
            self.assertEqual(resp.status_int, 200)
        else:
            self.assertEqual(resp.status_int, 403)
            self.assertEqual(resp.headers['X-Wait-Seconds'], "%.2f" % delay)

    def test_good_urls(self):
        self.verify('/limiter/michael/hoot', 'michael', 'hoot')

    def test_escaping(self):
        # The app must urldecode the action segment.
        self.verify('/limiter/michael/jump%20up', 'michael', 'jump up')

    def test_response_to_delays(self):
        self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1)
        self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1.56)
        self.verify('/limiter/michael/hoot', 'michael', 'hoot', 1000)
class FakeHttplibSocket(object):
    """Minimal socket stand-in: just enough for httplib.HTTPResponse,
    which only ever asks for a file object to read from."""

    def __init__(self, response_string):
        """Buffer the canned HTTP response text."""
        self._response_file = StringIO.StringIO(response_string)

    def makefile(self, _mode, _other):
        """Hand httplib a file object positioned at the canned response."""
        return self._response_file
class FakeHttplibConnection(object):
    """A fake httplib.HTTPConnection.

    Requests made via this connection actually get translated and routed
    into our WSGI app; we then wait for the response and turn it back into
    an httplib.HTTPResponse.
    """

    def __init__(self, app, host, is_secure=False):
        # is_secure is accepted for interface parity with HTTPConnection
        # but otherwise ignored.
        self.app = app
        self.host = host

    def request(self, method, path, data='', headers=None):
        """Run the request through the WSGI app and stash the response.

        NOTE: headers previously defaulted to a shared mutable dict ({});
        it now defaults to None with a fresh dict created per call, so one
        request can no longer leak header mutations into the next.
        """
        req = webob.Request.blank(path)
        req.method = method
        req.body = data
        req.headers = headers if headers is not None else {}
        req.host = self.host
        # Call the WSGI app, get the HTTP response
        resp = str(req.get_response(self.app))
        # For some reason, the response doesn't have "HTTP/1.0 " prepended; I
        # guess that's a function the web server usually provides.
        resp = "HTTP/1.0 %s" % resp
        sock = FakeHttplibSocket(resp)
        self.http_response = httplib.HTTPResponse(sock)
        self.http_response.begin()

    def getresponse(self):
        """Return the httplib.HTTPResponse built by the last request()."""
        return self.http_response
def wire_HTTPConnection_to_WSGI(host, app):
    """Monkeypatches HTTPConnection so that if you try to connect to host, you
    are instead routed straight to the given WSGI app.

    After calling this method, when any code calls

        httplib.HTTPConnection(host)

    the connection object will be a fake.  Its requests will be sent directly
    to the given WSGI app rather than through a socket.

    Code connecting to hosts other than host will not be affected.

    This method may be called multiple times to map different hosts to
    different apps.
    """
    class HTTPConnectionDecorator(object):
        """Wraps the real HTTPConnection class so that when you instantiate
        the class you might instead get a fake instance."""

        def __init__(self, wrapped):
            self.wrapped = wrapped

        def __call__(self, connection_host, *args, **kwargs):
            # Route matching hosts to the fake; anything else falls through
            # to whatever HTTPConnection was before this call (possibly an
            # earlier decorator, so host mappings accumulate).
            if connection_host == host:
                return FakeHttplibConnection(app, host)
            else:
                return self.wrapped(connection_host, *args, **kwargs)

    # NOTE: the patch is global and is never undone; code needing a real
    # HTTPConnection to this host afterwards must restore httplib itself.
    httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
class WSGIAppProxyTest(unittest.TestCase):

    def setUp(self):
        """Our WSGIAppProxy is going to call across an HTTPConnection to a
        WSGIApp running a limiter.  The proxy will send input, and the proxy
        should receive that same input, pass it to the limiter who gives a
        result, and send the expected result back.

        The HTTPConnection isn't real -- it's monkeypatched to point straight
        at the WSGIApp.  And the limiter isn't real -- it's a fake that
        behaves the way we tell it to.
        """
        self.limiter = FakeLimiter(self)
        app = ratelimiting.WSGIApp(self.limiter)
        wire_HTTPConnection_to_WSGI('100.100.100.100:80', app)
        self.proxy = ratelimiting.WSGIAppProxy('100.100.100.100:80')

    def test_200(self):
        # Limiter allows: the proxy sees 200 and reports no delay.
        self.limiter.mock('conquer', 'caesar', None)
        when = self.proxy.perform('conquer', 'caesar')
        self.assertEqual(when, None)

    def test_403(self):
        # Limiter delays: the proxy decodes X-Wait-Seconds from the 403.
        self.limiter.mock('grumble', 'proletariat', 1.5)
        when = self.proxy.perform('grumble', 'proletariat')
        self.assertEqual(when, 1.5)

    def test_failure(self):
        # A mismatched action trips the FakeLimiter's assertion, which the
        # in-process WSGI stack lets propagate as an AssertionError.
        def shouldRaise():
            self.limiter.mock('murder', 'brutus', None)
            self.proxy.perform('stab', 'brutus')
        self.assertRaises(AssertionError, shouldRaise)
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

@ -85,7 +85,9 @@ class ComputeManager(manager.Manager):
try:
yield self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(None, instance_id, {'launched_at': now})
self.db.instance_update(context,
instance_id,
{'launched_at': now})
except Exception: # pylint: disable-msg=W0702
logging.exception("instance %s: Failed to spawn",
instance_ref['name'])
@ -100,8 +102,8 @@ class ComputeManager(manager.Manager):
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
logging.debug("instance %s: terminating", instance_id)
instance_ref = self.db.instance_get(context, instance_id)
instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['state'] == power_state.SHUTOFF:
self.db.instance_destroy(context, instance_id)
raise exception.Error('trying to destroy already destroyed'
@ -112,8 +114,6 @@ class ComputeManager(manager.Manager):
power_state.NOSTATE,
'shutting_down')
yield self.driver.destroy(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(None, instance_id, {'terminated_at': now})
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
@ -189,7 +189,7 @@ class ComputeManager(manager.Manager):
volume_id)
instance_ref = self.db.instance_get(context, instance_id)
volume_ref = self.db.volume_get(context, volume_id)
self.driver.detach_volume(instance_ref['str_id'],
volume_ref['mountpoint'])
yield self.driver.detach_volume(instance_ref['str_id'],
volume_ref['mountpoint'])
self.db.volume_detached(context, volume_id)
defer.returnValue(True)

@ -51,11 +51,45 @@ class NoMoreNetworks(exception.Error):
###################
def service_destroy(context, instance_id):
    """Destroy the service or raise if it does not exist."""
    # NOTE(review): the argument is a service id, not an instance id --
    # the parameter name looks like a copy/paste slip.  Left unrenamed
    # here because callers may pass it by keyword.
    return IMPL.service_destroy(context, instance_id)


def service_get(context, service_id):
    """Get a service or raise if it does not exist."""
    return IMPL.service_get(context, service_id)


def service_get_all_by_topic(context, topic):
    """Get all services for a given topic."""
    return IMPL.service_get_all_by_topic(context, topic)


def service_get_all_compute_sorted(context):
    """Get all compute services sorted by instance count.

    Returns a list of (Service, instance_count) tuples.
    """
    return IMPL.service_get_all_compute_sorted(context)


def service_get_all_network_sorted(context):
    """Get all network services sorted by network count.

    Returns a list of (Service, network_count) tuples.
    """
    return IMPL.service_get_all_network_sorted(context)


def service_get_all_volume_sorted(context):
    """Get all volume services sorted by volume count.

    Returns a list of (Service, volume_count) tuples.
    """
    return IMPL.service_get_all_volume_sorted(context)


def service_get_by_args(context, host, binary):
    """Get the state of a service by node name and binary."""
    return IMPL.service_get_by_args(context, host, binary)

@ -47,10 +47,92 @@ def _deleted(context):
###################
def service_destroy(context, service_id):
    """Soft-delete the service with the given id.

    delete() marks the row deleted and stamps deleted_at (see
    models.NovaBase.delete) rather than removing it.
    """
    session = get_session()
    with session.begin():
        service_ref = models.Service.find(service_id, session=session)
        service_ref.delete(session=session)


def service_get(_context, service_id):
    """Return the service with the given id, or raise if not found."""
    return models.Service.find(service_id)


def service_get_all_by_topic(context, topic):
    """Return all undeleted services listening on the given topic."""
    session = get_session()
    return session.query(models.Service
                 ).filter_by(deleted=False
                 ).filter_by(topic=topic
                 ).all()


def _service_get_all_topic_subquery(_context, session, topic, subq, label):
    """Join services for topic against subq, ordered by the label column.

    Returns (Service, value) tuples where value is COALESCE(<label>, 0)
    from the subquery outer-joined on matching host, so services whose
    host has no rows in subq sort with value 0.
    """
    sort_value = getattr(subq.c, label)
    return session.query(models.Service, func.coalesce(sort_value, 0)
                 ).filter_by(topic=topic
                 ).filter_by(deleted=False
                 ).outerjoin((subq, models.Service.host == subq.c.host)
                 ).order_by(sort_value
                 ).all()
def service_get_all_compute_sorted(context):
    """Return (Service, instance_cores) tuples for compute services.

    instance_cores is the summed vcpus of undeleted instances on each
    service's host (0 when the host has none).
    """
    session = get_session()
    with session.begin():
        # NOTE(vish): The intended query is below
        #
        #   SELECT services.*, COALESCE(inst_cores.instance_cores, 0)
        #   FROM services LEFT OUTER JOIN
        #     (SELECT host, SUM(instances.vcpus) AS instance_cores
        #      FROM instances GROUP BY host) AS inst_cores
        #   ON services.host = inst_cores.host
        topic = 'compute'
        label = 'instance_cores'
        subq = session.query(models.Instance.host,
                             func.sum(models.Instance.vcpus).label(label)
                     ).filter_by(deleted=False
                     ).group_by(models.Instance.host
                     ).subquery()
        return _service_get_all_topic_subquery(context,
                                               session,
                                               topic,
                                               subq,
                                               label)


def service_get_all_network_sorted(context):
    """Return (Service, network_count) tuples for network services."""
    session = get_session()
    with session.begin():
        topic = 'network'
        label = 'network_count'
        subq = session.query(models.Network.host,
                             func.count(models.Network.id).label(label)
                     ).filter_by(deleted=False
                     ).group_by(models.Network.host
                     ).subquery()
        return _service_get_all_topic_subquery(context,
                                               session,
                                               topic,
                                               subq,
                                               label)


def service_get_all_volume_sorted(context):
    """Return (Service, volume_gigabytes) tuples for volume services."""
    session = get_session()
    with session.begin():
        topic = 'volume'
        label = 'volume_gigabytes'
        subq = session.query(models.Volume.host,
                             func.sum(models.Volume.size).label(label)
                     ).filter_by(deleted=False
                     ).group_by(models.Volume.host
                     ).subquery()
        return _service_get_all_topic_subquery(context,
                                               session,
                                               topic,
                                               subq,
                                               label)


def service_get_by_args(_context, host, binary):
    """Return the service matching host and binary, or raise."""
    return models.Service.find_by_args(host, binary)

@ -103,7 +103,7 @@ class NovaBase(object):
def delete(self, session=None):
"""Delete this object"""
self.deleted = True
self.deleted_at = datetime.datetime.now()
self.deleted_at = datetime.datetime.utcnow()
self.save(session=session)
def __setitem__(self, key, value):
@ -244,6 +244,7 @@ class Instance(BASE, NovaBase):
reservation_id = Column(String(255))
mac_address = Column(String(255))
scheduled_at = Column(DateTime)
launched_at = Column(DateTime)
terminated_at = Column(DateTime)
# TODO(vish): see Ewan's email about state improvements, probably
@ -277,6 +278,9 @@ class Volume(BASE, NovaBase):
status = Column(String(255)) # TODO(vish): enum?
attach_status = Column(String(255)) # TODO(vish): enum
scheduled_at = Column(DateTime)
launched_at = Column(DateTime)
terminated_at = Column(DateTime)
class Quota(BASE, NovaBase):
"""Represents quota overrides for a project"""

@ -23,6 +23,7 @@ datastore.
"""
import base64
import datetime
import logging
import os
import time
@ -299,9 +300,11 @@ class CloudController(object):
vol['attach_status'] = "detached"
volume_ref = db.volume_create(context, vol)
rpc.cast(FLAGS.volume_topic, {"method": "create_volume",
"args": {"context": None,
"volume_id": volume_ref['id']}})
rpc.cast(FLAGS.scheduler_topic,
{"method": "create_volume",
"args": {"context": None,
"topic": FLAGS.volume_topic,
"volume_id": volume_ref['id']}})
return {'volumeSet': [self._format_volume(context, volume_ref)]}
@ -310,6 +313,8 @@ class CloudController(object):
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
volume_ref = db.volume_get_by_str(context, volume_id)
# TODO(vish): abstract status checking?
if volume_ref['status'] != "available":
raise exception.ApiError("Volume status must be available")
if volume_ref['attach_status'] == "attached":
raise exception.ApiError("Volume is already attached")
instance_ref = db.instance_get_by_str(context, instance_id)
@ -332,10 +337,10 @@ class CloudController(object):
volume_ref = db.volume_get_by_str(context, volume_id)
instance_ref = db.volume_get_instance(context, volume_ref['id'])
if not instance_ref:
raise exception.Error("Volume isn't attached to anything!")
raise exception.ApiError("Volume isn't attached to anything!")
# TODO(vish): abstract status checking?
if volume_ref['status'] == "available":
raise exception.Error("Volume is already detached")
raise exception.ApiError("Volume is already detached")
try:
host = instance_ref['host']
rpc.cast(db.queue_get_for(context, FLAGS.compute_topic, host),
@ -379,7 +384,7 @@ class CloudController(object):
instances = db.instance_get_by_reservation(context,
reservation_id)
else:
if not context.user.is_admin():
if context.user.is_admin():
instances = db.instance_get_all(context)
else:
instances = db.instance_get_by_project(context,
@ -571,6 +576,7 @@ class CloudController(object):
reservation_id = utils.generate_uid('r')
base_options = {}
base_options['state_description'] = 'scheduling'
base_options['image_id'] = image_id
base_options['kernel_id'] = kernel_id
base_options['ramdisk_id'] = ramdisk_id
@ -609,11 +615,12 @@ class CloudController(object):
"args": {"context": None,
"address": address}})
rpc.cast(FLAGS.compute_topic,
{"method": "run_instance",
"args": {"context": None,
"instance_id": inst_id}})
logging.debug("Casting to node for %s/%s's instance %s" %
rpc.cast(FLAGS.scheduler_topic,
{"method": "run_instance",
"args": {"context": None,
"topic": FLAGS.compute_topic,
"instance_id": inst_id}})
logging.debug("Casting to scheduler for %s/%s's instance %s" %
(context.project.name, context.user.name, inst_id))
defer.returnValue(self._format_run_instances(context,
reservation_id))
@ -632,6 +639,10 @@ class CloudController(object):
% id_str)
continue
now = datetime.datetime.utcnow()
db.instance_update(context,
instance_ref['id'],
{'terminated_at': now})
# FIXME(ja): where should network deallocate occur?
address = db.instance_get_floating_address(context,
instance_ref['id'])
@ -653,7 +664,7 @@ class CloudController(object):
# NOTE(vish): Currently, nothing needs to be done on the
# network node until release. If this changes,
# we will need to cast here.
self.network.deallocate_fixed_ip(context, address)
self.network_manager.deallocate_fixed_ip(context, address)
host = instance_ref['host']
if host:
@ -681,6 +692,10 @@ class CloudController(object):
def delete_volume(self, context, volume_id, **kwargs):
# TODO: return error if not authorized
volume_ref = db.volume_get_by_str(context, volume_id)
if volume_ref['status'] != "available":
raise exception.ApiError("Volume status must be available")
now = datetime.datetime.utcnow()
db.volume_update(context, volume_ref['id'], {'terminated_at': now})
host = volume_ref['host']
rpc.cast(db.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",

@ -171,6 +171,7 @@ DEFINE_string('connection_type', 'libvirt', 'libvirt, xenapi or fake')
DEFINE_integer('s3_port', 3333, 's3 port')
DEFINE_string('s3_host', '127.0.0.1', 's3 host')
DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on')
DEFINE_string('scheduler_topic', 'scheduler', 'the topic scheduler nodes listen on')
DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on')
DEFINE_string('network_topic', 'network', 'the topic network nodes listen on')
@ -213,6 +214,8 @@ DEFINE_string('network_manager', 'nova.network.manager.VlanManager',
'Manager for network')
DEFINE_string('volume_manager', 'nova.volume.manager.AOEManager',
'Manager for volume')
DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
'Manager for scheduler')
DEFINE_string('host', socket.gethostname(),
'name of this node')

@ -90,7 +90,7 @@ class NetworkManager(manager.Manager):
network_id = network_ref['id']
host = self.db.network_set_host(context,
network_id,
FLAGS.host)
self.host)
self._on_set_network_host(context, network_id)
return host
@ -118,7 +118,7 @@ class NetworkManager(manager.Manager):
"""Gets an floating ip from the pool"""
# TODO(vish): add floating ips through manage command
return self.db.floating_ip_allocate_address(context,
FLAGS.host,
self.host,
project_id)
def associate_floating_ip(self, context, floating_address, fixed_address):

@ -0,0 +1,25 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nova.scheduler` -- Scheduler Nodes
=====================================================
.. automodule:: nova.scheduler
:platform: Unix
:synopsis: Module that picks a compute node to run a VM instance.
.. moduleauthor:: Chris Behrens <cbehrens@codestud.com>
"""

38
nova/scheduler/chance.py Normal file

@ -0,0 +1,38 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Chance (Random) Scheduler implementation
"""
import random
from nova.scheduler import driver
class ChanceScheduler(driver.Scheduler):
    """Implements Scheduler as a random node selector."""

    def schedule(self, context, topic, *_args, **_kwargs):
        """Pick a host that is up, uniformly at random.

        Raises driver.NoValidHost when no service for topic is alive.
        """
        hosts = self.hosts_up(context, topic)
        if not hosts:
            raise driver.NoValidHost("No hosts found")
        # random.choice is the stdlib idiom for a uniform pick; the old
        # hosts[int(random.random() * len(hosts))] reimplemented it by hand.
        return random.choice(hosts)

58
nova/scheduler/driver.py Normal file

@ -0,0 +1,58 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler base class that all Schedulers should inherit from
"""
import datetime
from nova import db
from nova import exception
from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('service_down_time', 60,
'maximum time since last checkin for up service')
class NoValidHost(exception.Error):
    """Raised by schedulers when no live host is available for a request."""
    pass
class Scheduler(object):
    """The base class that all Scheduler classes should inherit from."""

    @staticmethod
    def service_is_up(service):
        """Check whether a service is up based on its last heartbeat.

        service: a row mapping with 'updated_at'/'created_at' datetimes.
        """
        last_heartbeat = service['updated_at'] or service['created_at']
        # NOTE: timestamps are written with datetime.datetime.utcnow()
        # elsewhere in the codebase (e.g. models.NovaBase.delete and the
        # compute manager's launched_at), so compare against utcnow() too;
        # using now() would skew the liveness check by the local UTC offset.
        elapsed = datetime.datetime.utcnow() - last_heartbeat
        return elapsed < datetime.timedelta(seconds=FLAGS.service_down_time)

    def hosts_up(self, context, topic):
        """Return the list of hosts that have a running service for topic."""
        services = db.service_get_all_by_topic(context, topic)
        return [service.host
                for service in services
                if self.service_is_up(service)]

    def schedule(self, context, topic, *_args, **_kwargs):
        """Must override at least this method for scheduler to work."""
        raise NotImplementedError("Must implement a fallback schedule")

66
nova/scheduler/manager.py Normal file

@ -0,0 +1,66 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler Service
"""
import logging
import functools
from nova import db
from nova import flags
from nova import manager
from nova import rpc
from nova import utils
FLAGS = flags.FLAGS
# Import path of the driver class that SchedulerManager instantiates when
# no explicit scheduler_driver argument is given.
flags.DEFINE_string('scheduler_driver',
                    'nova.scheduler.chance.ChanceScheduler',
                    'Driver to use for the scheduler')
class SchedulerManager(manager.Manager):
    """Chooses a host to run instances on."""

    def __init__(self, scheduler_driver=None, *args, **kwargs):
        """Load the scheduler driver (FLAGS.scheduler_driver by default)."""
        if not scheduler_driver:
            scheduler_driver = FLAGS.scheduler_driver
        self.driver = utils.import_object(scheduler_driver)
        super(SchedulerManager, self).__init__(*args, **kwargs)

    def __getattr__(self, key):
        """Convert all unknown method calls into _schedule(key, ...) calls."""
        return functools.partial(self._schedule, key)

    def _schedule(self, method, context, topic, *args, **kwargs):
        """Tries to call schedule_* method on the driver to retrieve host.

        Falls back to schedule(context, topic) if the method doesn't exist,
        then casts the original method call to the chosen host's queue.
        """
        # Look the driver method up with a default instead of wrapping the
        # call in try/except AttributeError: the old form also swallowed
        # AttributeErrors raised *inside* a real schedule_* implementation
        # and silently fell back to the generic scheduler.
        driver_method = getattr(self.driver, 'schedule_%s' % method, None)
        if driver_method is not None:
            host = driver_method(context, *args, **kwargs)
        else:
            host = self.driver.schedule(context, topic, *args, **kwargs)
        # The cast deliberately carries a None context — presumably because
        # contexts are not rpc-serializable; confirm before changing.
        kwargs.update({"context": None})
        rpc.cast(db.queue_get_for(context, topic, host),
                 {"method": method,
                  "args": kwargs})
        logging.debug("Casting to %s %s for %s", topic, host, method)

90
nova/scheduler/simple.py Normal file

@ -0,0 +1,90 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Simple Scheduler
"""
import datetime
from nova import db
from nova import flags
from nova.scheduler import driver
from nova.scheduler import chance
FLAGS = flags.FLAGS
# Per-host capacity limits enforced by SimpleScheduler below.
flags.DEFINE_integer("max_cores", 16,
                     "maximum number of instance cores to allow per host")
flags.DEFINE_integer("max_gigabytes", 10000,
                     "maximum number of volume gigabytes to allow per host")
flags.DEFINE_integer("max_networks", 1000,
                     "maximum number of networks to allow per host")
class SimpleScheduler(chance.ChanceScheduler):
    """Naive scheduler that tries to find the least loaded host."""

    def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
        """Picks a host that is up and has the fewest running instances."""
        instance_ref = db.instance_get(context, instance_id)
        # Results arrive sorted by allocated cores, least loaded first, so
        # the first host over the limit means every host is over it.
        for service, instance_cores in \
                db.service_get_all_compute_sorted(context):
            if instance_cores + instance_ref['vcpus'] > FLAGS.max_cores:
                raise driver.NoValidHost("All hosts have too many cores")
            if not self.service_is_up(service):
                continue
            # NOTE(vish): this probably belongs in the manager, if we
            #             can generalize this somehow
            db.instance_update(context,
                               instance_id,
                               {'host': service['host'],
                                'scheduled_at': datetime.datetime.utcnow()})
            return service['host']
        raise driver.NoValidHost("No hosts found")

    def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
        """Picks a host that is up and has the fewest volumes."""
        volume_ref = db.volume_get(context, volume_id)
        for service, volume_gigabytes in \
                db.service_get_all_volume_sorted(context):
            if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
                raise driver.NoValidHost("All hosts have too many gigabytes")
            if not self.service_is_up(service):
                continue
            # NOTE(vish): this probably belongs in the manager, if we
            #             can generalize this somehow
            db.volume_update(context,
                             volume_id,
                             {'host': service['host'],
                              'scheduled_at': datetime.datetime.utcnow()})
            return service['host']
        raise driver.NoValidHost("No hosts found")

    def schedule_set_network_host(self, context, *_args, **_kwargs):
        """Picks a host that is up and has the fewest networks."""
        for service, network_count in \
                db.service_get_all_network_sorted(context):
            if network_count >= FLAGS.max_networks:
                raise driver.NoValidHost("All hosts have too many networks")
            if self.service_is_up(service):
                return service['host']
        raise driver.NoValidHost("No hosts found")

@ -52,8 +52,9 @@ class Test(unittest.TestCase):
result = webob.Request.blank('/test/cloud').get_response(api.API())
self.assertNotEqual(result.body, "/cloud")
def test_query_api_version(self):
pass
def test_query_api_versions(self):
result = webob.Request.blank('/').get_response(api.API())
self.assertTrue('CURRENT' in result.body)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,79 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from nova.api.rackspace import RateLimitingMiddleware
from nova.tests.api.test_helper import *
from webob import Request
class RateLimitingMiddlewareTest(unittest.TestCase):
    """Exercises RateLimitingMiddleware action naming and limit handling."""

    def test_get_action_name(self):
        middleware = RateLimitingMiddleware(APIStub())
        # (method, url, expected action name) — None means unlimited.
        cases = [('PUT', '/servers/4', 'PUT'),
                 ('DELETE', '/servers/4', 'DELETE'),
                 ('POST', '/images/4', 'POST'),
                 ('POST', '/servers/4', 'POST servers'),
                 ('GET', '/foo?a=4&changes-since=never&b=5',
                  'GET changes-since'),
                 ('GET', '/foo?a=4&monkeys-since=never&b=5', None),
                 ('GET', '/servers/4', None),
                 ('HEAD', '/servers/4', None)]
        for method, url, expected in cases:
            request = Request.blank(url)
            request.method = method
            self.assertEqual(middleware.get_action_name(request), expected)

    def exhaust(self, middleware, method, url, username, times):
        """Issue `times` requests expecting 200, then one expecting 413."""
        request = Request.blank(url, dict(REQUEST_METHOD=method),
                                headers={'X-Auth-User': username})
        for _ in range(times):
            self.assertEqual(request.get_response(middleware).status_int, 200)
        response = request.get_response(middleware)
        self.assertEqual(response.status_int, 413)
        self.assertTrue('Retry-After' in response.headers)

    def test_single_action(self):
        # Each user gets an independent allowance for the same action.
        middleware = RateLimitingMiddleware(APIStub())
        self.exhaust(middleware, 'DELETE', '/servers/4', 'usr1', 100)
        self.exhaust(middleware, 'DELETE', '/servers/4', 'usr2', 100)

    def test_POST_servers_action_implies_POST_action(self):
        middleware = RateLimitingMiddleware(APIStub())
        self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10)
        self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10)
        expected = set(['usr1:POST', 'usr1:POST servers', 'usr2:POST'])
        self.assertTrue(set(middleware.limiter._levels) == expected)

    def test_POST_servers_action_correctly_ratelimited(self):
        middleware = RateLimitingMiddleware(APIStub())
        # Use up all of our "POST" allowance for the minute, 5 times
        for _ in range(5):
            self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10)
            # Reset the 'POST' action counter.
            del middleware.limiter._levels['usr1:POST']
        # All 50 daily "POST servers" actions should be all used up
        self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 0)

    def test_proxy_ctor_works(self):
        middleware = RateLimitingMiddleware(APIStub())
        self.assertEqual(middleware.limiter.__class__.__name__, "Limiter")
        middleware = RateLimitingMiddleware(APIStub(), service_host='foobar')
        self.assertEqual(middleware.limiter.__class__.__name__, "WSGIAppProxy")

@ -84,21 +84,21 @@ class ComputeTestCase(test.TrialTestCase):
@defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure it is possible to run and terminate instance"""
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
instance_ref = db.instance_get(self.context, instance_id)
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['terminated_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
yield self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['terminated_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
yield self.compute.terminate_instance(self.context, instance_id)
instance_ref = db.instance_get({'deleted': True}, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['terminated_at'] > terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
@defer.inlineCallbacks
def test_reboot(self):

@ -0,0 +1,231 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Scheduler
"""
from nova import db
from nova import flags
from nova import service
from nova import test
from nova import rpc
from nova import utils
from nova.auth import manager as auth_manager
from nova.scheduler import manager
from nova.scheduler import driver
FLAGS = flags.FLAGS
flags.DECLARE('max_cores', 'nova.scheduler.simple')
class TestDriver(driver.Scheduler):
    """Scheduler Driver for Tests"""

    def schedule(self, context, topic, *args, **kwargs):
        """Generic fallback; always schedules onto a fixed host."""
        # NOTE(review): the original methods were missing `self`; they only
        # worked because the bound instance was absorbed by `context`,
        # shifting the real arguments one slot left.
        return 'fallback_host'

    def schedule_named_method(self, context, num):
        """Named method; the manager calls it as (context, *args, **kwargs)
        without the topic, so no topic parameter belongs here."""
        return 'named_host'
class SchedulerTestCase(test.TrialTestCase):
    """Test case for scheduler"""

    def setUp(self):  # pylint: disable=C0103
        super(SchedulerTestCase, self).setUp()
        # Point the manager at the stub TestDriver above so each test
        # controls exactly which host a scheduling call resolves to.
        self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver')

    def test_fallback(self):
        # The driver has no schedule_noexist, so the manager must fall back
        # to the generic schedule(), which returns 'fallback_host'.
        scheduler = manager.SchedulerManager()
        self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
        # Record the exact cast expected; mox verifies it during replay.
        rpc.cast('topic.fallback_host',
                 {'method': 'noexist',
                  'args': {'context': None,
                           'num': 7}})
        self.mox.ReplayAll()
        scheduler.noexist(None, 'topic', num=7)

    def test_named_method(self):
        # schedule_named_method exists on the driver and returns
        # 'named_host', so the cast must target that host's queue.
        scheduler = manager.SchedulerManager()
        self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
        rpc.cast('topic.named_host',
                 {'method': 'named_method',
                  'args': {'context': None,
                           'num': 7}})
        self.mox.ReplayAll()
        scheduler.named_method(None, 'topic', num=7)
class SimpleDriverTestCase(test.TrialTestCase):
    """Test case for simple driver"""

    def setUp(self):  # pylint: disable-msg=C0103
        super(SimpleDriverTestCase, self).setUp()
        self.flags(connection_type='fake',
                   max_cores=4,
                   max_gigabytes=4,
                   volume_driver='nova.volume.driver.FakeAOEDriver',
                   scheduler_driver='nova.scheduler.simple.SimpleScheduler')
        self.scheduler = manager.SchedulerManager()
        self.manager = auth_manager.AuthManager()
        self.user = self.manager.create_user('fake', 'fake', 'fake')
        self.project = self.manager.create_project('fake', 'fake', 'fake')
        # NOTE(review): the original assigned self.context = None both
        # before and after the auth setup; one assignment is sufficient.
        self.context = None

    def tearDown(self):  # pylint: disable-msg=C0103
        self.manager.delete_user(self.user)
        self.manager.delete_project(self.project)

    def _create_instance(self):
        """Create a test instance"""
        inst = {}
        inst['image_id'] = 'ami-test'
        inst['reservation_id'] = 'r-fakeres'
        inst['user_id'] = self.user.id
        inst['project_id'] = self.project.id
        inst['instance_type'] = 'm1.tiny'
        inst['mac_address'] = utils.generate_mac()
        inst['ami_launch_index'] = 0
        inst['vcpus'] = 1
        return db.instance_create(self.context, inst)['id']

    def _create_volume(self):
        """Create a test volume"""
        vol = {}
        vol['image_id'] = 'ami-test'
        vol['reservation_id'] = 'r-fakeres'
        vol['size'] = 1
        return db.volume_create(self.context, vol)['id']

    def test_hosts_are_up(self):
        """Ensures driver can find the hosts that are up"""
        # NOTE(vish): constructing service without create method
        #             because we are going to use it without queue
        compute1 = service.Service('host1',
                                   'nova-compute',
                                   'compute',
                                   FLAGS.compute_manager)
        compute2 = service.Service('host2',
                                   'nova-compute',
                                   'compute',
                                   FLAGS.compute_manager)
        hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
        self.assertEqual(len(hosts), 2)
        compute1.kill()
        compute2.kill()

    def test_least_busy_host_gets_instance(self):
        """Ensures the host with less cores gets the next one"""
        compute1 = service.Service('host1',
                                   'nova-compute',
                                   'compute',
                                   FLAGS.compute_manager)
        compute2 = service.Service('host2',
                                   'nova-compute',
                                   'compute',
                                   FLAGS.compute_manager)
        # Load host1 with one instance so host2 becomes the least busy.
        instance_id1 = self._create_instance()
        compute1.run_instance(self.context, instance_id1)
        instance_id2 = self._create_instance()
        host = self.scheduler.driver.schedule_run_instance(self.context,
                                                           instance_id2)
        self.assertEqual(host, 'host2')
        compute1.terminate_instance(self.context, instance_id1)
        db.instance_destroy(self.context, instance_id2)
        compute1.kill()
        compute2.kill()

    def test_too_many_cores(self):
        """Ensures we don't go over max cores"""
        compute1 = service.Service('host1',
                                   'nova-compute',
                                   'compute',
                                   FLAGS.compute_manager)
        compute2 = service.Service('host2',
                                   'nova-compute',
                                   'compute',
                                   FLAGS.compute_manager)
        instance_ids1 = []
        instance_ids2 = []
        # Fill both hosts up to the max_cores flag (4, set in setUp).
        for index in xrange(FLAGS.max_cores):
            instance_id = self._create_instance()
            compute1.run_instance(self.context, instance_id)
            instance_ids1.append(instance_id)
            instance_id = self._create_instance()
            compute2.run_instance(self.context, instance_id)
            instance_ids2.append(instance_id)
        instance_id = self._create_instance()
        self.assertRaises(driver.NoValidHost,
                          self.scheduler.driver.schedule_run_instance,
                          self.context,
                          instance_id)
        for instance_id in instance_ids1:
            compute1.terminate_instance(self.context, instance_id)
        for instance_id in instance_ids2:
            compute2.terminate_instance(self.context, instance_id)
        compute1.kill()
        compute2.kill()

    def test_least_busy_host_gets_volume(self):
        """Ensures the host with less gigabytes gets the next one"""
        volume1 = service.Service('host1',
                                  'nova-volume',
                                  'volume',
                                  FLAGS.volume_manager)
        volume2 = service.Service('host2',
                                  'nova-volume',
                                  'volume',
                                  FLAGS.volume_manager)
        # Give host1 one volume so host2 becomes the least loaded.
        volume_id1 = self._create_volume()
        volume1.create_volume(self.context, volume_id1)
        volume_id2 = self._create_volume()
        host = self.scheduler.driver.schedule_create_volume(self.context,
                                                            volume_id2)
        self.assertEqual(host, 'host2')
        volume1.delete_volume(self.context, volume_id1)
        db.volume_destroy(self.context, volume_id2)
        volume1.kill()
        volume2.kill()

    def test_too_many_gigabytes(self):
        """Ensures we don't go over max gigabytes"""
        volume1 = service.Service('host1',
                                  'nova-volume',
                                  'volume',
                                  FLAGS.volume_manager)
        volume2 = service.Service('host2',
                                  'nova-volume',
                                  'volume',
                                  FLAGS.volume_manager)
        volume_ids1 = []
        volume_ids2 = []
        # Fill both hosts up to the max_gigabytes flag (4, set in setUp).
        for index in xrange(FLAGS.max_gigabytes):
            volume_id = self._create_volume()
            volume1.create_volume(self.context, volume_id)
            volume_ids1.append(volume_id)
            volume_id = self._create_volume()
            volume2.create_volume(self.context, volume_id)
            volume_ids2.append(volume_id)
        volume_id = self._create_volume()
        self.assertRaises(driver.NoValidHost,
                          self.scheduler.driver.schedule_create_volume,
                          self.context,
                          volume_id)
        for volume_id in volume_ids1:
            volume1.delete_volume(self.context, volume_id)
        for volume_id in volume_ids2:
            volume2.delete_volume(self.context, volume_id)
        volume1.kill()
        volume2.kill()

@ -39,17 +39,6 @@ from nova.exception import ProcessExecutionError
FLAGS = flags.FLAGS
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
class ProcessExecutionError(IOError):
def __init__( self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
description, cmd, exit_code, stdout, stderr)
IOError.__init__(self, message)
def import_class(import_str):
"""Returns a class from a string including module and class"""
mod_str, _sep, class_str = import_str.rpartition('.')

@ -22,6 +22,7 @@ destroying persistent storage volumes, ala EBS.
"""
import logging
import datetime
from twisted.internet import defer
@ -72,7 +73,7 @@ class AOEManager(manager.Manager):
self.db.volume_update(context,
volume_id,
{'host': FLAGS.host})
{'host': self.host})
size = volume_ref['size']
logging.debug("volume %s: creating lv of size %sG", volume_id, size)
@ -89,14 +90,13 @@ class AOEManager(manager.Manager):
yield self.driver.create_export(volume_ref['str_id'],
shelf_id,
blade_id)
# TODO(joshua): We need to trigger a fanout message
# for aoe-discover on all the nodes
self.db.volume_update(context, volume_id, {'status': 'available'})
logging.debug("volume %s: re-exporting all values", volume_id)
yield self.driver.ensure_exports()
now = datetime.datetime.utcnow()
self.db.volume_update(context, volume_id, {'status': 'available',
'launched_at': now})
logging.debug("volume %s: created successfully", volume_id)
defer.returnValue(volume_id)
@ -107,7 +107,7 @@ class AOEManager(manager.Manager):
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['attach_status'] == "attached":
raise exception.Error("Volume is still attached")
if volume_ref['host'] != FLAGS.host:
if volume_ref['host'] != self.host:
raise exception.Error("Volume is not local to this node")
shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
volume_id)

@ -60,6 +60,7 @@ from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
from nova.tests.quota_unittest import *
from nova.tests.rpc_unittest import *
from nova.tests.scheduler_unittest import *
from nova.tests.service_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.volume_unittest import *

@ -88,6 +88,10 @@ def create_virtualenv(venv=VENV):
def install_dependencies(venv=VENV):
print 'Installing dependencies with pip (this can take a while)...'
# Install greenlet by hand - just listing it in the requires file does not
# get it installed in the right order
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, 'greenlet'],
redirect_output=False)
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, '-r', PIP_REQUIRES],
redirect_output=False)
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, TWISTED_NOVA],

@ -7,15 +7,16 @@ amqplib==0.6.1
anyjson==0.2.4
boto==2.0b1
carrot==0.10.5
eventlet==0.9.10
eventlet==0.9.12
lockfile==0.8
python-daemon==1.5.5
python-gflags==1.3
redis==2.0.0
routes==1.12.3
tornado==1.0
webob==0.9.8
WebOb==0.9.8
wsgiref==0.1.2
zope.interface==3.6.1
mox==0.5.0
-f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz
greenlet==0.3.1