add cluster_id attribute to PTs for HA

Partially implements blueprint node-centric-chain-plugin

Change-Id: If4f4752b04a0b27df1a296798e0289567a074b86
This commit is contained in:
Ivar Lazzaro 2015-09-30 16:25:00 -07:00
parent d7ab9f3d44
commit 8888c117cc
13 changed files with 254 additions and 9 deletions

View File

@ -526,3 +526,27 @@ class LocalAPI(object):
'servicechain_spec', scs_id)
except sc_ext.ServiceChainSpecNotFound:
LOG.warn(_("servicechain spec %s already deleted"), scs_id)
def _get_policy_target(self, plugin_context, pt_id):
return self._get_resource(self._group_policy_plugin, plugin_context,
'policy_target', pt_id)
def _get_policy_targets(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'policy_targets', filters)
def _create_policy_target(self, plugin_context, attrs):
return self._create_resource(self._group_policy_plugin, plugin_context,
'policy_target', attrs, False)
def _update_policy_target(self, plugin_context, pt_id, attrs):
return self._update_resource(self._group_policy_plugin, plugin_context,
'policy_target', pt_id, attrs, False)
def _delete_policy_target(self, plugin_context, pt_id):
try:
self._delete_resource(self._group_policy_plugin, plugin_context,
'policy_target', pt_id, False)
except gp_ext.PolicyTargetNotFound:
LOG.warn(_('Policy Rule Set %s already deleted'), pt_id)

View File

@ -62,6 +62,7 @@ class PolicyTarget(gquota.GBPQuotaBase, model_base.BASEV2, models_v2.HasId,
sa.ForeignKey(
'gp_policy_target_groups.id'),
nullable=True)
cluster_id = sa.Column(sa.String(255))
class PTGToPRSProvidingAssociation(model_base.BASEV2):
@ -828,7 +829,8 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase,
'tenant_id': pt['tenant_id'],
'name': pt['name'],
'description': pt['description'],
'policy_target_group_id': pt['policy_target_group_id']}
'policy_target_group_id': pt['policy_target_group_id'],
'cluster_id': pt['cluster_id']}
return self._fields(res, fields)
def _make_policy_target_group_dict(self, ptg, fields=None):
@ -1130,7 +1132,8 @@ class GroupPolicyDbPlugin(gpolicy.GroupPolicyPluginBase,
pt_db = PolicyTarget(
id=uuidutils.generate_uuid(), tenant_id=tenant_id,
name=pt['name'], description=pt['description'],
policy_target_group_id=pt['policy_target_group_id'])
policy_target_group_id=pt['policy_target_group_id'],
cluster_id=pt['cluster_id'])
context.session.add(pt_db)
return self._make_policy_target_dict(pt_db)

View File

@ -237,7 +237,8 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
description=pt['description'],
policy_target_group_id=
pt['policy_target_group_id'],
port_id=pt['port_id'])
port_id=pt['port_id'],
cluster_id=pt['cluster_id'])
context.session.add(pt_db)
return self._make_policy_target_dict(pt_db)

View File

@ -0,0 +1,33 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# revision identifiers, used by Alembic.
revision = '4121adfbac30'
down_revision = 'acb613677ffa'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column(
'gp_policy_targets',
sa.Column('cluster_id', sa.String(255), server_default='')
)
def downgrade():
pass

View File

@ -1 +1 @@
acb613677ffa
4121adfbac30

View File

