Merge "Eliminate nested stack loading in InstanceGroup/ASG scaling"
This commit is contained in:
commit
8643975783
@ -129,19 +129,12 @@ def get_members(group, include_failed=False):
|
||||
key=lambda r: (r.status != r.FAILED, r.created_time, r.name))
|
||||
|
||||
|
||||
def get_member_refids(group, exclude=None):
|
||||
def get_member_refids(group):
|
||||
"""Get a list of member resources managed by the specified group.
|
||||
|
||||
The list of resources is sorted first by created_time then by name.
|
||||
"""
|
||||
members = get_members(group)
|
||||
if len(members) == 0:
|
||||
return []
|
||||
|
||||
if exclude is None:
|
||||
exclude = []
|
||||
return [r.FnGetRefId() for r in members
|
||||
if r.FnGetRefId() not in exclude]
|
||||
return [r.FnGetRefId() for r in get_members(group)]
|
||||
|
||||
|
||||
def get_member_names(group):
|
||||
|
@ -75,6 +75,8 @@ class AutoScalingResourceGroup(aws_asg.AutoScalingGroup):
|
||||
'outputs', 'outputs_list', 'current_size', 'refs', 'refs_map',
|
||||
)
|
||||
|
||||
(OUTPUT_MEMBER_IDS,) = (REFS_MAP,)
|
||||
|
||||
properties_schema = {
|
||||
RESOURCE: properties.Schema(
|
||||
properties.Schema.MAP,
|
||||
@ -280,26 +282,23 @@ class AutoScalingResourceGroup(aws_asg.AutoScalingGroup):
|
||||
# template generation time.
|
||||
if key == self.OUTPUTS_LIST:
|
||||
key = self.OUTPUTS
|
||||
if key == self.REFS:
|
||||
key = self.REFS_MAP
|
||||
if key.startswith("resource."):
|
||||
keycomponents = key.split('.', 2)
|
||||
path = keycomponents[2:] + path
|
||||
if path:
|
||||
key = self.OUTPUTS
|
||||
else:
|
||||
key = self.REFS_MAP
|
||||
output_name = self._attribute_output_name(key, *path)
|
||||
value = None
|
||||
|
||||
if key == self.REFS_MAP:
|
||||
value = {r: get_res_fn(r) for r in resource_names}
|
||||
elif key == self.OUTPUTS and path:
|
||||
if key == self.OUTPUTS and path:
|
||||
value = {r: get_attr_fn([r] + path) for r in resource_names}
|
||||
|
||||
if value is not None:
|
||||
yield output.OutputDefinition(output_name, value)
|
||||
|
||||
# Always define an output for the member IDs, which also doubles as the
|
||||
# output used by the REFS and REFS_MAP attributes.
|
||||
member_ids_value = {r: get_res_fn(r) for r in resource_names}
|
||||
yield output.OutputDefinition(self.OUTPUT_MEMBER_IDS,
|
||||
member_ids_value)
|
||||
|
||||
|
||||
def resource_mapping():
|
||||
return {
|
||||
|
@ -75,6 +75,8 @@ class InstanceGroup(stack_resource.StackResource):
|
||||
'InstanceList',
|
||||
)
|
||||
|
||||
(OUTPUT_MEMBER_IDS,) = ('references',)
|
||||
|
||||
properties_schema = {
|
||||
AVAILABILITY_ZONES: properties.Schema(
|
||||
properties.Schema.LIST,
|
||||
@ -312,13 +314,15 @@ class InstanceGroup(stack_resource.StackResource):
|
||||
|
||||
Replace the instances in the group using updated launch configuration.
|
||||
"""
|
||||
def changing_instances(tmpl):
|
||||
instances = grouputils.get_members(self)
|
||||
current = set((i.name, i.t) for i in instances)
|
||||
updated = set(tmpl.resource_definitions(None).items())
|
||||
def changing_instances(old_tmpl, new_tmpl):
|
||||
updated = set(new_tmpl.resource_definitions(None).items())
|
||||
if old_tmpl is not None:
|
||||
current = set(old_tmpl.resource_definitions(None).items())
|
||||
changing = current ^ updated
|
||||
else:
|
||||
changing = updated
|
||||
# includes instances to be updated and deleted
|
||||
affected = set(k for k, v in current ^ updated)
|
||||
return set(i.FnGetRefId() for i in instances if i.name in affected)
|
||||
return set(k for k, v in changing)
|
||||
|
||||
def pause_between_batch():
|
||||
while True:
|
||||
@ -327,7 +331,10 @@ class InstanceGroup(stack_resource.StackResource):
|
||||
except scheduler.Timeout:
|
||||
return
|
||||
|
||||
capacity = len(self.nested()) if self.nested() else 0
|
||||
group_data = self._group_data()
|
||||
old_template = group_data.template()
|
||||
|
||||
capacity = group_data.size(include_failed=True)
|
||||
batches = list(self._get_batches(capacity, batch_size, min_in_service))
|
||||
|
||||
update_timeout = self._update_timeout(len(batches), pause_sec)
|
||||
@ -335,16 +342,20 @@ class InstanceGroup(stack_resource.StackResource):
|
||||
try:
|
||||
for index, (total_capacity, efft_bat_sz) in enumerate(batches):
|
||||
template = self._create_template(total_capacity, efft_bat_sz)
|
||||
self._lb_reload(exclude=changing_instances(template))
|
||||
self._lb_reload(exclude=changing_instances(old_template,
|
||||
template),
|
||||
refresh_data=False)
|
||||
updater = self.update_with_template(template)
|
||||
checker = scheduler.TaskRunner(self._check_for_completion,
|
||||
updater)
|
||||
checker(timeout=update_timeout)
|
||||
old_template = template
|
||||
if index < (len(batches) - 1) and pause_sec > 0:
|
||||
self._lb_reload()
|
||||
waiter = scheduler.TaskRunner(pause_between_batch)
|
||||
waiter(timeout=pause_sec)
|
||||
finally:
|
||||
self._group_data(refresh=True)
|
||||
self._lb_reload()
|
||||
|
||||
@staticmethod
|
||||
@ -387,11 +398,29 @@ class InstanceGroup(stack_resource.StackResource):
|
||||
# nodes.
|
||||
self._lb_reload()
|
||||
|
||||
def _lb_reload(self, exclude=None):
|
||||
def _lb_reload(self, exclude=frozenset(), refresh_data=True):
|
||||
lb_names = self.properties.get(self.LOAD_BALANCER_NAMES) or []
|
||||
if lb_names:
|
||||
lb_dict = dict((name, self.stack[name]) for name in lb_names)
|
||||
lbutils.reload_loadbalancers(self, lb_dict, exclude)
|
||||
if refresh_data:
|
||||
self._outputs = None
|
||||
try:
|
||||
all_refids = self.get_output(self.OUTPUT_MEMBER_IDS)
|
||||
except (exception.NotFound,
|
||||
exception.TemplateOutputError) as op_err:
|
||||
LOG.debug('Falling back to grouputils due to %s', op_err)
|
||||
if refresh_data:
|
||||
self._nested = None
|
||||
instances = grouputils.get_members(self)
|
||||
all_refids = {i.name: i.FnGetRefId() for i in instances}
|
||||
names = [i.name for i in instances]
|
||||
else:
|
||||
group_data = self._group_data(refresh=refresh_data)
|
||||
names = group_data.member_names(include_failed=False)
|
||||
|
||||
id_list = [all_refids[n] for n in names
|
||||
if n not in exclude and n in all_refids]
|
||||
lbs = [self.stack[name] for name in lb_names]
|
||||
lbutils.reconfigure_loadbalancers(lbs, id_list)
|
||||
|
||||
def get_reference_id(self):
|
||||
return self.physical_resource_name_or_FnGetRefId()
|
||||
@ -441,6 +470,10 @@ class InstanceGroup(stack_resource.StackResource):
|
||||
for r in resource_names}
|
||||
yield output.OutputDefinition(key, value)
|
||||
|
||||
member_ids_value = {r: get_res_fn(r) for r in resource_names}
|
||||
yield output.OutputDefinition(self.OUTPUT_MEMBER_IDS,
|
||||
member_ids_value)
|
||||
|
||||
def child_template(self):
|
||||
num_instances = int(self.properties[self.SIZE])
|
||||
return self._create_template(num_instances)
|
||||
|
@ -13,38 +13,29 @@
|
||||
|
||||
import copy
|
||||
|
||||
import six
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import grouputils
|
||||
from heat.common.i18n import _
|
||||
from heat.engine import rsrc_defn
|
||||
from heat.engine import scheduler
|
||||
|
||||
|
||||
def reload_loadbalancers(group, load_balancers, exclude=None):
|
||||
def reconfigure_loadbalancers(load_balancers, id_list):
|
||||
"""Notify the LoadBalancer to reload its config.
|
||||
|
||||
This must be done after activation (instance in ACTIVE state), otherwise
|
||||
the instances' IP addresses may not be available.
|
||||
"""
|
||||
exclude = exclude or []
|
||||
id_list = grouputils.get_member_refids(group, exclude=exclude)
|
||||
for name, lb in six.iteritems(load_balancers):
|
||||
props = copy.copy(lb.properties.data)
|
||||
for lb in load_balancers:
|
||||
existing_defn = lb.frozen_definition()
|
||||
props = copy.copy(existing_defn.properties(lb.properties_schema,
|
||||
lb.context).data)
|
||||
if 'Instances' in lb.properties_schema:
|
||||
props['Instances'] = id_list
|
||||
elif 'members' in lb.properties_schema:
|
||||
props['members'] = id_list
|
||||
else:
|
||||
raise exception.Error(
|
||||
_("Unsupported resource '%s' in LoadBalancerNames") % name)
|
||||
_("Unsupported resource '%s' in LoadBalancerNames") % lb.name)
|
||||
|
||||
lb_defn = rsrc_defn.ResourceDefinition(
|
||||
lb.name,
|
||||
lb.type(),
|
||||
properties=props,
|
||||
metadata=lb.t.metadata(),
|
||||
deletion_policy=lb.t.deletion_policy())
|
||||
lb_defn = existing_defn.freeze(properties=props)
|
||||
|
||||
scheduler.TaskRunner(lb.update, lb_defn)()
|
||||
|
@ -15,7 +15,6 @@ import mock
|
||||
import six
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import grouputils
|
||||
from heat.common import template_format
|
||||
from heat.engine import properties
|
||||
from heat.engine import resource
|
||||
@ -78,23 +77,17 @@ class LBUtilsTest(common.HeatTestCase):
|
||||
self.stack = utils.parse_stack(t)
|
||||
|
||||
def test_reload_aws_lb(self):
|
||||
group = mock.Mock()
|
||||
self.patchobject(grouputils, 'get_member_refids',
|
||||
return_value=['ID1', 'ID2', 'ID3'])
|
||||
id_list = ['ID1', 'ID2', 'ID3']
|
||||
|
||||
lb1 = self.stack['aws_lb_1']
|
||||
lb2 = self.stack['aws_lb_2']
|
||||
lbs = {
|
||||
'LB_1': lb1,
|
||||
'LB_2': lb2
|
||||
}
|
||||
lb1.action = mock.Mock(return_value=lb1.CREATE)
|
||||
lb2.action = mock.Mock(return_value=lb2.CREATE)
|
||||
lb1.handle_update = mock.Mock()
|
||||
lb2.handle_update = mock.Mock()
|
||||
prop_diff = {'Instances': ['ID1', 'ID2', 'ID3']}
|
||||
prop_diff = {'Instances': id_list}
|
||||
|
||||
lbutils.reload_loadbalancers(group, lbs)
|
||||
lbutils.reconfigure_loadbalancers([lb1, lb2], id_list)
|
||||
|
||||
# For verification's purpose, we just check the prop_diff
|
||||
lb1.handle_update.assert_called_with(mock.ANY, mock.ANY,
|
||||
@ -103,25 +96,19 @@ class LBUtilsTest(common.HeatTestCase):
|
||||
prop_diff)
|
||||
|
||||
def test_reload_neutron_lb(self):
|
||||
group = mock.Mock()
|
||||
self.patchobject(grouputils, 'get_member_refids',
|
||||
return_value=['ID1', 'ID2', 'ID3'])
|
||||
id_list = ['ID1', 'ID2', 'ID3']
|
||||
|
||||
lb1 = self.stack['neutron_lb_1']
|
||||
lb2 = self.stack['neutron_lb_2']
|
||||
lb1.action = mock.Mock(return_value=lb1.CREATE)
|
||||
lb2.action = mock.Mock(return_value=lb2.CREATE)
|
||||
lbs = {
|
||||
'LB_1': lb1,
|
||||
'LB_2': lb2
|
||||
}
|
||||
|
||||
lb1.handle_update = mock.Mock()
|
||||
lb2.handle_update = mock.Mock()
|
||||
|
||||
prop_diff = {'members': ['ID1', 'ID2', 'ID3']}
|
||||
prop_diff = {'members': id_list}
|
||||
|
||||
lbutils.reload_loadbalancers(group, lbs)
|
||||
lbutils.reconfigure_loadbalancers([lb1, lb2], id_list)
|
||||
|
||||
# For verification's purpose, we just check the prop_diff
|
||||
lb1.handle_update.assert_called_with(mock.ANY, mock.ANY,
|
||||
@ -130,16 +117,11 @@ class LBUtilsTest(common.HeatTestCase):
|
||||
prop_diff)
|
||||
|
||||
def test_reload_non_lb(self):
|
||||
group = mock.Mock()
|
||||
self.patchobject(grouputils, 'get_member_refids',
|
||||
return_value=['ID1', 'ID2', 'ID3'])
|
||||
|
||||
lbs = {
|
||||
'LB_1': self.stack['non_lb'],
|
||||
}
|
||||
id_list = ['ID1', 'ID2', 'ID3']
|
||||
non_lb = self.stack['non_lb']
|
||||
|
||||
error = self.assertRaises(exception.Error,
|
||||
lbutils.reload_loadbalancers,
|
||||
group, lbs)
|
||||
self.assertIn("Unsupported resource 'LB_1' in LoadBalancerNames",
|
||||
lbutils.reconfigure_loadbalancers,
|
||||
[non_lb], id_list)
|
||||
self.assertIn("Unsupported resource 'non_lb' in LoadBalancerNames",
|
||||
six.text_type(error))
|
||||
|
@ -255,8 +255,7 @@ class LoadbalancerReloadTest(common.HeatTestCase):
|
||||
"LoadBalancerNames": ["ElasticLoadBalancer"]})
|
||||
group = instgrp.InstanceGroup('asg', defn, stack)
|
||||
|
||||
mock_members = self.patchobject(grouputils, 'get_member_refids')
|
||||
mock_members.return_value = ['aaaa', 'bbb']
|
||||
mocks = self.setup_mocks(group, ['aaaa', 'bbb'])
|
||||
expected = rsrc_defn.ResourceDefinition(
|
||||
'ElasticLoadBalancer',
|
||||
'AWS::ElasticLoadBalancing::LoadBalancer',
|
||||
@ -264,13 +263,11 @@ class LoadbalancerReloadTest(common.HeatTestCase):
|
||||
'Listeners': [{'InstancePort': u'80',
|
||||
'LoadBalancerPort': u'80',
|
||||
'Protocol': 'HTTP'}],
|
||||
'AvailabilityZones': ['nova']},
|
||||
metadata={},
|
||||
deletion_policy='Delete'
|
||||
'AvailabilityZones': ['nova']}
|
||||
)
|
||||
|
||||
group._lb_reload()
|
||||
mock_members.assert_called_once_with(group, exclude=[])
|
||||
self.check_mocks(group, mocks)
|
||||
lb.update.assert_called_once_with(expected)
|
||||
|
||||
def test_members(self):
|
||||
@ -297,18 +294,15 @@ class LoadbalancerReloadTest(common.HeatTestCase):
|
||||
"LoadBalancerNames": ["ElasticLoadBalancer"]})
|
||||
group = instgrp.InstanceGroup('asg', defn, stack)
|
||||
|
||||
mock_members = self.patchobject(grouputils, 'get_member_refids')
|
||||
mock_members.return_value = ['aaaa', 'bbb']
|
||||
mocks = self.setup_mocks(group, ['aaaa', 'bbb'])
|
||||
expected = rsrc_defn.ResourceDefinition(
|
||||
'ElasticLoadBalancer',
|
||||
'OS::Neutron::LoadBalancer',
|
||||
{'protocol_port': 8080,
|
||||
'members': ['aaaa', 'bbb']},
|
||||
metadata={},
|
||||
deletion_policy='Delete')
|
||||
'members': ['aaaa', 'bbb']})
|
||||
|
||||
group._lb_reload()
|
||||
mock_members.assert_called_once_with(group, exclude=[])
|
||||
self.check_mocks(group, mocks)
|
||||
lb.update.assert_called_once_with(expected)
|
||||
|
||||
def test_lb_reload_invalid_resource(self):
|
||||
@ -332,9 +326,7 @@ class LoadbalancerReloadTest(common.HeatTestCase):
|
||||
"LoadBalancerNames": ["ElasticLoadBalancer"]})
|
||||
group = instgrp.InstanceGroup('asg', defn, stack)
|
||||
|
||||
mock_members = self.patchobject(grouputils, 'get_member_refids')
|
||||
mock_members.return_value = ['aaaa', 'bbb']
|
||||
|
||||
self.setup_mocks(group, ['aaaa', 'bbb'])
|
||||
error = self.assertRaises(exception.Error,
|
||||
group._lb_reload)
|
||||
self.assertEqual(
|
||||
@ -350,19 +342,48 @@ class LoadbalancerReloadTest(common.HeatTestCase):
|
||||
self.patchobject(stk_defn.StackDefinition, 'get_availability_zones',
|
||||
return_value=['abc', 'xyz'])
|
||||
|
||||
mock_members = self.patchobject(grouputils, 'get_member_refids')
|
||||
mock_members.return_value = ['aaaabbbbcccc']
|
||||
|
||||
stack = utils.parse_stack(t, params=inline_templates.as_params)
|
||||
lb = stack['ElasticLoadBalancer']
|
||||
lb.state_set(lb.CREATE, lb.COMPLETE)
|
||||
lb.handle_update = mock.Mock(return_value=None)
|
||||
group = stack['WebServerGroup']
|
||||
self.setup_mocks(group, ['aaaabbbbcccc'])
|
||||
group._lb_reload()
|
||||
lb.handle_update.assert_called_once_with(
|
||||
mock.ANY, mock.ANY,
|
||||
{'Instances': ['aaaabbbbcccc']})
|
||||
|
||||
def setup_mocks(self, group, member_refids):
|
||||
refs = {str(i): r for i, r in enumerate(member_refids)}
|
||||
group.get_output = mock.Mock(return_value=refs)
|
||||
names = sorted(refs.keys())
|
||||
group_data = group._group_data()
|
||||
group_data.member_names = mock.Mock(return_value=names)
|
||||
group._group_data = mock.Mock(return_value=group_data)
|
||||
|
||||
def check_mocks(self, group, unused):
|
||||
pass
|
||||
|
||||
|
||||
class LoadbalancerReloadFallbackTest(LoadbalancerReloadTest):
|
||||
def setup_mocks(self, group, member_refids):
|
||||
# Raise NotFound when getting output, to force fallback to old-school
|
||||
# grouputils functions
|
||||
group.get_output = mock.Mock(side_effect=exception.NotFound)
|
||||
|
||||
def make_mock_member(refid):
|
||||
mem = mock.Mock()
|
||||
mem.FnGetRefId = mock.Mock(return_value=refid)
|
||||
return mem
|
||||
|
||||
members = [make_mock_member(r) for r in member_refids]
|
||||
mock_members = self.patchobject(grouputils, 'get_members',
|
||||
return_value=members)
|
||||
return mock_members
|
||||
|
||||
def check_mocks(self, group, mock_members):
|
||||
mock_members.assert_called_once_with(group)
|
||||
|
||||
|
||||
class InstanceGroupWithNestedStack(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
@ -417,7 +438,10 @@ class ReplaceTest(InstanceGroupWithNestedStack):
|
||||
|
||||
def setUp(self):
|
||||
super(ReplaceTest, self).setUp()
|
||||
self.group._nested = self.get_fake_nested_stack(2)
|
||||
nested = self.get_fake_nested_stack(2)
|
||||
inspector = self.group._group_data()
|
||||
inspector.size = mock.Mock(return_value=2)
|
||||
inspector.template = mock.Mock(return_value=nested.defn._template)
|
||||
|
||||
def test_rolling_updates(self):
|
||||
self.group._replace(self.min_in_service, self.batch_size, 0)
|
||||
|
@ -297,6 +297,6 @@ class InstanceGroupReplaceTest(common.HeatTestCase):
|
||||
# (6 - 1)*14*60 > 3600, so to raise error
|
||||
|
||||
group = instgrp.InstanceGroup('asg', defn, stack)
|
||||
group.nested = mock.MagicMock(return_value=range(12))
|
||||
group._group_data().size = mock.Mock(return_value=12)
|
||||
self.assertRaises(ValueError,
|
||||
group._replace, 10, 1, 14 * 60)
|
||||
|
@ -58,8 +58,6 @@ class GroupUtilsTest(common.HeatTestCase):
|
||||
# refids
|
||||
actual_ids = grouputils.get_member_refids(group)
|
||||
self.assertEqual(['ID-r0', 'ID-r1'], actual_ids)
|
||||
partial_ids = grouputils.get_member_refids(group, exclude=['ID-r1'])
|
||||
self.assertEqual(['ID-r0'], partial_ids)
|
||||
|
||||
def test_group_with_failed_members(self):
|
||||
group = mock.Mock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user