NFP - Base mode Service Orchestrator
Device won't get created in base mode support.
Inserted proxy_agent component in this patch as other
patches are dependent on this.
Change-Id: Ia13caec590584ea8958abdffff09a6fef9dcb226
Implements: blueprint gbp-network-services-framework
Co-Authored-By: Yogesh Rajmane<yogesh.rajmane@oneconvergence.com>
Co-Authored-By: Akash Deep<akash.deep@oneconvergence.com>
Co-Authored-By: Ahmed Khan<ahmed.khan@oneconvergence.com>
(cherry picked from commit ea5e8a2621)
This commit is contained in:
committed by
Hemanth Ravi
parent
35ae845b29
commit
f4b68b18f8
0
gbpservice/neutron/tests/unit/nfp/lib/__init__.py
Normal file
0
gbpservice/neutron/tests/unit/nfp/lib/__init__.py
Normal file
179
gbpservice/neutron/tests/unit/nfp/lib/test_transport.py
Normal file
179
gbpservice/neutron/tests/unit/nfp/lib/test_transport.py
Normal file
@@ -0,0 +1,179 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.nfp.lib import transport
|
||||
import mock
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron import context as ctx
|
||||
from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils
|
||||
import unittest
|
||||
|
||||
"""
|
||||
Common class used to create configuration mapping
|
||||
"""
|
||||
|
||||
|
||||
class Map(dict):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Map, self).__init__(*args, **kwargs)
|
||||
for arg in args:
|
||||
if isinstance(arg, dict):
|
||||
for k, v in arg.iteritems():
|
||||
self[k] = v
|
||||
|
||||
if kwargs:
|
||||
for k, v in kwargs.iteritems():
|
||||
self[k] = v
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return self.get(attr)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
super(Map, self).__setitem__(key, value)
|
||||
self.__dict__.update({key: value})
|
||||
|
||||
def __delattr__(self, item):
|
||||
self.__delitem__(item)
|
||||
|
||||
def __delitem__(self, key):
|
||||
super(Map, self).__delitem__(key)
|
||||
del self.__dict__[key]
|
||||
|
||||
|
||||
class TestContext(object):
|
||||
|
||||
def get_context(self):
|
||||
try:
|
||||
return ctx.Context('some_user', 'some_tenant')
|
||||
except Exception:
|
||||
return ctx.Context('some_user', 'some_tenant')
|
||||
|
||||
def get_test_context(self):
|
||||
# creating a test context
|
||||
variables = {}
|
||||
variables['context'] = self.get_context()
|
||||
variables['body'] = {'info': {'context': {}},
|
||||
'config': []}
|
||||
variables['method_type'] = 'CREATE'
|
||||
variables['device_config'] = True
|
||||
return variables
|
||||
|
||||
|
||||
class CommonLibraryTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
n_rpc.init(cfg.CONF)
|
||||
self.imprt_rc = 'gbpservice.nfp.lib.rest_client_over_unix'
|
||||
|
||||
def _cast(self, context, method, **kwargs):
|
||||
return
|
||||
|
||||
def _call(self, context, method, **kwargs):
|
||||
return []
|
||||
|
||||
def _get(self, path):
|
||||
|
||||
class MockResponse(object):
|
||||
|
||||
def __init__(self):
|
||||
self.content = {'success': '200'}
|
||||
return MockResponse()
|
||||
|
||||
def _uget(self, path):
|
||||
return(200, "")
|
||||
|
||||
def _post(self, path, body, method_type):
|
||||
return (200, "")
|
||||
|
||||
def _upost(self, path, body, delete=False):
|
||||
return (200, "")
|
||||
|
||||
def test_rpc_send_request_to_configurator(self):
|
||||
|
||||
with mock.patch('oslo_messaging.rpc.client._CallContext.cast') as cast:
|
||||
cast.side_effect = self._cast
|
||||
|
||||
test_context = TestContext().get_test_context()
|
||||
conf = Map(backend='rpc', RPC=Map(topic='topic'))
|
||||
|
||||
transport.send_request_to_configurator(
|
||||
conf,
|
||||
test_context['context'],
|
||||
test_context['body'],
|
||||
test_context['method_type'],
|
||||
test_context['device_config'])
|
||||
|
||||
def test_tcp_rest_send_request_to_configurator(self):
|
||||
|
||||
with mock.patch.object(transport.RestApi, 'post') as mock_post:
|
||||
mock_post.side_effect = self._post
|
||||
|
||||
test_context = TestContext().get_test_context()
|
||||
conf = Map(backend='tcp_rest', RPC=Map(topic='topic'),
|
||||
REST=Map(rest_server_ip='0.0.0.0',
|
||||
rest_server_port=5672))
|
||||
|
||||
transport.send_request_to_configurator(
|
||||
conf,
|
||||
test_context['context'],
|
||||
test_context['body'],
|
||||
test_context['method_type'],
|
||||
test_context['device_config'])
|
||||
|
||||
def test_unix_rest_send_request_to_configurator(self):
|
||||
|
||||
with mock.patch(self.imprt_rc + '.post') as mock_post:
|
||||
mock_post.side_effect = self._upost
|
||||
|
||||
test_context = TestContext().get_test_context()
|
||||
conf = Map(backend='unix_rest')
|
||||
|
||||
transport.send_request_to_configurator(
|
||||
conf,
|
||||
test_context['context'],
|
||||
test_context['body'],
|
||||
test_context['method_type'],
|
||||
test_context['device_config'])
|
||||
|
||||
def test_tcp_rest_get_response_from_configurator(self):
|
||||
|
||||
with mock.patch.object(transport.RestApi, 'get') as (
|
||||
mock_get), mock.patch.object(jsonutils, 'loads') as (
|
||||
mock_loads):
|
||||
mock_get.side_effect = self._get
|
||||
mock_loads.return_value = True
|
||||
|
||||
conf = Map(backend='tcp_rest', RPC=Map(topic='topic'),
|
||||
REST=Map(rest_server_ip='0.0.0.0',
|
||||
rest_server_port=5672))
|
||||
|
||||
transport.get_response_from_configurator(conf)
|
||||
|
||||
def test_unix_rest_get_response_from_configurator(self):
|
||||
|
||||
with mock.patch(self.imprt_rc + '.get') as (
|
||||
mock_get), mock.patch.object(jsonutils, 'loads') as (
|
||||
mock_loads):
|
||||
mock_get.side_effect = self._uget
|
||||
mock_loads.return_value = True
|
||||
|
||||
conf = Map(backend='unix_rest')
|
||||
|
||||
transport.get_response_from_configurator(conf)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -0,0 +1,83 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.nfp.proxy_agent.modules import proxy_agent
|
||||
import mock
|
||||
from neutron import context as ctx
|
||||
import unittest
|
||||
|
||||
rpc_manager = proxy_agent.RpcHandler
|
||||
|
||||
|
||||
class TestContext(object):
|
||||
|
||||
def get_context(self):
|
||||
try:
|
||||
return ctx.Context('some_user', 'some_tenant')
|
||||
except Exception:
|
||||
return ctx.Context('some_user', 'some_tenant')
|
||||
|
||||
"Common class for proxy agent test cases"
|
||||
|
||||
|
||||
class ConfigAgentProxyTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.manager = rpc_manager('conf', 'sc')
|
||||
self.context = TestContext().get_context()
|
||||
self.import_lib = 'gbpservice.nfp.lib.transport'
|
||||
|
||||
def _post(self, conf, context, body,
|
||||
method_type, device_config=False,
|
||||
network_function_event=False):
|
||||
return (200, '')
|
||||
|
||||
def test_create_network_function_config(self):
|
||||
_data = "data"
|
||||
import_send = self.import_lib + '.send_request_to_configurator'
|
||||
with mock.patch(import_send) as mock_send:
|
||||
mock_send.side_effect = self._post
|
||||
self.manager.create_network_function_config(self.context, _data)
|
||||
|
||||
def test_delete_network_function_config(self):
|
||||
_data = "data"
|
||||
import_send = self.import_lib + '.send_request_to_configurator'
|
||||
with mock.patch(import_send) as mock_send:
|
||||
mock_send.side_effect = self._post
|
||||
self.manager.delete_network_function_config(self.context, _data)
|
||||
|
||||
def test_create_network_function_device_config(self):
|
||||
_data = "data"
|
||||
import_send = self.import_lib + '.send_request_to_configurator'
|
||||
with mock.patch(import_send) as mock_send:
|
||||
mock_send.side_effect = self._post
|
||||
self.manager.create_network_function_device_config(
|
||||
self.context, _data)
|
||||
|
||||
def test_delete_network_function_device_config(self):
|
||||
_data = "data"
|
||||
import_send = self.import_lib + '.send_request_to_configurator'
|
||||
with mock.patch(import_send) as mock_send:
|
||||
mock_send.side_effect = self._post
|
||||
self.manager.delete_network_function_device_config(
|
||||
self.context, _data)
|
||||
|
||||
def test_network_function_event(self):
|
||||
_data = "data"
|
||||
import_send = self.import_lib + '.send_request_to_configurator'
|
||||
with mock.patch(import_send) as mock_send:
|
||||
mock_send.side_effect = self._post
|
||||
self.manager.network_function_event(
|
||||
self.context, _data)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,95 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.nfp.proxy_agent.notifications import pull
|
||||
import mock
|
||||
from neutron import context as ctx
|
||||
import unittest
|
||||
|
||||
from neutron.common import rpc as n_rpc
|
||||
from oslo_config import cfg
|
||||
|
||||
pull_notification = pull.PullNotification
|
||||
|
||||
|
||||
class TestContext(object):
|
||||
|
||||
def get_context_dict(self):
|
||||
try:
|
||||
context = ctx.Context('some_user', 'some_tenant')
|
||||
except Exception:
|
||||
context = ctx.Context('some_user', 'some_tenant')
|
||||
return context.to_dict()
|
||||
|
||||
"""Common class for pull notification tests"""
|
||||
|
||||
|
||||
class PullNotificationTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
n_rpc.init(cfg.CONF)
|
||||
self.p_notification = pull_notification('sc', 'conf')
|
||||
self.context = TestContext().get_context_dict()
|
||||
self.ev = ''
|
||||
self.import_lib = 'gbpservice.nfp.lib.transport'
|
||||
self.import_cast = 'oslo_messaging.rpc.client._CallContext.cast'
|
||||
|
||||
def _resp_base_structure(self, requester):
|
||||
response_data = [{
|
||||
'info': {
|
||||
'context': {
|
||||
'neutron_context': self.context,
|
||||
'requester': requester}
|
||||
}}]
|
||||
return response_data
|
||||
|
||||
def _cast(self, context, method, **kwargs):
|
||||
return
|
||||
|
||||
def _resp_data_nso(self, conf):
|
||||
response_data = self._resp_base_structure('service_orch')
|
||||
return response_data
|
||||
|
||||
def _resp_data_ndo(self, conf):
|
||||
response_data = self._resp_base_structure('device_orch')
|
||||
return response_data
|
||||
|
||||
def _resp_data_nco(self, conf):
|
||||
response_data = self._resp_base_structure('nas_service')
|
||||
return response_data
|
||||
|
||||
def test_nco_pull_notifications(self):
|
||||
import_get = self.import_lib + '.get_response_from_configurator'
|
||||
with mock.patch(import_get) as (
|
||||
mock_get), mock.patch(self.import_cast) as (
|
||||
mock_cast):
|
||||
mock_get.side_effect = self._resp_data_nco
|
||||
mock_cast.side_effect = self._cast
|
||||
self.p_notification.pull_notifications(self.ev)
|
||||
|
||||
def test_nso_pull_notifications(self):
|
||||
import_get = self.import_lib + '.get_response_from_configurator'
|
||||
with mock.patch(import_get) as (
|
||||
mock_get), mock.patch(self.import_cast) as (
|
||||
mock_cast):
|
||||
mock_get.side_effect = self._resp_data_nso
|
||||
mock_cast.side_effect = self._cast
|
||||
self.p_notification.pull_notifications(self.ev)
|
||||
|
||||
def test_ndo_pull_notifications(self):
|
||||
import_get = self.import_lib + '.get_response_from_configurator'
|
||||
with mock.patch(import_get) as (
|
||||
mock_get), mock.patch(self.import_cast) as (
|
||||
mock_cast):
|
||||
mock_get.side_effect = self._resp_data_ndo
|
||||
mock_cast.side_effect = self._cast
|
||||
self.p_notification.pull_notifications(self.ev)
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
GBP_MODE = "gbp"
|
||||
NEUTRON_MODE = "neutron"
|
||||
NOVA_MODE = "nova"
|
||||
|
||||
NEUTRON_PORT = "neutron_port"
|
||||
GBP_PORT = "gbp_policy_target"
|
||||
@@ -36,7 +37,24 @@ PENDING_UPDATE = "PENDING_UPDATE"
|
||||
PENDING_DELETE = "PENDING_DELETE"
|
||||
ERROR = "ERROR"
|
||||
|
||||
DEVICE_ORCHESTRATOR = "device_orch"
|
||||
SERVICE_ORCHESTRATOR = "service_orch"
|
||||
|
||||
HEAT_CONFIG_TAG = 'heat_config'
|
||||
CONFIG_INIT_TAG = 'config_init'
|
||||
ANSIBLE_TAG = 'ansible'
|
||||
CUSTOM_JSON = 'custom_json'
|
||||
|
||||
COMPLETED = "COMPLETED"
|
||||
IN_PROGRESS = "IN_PROGRESS"
|
||||
|
||||
CONFIG_INIT_TAG = "config_init"
|
||||
CONFIG_SCRIPT = 'config_script'
|
||||
|
||||
CONFIG_TAG_RESOURCE_MAP = {
|
||||
HEAT_CONFIG_TAG: 'heat',
|
||||
CONFIG_INIT_TAG: 'config_init',
|
||||
ANSIBLE_TAG: 'ansible',
|
||||
CUSTOM_JSON: 'custom_json'}
|
||||
|
||||
LOADBALANCER_RPC_API_VERSION = "2.0"
|
||||
|
||||
170
gbpservice/nfp/lib/rest_client_over_unix.py
Normal file
170
gbpservice/nfp/lib/rest_client_over_unix.py
Normal file
@@ -0,0 +1,170 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import exceptions
|
||||
import httplib
|
||||
import httplib2
|
||||
import zlib
|
||||
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import socket
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from gbpservice.nfp.core import log as nfp_logging
|
||||
|
||||
LOG = nfp_logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RestClientException(exceptions.Exception):
|
||||
|
||||
""" RestClient Exception """
|
||||
|
||||
|
||||
class UnixHTTPConnection(httplib.HTTPConnection):
|
||||
|
||||
"""Connection class for HTTP over UNIX domain socket."""
|
||||
|
||||
def __init__(self, host, port=None, strict=None, timeout=None,
|
||||
proxy_info=None):
|
||||
httplib.HTTPConnection.__init__(self, host, port, strict)
|
||||
self.timeout = timeout
|
||||
self.socket_path = '/var/run/uds_socket'
|
||||
|
||||
def connect(self):
|
||||
"""Method used to connect socket server."""
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
if self.timeout:
|
||||
self.sock.settimeout(self.timeout)
|
||||
try:
|
||||
self.sock.connect(self.socket_path)
|
||||
except socket.error as exc:
|
||||
raise RestClientException(
|
||||
"Caught exception socket.error : %s" % exc)
|
||||
|
||||
|
||||
class UnixRestClient(object):
|
||||
|
||||
def _http_request(self, url, method_type, headers=None, body=None):
|
||||
try:
|
||||
h = httplib2.Http()
|
||||
resp, content = h.request(
|
||||
url,
|
||||
method=method_type,
|
||||
headers=headers,
|
||||
body=body,
|
||||
connection_type=UnixHTTPConnection)
|
||||
return resp, content
|
||||
|
||||
except httplib2.ServerNotFoundError:
|
||||
raise RestClientException("Server Not Found")
|
||||
|
||||
except exceptions.Exception as e:
|
||||
raise RestClientException("httplib response error %s" % (e))
|
||||
|
||||
def send_request(self, path, method_type, request_method='http',
|
||||
server_addr='127.0.0.1',
|
||||
headers=None, body=None):
|
||||
"""Implementation for common interface for all unix crud requests.
|
||||
Return:Http Response
|
||||
"""
|
||||
# prepares path, body, url for sending unix request.
|
||||
if method_type.upper() != 'GET':
|
||||
body = jsonutils.dumps(body)
|
||||
body = zlib.compress(body)
|
||||
|
||||
path = '/v1/nfp/' + path
|
||||
url = urlparse.urlunsplit((
|
||||
request_method,
|
||||
server_addr,
|
||||
path,
|
||||
None,
|
||||
''))
|
||||
|
||||
try:
|
||||
resp, content = self._http_request(url, method_type,
|
||||
headers=headers, body=body)
|
||||
if content != '':
|
||||
content = zlib.decompress(content)
|
||||
message = "%s:%s" % (resp, content)
|
||||
LOG.info(message)
|
||||
except RestClientException as rce:
|
||||
message = "ERROR : %s" % (rce)
|
||||
LOG.error(message)
|
||||
raise rce
|
||||
|
||||
success_code = [200, 201, 202, 204]
|
||||
# Evaluate responses into success and failures.
|
||||
# Raise exception for failure cases which needs
|
||||
# to be handled by caller.
|
||||
if success_code.__contains__(resp.status):
|
||||
return resp, content
|
||||
elif resp.status == 400:
|
||||
raise RestClientException("HTTPBadRequest: %s" % resp.reason)
|
||||
elif resp.status == 401:
|
||||
raise RestClientException("HTTPUnauthorized: %s" % resp.reason)
|
||||
elif resp.status == 403:
|
||||
raise RestClientException("HTTPForbidden: %s" % resp.reason)
|
||||
elif resp.status == 404:
|
||||
raise RestClientException("HttpNotFound: %s" % resp.reason)
|
||||
elif resp.status == 405:
|
||||
raise RestClientException(
|
||||
"HTTPMethodNotAllowed: %s" % resp.reason)
|
||||
elif resp.status == 406:
|
||||
raise RestClientException("HTTPNotAcceptable: %s" % resp.reason)
|
||||
elif resp.status == 408:
|
||||
raise RestClientException("HTTPRequestTimeout: %s" % resp.reason)
|
||||
elif resp.status == 409:
|
||||
raise RestClientException("HTTPConflict: %s" % resp.reason)
|
||||
elif resp.status == 415:
|
||||
raise RestClientException(
|
||||
"HTTPUnsupportedMediaType: %s" % resp.reason)
|
||||
elif resp.status == 417:
|
||||
raise RestClientException(
|
||||
"HTTPExpectationFailed: %s" % resp.reason)
|
||||
elif resp.status == 500:
|
||||
raise RestClientException("HTTPServerError: %s" % resp.reason)
|
||||
else:
|
||||
raise Exception('Unhandled Exception code: %s %s' % (resp.status,
|
||||
resp.reason))
|
||||
|
||||
|
||||
def get(path):
|
||||
"""Implements get method for unix restclient
|
||||
Return:Http Response
|
||||
"""
|
||||
return UnixRestClient().send_request(path, 'GET')
|
||||
|
||||
|
||||
def put(path, body):
|
||||
"""Implements put method for unix restclient
|
||||
Return:Http Response
|
||||
"""
|
||||
headers = {'content-type': 'application/octet-stream'}
|
||||
return UnixRestClient().send_request(
|
||||
path, 'PUT', headers=headers, body=body)
|
||||
|
||||
|
||||
def post(path, body, delete=False):
|
||||
"""Implements post method for unix restclient
|
||||
Return:Http Response
|
||||
"""
|
||||
# Method-Type added here,as DELETE/CREATE
|
||||
# both case are handled by post as delete also needs
|
||||
# to send data to the rest-unix-server.
|
||||
headers = {'content-type': 'application/octet-stream'}
|
||||
if delete:
|
||||
headers.update({'method-type': 'DELETE'})
|
||||
else:
|
||||
headers.update({'method-type': 'CREATE'})
|
||||
return UnixRestClient().send_request(
|
||||
path, 'POST', headers=headers, body=body)
|
||||
@@ -9,7 +9,308 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import exceptions
|
||||
|
||||
from gbpservice.nfp.common import constants as nfp_constants
|
||||
from gbpservice.nfp.core import log as nfp_logging
|
||||
from gbpservice.nfp.lib import rest_client_over_unix as unix_rc
|
||||
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron import context as n_context
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_config import cfg as oslo_config
|
||||
import oslo_messaging as messaging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
import requests
|
||||
|
||||
LOG = nfp_logging.getLogger(__name__)
|
||||
Version = 'v1' # v1/v2/v3#
|
||||
|
||||
rest_opts = [
|
||||
cfg.StrOpt('rest_server_address',
|
||||
default='', help='Rest connection IpAddr'),
|
||||
cfg.IntOpt('rest_server_port',
|
||||
default=8080, help='Rest connection Port'),
|
||||
]
|
||||
|
||||
rpc_opts = [
|
||||
cfg.StrOpt('topic',
|
||||
default='', help='Topic for rpc connection'),
|
||||
]
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt(
|
||||
'backend',
|
||||
default='rpc',
|
||||
help='Backend Support for communicationg with configurator.'
|
||||
),
|
||||
]
|
||||
|
||||
oslo_config.CONF.register_opts(OPTS)
|
||||
oslo_config.CONF.register_opts(rest_opts, "REST")
|
||||
oslo_config.CONF.register_opts(rpc_opts, "RPC")
|
||||
n_rpc.init(cfg.CONF)
|
||||
|
||||
UNIX_REST = 'unix_rest'
|
||||
TCP_REST = 'tcp_rest'
|
||||
|
||||
""" Common Class for restClient exceptions """
|
||||
|
||||
|
||||
class RestClientException(exceptions.Exception):
|
||||
|
||||
""" RestClient Exception """
|
||||
|
||||
""" Common Class to handle restclient request"""
|
||||
|
||||
|
||||
class RestApi(object):
|
||||
|
||||
def __init__(self, rest_server_address, rest_server_port):
|
||||
self.rest_server_address = rest_server_address
|
||||
self.rest_server_port = rest_server_port
|
||||
self.url = "http://%s:%s/v1/nfp/%s"
|
||||
|
||||
def _response(self, resp, url):
|
||||
success_code = [200, 201, 202, 204]
|
||||
# Evaluate responses into success and failures.
|
||||
# Raise exception for failure cases which needs
|
||||
# to be handled in caller function.
|
||||
if success_code.__contains__(resp.status_code):
|
||||
return resp
|
||||
elif resp.status_code == 400:
|
||||
raise RestClientException("HTTPBadRequest: %s" % resp.reason)
|
||||
elif resp.status_code == 401:
|
||||
raise RestClientException("HTTPUnauthorized: %s" % resp.reason)
|
||||
elif resp.status_code == 403:
|
||||
raise RestClientException("HTTPForbidden: %s" % resp.reason)
|
||||
elif resp.status_code == 404:
|
||||
raise RestClientException("HttpNotFound: %s" % resp.reason)
|
||||
elif resp.status_code == 405:
|
||||
raise RestClientException(
|
||||
"HTTPMethodNotAllowed: %s" % resp.reason)
|
||||
elif resp.status_code == 406:
|
||||
raise RestClientException("HTTPNotAcceptable: %s" % resp.reason)
|
||||
elif resp.status_code == 408:
|
||||
raise RestClientException("HTTPRequestTimeout: %s" % resp.reason)
|
||||
elif resp.status_code == 409:
|
||||
raise RestClientException("HTTPConflict: %s" % resp.reason)
|
||||
elif resp.status_code == 415:
|
||||
raise RestClientException(
|
||||
"HTTPUnsupportedMediaType: %s" % resp.reason)
|
||||
elif resp.status_code == 417:
|
||||
raise RestClientException(
|
||||
"HTTPExpectationFailed: %s" % resp.reason)
|
||||
elif resp.status_code == 500:
|
||||
raise RestClientException("HTTPServerError: %s" % resp.reason)
|
||||
else:
|
||||
raise RestClientException('Unhandled Exception code: %s %s' %
|
||||
(resp.status_code, resp.reason))
|
||||
return resp
|
||||
|
||||
def post(self, path, body, method_type):
|
||||
"""Post restclient request handler
|
||||
Return:Http response
|
||||
"""
|
||||
url = self.url % (
|
||||
self.rest_server_address,
|
||||
self.rest_server_port, path)
|
||||
data = jsonutils.dumps(body)
|
||||
try:
|
||||
# Method-Type needs to be added here,as DELETE/CREATE
|
||||
# both case are handled by post as delete also needs
|
||||
# to send data to the rest-server.
|
||||
headers = {"content-type": "application/json",
|
||||
"method-type": method_type}
|
||||
resp = requests.post(url, data,
|
||||
headers=headers)
|
||||
message = "POST url %s %d" % (url, resp.status_code)
|
||||
LOG.info(message)
|
||||
return self._response(resp, url)
|
||||
except RestClientException as rce:
|
||||
message = "Rest API %s - Failed. Reason: %s" % (
|
||||
url, rce)
|
||||
LOG.error(message)
|
||||
|
||||
def get(self, path):
|
||||
"""Get restclient request handler
|
||||
Return:Http response
|
||||
"""
|
||||
url = self.url % (
|
||||
self.rest_server_address,
|
||||
self.rest_server_port, path)
|
||||
try:
|
||||
headers = {"content-type": "application/json"}
|
||||
resp = requests.get(url,
|
||||
headers=headers)
|
||||
message = "GET url %s %d" % (url, resp.status_code)
|
||||
LOG.info(message)
|
||||
return self._response(resp, url)
|
||||
except RestClientException as rce:
|
||||
message = "Rest API %s - Failed. Reason: %s" % (
|
||||
url, rce)
|
||||
LOG.error(message)
|
||||
|
||||
""" Common Class to handle rpcclient request"""
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic):
|
||||
self.topic = topic
|
||||
target = messaging.Target(topic=self.topic,
|
||||
version=self.API_VERSION)
|
||||
self.client = n_rpc.get_client(target)
|
||||
self.cctxt = self.client.prepare(version=self.API_VERSION,
|
||||
topic=self.topic)
|
||||
|
||||
|
||||
def send_request_to_configurator(conf, context, body,
|
||||
method_type, device_config=False,
|
||||
network_function_event=False):
|
||||
"""Common function to handle (create, delete) request for configurator.
|
||||
Send create/delete to configurator rest-server.
|
||||
Return:Http Response
|
||||
"""
|
||||
# This function reads configuration data and decides
|
||||
# method (tcp_rest/rpc) for sending request to configurator.
|
||||
if device_config:
|
||||
method_name = method_type.lower() + '_network_function_device_config'
|
||||
body['info']['context'].update({'neutron_context': context.to_dict()})
|
||||
elif network_function_event:
|
||||
method_name = 'network_function_event'
|
||||
else:
|
||||
if (body['config'][0]['resource'] in
|
||||
nfp_constants.CONFIG_TAG_RESOURCE_MAP.values()):
|
||||
body['config'][0]['resource_data'].update(
|
||||
{'neutron_context': context.to_dict()})
|
||||
body['info']['context'].update(
|
||||
{'neutron_context': context.to_dict()})
|
||||
method_name = method_type.lower() + '_network_function_config'
|
||||
|
||||
if conf.backend == TCP_REST:
|
||||
try:
|
||||
rc = RestApi(conf.REST.rest_server_address,
|
||||
conf.REST.rest_server_port)
|
||||
resp = rc.post(method_name, body, method_type.upper())
|
||||
message = "%s -> POST response: (%s) body: %s " % (
|
||||
method_name, resp, body)
|
||||
LOG.info(message)
|
||||
except RestClientException as rce:
|
||||
message = "%s -> POST request failed.Reason: %s" % (
|
||||
method_name, rce)
|
||||
LOG.error(message)
|
||||
|
||||
elif conf.backend == UNIX_REST:
|
||||
try:
|
||||
resp, content = unix_rc.post(method_name,
|
||||
body=body)
|
||||
message = "%s -> POST response: (%s) body : %s " % (
|
||||
method_name, content, body)
|
||||
LOG.info(message)
|
||||
|
||||
except unix_rc.RestClientException as rce:
|
||||
message = "%s -> request failed . Reason %s " % (
|
||||
method_name, rce)
|
||||
LOG.error(message)
|
||||
|
||||
else:
|
||||
message = ("%s -> RPC request sent ! with body : %s " % (
|
||||
(method_name, body)))
|
||||
LOG.info(message)
|
||||
rpcClient = RPCClient(conf.RPC.topic)
|
||||
rpcClient.cctxt.cast(context, method_name,
|
||||
body=body)
|
||||
|
||||
|
||||
def get_response_from_configurator(conf):
|
||||
"""Common function to handle get request for configurator.
|
||||
Get notification http response from configurator rest server.
|
||||
Return:Http Response
|
||||
response_data = [
|
||||
{'receiver': <neutron/device_orchestrator/service_orchestrator>,
|
||||
'resource': <firewall/vpn/loadbalancer/orchestrator>,
|
||||
'method': <notification method name>,
|
||||
'kwargs': <notification method arguments>
|
||||
},
|
||||
]
|
||||
"""
|
||||
# This function reads configuration data and decides
|
||||
# method (tcp_rest/ unix_rest/ rpc) for get response from configurator.
|
||||
if conf.backend == TCP_REST:
|
||||
try:
|
||||
rc = RestApi(conf.REST.rest_server_address,
|
||||
conf.REST.rest_server_port)
|
||||
resp = rc.get('get_notifications')
|
||||
rpc_cbs_data = jsonutils.loads(resp.content)
|
||||
return rpc_cbs_data
|
||||
except RestClientException as rce:
|
||||
message = ("get_notification ->"
|
||||
"GET request failed. Reason : %s" % (rce))
|
||||
LOG.error(message)
|
||||
return "get_notification -> GET request failed. Reason : %s" % (
|
||||
rce)
|
||||
except Exception as e:
|
||||
message = ("get_notification ->"
|
||||
"GET request failed. Reason : %s" % (e))
|
||||
LOG.error(message)
|
||||
return "get_notification -> GET request failed. Reason : %s" % (
|
||||
e)
|
||||
|
||||
elif conf.backend == UNIX_REST:
|
||||
try:
|
||||
resp, content = unix_rc.get('get_notifications')
|
||||
content = jsonutils.loads(content)
|
||||
if content:
|
||||
message = ("get_notification ->"
|
||||
"GET response: (%s)" % (content))
|
||||
LOG.info(message)
|
||||
return content
|
||||
except unix_rc.RestClientException as rce:
|
||||
message = ("get_notification ->"
|
||||
"GET request failed. Reason : %s" % (
|
||||
rce))
|
||||
LOG.error(message)
|
||||
return "get_notification -> GET request failed. Reason : %s" % (
|
||||
rce)
|
||||
except Exception as e:
|
||||
message = ("get_notification ->"
|
||||
"GET request failed. Reason : %s" % (
|
||||
e))
|
||||
LOG.error(message)
|
||||
return "get_notification -> GET request failed. Reason : %s" % (
|
||||
e)
|
||||
|
||||
else:
|
||||
rpc_cbs_data = []
|
||||
try:
|
||||
rpcClient = RPCClient(conf.RPC.topic)
|
||||
context = n_context.Context(
|
||||
'config_agent_user', 'config_agent_tenant')
|
||||
rpc_cbs_data = rpcClient.cctxt.call(context,
|
||||
'get_notifications')
|
||||
return rpc_cbs_data
|
||||
except Exception as e:
|
||||
message = "Exception while processing %s" % e
|
||||
LOG.error(message)
|
||||
return "get_notification -> GET request failed. Reason : %s" % (
|
||||
e)
|
||||
|
||||
|
||||
def parse_service_flavor_string(service_flavor_str):
|
||||
pass
|
||||
"""Parse service_flavour string to service details dictionary.
|
||||
Return: Service Details Dictionary
|
||||
"""
|
||||
service_details = {}
|
||||
if ',' not in service_flavor_str:
|
||||
service_details['device_type'] = 'nova'
|
||||
service_details['service_vendor'] = service_flavor_str
|
||||
else:
|
||||
service_flavor_dict = dict(item.split('=') for item
|
||||
in service_flavor_str.split(','))
|
||||
service_details = {key.strip(): value.strip() for key, value
|
||||
in service_flavor_dict.iteritems()}
|
||||
return service_details
|
||||
|
||||
@@ -63,6 +63,7 @@ def events_init(controller, config, service_orchestrator):
|
||||
'DEVICE_CREATED', 'DEVICE_ACTIVE', 'DEVICE_DELETED',
|
||||
'DEVICE_CREATE_FAILED', 'SEND_HEAT_CONFIG',
|
||||
'CHECK_HEAT_CONFIG_RESULT', 'APPLY_USER_CONFIG',
|
||||
'APPLY_USER_CONFIG_BASEMODE',
|
||||
'DELETE_USER_CONFIG', 'UPDATE_USER_CONFIG',
|
||||
'POLICY_TARGET_ADD', 'POLICY_TARGET_REMOVE',
|
||||
'CONSUMER_ADD', 'CONSUMER_REMOVE',
|
||||
@@ -415,6 +416,7 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
|
||||
"DEVICE_DELETED": self.handle_device_deleted,
|
||||
"DEVICE_CREATE_FAILED": self.handle_device_create_failed,
|
||||
"APPLY_USER_CONFIG": self.apply_user_config,
|
||||
"APPLY_USER_CONFIG_BASEMODE": self.apply_user_config_basemode,
|
||||
"CHECK_HEAT_CONFIG_RESULT": self.check_heat_config_result,
|
||||
"DELETE_USER_CONFIG": self.delete_user_config,
|
||||
"UPDATE_USER_CONFIG": self.handle_update_user_config,
|
||||
@@ -753,13 +755,6 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
|
||||
network_function_status)
|
||||
return None
|
||||
|
||||
if base_mode_support:
|
||||
# In base mode support, create user config directly, no need to
|
||||
# create network function instance, network function device first.
|
||||
self.create_network_function_user_config(network_function['id'],
|
||||
service_config_str)
|
||||
return network_function
|
||||
|
||||
nfp_context = network_function_info
|
||||
|
||||
service_details['service_type'] = service_profile['service_type']
|
||||
@@ -767,11 +762,20 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
|
||||
nfp_context['network_function'] = network_function
|
||||
nfp_context['service_details'] = service_details
|
||||
nfp_context['share_existing_device'] = False
|
||||
nfp_context['base_mode'] = base_mode_support
|
||||
|
||||
# Create and event to perform Network service instance
|
||||
self._create_event('CREATE_NETWORK_FUNCTION_INSTANCE',
|
||||
event_data=nfp_context,
|
||||
is_internal_event=True)
|
||||
if base_mode_support:
|
||||
# Store the context in current thread
|
||||
nfp_core_context.store_nfp_context(nfp_context)
|
||||
# In base mode support, create user config directly, no need to
|
||||
# create network function instance, network function device first.
|
||||
self.create_network_function_user_config(network_function['id'],
|
||||
service_config_str)
|
||||
else:
|
||||
# Create and event to perform Network service instance
|
||||
self._create_event('CREATE_NETWORK_FUNCTION_INSTANCE',
|
||||
event_data=nfp_context,
|
||||
is_internal_event=True)
|
||||
|
||||
nfp_logging.clear_logging_context()
|
||||
return network_function
|
||||
@@ -955,14 +959,46 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
|
||||
def check_heat_config_result(self, event):
|
||||
nfp_context = event.data['nfp_context']
|
||||
|
||||
event_desc = nfp_context['event_desc']
|
||||
key = nfp_context['key']
|
||||
id = nfp_context['id']
|
||||
base_mode = nfp_context['base_mode']
|
||||
if base_mode:
|
||||
# Create and event to apply user config
|
||||
self._create_event('APPLY_USER_CONFIG_BASEMODE',
|
||||
event_data=event.data,
|
||||
is_internal_event=True)
|
||||
else:
|
||||
event_desc = nfp_context['event_desc']
|
||||
key = nfp_context['key']
|
||||
id = nfp_context['id']
|
||||
|
||||
# Complete the original event here
|
||||
event = self._controller.new_event(
|
||||
id=id, key=key, desc_dict=event_desc)
|
||||
self._controller.event_complete(event, result='SUCCESS')
|
||||
# Complete the original event here
|
||||
event = self._controller.new_event(id=id, key=key,
|
||||
desc_dict=event_desc)
|
||||
self._controller.event_complete(event, result='SUCCESS')
|
||||
|
||||
def apply_user_config_basemode(self, event):
|
||||
request_data = event.data
|
||||
network_function_details = self.get_network_function_details(
|
||||
request_data['network_function_id'])
|
||||
request_data['heat_stack_id'] = self.config_driver.apply_config(
|
||||
network_function_details) # Heat driver to launch stack
|
||||
network_function = network_function_details['network_function']
|
||||
request_data['network_function_id'] = network_function['id']
|
||||
if not request_data['heat_stack_id']:
|
||||
self._create_event('USER_CONFIG_FAILED',
|
||||
event_data=request_data, is_internal_event=True)
|
||||
return
|
||||
request_data['tenant_id'] = network_function['tenant_id']
|
||||
request_data['network_function_details'] = network_function_details
|
||||
LOG.debug("handle_device_active heat_stack_id: %s"
|
||||
% (request_data['heat_stack_id']))
|
||||
self.db_handler.update_network_function(
|
||||
self.db_session, network_function['id'],
|
||||
{'heat_stack_id': request_data['heat_stack_id'],
|
||||
'description': network_function['description']})
|
||||
self._create_event('APPLY_USER_CONFIG_IN_PROGRESS',
|
||||
event_data=request_data,
|
||||
is_poll_event=True,
|
||||
original_event=event)
|
||||
|
||||
def apply_user_config(self, event):
|
||||
nfp_context = event.data
|
||||
@@ -1739,12 +1775,14 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler):
|
||||
|
||||
nfp_context = nfp_core_context.get_nfp_context()
|
||||
if nfp_context:
|
||||
network_function = nfp_context['network_function']
|
||||
network_function_instance = nfp_context[
|
||||
'network_function_instance']
|
||||
network_function_device = nfp_context['network_function_device']
|
||||
service_type = nfp_context['service_details']['service_type']
|
||||
|
||||
network_function = nfp_context.get('network_function', None)
|
||||
network_function_instance = nfp_context.get(
|
||||
'network_function_instance', None)
|
||||
network_function_device = nfp_context.get(
|
||||
'network_function_device', None)
|
||||
service_details = nfp_context.get('service_details', None)
|
||||
if service_details:
|
||||
service_type = service_details.get('service_type', None)
|
||||
if not network_function:
|
||||
network_function = self.db_handler.get_network_function(
|
||||
self.db_session, network_function_id)
|
||||
@@ -1827,11 +1865,13 @@ class NSOConfiguratorRpcApi(object):
|
||||
network_function_instance = network_function_details.get(
|
||||
'network_function_instance')
|
||||
nfp_context = nfp_core_context.get_nfp_context()
|
||||
nfp_context_rpc = None
|
||||
rpc_nfp_context = None
|
||||
if nfp_context:
|
||||
nfp_context_rpc = {'event_desc': nfp_context['event_desc'],
|
||||
'key': nfp_context.pop('key'),
|
||||
'id': nfp_context.pop('id')}
|
||||
rpc_nfp_context = {
|
||||
'event_desc': nfp_context.get('event_desc', None),
|
||||
'key': nfp_context.pop('key', None),
|
||||
'id': nfp_context.pop('id', None),
|
||||
'base_mode': nfp_context.pop('base_mode', None)}
|
||||
request_info = {
|
||||
'nf_id': network_function_details['network_function']['id'],
|
||||
'nfi_id': (network_function_instance['id']
|
||||
@@ -1840,7 +1880,7 @@ class NSOConfiguratorRpcApi(object):
|
||||
'requester': nfp_constants.SERVICE_ORCHESTRATOR,
|
||||
'operation': operation,
|
||||
'logging_context': nfp_logging.get_logging_context(),
|
||||
'nfp_context': nfp_context_rpc
|
||||
'nfp_context': rpc_nfp_context
|
||||
}
|
||||
if operation in ['consumer_add', 'consumer_remove']:
|
||||
request_info.update({'consumer_ptg': user_config_data[
|
||||
|
||||
0
gbpservice/nfp/proxy_agent/__init__.py
Normal file
0
gbpservice/nfp/proxy_agent/__init__.py
Normal file
0
gbpservice/nfp/proxy_agent/lib/__init__.py
Normal file
0
gbpservice/nfp/proxy_agent/lib/__init__.py
Normal file
16
gbpservice/nfp/proxy_agent/lib/topics.py
Normal file
16
gbpservice/nfp/proxy_agent/lib/topics.py
Normal file
@@ -0,0 +1,16 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
PROXY_AGENT_TOPIC = 'nfp-proxy-agent-topic'
|
||||
SERVICE_ORCH_TOPIC = 'nfp-nso-notification-topic'
|
||||
DEVICE_ORCH_TOPIC = 'nfp-ndo-notification-topic'
|
||||
CONFIG_ORCH_TOPIC = 'nfp-nco-notification-topic'
|
||||
0
gbpservice/nfp/proxy_agent/modules/__init__.py
Normal file
0
gbpservice/nfp/proxy_agent/modules/__init__.py
Normal file
34
gbpservice/nfp/proxy_agent/modules/notification_agent.py
Normal file
34
gbpservice/nfp/proxy_agent/modules/notification_agent.py
Normal file
@@ -0,0 +1,34 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.nfp.core.event import Event
|
||||
from gbpservice.nfp.proxy_agent.notifications import pull
|
||||
|
||||
|
||||
def events_init(sc, conf):
|
||||
"""Register event with its handler."""
|
||||
evs = [
|
||||
Event(id='PULL_NOTIFICATIONS',
|
||||
handler=pull.PullNotification(sc, conf))]
|
||||
sc.register_events(evs)
|
||||
|
||||
|
||||
def nfp_module_init(sc, conf):
|
||||
"""Initialize module to register rpc & event handler"""
|
||||
events_init(sc, conf)
|
||||
|
||||
|
||||
def nfp_module_post_init(sc, conf):
|
||||
"""Post a event for pull notification after each periodic_task_interval"""
|
||||
ev = sc.new_event(id='PULL_NOTIFICATIONS',
|
||||
key='PULL_NOTIFICATIONS')
|
||||
sc.post_event(ev)
|
||||
105
gbpservice/nfp/proxy_agent/modules/proxy_agent.py
Normal file
105
gbpservice/nfp/proxy_agent/modules/proxy_agent.py
Normal file
@@ -0,0 +1,105 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.nfp.core import log as nfp_logging
|
||||
from gbpservice.nfp.core.rpc import RpcAgent
|
||||
from gbpservice.nfp.lib import transport as transport
|
||||
from gbpservice.nfp.proxy_agent.lib import topics
|
||||
|
||||
|
||||
from oslo_log import helpers as log_helpers
|
||||
import oslo_messaging as messaging
|
||||
|
||||
LOG = nfp_logging.getLogger(__name__)
|
||||
|
||||
|
||||
def rpc_init(config, sc):
|
||||
"""Register agent with its handler."""
|
||||
rpcmgr = RpcHandler(config, sc)
|
||||
agent = RpcAgent(
|
||||
sc,
|
||||
host=config.host,
|
||||
topic=topics.PROXY_AGENT_TOPIC,
|
||||
manager=rpcmgr)
|
||||
sc.register_rpc_agents([agent])
|
||||
|
||||
|
||||
def nfp_module_init(sc, conf):
|
||||
"""Initialize module to register rpc & event handler"""
|
||||
rpc_init(conf, sc)
|
||||
|
||||
|
||||
class RpcHandler(object):
|
||||
RPC_API_VERSION = '1.0'
|
||||
target = messaging.Target(version=RPC_API_VERSION)
|
||||
|
||||
def __init__(self, conf, sc):
|
||||
super(RpcHandler, self).__init__()
|
||||
self._conf = conf
|
||||
self._sc = sc
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def create_network_function_config(self, context, body):
|
||||
"""Method of rpc handler for create_network_function_config.
|
||||
Return: Http Response.
|
||||
"""
|
||||
transport.send_request_to_configurator(self._conf,
|
||||
context, body,
|
||||
"CREATE")
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def delete_network_function_config(self, context, body):
|
||||
"""Method of rpc handler for delete_network_function_config.
|
||||
Return: Http Response.
|
||||
"""
|
||||
transport.send_request_to_configurator(self._conf,
|
||||
context, body,
|
||||
"DELETE")
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def update_network_function_config(self, context, body):
|
||||
"""Method of rpc handler for delete_network_function_config.
|
||||
Return: Http Response.
|
||||
"""
|
||||
transport.send_request_to_configurator(self._conf,
|
||||
context, body,
|
||||
"UPDATE")
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def create_network_function_device_config(self, context, body):
|
||||
"""Method of rpc handler for create_network_function_device_config.
|
||||
Return: Http Response.
|
||||
"""
|
||||
transport.send_request_to_configurator(self._conf,
|
||||
context, body,
|
||||
"CREATE",
|
||||
device_config=True)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def delete_network_function_device_config(self, context, body):
|
||||
"""Method of rpc handler for delete_network_function_device_config.
|
||||
Return: Http Response.
|
||||
"""
|
||||
transport.send_request_to_configurator(self._conf,
|
||||
context, body,
|
||||
"DELETE",
|
||||
device_config=True)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def network_function_event(self, context, body):
|
||||
"""Method of rpc handler for create_service.
|
||||
Return: Http Response.
|
||||
"""
|
||||
transport.send_request_to_configurator(self._conf,
|
||||
context, body,
|
||||
"CREATE",
|
||||
network_function_event=True)
|
||||
89
gbpservice/nfp/proxy_agent/notifications/pull.py
Normal file
89
gbpservice/nfp/proxy_agent/notifications/pull.py
Normal file
@@ -0,0 +1,89 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gbpservice.nfp.core import log as nfp_logging
|
||||
from gbpservice.nfp.core import module as nfp_api
|
||||
from gbpservice.nfp.lib import transport as transport
|
||||
from gbpservice.nfp.proxy_agent.lib import topics as a_topics
|
||||
|
||||
from neutron import context as n_context
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
LOG = nfp_logging.getLogger(__name__)
|
||||
|
||||
ResourceMap = {
|
||||
'device_orch': a_topics.DEVICE_ORCH_TOPIC,
|
||||
'service_orch': a_topics.SERVICE_ORCH_TOPIC,
|
||||
'nas_service': a_topics.CONFIG_ORCH_TOPIC
|
||||
}
|
||||
|
||||
|
||||
"""Periodic Class to pull notification from configurator"""
|
||||
|
||||
|
||||
class PullNotification(nfp_api.NfpEventHandler):
|
||||
|
||||
def __init__(self, sc, conf):
|
||||
self._sc = sc
|
||||
self._conf = conf
|
||||
|
||||
def handle_event(self, ev):
|
||||
self._sc.poll_event(ev)
|
||||
|
||||
def _method_handler(self, notification):
|
||||
# Method handles notification as per resource, resource_type and method
|
||||
try:
|
||||
requester = notification['info']['context']['requester']
|
||||
topic = ResourceMap[requester]
|
||||
context = notification['info']['context']['neutron_context']
|
||||
rpcClient = transport.RPCClient(topic)
|
||||
rpc_ctx = n_context.Context.from_dict(context)
|
||||
rpcClient.cctxt.cast(rpc_ctx,
|
||||
'network_function_notification',
|
||||
notification_data=notification)
|
||||
except Exception as e:
|
||||
raise Exception(e)
|
||||
|
||||
@nfp_api.poll_event_desc(event='PULL_NOTIFICATIONS', spacing=2)
|
||||
def pull_notifications(self, ev):
|
||||
"""Pull and handle notification from configurator."""
|
||||
notifications = transport.get_response_from_configurator(self._conf)
|
||||
|
||||
if not isinstance(notifications, list):
|
||||
message = "Notfications not list, %s" % (notifications)
|
||||
LOG.error(message)
|
||||
|
||||
else:
|
||||
for notification in notifications:
|
||||
if not notification:
|
||||
message = "Receiver Response: Empty"
|
||||
LOG.info(message)
|
||||
continue
|
||||
try:
|
||||
self._method_handler(notification)
|
||||
except AttributeError:
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
message = ("AttributeError while handling"
|
||||
"message %s : %s " % (notification,
|
||||
traceback.format_exception(
|
||||
exc_type, exc_value, exc_traceback)))
|
||||
LOG.error(message)
|
||||
|
||||
except Exception as e:
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
message = ("Generic exception (%s)"
|
||||
"while handling message (%s) : %s" % (
|
||||
e, notification, traceback.format_exception(
|
||||
exc_type, exc_value, exc_traceback)))
|
||||
LOG.error(message)
|
||||
0
gbpservice/nfp/proxy_agent/proxy/__init__.py
Normal file
0
gbpservice/nfp/proxy_agent/proxy/__init__.py
Normal file
333
gbpservice/nfp/proxy_agent/proxy/proxy.py
Normal file
333
gbpservice/nfp/proxy_agent/proxy/proxy.py
Normal file
@@ -0,0 +1,333 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import argparse
|
||||
import ConfigParser
|
||||
from gbpservice.nfp.core import log as nfp_logging
|
||||
import os
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as oslo_logging
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
||||
oslo_logging.register_options(cfg.CONF)
|
||||
|
||||
LOG = nfp_logging.getLogger(__name__)
|
||||
|
||||
# Queue of proxy connections which workers will handle
|
||||
ConnQ = eventlet.queue.Queue(maxsize=0)
|
||||
|
||||
tcp_open_connection_count = 0
|
||||
tcp_close_connection_count = 0
|
||||
|
||||
|
||||
class ConnectionIdleTimeOut(Exception):
|
||||
|
||||
'''
|
||||
Exception raised when connection is idle for configured timeout
|
||||
'''
|
||||
pass
|
||||
|
||||
|
||||
"""
|
||||
parsing the proxy configuration file
|
||||
"""
|
||||
|
||||
|
||||
class Configuration(object):
|
||||
|
||||
def __init__(self, filee):
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read(filee)
|
||||
|
||||
self.thread_pool_size = config.getint('OPTIONS', 'thread_pool_size')
|
||||
self.unix_bind_path = config.get('OPTIONS', 'unix_bind_path')
|
||||
self.max_connections = config.getint('OPTIONS', 'max_connections')
|
||||
self.worker_threads = config.getint('OPTIONS', 'worker_threads')
|
||||
self.connect_max_wait_timeout = config.getfloat(
|
||||
'OPTIONS', 'connect_max_wait_timeout')
|
||||
self.idle_max_wait_timeout = config.getfloat(
|
||||
'OPTIONS', 'idle_max_wait_timeout')
|
||||
self.idle_min_wait_timeout = config.getfloat(
|
||||
'OPTIONS', 'idle_min_wait_timeout')
|
||||
self.rest_server_address = config.get(
|
||||
'NFP_CONTROLLER', 'rest_server_address')
|
||||
self.rest_server_port = config.getint(
|
||||
'NFP_CONTROLLER', 'rest_server_port')
|
||||
|
||||
|
||||
"""
|
||||
Class to create Unix Listener
|
||||
"""
|
||||
|
||||
|
||||
class UnixServer(object):
|
||||
|
||||
def __init__(self, conf, proxy):
|
||||
self.proxy = proxy
|
||||
self.bind_path = conf.unix_bind_path
|
||||
self.max_connections = conf.max_connections
|
||||
# Make sure the socket does not already exist
|
||||
try:
|
||||
os.unlink(self.bind_path)
|
||||
except OSError:
|
||||
if os.path.exists(self.bind_path):
|
||||
raise
|
||||
|
||||
# Create a UDS socket
|
||||
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
|
||||
# Bind the socket to the port
|
||||
message = 'starting up on %s' % self.bind_path
|
||||
LOG.info(message)
|
||||
self.socket.bind(self.bind_path)
|
||||
self.socket.listen(self.max_connections)
|
||||
|
||||
def listen(self):
|
||||
client, address = self.socket.accept()
|
||||
self.proxy.new_client(client, address)
|
||||
|
||||
|
||||
"""
|
||||
Class to create TCP client Connection if
|
||||
TCP server is alive
|
||||
"""
|
||||
|
||||
|
||||
class TcpClient(object):
|
||||
|
||||
def __init__(self, conf, proxy):
|
||||
self.conf = conf
|
||||
self.proxy = proxy
|
||||
self.server_address = conf.rest_server_address
|
||||
self.server_port = conf.rest_server_port
|
||||
# Connect the socket to the port where the server is listening
|
||||
self.server = (self.server_address, self.server_port)
|
||||
|
||||
def connect(self):
|
||||
sock = socket.socket()
|
||||
message = 'connecting to %s port %s' % self.server
|
||||
LOG.info(message)
|
||||
sock.settimeout(self.conf.connect_max_wait_timeout)
|
||||
try:
|
||||
sock.connect(self.server)
|
||||
except socket.error as exc:
|
||||
message = "Caught exception socket.error : %s" % exc
|
||||
LOG.error(message)
|
||||
return sock, False
|
||||
return sock, True
|
||||
|
||||
|
||||
"""
|
||||
ADT for proxy connection
|
||||
"""
|
||||
|
||||
|
||||
class Connection(object):
|
||||
|
||||
def __init__(self, conf, socket, type='unix'):
|
||||
self._socket = socket
|
||||
self._idle_wait = conf.idle_min_wait_timeout
|
||||
self._idle_timeout = conf.idle_max_wait_timeout
|
||||
self._idle_count_max = (self._idle_timeout / self._idle_wait)
|
||||
self._idle_count = 0
|
||||
self._start_time = time.time()
|
||||
self._end_time = time.time()
|
||||
self.type = type
|
||||
self.socket_id = self._socket.fileno()
|
||||
|
||||
def _tick(self):
|
||||
self._idle_count += 1
|
||||
|
||||
def _timedout(self):
|
||||
if self._idle_count > self._idle_count_max:
|
||||
self._end_time = time.time()
|
||||
raise ConnectionIdleTimeOut(
|
||||
"Connection (%d) - stime (%s) - etime (%s) - "
|
||||
"idle_count (%d) idle_count_max(%d)" % (
|
||||
self.identify(), self._start_time,
|
||||
self._end_time, self._idle_count, self._idle_count_max))
|
||||
|
||||
def idle(self):
|
||||
self._tick()
|
||||
self._timedout()
|
||||
|
||||
def idle_reset(self):
|
||||
self._idle_count = 0
|
||||
self._start_time = time.time()
|
||||
|
||||
def _wait(self, timeout):
|
||||
if self.type == 'unix':
|
||||
eventlet.sleep(timeout)
|
||||
self._socket.setblocking(0)
|
||||
else:
|
||||
self._socket.settimeout(timeout)
|
||||
|
||||
def recv(self):
|
||||
self._wait(self._idle_wait)
|
||||
try:
|
||||
data = self._socket.recv(1024)
|
||||
if data and len(data):
|
||||
self.idle_reset()
|
||||
return data
|
||||
self.idle()
|
||||
except socket.timeout:
|
||||
self.idle()
|
||||
except socket.error:
|
||||
self.idle()
|
||||
return None
|
||||
|
||||
def send(self, data):
|
||||
self._socket.setblocking(1)
|
||||
self._socket.sendall(data)
|
||||
self._socket.setblocking(0)
|
||||
|
||||
def close(self):
|
||||
message = "Closing Socket - %d" % (self.identify())
|
||||
LOG.debug(message)
|
||||
try:
|
||||
self._socket.shutdown(socket.SHUT_RDWR)
|
||||
self._socket.close()
|
||||
except Exception as exc:
|
||||
message = "%s - exception while closing - %s" % (
|
||||
self.identify(), str(exc))
|
||||
LOG.error(message)
|
||||
|
||||
def identify(self):
|
||||
return self.socket_id
|
||||
|
||||
|
||||
"""
|
||||
ADT for Proxy Connection Object
|
||||
Each Connection Object is pair of Unix Socket and
|
||||
TCP Client Socket
|
||||
"""
|
||||
|
||||
|
||||
class ProxyConnection(object):
|
||||
|
||||
def __init__(self, conf, unix_socket, tcp_socket):
|
||||
self._unix_conn = Connection(conf, unix_socket, type='unix')
|
||||
self._tcp_conn = Connection(conf, tcp_socket, type='tcp')
|
||||
message = "New Proxy - Unix - %d, TCP - %d" % (
|
||||
self._unix_conn.identify(), self._tcp_conn.identify())
|
||||
LOG.debug(message)
|
||||
|
||||
def close(self):
|
||||
self._unix_conn.close()
|
||||
self._tcp_conn.close()
|
||||
|
||||
def _proxy(self, rxconn, txconn):
|
||||
data = rxconn.recv()
|
||||
if data:
|
||||
txconn.send(data)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self._proxy(self._unix_conn, self._tcp_conn)
|
||||
self._proxy(self._tcp_conn, self._unix_conn)
|
||||
return True
|
||||
except Exception as exc:
|
||||
message = "%s" % (exc)
|
||||
LOG.debug(message)
|
||||
self._unix_conn.close()
|
||||
self._tcp_conn.close()
|
||||
return False
|
||||
|
||||
def identify(self):
|
||||
return '%d:%d' % (
|
||||
self._unix_conn.identify(),
|
||||
self._tcp_conn.identify())
|
||||
|
||||
|
||||
"""
|
||||
ADT for proxy Worker
|
||||
"""
|
||||
|
||||
|
||||
class Worker(object):
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Worker thread will pop the Proxy Connection Object
|
||||
from Connection Queue and Perform send and receive
|
||||
operations. If the connection is ideal upto ideal_max_timeout
|
||||
it will not push the Object into connection queue so Proxy Connection
|
||||
Object is automatically destroy, otherwise it will again
|
||||
push the Object in connection Queue
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
pc = ConnQ.get()
|
||||
call = True
|
||||
while call:
|
||||
call = pc.run()
|
||||
except eventlet.queue.Empty:
|
||||
pass
|
||||
eventlet.sleep(0)
|
||||
|
||||
|
||||
"""
|
||||
ADT to Run the configurator proxy,
|
||||
accept the Unix Client request,
|
||||
Check REST Server is reachable or not,
|
||||
Try to establish TCP Client Connection to REST
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class Proxy(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
# Be a server and wait for connections from the client
|
||||
self.server = UnixServer(conf, self)
|
||||
self.client = TcpClient(conf, self)
|
||||
|
||||
def start(self):
|
||||
"""Run each worker in new thread"""
|
||||
|
||||
for i in range(self.conf.worker_threads):
|
||||
eventlet.spawn_n(Worker().run)
|
||||
while True:
|
||||
self.server.listen()
|
||||
|
||||
def new_client(self, unixsocket, address):
|
||||
"""Establish connection with the tcp server"""
|
||||
|
||||
tcpsocket, connected = self.client.connect()
|
||||
if not connected:
|
||||
message = "Proxy -> Could not connect with tcp server"
|
||||
LOG.error(message)
|
||||
unixsocket.close()
|
||||
tcpsocket.close()
|
||||
else:
|
||||
pc = ProxyConnection(self.conf, unixsocket, tcpsocket)
|
||||
ConnQ.put(pc)
|
||||
|
||||
|
||||
def main(argv):
|
||||
cfg.CONF(args=sys.argv[1:])
|
||||
oslo_logging.setup(cfg.CONF, 'nfp')
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'-config-file', "--config-file", action="store", dest='config_file')
|
||||
parser.add_argument(
|
||||
'-log-file', "--log-file", action="store", dest='log_file')
|
||||
args = parser.parse_args(sys.argv[1:])
|
||||
conf = Configuration(args.config_file)
|
||||
Proxy(conf).start()
|
||||
Reference in New Issue
Block a user