notify PT added/removed for autoscaling

Partially implements blueprint node-centric-chain-plugin

Change-Id: I55a615f5cc768f82c5c550c5924ed3c1a16fb405
(cherry picked from commit 2967da543d)
This commit is contained in:
Ivar Lazzaro
2015-05-29 19:37:32 -07:00
parent 8fd9edac57
commit 1d69b0ce86
8 changed files with 193 additions and 6 deletions

View File

@@ -377,6 +377,22 @@ class ServiceChainDbPlugin(schain.ServiceChainPluginBase,
servicechain_spec_id=spec_id)
instance_db.specs.append(assoc)
def _get_instances_from_policy_target(self, context, policy_target):
with context.session.begin(subtransactions=True):
ptg_id = policy_target['policy_target_group_id']
scis_p = self.get_servicechain_instances(
context, {'provider_ptg_id': [ptg_id]})
scis_c = self.get_servicechain_instances(
context, {'consumer_ptg_id': [ptg_id]})
# Don't return duplicates
result = []
seen = set()
for sci in scis_p + scis_c:
if sci['id'] not in seen:
seen.add(sci['id'])
result.append(sci)
return result
@log.log
def create_servicechain_spec(self, context, servicechain_spec,
set_params=True):

View File

@@ -286,6 +286,20 @@ class ServiceChainPluginBase(service_base.ServicePluginBase):
def get_plugin_description(self):
return 'Service Chain plugin'
def update_chains_pt_added(self, context, policy_target):
""" Auto scaling function.
Override this method to react to policy target creation.
"""
pass
def update_chains_pt_removed(self, context, policy_target):
""" Auto scaling function.
Override this method to react to policy target deletion.
"""
pass
@abc.abstractmethod
@log.log
def get_servicechain_nodes(self, context, filters=None, fields=None):

View File

@@ -310,8 +310,18 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.extension_manager.initialize()
self.policy_driver_manager.initialize()
def _notify_sc_plugin_pt_added(self, context, policy_target):
if self.servicechain_plugin:
self.servicechain_plugin.update_chains_pt_added(context,
policy_target)
def _notify_sc_plugin_pt_removed(self, context, policy_target):
if self.servicechain_plugin:
self.servicechain_plugin.update_chains_pt_removed(context,
policy_target)
@log.log
def create_policy_target(self, context, policy_target):
def create_policy_target(self, context, policy_target, notify_sc=True):
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -335,6 +345,11 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
result['id'])
self.delete_policy_target(context, result['id'])
# REVISIT(ivar): For now just raise the exception if something goes
# wrong. This will eventually be managed in an asynchronous way.
if notify_sc:
self._notify_sc_plugin_pt_added(context, result)
return result
@log.log
@@ -362,7 +377,8 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
return updated_policy_target
@log.log
def delete_policy_target(self, context, policy_target_id):
def delete_policy_target(self, context, policy_target_id,
notify_sc=True):
session = context.session
with session.begin(subtransactions=True):
policy_target = self.get_policy_target(context, policy_target_id)
@@ -373,6 +389,12 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
super(GroupPolicyPlugin, self).delete_policy_target(
context, policy_target_id)
if notify_sc:
# REVISIT(ivar): For now just raise the exception if something goes
# wrong. This will eventually be managed in an asynchronous way.
self._notify_sc_plugin_pt_removed(
context, policy_target)
try:
self.policy_driver_manager.delete_policy_target_postcommit(
policy_context)

View File

