scheduler host_manager needs service for filters

distributed scheduler isn't checking service_is_up or
services['disabled'] due to filters not having access to service.

Fixed both.  Since ec2 API also uses service_down_time, I moved
service_is_up() into utils and made ec2 use it.

Change-Id: I0321844a47031b2de4d8738e032a4634edd1e945
This commit is contained in:
Chris Behrens 2012-01-19 21:36:42 -08:00
parent 64341eedf9
commit c56630c421
13 changed files with 158 additions and 57 deletions

View File

@ -50,7 +50,6 @@ from nova import volume
FLAGS = flags.FLAGS
flags.DECLARE('dhcp_domain', 'nova.network.manager')
flags.DECLARE('service_down_time', 'nova.scheduler.driver')
LOG = logging.getLogger("nova.api.ec2.cloud")
@ -290,8 +289,7 @@ class CloudController(object):
hsvcs = [service for service in services \
if service['host'] == host]
for svc in hsvcs:
delta = now - (svc['updated_at'] or svc['created_at'])
alive = (delta.seconds <= FLAGS.service_down_time)
alive = utils.service_is_up(svc)
art = (alive and ":-)") or "XXX"
active = 'enabled'
if svc['disabled']:

View File

@ -464,3 +464,6 @@ DEFINE_integer('zombie_instance_updated_at_window', 172800,
'being cleaned up.')
DEFINE_boolean('allow_ec2_admin_api', False, 'Enable/Disable EC2 Admin API')
DEFINE_integer('service_down_time', 60,
'maximum time since last check-in for up service')

View File

