NFP - Controller Enhancements
This changeset contains following changes - 1) Load service drivers from different paths - Driver loading logic in configurator is changed. Drivers can be loaded from different paths now. Drivers path is specified in configurator.ini file 2) Addressed the issue of configurator losing notifications from it's queue when restarted - Notifications from Configurator to UTC components are sent on rabbitmq using oslo messaging rpc cast() and when UTC component requests for notifications advanced controller pulls them from rabbitmq using pika RPC library. 3) Update service chain node driver support for Loadbalancer and Firewall service - a) Loadbalancer (V1) - HM and pool resources can be updated b) Loadbalancer (V2) - All resources can be updated c) Firewall - New firewall rules can added, existing rules can be deleted or updated 4) Redirecting advanced controller logs to nfp_pecan.log file Change-Id: Id0c95e1ad1c6c6757781a8734465eab8eae80436
This commit is contained in:
parent
86df7d9ec5
commit
9c7068bed4
@ -17,6 +17,7 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
import pecan
|
||||
import pika
|
||||
|
||||
from gbpservice.nfp.pecan import base_controller
|
||||
|
||||
@ -44,6 +45,10 @@ class Controller(base_controller.BaseController):
|
||||
for service in self.services:
|
||||
self._entry_to_rpc_routing_table(service)
|
||||
|
||||
configurator_notifications = self.services[0]['notifications']
|
||||
self.rmqconsumer = RMQConsumer(configurator_notifications['host'],
|
||||
configurator_notifications['queue']
|
||||
)
|
||||
super(Controller, self).__init__()
|
||||
except Exception as err:
|
||||
msg = (
|
||||
@ -88,10 +93,7 @@ class Controller(base_controller.BaseController):
|
||||
|
||||
try:
|
||||
if self.method_name == 'get_notifications':
|
||||
routing_key = 'CONFIGURATION'
|
||||
uservice = self.rpc_routing_table[routing_key]
|
||||
notification_data = uservice[0].rpcclient.call(
|
||||
self.method_name)
|
||||
notification_data = self.rmqconsumer.pull_notifications()
|
||||
msg = ("NOTIFICATION_DATA sent to config_agent %s"
|
||||
% notification_data)
|
||||
LOG.info(msg)
|
||||
@ -99,7 +101,7 @@ class Controller(base_controller.BaseController):
|
||||
|
||||
except Exception as err:
|
||||
pecan.response.status = 400
|
||||
msg = ("Failed to get handle request=%s. Reason=%s."
|
||||
msg = ("Failed to handle request=%s. Reason=%s."
|
||||
% (self.method_name, str(err).capitalize()))
|
||||
LOG.error(msg)
|
||||
error_data = self._format_description(msg)
|
||||
@ -123,10 +125,8 @@ class Controller(base_controller.BaseController):
|
||||
body = None
|
||||
if pecan.request.is_body_readable:
|
||||
body = pecan.request.json_body
|
||||
if self.method_name == 'network_function_event':
|
||||
routing_key = 'VISIBILITY'
|
||||
else:
|
||||
routing_key = 'CONFIGURATION'
|
||||
|
||||
routing_key = body.pop("routing_key", "CONFIGURATION")
|
||||
for uservice in self.rpc_routing_table[routing_key]:
|
||||
uservice.rpcclient.cast(self.method_name, body)
|
||||
msg = ('Sent RPC to %s' % (uservice.topic))
|
||||
@ -162,10 +162,8 @@ class Controller(base_controller.BaseController):
|
||||
body = None
|
||||
if pecan.request.is_body_readable:
|
||||
body = pecan.request.json_body
|
||||
if self.method_name == 'network_function_event':
|
||||
routing_key = 'VISIBILITY'
|
||||
else:
|
||||
routing_key = 'CONFIGURATION'
|
||||
|
||||
routing_key = body.pop("routing_key", "CONFIGURATION")
|
||||
for uservice in self.rpc_routing_table[routing_key]:
|
||||
uservice.rpcclient.cast(self.method_name, body)
|
||||
msg = ('Sent RPC to %s' % (uservice.topic))
|
||||
@ -269,3 +267,73 @@ class CloudService(object):
|
||||
self.topic = kwargs.get('topic')
|
||||
self.reporting_interval = kwargs.get('reporting_interval')
|
||||
self.rpcclient = RPCClient(topic=self.topic)
|
||||
|
||||
|
||||
class RMQConsumer(object):
|
||||
"""RMQConsumer for over the cloud services.
|
||||
|
||||
This class access rabbitmq's 'configurator-notifications' queue
|
||||
to pull all the notifications came from over the cloud services.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, rabbitmq_host, queue):
|
||||
self.rabbitmq_host = rabbitmq_host
|
||||
self.queue = queue
|
||||
self.create_connection()
|
||||
|
||||
def create_connection(self):
|
||||
try:
|
||||
self.connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters
|
||||
(host=self.rabbitmq_host))
|
||||
except Exception as e:
|
||||
msg = ("Failed to create rmq connection %s" % (e))
|
||||
LOG.error(msg)
|
||||
|
||||
def _fetch_data_from_wrapper_strct(self, oslo_notifications):
|
||||
notifications = []
|
||||
for oslo_notification_data in oslo_notifications:
|
||||
notification_data = jsonutils.loads(
|
||||
oslo_notification_data["oslo.message"]
|
||||
)["args"]["notification_data"]
|
||||
notifications.extend(notification_data)
|
||||
return notifications
|
||||
|
||||
def pull_notifications(self):
|
||||
notifications = []
|
||||
msgs_acknowledged = False
|
||||
try:
|
||||
self.channel = self.connection.channel()
|
||||
self.queue_declared = self.channel.queue_declare(queue=self.queue,
|
||||
durable=True)
|
||||
self.channel.queue_bind(self.queue, 'openstack')
|
||||
pending_msg_count = self.queue_declared.method.message_count
|
||||
log = ('[notifications queue:%s, pending notifications:%s]'
|
||||
% (self.queue, pending_msg_count))
|
||||
LOG.info(log)
|
||||
for i in range(pending_msg_count):
|
||||
method, properties, body = self.channel.basic_get(self.queue)
|
||||
notifications.append(jsonutils.loads(body))
|
||||
|
||||
# Acknowledge all messages delivery
|
||||
if pending_msg_count > 0:
|
||||
self.channel.basic_ack(delivery_tag=method.delivery_tag,
|
||||
multiple=True)
|
||||
msgs_acknowledged = True
|
||||
|
||||
self.channel.close()
|
||||
return self._fetch_data_from_wrapper_strct(notifications)
|
||||
|
||||
except pika.exceptions.ConnectionClosed:
|
||||
msg = ("Caught ConnectionClosed exception.Creating new connection")
|
||||
LOG.error(msg)
|
||||
self.create_connection()
|
||||
return self._fetch_data_from_wrapper_strct(notifications)
|
||||
except pika.exceptions.ChannelClosed:
|
||||
msg = ("Caught ChannelClosed exception.")
|
||||
LOG.error(msg)
|
||||
if msgs_acknowledged is False:
|
||||
return self.pull_notifications()
|
||||
else:
|
||||
return self._fetch_data_from_wrapper_strct(notifications)
|
||||
|
@ -10,9 +10,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from gbpservice.contrib.nfp.configurator.lib import constants as const
|
||||
from gbpservice.nfp.core import log as nfp_logging
|
||||
from gbpservice.nfp.core import module as nfp_api
|
||||
from neutron.common import rpc as n_rpc
|
||||
from oslo_config import cfg
|
||||
|
||||
n_rpc.init(cfg.CONF)
|
||||
|
||||
LOG = nfp_logging.getLogger(__name__)
|
||||
|
||||
@ -115,11 +121,19 @@ class AgentBaseNotification(object):
|
||||
cloud components using this notification handle.
|
||||
"""
|
||||
|
||||
API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, sc):
|
||||
self.sc = sc
|
||||
self.topic = const.NOTIFICATION_QUEUE
|
||||
target = messaging.Target(topic=self.topic,
|
||||
version=self.API_VERSION)
|
||||
self.client = n_rpc.get_client(target)
|
||||
self.cctxt = self.client.prepare(version=self.API_VERSION,
|
||||
topic=self.topic)
|
||||
|
||||
def _notification(self, data):
|
||||
"""Enqueues notification event into notification queue
|
||||
"""Enqueues notification event into const.NOTIFICATION_QUEUE
|
||||
|
||||
These events are enqueued into notification queue and are retrieved
|
||||
when get_notifications() API lands on configurator.
|
||||
@ -129,9 +143,10 @@ class AgentBaseNotification(object):
|
||||
Returns: None
|
||||
|
||||
"""
|
||||
event = self.sc.new_event(
|
||||
id=const.EVENT_STASH, key=const.EVENT_STASH, data=data)
|
||||
self.sc.stash_event(event)
|
||||
self.cctxt.cast(self, 'send_notification', notification_data=[data])
|
||||
|
||||
def to_dict(self):
|
||||
return {}
|
||||
|
||||
|
||||
class AgentBaseEventHandler(nfp_api.NfpEventHandler):
|
||||
|
@ -363,8 +363,8 @@ def load_drivers(conf):
|
||||
|
||||
"""
|
||||
|
||||
ld = load_driver.ConfiguratorUtils()
|
||||
drivers = ld.load_drivers(const.DRIVERS_DIR)
|
||||
ld = load_driver.ConfiguratorUtils(conf)
|
||||
drivers = ld.load_drivers(const.SERVICE_TYPE)
|
||||
|
||||
for service_type, driver_name in drivers.iteritems():
|
||||
driver_obj = driver_name(conf=conf)
|
||||
|
@ -219,10 +219,12 @@ class GenericConfigEventHandler(agent_base.AgentBaseEventHandler,
|
||||
elif ev.id == gen_cfg_const.EVENT_CONFIGURE_HEALTHMONITOR:
|
||||
resource_data = ev.data.get('resource_data')
|
||||
periodicity = resource_data.get('periodicity')
|
||||
EV_CONF_HM_MAXRETRY = (
|
||||
gen_cfg_const.EVENT_CONFIGURE_HEALTHMONITOR_MAXRETRY)
|
||||
if periodicity == gen_cfg_const.INITIAL:
|
||||
self.sc.poll_event(
|
||||
ev,
|
||||
max_times=gen_cfg_const.INITIAL_HM_RETRIES)
|
||||
ev,
|
||||
max_times= EV_CONF_HM_MAXRETRY)
|
||||
|
||||
elif periodicity == gen_cfg_const.FOREVER:
|
||||
self.sc.poll_event(ev)
|
||||
@ -354,8 +356,8 @@ class GenericConfigEventHandler(agent_base.AgentBaseEventHandler,
|
||||
self.notify._notification(notification_data)
|
||||
|
||||
@nfp_api.poll_event_desc(
|
||||
event=gen_cfg_const.EVENT_CONFIGURE_HEALTHMONITOR,
|
||||
spacing=5)
|
||||
event=gen_cfg_const.EVENT_CONFIGURE_HEALTHMONITOR,
|
||||
spacing=gen_cfg_const.EVENT_CONFIGURE_HEALTHMONITOR_SPACING)
|
||||
def handle_configure_healthmonitor(self, ev):
|
||||
"""Decorator method called for poll event CONFIGURE_HEALTHMONITOR
|
||||
Finally it Enqueues response into notification queue.
|
||||
@ -410,8 +412,8 @@ def load_drivers(conf):
|
||||
|
||||
"""
|
||||
|
||||
cutils = utils.ConfiguratorUtils()
|
||||
drivers = cutils.load_drivers(gen_cfg_const.DRIVERS_DIR)
|
||||
cutils = utils.ConfiguratorUtils(conf)
|
||||
drivers = cutils.load_drivers()
|
||||
|
||||
for service_type, driver_name in drivers.iteritems():
|
||||
driver_obj = driver_name(conf=conf)
|
||||
|
@ -710,8 +710,8 @@ def load_drivers(sc, conf):
|
||||
vendor name
|
||||
|
||||
"""
|
||||
cutils = utils.ConfiguratorUtils()
|
||||
drivers = cutils.load_drivers(lb_constants.DRIVERS_DIR)
|
||||
cutils = utils.ConfiguratorUtils(conf)
|
||||
drivers = cutils.load_drivers(lb_constants.SERVICE_TYPE)
|
||||
|
||||
plugin_rpc = LBaasRpcSender(sc)
|
||||
|
||||
|
@ -172,8 +172,8 @@ def load_drivers(conf):
|
||||
|
||||
"""
|
||||
|
||||
ld = load_driver.ConfiguratorUtils()
|
||||
drivers = ld.load_drivers(const.DRIVERS_DIR)
|
||||
ld = load_driver.ConfiguratorUtils(conf)
|
||||
drivers = ld.load_drivers(const.SERVICE_TYPE)
|
||||
|
||||
for service_type, driver_name in drivers.iteritems():
|
||||
driver_obj = driver_name(conf=conf)
|
||||
|
@ -297,7 +297,8 @@ class VPNaasEventHandler(nfp_api.NfpEventHandler):
|
||||
context = ev.data.get('context')
|
||||
s2s_contexts = self._plugin_rpc.get_vpn_servicecontext(context)
|
||||
state = self._sync_ipsec_conns(context, s2s_contexts[0])
|
||||
if state == const.STATE_ACTIVE:
|
||||
if state in {const.STATE_ACTIVE,
|
||||
const.STATE_ERROR}:
|
||||
return {'poll': False}
|
||||
|
||||
|
||||
@ -332,8 +333,8 @@ def load_drivers(sc, conf):
|
||||
Returns: dictionary of instances of the respective driver classes.
|
||||
"""
|
||||
|
||||
ld = utils.ConfiguratorUtils()
|
||||
drivers = ld.load_drivers(const.DRIVERS_DIR)
|
||||
ld = utils.ConfiguratorUtils(conf)
|
||||
drivers = ld.load_drivers(const.SERVICE_TYPE)
|
||||
|
||||
for service_type, driver_name in drivers.iteritems():
|
||||
driver_obj = driver_name(conf=conf)
|
||||
|
@ -105,7 +105,7 @@ class BaseDriver(object):
|
||||
|
||||
url = url % (mgmt_ip, port, 'configure-rsyslog-as-client')
|
||||
|
||||
log_forward_ip_address = self.conf.log_forward_ip_address
|
||||
log_forward_ip_address = self.conf.configurator.log_forward_ip_address
|
||||
if not log_forward_ip_address:
|
||||
msg = ("Log forwarding IP address not configured "
|
||||
"for service at %s." % mgmt_ip)
|
||||
@ -114,8 +114,8 @@ class BaseDriver(object):
|
||||
|
||||
data = dict(
|
||||
server_ip=log_forward_ip_address,
|
||||
server_port=self.conf.log_forward_port,
|
||||
log_level=self.conf.log_level)
|
||||
server_port=self.conf.configurator.log_forward_port,
|
||||
log_level=self.conf.configurator.log_level)
|
||||
data = jsonutils.dumps(data)
|
||||
|
||||
msg = ("Initiating POST request to configure log forwarding "
|
||||
|
@ -12,7 +12,7 @@
|
||||
|
||||
VYOS = 'vyos'
|
||||
CONFIGURATION_SERVER_PORT = '8888'
|
||||
REST_TIMEOUT = 120
|
||||
REST_TIMEOUT = 180
|
||||
request_url = "http://%s:%s/%s"
|
||||
|
||||
INTERFACE_NOT_FOUND = "INTERFACE NOT FOUND"
|
||||
|
@ -15,4 +15,4 @@ SERVICE_VENDOR = 'vyos'
|
||||
CONFIGURATION_SERVER_PORT = 8888
|
||||
request_url = "http://%s:%s/%s"
|
||||
|
||||
REST_TIMEOUT = 90
|
||||
REST_TIMEOUT = 180
|
||||
|
@ -1238,6 +1238,7 @@ class VpnaasIpsecDriver(VpnGenericConfigDriver):
|
||||
msg = ("Failed to check if IPSEC state is changed. %s"
|
||||
% str(err).capitalize())
|
||||
LOG.error(msg)
|
||||
return vpn_const.STATE_ERROR
|
||||
if changed:
|
||||
self.agent.update_status(
|
||||
context, self._update_conn_status(conn,
|
||||
|
@ -1,27 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
opts = [
|
||||
cfg.StrOpt(
|
||||
'log_forward_ip_address',
|
||||
default=None,
|
||||
help=('IP address to forward logs to')),
|
||||
cfg.IntOpt(
|
||||
'log_forward_port',
|
||||
default=514,
|
||||
help=("port to forward logs to")),
|
||||
cfg.StrOpt(
|
||||
'log_level',
|
||||
default='debug',
|
||||
help=('log level for logs forwarding'))]
|
@ -217,9 +217,9 @@ class Filter(object):
|
||||
"""
|
||||
Get the local subnet cidr
|
||||
"""
|
||||
subnet = [subnet for subnet in service_info['subnets']
|
||||
if subnet['id'] == vpnservice['subnet_id']][0]
|
||||
cidr = subnet['cidr']
|
||||
description = vpnservice['description']
|
||||
tokens = description.split(';')
|
||||
cidr = tokens[5].split('=')[1]
|
||||
vpnservice['cidr'] = cidr
|
||||
|
||||
siteconn = {}
|
||||
|
@ -10,7 +10,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
DRIVERS_DIR = 'gbpservice.contrib.nfp.configurator.drivers.firewall'
|
||||
SERVICE_TYPE = 'firewall'
|
||||
|
||||
FIREWALL_CREATE_EVENT = 'CREATE_FIREWALL'
|
||||
|
@ -10,7 +10,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
DRIVERS_DIR = 'gbpservice.contrib.nfp.configurator.drivers'
|
||||
SERVICE_TYPE = 'generic_config'
|
||||
EVENT_CONFIGURE_INTERFACES = 'CONFIGURE_INTERFACES'
|
||||
EVENT_CLEAR_INTERFACES = 'CLEAR_INTERFACES'
|
||||
@ -19,7 +18,11 @@ EVENT_CLEAR_ROUTES = 'CLEAR_ROUTES'
|
||||
EVENT_CONFIGURE_HEALTHMONITOR = 'CONFIGURE_HEALTHMONITOR'
|
||||
EVENT_CLEAR_HEALTHMONITOR = 'CLEAR_HEALTHMONITOR'
|
||||
|
||||
MAX_FAIL_COUNT = 12 # 5 secs delay * 12 = 60 secs
|
||||
# REVISIT: Need to make this configurable
|
||||
MAX_FAIL_COUNT = 28 # 5 secs delay * 28 = 140 secs
|
||||
INITIAL = 'initial'
|
||||
FOREVER = 'forever'
|
||||
INITIAL_HM_RETRIES = 24 # 5 secs delay * 24 = 120 secs
|
||||
|
||||
#POLLING EVENTS SPACING AND MAXRETRIES
|
||||
EVENT_CONFIGURE_HEALTHMONITOR_SPACING = 10
|
||||
EVENT_CONFIGURE_HEALTHMONITOR_MAXRETRY = 40
|
||||
|
@ -10,7 +10,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
DRIVERS_DIR = 'gbpservice.contrib.nfp.configurator.drivers.loadbalancer.v1'
|
||||
SERVICE_TYPE = 'loadbalancer'
|
||||
NEUTRON = 'neutron'
|
||||
|
||||
|
@ -10,7 +10,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
DRIVERS_DIR = 'gbpservice.contrib.nfp.configurator.drivers.nfp_service'
|
||||
SERVICE_TYPE = 'nfp_service'
|
||||
CREATE_NFP_SERVICE_EVENT = 'CREATE_NFP_SERVICE'
|
||||
UNHANDLED_RESULT = 'unhandled'
|
||||
|
@ -24,29 +24,31 @@ class ConfiguratorUtils(object):
|
||||
New common library functions, if needed, should be added in this class.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
|
||||
def load_drivers(self, pkg):
|
||||
def load_drivers(self, service_type=None):
|
||||
"""Load all the driver class objects inside pkg. In each class in the
|
||||
pkg it will look for keywork 'service_type' or/and 'vendor' and
|
||||
select that class as driver class
|
||||
|
||||
@param pkg : package
|
||||
e.g pkg = 'gbpservice.neutron.nsf.configurator.drivers.firewall'
|
||||
@param service_type: firewall/vpn/loadbalancer/nfp_service/None
|
||||
|
||||
Returns: driver_objects dictionary
|
||||
e.g driver_objects = {'loadbalancer': <driver class object>}
|
||||
|
||||
"""
|
||||
|
||||
pkgs = self.conf.CONFIG_DRIVERS.drivers
|
||||
driver_objects = {}
|
||||
|
||||
base_driver = __import__(pkg,
|
||||
globals(), locals(), ['drivers'], -1)
|
||||
drivers_dir = base_driver.__path__[0]
|
||||
|
||||
modules = []
|
||||
subdirectories = [x[0] for x in os.walk(drivers_dir)]
|
||||
subdirectories = []
|
||||
for pkg in pkgs:
|
||||
base_driver = __import__(pkg,
|
||||
globals(), locals(), ['drivers'], -1)
|
||||
drivers_dir = base_driver.__path__[0]
|
||||
subdirectories += [x[0] for x in os.walk(drivers_dir)]
|
||||
|
||||
for subd in subdirectories:
|
||||
syspath = sys.path
|
||||
sys.path = [subd] + syspath
|
||||
@ -66,8 +68,12 @@ class ConfiguratorUtils(object):
|
||||
for name, class_obj in inspect.getmembers(module):
|
||||
if inspect.isclass(class_obj):
|
||||
key = ''
|
||||
if hasattr(class_obj, 'service_type'):
|
||||
if hasattr(class_obj, 'service_type') and (
|
||||
not service_type or (service_type.lower() in (
|
||||
class_obj.service_type.lower()))):
|
||||
key += class_obj.service_type
|
||||
else:
|
||||
continue
|
||||
if hasattr(class_obj, 'service_vendor'):
|
||||
key += class_obj.service_vendor
|
||||
if key:
|
||||
|
@ -10,8 +10,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
DRIVERS_DIR = 'gbpservice.contrib.nfp.configurator.drivers.vpn'
|
||||
|
||||
SERVICE_TYPE = 'vpn'
|
||||
|
||||
|
||||
|
@ -0,0 +1,34 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg as oslo_config
|
||||
|
||||
nfp_configurator_extra_opts = [
|
||||
oslo_config.StrOpt('log_forward_ip_address',
|
||||
default='', help='Log collector host IP address'),
|
||||
oslo_config.IntOpt('log_forward_port',
|
||||
default='514', help='Log collector port number'),
|
||||
oslo_config.StrOpt('log_level',
|
||||
default='debug',
|
||||
help='Log level info/error/debug/warning')]
|
||||
|
||||
oslo_config.CONF.register_opts(nfp_configurator_extra_opts, "configurator")
|
||||
|
||||
|
||||
nfp_configurator_config_drivers_opts = [
|
||||
oslo_config.ListOpt(
|
||||
'drivers',
|
||||
default=['gbpservice.contrib.nfp.configurator.drivers'],
|
||||
help='List of config driver directories')]
|
||||
|
||||
oslo_config.CONF.register_opts(nfp_configurator_config_drivers_opts,
|
||||
"CONFIG_DRIVERS")
|
@ -12,7 +12,6 @@
|
||||
|
||||
from oslo_log import helpers as log_helpers
|
||||
|
||||
from gbpservice.contrib.nfp.configurator.lib import config_opts
|
||||
from gbpservice.contrib.nfp.configurator.lib import constants as const
|
||||
from gbpservice.contrib.nfp.configurator.lib import demuxer
|
||||
from gbpservice.contrib.nfp.configurator.lib import schema_validator
|
||||
@ -417,7 +416,7 @@ def init_rpc(sc, cm, conf, demuxer):
|
||||
sc.register_rpc_agents([configurator_agent])
|
||||
|
||||
|
||||
def get_configurator_module_instance(sc):
|
||||
def get_configurator_module_instance(sc, conf):
|
||||
""" Provides ConfiguratorModule class object and loads service agents.
|
||||
|
||||
Returns: Instance of ConfiguratorModule class
|
||||
@ -425,7 +424,7 @@ def get_configurator_module_instance(sc):
|
||||
"""
|
||||
|
||||
cm = ConfiguratorModule(sc)
|
||||
conf_utils = utils.ConfiguratorUtils()
|
||||
conf_utils = utils.ConfiguratorUtils(conf)
|
||||
|
||||
# Loads all the service agents under AGENT_PKG module path
|
||||
cm.imported_sas = conf_utils.load_agents(const.AGENTS_PKG)
|
||||
@ -455,7 +454,7 @@ def nfp_module_init(sc, conf):
|
||||
|
||||
# Create configurator module and de-multiplexer objects
|
||||
try:
|
||||
cm = get_configurator_module_instance(sc)
|
||||
cm = get_configurator_module_instance(sc, conf)
|
||||
demuxer_instance = demuxer.ServiceAgentDemuxer()
|
||||
except Exception as err:
|
||||
msg = ("Failed to initialize configurator de-multiplexer. %s."
|
||||
@ -468,7 +467,6 @@ def nfp_module_init(sc, conf):
|
||||
|
||||
# Initialize all the pre-loaded service agents
|
||||
try:
|
||||
conf.register_opts(config_opts.opts)
|
||||
cm.init_service_agents(sc, conf)
|
||||
except Exception as err:
|
||||
msg = ("Failed to initialize configurator agent modules. %s."
|
||||
@ -507,7 +505,7 @@ def nfp_module_post_init(sc, conf):
|
||||
"""
|
||||
|
||||
try:
|
||||
cm = get_configurator_module_instance(sc)
|
||||
cm = get_configurator_module_instance(sc, conf)
|
||||
cm.init_service_agents_complete(sc, conf)
|
||||
except Exception as err:
|
||||
msg = ("Failed to trigger initialization complete for configurator"
|
||||
|
@ -20,6 +20,7 @@ from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron import manager
|
||||
from neutron_lib import exceptions
|
||||
from neutron_vpnaas.db.vpn import vpn_validator
|
||||
from neutron_vpnaas.services.vpn.plugin import VPNDriverPlugin
|
||||
from neutron_vpnaas.services.vpn.plugin import VPNPlugin
|
||||
from neutron_vpnaas.services.vpn.service_drivers import base_ipsec
|
||||
@ -148,12 +149,22 @@ class NFPIPsecVpnAgentApi(base_ipsec.IPsecVpnAgentApi):
|
||||
LOG.error(msg)
|
||||
|
||||
|
||||
class VPNValidator(vpn_validator.VpnReferenceValidator):
|
||||
"""This class overrides the vpnservice validator method"""
|
||||
def __init__(self):
|
||||
super(VPNValidator, self).__init__()
|
||||
|
||||
def validate_vpnservice(self, context, vpns):
|
||||
pass
|
||||
|
||||
|
||||
class NFPIPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
|
||||
"""VPN Service Driver class for IPsec."""
|
||||
|
||||
def __init__(self, service_plugin):
|
||||
super(NFPIPsecVPNDriver, self).__init__(
|
||||
service_plugin)
|
||||
self.validator = VPNValidator()
|
||||
|
||||
def create_rpc_conn(self):
|
||||
self.endpoints = [
|
||||
@ -205,7 +216,10 @@ class NFPIPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
|
||||
msg = ('updating ipsec_site_connection with id %s to'
|
||||
'ERROR state' % (ipsec_site_connection['id']))
|
||||
LOG.error(msg)
|
||||
self._update_ipsec_conn_state(context, ipsec_site_connection)
|
||||
VPNPluginExt().update_ipsec_site_conn_status(
|
||||
context,
|
||||
ipsec_site_connection['id'],
|
||||
ERROR)
|
||||
break
|
||||
time.sleep(5)
|
||||
starttime += 5
|
||||
@ -213,7 +227,10 @@ class NFPIPsecVPNDriver(base_ipsec.BaseIPsecVPNDriver):
|
||||
msg = ('updating ipsec_site_connection with id %s to'
|
||||
'ERROR state' % (ipsec_site_connection['id']))
|
||||
LOG.error(msg)
|
||||
self._update_ipsec_conn_state(context, ipsec_site_connection)
|
||||
VPNPluginExt().update_ipsec_site_conn_status(
|
||||
context,
|
||||
ipsec_site_connection['id'],
|
||||
ERROR)
|
||||
|
||||
def _move_ipsec_conn_state_to_error(self, context, ipsec_site_connection):
|
||||
vpnsvc_status = [{
|
||||
|
@ -1,9 +1,9 @@
|
||||
#!/bin/bash
|
||||
|
||||
service rabbitmq-server start
|
||||
screen -dmS "configurator" /usr/bin/python2 /usr/bin/nfp --config-file=/etc/nfp_configurator.ini --log-file=/var/log/nfp/nfp_configurator.log
|
||||
service nfp-controller start
|
||||
cd /usr/local/lib/python2.7/dist-packages/gbpservice/nfp/pecan/api/
|
||||
python setup.py develop
|
||||
screen -dmS "pecan" pecan configurator_decider config.py --mode advanced
|
||||
service nfp-pecan start
|
||||
/bin/bash
|
||||
|
||||
|
@ -68,12 +68,9 @@ class ControllerTestCase(base.BaseTestCase, rest.RestController):
|
||||
|
||||
"""
|
||||
with mock.patch.object(
|
||||
controller.RPCClient, 'call') as rpc_mock:
|
||||
rpc_mock.return_value = jsonutils.dumps(self.data)
|
||||
response = self.app.get(
|
||||
'/v1/nfp/get_notifications'
|
||||
)
|
||||
rpc_mock.assert_called_with('get_notifications')
|
||||
controller.RMQConsumer, 'pull_notifications') as mock_pn:
|
||||
response = self.app.get('/v1/nfp/get_notifications')
|
||||
mock_pn.assert_called_with()
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_post_create_network_function_device_config(self):
|
||||
|
@ -229,9 +229,10 @@ class GenericConfigEventHandlerTestCase(base.BaseTestCase):
|
||||
mock_delete_src_routes.assert_called_with(
|
||||
self.fo.context, resource_data)
|
||||
elif const.EVENT_CONFIGURE_HEALTHMONITOR in ev.id:
|
||||
if periodicity == const.INITIAL_HM_RETRIES:
|
||||
if periodicity == const.EVENT_CONFIGURE_HEALTHMONITOR_MAXRETRY:
|
||||
mock_hm_poll_event.assert_called_with(
|
||||
ev, max_times=const.INITIAL_HM_RETRIES)
|
||||
ev, max_times=(
|
||||
const.EVENT_CONFIGURE_HEALTHMONITOR_MAXRETRY))
|
||||
elif periodicity == const.FOREVER:
|
||||
mock_hm_poll_event.assert_called_with(ev)
|
||||
elif ev.id == const.EVENT_CLEAR_HEALTHMONITOR:
|
||||
@ -256,11 +257,14 @@ class GenericConfigEventHandlerTestCase(base.BaseTestCase):
|
||||
agent, '_get_driver', return_value=driver), (
|
||||
mock.patch.object(
|
||||
driver, const.EVENT_CONFIGURE_HEALTHMONITOR.lower(),
|
||||
return_value=common_const.SUCCESS)), (
|
||||
mock.patch.object(subprocess, 'check_output', return_value=True)):
|
||||
return_value=common_const.SUCCESS)) as mock_dvr, (
|
||||
mock.patch.object(subprocess,
|
||||
'check_output', return_value=True)):
|
||||
|
||||
agent.handle_configure_healthmonitor(ev)
|
||||
|
||||
self.assertEqual(mock_dvr.return_value, common_const.SUCCESS)
|
||||
|
||||
def test_configure_interfaces_genericconfigeventhandler(self):
|
||||
""" Implements test case for configure interfaces method
|
||||
of generic config event handler.
|
||||
|
@ -13,7 +13,6 @@
|
||||
import mock
|
||||
|
||||
from gbpservice.contrib.nfp.configurator.agents import loadbalancer_v1 as lb
|
||||
from gbpservice.contrib.nfp.configurator.lib import constants as const
|
||||
from gbpservice.contrib.nfp.configurator.lib import demuxer
|
||||
from gbpservice.contrib.nfp.configurator.modules import configurator
|
||||
from gbpservice.contrib.tests.unit.nfp.configurator.test_data import (
|
||||
@ -55,29 +54,8 @@ class LBaasRpcSenderTest(base.BaseTestCase):
|
||||
sc, conf, rpc_mgr = self._get_configurator_rpc_manager_object()
|
||||
agent = lb.LBaasRpcSender(sc)
|
||||
agent_info = {'context': 'context', 'resource': 'pool'}
|
||||
with mock.patch.object(
|
||||
sc, 'new_event', return_value='foo') as mock_new_event, (
|
||||
mock.patch.object(
|
||||
sc, 'stash_event')) as mock_stash_event:
|
||||
|
||||
agent.update_status('pool', 'object_id',
|
||||
'status', agent_info, 'pool')
|
||||
|
||||
data = {'info': {'service_type': "loadbalancer",
|
||||
'context': 'context'},
|
||||
'notification': [{'resource': 'pool',
|
||||
'data': {'obj_type': 'pool',
|
||||
'obj_id': 'object_id',
|
||||
'notification_type':
|
||||
'update_status',
|
||||
'status': 'status',
|
||||
'pool': 'pool'}}]
|
||||
}
|
||||
mock_new_event.assert_called_with(
|
||||
id=const.EVENT_STASH,
|
||||
key=const.EVENT_STASH,
|
||||
data=data)
|
||||
mock_stash_event.assert_called_with('foo')
|
||||
agent.update_status('pool', 'object_id',
|
||||
'status', agent_info, 'pool')
|
||||
|
||||
def test_update_pool_stats(self):
|
||||
"""Implements test case for update_pool_stats method
|
||||
@ -89,28 +67,8 @@ class LBaasRpcSenderTest(base.BaseTestCase):
|
||||
|
||||
sc, conf, rpc_mgr = self._get_configurator_rpc_manager_object()
|
||||
agent = lb.LBaasRpcSender(sc)
|
||||
|
||||
with mock.patch.object(
|
||||
sc, 'new_event', return_value='foo') as mock_new_event, (
|
||||
mock.patch.object(
|
||||
sc, 'stash_event')) as mock_stash_event:
|
||||
context = test_data.Context()
|
||||
agent.update_pool_stats('pool_id', 'stats', context)
|
||||
|
||||
data = {'info': {'service_type': 'loadbalancer',
|
||||
'context': context.to_dict()},
|
||||
'notification': [{'resource': 'pool',
|
||||
'data': {'pool_id': 'pool_id',
|
||||
'stats': 'stats',
|
||||
'notification_type': (
|
||||
'update_pool_stats'),
|
||||
'pool': 'pool_id'}}]
|
||||
}
|
||||
mock_new_event.assert_called_with(
|
||||
id=const.EVENT_STASH,
|
||||
key=const.EVENT_STASH,
|
||||
data=data)
|
||||
mock_stash_event.assert_called_with('foo')
|
||||
context = test_data.Context()
|
||||
agent.update_pool_stats('pool_id', 'stats', context)
|
||||
|
||||
def test_get_logical_device(self):
|
||||
"""Implements test case for get_logical_device method
|
||||
|
@ -319,7 +319,7 @@ class RestApiTestCase(base.BaseTestCase):
|
||||
self.test_dict = vpn_test_data.VPNTestData()
|
||||
self.args = {'peer_address': '1.103.2.2'}
|
||||
self.fake_resp_dict = {'status': None}
|
||||
self.timeout = 90
|
||||
self.timeout = self.rest_obj.timeout
|
||||
self.data = {'data': 'data'}
|
||||
self.j_data = jsonutils.dumps(self.data)
|
||||
|
||||
|
@ -41,7 +41,7 @@ class FakeObjects(object):
|
||||
data_for_add_src_route = {'source_cidr': "1.2.3.4/24",
|
||||
'gateway_ip': "1.2.3.4"}
|
||||
data_for_del_src_route = {'source_cidr': '1.2.3.4/24'}
|
||||
timeout = 120
|
||||
timeout = 180
|
||||
|
||||
def get_url_for_api(self, api):
|
||||
url = 'http://172.24.4.5:8888/'
|
||||
|
@ -10,6 +10,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.contrib.nfp.configurator.drivers.vpn.vyos import (
|
||||
vyos_vpn_constants)
|
||||
|
||||
""" Implements fake objects for assertion.
|
||||
|
||||
"""
|
||||
@ -76,7 +79,7 @@ class VPNTestData(object):
|
||||
self.data__ = {"local_cidr": "11.0.6.0/24",
|
||||
"peer_address": "1.103.2.2",
|
||||
"peer_cidr": "141.0.0.0/24"}
|
||||
self.timeout = 90
|
||||
self.timeout = vyos_vpn_constants.REST_TIMEOUT
|
||||
|
||||
self.ipsec_vpn_create = ['fip=192.168.20.75',
|
||||
'tunnel_local_cidr=11.0.6.0/24',
|
||||
|
@ -28,15 +28,15 @@ app = {
|
||||
}
|
||||
|
||||
logging = {
|
||||
'root': {'level': 'INFO', 'handlers': ['console']},
|
||||
'root': {'level': 'INFO', 'handlers': ['console', 'logfile']},
|
||||
'loggers': {
|
||||
'pecanlog': {'level': 'INFO',
|
||||
'handlers': ['console'],
|
||||
'handlers': ['console', 'logfile'],
|
||||
'propagate': False},
|
||||
'pecan': {'level': 'INFO',
|
||||
'handlers': ['console'],
|
||||
'handlers': ['console', 'logfile'],
|
||||
'propagate': False},
|
||||
'py.warnings': {'handlers': ['console']},
|
||||
'py.warnings': {'handlers': ['console', 'logfile']},
|
||||
'__force_dict__': True
|
||||
},
|
||||
'handlers': {
|
||||
@ -44,6 +44,11 @@ logging = {
|
||||
'level': 'INFO',
|
||||
'class': 'logging.StreamHandler',
|
||||
'formatter': 'color'
|
||||
},
|
||||
'logfile': {
|
||||
'class': 'logging.FileHandler',
|
||||
'filename': '/var/log/nfp/nfp_pecan.log',
|
||||
'level': 'INFO'
|
||||
}
|
||||
},
|
||||
'formatters': {
|
||||
@ -56,13 +61,18 @@ logging = {
|
||||
'format': ('%(asctime)s [%(padded_color_levelname)s] [%(name)s]'
|
||||
'[%(threadName)s] %(message)s'),
|
||||
'__force_dict__': True
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cloud_services = [
|
||||
{'service_name': 'configurator',
|
||||
'topic': 'configurator',
|
||||
'topic': 'configurator', # configurator rpc topic
|
||||
'reporting_interval': '10', # in seconds
|
||||
'apis': ['CONFIGURATION']
|
||||
}
|
||||
'apis': ['CONFIGURATION'],
|
||||
# notifications from configurator to UTC components
|
||||
'notifications': {'host': '127.0.0.1',
|
||||
'queue': 'configurator-notifications'
|
||||
}
|
||||
},
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user