@@ -145,6 +145,32 @@ class NodeDriverBase(object):
"""
pass
@abc.abstractmethod
def update_policy_target_added(self, context, policy_target):
"""Update a deployed Service Chain Node on adding of a PT.
This method can be used for auto scaling some services whenever a
Policy Target is added to a relevant PTG.
:param context: NodeDriverContext instance describing the service chain
and the specific node to be processed by this driver.
:param policy_target: Dict representing a Policy Target.
"""
pass
@abc.abstractmethod
def update_policy_target_removed(self, context, policy_target):
"""Update a deployed Service Chain Node on removal of a PT.
This method can be used for auto scaling some services whenever a
Policy Target is removed from a relevant PTG.
:param context: NodeDriverContext instance describing the service chain
and the specific node to be processed by this driver.
:param policy_target: Dict representing a Policy Target.
"""
pass
@abc.abstractproperty
def name(self):
pass

View File

@@ -48,6 +48,14 @@ class NoopNodeDriver(driver_base.NodeDriverBase):
def update(self, context):
pass
@log.log
def update_policy_target_added(self, context, policy_target):
pass
@log.log
def update_policy_target_removed(self, context, policy_target):
pass
@property
def name(self):
return self._name

View File

@@ -232,6 +232,36 @@ class NodeCompositionPlugin(servicechain_db.ServiceChainDbPlugin,
updated_profile)
return updated_profile
def update_chains_pt_added(self, context, policy_target):
""" Auto scaling function.
Notify the correct set of node drivers that a new policy target has
been added to a relevant PTG.
"""
self._update_chains_pt_modified(context, policy_target, 'added')
def update_chains_pt_removed(self, context, policy_target):
""" Auto scaling function.
Notify the correct set of node drivers that a new policy target has
been removed from a relevant PTG.
"""
self._update_chains_pt_modified(context, policy_target, 'removed')
def _update_chains_pt_modified(self, context, policy_target, action):
scis = self._get_instances_from_policy_target(context, policy_target)
for sci in scis:
updaters = self._get_scheduled_drivers(context, sci, 'update')
for update in updaters.values():
try:
getattr(update['driver'],
'update_policy_target_' + action)(
update['context'], policy_target)
except exc.NodeDriverError as ex:
LOG.error(_("Node Update on policy target modification "
"failed, %s"), ex.message)
def _get_instance_nodes(self, context, instance):
if not instance['servicechain_specs']:
return []

View File

@@ -113,7 +113,8 @@ class NodePlumberBase(object):
for pt in pts:
try:
gbp_plugin.delete_policy_target(context, pt.policy_target_id)
gbp_plugin.delete_policy_target(context, pt.policy_target_id,
notify_sc=False)
except group_policy.PolicyTargetNotFound as ex:
LOG.debug(ex.message)
@@ -134,7 +135,8 @@ class NodePlumberBase(object):
'name': '', 'port_id': None}
data.update(target)
pt = gbp_plugin.create_policy_target(context,
{'policy_target': data})
{'policy_target': data},
notify_sc=False)
model.set_service_target(part_context, pt['id'], relationship)
def _sort_deployment(self, deployment):

View File

@@ -305,8 +305,7 @@ class NodeCompositionPluginTestCase(
expected_res_status=201)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
nodes=[node_id], expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
@@ -377,6 +376,76 @@ class NodeCompositionPluginTestCase(
self.update_servicechain_node(node['id'], name='somethingelse')
self.assertEqual(3, update.call_count)
def test_relevant_ptg_update(self):
add = self.driver.update_policy_target_added = mock.Mock()
rem = self.driver.update_policy_target_removed = mock.Mock()
prof = self.create_service_profile(
service_type='LOADBALANCER')['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
# Verify notification issued for created PT in the provider
pt = self.create_policy_target(
policy_target_group_id=provider['id'])['policy_target']
self.assertEqual(1, add.call_count)
add.assert_called_with(mock.ANY, pt)
# Verify notification issued for deleted PT in the provider
self.delete_policy_target(pt['id'])
self.assertEqual(1, rem.call_count)
rem.assert_called_with(mock.ANY, pt)
# Verify notification issued for created PT in the consumer
pt = self.create_policy_target(
policy_target_group_id=consumer['id'])['policy_target']
self.assertEqual(2, add.call_count)
add.assert_called_with(mock.ANY, pt)
# Verify notification issued for deleted PT in the consumer
self.delete_policy_target(pt['id'])
self.assertEqual(2, rem.call_count)
rem.assert_called_with(mock.ANY, pt)
def test_irrelevant_ptg_update(self):
add = self.driver.update_policy_target_added = mock.Mock()
rem = self.driver.update_policy_target_removed = mock.Mock()
prof = self.create_service_profile(
service_type='LOADBALANCER')['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'],
expected_res_status=201)['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']], expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
other = self.create_policy_target_group()['policy_target_group']
# Verify notification issued for created PT in the provider
pt = self.create_policy_target(
policy_target_group_id=other['id'])['policy_target']
self.assertFalse(add.called)
# Verify notification issued for deleted PT in the provider
self.delete_policy_target(pt['id'])
self.assertFalse(rem.called)
class AgnosticChainPlumberTestCase(NodeCompositionPluginTestCase):