@ -37,8 +37,6 @@ from nova import utils
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.scheduler.driver')
flags.DEFINE_integer('service_down_time', 60,
'maximum time since last check-in for up service')
flags.DEFINE_string('scheduler_host_manager',
'nova.scheduler.host_manager.HostManager',
'The scheduler host manager class to use')
@ -159,21 +157,13 @@ class Scheduler(object):
"""Poll child zones periodically to get status."""
return self.zone_manager.update(context)
@staticmethod
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""
last_heartbeat = service['updated_at'] or service['created_at']
# Timestamps in DB are UTC.
elapsed = utils.total_seconds(utils.utcnow() - last_heartbeat)
return abs(elapsed) <= FLAGS.service_down_time
def hosts_up(self, context, topic):
"""Return the list of hosts that have a running service for topic."""
services = db.service_get_all_by_topic(context, topic)
return [service['host']
for service in services
if self.service_is_up(service)]
if utils.service_is_up(service)]
def create_instance_db_entry(self, context, request_spec):
"""Create instance DB entry based on request_spec"""
@ -267,7 +257,7 @@ class Scheduler(object):
# to the instance.
if len(instance_ref['volumes']) != 0:
services = db.service_get_all_by_topic(context, 'volume')
if len(services) < 1 or not self.service_is_up(services[0]):
if len(services) < 1 or not utils.service_is_up(services[0]):
raise exception.VolumeServiceUnavailable()
# Checking src host exists and compute node
@ -275,7 +265,7 @@ class Scheduler(object):
services = db.service_get_all_compute_by_host(context, src)
# Checking src host is alive.
if not self.service_is_up(services[0]):
if not utils.service_is_up(services[0]):
raise exception.ComputeServiceUnavailable(host=src)
def _live_migration_dest_check(self, context, instance_ref, dest,
@ -295,7 +285,7 @@ class Scheduler(object):
dservice_ref = dservice_refs[0]
# Checking dest host is alive.
if not self.service_is_up(dservice_ref):
if not utils.service_is_up(dservice_ref):
raise exception.ComputeServiceUnavailable(host=dest)
# Checking whether The host where instance is running

View File

@ -15,6 +15,7 @@
from nova import log as logging
from nova.scheduler.filters import abstract_filter
from nova import utils
LOG = logging.getLogger('nova.scheduler.filter.compute_filter')
@ -48,8 +49,11 @@ class ComputeFilter(abstract_filter.AbstractHostFilter):
instance_type = filter_properties.get('instance_type')
if host_state.topic != 'compute' or not instance_type:
return True
capabilities = host_state.capabilities or {}
capabilities = host_state.capabilities
service = host_state.service
if not utils.service_is_up(service) or service['disabled']:
return False
if not self._basic_ram_filter(host_state, instance_type):
return False
if not capabilities.get("enabled", True):

View File

@ -128,14 +128,13 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
"""Return a list of hosts that can fulfill the requirements
specified in the query.
"""
capabilities = host_state.capabilities or {}
if not capabilities.get("enabled", True):
return False
query = filter_properties.get('query', None)
if not query:
return True
# NOTE(comstud): Not checking capabilities or service for
# enabled/disabled so that a provided json filter can decide
result = self._process_filter(json.loads(query), host_state)
if isinstance(result, list):
# If any succeeded, include the host

View File

@ -77,7 +77,7 @@ class HostState(object):
previously used and lock down access.
"""
def __init__(self, host, topic, capabilities=None):
def __init__(self, host, topic, capabilities=None, service=None):
self.host = host
self.topic = topic
@ -86,6 +86,9 @@ class HostState(object):
if capabilities is None:
capabilities = {}
self.capabilities = ReadOnlyDict(capabilities.get(topic, None))
if service is None:
service = {}
self.service = ReadOnlyDict(service)
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.free_ram_mb = 0
@ -293,7 +296,8 @@ class HostManager(object):
host = service['host']
capabilities = self.service_states.get(host, None)
host_state = self.host_state_cls(host, topic,
capabilities=capabilities)
capabilities=capabilities,
service=dict(service.iteritems()))
host_state.update_from_compute_node(compute)
host_state_map[host] = host_state

View File

@ -26,6 +26,7 @@ from nova import flags
from nova import exception
from nova.scheduler import driver
from nova.scheduler import chance
from nova import utils
FLAGS = flags.FLAGS
flags.DEFINE_integer("max_cores", 16,
@ -57,7 +58,7 @@ class SimpleScheduler(chance.ChanceScheduler):
if host and context.is_admin:
service = db.service_get_by_args(elevated, host, 'nova-compute')
if not self.service_is_up(service):
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
return host
@ -79,7 +80,7 @@ class SimpleScheduler(chance.ChanceScheduler):
instance_cores + instance_opts['vcpus'] > FLAGS.max_cores:
msg = _("Not enough allocatable CPU cores remaining")
raise exception.NoValidHost(reason=msg)
if self.service_is_up(service):
if utils.service_is_up(service) and not service['disabled']:
return service['host']
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
@ -120,7 +121,7 @@ class SimpleScheduler(chance.ChanceScheduler):
zone, _x, host = availability_zone.partition(':')
if host and context.is_admin:
service = db.service_get_by_args(elevated, host, 'nova-volume')
if not self.service_is_up(service):
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
driver.cast_to_volume_host(context, host, 'create_volume',
volume_id=volume_id, **_kwargs)
@ -135,7 +136,7 @@ class SimpleScheduler(chance.ChanceScheduler):
if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
msg = _("Not enough allocatable volume gigabytes remaining")
raise exception.NoValidHost(reason=msg)
if self.service_is_up(service):
if utils.service_is_up(service) and not service['disabled']:
driver.cast_to_volume_host(context, service['host'],
'create_volume', volume_id=volume_id, **_kwargs)
return None

View File

@ -215,7 +215,7 @@ class VsaScheduler(simple.SimpleScheduler):
zone, _x, host = availability_zone.partition(':')
service = db.service_get_by_args(context.elevated(), host,
'nova-volume')
if not self.service_is_up(service):
if service['disabled'] or not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
return host

View File

@ -23,10 +23,14 @@ from nova.scheduler import zone_manager
COMPUTE_NODES = [
dict(id=1, local_gb=1024, memory_mb=1024, service=dict(host='host1')),
dict(id=2, local_gb=2048, memory_mb=2048, service=dict(host='host2')),
dict(id=3, local_gb=4096, memory_mb=4096, service=dict(host='host3')),
dict(id=4, local_gb=8192, memory_mb=8192, service=dict(host='host4')),
dict(id=1, local_gb=1024, memory_mb=1024,
service=dict(host='host1', disabled=False)),
dict(id=2, local_gb=2048, memory_mb=2048,
service=dict(host='host2', disabled=True)),
dict(id=3, local_gb=4096, memory_mb=4096,
service=dict(host='host3', disabled=False)),
dict(id=4, local_gb=8192, memory_mb=8192,
service=dict(host='host4', disabled=False)),
# Broken entry
dict(id=5, local_gb=1024, memory_mb=1024, service=None),
]

View File

@ -1,5 +1,4 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
# Copyright 2011 OpenStack LLC. # All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -21,6 +20,7 @@ import json
from nova.scheduler import filters
from nova import test
from nova.tests.scheduler import fakes
from nova import utils
class HostFiltersTestCase(test.TestCase):
@ -37,64 +37,102 @@ class HostFiltersTestCase(test.TestCase):
host = fakes.FakeHostState('host1', 'compute', {})
self.assertTrue(filt_cls.host_passes(host, {}))
def _stub_service_is_up(self, ret_value):
def fake_service_is_up(service):
return ret_value
self.stubs.Set(utils, 'service_is_up', fake_service_is_up)
def test_compute_filter_passes(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': True}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_on_memory(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': True}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1023, 'capabilities': capabilities})
{'free_ram_mb': 1023, 'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_on_disabled(self):
def test_compute_filter_fails_on_service_disabled(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': False}
capabilities = {'enabled': True}
service = {'disabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_on_service_down(self):
self._stub_service_is_up(False)
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': True}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_passes_on_volume(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': False}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'volume',
{'free_ram_mb': 1024, 'capabilities': capabilities})
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_passes_on_no_instance_type(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
filter_properties = {}
capabilities = {'enabled': False}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_passes_extra_specs(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
extra_specs = {'opt1': 1, 'opt2': 2}
capabilities = {'enabled': True, 'opt1': 1, 'opt2': 2}
service = {'disabled': False}
filter_properties = {'instance_type': {'memory_mb': 1024,
'extra_specs': extra_specs}}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_extra_specs(self):
self._stub_service_is_up(True)
filt_cls = filters.ComputeFilter()
extra_specs = {'opt1': 1, 'opt2': 3}
capabilities = {'enabled': True, 'opt1': 1, 'opt2': 2}
service = {'disabled': False}
filter_properties = {'instance_type': {'memory_mb': 1024,
'extra_specs': extra_specs}}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
{'free_ram_mb': 1024, 'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_passes(self):
@ -156,11 +194,15 @@ class HostFiltersTestCase(test.TestCase):
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_fails_on_disabled(self):
def test_json_filter_fails_on_caps_disabled(self):
filt_cls = filters.JsonFilter()
json_query = json.dumps(
['and', ['>=', '$free_ram_mb', 1024],
['>=', '$free_disk_mb', 200 * 1024],
'$capabilities.enabled'])
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': self.json_query}
'query': json_query}
capabilities = {'enabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024,
@ -168,10 +210,40 @@ class HostFiltersTestCase(test.TestCase):
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_fails_on_service_disabled(self):
filt_cls = filters.JsonFilter()
json_query = json.dumps(
['and', ['>=', '$free_ram_mb', 1024],
['>=', '$free_disk_mb', 200 * 1024],
['not', '$service.disabled']])
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': json_query}
capabilities = {'enabled': True}
service = {'disabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024,
'free_disk_mb': 200 * 1024,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_passes(self):
filt_cls = filters.JsonFilter()
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': self.json_query}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024,
'free_disk_mb': 200 * 1024,
'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_json_filter_happy_day(self):
"""Test json filter more thoroughly"""
filt_cls = filters.JsonFilter()
raw = ['and',
'$capabilities.enabled',
['=', '$capabilities.opt1', 'match'],
['or',
['and',
@ -184,50 +256,62 @@ class HostFiltersTestCase(test.TestCase):
# Passes
capabilities = {'enabled': True, 'opt1': 'match'}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 10,
'free_disk_mb': 200,
'capabilities': capabilities})
'capabilities': capabilities,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
# Passes
capabilities = {'enabled': True, 'opt1': 'match'}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 40,
'free_disk_mb': 400,
'capabilities': capabilities})
'capabilities': capabilities,
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
# Failes due to disabled
# Failes due to caps disabled
capabilities = {'enabled': False, 'opt1': 'match'}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'instance_type',
{'free_ram_mb': 40,
'free_disk_mb': 400,
'capabilities': capabilities})
'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
# Fails due to being exact memory/disk we don't want
capabilities = {'enabled': True, 'opt1': 'match'}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 30,
'free_disk_mb': 300,
'capabilities': capabilities})
'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
# Fails due to memory lower but disk higher
capabilities = {'enabled': True, 'opt1': 'match'}
service = {'disabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 20,
'free_disk_mb': 400,
'capabilities': capabilities})
'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
# Fails due to capabilities 'opt1' not equal
capabilities = {'enabled': True, 'opt1': 'no-match'}
service = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 20,
'free_disk_mb': 400,
'capabilities': capabilities})
'capabilities': capabilities,
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_basic_operators(self):

View File

@ -264,6 +264,12 @@ class HostManagerTestCase(test.TestCase):
self.mox.VerifyAll()
self.assertEqual(len(host_states), 4)
# Check that .service is set properly
for i in xrange(4):
compute_node = fakes.COMPUTE_NODES[i]
host = compute_node['service']['host']
self.assertEqual(host_states[host].service,
compute_node['service'])
self.assertEqual(host_states['host1'].free_ram_mb, 0)
# 511GB
self.assertEqual(host_states['host1'].free_disk_mb, 523264)

View File

@ -197,7 +197,7 @@ class VsaSchedulerTestCase(test.TestCase):
scheduled_volume = {'id': volume_id, 'host': values['host']}
def _fake_service_get_by_args(self, context, host, binary):
return "service"
return {'host': 'fake_host', 'disabled': False}
def _fake_service_is_up_True(self, service):
return True
@ -386,7 +386,7 @@ class VsaSchedulerTestCase(test.TestCase):
self.stubs.Set(nova.db,
'service_get_by_args', self._fake_service_get_by_args)
self.stubs.Set(self.sched,
self.stubs.Set(utils,
'service_is_up', self._fake_service_is_up_False)
self.assertRaises(exception.WillNotSchedule,
@ -395,7 +395,7 @@ class VsaSchedulerTestCase(test.TestCase):
request_spec,
availability_zone="nova:host_5")
self.stubs.Set(self.sched,
self.stubs.Set(utils,
'service_is_up', self._fake_service_is_up_True)
self.sched.schedule_create_volumes(self.context,
@ -462,7 +462,7 @@ class VsaSchedulerTestCase(test.TestCase):
self.stubs.Set(nova.db, 'volume_get', _fake_volume_get_az)
self.stubs.Set(nova.db,
'service_get_by_args', self._fake_service_get_by_args)
self.stubs.Set(self.sched,
self.stubs.Set(utils,
'service_is_up', self._fake_service_is_up_True)
self.sched.schedule_create_volume(self.context,

View File

@ -1399,3 +1399,11 @@ def _showwarning(message, category, filename, lineno, file=None, line=None):
# Install our warnings handler
warnings.showwarning = _showwarning
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""
last_heartbeat = service['updated_at'] or service['created_at']
# Timestamps in DB are UTC.
elapsed = total_seconds(utcnow() - last_heartbeat)
return abs(elapsed) <= FLAGS.service_down_time