node composition plugin implementation

Partially implements blueprint node-centric-chain-plugin

Change-Id: I16d56ebb7bff0c621c68106039b4217cfac77755
This commit is contained in:
Ivar Lazzaro 2015-05-06 20:02:43 -07:00
parent 657306d199
commit 7f90fa8c90
20 changed files with 1099 additions and 74 deletions

View File

View File

@ -0,0 +1,33 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils import importutils
from stevedore import driver
LOG = logging.getLogger(__name__)
def load_plugin(namespace, plugin):
try:
# Try to resolve plugin by name
mgr = driver.DriverManager(namespace, plugin)
plugin_class = mgr.driver
except RuntimeError as e1:
# fallback to class name
try:
plugin_class = importutils.import_class(plugin)
except ImportError as e2:
LOG.exception(_("Error loading plugin by name, %s"), e1)
LOG.exception(_("Error loading plugin by class, %s"), e2)
raise ImportError(_("Plugin not found."))
return plugin_class()

View File

@ -1 +1 @@
9744740aa75c
d08627f64e37

View File

@ -0,0 +1,58 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""ncp_implementation
"""
# revision identifiers, used by Alembic.
revision = 'd08627f64e37'
down_revision = '9744740aa75c'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'ncp_node_to_driver_mapping',
sa.Column('servicechain_node_id', sa.String(length=36),
nullable=False),
sa.Column('driver_name', sa.String(length=36)),
sa.Column('servicechain_instance_id', sa.String(length=36)),
sa.PrimaryKeyConstraint(
'servicechain_node_id', 'servicechain_instance_id'),
sa.ForeignKeyConstraint(['servicechain_node_id'],
['sc_nodes.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['servicechain_instance_id'],
['sc_instances.id'], ondelete='CASCADE')
)
op.create_table(
'ncp_service_targets',
sa.Column('servicechain_node_id', sa.String(length=36),
nullable=False),
sa.Column('servicechain_instance_id', sa.String(length=36)),
sa.Column('policy_target_id', sa.String(length=36)),
sa.Column('relationship', sa.String(length=25)),
sa.Column('position', sa.Integer),
sa.PrimaryKeyConstraint(
'servicechain_node_id', 'servicechain_instance_id',
'policy_target_id'),
sa.ForeignKeyConstraint(['policy_target_id'],
['gp_policy_targets.id'], ondelete='CASCADE')
)
def downgrade():
pass

View File

@ -205,6 +205,8 @@ class ServiceChainDbPlugin(schain.ServiceChainPluginBase,
'config_param_names': spec.get('config_param_names'),
'shared': spec['shared']}
res['nodes'] = [sc_node['node_id'] for sc_node in spec['nodes']]
res['instances'] = [x['servicechain_instance_id'] for x in
spec['instances']]
return self._fields(res, fields)
def _make_sc_instance_dict(self, instance, fields=None):
@ -296,13 +298,16 @@ class ServiceChainDbPlugin(schain.ServiceChainPluginBase,
return self._get_collection_count(context, ServiceChainNode,
filters=filters)
def _process_nodes_for_spec(self, context, spec_db, spec):
def _process_nodes_for_spec(self, context, spec_db, spec,
set_params=True):
if 'nodes' in spec:
self._set_nodes_for_spec(context, spec_db, spec['nodes'])
self._set_nodes_for_spec(context, spec_db, spec['nodes'],
set_params=set_params)
del spec['nodes']
return spec
def _set_nodes_for_spec(self, context, spec_db, nodes_id_list):
def _set_nodes_for_spec(self, context, spec_db, nodes_id_list,
set_params=True):
if not nodes_id_list:
spec_db.nodes = []
@ -324,18 +329,21 @@ class ServiceChainDbPlugin(schain.ServiceChainPluginBase,
# it as clearing existing nodes.
spec_db.nodes = []
for node_id in nodes_id_list:
if set_params:
sc_node = self.get_servicechain_node(context, node_id)
node_dict = jsonutils.loads(sc_node['config'])
config_params = (node_dict.get('parameters') or
node_dict.get('Parameters'))
if config_params:
if not spec_db.config_param_names:
spec_db.config_param_names = str(config_params.keys())
spec_db.config_param_names = str(
config_params.keys())
else:
config_param_names = ast.literal_eval(
spec_db.config_param_names)
config_param_names.extend(config_params.keys())
spec_db.config_param_names = str(config_param_names)
spec_db.config_param_names = str(
config_param_names)
assoc = SpecNodeAssociation(servicechain_spec_id=spec_db.id,
node_id=node_id)
@ -371,7 +379,8 @@ class ServiceChainDbPlugin(schain.ServiceChainPluginBase,
instance_db.specs.append(assoc)
@log.log
def create_servicechain_spec(self, context, servicechain_spec):
def create_servicechain_spec(self, context, servicechain_spec,
set_params=True):
spec = servicechain_spec['servicechain_spec']
tenant_id = self._get_tenant_id_for_create(context, spec)
with context.session.begin(subtransactions=True):
@ -380,18 +389,20 @@ class ServiceChainDbPlugin(schain.ServiceChainPluginBase,
name=spec['name'],
description=spec['description'],
shared=spec['shared'])
self._process_nodes_for_spec(context, spec_db, spec)
self._process_nodes_for_spec(context, spec_db, spec,
set_params=set_params)
context.session.add(spec_db)
return self._make_sc_spec_dict(spec_db)
@log.log
def update_servicechain_spec(self, context, spec_id,
servicechain_spec):
servicechain_spec, set_params=True):
spec = servicechain_spec['servicechain_spec']
with context.session.begin(subtransactions=True):
spec_db = self._get_servicechain_spec(context,
spec_id)
spec = self._process_nodes_for_spec(context, spec_db, spec)
spec = self._process_nodes_for_spec(context, spec_db, spec,
set_params=set_params)
spec_db.update(spec)
return self._make_sc_spec_dict(spec_db)

View File

@ -19,6 +19,12 @@ service_chain_opts = [
help=_("An ordered list of service chain node drivers "
"entrypoints to be loaded from the "
"gbpservice.neutron.servicechain.ncp_drivers "
"namespace.")),
cfg.StrOpt('node_plumber',
default='dummy_plumber',
help=_("The plumber used by the Node Composition Plugin "
"for service plumbing. Entrypoint loaded from the "
"gbpservice.neutron.servicechain.ncp_plumbers "
"namespace."))
]

View File

@ -15,6 +15,7 @@ from neutron import manager
from neutron.plugins.common import constants as pconst
from gbpservice.neutron.extensions import group_policy
from gbpservice.neutron.services.servicechain.plugins.ncp import model
def get_gbp_plugin():
@ -26,13 +27,19 @@ def get_node_driver_context(sc_plugin, context, sc_instance,
management_group=None, service_targets=None):
specs = sc_plugin.get_servicechain_specs(
context, filters={'id': sc_instance['servicechain_specs']})
provider = _ptg_or_ep(context, sc_instance['provider_ptg_id'])
consumer = _ptg_or_ep(context, sc_instance['consumer_ptg_id'])
position = _calculate_node_position(specs, current_node['id'])
provider = _get_ptg_or_ep(context, sc_instance['provider_ptg_id'])
consumer = _get_ptg_or_ep(context, sc_instance['consumer_ptg_id'])
current_profile = sc_plugin.get_service_profile(
context, current_node['service_profile_id'])
original_profile = sc_plugin.get_service_profile(
context,
original_node['service_profile_id']) if original_node else None
if not service_targets:
service_targets = model.get_service_targets(
context.session, servicechain_instance_id=sc_instance['id'],
position=position, servicechain_node_id=current_node['id'])
return NodeDriverContext(sc_plugin=sc_plugin,
context=context,
service_chain_instance=sc_instance,
@ -44,10 +51,11 @@ def get_node_driver_context(sc_plugin, context, sc_instance,
management_group=management_group,
original_service_chain_node=original_node,
original_service_profile=original_profile,
service_targets=service_targets)
service_targets=service_targets,
position=position)
def _ptg_or_ep(context, group_id):
def _get_ptg_or_ep(context, group_id):
group = None
if group_id:
try:
@ -59,11 +67,20 @@ def _ptg_or_ep(context, group_id):
return group
def _calculate_node_position(specs, node_id):
for spec in specs:
pos = 0
for node in spec['nodes']:
pos += 1
if node_id == node:
return pos
class NodeDriverContext(object):
""" Context passed down to NCC Node Drivers."""
"""Context passed down to NCP Node Drivers."""
def __init__(self, sc_plugin, context, service_chain_instance,
service_chain_specs, current_service_chain_node,
service_chain_specs, current_service_chain_node, position,
current_service_profile, provider_group, consumer_group=None,
management_group=None, original_service_chain_node=None,
original_service_profile=None, service_targets=None):
@ -85,6 +102,7 @@ class NodeDriverContext(object):
self._core_plugin = manager.NeutronManager.get_plugin()
self._l3_plugin = manager.NeutronManager.get_service_plugins().get(
pconst.L3_ROUTER_NAT)
self._position = position
@property
def gbp_plugin(self):
@ -136,6 +154,10 @@ class NodeDriverContext(object):
def current_profile(self):
return self._current_service_profile
@property
def current_position(self):
return self._position
@property
def original_node(self):
return self._original_service_chain_node
@ -152,18 +174,6 @@ class NodeDriverContext(object):
self.current_node['id'] in x['nodes']]
return self._relevant_specs
@property
def service_targets(self):
""" Returns the service targets assigned for this service if any.
The result looks like the following:
{
"provider": [pt_uuids],
"consumer": [pt_uuids],
"management": [pt_uuids],
}
"""
return self._service_targets
@property
def provider(self):
return self._provider_group
@ -175,3 +185,19 @@ class NodeDriverContext(object):
@property
def management(self):
return self._management_group
def get_service_targets(self, update=False):
""" Returns the service targets assigned for this service if any.
The result looks like the following:
{
"provider": [pt_uuids],
"consumer": [pt_uuids],
"management": [pt_uuids],
}
"""
if update:
self._service_targets = model.get_service_targets(
self.session, servicechain_instance_id=self.instance['id'],
position=self.current_position,
servicechain_node_id=self.current_node['id'])
return self._service_targets

View File

@ -28,12 +28,13 @@ class NodeDriverBase(object):
"""
@abc.abstractmethod
def initialize(self):
def initialize(self, name):
"""Perform driver initialization.
Called after all drivers have been loaded and the database has
been initialized. No abstract methods defined below will be
called prior to this method being called.
called prior to this method being called. Name is a unique attribute
that identifies the driver.
"""
pass
@ -143,3 +144,7 @@ class NodeDriverBase(object):
and the specific node to be processed by this driver.
"""
pass
@abc.abstractproperty
def name(self):
pass

View File

@ -0,0 +1,48 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Exceptions used by NodeCompositionPlugin and drivers."""
from neutron.common import exceptions
class NodeDriverError(exceptions.NeutronException):
"""Node driver call failed."""
message = _("%(method)s failed.")
class NodeCompositionPluginException(exceptions.NeutronException):
"""Base for node driver exceptions returned to user."""
pass
class NodeCompositionPluginBadRequest(exceptions.BadRequest,
NodeCompositionPluginException):
"""Base for node driver bad request exceptions returned to user."""
pass
class OneSpecPerInstanceAllowed(NodeCompositionPluginBadRequest):
message = _("The Node Composition Plugin only supports one Servicechain"
"Spec per Servicechain Instance.")
class NoDriverAvailableForAction(NodeCompositionPluginBadRequest):
message = _("The Node Composition Plugin can't find any Node Driver "
"available for executing %(action)s on node %(node_id)s. "
"This may be caused by a Servicechain Node misconfiguration "
"or an unsupported Service Profile.")
class ServiceProfileInUseByAnInstance(NodeCompositionPluginBadRequest):
message = _("Cannot update Service Profile %(profile_id)s because it's "
"used by servicechain instance %(instance_id)s.")