@ -408,6 +408,9 @@ RESOURCE_ATTRIBUTE_MAP = {
'policy_target_group_id': {'allow_post': True, 'allow_put': True,
'validate': {'type:uuid_or_none': None},
'required': True, 'is_visible': True},
'cluster_id': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '', 'is_visible': True}
},
POLICY_TARGET_GROUPS: {
'id': {'allow_post': False, 'allow_put': False,

View File

@ -290,3 +290,16 @@ class OnlyOneGroupDefaultGatewayAllowed(GroupPolicyBadRequest):
class PTGAlreadyProvidingRedirectPRS(GroupPolicyBadRequest):
message = _("PTG %(ptg_id)s is already providing a redirect PRS.")
class InvalidClusterId(GroupPolicyBadRequest):
message = _("In RMD and derived drivers, a PT cluster_id should point to "
"an existing PT.")
class PolicyTargetInUse(GroupPolicyBadRequest):
message = _("Cannot delete a PT in use by a cluster.")
class InvalidClusterPtg(GroupPolicyBadRequest):
message = _("Inter PTG clustering disallowed.")

View File

@ -460,6 +460,8 @@ class ApicMappingDriver(api.ResourceMappingDriver):
reserved.append(subnet)
self._use_implicit_port(context, subnets=reserved or owned)
self._update_cluster_membership(
context, new_cluster_id=context.current['cluster_id'])
port = self._get_port(context._plugin_context,
context.current['port_id'])
if self._is_port_bound(port):
@ -753,6 +755,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
self._apply_policy_rule_set_rules(context, context.current, to_add)
def update_policy_target_precommit(self, context):
self._validate_cluster_id(context)
if (context.original['policy_target_group_id'] !=
context.current['policy_target_group_id']):
if context.current['policy_target_group_id']:
@ -760,6 +763,9 @@ class ApicMappingDriver(api.ResourceMappingDriver):
def update_policy_target_postcommit(self, context):
curr, orig = context.current, context.original
self._update_cluster_membership(
context, new_cluster_id=context.current['cluster_id'],
old_cluster_id=context.original['cluster_id'])
if ((orig['policy_target_group_id'] != curr['policy_target_group_id'])
or ((curr['description'] != orig['description']) and
curr['description'].startswith(PROXY_PORT_PREFIX))):

View File

@ -53,6 +53,7 @@ class NvsdGbpDriver(res_map.ResourceMappingDriver):
@log.log
def update_policy_target_postcommit(self, context):
super(NvsdGbpDriver, self).update_policy_target_postcommit(context)
self.nvsd_api.update_endpoint(context._plugin_context,
context.current)

View File

@ -199,6 +199,7 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
@log.log
def create_policy_target_precommit(self, context):
self._validate_cluster_id(context)
if not context.current['policy_target_group_id']:
raise exc.PolicyTargetRequiresPolicyTargetGroup()
if context.current['port_id']:
@ -225,6 +226,8 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
def create_policy_target_postcommit(self, context):
if not context.current['port_id']:
self._use_implicit_port(context)
self._update_cluster_membership(
context, new_cluster_id=context.current['cluster_id'])
self._assoc_ptg_sg_to_pt(context, context.current['id'],
context.current['policy_target_group_id'])
self._associate_fip_to_pt(context)
@ -337,16 +340,21 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
@log.log
def update_policy_target_precommit(self, context):
self._validate_cluster_id(context)
if (context.current['policy_target_group_id'] !=
context.original['policy_target_group_id']):
raise exc.PolicyTargetGroupUpdateOfPolicyTargetNotSupported()
@log.log
def update_policy_target_postcommit(self, context):
pass
if context.current['cluster_id'] != context.original['cluster_id']:
self._update_cluster_membership(
context, new_cluster_id=context.current['cluster_id'],
old_cluster_id=context.original['cluster_id'])
@log.log
def delete_policy_target_precommit(self, context):
self._validate_pt_in_use_by_cluster(context)
context.fips = self._get_pt_floating_ip_mapping(
context._plugin_context.session,
context.current['id'])
@ -2514,3 +2522,55 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
def _unset_proxy_gateway_routes(self, context, pt):
self._update_proxy_gateway_routes(context, pt, unset=True)
def _validate_cluster_id(self, context):
# In RMD, cluster_id can only point to a preexisting PT.
if context.current['cluster_id']:
try:
pt = self._get_policy_target(
context._plugin_context, context.current['cluster_id'])
if pt['policy_target_group_id'] != context.current[
'policy_target_group_id']:
raise exc.InvalidClusterPtg()
except gp_ext.PolicyTargetNotFound:
raise exc.InvalidClusterId()
def _validate_pt_in_use_by_cluster(self, context):
# Useful for avoiding to delete a cluster master
pts = [x for x in self._get_policy_targets(
context._plugin_context.elevated(),
{'cluster_id': [context.current['id']]})
if x['id'] != context.current['id']]
if pts:
raise exc.PolicyTargetInUse()
def _update_cluster_membership(self, context, new_cluster_id=None,
old_cluster_id=None):
if ("allowed-address-pairs" in
self._core_plugin.supported_extension_aliases):
curr_port = self._get_port(
context._plugin_context, context.current['port_id'])
curr_pairs = curr_port['allowed_address_pairs']
if old_cluster_id:
# Remove allowed address
master_mac, master_ips = self._get_cluster_master_pairs(
context._plugin_context, old_cluster_id)
curr_pairs = [x for x in curr_port['allowed_address_pairs']
if not ((x['ip_address'] in master_ips) and
(x['mac_address'] == master_mac))]
if new_cluster_id:
master_mac, master_ips = self._get_cluster_master_pairs(
context._plugin_context, new_cluster_id)
curr_pairs += [
{'mac_address': master_mac,
'ip_address': x} for x in master_ips]
self._update_port(context._plugin_context, curr_port['id'],
{'allowed_address_pairs': curr_pairs})
def _get_cluster_master_pairs(self, plugin_context, cluster_id):
master_pt = self._get_policy_target(plugin_context, cluster_id)
master_port = self._get_port(plugin_context,
master_pt['port_id'])
master_mac = master_port['mac_address']
master_ips = [x['ip_address'] for x in master_port['fixed_ips']]
return master_mac, master_ips

View File

@ -136,7 +136,8 @@ class NodePlumberBase(object):
instance['id']),
'name': SERVICE_TARGET_NAME_PREFIX + '%s_%s_%s' % (
relationship, node['id'][:5], instance['id'][:5]),
'port_id': None}
'port_id': None,
'cluster_id': ''}
data.update(extra_data)
data.update(target)
pt = gbp_plugin.create_policy_target(context.elevated(),

View File

@ -25,13 +25,15 @@ def gbp_attributes(func):
@gbp_attributes
def get_create_policy_target_default_attrs():
return {'name': '', 'description': '', 'policy_target_group_id': None}
return {'name': '', 'description': '', 'policy_target_group_id': None,
'cluster_id': ''}
@gbp_attributes
def get_create_policy_target_attrs():
return {'name': 'ep1', 'policy_target_group_id': _uuid(),
'tenant_id': _uuid(), 'description': 'test policy_target'}
'tenant_id': _uuid(), 'description': 'test policy_target',
'cluster_id': 'some_cluster_id'}
@gbp_attributes

