Cells: Add periodic instance healing
Adds a periodic task that'll sync instance information to API cells periodically. Does a handful of instances per periodic task run based on config values. Instances picked for syncing are randomized so that multiple nova-cells services won't be syncing the same instances at nearly the same time. instance_updated_at_threshold -- Number of seconds after an instance was updated or deleted to continue to sync (Ie, don't sync instances updated more than 'x' seconds ago.) instance_update_num_instances -- Number of instances to update per periodic task run Implements blueprint nova-compute-cells DocImpact Change-Id: I3103c3a69ab9cf0ec3e399abe046ed0d216234ab
This commit is contained in:
parent
3e3111f137
commit
2a89de1e46
|
@ -16,19 +16,31 @@
|
|||
"""
|
||||
Cells Service Manager
|
||||
"""
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from nova.cells import messaging
|
||||
from nova.cells import state as cells_state
|
||||
from nova.cells import utils as cells_utils
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import manager
|
||||
from nova.openstack.common import cfg
|
||||
from nova.openstack.common import importutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common import timeutils
|
||||
|
||||
cell_manager_opts = [
|
||||
cfg.StrOpt('driver',
|
||||
default='nova.cells.rpc_driver.CellsRPCDriver',
|
||||
help='Cells communication driver to use'),
|
||||
cfg.IntOpt("instance_updated_at_threshold",
|
||||
default=3600,
|
||||
help="Number of seconds after an instance was updated "
|
||||
"or deleted to continue to update cells"),
|
||||
cfg.IntOpt("instance_update_num_instances",
|
||||
default=1,
|
||||
help="Number of instances to update per periodic task run")
|
||||
]
|
||||
|
||||
|
||||
|
@ -66,6 +78,7 @@ class CellsManager(manager.Manager):
|
|||
cells_driver_cls = importutils.import_class(
|
||||
CONF.cells.driver)
|
||||
self.driver = cells_driver_cls()
|
||||
self.instances_to_heal = iter([])
|
||||
|
||||
def post_start_hook(self):
|
||||
"""Have the driver start its consumers for inter-cell communication.
|
||||
|
@ -93,6 +106,77 @@ class CellsManager(manager.Manager):
|
|||
self.msg_runner.tell_parents_our_capabilities(ctxt)
|
||||
self.msg_runner.tell_parents_our_capacities(ctxt)
|
||||
|
||||
@manager.periodic_task
|
||||
def _heal_instances(self, ctxt):
|
||||
"""Periodic task to send updates for a number of instances to
|
||||
parent cells.
|
||||
|
||||
On every run of the periodic task, we will attempt to sync
|
||||
'CONF.cells.instance_update_num_instances' number of instances.
|
||||
When we get the list of instances, we shuffle them so that multiple
|
||||
nova-cells services aren't attempting to sync the same instances
|
||||
in lockstep.
|
||||
|
||||
If CONF.cells.instance_update_at_threshold is set, only attempt
|
||||
to sync instances that have been updated recently. The CONF
|
||||
setting defines the maximum number of seconds old the updated_at
|
||||
can be. Ie, a threshold of 3600 means to only update instances
|
||||
that have modified in the last hour.
|
||||
"""
|
||||
|
||||
if not self.state_manager.get_parent_cells():
|
||||
# No need to sync up if we have no parents.
|
||||
return
|
||||
|
||||
info = {'updated_list': False}
|
||||
|
||||
def _next_instance():
|
||||
try:
|
||||
instance = self.instances_to_heal.next()
|
||||
except StopIteration:
|
||||
if info['updated_list']:
|
||||
return
|
||||
threshold = CONF.cells.instance_updated_at_threshold
|
||||
updated_since = None
|
||||
if threshold > 0:
|
||||
updated_since = timeutils.utcnow() - datetime.timedelta(
|
||||
seconds=threshold)
|
||||
self.instances_to_heal = cells_utils.get_instances_to_sync(
|
||||
ctxt, updated_since=updated_since, shuffle=True,
|
||||
uuids_only=True)
|
||||
info['updated_list'] = True
|
||||
try:
|
||||
instance = self.instances_to_heal.next()
|
||||
except StopIteration:
|
||||
return
|
||||
return instance
|
||||
|
||||
rd_context = ctxt.elevated(read_deleted='yes')
|
||||
|
||||
for i in xrange(CONF.cells.instance_update_num_instances):
|
||||
while True:
|
||||
# Yield to other greenthreads
|
||||
time.sleep(0)
|
||||
instance_uuid = _next_instance()
|
||||
if not instance_uuid:
|
||||
return
|
||||
try:
|
||||
instance = self.db.instance_get_by_uuid(rd_context,
|
||||
instance_uuid)
|
||||
except exception.InstanceNotFound:
|
||||
continue
|
||||
self._sync_instance(ctxt, instance)
|
||||
break
|
||||
|
||||
def _sync_instance(self, ctxt, instance):
|
||||
"""Broadcast an instance_update or instance_destroy message up to
|
||||
parent cells.
|
||||
"""
|
||||
if instance['deleted']:
|
||||
self.instance_destroy_at_top(ctxt, instance)
|
||||
else:
|
||||
self.instance_update_at_top(ctxt, instance)
|
||||
|
||||
def schedule_run_instance(self, ctxt, host_sched_kwargs):
|
||||
"""Pick a cell (possibly ourselves) to build new instance(s)
|
||||
and forward the request accordingly.
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
# Copyright (c) 2012 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Cells Utility Methods
|
||||
"""
|
||||
import random
|
||||
|
||||
from nova import db
|
||||
|
||||
|
||||
def get_instances_to_sync(context, updated_since=None, project_id=None,
|
||||
deleted=True, shuffle=False, uuids_only=False):
|
||||
"""Return a generator that will return a list of active and
|
||||
deleted instances to sync with parent cells. The list may
|
||||
optionally be shuffled for periodic updates so that multiple
|
||||
cells services aren't self-healing the same instances in nearly
|
||||
lockstep.
|
||||
"""
|
||||
filters = {}
|
||||
if updated_since is not None:
|
||||
filters['changes-since'] = updated_since
|
||||
if project_id is not None:
|
||||
filters['project_id'] = project_id
|
||||
if not deleted:
|
||||
filters['deleted'] = False
|
||||
# Active instances first.
|
||||
instances = db.instance_get_all_by_filters(
|
||||
context, filters, 'deleted', 'asc')
|
||||
if shuffle:
|
||||
random.shuffle(instances)
|
||||
for instance in instances:
|
||||
if uuids_only:
|
||||
yield instance['uuid']
|
||||
else:
|
||||
yield instance
|
|
@ -55,6 +55,12 @@ class FakeDBApi(object):
|
|||
def compute_node_get_all(self, ctxt):
|
||||
return []
|
||||
|
||||
def instance_get_all_by_filters(self, ctxt, *args, **kwargs):
|
||||
return []
|
||||
|
||||
def instance_get_by_uuid(self, ctxt, *args, **kwargs):
|
||||
return None
|
||||
|
||||
|
||||
class FakeCellsDriver(driver.BaseCellsDriver):
|
||||
pass
|
||||
|
|
|
@ -15,8 +15,12 @@
|
|||
"""
|
||||
Tests For CellsManager
|
||||
"""
|
||||
import datetime
|
||||
|
||||
from nova.cells import messaging
|
||||
from nova.cells import utils as cells_utils
|
||||
from nova import context
|
||||
from nova.openstack.common import timeutils
|
||||
from nova import test
|
||||
from nova.tests.cells import fakes
|
||||
|
||||
|
@ -149,3 +153,61 @@ class CellsManagerClassTestCase(test.TestCase):
|
|||
self.mox.ReplayAll()
|
||||
self.cells_manager.bw_usage_update_at_top(
|
||||
self.ctxt, bw_update_info='fake-bw-info')
|
||||
|
||||
def test_heal_instances(self):
|
||||
self.flags(instance_updated_at_threshold=1000,
|
||||
instance_update_num_instances=2,
|
||||
group='cells')
|
||||
|
||||
fake_context = context.RequestContext('fake', 'fake')
|
||||
stalled_time = timeutils.utcnow()
|
||||
updated_since = stalled_time - datetime.timedelta(seconds=1000)
|
||||
|
||||
def utcnow():
|
||||
return stalled_time
|
||||
|
||||
call_info = {'get_instances': 0, 'sync_instances': []}
|
||||
|
||||
instances = ['instance1', 'instance2', 'instance3']
|
||||
|
||||
def get_instances_to_sync(context, **kwargs):
|
||||
self.assertEqual(context, fake_context)
|
||||
call_info['shuffle'] = kwargs.get('shuffle')
|
||||
call_info['project_id'] = kwargs.get('project_id')
|
||||
call_info['updated_since'] = kwargs.get('updated_since')
|
||||
call_info['get_instances'] += 1
|
||||
return iter(instances)
|
||||
|
||||
def instance_get_by_uuid(context, uuid):
|
||||
return instances[int(uuid[-1]) - 1]
|
||||
|
||||
def sync_instance(context, instance):
|
||||
self.assertEqual(context, fake_context)
|
||||
call_info['sync_instances'].append(instance)
|
||||
|
||||
self.stubs.Set(cells_utils, 'get_instances_to_sync',
|
||||
get_instances_to_sync)
|
||||
self.stubs.Set(self.cells_manager.db, 'instance_get_by_uuid',
|
||||
instance_get_by_uuid)
|
||||
self.stubs.Set(self.cells_manager, '_sync_instance',
|
||||
sync_instance)
|
||||
self.stubs.Set(timeutils, 'utcnow', utcnow)
|
||||
|
||||
self.cells_manager._heal_instances(fake_context)
|
||||
self.assertEqual(call_info['shuffle'], True)
|
||||
self.assertEqual(call_info['project_id'], None)
|
||||
self.assertEqual(call_info['updated_since'], updated_since)
|
||||
self.assertEqual(call_info['get_instances'], 1)
|
||||
# Only first 2
|
||||
self.assertEqual(call_info['sync_instances'],
|
||||
instances[:2])
|
||||
|
||||
call_info['sync_instances'] = []
|
||||
self.cells_manager._heal_instances(fake_context)
|
||||
self.assertEqual(call_info['shuffle'], True)
|
||||
self.assertEqual(call_info['project_id'], None)
|
||||
self.assertEqual(call_info['updated_since'], updated_since)
|
||||
self.assertEqual(call_info['get_instances'], 2)
|
||||
# Now the last 1 and the first 1
|
||||
self.assertEqual(call_info['sync_instances'],
|
||||
[instances[-1], instances[0]])
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
# Copyright (c) 2012 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""
|
||||
Tests For Cells Utility methods
|
||||
"""
|
||||
import inspect
|
||||
import random
|
||||
|
||||
from nova.cells import utils as cells_utils
|
||||
from nova import db
|
||||
from nova import test
|
||||
|
||||
|
||||
class CellsUtilsTestCase(test.TestCase):
|
||||
"""Test case for Cells utility methods."""
|
||||
def test_get_instances_to_sync(self):
|
||||
fake_context = 'fake_context'
|
||||
|
||||
call_info = {'get_all': 0, 'shuffle': 0}
|
||||
|
||||
def random_shuffle(_list):
|
||||
call_info['shuffle'] += 1
|
||||
|
||||
def instance_get_all_by_filters(context, filters,
|
||||
sort_key, sort_order):
|
||||
self.assertEqual(context, fake_context)
|
||||
self.assertEqual(sort_key, 'deleted')
|
||||
self.assertEqual(sort_order, 'asc')
|
||||
call_info['got_filters'] = filters
|
||||
call_info['get_all'] += 1
|
||||
return ['fake_instance1', 'fake_instance2', 'fake_instance3']
|
||||
|
||||
self.stubs.Set(db, 'instance_get_all_by_filters',
|
||||
instance_get_all_by_filters)
|
||||
self.stubs.Set(random, 'shuffle', random_shuffle)
|
||||
|
||||
instances = cells_utils.get_instances_to_sync(fake_context)
|
||||
self.assertTrue(inspect.isgenerator(instances))
|
||||
self.assertTrue(len([x for x in instances]), 3)
|
||||
self.assertEqual(call_info['get_all'], 1)
|
||||
self.assertEqual(call_info['got_filters'], {})
|
||||
self.assertEqual(call_info['shuffle'], 0)
|
||||
|
||||
instances = cells_utils.get_instances_to_sync(fake_context,
|
||||
shuffle=True)
|
||||
self.assertTrue(inspect.isgenerator(instances))
|
||||
self.assertTrue(len([x for x in instances]), 3)
|
||||
self.assertEqual(call_info['get_all'], 2)
|
||||
self.assertEqual(call_info['got_filters'], {})
|
||||
self.assertEqual(call_info['shuffle'], 1)
|
||||
|
||||
instances = cells_utils.get_instances_to_sync(fake_context,
|
||||
updated_since='fake-updated-since')
|
||||
self.assertTrue(inspect.isgenerator(instances))
|
||||
self.assertTrue(len([x for x in instances]), 3)
|
||||
self.assertEqual(call_info['get_all'], 3)
|
||||
self.assertEqual(call_info['got_filters'],
|
||||
{'changes-since': 'fake-updated-since'})
|
||||
self.assertEqual(call_info['shuffle'], 1)
|
||||
|
||||
instances = cells_utils.get_instances_to_sync(fake_context,
|
||||
project_id='fake-project',
|
||||
updated_since='fake-updated-since', shuffle=True)
|
||||
self.assertTrue(inspect.isgenerator(instances))
|
||||
self.assertTrue(len([x for x in instances]), 3)
|
||||
self.assertEqual(call_info['get_all'], 4)
|
||||
self.assertEqual(call_info['got_filters'],
|
||||
{'changes-since': 'fake-updated-since',
|
||||
'project_id': 'fake-project'})
|
||||
self.assertEqual(call_info['shuffle'], 2)
|
Loading…
Reference in New Issue