View File

@ -0,0 +1,121 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import model_base
from oslo_log import log as logging
import sqlalchemy as sa
from gbpservice.neutron.db.grouppolicy import group_policy_db as gp_db
LOG = logging.getLogger(__name__)
PROVIDER = 'provider'
CONSUMER = 'consumer'
MANAGEMENT = 'management'
RELATIONSHIPS = [PROVIDER, CONSUMER, MANAGEMENT]
class NodeToDriverMapping(model_base.BASEV2):
"""Node to Driver mapping DB.
This table keeps track of the driver owning a specific SC Node based on
the SC instance
"""
__tablename__ = 'ncp_node_to_driver_mapping'
servicechain_node_id = sa.Column(sa.String(36),
sa.ForeignKey('sc_nodes.id',
ondelete='CASCADE'),
nullable=False, primary_key=True)
# Based on the extension name
driver_name = sa.Column(sa.String(36), nullable=False)
servicechain_instance_id = sa.Column(sa.String(36),
sa.ForeignKey('sc_instances.id',
ondelete='CASCADE'),
primary_key=True)
class ServiceTarget(model_base.BASEV2):
"""Service related policy targets.
Internal information regarding the policy targets owned by services.
"""
__tablename__ = 'ncp_service_targets'
policy_target_id = sa.Column(sa.String(36),
sa.ForeignKey(gp_db.PolicyTarget.id,
ondelete='CASCADE'),
nullable=False, primary_key=True)
# Not a FK to avoid constraint error on SCI delete
# keeping the DB entry is useful to identify uncleaned PTs
servicechain_instance_id = sa.Column(sa.String(36),
nullable=False, primary_key=True)
# Not a FK to avoid constraint error on SCN delete.
# keeping the DB entry is useful to identify uncleaned PTs
servicechain_node_id = sa.Column(sa.String(36),
nullable=False, primary_key=True)
# Defines on which "side" of the chain the PT is placed. typically
# its values can be "provider", "consumer" or "management"
relationship = sa.Column(sa.String(25), nullable=False)
position = sa.Column(sa.Integer)
def set_node_ownership(context, driver_name):
session = context.session
with session.begin(subtransactions=True):
owner = NodeToDriverMapping(
servicechain_instance_id=context.instance['id'],
servicechain_node_id=context.current_node['id'],
driver_name=driver_name)
session.add(owner)
def get_node_owner(context):
session = context.session
with session.begin(subtransactions=True):
query = session.query(NodeToDriverMapping)
query = query.filter_by(
servicechain_instance_id=context.instance['id'])
query = query.filter_by(
servicechain_node_id=context.current_node['id'])
return query.all()
def set_service_target(context, policy_target_id, relationship):
session = context.session
with session.begin(subtransactions=True):
owner = ServiceTarget(
policy_target_id=policy_target_id,
servicechain_instance_id=context.instance['id'],
servicechain_node_id=context.current_node['id'],
position=context.current_position,
relationship=relationship)
session.add(owner)
def get_service_targets(session, policy_target_id=None, relationship=None,
servicechain_instance_id=None, position=None,
servicechain_node_id=None):
with session.begin(subtransactions=True):
query = session.query(ServiceTarget)
if servicechain_instance_id:
query = query.filter_by(
servicechain_instance_id=servicechain_instance_id)
if servicechain_node_id:
query = query.filter_by(
servicechain_node_id=servicechain_node_id)
if policy_target_id:
query = query.filter_by(policy_target_id=policy_target_id)
if position:
query = query.filter_by(position=position)
if relationship:
query = query.filter_by(relationship=relationship)
return query.all()