View File

@ -421,7 +421,105 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
return scs_id
class TestPolicyTarget(ResourceMappingTestCase):
class TestClusterIdMixin(object):
def test_cluster_invalid_id(self):
ptg_id = self.create_policy_target_group()['policy_target_group']['id']
res = self.create_policy_target(policy_target_group_id=ptg_id,
cluster_id='SomeInvalidCluster',
expected_res_status=400)
self.assertEqual('InvalidClusterId',
res['NeutronError']['type'])
def test_invalid_cluster_head_deletion(self):
ptg_id = self.create_policy_target_group()['policy_target_group']['id']
master = self.create_policy_target(
policy_target_group_id=ptg_id)['policy_target']
self.create_policy_target(
policy_target_group_id=ptg_id, cluster_id=master['id'],
expected_res_status=201)
res = self.delete_policy_target(master['id'],
expected_res_status=400)
self.assertEqual('PolicyTargetInUse',
res['NeutronError']['type'])
def test_cluster_invalid_ptg(self):
ptg_1 = self.create_policy_target_group()['policy_target_group']['id']
ptg_2 = self.create_policy_target_group()['policy_target_group']['id']
master = self.create_policy_target(
policy_target_group_id=ptg_1)['policy_target']
# Cluster member belonging to a different PTG.
res = self.create_policy_target(
policy_target_group_id=ptg_2, cluster_id=master['id'],
expected_res_status=400)
self.assertEqual('InvalidClusterPtg',
res['NeutronError']['type'])
def test_cluster_self_deletion(self):
ptg_id = self.create_policy_target_group()['policy_target_group']['id']
master = self.create_policy_target(
policy_target_group_id=ptg_id)['policy_target']
member = self.create_policy_target(
policy_target_group_id=ptg_id, cluster_id=master['id'],
expected_res_status=201)['policy_target']
# Use self id as cluster ID
self.update_policy_target(master['id'], cluster_id=master['id'])
self.delete_policy_target(master['id'], expected_res_status=400)
self.delete_policy_target(member['id'], expected_res_status=204)
# Deletion doesn't fail now that master is the only cluster member
self.delete_policy_target(master['id'], expected_res_status=204)
def test_cluster_id_create(self):
ptg_id = self.create_policy_target_group()['policy_target_group']['id']
master = self.create_policy_target(
policy_target_group_id=ptg_id)['policy_target']
member = self.create_policy_target(
policy_target_group_id=ptg_id, cluster_id=master['id'],
expected_res_status=201)['policy_target']
master_port = self._get_object('ports', master['port_id'],
self.api)['port']
member_port = self._get_object('ports', member['port_id'],
self.api)['port']
self.assertEqual(
[{'mac_address': master_port['mac_address'],
'ip_address': master_port['fixed_ips'][0]['ip_address']}],
member_port['allowed_address_pairs'])
def test_cluster_id_update_add(self):
ptg_id = self.create_policy_target_group()['policy_target_group']['id']
master = self.create_policy_target(
policy_target_group_id=ptg_id)['policy_target']
member = self.create_policy_target(
policy_target_group_id=ptg_id,
expected_res_status=201)['policy_target']
self.update_policy_target(member['id'], cluster_id=master['id'])
master_port = self._get_object('ports', master['port_id'],
self.api)['port']
member_port = self._get_object('ports', member['port_id'],
self.api)['port']
self.assertEqual(
[{'mac_address': master_port['mac_address'],
'ip_address': master_port['fixed_ips'][0]['ip_address']}],
member_port['allowed_address_pairs'])
def test_cluster_id_update_remove(self):
ptg_id = self.create_policy_target_group()['policy_target_group']['id']
master = self.create_policy_target(
policy_target_group_id=ptg_id)['policy_target']
member = self.create_policy_target(
policy_target_group_id=ptg_id, cluster_id=master['id'],
expected_res_status=201)['policy_target']
self.update_policy_target(member['id'], cluster_id='')
member_port = self._get_object('ports', member['port_id'],
self.api)['port']
self.assertEqual([], member_port['allowed_address_pairs'])
class TestPolicyTarget(ResourceMappingTestCase, TestClusterIdMixin):
def test_implicit_port_lifecycle(self, proxy_ip_pool=None):
# Create policy_target group.