Added support of availability zones for compute.

models.Service got additional field availability_zone and was created ZoneScheduler that make decisions based on this field. 
Also replaced fake 'nova' zone in EC2 cloud api.
This commit is contained in:
Ilya Alekseyev
2011-01-12 11:34:16 +00:00
committed by Tarmac
12 changed files with 235 additions and 21 deletions

View File

@@ -15,6 +15,7 @@ Eldar Nugaev <enugaev@griddynamics.com>
Eric Day <eday@oddments.org> Eric Day <eday@oddments.org>
Ewan Mellor <ewan.mellor@citrix.com> Ewan Mellor <ewan.mellor@citrix.com>
Hisaki Ohara <hisaki.ohara@intel.com> Hisaki Ohara <hisaki.ohara@intel.com>
Ilya Alekseyev <ialekseev@griddynamics.com>
Jay Pipes <jaypipes@gmail.com> Jay Pipes <jaypipes@gmail.com>
Jesse Andrews <anotherjesse@gmail.com> Jesse Andrews <anotherjesse@gmail.com>
Joe Heck <heckj@mac.com> Joe Heck <heckj@mac.com>

View File

@@ -132,6 +132,21 @@ class CloudController(object):
result[key] = [line] result[key] = [line]
return result return result
def _trigger_refresh_security_group(self, context, security_group):
nodes = set([instance['host'] for instance in security_group.instances
if instance['host'] is not None])
for node in nodes:
rpc.cast(context,
'%s.%s' % (FLAGS.compute_topic, node),
{"method": "refresh_security_group",
"args": {"security_group_id": security_group.id}})
def _get_availability_zone_by_host(self, context, host):
services = db.service_get_all_by_host(context, host)
if len(services) > 0:
return services[0]['availability_zone']
return 'unknown zone'
def get_metadata(self, address): def get_metadata(self, address):
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
instance_ref = self.compute_api.get_all(ctxt, fixed_ip=address) instance_ref = self.compute_api.get_all(ctxt, fixed_ip=address)
@@ -144,6 +159,8 @@ class CloudController(object):
else: else:
keys = '' keys = ''
hostname = instance_ref['hostname'] hostname = instance_ref['hostname']
host = instance_ref['host']
availability_zone = self._get_availability_zone_by_host(ctxt, host)
floating_ip = db.instance_get_floating_address(ctxt, floating_ip = db.instance_get_floating_address(ctxt,
instance_ref['id']) instance_ref['id'])
ec2_id = id_to_ec2_id(instance_ref['id']) ec2_id = id_to_ec2_id(instance_ref['id'])
@@ -166,8 +183,7 @@ class CloudController(object):
'local-hostname': hostname, 'local-hostname': hostname,
'local-ipv4': address, 'local-ipv4': address,
'kernel-id': instance_ref['kernel_id'], 'kernel-id': instance_ref['kernel_id'],
# TODO(vish): real zone 'placement': {'availability-zone': availability_zone},
'placement': {'availability-zone': 'nova'},
'public-hostname': hostname, 'public-hostname': hostname,
'public-ipv4': floating_ip or '', 'public-ipv4': floating_ip or '',
'public-keys': keys, 'public-keys': keys,
@@ -191,8 +207,26 @@ class CloudController(object):
return self._describe_availability_zones(context, **kwargs) return self._describe_availability_zones(context, **kwargs)
def _describe_availability_zones(self, context, **kwargs): def _describe_availability_zones(self, context, **kwargs):
return {'availabilityZoneInfo': [{'zoneName': 'nova', enabled_services = db.service_get_all(context)
'zoneState': 'available'}]} disabled_services = db.service_get_all(context, True)
available_zones = []
for zone in [service.availability_zone for service
in enabled_services]:
if not zone in available_zones:
available_zones.append(zone)
not_available_zones = []
for zone in [service.availability_zone for service in disabled_services
if not service['availability_zone'] in available_zones]:
if not zone in not_available_zones:
not_available_zones.append(zone)
result = []
for zone in available_zones:
result.append({'zoneName': zone,
'zoneState': "available"})
for zone in not_available_zones:
result.append({'zoneName': zone,
'zoneState': "not available"})
return {'availabilityZoneInfo': result}
def _describe_availability_zones_verbose(self, context, **kwargs): def _describe_availability_zones_verbose(self, context, **kwargs):
rv = {'availabilityZoneInfo': [{'zoneName': 'nova', rv = {'availabilityZoneInfo': [{'zoneName': 'nova',
@@ -654,6 +688,9 @@ class CloudController(object):
i['amiLaunchIndex'] = instance['launch_index'] i['amiLaunchIndex'] = instance['launch_index']
i['displayName'] = instance['display_name'] i['displayName'] = instance['display_name']
i['displayDescription'] = instance['display_description'] i['displayDescription'] = instance['display_description']
host = instance['host']
zone = self._get_availability_zone_by_host(context, host)
i['placement'] = {'availabilityZone': zone}
if instance['reservation_id'] not in reservations: if instance['reservation_id'] not in reservations:
r = {} r = {}
r['reservationId'] = instance['reservation_id'] r['reservationId'] = instance['reservation_id']

View File

@@ -186,7 +186,8 @@ class API(base.Base):
FLAGS.scheduler_topic, FLAGS.scheduler_topic,
{"method": "run_instance", {"method": "run_instance",
"args": {"topic": FLAGS.compute_topic, "args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id}}) "instance_id": instance_id,
"availability_zone": availability_zone}})
for group_id in security_groups: for group_id in security_groups:
self.trigger_security_group_members_refresh(elevated, group_id) self.trigger_security_group_members_refresh(elevated, group_id)

View File

@@ -81,16 +81,21 @@ def service_get(context, service_id):
return IMPL.service_get(context, service_id) return IMPL.service_get(context, service_id)
def service_get_all(context): def service_get_all(context, disabled=False):
"""Get a list of all services on any machine on any topic of any type""" """Get all service."""
return IMPL.service_get_all(context) return IMPL.service_get_all(context, None, disabled)
def service_get_all_by_topic(context, topic): def service_get_all_by_topic(context, topic):
"""Get all compute services for a given topic.""" """Get all services for a given topic."""
return IMPL.service_get_all_by_topic(context, topic) return IMPL.service_get_all_by_topic(context, topic)
def service_get_all_by_host(context, host):
"""Get all services for a given host."""
return IMPL.service_get_all_by_host(context, host)
def service_get_all_compute_sorted(context): def service_get_all_compute_sorted(context):
"""Get all compute services sorted by instance count. """Get all compute services sorted by instance count.

View File

@@ -135,14 +135,14 @@ def service_get(context, service_id, session=None):
@require_admin_context @require_admin_context
def service_get_all(context, session=None): def service_get_all(context, session=None, disabled=False):
if not session: if not session:
session = get_session() session = get_session()
result = session.query(models.Service).\ result = session.query(models.Service).\
filter_by(deleted=can_read_deleted(context)).\ filter_by(deleted=can_read_deleted(context)).\
all() filter_by(disabled=disabled).\
all()
return result return result
@@ -156,6 +156,15 @@ def service_get_all_by_topic(context, topic):
all() all()
@require_admin_context
def service_get_all_by_host(context, host):
session = get_session()
return session.query(models.Service).\
filter_by(deleted=False).\
filter_by(host=host).\
all()
@require_admin_context @require_admin_context
def _service_get_all_topic_subquery(context, session, topic, subq, label): def _service_get_all_topic_subquery(context, session, topic, subq, label):
sort_value = getattr(subq.c, label) sort_value = getattr(subq.c, label)

View File

@@ -149,6 +149,7 @@ class Service(BASE, NovaBase):
topic = Column(String(255)) topic = Column(String(255))
report_count = Column(Integer, nullable=False, default=0) report_count = Column(Integer, nullable=False, default=0)
disabled = Column(Boolean, default=False) disabled = Column(Boolean, default=False)
availability_zone = Column(String(255), default='nova')
class Certificate(BASE, NovaBase): class Certificate(BASE, NovaBase):

View File

@@ -308,6 +308,5 @@ DEFINE_string('image_service', 'nova.image.s3.S3ImageService',
DEFINE_string('host', socket.gethostname(), DEFINE_string('host', socket.gethostname(),
'name of this node') 'name of this node')
# UNUSED
DEFINE_string('node_availability_zone', 'nova', DEFINE_string('node_availability_zone', 'nova',
'availability zone of this node') 'availability zone of this node')

56
nova/scheduler/zone.py Normal file
View File

@@ -0,0 +1,56 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Availability Zone Scheduler implementation
"""
import random
from nova.scheduler import driver
from nova import db
class ZoneScheduler(driver.Scheduler):
"""Implements Scheduler as a random node selector."""
def hosts_up_with_zone(self, context, topic, zone):
"""Return the list of hosts that have a running service
for topic and availability zone (if defined).
"""
if zone is None:
return self.hosts_up(context, topic)
services = db.service_get_all_by_topic(context, topic)
return [service.host
for service in services
if self.service_is_up(service)
and service.availability_zone == zone]
def schedule(self, context, topic, *_args, **_kwargs):
"""Picks a host that is up at random in selected
availability zone (if defined).
"""
zone = _kwargs.get('availability_zone')
hosts = self.hosts_up_with_zone(context, topic, zone)
if not hosts:
raise driver.NoValidHost(_("No hosts found"))
return hosts[int(random.random() * len(hosts))]

View File

@@ -113,11 +113,13 @@ class Service(object):
self.timers.append(periodic) self.timers.append(periodic)
def _create_service_ref(self, context): def _create_service_ref(self, context):
zone = FLAGS.node_availability_zone
service_ref = db.service_create(context, service_ref = db.service_create(context,
{'host': self.host, {'host': self.host,
'binary': self.binary, 'binary': self.binary,
'topic': self.topic, 'topic': self.topic,
'report_count': 0}) 'report_count': 0,
'availability_zone': zone})
self.service_id = service_ref['id'] self.service_id = service_ref['id']
def __getattr__(self, key): def __getattr__(self, key):

View File

@@ -133,10 +133,35 @@ class CloudTestCase(test.TestCase):
db.volume_destroy(self.context, vol1['id']) db.volume_destroy(self.context, vol1['id'])
db.volume_destroy(self.context, vol2['id']) db.volume_destroy(self.context, vol2['id'])
def test_describe_availability_zones(self):
"""Makes sure describe_availability_zones works and filters results."""
service1 = db.service_create(self.context, {'host': 'host1_zones',
'binary': "nova-compute",
'topic': 'compute',
'report_count': 0,
'availability_zone': "zone1"})
service2 = db.service_create(self.context, {'host': 'host2_zones',
'binary': "nova-compute",
'topic': 'compute',
'report_count': 0,
'availability_zone': "zone2"})
result = self.cloud.describe_availability_zones(self.context)
self.assertEqual(len(result['availabilityZoneInfo']), 3)
db.service_destroy(self.context, service1['id'])
db.service_destroy(self.context, service2['id'])
def test_describe_instances(self): def test_describe_instances(self):
"""Makes sure describe_instances works and filters results.""" """Makes sure describe_instances works and filters results."""
inst1 = db.instance_create(self.context, {'reservation_id': 'a'}) inst1 = db.instance_create(self.context, {'reservation_id': 'a',
inst2 = db.instance_create(self.context, {'reservation_id': 'a'}) 'host': 'host1'})
inst2 = db.instance_create(self.context, {'reservation_id': 'a',
'host': 'host2'})
comp1 = db.service_create(self.context, {'host': 'host1',
'availability_zone': 'zone1',
'topic': "compute"})
comp2 = db.service_create(self.context, {'host': 'host2',
'availability_zone': 'zone2',
'topic': "compute"})
result = self.cloud.describe_instances(self.context) result = self.cloud.describe_instances(self.context)
result = result['reservationSet'][0] result = result['reservationSet'][0]
self.assertEqual(len(result['instancesSet']), 2) self.assertEqual(len(result['instancesSet']), 2)
@@ -147,8 +172,12 @@ class CloudTestCase(test.TestCase):
self.assertEqual(len(result['instancesSet']), 1) self.assertEqual(len(result['instancesSet']), 1)
self.assertEqual(result['instancesSet'][0]['instanceId'], self.assertEqual(result['instancesSet'][0]['instanceId'],
instance_id) instance_id)
self.assertEqual(result['instancesSet'][0]
['placement']['availabilityZone'], 'zone2')
db.instance_destroy(self.context, inst1['id']) db.instance_destroy(self.context, inst1['id'])
db.instance_destroy(self.context, inst2['id']) db.instance_destroy(self.context, inst2['id'])
db.service_destroy(self.context, comp1['id'])
db.service_destroy(self.context, comp2['id'])
def test_console_output(self): def test_console_output(self):
image_id = FLAGS.default_image image_id = FLAGS.default_image
@@ -241,6 +270,19 @@ class CloudTestCase(test.TestCase):
LOG.debug(_("Terminating instance %s"), instance_id) LOG.debug(_("Terminating instance %s"), instance_id)
rv = self.compute.terminate_instance(instance_id) rv = self.compute.terminate_instance(instance_id)
def test_describe_instances(self):
"""Makes sure describe_instances works."""
instance1 = db.instance_create(self.context, {'host': 'host2'})
comp1 = db.service_create(self.context, {'host': 'host2',
'availability_zone': 'zone1',
'topic': "compute"})
result = self.cloud.describe_instances(self.context)
self.assertEqual(result['reservationSet'][0]
['instancesSet'][0]
['placement']['availabilityZone'], 'zone1')
db.instance_destroy(self.context, instance1['id'])
db.service_destroy(self.context, comp1['id'])
def test_instance_update_state(self): def test_instance_update_state(self):
def instance(num): def instance(num):
return { return {

View File

@@ -21,6 +21,7 @@ Tests For Scheduler
import datetime import datetime
from mox import IgnoreArg
from nova import context from nova import context
from nova import db from nova import db
from nova import flags from nova import flags
@@ -76,6 +77,59 @@ class SchedulerTestCase(test.TestCase):
scheduler.named_method(ctxt, 'topic', num=7) scheduler.named_method(ctxt, 'topic', num=7)
class ZoneSchedulerTestCase(test.TestCase):
"""Test case for zone scheduler"""
def setUp(self):
super(ZoneSchedulerTestCase, self).setUp()
self.flags(scheduler_driver='nova.scheduler.zone.ZoneScheduler')
def _create_service_model(self, **kwargs):
service = db.sqlalchemy.models.Service()
service.host = kwargs['host']
service.disabled = False
service.deleted = False
service.report_count = 0
service.binary = 'nova-compute'
service.topic = 'compute'
service.id = kwargs['id']
service.availability_zone = kwargs['zone']
service.created_at = datetime.datetime.utcnow()
return service
def test_with_two_zones(self):
scheduler = manager.SchedulerManager()
ctxt = context.get_admin_context()
service_list = [self._create_service_model(id=1,
host='host1',
zone='zone1'),
self._create_service_model(id=2,
host='host2',
zone='zone2'),
self._create_service_model(id=3,
host='host3',
zone='zone2'),
self._create_service_model(id=4,
host='host4',
zone='zone2'),
self._create_service_model(id=5,
host='host5',
zone='zone2')]
self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
arg = IgnoreArg()
db.service_get_all_by_topic(arg, arg).AndReturn(service_list)
self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
rpc.cast(ctxt,
'compute.host1',
{'method': 'run_instance',
'args': {'instance_id': 'i-ffffffff',
'availability_zone': 'zone1'}})
self.mox.ReplayAll()
scheduler.run_instance(ctxt,
'compute',
instance_id='i-ffffffff',
availability_zone='zone1')
class SimpleDriverTestCase(test.TestCase): class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver""" """Test case for simple driver"""
def setUp(self): def setUp(self):

View File

@@ -133,7 +133,8 @@ class ServiceTestCase(test.TestCase):
service_create = {'host': host, service_create = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0} 'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host, service_ref = {'host': host,
'binary': binary, 'binary': binary,
'report_count': 0, 'report_count': 0,
@@ -161,11 +162,13 @@ class ServiceTestCase(test.TestCase):
service_create = {'host': host, service_create = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0} 'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host, service_ref = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0, 'report_count': 0,
'availability_zone': 'nova',
'id': 1} 'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(), service.db.service_get_by_args(mox.IgnoreArg(),
@@ -193,11 +196,13 @@ class ServiceTestCase(test.TestCase):
service_create = {'host': host, service_create = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0} 'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host, service_ref = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0, 'report_count': 0,
'availability_zone': 'nova',
'id': 1} 'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(), service.db.service_get_by_args(mox.IgnoreArg(),
@@ -224,11 +229,13 @@ class ServiceTestCase(test.TestCase):
service_create = {'host': host, service_create = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0} 'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host, service_ref = {'host': host,
'binary': binary, 'binary': binary,
'topic': topic, 'topic': topic,
'report_count': 0, 'report_count': 0,
'availability_zone': 'nova',
'id': 1} 'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(), service.db.service_get_by_args(mox.IgnoreArg(),