View File

@ -10,11 +10,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log as logging
import stevedore
from gbpservice.neutron.services.servicechain.plugins.ncp import config # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp import model
LOG = logging.getLogger(__name__)
@ -52,6 +54,45 @@ class NodeDriverManager(stevedore.named.NamedExtensionManager):
for driver in self.ordered_drivers:
LOG.info(_("Initializing service chain node drivers '%s'"),
driver.name)
driver.obj.initialize()
driver.obj.initialize(driver.name)
self.native_bulk_support &= getattr(driver.obj,
'native_bulk_support', True)
def schedule_deploy(self, context):
"""Schedule Node Driver for Node creation.
Given a NodeContext, this method returns the driver capable of creating
the specific node.
"""
for driver in self.ordered_drivers:
try:
driver.obj.validate_create(context)
model.set_node_ownership(context, driver.obj.name)
return driver.obj
except n_exc.NeutronException as e:
LOG.warn(e.message)
def schedule_destroy(self, context):
"""Schedule Node Driver for Node disruption.
Given a NodeContext, this method returns the driver capable of
destroying the specific node.
"""
return self._get_owning_driver(context)
def schedule_update(self, context):
"""Schedule Node Driver for Node Update.
Given a NodeContext, this method returns the driver capable of updating
the specific node.
"""
driver = self._get_owning_driver(context)
if driver:
driver.validate_update(context)
return driver
def _get_owning_driver(self, context):
owner = model.get_node_owner(context)
if owner:
driver = self.drivers.get(owner[0].driver_name)
return driver.obj if driver else None

