NFP - Proxy,ConfigOrch & Node driver Enhancements

Changeset includes,
a) Node driver update to support fw+vpn chain
b) Parallel delete of nodes of a chain
c) nfp_proxy to use oslo_config instead of argparser
d) vpn service fix to send provider subnet id

Change-Id: Ib8e8755e973c3a0ca21935659187fa9a8639c386
(cherry picked from commit 86df7d9ec5)
This commit is contained in:
Ashutosh Mishra
2016-08-24 18:39:36 +05:30
committed by Jagadish
parent 581af7e233
commit 8d73af3160
9 changed files with 497 additions and 135 deletions

View File

@@ -13,6 +13,7 @@
import eventlet
from eventlet import greenpool
import threading
from keystoneclient import exceptions as k_exceptions
from keystoneclient.v2_0 import client as keyclient
@@ -33,6 +34,8 @@ from sqlalchemy.orm.exc import NoResultFound
from gbpservice.common import utils
from gbpservice.neutron.services.servicechain.plugins.ncp import (
exceptions as exc)
from gbpservice.neutron.services.servicechain.plugins.ncp import (
model as ncp_model)
from gbpservice.neutron.services.servicechain.plugins.ncp import driver_base
from gbpservice.neutron.services.servicechain.plugins.ncp import plumber_base
from gbpservice.nfp.common import constants as nfp_constants
@@ -45,11 +48,11 @@ NFP_NODE_DRIVER_OPTS = [
"be owned by the Admin"),
default=False),
cfg.IntOpt('service_create_timeout',
default=600,
default=nfp_constants.SERVICE_CREATE_TIMEOUT,
help=_("Seconds to wait for service creation "
"to complete")),
cfg.IntOpt('service_delete_timeout',
default=120,
default=nfp_constants.SERVICE_DELETE_TIMEOUT,
help=_("Seconds to wait for service deletion "
"to complete")),
]
@@ -59,6 +62,10 @@ cfg.CONF.register_opts(NFP_NODE_DRIVER_OPTS, "nfp_node_driver")
LOG = logging.getLogger(__name__)
# REVISIT: L2 insertion not supported
GATEWAY_PLUMBER_TYPE = [pconst.FIREWALL, pconst.VPN]
nfp_context_store = threading.local()
class InvalidServiceType(exc.NodeCompositionPluginBadRequest):
message = _("The NFP Node driver only supports the services "
@@ -207,16 +214,50 @@ class NFPClientApi(object):
policy_target=policy_target)
class NFPContext(object):
@staticmethod
def store_nfp_context(sc_instance_id, **context):
if not hasattr(nfp_context_store, 'context'):
nfp_context_store.context = {}
# Considering each store request comes with one entry
if not nfp_context_store.context.get(sc_instance_id):
NFPContext._initialise_attr(sc_instance_id)
nfp_context_store.context[sc_instance_id].update(context)
@staticmethod
def clear_nfp_context(sc_instance_id):
if not hasattr(nfp_context_store, 'context'):
return
if nfp_context_store.context.get(sc_instance_id):
del nfp_context_store.context[sc_instance_id]
@staticmethod
def get_nfp_context(sc_instance_id):
if not hasattr(nfp_context_store, 'context'):
return {}
if nfp_context_store.context.get(sc_instance_id):
return nfp_context_store.context[sc_instance_id]
return {}
@staticmethod
def _initialise_attr(sc_instance_id):
context = {'thread_pool': greenpool.GreenPool(10),
'active_threads': [],
'sc_node_count': 0,
'sc_gateway_type_nodes': [],
'update': False}
if nfp_context_store.context:
nfp_context_store.context.update({sc_instance_id: context})
else:
nfp_context_store.context = {sc_instance_id: context}
class NFPNodeDriver(driver_base.NodeDriverBase):
SUPPORTED_SERVICE_TYPES = [
pconst.LOADBALANCER, pconst.FIREWALL, pconst.VPN,
pconst.LOADBALANCERV2]
SUPPORTED_SERVICE_VENDOR_MAPPING = {
pconst.LOADBALANCERV2: [nfp_constants.HAPROXY_LBAASV2],
pconst.LOADBALANCER: [nfp_constants.HAPROXY_VENDOR],
pconst.FIREWALL: [nfp_constants.VYOS_VENDOR, nfp_constants.NFP_VENDOR],
pconst.VPN: [nfp_constants.VYOS_VENDOR],
}
vendor_name = nfp_constants.NFP_VENDOR.upper()
required_heat_resources = {
pconst.LOADBALANCERV2: ['OS::Neutron::LBaaS::LoadBalancer',
@@ -233,9 +274,6 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
def __init__(self):
super(NFPNodeDriver, self).__init__()
self._lbaas_plugin = None
self.thread_pool = greenpool.GreenPool(10)
self.active_threads = []
self.sc_node_count = 0
@property
def name(self):
@@ -278,11 +316,43 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
# logical services in a single device
plumbing_request = {'management': [], 'provider': [{}],
'consumer': [{}]}
# plumber will return stitching network PT instead of consumer
# as chain is instantiated while creating provider group.
if service_type in GATEWAY_PLUMBER_TYPE:
gateway_type_node = {'service_type': service_type,
'context': {}}
nfp_context = NFPContext.get_nfp_context(context.instance['id'])
if nfp_context:
if len(nfp_context['sc_gateway_type_nodes']):
LOG.info(_LI(
"Not requesting plumber for PTs for service type "
"%(service_type)s"), {'service_type': service_type})
if not nfp_context['update']:
nfp_context['sc_gateway_type_nodes'].append(
gateway_type_node)
NFPContext.store_nfp_context(
context.instance['id'],
sc_gateway_type_nodes=(
nfp_context['sc_gateway_type_nodes']))
return {}
if not nfp_context['update']:
nfp_context['sc_gateway_type_nodes'].append(
gateway_type_node)
NFPContext.store_nfp_context(
context.instance['id'],
sc_gateway_type_nodes=(
nfp_context['sc_gateway_type_nodes']))
else:
NFPContext.store_nfp_context(
context.instance['id'],
sc_gateway_type_nodes=[gateway_type_node])
if service_type in [pconst.FIREWALL, pconst.VPN]:
plumbing_request['plumbing_type'] = (
nfp_constants.GATEWAY_TYPE)
nfp_constants.GATEWAY_TYPE)
else: # Loadbalancer which is one arm
NFPContext.store_nfp_context(
context.instance['id'])
plumbing_request['consumer'] = []
plumbing_request['plumbing_type'] = (
nfp_constants.ENDPOINT_TYPE)
@@ -309,17 +379,11 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
if context.current_profile['service_type'] not in (
self.SUPPORTED_SERVICE_TYPES):
raise InvalidServiceType()
service_vendor = self._parse_service_flavor_string(
context.current_profile['service_flavor'])['service_vendor']
if (service_vendor.lower() not in
self.SUPPORTED_SERVICE_VENDOR_MAPPING[
context.current_profile['service_type']]):
raise UnSupportedServiceProfile(
service_type=context.current_profile['service_type'],
vendor=context.current_profile['vendor'])
self._is_node_order_in_spec_supported(context)
def validate_update(self, context):
NFPContext.store_nfp_context(context.instance['id'],
update=True)
if not context.original_node: # PT create/delete notifications
return
if context.current_node and not context.current_profile:
@@ -332,21 +396,13 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
if context.current_profile['service_type'] not in (
self.SUPPORTED_SERVICE_TYPES):
raise InvalidServiceType()
service_vendor = self._parse_service_flavor_string(
context.current_profile['service_flavor'])['service_vendor']
if (service_vendor.lower() not in
self.SUPPORTED_SERVICE_VENDOR_MAPPING[
context.current_profile['service_type']]):
raise UnSupportedServiceProfile(
service_type=context.current_profile['service_type'],
vendor=context.current_profile['vendor'])
def _wait(self, thread):
def _wait(self, thread, context):
try:
result = thread.wait()
return result
except Exception as e:
self.active_threads = []
NFPContext.clear_nfp_context(context.instance['id'])
raise e
def create(self, context):
@@ -358,32 +414,47 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
context.plugin_session, context.current_node['id'],
context.instance['id'], network_function_id)
except Exception as e:
self.sc_node_count -= 1
NFPContext.clear_nfp_context(context.instance['id'])
raise e
self._wait_for_node_operation_completion(context,
network_function_id,
nfp_constants.CREATE)
def _wait_for_node_operation_completion(self, context,
network_function_id,
operation):
# Check for NF status in a separate thread
LOG.debug("Spawning thread for nf ACTIVE poll")
nfp_context = NFPContext.get_nfp_context(context.instance['id'])
if operation == nfp_constants.DELETE:
gth = nfp_context['thread_pool'].spawn(
self._wait_for_network_function_delete_completion,
context, network_function_id)
else:
gth = nfp_context['thread_pool'].spawn(
self._wait_for_network_function_operation_completion,
context, network_function_id, operation=operation)
gth = self.thread_pool.spawn(
self._wait_for_network_function_operation_completion,
context, network_function_id, operation=nfp_constants.CREATE)
self.active_threads.append(gth)
nfp_context['active_threads'].append(gth)
LOG.debug("Active Threads count (%d), sc_node_count (%d)" % (
len(self.active_threads), self.sc_node_count))
len(nfp_context['active_threads']), nfp_context['sc_node_count']))
self.sc_node_count -= 1
nfp_context['sc_node_count'] -= 1
# At last wait for the threads to complete, success/failure/timeout
if self.sc_node_count == 0:
self.thread_pool.waitall()
if nfp_context['sc_node_count'] == 0:
nfp_context['thread_pool'].waitall()
# Get the results
for gth in self.active_threads:
self._wait(gth)
self.active_threads = []
for gth in nfp_context['active_threads']:
self._wait(gth, context)
NFPContext.clear_nfp_context(context.instance['id'])
else:
NFPContext.store_nfp_context(context.instance['id'], **nfp_context)
def update(self, context):
NFPContext.clear_nfp_context(context.instance['id'])
context._plugin_context = self._get_resource_owner_context(
context._plugin_context)
network_function_map = self._get_node_instance_network_function_map(
@@ -401,7 +472,21 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
self._wait_for_network_function_operation_completion(
context, network_function_id, operation=nfp_constants.UPDATE)
def _get_node_count(self, context):
current_specs = context.relevant_specs
node_list = []
for spec in current_specs:
node_list.extend(spec['nodes'])
NFPContext.store_nfp_context(context.instance['id'],
sc_node_count=len(node_list))
return len(node_list)
def delete(self, context):
nfp_context = (
NFPContext.get_nfp_context(context.instance['id']))
if nfp_context and not nfp_context.get('sc_node_count'):
nfp_context['sc_node_count'] = self._get_node_count(context)
context._plugin_context = self._get_resource_owner_context(
context._plugin_context)
network_function_map = self._get_node_instance_network_function_map(
@@ -410,6 +495,10 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
context.instance['id'])
if not network_function_map:
NFPContext.store_nfp_context(
context.instance['id'],
sc_gateway_type_nodes=[],
sc_node_count=nfp_context['sc_node_count'] - 1)
return
network_function_id = network_function_map.network_function_id
@@ -417,15 +506,18 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
self.nfp_notifier.delete_network_function(
context=context.plugin_context,
network_function_id=network_function_id)
except Exception:
except Exception as e:
NFPContext.clear_nfp_context(context.instance['id'])
LOG.exception(_LE("Delete Network service Failed"))
self._delete_node_instance_network_function_map(
context.plugin_session,
context.current_node['id'],
context.instance['id'])
raise e
self._wait_for_network_function_delete_completion(
context, network_function_id)
self._delete_node_instance_network_function_map(
context.plugin_session,
context.current_node['id'],
context.instance['id'])
self._wait_for_node_operation_completion(context,
network_function_id,
nfp_constants.DELETE)
def update_policy_target_added(self, context, policy_target):
if context.current_profile['service_type'] in [pconst.LOADBALANCER,
@@ -523,6 +615,11 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
eventlet.sleep(5)
time_waited = time_waited + 5
self._delete_node_instance_network_function_map(
context.plugin_session,
context.current_node['id'],
context.instance['id'])
if network_function:
LOG.error(_LE("Delete network function %(network_function)s "
"failed"),
@@ -626,25 +723,75 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
else:
LOG.info(_LI("No action to take on update"))
def _get_service_chain_specs(self, context):
current_specs = context.relevant_specs
for spec in current_specs:
filters = {'id': spec['nodes']}
nodes = context.sc_plugin.get_servicechain_nodes(
context.plugin_context, filters)
for node in nodes:
profile = context.sc_plugin.get_service_profile(
context.plugin_context, node['service_profile_id'])
node['sc_service_profile'] = profile
spec['sc_nodes'] = nodes
return current_specs
def _sc_head_gateway_node_service_targets(self, context,
service_type, relationships):
current_specs = context.relevant_specs
service_targets = []
for spec in current_specs:
filters = {'id': spec['nodes']}
nodes = context.sc_plugin.get_servicechain_nodes(
context.plugin_context, filters)
for node in nodes:
profile = context.sc_plugin.get_service_profile(
context.plugin_context, node['service_profile_id'])
if (profile['service_type'] != service_type and
profile['service_type'] in GATEWAY_PLUMBER_TYPE):
for relationship in relationships:
service_targets.extend(ncp_model.get_service_targets(
context.session,
servicechain_instance_id=context.instance['id'],
servicechain_node_id=node['id'],
relationship=relationship))
return service_targets
def _get_service_targets(self, context):
service_type = context.current_profile['service_type']
provider_service_targets = []
consumer_service_targets = []
service_flavor_str = context.current_profile['service_flavor']
service_details = self._parse_service_flavor_string(service_flavor_str)
nfp_context = NFPContext.get_nfp_context(context.instance['id'])
is_gateway_type = False
global GATEWAY_PLUMBER_TYPE
if service_type in GATEWAY_PLUMBER_TYPE:
for gateway_node in nfp_context['sc_gateway_type_nodes']:
if gateway_node['context']:
service_target_info = gateway_node['context']
return service_target_info
is_gateway_type = True
service_targets = context.get_service_targets()
# Bug with NCP. For create, its not setting service targets in context
if not service_targets:
service_targets = context.get_service_targets(update=True)
if not service_targets:
return {}
if not service_targets and is_gateway_type:
relationships = [nfp_constants.PROVIDER, nfp_constants.CONSUMER]
service_targets = self._sc_head_gateway_node_service_targets(
context,
service_type,
relationships)
for service_target in service_targets:
if service_target.relationship == nfp_constants.CONSUMER:
consumer_service_targets.append(service_target)
elif service_target.relationship == nfp_constants.PROVIDER:
provider_service_targets.append(service_target)
LOG.debug("provider targets: %s consumer targets %s" % (
provider_service_targets, consumer_service_targets))
if (service_details['device_type'] != 'None' and (
@@ -655,6 +802,10 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
{'service_type': service_type})
raise Exception("Service Targets are not created for the Node")
if (not consumer_service_targets and
not provider_service_targets):
return {}
service_target_info = {
'provider_ports': [],
'provider_subnet': None,
@@ -701,6 +852,11 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
service_target_info['consumer_pt_objs'].append(policy_target)
service_target_info['consumer_ptg'].append(policy_target_group)
for gateway_node in nfp_context['sc_gateway_type_nodes']:
if gateway_node['service_type'] == service_type:
gateway_node['context'] = service_target_info
NFPContext.store_nfp_context(context.instance['id'],
**nfp_context)
return service_target_info
# Needs a better algorithm
@@ -737,7 +893,8 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
raise InvalidNodeOrderInChain(
node_order=allowed_chain_combinations)
self.sc_node_count = len(node_list)
NFPContext.store_nfp_context(context.instance['id'],
sc_node_count=len(node_list))
def _get_consumers_for_provider(self, context, provider):
'''
@@ -858,6 +1015,8 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
'port_model': nfp_constants.GBP_NETWORK,
'port_classification': nfp_constants.MANAGEMENT}
service_chain_specs = self._get_service_chain_specs(context)
nfp_create_nf_data = {
'resource_owner_context': context._plugin_context.to_dict(),
'service_chain_instance': sc_instance,
@@ -871,7 +1030,8 @@ class NFPNodeDriver(driver_base.NodeDriverBase):
'network_function_mode': nfp_constants.GBP_MODE,
'tenant_id': context.provider['tenant_id'],
'consuming_ptgs_details': consuming_ptgs_details,
'consuming_eps_details': consuming_eps_details}
'consuming_eps_details': consuming_eps_details,
'service_chain_specs': service_chain_specs}
return self.nfp_notifier.create_network_function(
context.plugin_context, network_function=nfp_create_nf_data)['id']