# heat/heat/engine/resources/instance_group.py
#
# 408 lines
# 15 KiB
# Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from heat.common import environment_format
from heat.common import exception
from heat.common import grouputils
from heat.common.i18n import _
from heat.common import timeutils as iso8601utils
from heat.engine import attributes
from heat.engine import function
from heat.engine import properties
from heat.engine import rsrc_defn
from heat.engine import scheduler
from heat.engine import stack_resource
from heat.scaling import template
# Resource type name given to every member definition of the group; the
# nested stack's resource registry (see child_params) maps it to a concrete
# resource type.
SCALED_RESOURCE_TYPE = 'OS::Heat::ScaledResource'
class InstanceGroup(stack_resource.StackResource):
    """A group of identically configured instances kept in a nested stack.

    The group builds a template holding ``Size`` copies of the resource
    described by its launch configuration, creates/updates a nested stack
    from that template and then refreshes any listed load balancers.
    An optional ``RollingUpdate`` update policy controls how members are
    replaced in batches when the launch configuration changes.
    """

    PROPERTIES = (
        AVAILABILITY_ZONES, LAUNCH_CONFIGURATION_NAME, SIZE,
        LOAD_BALANCER_NAMES, TAGS,
    ) = (
        'AvailabilityZones', 'LaunchConfigurationName', 'Size',
        'LoadBalancerNames', 'Tags',
    )

    _TAG_KEYS = (
        TAG_KEY, TAG_VALUE,
    ) = (
        'Key', 'Value',
    )

    _ROLLING_UPDATE_SCHEMA_KEYS = (
        MIN_INSTANCES_IN_SERVICE, MAX_BATCH_SIZE, PAUSE_TIME
    ) = (
        'MinInstancesInService', 'MaxBatchSize', 'PauseTime'
    )

    _UPDATE_POLICY_SCHEMA_KEYS = (ROLLING_UPDATE,) = ('RollingUpdate',)

    ATTRIBUTES = (
        INSTANCE_LIST,
    ) = (
        'InstanceList',
    )

    properties_schema = {
        AVAILABILITY_ZONES: properties.Schema(
            properties.Schema.LIST,
            _('Not Implemented.'),
            required=True
        ),
        LAUNCH_CONFIGURATION_NAME: properties.Schema(
            properties.Schema.STRING,
            _('The reference to a LaunchConfiguration resource.'),
            required=True,
            update_allowed=True
        ),
        SIZE: properties.Schema(
            properties.Schema.INTEGER,
            _('Desired number of instances.'),
            required=True,
            update_allowed=True
        ),
        LOAD_BALANCER_NAMES: properties.Schema(
            properties.Schema.LIST,
            _('List of LoadBalancer resources.')
        ),
        TAGS: properties.Schema(
            properties.Schema.LIST,
            _('Tags to attach to this group.'),
            schema=properties.Schema(
                properties.Schema.MAP,
                schema={
                    TAG_KEY: properties.Schema(
                        properties.Schema.STRING,
                        required=True
                    ),
                    TAG_VALUE: properties.Schema(
                        properties.Schema.STRING,
                        required=True
                    ),
                },
            )
        ),
    }

    attributes_schema = {
        INSTANCE_LIST: attributes.Schema(
            _("A comma-delimited list of server ip addresses. "
              "(Heat extension).")
        ),
    }

    rolling_update_schema = {
        MIN_INSTANCES_IN_SERVICE: properties.Schema(properties.Schema.NUMBER,
                                                    default=0),
        MAX_BATCH_SIZE: properties.Schema(properties.Schema.NUMBER,
                                          default=1),
        PAUSE_TIME: properties.Schema(properties.Schema.STRING,
                                      default='PT0S')
    }

    update_policy_schema = {
        ROLLING_UPDATE: properties.Schema(properties.Schema.MAP,
                                          schema=rolling_update_schema)
    }

    def __init__(self, name, json_snippet, stack):
        """Initialise the resource and parse its UpdatePolicy.

        UpdatePolicy is currently only specific to InstanceGroup and
        AutoScalingGroup. Therefore, init is overridden to parse for the
        UpdatePolicy.
        """
        super(InstanceGroup, self).__init__(name, json_snippet, stack)
        self.update_policy = self.t.update_policy(self.update_policy_schema,
                                                  self.context)

    def validate(self):
        """Validate the resource, including its update policy.

        :raises ValueError: if the policy's PauseTime exceeds one hour.
        """
        super(InstanceGroup, self).validate()

        if not self.update_policy:
            return

        self.update_policy.validate()
        # The schema holds a single policy key. next(iter(...)) is used
        # rather than .keys()[0] because dict views cannot be indexed on
        # Python 3.
        policy_name = next(iter(self.update_policy_schema))
        policy = self.update_policy[policy_name]
        if policy:
            pause_time = policy[self.PAUSE_TIME]
            if iso8601utils.parse_isoduration(pause_time) > 3600:
                raise ValueError('Maximum PauseTime is 1 hour.')

    def validate_launchconfig(self):
        """Check that the launch configuration is properly referenced.

        :raises ValueError: if the referenced launch configuration cannot
            be found in the stack, or if this group is not among the
            resources it reports via required_by().
        """
        # It seems to be a common error to not have a dependency on the
        # launchconfiguration. This can happen if the actual resource
        # name is used instead of {get_resource: launch_conf} and no
        # depends_on is used.
        conf_refid = self.properties.get(self.LAUNCH_CONFIGURATION_NAME)
        if not conf_refid:
            return

        conf = self.stack.resource_by_refid(conf_refid)
        if conf is None:
            raise ValueError(_('%(lc)s (%(ref)s)'
                               ' reference can not be found.')
                             % dict(lc=self.LAUNCH_CONFIGURATION_NAME,
                                    ref=conf_refid))
        if self.name not in conf.required_by():
            raise ValueError(_('%(lc)s (%(ref)s)'
                               ' requires a reference to the'
                               ' configuration not just the name of the'
                               ' resource.') % dict(
                                   lc=self.LAUNCH_CONFIGURATION_NAME,
                                   ref=conf_refid))

    def handle_create(self):
        """Create a nested stack and add the initial resources to it."""
        self.validate_launchconfig()
        num_instances = self.properties[self.SIZE]
        initial_template = self._create_template(num_instances)
        return self.create_with_template(initial_template)

    def check_create_complete(self, task):
        """Report creation progress, reloading load balancers when done.

        Delegates to the parent class; once it reports completion the load
        balancers are reloaded. Returns the parent's result.
        """
        done = super(InstanceGroup, self).check_create_complete(task)
        if done:
            self._lb_reload()
        return done

    def handle_update(self, json_snippet, tmpl_diff, prop_diff):
        """Apply an update to the group.

        If Properties has changed, update self.properties, so we
        get the new values during any subsequent adjustment.
        """
        if tmpl_diff and 'UpdatePolicy' in tmpl_diff:
            # parse update policy
            self.update_policy = json_snippet.update_policy(
                self.update_policy_schema, self.context)

        self.properties = json_snippet.properties(self.properties_schema,
                                                  self.context)
        if prop_diff:
            # Replace instances first if launch configuration has changed
            self._try_rolling_update(prop_diff)

        # Get the current capacity, we may need to adjust if
        # Size has changed
        new_size = self.properties[self.SIZE]
        if new_size is None:
            new_size = grouputils.get_size(self)
        self.resize(new_size)

    def _tags(self):
        """Return the tags to attach to each member.

        Make sure that we add a tag that Ceilometer can pick up.
        These need to be prepended with 'metering.'.
        """
        tags = self.properties.get(self.TAGS) or []
        if any(t[self.TAG_KEY].startswith('metering.') for t in tags):
            # the user has added one, don't add another.
            return tags
        return tags + [{self.TAG_KEY: 'metering.groupname',
                        self.TAG_VALUE: self.FnGetRefId()}]

    def handle_delete(self):
        """Delete the nested stack."""
        return self.delete_nested()

    def _get_conf_properties(self):
        """Return ``(launch_config_resource, member_properties)``.

        The properties are the resolved launch configuration properties
        (rebuilt from an existing instance when 'InstanceId' is given),
        with 'Tags' set from this group and 'InstanceId' removed.
        """
        conf_refid = self.properties[self.LAUNCH_CONFIGURATION_NAME]
        conf = self.stack.resource_by_refid(conf_refid)
        props = function.resolve(conf.properties.data)
        if 'InstanceId' in props:
            props = conf.rebuild_lc_properties(props['InstanceId'])

        props['Tags'] = self._tags()
        # if the launch configuration is created from an existing instance.
        # delete the 'InstanceId' property
        props.pop('InstanceId', None)

        return conf, props

    def _get_instance_definition(self):
        """Build the resource definition used for each group member."""
        conf, props = self._get_conf_properties()
        return rsrc_defn.ResourceDefinition(None,
                                            SCALED_RESOURCE_TYPE,
                                            props,
                                            conf.t.metadata())

    def _get_instance_templates(self):
        """Get templates for resource instances."""
        return [(instance.name, instance.t)
                for instance in grouputils.get_members(self)]

    def _create_template(self, num_instances, num_replace=0,
                         template_version=('HeatTemplateFormatVersion',
                                           '2012-12-12')):
        """Create a template to represent autoscaled instances.

        Also see heat.scaling.template.resource_templates.
        """
        instance_definition = self._get_instance_definition()
        old_resources = self._get_instance_templates()
        definitions = template.resource_templates(
            old_resources, instance_definition, num_instances, num_replace)

        return template.make_template(definitions, version=template_version)

    def _try_rolling_update(self, prop_diff):
        """Replace members in batches if the launch configuration changed.

        Does nothing unless a RollingUpdate policy is set and
        LaunchConfigurationName is part of ``prop_diff``.
        """
        policy = self.update_policy[self.ROLLING_UPDATE]
        if not policy or self.LAUNCH_CONFIGURATION_NAME not in prop_diff:
            return

        pause_sec = iso8601utils.parse_isoduration(policy[self.PAUSE_TIME])
        self._replace(policy[self.MIN_INSTANCES_IN_SERVICE],
                      policy[self.MAX_BATCH_SIZE],
                      pause_sec)

    def _replace(self, min_in_service, batch_size, pause_sec):
        """Replace the instances in the group using updated launch config.

        :param min_in_service: number of members to keep in service while
            replacing (capped at the current capacity).
        :param batch_size: maximum number of members replaced per batch
            (capped at the current capacity); must be positive.
        :param pause_sec: seconds to wait between two batches.
        :raises ValueError: if ``batch_size`` is not positive, or if the
            accumulated pauses would reach the stack timeout.
        """
        def changing_instances(tmpl):
            instances = grouputils.get_members(self)
            current = set((i.name, i.t) for i in instances)
            updated = set(tmpl.resource_definitions(self.nested()).items())
            # includes instances to be updated and deleted
            affected = set(name for name, _defn in current ^ updated)
            return set(i.FnGetRefId() for i in instances
                       if i.name in affected)

        def pause_between_batch():
            while True:
                try:
                    yield
                except scheduler.Timeout:
                    return

        # A non-positive batch size would divide by zero below (0) or make
        # the replacement loop run forever (negative values).
        if batch_size <= 0:
            raise ValueError(_('MaxBatchSize must be greater than 0.'))

        capacity = len(self.nested()) if self.nested() else 0
        if capacity == 0:
            # Nothing to replace; continuing would divide by an effective
            # batch size of zero.
            return

        efft_bat_sz = min(batch_size, capacity)
        efft_min_sz = min(min_in_service, capacity)
        # effective capacity includes temporary capacity added to accommodate
        # the minimum number of instances in service during update
        efft_capacity = max(capacity - efft_bat_sz, efft_min_sz) + efft_bat_sz
        batch_cnt = (efft_capacity + efft_bat_sz - 1) // efft_bat_sz
        if pause_sec * (batch_cnt - 1) >= self.stack.timeout_secs():
            raise ValueError('The current UpdatePolicy will result '
                             'in stack update timeout.')

        try:
            remainder = capacity
            while remainder > 0 or efft_capacity > capacity:
                if capacity - remainder >= efft_min_sz:
                    efft_capacity = capacity
                # Named batch_template so the imported ``template`` module
                # is not shadowed inside this method.
                batch_template = self._create_template(efft_capacity,
                                                       efft_bat_sz)
                self._lb_reload(exclude=changing_instances(batch_template))
                updater = self.update_with_template(batch_template)
                updater.run_to_completion()
                self.check_update_complete(updater)
                remainder -= efft_bat_sz
                if ((remainder > 0 or efft_capacity > capacity) and
                        pause_sec > 0):
                    self._lb_reload()
                    waiter = scheduler.TaskRunner(pause_between_batch)
                    waiter(timeout=pause_sec)
        finally:
            self._lb_reload()

    def resize(self, new_capacity):
        """Resize the instance group to the new capacity.

        When shrinking, the oldest instances will be removed.
        """
        new_template = self._create_template(new_capacity)
        try:
            updater = self.update_with_template(new_template)
            updater.run_to_completion()
            self.check_update_complete(updater)
        finally:
            # Reload the LB in any case, so it's only pointing at healthy
            # nodes.
            self._lb_reload()

    def _lb_reload(self, exclude=None):
        """Update every listed load balancer with the current member ids.

        Notify the LoadBalancer to reload its config to include
        the changes in instances we have just made.

        This must be done after activation (instance in ACTIVE state),
        otherwise the instances' IP addresses may not be available.

        :param exclude: optional collection of member reference ids to
            leave out of the list handed to the load balancers.
        :raises exception.Error: if a listed resource has neither an
            'Instances' nor a 'members' property in its schema.
        """
        exclude = exclude or []
        lb_names = self.properties[self.LOAD_BALANCER_NAMES]
        if not lb_names:
            return

        id_list = grouputils.get_member_refids(self, exclude=exclude)
        for lb in lb_names:
            lb_resource = self.stack[lb]

            props = copy.copy(lb_resource.properties.data)
            if 'Instances' in lb_resource.properties_schema:
                props['Instances'] = id_list
            elif 'members' in lb_resource.properties_schema:
                props['members'] = id_list
            else:
                raise exception.Error(
                    _("Unsupported resource '%s' in LoadBalancerNames") %
                    (lb,))

            lb_defn = rsrc_defn.ResourceDefinition(
                lb_resource.name,
                lb_resource.type(),
                properties=props,
                metadata=lb_resource.t.get('Metadata'),
                deletion_policy=lb_resource.t.get('DeletionPolicy'))

            scheduler.TaskRunner(lb_resource.update, lb_defn)()

    def FnGetRefId(self):
        """Return the reference id of this group."""
        return self.physical_resource_name_or_FnGetRefId()

    def _resolve_attribute(self, name):
        """Resolve the group's attributes.

        heat extension: "InstanceList" returns comma delimited list of server
        ip addresses (None when the joined string is empty).
        """
        if name == self.INSTANCE_LIST:
            return u','.join(inst.FnGetAtt('PublicIp')
                             for inst in grouputils.get_members(self)) or None

    def child_template(self):
        """Return the nested stack template for the configured Size."""
        num_instances = int(self.properties[self.SIZE])
        return self._create_template(num_instances)

    def child_params(self):
        """Return the environment for the nested stack."""
        return {
            environment_format.PARAMETERS: {},
            environment_format.RESOURCE_REGISTRY: {
                SCALED_RESOURCE_TYPE: 'AWS::EC2::Instance',
            },
        }
def resource_mapping():
    """Return the resource type names provided by this module.

    The result maps each type name to the class implementing it.
    """
    mapping = {}
    mapping['OS::Heat::InstanceGroup'] = InstanceGroup
    return mapping