495 lines
18 KiB
Python
495 lines
18 KiB
Python
# Copyright (c) 2012 Rackspace Hosting
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
CellState Manager
|
|
"""
|
|
import copy
|
|
import datetime
|
|
import functools
|
|
import time
|
|
|
|
from oslo.config import cfg
|
|
|
|
from nova.cells import rpc_driver
|
|
from nova import context
|
|
from nova.db import base
|
|
from nova import exception
|
|
from nova.i18n import _
|
|
from nova.openstack.common.db import exception as db_exc
|
|
from nova.openstack.common import fileutils
|
|
from nova.openstack.common import jsonutils
|
|
from nova.openstack.common import log as logging
|
|
from nova.openstack.common import timeutils
|
|
from nova.openstack.common import units
|
|
from nova import rpc
|
|
from nova import utils
|
|
|
|
cell_state_manager_opts = [
|
|
cfg.IntOpt('db_check_interval',
|
|
default=60,
|
|
help='Interval, in seconds, for getting fresh cell '
|
|
'information from the database.'),
|
|
cfg.StrOpt('cells_config',
|
|
help='Configuration file from which to read cells '
|
|
'configuration. If given, overrides reading cells '
|
|
'from the database.'),
|
|
]
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
CONF = cfg.CONF
|
|
CONF.import_opt('name', 'nova.cells.opts', group='cells')
|
|
CONF.import_opt('reserve_percent', 'nova.cells.opts', group='cells')
|
|
CONF.import_opt('mute_child_interval', 'nova.cells.opts', group='cells')
|
|
CONF.register_opts(cell_state_manager_opts, group='cells')
|
|
|
|
|
|
class CellState(object):
|
|
"""Holds information for a particular cell."""
|
|
def __init__(self, cell_name, is_me=False):
|
|
self.name = cell_name
|
|
self.is_me = is_me
|
|
self.last_seen = datetime.datetime.min
|
|
self.capabilities = {}
|
|
self.capacities = {}
|
|
self.db_info = {}
|
|
# TODO(comstud): The DB will specify the driver to use to talk
|
|
# to this cell, but there's no column for this yet. The only
|
|
# available driver is the rpc driver.
|
|
self.driver = rpc_driver.CellsRPCDriver()
|
|
|
|
def update_db_info(self, cell_db_info):
|
|
"""Update cell credentials from db."""
|
|
self.db_info = dict(
|
|
[(k, v) for k, v in cell_db_info.iteritems()
|
|
if k != 'name'])
|
|
|
|
def update_capabilities(self, cell_metadata):
|
|
"""Update cell capabilities for a cell."""
|
|
self.last_seen = timeutils.utcnow()
|
|
self.capabilities = cell_metadata
|
|
|
|
def update_capacities(self, capacities):
|
|
"""Update capacity information for a cell."""
|
|
self.last_seen = timeutils.utcnow()
|
|
self.capacities = capacities
|
|
|
|
def get_cell_info(self):
|
|
"""Return subset of cell information for OS API use."""
|
|
db_fields_to_return = ['is_parent', 'weight_scale', 'weight_offset']
|
|
url_fields_to_return = {
|
|
'username': 'username',
|
|
'hostname': 'rpc_host',
|
|
'port': 'rpc_port',
|
|
}
|
|
cell_info = dict(name=self.name, capabilities=self.capabilities)
|
|
if self.db_info:
|
|
for field in db_fields_to_return:
|
|
cell_info[field] = self.db_info[field]
|
|
|
|
url = rpc.get_transport_url(self.db_info['transport_url'])
|
|
if url.hosts:
|
|
for field, canonical in url_fields_to_return.items():
|
|
cell_info[canonical] = getattr(url.hosts[0], field)
|
|
return cell_info
|
|
|
|
def send_message(self, message):
|
|
"""Send a message to a cell. Just forward this to the driver,
|
|
passing ourselves and the message as arguments.
|
|
"""
|
|
self.driver.send_message_to_cell(self, message)
|
|
|
|
def __repr__(self):
|
|
me = "me" if self.is_me else "not_me"
|
|
return "Cell '%s' (%s)" % (self.name, me)
|
|
|
|
|
|
def sync_before(f):
|
|
"""Use as a decorator to wrap methods that use cell information to
|
|
make sure they sync the latest information from the DB periodically.
|
|
"""
|
|
@functools.wraps(f)
|
|
def wrapper(self, *args, **kwargs):
|
|
self._cell_data_sync()
|
|
return f(self, *args, **kwargs)
|
|
return wrapper
|
|
|
|
|
|
def sync_after(f):
|
|
"""Use as a decorator to wrap methods that update cell information
|
|
in the database to make sure the data is synchronized immediately.
|
|
"""
|
|
@functools.wraps(f)
|
|
def wrapper(self, *args, **kwargs):
|
|
result = f(self, *args, **kwargs)
|
|
self._cell_data_sync(force=True)
|
|
return result
|
|
return wrapper
|
|
|
|
|
|
_unset = object()
|
|
|
|
|
|
class CellStateManager(base.Base):
|
|
def __new__(cls, cell_state_cls=None, cells_config=_unset):
|
|
if cls is not CellStateManager:
|
|
return super(CellStateManager, cls).__new__(cls)
|
|
|
|
if cells_config is _unset:
|
|
cells_config = CONF.cells.cells_config
|
|
|
|
if cells_config:
|
|
config_path = CONF.find_file(cells_config)
|
|
if not config_path:
|
|
raise cfg.ConfigFilesNotFoundError(config_files=[cells_config])
|
|
return CellStateManagerFile(cell_state_cls, config_path)
|
|
|
|
return CellStateManagerDB(cell_state_cls)
|
|
|
|
def __init__(self, cell_state_cls=None):
|
|
super(CellStateManager, self).__init__()
|
|
if not cell_state_cls:
|
|
cell_state_cls = CellState
|
|
self.cell_state_cls = cell_state_cls
|
|
self.my_cell_state = cell_state_cls(CONF.cells.name, is_me=True)
|
|
self.parent_cells = {}
|
|
self.child_cells = {}
|
|
self.last_cell_db_check = datetime.datetime.min
|
|
|
|
attempts = 0
|
|
while True:
|
|
try:
|
|
self._cell_data_sync(force=True)
|
|
break
|
|
except db_exc.DBError as e:
|
|
attempts += 1
|
|
if attempts > 120:
|
|
raise
|
|
LOG.exception(_('DB error: %s') % e)
|
|
time.sleep(30)
|
|
|
|
my_cell_capabs = {}
|
|
for cap in CONF.cells.capabilities:
|
|
name, value = cap.split('=', 1)
|
|
if ';' in value:
|
|
values = set(value.split(';'))
|
|
else:
|
|
values = set([value])
|
|
my_cell_capabs[name] = values
|
|
self.my_cell_state.update_capabilities(my_cell_capabs)
|
|
|
|
def _refresh_cells_from_dict(self, db_cells_dict):
|
|
"""Make our cell info map match the db."""
|
|
|
|
# Update current cells. Delete ones that disappeared
|
|
for cells_dict in (self.parent_cells, self.child_cells):
|
|
for cell_name, cell_info in cells_dict.items():
|
|
is_parent = cell_info.db_info['is_parent']
|
|
db_dict = db_cells_dict.get(cell_name)
|
|
if db_dict and is_parent == db_dict['is_parent']:
|
|
cell_info.update_db_info(db_dict)
|
|
else:
|
|
del cells_dict[cell_name]
|
|
|
|
# Add new cells
|
|
for cell_name, db_info in db_cells_dict.items():
|
|
if db_info['is_parent']:
|
|
cells_dict = self.parent_cells
|
|
else:
|
|
cells_dict = self.child_cells
|
|
if cell_name not in cells_dict:
|
|
cells_dict[cell_name] = self.cell_state_cls(cell_name)
|
|
cells_dict[cell_name].update_db_info(db_info)
|
|
|
|
def _time_to_sync(self):
|
|
"""Is it time to sync the DB against our memory cache?"""
|
|
diff = timeutils.utcnow() - self.last_cell_db_check
|
|
return diff.seconds >= CONF.cells.db_check_interval
|
|
|
|
def _update_our_capacity(self, ctxt=None):
|
|
"""Update our capacity in the self.my_cell_state CellState.
|
|
|
|
This will add/update 2 entries in our CellState.capacities,
|
|
'ram_free' and 'disk_free'.
|
|
|
|
The values of these are both dictionaries with the following
|
|
format:
|
|
|
|
{'total_mb': <total_memory_free_in_the_cell>,
|
|
'units_by_mb: <units_dictionary>}
|
|
|
|
<units_dictionary> contains the number of units that we can build for
|
|
every distinct memory or disk requirement that we have based on
|
|
instance types. This number is computed by looking at room available
|
|
on every compute_node.
|
|
|
|
Take the following instance_types as an example:
|
|
|
|
[{'memory_mb': 1024, 'root_gb': 10, 'ephemeral_gb': 100},
|
|
{'memory_mb': 2048, 'root_gb': 20, 'ephemeral_gb': 200}]
|
|
|
|
capacities['ram_free']['units_by_mb'] would contain the following:
|
|
|
|
{'1024': <number_of_instances_that_will_fit>,
|
|
'2048': <number_of_instances_that_will_fit>}
|
|
|
|
capacities['disk_free']['units_by_mb'] would contain the following:
|
|
|
|
{'122880': <number_of_instances_that_will_fit>,
|
|
'225280': <number_of_instances_that_will_fit>}
|
|
|
|
Units are in MB, so 122880 = (10 + 100) * 1024.
|
|
|
|
NOTE(comstud): Perhaps we should only report a single number
|
|
available per instance_type.
|
|
"""
|
|
|
|
if not ctxt:
|
|
ctxt = context.get_admin_context()
|
|
|
|
reserve_level = CONF.cells.reserve_percent / 100.0
|
|
compute_hosts = {}
|
|
|
|
def _get_compute_hosts():
|
|
compute_nodes = self.db.compute_node_get_all(ctxt)
|
|
for compute in compute_nodes:
|
|
service = compute['service']
|
|
if not service or service['disabled']:
|
|
continue
|
|
host = service['host']
|
|
compute_hosts[host] = {
|
|
'free_ram_mb': compute['free_ram_mb'],
|
|
'free_disk_mb': compute['free_disk_gb'] * 1024,
|
|
'total_ram_mb': compute['memory_mb'],
|
|
'total_disk_mb': compute['local_gb'] * 1024}
|
|
|
|
_get_compute_hosts()
|
|
if not compute_hosts:
|
|
self.my_cell_state.update_capacities({})
|
|
return
|
|
|
|
ram_mb_free_units = {}
|
|
disk_mb_free_units = {}
|
|
total_ram_mb_free = 0
|
|
total_disk_mb_free = 0
|
|
|
|
def _free_units(total, free, per_inst):
|
|
if per_inst:
|
|
min_free = total * reserve_level
|
|
free = max(0, free - min_free)
|
|
return int(free / per_inst)
|
|
else:
|
|
return 0
|
|
|
|
instance_types = self.db.flavor_get_all(ctxt)
|
|
memory_mb_slots = frozenset(
|
|
[inst_type['memory_mb'] for inst_type in instance_types])
|
|
disk_mb_slots = frozenset(
|
|
[(inst_type['root_gb'] + inst_type['ephemeral_gb']) * units.Ki
|
|
for inst_type in instance_types])
|
|
|
|
for compute_values in compute_hosts.values():
|
|
total_ram_mb_free += compute_values['free_ram_mb']
|
|
total_disk_mb_free += compute_values['free_disk_mb']
|
|
for memory_mb_slot in memory_mb_slots:
|
|
ram_mb_free_units.setdefault(str(memory_mb_slot), 0)
|
|
free_units = _free_units(compute_values['total_ram_mb'],
|
|
compute_values['free_ram_mb'], memory_mb_slot)
|
|
ram_mb_free_units[str(memory_mb_slot)] += free_units
|
|
for disk_mb_slot in disk_mb_slots:
|
|
disk_mb_free_units.setdefault(str(disk_mb_slot), 0)
|
|
free_units = _free_units(compute_values['total_disk_mb'],
|
|
compute_values['free_disk_mb'], disk_mb_slot)
|
|
disk_mb_free_units[str(disk_mb_slot)] += free_units
|
|
|
|
capacities = {'ram_free': {'total_mb': total_ram_mb_free,
|
|
'units_by_mb': ram_mb_free_units},
|
|
'disk_free': {'total_mb': total_disk_mb_free,
|
|
'units_by_mb': disk_mb_free_units}}
|
|
self.my_cell_state.update_capacities(capacities)
|
|
|
|
@sync_before
|
|
def get_cell_info_for_neighbors(self):
|
|
"""Return cell information for all neighbor cells."""
|
|
cell_list = [cell.get_cell_info()
|
|
for cell in self.child_cells.itervalues()]
|
|
cell_list.extend([cell.get_cell_info()
|
|
for cell in self.parent_cells.itervalues()])
|
|
return cell_list
|
|
|
|
@sync_before
|
|
def get_my_state(self):
|
|
"""Return information for my (this) cell."""
|
|
return self.my_cell_state
|
|
|
|
@sync_before
|
|
def get_child_cells(self):
|
|
"""Return list of child cell_infos."""
|
|
return self.child_cells.values()
|
|
|
|
@sync_before
|
|
def get_parent_cells(self):
|
|
"""Return list of parent cell_infos."""
|
|
return self.parent_cells.values()
|
|
|
|
@sync_before
|
|
def get_parent_cell(self, cell_name):
|
|
return self.parent_cells.get(cell_name)
|
|
|
|
@sync_before
|
|
def get_child_cell(self, cell_name):
|
|
return self.child_cells.get(cell_name)
|
|
|
|
@sync_before
|
|
def update_cell_capabilities(self, cell_name, capabilities):
|
|
"""Update capabilities for a cell."""
|
|
cell = (self.child_cells.get(cell_name) or
|
|
self.parent_cells.get(cell_name))
|
|
if not cell:
|
|
LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
|
|
"update capabilities"),
|
|
{'cell_name': cell_name})
|
|
return
|
|
# Make sure capabilities are sets.
|
|
for capab_name, values in capabilities.items():
|
|
capabilities[capab_name] = set(values)
|
|
cell.update_capabilities(capabilities)
|
|
|
|
@sync_before
|
|
def update_cell_capacities(self, cell_name, capacities):
|
|
"""Update capacities for a cell."""
|
|
cell = (self.child_cells.get(cell_name) or
|
|
self.parent_cells.get(cell_name))
|
|
if not cell:
|
|
LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
|
|
"update capacities"),
|
|
{'cell_name': cell_name})
|
|
return
|
|
cell.update_capacities(capacities)
|
|
|
|
@sync_before
|
|
def get_our_capabilities(self, include_children=True):
|
|
capabs = copy.deepcopy(self.my_cell_state.capabilities)
|
|
if include_children:
|
|
for cell in self.child_cells.values():
|
|
if timeutils.is_older_than(cell.last_seen,
|
|
CONF.cells.mute_child_interval):
|
|
continue
|
|
for capab_name, values in cell.capabilities.items():
|
|
if capab_name not in capabs:
|
|
capabs[capab_name] = set([])
|
|
capabs[capab_name] |= values
|
|
return capabs
|
|
|
|
def _add_to_dict(self, target, src):
|
|
for key, value in src.items():
|
|
if isinstance(value, dict):
|
|
target.setdefault(key, {})
|
|
self._add_to_dict(target[key], value)
|
|
continue
|
|
target.setdefault(key, 0)
|
|
target[key] += value
|
|
|
|
@sync_before
|
|
def get_our_capacities(self, include_children=True):
|
|
capacities = copy.deepcopy(self.my_cell_state.capacities)
|
|
if include_children:
|
|
for cell in self.child_cells.values():
|
|
self._add_to_dict(capacities, cell.capacities)
|
|
return capacities
|
|
|
|
@sync_before
|
|
def get_capacities(self, cell_name=None):
|
|
if not cell_name or cell_name == self.my_cell_state.name:
|
|
return self.get_our_capacities()
|
|
if cell_name in self.child_cells:
|
|
return self.child_cells[cell_name].capacities
|
|
raise exception.CellNotFound(cell_name=cell_name)
|
|
|
|
@sync_before
|
|
def cell_get(self, ctxt, cell_name):
|
|
for cells_dict in (self.parent_cells, self.child_cells):
|
|
if cell_name in cells_dict:
|
|
return cells_dict[cell_name]
|
|
|
|
raise exception.CellNotFound(cell_name=cell_name)
|
|
|
|
|
|
class CellStateManagerDB(CellStateManager):
|
|
@utils.synchronized('cell-db-sync')
|
|
def _cell_data_sync(self, force=False):
|
|
"""Update cell status for all cells from the backing data store
|
|
when necessary.
|
|
|
|
:param force: If True, cell status will be updated regardless
|
|
of whether it's time to do so.
|
|
"""
|
|
if force or self._time_to_sync():
|
|
LOG.debug("Updating cell cache from db.")
|
|
self.last_cell_db_check = timeutils.utcnow()
|
|
ctxt = context.get_admin_context()
|
|
db_cells = self.db.cell_get_all(ctxt)
|
|
db_cells_dict = dict((cell['name'], cell) for cell in db_cells)
|
|
self._refresh_cells_from_dict(db_cells_dict)
|
|
self._update_our_capacity(ctxt)
|
|
|
|
@sync_after
|
|
def cell_create(self, ctxt, values):
|
|
return self.db.cell_create(ctxt, values)
|
|
|
|
@sync_after
|
|
def cell_update(self, ctxt, cell_name, values):
|
|
return self.db.cell_update(ctxt, cell_name, values)
|
|
|
|
@sync_after
|
|
def cell_delete(self, ctxt, cell_name):
|
|
return self.db.cell_delete(ctxt, cell_name)
|
|
|
|
|
|
class CellStateManagerFile(CellStateManager):
|
|
def __init__(self, cell_state_cls, cells_config_path):
|
|
self.cells_config_path = cells_config_path
|
|
super(CellStateManagerFile, self).__init__(cell_state_cls)
|
|
|
|
def _cell_data_sync(self, force=False):
|
|
"""Update cell status for all cells from the backing data store
|
|
when necessary.
|
|
|
|
:param force: If True, cell status will be updated regardless
|
|
of whether it's time to do so.
|
|
"""
|
|
reloaded, data = fileutils.read_cached_file(self.cells_config_path,
|
|
force_reload=force)
|
|
|
|
if reloaded:
|
|
LOG.debug("Updating cell cache from config file.")
|
|
self.cells_config_data = jsonutils.loads(data)
|
|
self._refresh_cells_from_dict(self.cells_config_data)
|
|
|
|
if force or self._time_to_sync():
|
|
self.last_cell_db_check = timeutils.utcnow()
|
|
self._update_our_capacity()
|
|
|
|
def cell_create(self, ctxt, values):
|
|
raise exception.CellsUpdateProhibited()
|
|
|
|
def cell_update(self, ctxt, cell_name, values):
|
|
raise exception.CellsUpdateProhibited()
|
|
|
|
def cell_delete(self, ctxt, cell_name):
|
|
raise exception.CellsUpdateProhibited()
|