View File

@ -20,8 +20,9 @@ class NoopNodeDriver(driver_base.NodeDriverBase):
initialized = False
@log.log
def initialize(self):
def initialize(self, name):
self.initialized = True
self._name = name
@log.log
def get_plumbing_info(self, context):
@ -46,3 +47,7 @@ class NoopNodeDriver(driver_base.NodeDriverBase):
@log.log
def update(self, context):
pass
@property
def name(self):
return self._name

View File

@ -0,0 +1,32 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import log
from gbpservice.neutron.services.servicechain.plugins.ncp import plumber_base
class NoopPlumber(plumber_base.NodePlumberBase):
initialized = False
@log.log
def initialize(self):
self.initialized = True
@log.log
def plug_services(self, context, deployment):
self._sort_deployment(deployment)
@log.log
def unplug_services(self, context, deployment):
self._sort_deployment(deployment)

View File

@ -10,16 +10,28 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import log
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from gbpservice.common import utils
from gbpservice.neutron.db import servicechain_db
from gbpservice.neutron.services.servicechain.plugins.ncp import (
context as ctx)
from gbpservice.neutron.services.servicechain.plugins.ncp import (
exceptions as exc)
from gbpservice.neutron.services.servicechain.plugins.ncp import (
node_driver_manager as manager)
from gbpservice.neutron.services.servicechain.plugins import sharing
LOG = logging.getLogger(__name__)
PLUMBER_NAMESPACE = 'gbpservice.neutron.servicechain.ncp_plumbers'
class NodeCompositionPlugin(servicechain_db.ServiceChainDbPlugin):
class NodeCompositionPlugin(servicechain_db.ServiceChainDbPlugin,
sharing.SharingMixin):
"""Implementation of the Service Chain Plugin.
@ -30,3 +42,265 @@ class NodeCompositionPlugin(servicechain_db.ServiceChainDbPlugin):
self.driver_manager = manager.NodeDriverManager()
super(NodeCompositionPlugin, self).__init__()
self.driver_manager.initialize()
self.plumber = utils.load_plugin(
PLUMBER_NAMESPACE, cfg.CONF.node_composition_plugin.node_plumber)
self.plumber.initialize()
@log.log
def create_servicechain_instance(self, context, servicechain_instance):
"""Instance created.
When a Servicechain Instance is created, all its nodes need to be
instantiated.
"""
session = context.session
deployers = {}
with session.begin(subtransactions=True):
instance = super(NodeCompositionPlugin,
self).create_servicechain_instance(
context, servicechain_instance)
if len(instance['servicechain_specs']) > 1:
raise exc.OneSpecPerInstanceAllowed()
deployers = self._get_scheduled_drivers(context, instance,
'deploy')
# Actual node deploy
try:
self._deploy_servicechain_nodes(context, deployers)
except Exception:
# Some node could not be deployed
with excutils.save_and_reraise_exception():
LOG.error(_("Node deployment failed, "
"deleting servicechain_instance %s"),
instance['id'])
self.delete_servicechain_instance(context, instance['id'])
return instance
@log.log
def update_servicechain_instance(self, context, servicechain_instance_id,
servicechain_instance):
"""Instance updated.
When a Servicechain Instance is updated and the spec changed, all the
nodes of the previous spec should be destroyed and the newer ones
created.
"""
session = context.session
deployers = {}
destroyers = {}
with session.begin(subtransactions=True):
original_instance = self.get_servicechain_instance(
context, servicechain_instance_id)
updated_instance = super(
NodeCompositionPlugin, self).update_servicechain_instance(
context, servicechain_instance_id, servicechain_instance)
if (original_instance['servicechain_specs'] !=
updated_instance['servicechain_specs']):
if len(updated_instance['servicechain_specs']) > 1:
raise exc.OneSpecPerInstanceAllowed()
destroyers = self._get_scheduled_drivers(
context, original_instance, 'destroy')
deployers = self._get_scheduled_drivers(
context, updated_instance, 'deploy')
self._destroy_servicechain_nodes(context, destroyers)
self._deploy_servicechain_nodes(context, deployers)
return updated_instance
@log.log
def delete_servicechain_instance(self, context, servicechain_instance_id):
"""Instance deleted.
When a Servicechain Instance is deleted, all its nodes need to be
destroyed.
"""
session = context.session
with session.begin(subtransactions=True):
instance = self.get_servicechain_instance(context,
servicechain_instance_id)
destroyers = self._get_scheduled_drivers(context, instance,
'destroy')
self._destroy_servicechain_nodes(context, destroyers)
with session.begin(subtransactions=True):
super(NodeCompositionPlugin, self).delete_servicechain_instance(
context, servicechain_instance_id)
@log.log
def create_servicechain_node(self, context, servicechain_node):
session = context.session
with session.begin(subtransactions=True):
result = super(NodeCompositionPlugin,
self).create_servicechain_node(context,
servicechain_node)
self._validate_shared_create(context, result, 'servicechain_node')
return result
@log.log
def update_servicechain_node(self, context, servicechain_node_id,
servicechain_node):
"""Node Update.
When a Servicechain Node is updated, all the corresponding instances
need to be updated as well. This usually results in a node
reconfiguration.
"""
session = context.session
updaters = {}
with session.begin(subtransactions=True):
original_sc_node = self.get_servicechain_node(
context, servicechain_node_id)
updated_sc_node = super(NodeCompositionPlugin,
self).update_servicechain_node(
context, servicechain_node_id,
servicechain_node)
self._validate_shared_update(context, original_sc_node,
updated_sc_node, 'servicechain_node')
instances = self._get_node_instances(context, updated_sc_node)
for instance in instances:
node_context = ctx.get_node_driver_context(
self, context, instance, updated_sc_node, original_sc_node)
# TODO(ivar): Validate that the node driver understands the
# update.
driver = self.driver_manager.schedule_update(node_context)
if not driver:
raise exc.NoDriverAvailableForAction(
action='update', node_id=original_sc_node['id'])
updaters[instance['id']] = {}
updaters[instance['id']]['context'] = node_context
updaters[instance['id']]['driver'] = driver
updaters[instance['id']]['plumbing_info'] = (
driver.get_plumbing_info(node_context))
# Update the nodes
for update in updaters.values():
try:
update['driver'].update(update['context'])
except exc.NodeDriverError as ex:
LOG.error(_("Node Update failed, %s"),
ex.message)
return updated_sc_node
@log.log
def create_servicechain_spec(self, context, servicechain_spec):
session = context.session
with session.begin(subtransactions=True):
result = super(
NodeCompositionPlugin, self).create_servicechain_spec(
context, servicechain_spec, set_params=False)
self._validate_shared_create(context, result, 'servicechain_spec')
return result
@log.log
def update_servicechain_spec(self, context, servicechain_spec_id,
servicechain_spec):
session = context.session
with session.begin(subtransactions=True):
original_sc_spec = self.get_servicechain_spec(
context, servicechain_spec_id)
updated_sc_spec = super(NodeCompositionPlugin,
self).update_servicechain_spec(
context, servicechain_spec_id,
servicechain_spec, set_params=False)
self._validate_shared_update(context, original_sc_spec,
updated_sc_spec, 'servicechain_spec')
return updated_sc_spec
@log.log
def create_service_profile(self, context, service_profile):
session = context.session
with session.begin(subtransactions=True):
result = super(
NodeCompositionPlugin, self).create_service_profile(
context, service_profile)
self._validate_shared_create(context, result, 'service_profile')
return result
@log.log
def update_service_profile(self, context, service_profile_id,
service_profile):
session = context.session
with session.begin(subtransactions=True):
original_profile = self.get_service_profile(
context, service_profile_id)
updated_profile = super(NodeCompositionPlugin,
self).update_service_profile(
context, service_profile_id,
service_profile)
self._validate_profile_update(context, original_profile,
updated_profile)
return updated_profile
def _get_instance_nodes(self, context, instance):
if not instance['servicechain_specs']:
return []
specs = self.get_servicechain_spec(
context, instance['servicechain_specs'][0])
return self.get_servicechain_nodes(context, {'id': specs['nodes']})
def _get_node_instances(self, context, node):
specs = self.get_servicechain_specs(
context, {'id': node['servicechain_specs']})
result = []
for spec in specs:
result.extend(self.get_servicechain_instances(
context, {'id': spec['instances']}))
return result
def _get_scheduled_drivers(self, context, instance, action):
nodes = self._get_instance_nodes(context, instance)
result = {}
func = getattr(self.driver_manager, 'schedule_' + action)
for node in nodes:
node_context = ctx.get_node_driver_context(
self, context, instance, node)
driver = func(node_context)
if not driver:
raise exc.NoDriverAvailableForAction(action=action,
node_id=node['id'])
result[node['id']] = {}
result[node['id']]['driver'] = driver
result[node['id']]['context'] = node_context
result[node['id']]['plumbing_info'] = driver.get_plumbing_info(
node_context)
return result
def _deploy_servicechain_nodes(self, context, deployers):
self.plumber.plug_services(context, deployers.values())
for deploy in deployers.values():
driver = deploy['driver']
driver.create(deploy['context'])
def _destroy_servicechain_nodes(self, context, destroyers):
# Actual node disruption
try:
for destroy in destroyers.values():
driver = destroy['driver']
try:
driver.delete(destroy['context'])
except exc.NodeDriverError:
LOG.error(_("Node destroy failed, for node %s "),
driver['context'].current_node['id'])
except Exception as e:
LOG.exception(e)
finally:
self.plumber.unplug_services(context, destroyers.values())
pass
def _validate_profile_update(self, context, original, updated):
# Raise if the profile is in use by any instance
# Ugly one shot query to verify whether the profile is in use
db = servicechain_db
query = context.session.query(db.ServiceChainInstance)
query = query.join(db.InstanceSpecAssociation)
query = query.join(db.ServiceChainSpec)
query = query.join(db.SpecNodeAssociation)
query = query.join(db.ServiceChainNode)
instance = query.filter(
db.ServiceChainNode.service_profile_id == original['id']).first()
if instance:
raise exc.ServiceProfileInUseByAnInstance(
profile_id=original['id'], instance_id=instance.id)
self._validate_shared_update(context, original, updated,
'service_profile')

View File

@ -0,0 +1,78 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class NodePlumberBase(object):
"""Node Plumber base Class
The node plumber is an entity which takes care of plumbing nodes in a
chain. By node plumbing is intended the creation/disruption of the
appropriate Neutron and GBP constructs (typically Ports and Policy Targets)
based on the specific Node needs, taking into account the whole service
chain in the process. Ideally, this module will ensure that the traffic
flows as expected according to the user intent.
"""
@abc.abstractmethod
def initialize(self):
"""Perform plumber initialization.
No abstract methods defined below will be called prior to this method
being called.
"""
@abc.abstractmethod
def plug_services(self, context, deployment):
"""Plug services
Given a deployment, this method is expected to create all the needed
policy targets / neutron ports placed where needed based on the whole
chain configuration.
The expectation is that this method ultimately creates ServiceTarget
object that will be retrieved by the node drivers at the right time.
A deployment is a list composed as follows:
[{'context': node_context,
'driver': deploying_driver,
'plumbing_info': node_plumbing_needs},
...]
No assumptions should be made on the order of the nodes as received in
the deployment, but it can be retrieved by calling
node_context.current_position
"""
@abc.abstractmethod
def unplug_services(self, context, deployment):
"""Plug services
Given a deployment, this method is expected to destroy all the
policy targets / neutron ports previously created for this chain
configuration.
The expectation is that this method ultimately removes all the
ServiceTarget related to this particular chain.
A deployment is a list composed as follows:
[{'context': node_context,
'driver': deploying_driver,
'plumbing_info': node_plumbing_needs},
...]
No assumptions should be made on the order of the nodes as received in
the deployment, but it can be retrieved by calling
node_context.current_position
"""
def _sort_deployment(self, deployment):
deployment.sort(key=lambda x: x['context'].current_position)

View File

@ -140,7 +140,9 @@ class ServiceChainDbTestCase(ServiceChainDBTestBase,
if not sc_plugin:
sc_plugin = DB_GP_PLUGIN_KLASS
if not service_plugins:
service_plugins = {'gp_plugin_name': gp_plugin or GP_PLUGIN_KLASS,
service_plugins = {
'l3_plugin_name': 'router',
'gp_plugin_name': gp_plugin or GP_PLUGIN_KLASS,
'sc_plugin_name': sc_plugin}
super(ServiceChainDbTestCase, self).setUp(

View File

@ -11,13 +11,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron.common import config # noqa
from neutron.common import exceptions as n_exc
from neutron import context as n_context
from neutron.db import api as db_api
from neutron.db import model_base
from neutron import manager
from neutron.plugins.common import constants as pconst
from oslo_config import cfg
from oslo_serialization import jsonutils
from gbpservice.neutron.db.grouppolicy import group_policy_mapping_db # noqa
from gbpservice.neutron.services.grouppolicy import config as gpconfig # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp import (
context as ncp_context)
from gbpservice.neutron.services.servicechain.plugins.ncp import (
exceptions as exc)
import gbpservice.neutron.services.servicechain.plugins.ncp.config # noqa
from gbpservice.neutron.services.servicechain.plugins.ncp.node_drivers import (
dummy_driver as dummy_driver)
@ -40,21 +50,72 @@ class NodeCompositionPluginTestCase(
def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None):
if node_drivers:
cfg.CONF.set_override('node_drivers', node_drivers,
group='node_composition_chain')
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping'],
group='group_policy')
super(NodeCompositionPluginTestCase, self).setUp(
core_plugin=core_plugin or CORE_PLUGIN,
gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
self.driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
def test_node_shared(self):
pass
@property
def sc_plugin(self):
plugins = manager.NeutronManager.get_service_plugins()
servicechain_plugin = plugins.get(pconst.SERVICECHAIN)
return servicechain_plugin
def test_profile_shared(self):
pass
def _create_redirect_rule(self, spec_id):
action = self.create_policy_action(action_type='REDIRECT',
action_value=spec_id)
classifier = self.create_policy_classifier(
port_range=80, protocol='tcp', direction='bi')
rule = self.create_policy_rule(
policy_actions=[action['policy_action']['id']],
policy_classifier_id=classifier['policy_classifier']['id'])
return rule
def test_spec_shared(self):
def _create_redirect_prs(self, spec_id):
rule = self._create_redirect_rule(spec_id)['policy_rule']
prs = self.create_policy_rule_set(policy_rules=[rule['id']])
return prs
def _create_simple_service_chain(self, number_of_nodes=1):
prof = self.create_service_profile(
service_type='LOADBALANCER')['service_profile']
node_ids = []
for x in xrange(number_of_nodes):
node_ids.append(self.create_servicechain_node(
service_profile_id=prof['id'],
expected_res_status=201)['servicechain_node']['id'])
return self._create_chain_with_nodes(node_ids)
def _create_chain_with_nodes(self, node_ids=None):
node_ids = node_ids or []
spec = self.create_servicechain_spec(
nodes=node_ids,
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
provider = self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})['policy_target_group']
consumer = self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})['policy_target_group']
return provider, consumer, prs
def _add_node_driver(self, name):
inst = dummy_driver.NoopNodeDriver()
inst.initialize(name)
ext = mock.Mock()
ext.obj = inst
self.sc_plugin.driver_manager.ordered_drivers.append(ext)
self.sc_plugin.driver_manager.drivers[name] = ext
def test_spec_ordering_list_servicechain_instances(self):
pass
def test_context_attributes(self):
@ -86,16 +147,15 @@ class NodeCompositionPluginTestCase(
self.assertIsNotNone(ctx.session)
self.assertIsNotNone(ctx.admin_context)
self.assertIsNotNone(ctx.admin_session)
self.assertEqual(ctx.instance, instance)
self.assertEqual(ctx.provider, provider)
self.assertEqual(ctx.consumer, consumer)
self.assertEqual(ctx.management, management)
self.assertEqual(ctx.management, management)
self.assertEqual(ctx.relevant_specs, [spec])
del ctx.current_profile['nodes']
self.assertEqual(ctx.current_profile, profile)
self.assertEqual(instance['id'], ctx.instance['id'])
self.assertEqual(provider['id'], ctx.provider['id'])
self.assertEqual(consumer['id'], ctx.consumer['id'])
self.assertEqual(management['id'], ctx.management['id'])
self.assertEqual([spec['id']], [x['id'] for x in ctx.relevant_specs])
self.assertIsNone(ctx.original_node)
self.assertIsNone(ctx.service_targets)
self.assertEqual(0, len(ctx.get_service_targets()))
def test_context_relevant_specs(self):
plugin_context = n_context.get_admin_context()
@ -104,24 +164,15 @@ class NodeCompositionPluginTestCase(
spec_used = self.create_servicechain_spec(
nodes=[node_used['id']])['servicechain_spec']
node_unused = self._create_profiled_servicechain_node(
service_type="TYPE", config='{}')['servicechain_node']
spec_unused = self.create_servicechain_spec(
nodes=[node_unused['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
instance = self.create_servicechain_instance(
provider_ptg_id=provider['id'],
servicechain_specs=[spec_used['id'],
spec_unused['id']])['servicechain_instance']
self.assertEqual(len(instance['servicechain_specs']), 2)
servicechain_specs=[spec_used['id']])['servicechain_instance']
ctx = ncp_context.get_node_driver_context(
self.plugin, plugin_context, instance, node_used)
self.assertEqual(ctx.relevant_specs, [spec_used])
class TestNcpNodeDriverManager(NodeCompositionPluginTestCase):
self.assertEqual([spec_used['id']],
[x['id'] for x in ctx.relevant_specs])
def test_manager_initialized(self):
mgr = self.plugin.driver_manager
@ -129,3 +180,187 @@ class TestNcpNodeDriverManager(NodeCompositionPluginTestCase):
dummy_driver.NoopNodeDriver)
for driver in mgr.ordered_drivers:
self.assertTrue(driver.obj.initialized)
def test_spec_parameters(self):
"""Test that config_param_names is empty when using NCP.
In NCP the config attribute of a node may be something different than
a HEAT template, therefore config_param_names is not used.
"""
params_node_1 = ['p1', 'p2', 'p3']
params_node_2 = ['p4', 'p5', 'p6']
def params_dict(params):
return jsonutils.dumps({'Parameters':
dict((x, {}) for x in params)})
prof = self.create_service_profile(
service_type='LOADBALANCER', shared=True,
tenant_id='admin')['service_profile']
# Create 2 nodes with different parameters
node1 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_1),
expected_res_status=201)['servicechain_node']
node2 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_2),
expected_res_status=201)['servicechain_node']
# Create SC spec with the nodes assigned
spec = self.create_servicechain_spec(
nodes=[node1['id'], node2['id']], shared=True,
expected_res_status=201)['servicechain_spec']
# Verify param names is empty
self.assertIsNone(spec['config_param_names'])
# Update the spec removing one node
self.update_servicechain_spec(spec['id'], nodes=[node1['id']],
expected_res_status=200)
spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names is empty
self.assertIsNone(spec['config_param_names'])
def test_create_service_chain(self):
deploy = self.driver.create = mock.Mock()
destroy = self.driver.delete = mock.Mock()
self._create_simple_service_chain(1)
self.assertEqual(1, deploy.call_count)
self.assertEqual(0, destroy.call_count)
deploy.reset_mock()
provider, _, _ = self._create_simple_service_chain(3)
self.assertEqual(3, deploy.call_count)
self.assertEqual(0, destroy.call_count)
self.update_policy_target_group(provider['id'],
provided_policy_rule_sets={})
self.assertEqual(3, deploy.call_count)
self.assertEqual(3, destroy.call_count)
def test_create_service_chain_fails(self):
deploy = self.driver.create = mock.Mock()
destroy = self.driver.delete = mock.Mock()
deploy.side_effect = Exception
try:
self._create_simple_service_chain(3)
except Exception:
pass
self.assertEqual(1, deploy.call_count)
self.assertEqual(3, destroy.call_count)
def test_update_node_fails(self):
validate_update = self.driver.validate_update = mock.Mock()
validate_update.side_effect = exc.NodeCompositionPluginBadRequest(
resource='node', msg='reason')
prof = self.create_service_profile(
service_type='LOADBALANCER')['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
expected_res_status=201)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
res = self.update_servicechain_node(node_id,
description='somethingelse',
expected_res_status=400)
self.assertEqual('NodeCompositionPluginBadRequest',
res['NeutronError']['type'])
def test_update_instantiated_profile_fails(self):
prof = self.create_service_profile(
service_type='LOADBALANCER')['service_profile']
node_id = self.create_servicechain_node(
service_profile_id=prof['id'],
expected_res_status=201)['servicechain_node']['id']
spec = self.create_servicechain_spec(
nodes=[node_id],
expected_res_status=201)['servicechain_spec']
prs = self._create_redirect_prs(spec['id'])['policy_rule_set']
self.create_policy_target_group(
provided_policy_rule_sets={prs['id']: ''})
self.create_policy_target_group(
consumed_policy_rule_sets={prs['id']: ''})
res = self.update_service_profile(prof['id'],
vendor='somethingelse',
expected_res_status=400)
self.assertEqual('ServiceProfileInUseByAnInstance',
res['NeutronError']['type'])
def test_second_driver_scheduled_if_first_fails(self):
self._add_node_driver('test')
drivers = [x.obj for x in
self.sc_plugin.driver_manager.ordered_drivers]
create_1 = drivers[0].validate_create = mock.Mock()
create_1.side_effect = n_exc.NeutronException()
# This happens without error
profile = self.create_service_profile(
service_type="TYPE")['service_profile']
node = self.create_servicechain_node(
service_profile_id=profile['id'], config='{}')['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
consumer = self.create_policy_target_group()['policy_target_group']
self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id=consumer['id'],
servicechain_specs=[spec['id']], expected_res_status=201)
def test_chain_fails_if_no_drivers_available(self):
self._add_node_driver('test')
drivers = [x.obj for x in
self.sc_plugin.driver_manager.ordered_drivers]
create_1 = drivers[0].validate_create = mock.Mock()
create_1.side_effect = n_exc.NeutronException()
create_2 = drivers[1].validate_create = mock.Mock()
create_2.side_effect = n_exc.NeutronException()
profile = self.create_service_profile(
service_type="TYPE")['service_profile']
node = self.create_servicechain_node(
service_profile_id=profile['id'], config='{}')['servicechain_node']
spec = self.create_servicechain_spec(
nodes=[node['id']])['servicechain_spec']
provider = self.create_policy_target_group()['policy_target_group']
consumer = self.create_policy_target_group()['policy_target_group']
self.create_servicechain_instance(
provider_ptg_id=provider['id'], consumer_ptg_id=consumer['id'],
servicechain_specs=[spec['id']], expected_res_status=400)
def test_multiple_nodes_update(self):
update = self.driver.update = mock.Mock()
prof = self.create_service_profile(
service_type='LOADBALANCER')['service_profile']
node = self.create_servicechain_node(
service_profile_id=prof['id'], config='{}')['servicechain_node']
self._create_chain_with_nodes([node['id']])
self.update_servicechain_node(node['id'], name='somethingelse')
self.assertEqual(1, update.call_count)
update.reset_mock()
self._create_chain_with_nodes([node['id']])
self._create_chain_with_nodes([node['id']])
self.update_servicechain_node(node['id'], name='somethingelse')
self.assertEqual(3, update.call_count)

View File

@ -11,8 +11,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import collections
from neutron import context as n_ctx
from oslo_config import cfg
from oslo_serialization import jsonutils
from gbpservice.neutron.services.servicechain.plugins.msc import context
from gbpservice.neutron.tests.unit.db.grouppolicy import (
@ -195,3 +199,47 @@ class TestGroupPolicyPluginGroupResources(
self.assertIsNone(ctx.original_profile)
self.assertEqual(ctx.current['id'], current['id'])
self.assertIsNone(ctx.current_profile)
def test_spec_parameters(self):
params_node_1 = ['p1', 'p2', 'p3']
params_node_2 = ['p4', 'p5', 'p6']
def params_dict(params):
return jsonutils.dumps({'Parameters':
dict((x, {}) for x in params)})
prof = self.create_service_profile(
service_type='LOADBALANCER', shared=True,
tenant_id='admin')['service_profile']
# Create 2 nodes with different parameters
node1 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_1),
expected_res_status=201)['servicechain_node']
node2 = self.create_servicechain_node(
service_profile_id=prof['id'], shared=True,
config=params_dict(params_node_2),
expected_res_status=201)['servicechain_node']
# Create SC spec with the nodes assigned
spec = self.create_servicechain_spec(
nodes=[node1['id'], node2['id']], shared=True,
expected_res_status=201)['servicechain_spec']
# Verify param names correspondence
self.assertEqual(
collections.Counter(params_node_1 + params_node_2),
collections.Counter(ast.literal_eval(spec['config_param_names'])))
# REVISIT(ivar): update verification fails because of bug/1460186
# Update the spec removing one node
#self.update_servicechain_spec(spec['id'], nodes=[node1['id']],
# expected_res_status=200)
#spec = self.show_servicechain_spec(spec['id'])['servicechain_spec']
# Verify param names correspondence
#self.assertEqual(
# collections.Counter(params_node_1),
# collections.Counter(ast.literal_eval(spec['config_param_names'])))

View File

@ -59,6 +59,8 @@ gbpservice.neutron.servicechain.servicechain_drivers =
oneconvergence_servicechain_driver = gbpservice.neutron.services.servicechain.plugins.msc.drivers.oneconvergence_servicechain_driver:OneconvergenceServiceChainDriver
gbpservice.neutron.servicechain.ncp_drivers =
node_dummy = gbpservice.neutron.services.servicechain.plugins.ncp.node_drivers.dummy_driver:NoopNodeDriver
gbpservice.neutron.servicechain.ncp_plumbers =
dummy_plumber = gbpservice.neutron.services.servicechain.plugins.ncp.node_plumbers.dummy_plumber:NoopPlumber
[build_sphinx]
source-dir = doc/source