# heat/heat/engine/resources/openstack/heat/resource_group.py

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import itertools
import six
from heat.common import exception
from heat.common import grouputils
from heat.common.i18n import _
from heat.common import timeutils
from heat.engine import attributes
from heat.engine import constraints
from heat.engine import properties
from heat.engine.resources import stack_resource
from heat.engine import rsrc_defn
from heat.engine import scheduler
from heat.engine import support
from heat.engine import template
from heat.scaling import rolling_update
from heat.scaling import template as scale_template
# Skeleton for every generated nested-stack template; callers deep-copy
# it and fill in the ``resources`` section per group member.
template_template = dict(
    heat_template_version="2015-04-30",
    resources={},
)
class ResourceGroup(stack_resource.StackResource):
    """A resource that creates one or more identically configured nested
    resources.

    In addition to the `refs` attribute, this resource implements synthetic
    attributes that mirror those of the resources in the group. When
    getting an attribute from this resource, however, a list of attribute
    values for each resource in the group is returned. To get attribute values
    for a single resource in the group, synthetic attributes of the form
    `resource.{resource index}.{attribute name}` can be used. The resource ID
    of a particular resource in the group can be obtained via the synthetic
    attribute `resource.{resource index}`.

    While each resource in the group will be identically configured, this
    resource does allow for some index-based customization of the properties
    of the resources in the group. For example::

      resources:
        my_indexed_group:
          type: OS::Heat::ResourceGroup
          properties:
            count: 3
            resource_def:
              type: OS::Nova::Server
              properties:
                # create a unique name for each server
                # using its index in the group
                name: my_server_%index%
                image: CentOS 6.5
                flavor: 4GB Performance

    would result in a group of three servers having the same image and flavor,
    but names of `my_server_0`, `my_server_1`, and `my_server_2`. The variable
    used for substitution can be customized by using the `index_var` property.
    """

    support_status = support.SupportStatus(version='2014.1')

    # Template-visible property names.
    PROPERTIES = (
        COUNT, INDEX_VAR, RESOURCE_DEF, REMOVAL_POLICIES
    ) = (
        'count', 'index_var', 'resource_def', 'removal_policies'
    )

    # Keys accepted inside the ``resource_def`` map.
    _RESOURCE_DEF_KEYS = (
        RESOURCE_DEF_TYPE, RESOURCE_DEF_PROPERTIES, RESOURCE_DEF_METADATA,
    ) = (
        'type', 'properties', 'metadata',
    )

    # Keys accepted inside each ``removal_policies`` entry.
    _REMOVAL_POLICIES_KEYS = (
        REMOVAL_RSRC_LIST,
    ) = (
        'resource_list',
    )

    # Keys of the ``rolling_update`` section of the update policy.
    _ROLLING_UPDATES_SCHEMA_KEYS = (
        MIN_IN_SERVICE, MAX_BATCH_SIZE, PAUSE_TIME,
    ) = (
        'min_in_service', 'max_batch_size', 'pause_time',
    )

    _UPDATE_POLICY_SCHEMA_KEYS = (ROLLING_UPDATE,) = ('rolling_update',)

    # Declared attributes; per-member synthetic attributes
    # (``resource.<index>...``) are handled dynamically in FnGetAtt.
    ATTRIBUTES = (
        REFS, ATTR_ATTRIBUTES,
    ) = (
        'refs', 'attributes',
    )

    properties_schema = {
        COUNT: properties.Schema(
            properties.Schema.INTEGER,
            _('The number of resources to create.'),
            default=1,
            constraints=[
                constraints.Range(min=0),
            ],
            update_allowed=True
        ),
        INDEX_VAR: properties.Schema(
            properties.Schema.STRING,
            _('A variable that this resource will use to replace with the '
              'current index of a given resource in the group. Can be used, '
              'for example, to customize the name property of grouped '
              'servers in order to differentiate them when listed with '
              'nova client.'),
            default="%index%",
            constraints=[
                constraints.Length(min=3)
            ],
            support_status=support.SupportStatus(version='2014.2')
        ),
        RESOURCE_DEF: properties.Schema(
            properties.Schema.MAP,
            _('Resource definition for the resources in the group. The value '
              'of this property is the definition of a resource just as if '
              'it had been declared in the template itself.'),
            schema={
                RESOURCE_DEF_TYPE: properties.Schema(
                    properties.Schema.STRING,
                    _('The type of the resources in the group'),
                    required=True
                ),
                RESOURCE_DEF_PROPERTIES: properties.Schema(
                    properties.Schema.MAP,
                    _('Property values for the resources in the group')
                ),
                RESOURCE_DEF_METADATA: properties.Schema(
                    properties.Schema.MAP,
                    _('Supplied metadata for the resources in the group'),
                    support_status=support.SupportStatus(version='5.0.0')
                ),
            },
            required=True,
            update_allowed=True
        ),
        REMOVAL_POLICIES: properties.Schema(
            properties.Schema.LIST,
            _('Policies for removal of resources on update'),
            schema=properties.Schema(
                properties.Schema.MAP,
                _('Policy to be processed when doing an update which '
                  'requires removal of specific resources.'),
                schema={
                    REMOVAL_RSRC_LIST: properties.Schema(
                        properties.Schema.LIST,
                        _("List of resources to be removed "
                          "when doing an update which requires removal of "
                          "specific resources. "
                          "The resource may be specified several ways: "
                          "(1) The resource name, as in the nested stack, "
                          "(2) The resource reference returned from "
                          "get_resource in a template, as available via "
                          "the 'refs' attribute "
                          "Note this is destructive on update when specified; "
                          "even if the count is not being reduced, and once "
                          "a resource name is removed, it's name is never "
                          "reused in subsequent updates"
                          ),
                        default=[]
                    ),
                },
            ),
            update_allowed=True,
            default=[],
            support_status=support.SupportStatus(version='2015.1')
        ),
    }

    attributes_schema = {
        REFS: attributes.Schema(
            _("A list of resource IDs for the resources in the group"),
            type=attributes.Schema.LIST
        ),
        ATTR_ATTRIBUTES: attributes.Schema(
            _("A map of resource names to the specified attribute of each "
              "individual resource. "
              "Requires heat_template_version: 2014-10-16."),
            support_status=support.SupportStatus(version='2014.2'),
            type=attributes.Schema.MAP
        ),
    }

    rolling_update_schema = {
        MIN_IN_SERVICE: properties.Schema(
            properties.Schema.INTEGER,
            _('The minimum number of resources in service while '
              'rolling updates are being executed.'),
            constraints=[constraints.Range(min=0)],
            default=0),
        MAX_BATCH_SIZE: properties.Schema(
            properties.Schema.INTEGER,
            _('The maximum number of resources to replace at once.'),
            constraints=[constraints.Range(min=1)],
            default=1),
        PAUSE_TIME: properties.Schema(
            properties.Schema.NUMBER,
            _('The number of seconds to wait between batches of '
              'updates.'),
            constraints=[constraints.Range(min=0)],
            default=0),
    }

    update_policy_schema = {
        ROLLING_UPDATE: properties.Schema(
            properties.Schema.MAP,
            schema=rolling_update_schema,
            support_status=support.SupportStatus(version='5.0.0')
        )
    }

    def __init__(self, name, json_snippet, stack):
        super(ResourceGroup, self).__init__(name, json_snippet, stack)
        # Parse the update_policy section eagerly so validate() and
        # handle_update() can inspect it.
        self.update_policy = self.t.update_policy(self.update_policy_schema,
                                                  self.context)

    def get_size(self):
        """Return the requested number of member resources (``count``)."""
        return self.properties.get(self.COUNT)

    def validate(self):
        """Validate the resource, including its update_policy section."""
        super(ResourceGroup, self).validate()
        if self.update_policy is not None:
            self.update_policy.validate()
            policy_name = self.ROLLING_UPDATE
            if (policy_name in self.update_policy and
                    self.update_policy[policy_name] is not None):
                pause_time = self.update_policy[policy_name][self.PAUSE_TIME]
                # Cap pause_time at one hour so the rolling update cannot
                # consume an unreasonable share of the stack timeout.
                if pause_time > 3600:
                    msg = _('Maximum %(arg1)s allowed is 1hr(3600s),'
                            ' provided %(arg2)s seconds.') % dict(
                                arg1=self.PAUSE_TIME,
                                arg2=pause_time)
                    raise ValueError(msg)

    def validate_nested_stack(self):
        """Validate the group by building a one-member sample stack.

        Only validate the resource definition (which may be a nested
        template) if count is non-zero, to enable folks to disable
        features via a zero count if they wish.
        """
        if not self.get_size():
            return

        # Assemble a single member named "0", including properties that
        # are still None, so the complete definition gets checked.
        test_tmpl = self._assemble_nested(["0"], include_all=True)
        val_templ = template.Template(test_tmpl)
        res_def = val_templ.resource_definitions(self.stack)["0"]
        # make sure we can resolve the nested resource type
        try:
            self.stack.env.get_class(res_def.resource_type)
        except exception.TemplateNotFound:
            # its a template resource
            pass

        try:
            name = "%s-%s" % (self.stack.name, self.name)
            nested_stack = self._parse_nested_stack(
                name,
                test_tmpl,
                self.child_params())
            # Relax strict parameter validation for the sample stack.
            nested_stack.strict_validate = False
            nested_stack.validate()
        except Exception as ex:
            msg = _("Failed to validate: %s") % six.text_type(ex)
            raise exception.StackValidationFailed(message=msg)

    def _name_blacklist(self):
        """Resolve the remove_policies to names for removal."""
        nested = self.nested()

        # To avoid reusing names after removal, we store a comma-separated
        # blacklist in the resource data
        db_rsrc_names = self.data().get('name_blacklist')
        if db_rsrc_names:
            current_blacklist = db_rsrc_names.split(',')
        else:
            current_blacklist = []

        # Now we iterate over the removal policies, and update the blacklist
        # with any additional names
        rsrc_names = set(current_blacklist)
        if nested:
            for r in self.properties[self.REMOVAL_POLICIES]:
                if self.REMOVAL_RSRC_LIST in r:
                    # Tolerate string or int list values
                    for n in r[self.REMOVAL_RSRC_LIST]:
                        str_n = six.text_type(n)
                        # Entries may be either a member name or a refid;
                        # try the name first, then a refid lookup.
                        if str_n in nested:
                            rsrc_names.add(str_n)
                            continue
                        rsrc = nested.resource_by_refid(str_n)
                        if rsrc:
                            rsrc_names.add(rsrc.name)

        # If the blacklist has changed, update the resource data
        if rsrc_names != set(current_blacklist):
            self.data_set('name_blacklist', ','.join(rsrc_names))
        return rsrc_names

    def _resource_names(self, size=None):
        """Yield the first `size` member names not on the blacklist.

        Names are the decimal strings "0", "1", ... with blacklisted
        names skipped, so removed names are never reused.
        """
        name_blacklist = self._name_blacklist()
        if size is None:
            size = self.get_size()

        def is_blacklisted(name):
            return name in name_blacklist

        # Infinite stream of candidate names: "0", "1", "2", ...
        candidates = six.moves.map(six.text_type, itertools.count())
        return itertools.islice(six.moves.filterfalse(is_blacklisted,
                                                      candidates),
                                size)

    def _get_resources(self):
        """Get templates for resources.

        Returns a list of (name, rendered HOT definition) tuples for the
        current members of the nested stack.
        """
        return [(resource.name, resource.t.render_hot())
                for resource in grouputils.get_members(self)]

    def _count_black_listed(self):
        """Return the number of current resource names that are blacklisted"""
        existing_members = grouputils.get_member_names(self)
        return len(self._name_blacklist() & set(existing_members))

    def handle_create(self):
        """Create the nested stack holding all group members."""
        names = self._resource_names()
        self.create_with_template(self._assemble_nested(names),
                                  {},
                                  self.stack.timeout_mins)

    def _run_to_completion(self, template, timeout):
        """Task generator: update the nested stack and yield until done.

        NOTE(review): the parameter name `template` shadows the imported
        `heat.engine.template` module inside this method; harmless here,
        but worth renaming in a follow-up.
        """
        updater = self.update_with_template(template, {},
                                            timeout)

        while not super(ResourceGroup,
                        self).check_update_complete(updater):
            yield

    def _run_update(self, total_capacity, max_updates, names, timeout):
        """Run one rolling-update batch to completion (task generator)."""
        template = self._assemble_for_rolling_update(total_capacity,
                                                     max_updates,
                                                     names)
        return self._run_to_completion(template, timeout)

    def check_update_complete(self, checkers):
        """Step the queued TaskRunners in order; True when all finished."""
        for checker in checkers:
            if not checker.started():
                checker.start()
            if not checker.step():
                return False
        return True

    def handle_update(self, json_snippet, tmpl_diff, prop_diff):
        """Start updating the group.

        Returns a list of TaskRunners: any rolling-update batch tasks,
        followed by a final resize to the target capacity. Only the first
        runner is started here; check_update_complete() drives the rest.
        """
        if tmpl_diff:
            # parse update policy
            if rsrc_defn.UPDATE_POLICY in tmpl_diff:
                up = json_snippet.update_policy(self.update_policy_schema,
                                                self.context)
                self.update_policy = up

        checkers = []
        self.properties = json_snippet.properties(self.properties_schema,
                                                  self.context)
        # Only attempt a rolling update when the member definition changed.
        if prop_diff and self.RESOURCE_DEF in prop_diff:
            updaters = self._try_rolling_update()
            if updaters:
                checkers.extend(updaters)

        # Always finish with a plain resize to the requested count.
        resizer = scheduler.TaskRunner(
            self._run_to_completion,
            self._assemble_nested_for_size(self.get_size()),
            self.stack.timeout_mins)

        checkers.append(resizer)
        checkers[0].start()
        return checkers

    def _assemble_nested_for_size(self, new_capacity):
        """Build the nested template for `new_capacity` members."""
        new_names = self._resource_names(new_capacity)
        return self._assemble_nested(new_names)

    def FnGetAtt(self, key, *path):
        """Resolve group attributes.

        Supports:
          * ``resource.<index>[.<attr>]`` -- attribute of a single member
          * ``refs`` -- list of member resource IDs
          * ``attributes`` -- map of member name to a given attribute
          * any other key -- list of that attribute over all members
        """
        if key.startswith("resource."):
            return grouputils.get_nested_attrs(self, key, False, *path)

        names = self._resource_names()
        if key == self.REFS:
            vals = [grouputils.get_rsrc_id(self, key, False, n) for n in names]
            return attributes.select_from_attribute(vals, path)
        if key == self.ATTR_ATTRIBUTES:
            # `attributes` requires a path naming which attribute to map.
            if not path:
                raise exception.InvalidTemplateAttribute(
                    resource=self.name, key=key)
            return dict((n, grouputils.get_rsrc_attr(
                self, key, False, n, *path)) for n in names)

        path = [key] + list(path)
        return [grouputils.get_rsrc_attr(self, key, False, n, *path)
                for n in names]

    def _build_resource_definition(self, include_all=False):
        """Return the member definition taken from ``resource_def``.

        With include_all=False, properties whose value is None are
        stripped out of the definition.

        NOTE(review): this mutates the dict held in self.properties in
        place rather than working on a copy -- presumably safe given how
        callers use the result, but worth confirming.
        """
        res_def = self.properties[self.RESOURCE_DEF]
        if res_def[self.RESOURCE_DEF_PROPERTIES] is None:
            res_def[self.RESOURCE_DEF_PROPERTIES] = {}
        if res_def[self.RESOURCE_DEF_METADATA] is None:
            del res_def[self.RESOURCE_DEF_METADATA]
        if not include_all:
            resource_def_props = res_def[self.RESOURCE_DEF_PROPERTIES]
            clean = dict((k, v) for k, v in resource_def_props.items()
                         if v is not None)
            res_def[self.RESOURCE_DEF_PROPERTIES] = clean
        return res_def

    def _handle_repl_val(self, res_name, val):
        """Recursively substitute the index variable with `res_name`.

        NOTE(review): `collections.Mapping`/`Sequence` are deprecated
        aliases (removed in Python 3.10; use `collections.abc`), and on
        Python 3 the Sequence branch returns a lazy `map` object rather
        than a list -- verify downstream template serialization copes.
        """
        repl_var = self.properties[self.INDEX_VAR]
        recurse = lambda x: self._handle_repl_val(res_name, x)
        if isinstance(val, six.string_types):
            return val.replace(repl_var, res_name)
        elif isinstance(val, collections.Mapping):
            # Rebuild the mapping with each value recursively substituted.
            return dict(zip(val, map(recurse, six.itervalues(val))))
        elif isinstance(val, collections.Sequence):
            return map(recurse, val)
        return val

    def _do_prop_replace(self, res_name, res_def_template):
        """Return a deep copy of the definition with the index variable
        substituted into its properties."""
        res_def = copy.deepcopy(res_def_template)
        props = res_def[self.RESOURCE_DEF_PROPERTIES]
        if props:
            props = self._handle_repl_val(res_name, props)
            res_def[self.RESOURCE_DEF_PROPERTIES] = props
        return res_def

    def _assemble_nested(self, names, include_all=False):
        """Build a nested-stack template with one member per name."""
        res_def = self._build_resource_definition(include_all)
        resources = dict((k, self._do_prop_replace(k, res_def))
                         for k in names)
        child_template = copy.deepcopy(template_template)
        child_template['resources'] = resources
        return child_template

    def _assemble_for_rolling_update(self, total_capacity, max_updates,
                                     updated_names, include_all=False):
        """Build the nested template for one batch of a rolling update."""
        name_blacklist = self._name_blacklist()

        # Existing members minus blacklisted ones, capped at the target
        # capacity.
        valid_resources = [(n, d) for n, d in self._get_resources()
                           if n not in name_blacklist][:total_capacity]

        num_creating = max(total_capacity - len(valid_resources), 0)
        new_names = iter(updated_names[:num_creating])
        upd_names = updated_names[num_creating:]

        def replace_priority(res_item):
            # Names listed in upd_names sort ahead of the rest, in
            # upd_names order; everything else keeps its relative order.
            name, defn = res_item
            try:
                return upd_names.index(name)
            except ValueError:
                return len(upd_names)

        old_resources = sorted(valid_resources, key=replace_priority)
        res_def = self._build_resource_definition(include_all)
        resources = scale_template.member_definitions(old_resources, res_def,
                                                      total_capacity,
                                                      max_updates,
                                                      lambda: next(new_names),
                                                      self._do_prop_replace)

        child_template = copy.deepcopy(template_template)
        child_template['resources'] = dict(resources)
        return child_template

    def _try_rolling_update(self):
        """Return rolling-update TaskRunners, or None if no policy set."""
        if self.update_policy[self.ROLLING_UPDATE]:
            policy = self.update_policy[self.ROLLING_UPDATE]
            return self._replace(policy[self.MIN_IN_SERVICE],
                                 policy[self.MAX_BATCH_SIZE],
                                 policy[self.PAUSE_TIME])

    def _update_timeout(self, batch_cnt, pause_sec):
        """Return the per-batch timeout left after inter-batch pauses.

        Raises ValueError when the pauses alone would exceed the stack's
        update timeout.
        """
        total_pause_time = pause_sec * max(batch_cnt - 1, 0)
        if total_pause_time >= self.stack.timeout_secs():
            msg = _('The current %s will result in stack update '
                    'timeout.') % rsrc_defn.UPDATE_POLICY
            raise ValueError(msg)
        return self.stack.timeout_secs() - total_pause_time

    def _get_batches(self, targ_cap, init_cap, batch_size, min_in_service):
        """Yield (new_capacity, max_updates, member_names) per batch."""
        curr_cap = init_cap
        updated = 0
        while updated < targ_cap:
            # Delegate batch sizing to the shared rolling-update helper.
            new_cap, total_new = rolling_update.next_batch(targ_cap,
                                                           curr_cap,
                                                           updated,
                                                           batch_size,
                                                           min_in_service)
            if new_cap <= init_cap:
                # Don't ever update existing nodes that are beyond the size
                # of our target capacity, but continue to count them toward
                # the number in service
                high_water = targ_cap
            else:
                high_water = new_cap

            new_names = list(self._resource_names(high_water))
            # Tail names are brand-new members to create this batch.
            create_names = new_names[high_water - (new_cap - init_cap):]
            num_updates = total_new - max(new_cap - curr_cap, 0)
            upd_start = targ_cap - (updated + num_updates)
            upd_end = targ_cap
            update_names = new_names[upd_start:upd_end]

            yield (new_cap, total_new,
                   list(reversed(update_names + create_names)))

            updated += num_updates
            curr_cap = new_cap

            if not rolling_update.needs_update(targ_cap, curr_cap,
                                               updated):
                break

    def _replace(self, min_in_service, batch_size, pause_sec):
        """Build the ordered list of TaskRunners for a rolling replace."""

        def pause_between_batch(pause_sec):
            # Task that yields until pause_sec has elapsed.
            duration = timeutils.Duration(pause_sec)
            while not duration.expired():
                yield

        # blacklist count existing
        num_blacklist = self._count_black_listed()

        # current capacity not including existing blacklisted
        curr_cap = len(self.nested()) - num_blacklist if self.nested() else 0

        # final capacity expected after replace
        capacity = min(curr_cap, self.get_size())

        batches = list(self._get_batches(capacity, curr_cap, batch_size,
                                         min_in_service))

        # Fails early (ValueError) if the pauses alone would exceed the
        # stack update timeout.
        update_timeout = self._update_timeout(len(batches), pause_sec)

        def tasks():
            # Interleave a pause task between consecutive batches.
            for index, (curr_cap, max_upd, update_rsrcs) in enumerate(batches):
                yield scheduler.TaskRunner(self._run_update,
                                           curr_cap, max_upd,
                                           update_rsrcs,
                                           update_timeout)

                if index < (len(batches) - 1) and pause_sec > 0:
                    yield scheduler.TaskRunner(pause_between_batch, pause_sec)

        return list(tasks())

    def child_template(self):
        """Return the nested-stack template at the current size."""
        names = self._resource_names()
        return self._assemble_nested(names)

    def child_params(self):
        """The nested stack takes no parameters."""
        return {}

    def handle_adopt(self, resource_data):
        """Adopt an existing nested stack from the supplied adopt data.

        NOTE(review): `names` is an islice iterator here, so `if names:`
        is always true -- presumably meant to skip empty groups; verify.
        """
        names = self._resource_names()
        if names:
            return self.create_with_template(self._assemble_nested(names),
                                             {},
                                             adopt_data=resource_data)
def resource_mapping():
    """Return the mapping of Heat type names to implementation classes."""
    mapping = {}
    mapping['OS::Heat::ResourceGroup'] = ResourceGroup
    return mapping