From f4b68b18f8dc58e35f5bd7b74c6b4d50390fa35d Mon Sep 17 00:00:00 2001 From: Ashutosh Mishra Date: Mon, 28 Mar 2016 21:07:29 +0530 Subject: [PATCH] NFP - Base mode Service Orchestrator Device won't get created in base mode support. Inserted proxy_agent component in this patch as other patches are dependent on this. Change-Id: Ia13caec590584ea8958abdffff09a6fef9dcb226 Implements: blueprint gbp-network-services-framework Co-Authored-By: Yogesh Rajmane Co-Authored-By: Akash Deep Co-Authored-By: Ahmed Khan (cherry picked from commit ea5e8a26219fef7568e600db834df5bafe5224c9) --- .../neutron/tests/unit/nfp/lib/__init__.py | 0 .../tests/unit/nfp/lib/test_transport.py | 179 ++++++++++ .../tests/unit/nfp/proxy_agent/__init__.py | 0 .../unit/nfp/proxy_agent/modules/__init__.py | 0 .../proxy_agent/modules/test_proxy_agent.py | 83 +++++ .../nfp/proxy_agent/notifications/__init__.py | 0 .../notifications/test_pull_notifications.py | 95 +++++ gbpservice/nfp/common/constants.py | 18 + gbpservice/nfp/lib/rest_client_over_unix.py | 170 +++++++++ gbpservice/nfp/lib/transport.py | 303 +++++++++++++++- .../modules/service_orchestrator.py | 98 ++++-- gbpservice/nfp/proxy_agent/__init__.py | 0 gbpservice/nfp/proxy_agent/lib/__init__.py | 0 gbpservice/nfp/proxy_agent/lib/topics.py | 16 + .../nfp/proxy_agent/modules/__init__.py | 0 .../proxy_agent/modules/notification_agent.py | 34 ++ .../nfp/proxy_agent/modules/proxy_agent.py | 105 ++++++ .../nfp/proxy_agent/notifications/__init__.py | 0 .../nfp/proxy_agent/notifications/pull.py | 89 +++++ gbpservice/nfp/proxy_agent/proxy/__init__.py | 0 gbpservice/nfp/proxy_agent/proxy/proxy.py | 333 ++++++++++++++++++ 21 files changed, 1493 insertions(+), 30 deletions(-) create mode 100644 gbpservice/neutron/tests/unit/nfp/lib/__init__.py create mode 100644 gbpservice/neutron/tests/unit/nfp/lib/test_transport.py create mode 100644 gbpservice/neutron/tests/unit/nfp/proxy_agent/__init__.py create mode 100644 gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/__init__.py create mode 100644 gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/test_proxy_agent.py create mode 100644 gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/__init__.py create mode 100644 gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/test_pull_notifications.py create mode 100644 gbpservice/nfp/lib/rest_client_over_unix.py create mode 100644 gbpservice/nfp/proxy_agent/__init__.py create mode 100644 gbpservice/nfp/proxy_agent/lib/__init__.py create mode 100644 gbpservice/nfp/proxy_agent/lib/topics.py create mode 100644 gbpservice/nfp/proxy_agent/modules/__init__.py create mode 100644 gbpservice/nfp/proxy_agent/modules/notification_agent.py create mode 100644 gbpservice/nfp/proxy_agent/modules/proxy_agent.py create mode 100644 gbpservice/nfp/proxy_agent/notifications/__init__.py create mode 100644 gbpservice/nfp/proxy_agent/notifications/pull.py create mode 100644 gbpservice/nfp/proxy_agent/proxy/__init__.py create mode 100644 gbpservice/nfp/proxy_agent/proxy/proxy.py diff --git a/gbpservice/neutron/tests/unit/nfp/lib/__init__.py b/gbpservice/neutron/tests/unit/nfp/lib/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/neutron/tests/unit/nfp/lib/test_transport.py b/gbpservice/neutron/tests/unit/nfp/lib/test_transport.py new file mode 100644 index 000000000..672499289 --- /dev/null +++ b/gbpservice/neutron/tests/unit/nfp/lib/test_transport.py @@ -0,0 +1,179 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.lib import transport +import mock +from neutron.common import rpc as n_rpc +from neutron import context as ctx +from oslo_config import cfg +from oslo_serialization import jsonutils +import unittest + +""" +Common class used to create configuration mapping +""" + + +class Map(dict): + + def __init__(self, *args, **kwargs): + super(Map, self).__init__(*args, **kwargs) + for arg in args: + if isinstance(arg, dict): + for k, v in arg.iteritems(): + self[k] = v + + if kwargs: + for k, v in kwargs.iteritems(): + self[k] = v + + def __getattr__(self, attr): + return self.get(attr) + + def __setattr__(self, key, value): + self.__setitem__(key, value) + + def __setitem__(self, key, value): + super(Map, self).__setitem__(key, value) + self.__dict__.update({key: value}) + + def __delattr__(self, item): + self.__delitem__(item) + + def __delitem__(self, key): + super(Map, self).__delitem__(key) + del self.__dict__[key] + + +class TestContext(object): + + def get_context(self): + try: + return ctx.Context('some_user', 'some_tenant') + except Exception: + return ctx.Context('some_user', 'some_tenant') + + def get_test_context(self): + # creating a test context + variables = {} + variables['context'] = self.get_context() + variables['body'] = {'info': {'context': {}}, + 'config': []} + variables['method_type'] = 'CREATE' + variables['device_config'] = True + return variables + + +class CommonLibraryTest(unittest.TestCase): + + def setUp(self): + n_rpc.init(cfg.CONF) + self.imprt_rc = 'gbpservice.nfp.lib.rest_client_over_unix' + + def _cast(self, context, method, **kwargs): + return + + def _call(self, context, method, **kwargs): + return [] + + def _get(self, path): + + class MockResponse(object): + + def __init__(self): + self.content = {'success': '200'} + return MockResponse() + + def _uget(self, path): + return(200, "") + + def _post(self, path, body, method_type): + return (200, "") + + def _upost(self, path, body, delete=False): + return (200, "") + + def test_rpc_send_request_to_configurator(self): + + with mock.patch('oslo_messaging.rpc.client._CallContext.cast') as cast: + cast.side_effect = self._cast + + test_context = TestContext().get_test_context() + conf = Map(backend='rpc', RPC=Map(topic='topic')) + + transport.send_request_to_configurator( + conf, + test_context['context'], + test_context['body'], + test_context['method_type'], + test_context['device_config']) + + def test_tcp_rest_send_request_to_configurator(self): + + with mock.patch.object(transport.RestApi, 'post') as mock_post: + mock_post.side_effect = self._post + + test_context = TestContext().get_test_context() + conf = Map(backend='tcp_rest', RPC=Map(topic='topic'), + REST=Map(rest_server_ip='0.0.0.0', + rest_server_port=5672)) + + transport.send_request_to_configurator( + conf, + test_context['context'], + test_context['body'], + test_context['method_type'], + test_context['device_config']) + + def test_unix_rest_send_request_to_configurator(self): + + with mock.patch(self.imprt_rc + '.post') as mock_post: + mock_post.side_effect = self._upost + + test_context = TestContext().get_test_context() + conf = Map(backend='unix_rest') + + transport.send_request_to_configurator( + conf, + test_context['context'], + test_context['body'], + test_context['method_type'], + test_context['device_config']) + + def test_tcp_rest_get_response_from_configurator(self): + + with mock.patch.object(transport.RestApi, 'get') as ( + mock_get), mock.patch.object(jsonutils, 'loads') as ( + mock_loads): + mock_get.side_effect = self._get + mock_loads.return_value = True + + conf = Map(backend='tcp_rest', RPC=Map(topic='topic'), + REST=Map(rest_server_ip='0.0.0.0', + rest_server_port=5672)) + + transport.get_response_from_configurator(conf) + + def test_unix_rest_get_response_from_configurator(self): + + with mock.patch(self.imprt_rc + '.get') as ( + mock_get), mock.patch.object(jsonutils, 'loads') as ( + mock_loads): + mock_get.side_effect = self._uget + mock_loads.return_value = True + + conf = Map(backend='unix_rest') + + transport.get_response_from_configurator(conf) + +if __name__ == '__main__': + unittest.main() diff --git a/gbpservice/neutron/tests/unit/nfp/proxy_agent/__init__.py b/gbpservice/neutron/tests/unit/nfp/proxy_agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/__init__.py b/gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/test_proxy_agent.py b/gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/test_proxy_agent.py new file mode 100644 index 000000000..ba727dd2d --- /dev/null +++ b/gbpservice/neutron/tests/unit/nfp/proxy_agent/modules/test_proxy_agent.py @@ -0,0 +1,83 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.proxy_agent.modules import proxy_agent +import mock +from neutron import context as ctx +import unittest + +rpc_manager = proxy_agent.RpcHandler + + +class TestContext(object): + + def get_context(self): + try: + return ctx.Context('some_user', 'some_tenant') + except Exception: + return ctx.Context('some_user', 'some_tenant') + +"Common class for proxy agent test cases" + + +class ConfigAgentProxyTestCase(unittest.TestCase): + + def setUp(self): + self.manager = rpc_manager('conf', 'sc') + self.context = TestContext().get_context() + self.import_lib = 'gbpservice.nfp.lib.transport' + + def _post(self, conf, context, body, + method_type, device_config=False, + network_function_event=False): + return (200, '') + + def test_create_network_function_config(self): + _data = "data" + import_send = self.import_lib + '.send_request_to_configurator' + with mock.patch(import_send) as mock_send: + mock_send.side_effect = self._post + self.manager.create_network_function_config(self.context, _data) + + def test_delete_network_function_config(self): + _data = "data" + import_send = self.import_lib + '.send_request_to_configurator' + with mock.patch(import_send) as mock_send: + mock_send.side_effect = self._post + self.manager.delete_network_function_config(self.context, _data) + + def test_create_network_function_device_config(self): + _data = "data" + import_send = self.import_lib + '.send_request_to_configurator' + with mock.patch(import_send) as mock_send: + mock_send.side_effect = self._post + self.manager.create_network_function_device_config( + self.context, _data) + + def test_delete_network_function_device_config(self): + _data = "data" + import_send = self.import_lib + '.send_request_to_configurator' + with mock.patch(import_send) as mock_send: + mock_send.side_effect = self._post + self.manager.delete_network_function_device_config( + self.context, _data) + + def test_network_function_event(self): + _data = "data" + import_send = self.import_lib + '.send_request_to_configurator' + with mock.patch(import_send) as mock_send: + mock_send.side_effect = self._post + self.manager.network_function_event( + self.context, _data) + +if __name__ == "__main__": + unittest.main() diff --git a/gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/__init__.py b/gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/test_pull_notifications.py b/gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/test_pull_notifications.py new file mode 100644 index 000000000..100c1ab3f --- /dev/null +++ b/gbpservice/neutron/tests/unit/nfp/proxy_agent/notifications/test_pull_notifications.py @@ -0,0 +1,95 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.proxy_agent.notifications import pull +import mock +from neutron import context as ctx +import unittest + +from neutron.common import rpc as n_rpc +from oslo_config import cfg + +pull_notification = pull.PullNotification + + +class TestContext(object): + + def get_context_dict(self): + try: + context = ctx.Context('some_user', 'some_tenant') + except Exception: + context = ctx.Context('some_user', 'some_tenant') + return context.to_dict() + +"""Common class for pull notification tests""" + + +class PullNotificationTestCase(unittest.TestCase): + + def setUp(self): + n_rpc.init(cfg.CONF) + self.p_notification = pull_notification('sc', 'conf') + self.context = TestContext().get_context_dict() + self.ev = '' + self.import_lib = 'gbpservice.nfp.lib.transport' + self.import_cast = 'oslo_messaging.rpc.client._CallContext.cast' + + def _resp_base_structure(self, requester): + response_data = [{ + 'info': { + 'context': { + 'neutron_context': self.context, + 'requester': requester} + }}] + return response_data + + def _cast(self, context, method, **kwargs): + return + + def _resp_data_nso(self, conf): + response_data = self._resp_base_structure('service_orch') + return response_data + + def _resp_data_ndo(self, conf): + response_data = self._resp_base_structure('device_orch') + return response_data + + def _resp_data_nco(self, conf): + response_data = self._resp_base_structure('nas_service') + return response_data + + def test_nco_pull_notifications(self): + import_get = self.import_lib + '.get_response_from_configurator' + with mock.patch(import_get) as ( + mock_get), mock.patch(self.import_cast) as ( + mock_cast): + mock_get.side_effect = self._resp_data_nco + mock_cast.side_effect = self._cast + self.p_notification.pull_notifications(self.ev) + + def test_nso_pull_notifications(self): + import_get = self.import_lib + '.get_response_from_configurator' + with mock.patch(import_get) as ( + mock_get), mock.patch(self.import_cast) as ( + mock_cast): + mock_get.side_effect = self._resp_data_nso + mock_cast.side_effect = self._cast + self.p_notification.pull_notifications(self.ev) + + def test_ndo_pull_notifications(self): + import_get = self.import_lib + '.get_response_from_configurator' + with mock.patch(import_get) as ( + mock_get), mock.patch(self.import_cast) as ( + mock_cast): + mock_get.side_effect = self._resp_data_ndo + mock_cast.side_effect = self._cast + self.p_notification.pull_notifications(self.ev) diff --git a/gbpservice/nfp/common/constants.py b/gbpservice/nfp/common/constants.py index d012da724..f4c1a6ba2 100644 --- a/gbpservice/nfp/common/constants.py +++ b/gbpservice/nfp/common/constants.py @@ -12,6 +12,7 @@ GBP_MODE = "gbp" NEUTRON_MODE = "neutron" +NOVA_MODE = "nova" NEUTRON_PORT = "neutron_port" GBP_PORT = "gbp_policy_target" @@ -36,7 +37,24 @@ PENDING_UPDATE = "PENDING_UPDATE" PENDING_DELETE = "PENDING_DELETE" ERROR = "ERROR" +DEVICE_ORCHESTRATOR = "device_orch" +SERVICE_ORCHESTRATOR = "service_orch" + +HEAT_CONFIG_TAG = 'heat_config' +CONFIG_INIT_TAG = 'config_init' +ANSIBLE_TAG = 'ansible' +CUSTOM_JSON = 'custom_json' + COMPLETED = "COMPLETED" IN_PROGRESS = "IN_PROGRESS" CONFIG_INIT_TAG = "config_init" +CONFIG_SCRIPT = 'config_script' + +CONFIG_TAG_RESOURCE_MAP = { + HEAT_CONFIG_TAG: 'heat', + CONFIG_INIT_TAG: 'config_init', + ANSIBLE_TAG: 'ansible', + CUSTOM_JSON: 'custom_json'} + +LOADBALANCER_RPC_API_VERSION = "2.0" diff --git a/gbpservice/nfp/lib/rest_client_over_unix.py b/gbpservice/nfp/lib/rest_client_over_unix.py new file mode 100644 index 000000000..020e71ff5 --- /dev/null +++ b/gbpservice/nfp/lib/rest_client_over_unix.py @@ -0,0 +1,170 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import exceptions +import httplib +import httplib2 +import zlib + +import six.moves.urllib.parse as urlparse +import socket + +from oslo_serialization import jsonutils + +from gbpservice.nfp.core import log as nfp_logging + +LOG = nfp_logging.getLogger(__name__) + + +class RestClientException(exceptions.Exception): + + """ RestClient Exception """ + + +class UnixHTTPConnection(httplib.HTTPConnection): + + """Connection class for HTTP over UNIX domain socket.""" + + def __init__(self, host, port=None, strict=None, timeout=None, + proxy_info=None): + httplib.HTTPConnection.__init__(self, host, port, strict) + self.timeout = timeout + self.socket_path = '/var/run/uds_socket' + + def connect(self): + """Method used to connect socket server.""" + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + if self.timeout: + self.sock.settimeout(self.timeout) + try: + self.sock.connect(self.socket_path) + except socket.error as exc: + raise RestClientException( + "Caught exception socket.error : %s" % exc) + + +class UnixRestClient(object): + + def _http_request(self, url, method_type, headers=None, body=None): + try: + h = httplib2.Http() + resp, content = h.request( + url, + method=method_type, + headers=headers, + body=body, + connection_type=UnixHTTPConnection) + return resp, content + + except httplib2.ServerNotFoundError: + raise RestClientException("Server Not Found") + + except exceptions.Exception as e: + raise RestClientException("httplib response error %s" % (e)) + + def send_request(self, path, method_type, request_method='http', + server_addr='127.0.0.1', + headers=None, body=None): + """Implementation for common interface for all unix crud requests. + Return:Http Response + """ + # prepares path, body, url for sending unix request. + if method_type.upper() != 'GET': + body = jsonutils.dumps(body) + body = zlib.compress(body) + + path = '/v1/nfp/' + path + url = urlparse.urlunsplit(( + request_method, + server_addr, + path, + None, + '')) + + try: + resp, content = self._http_request(url, method_type, + headers=headers, body=body) + if content != '': + content = zlib.decompress(content) + message = "%s:%s" % (resp, content) + LOG.info(message) + except RestClientException as rce: + message = "ERROR : %s" % (rce) + LOG.error(message) + raise rce + + success_code = [200, 201, 202, 204] + # Evaluate responses into success and failures. + # Raise exception for failure cases which needs + # to be handled by caller. + if success_code.__contains__(resp.status): + return resp, content + elif resp.status == 400: + raise RestClientException("HTTPBadRequest: %s" % resp.reason) + elif resp.status == 401: + raise RestClientException("HTTPUnauthorized: %s" % resp.reason) + elif resp.status == 403: + raise RestClientException("HTTPForbidden: %s" % resp.reason) + elif resp.status == 404: + raise RestClientException("HttpNotFound: %s" % resp.reason) + elif resp.status == 405: + raise RestClientException( + "HTTPMethodNotAllowed: %s" % resp.reason) + elif resp.status == 406: + raise RestClientException("HTTPNotAcceptable: %s" % resp.reason) + elif resp.status == 408: + raise RestClientException("HTTPRequestTimeout: %s" % resp.reason) + elif resp.status == 409: + raise RestClientException("HTTPConflict: %s" % resp.reason) + elif resp.status == 415: + raise RestClientException( + "HTTPUnsupportedMediaType: %s" % resp.reason) + elif resp.status == 417: + raise RestClientException( + "HTTPExpectationFailed: %s" % resp.reason) + elif resp.status == 500: + raise RestClientException("HTTPServerError: %s" % resp.reason) + else: + raise Exception('Unhandled Exception code: %s %s' % (resp.status, + resp.reason)) + + +def get(path): + """Implements get method for unix restclient + Return:Http Response + """ + return UnixRestClient().send_request(path, 'GET') + + +def put(path, body): + """Implements put method for unix restclient + Return:Http Response + """ + headers = {'content-type': 'application/octet-stream'} + return UnixRestClient().send_request( + path, 'PUT', headers=headers, body=body) + + +def post(path, body, delete=False): + """Implements post method for unix restclient + Return:Http Response + """ + # Method-Type added here,as DELETE/CREATE + # both case are handled by post as delete also needs + # to send data to the rest-unix-server. + headers = {'content-type': 'application/octet-stream'} + if delete: + headers.update({'method-type': 'DELETE'}) + else: + headers.update({'method-type': 'CREATE'}) + return UnixRestClient().send_request( + path, 'POST', headers=headers, body=body) diff --git a/gbpservice/nfp/lib/transport.py b/gbpservice/nfp/lib/transport.py index b183ec3f4..b726c3bf9 100644 --- a/gbpservice/nfp/lib/transport.py +++ b/gbpservice/nfp/lib/transport.py @@ -9,7 +9,308 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import exceptions + +from gbpservice.nfp.common import constants as nfp_constants +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.lib import rest_client_over_unix as unix_rc + +from neutron.common import rpc as n_rpc +from neutron import context as n_context + +from oslo_config import cfg +from oslo_config import cfg as oslo_config +import oslo_messaging as messaging +from oslo_serialization import jsonutils + +import requests + +LOG = nfp_logging.getLogger(__name__) +Version = 'v1' # v1/v2/v3# + +rest_opts = [ + cfg.StrOpt('rest_server_address', + default='', help='Rest connection IpAddr'), + cfg.IntOpt('rest_server_port', + default=8080, help='Rest connection Port'), +] + +rpc_opts = [ + cfg.StrOpt('topic', + default='', help='Topic for rpc connection'), +] + +OPTS = [ + cfg.StrOpt( + 'backend', + default='rpc', + help='Backend Support for communicationg with configurator.' + ), +] + +oslo_config.CONF.register_opts(OPTS) +oslo_config.CONF.register_opts(rest_opts, "REST") +oslo_config.CONF.register_opts(rpc_opts, "RPC") +n_rpc.init(cfg.CONF) + +UNIX_REST = 'unix_rest' +TCP_REST = 'tcp_rest' + +""" Common Class for restClient exceptions """ + + +class RestClientException(exceptions.Exception): + + """ RestClient Exception """ + +""" Common Class to handle restclient request""" + + +class RestApi(object): + + def __init__(self, rest_server_address, rest_server_port): + self.rest_server_address = rest_server_address + self.rest_server_port = rest_server_port + self.url = "http://%s:%s/v1/nfp/%s" + + def _response(self, resp, url): + success_code = [200, 201, 202, 204] + # Evaluate responses into success and failures. + # Raise exception for failure cases which needs + # to be handled in caller function. + if success_code.__contains__(resp.status_code): + return resp + elif resp.status_code == 400: + raise RestClientException("HTTPBadRequest: %s" % resp.reason) + elif resp.status_code == 401: + raise RestClientException("HTTPUnauthorized: %s" % resp.reason) + elif resp.status_code == 403: + raise RestClientException("HTTPForbidden: %s" % resp.reason) + elif resp.status_code == 404: + raise RestClientException("HttpNotFound: %s" % resp.reason) + elif resp.status_code == 405: + raise RestClientException( + "HTTPMethodNotAllowed: %s" % resp.reason) + elif resp.status_code == 406: + raise RestClientException("HTTPNotAcceptable: %s" % resp.reason) + elif resp.status_code == 408: + raise RestClientException("HTTPRequestTimeout: %s" % resp.reason) + elif resp.status_code == 409: + raise RestClientException("HTTPConflict: %s" % resp.reason) + elif resp.status_code == 415: + raise RestClientException( + "HTTPUnsupportedMediaType: %s" % resp.reason) + elif resp.status_code == 417: + raise RestClientException( + "HTTPExpectationFailed: %s" % resp.reason) + elif resp.status_code == 500: + raise RestClientException("HTTPServerError: %s" % resp.reason) + else: + raise RestClientException('Unhandled Exception code: %s %s' % + (resp.status_code, resp.reason)) + return resp + + def post(self, path, body, method_type): + """Post restclient request handler + Return:Http response + """ + url = self.url % ( + self.rest_server_address, + self.rest_server_port, path) + data = jsonutils.dumps(body) + try: + # Method-Type needs to be added here,as DELETE/CREATE + # both case are handled by post as delete also needs + # to send data to the rest-server. + headers = {"content-type": "application/json", + "method-type": method_type} + resp = requests.post(url, data, + headers=headers) + message = "POST url %s %d" % (url, resp.status_code) + LOG.info(message) + return self._response(resp, url) + except RestClientException as rce: + message = "Rest API %s - Failed. Reason: %s" % ( + url, rce) + LOG.error(message) + + def get(self, path): + """Get restclient request handler + Return:Http response + """ + url = self.url % ( + self.rest_server_address, + self.rest_server_port, path) + try: + headers = {"content-type": "application/json"} + resp = requests.get(url, + headers=headers) + message = "GET url %s %d" % (url, resp.status_code) + LOG.info(message) + return self._response(resp, url) + except RestClientException as rce: + message = "Rest API %s - Failed. Reason: %s" % ( + url, rce) + LOG.error(message) + +""" Common Class to handle rpcclient request""" + + +class RPCClient(object): + API_VERSION = '1.0' + + def __init__(self, topic): + self.topic = topic + target = messaging.Target(topic=self.topic, + version=self.API_VERSION) + self.client = n_rpc.get_client(target) + self.cctxt = self.client.prepare(version=self.API_VERSION, + topic=self.topic) + + +def send_request_to_configurator(conf, context, body, + method_type, device_config=False, + network_function_event=False): + """Common function to handle (create, delete) request for configurator. + Send create/delete to configurator rest-server. + Return:Http Response + """ + # This function reads configuration data and decides + # method (tcp_rest/rpc) for sending request to configurator. + if device_config: + method_name = method_type.lower() + '_network_function_device_config' + body['info']['context'].update({'neutron_context': context.to_dict()}) + elif network_function_event: + method_name = 'network_function_event' + else: + if (body['config'][0]['resource'] in + nfp_constants.CONFIG_TAG_RESOURCE_MAP.values()): + body['config'][0]['resource_data'].update( + {'neutron_context': context.to_dict()}) + body['info']['context'].update( + {'neutron_context': context.to_dict()}) + method_name = method_type.lower() + '_network_function_config' + + if conf.backend == TCP_REST: + try: + rc = RestApi(conf.REST.rest_server_address, + conf.REST.rest_server_port) + resp = rc.post(method_name, body, method_type.upper()) + message = "%s -> POST response: (%s) body: %s " % ( + method_name, resp, body) + LOG.info(message) + except RestClientException as rce: + message = "%s -> POST request failed.Reason: %s" % ( + method_name, rce) + LOG.error(message) + + elif conf.backend == UNIX_REST: + try: + resp, content = unix_rc.post(method_name, + body=body) + message = "%s -> POST response: (%s) body : %s " % ( + method_name, content, body) + LOG.info(message) + + except unix_rc.RestClientException as rce: + message = "%s -> request failed . Reason %s " % ( + method_name, rce) + LOG.error(message) + + else: + message = ("%s -> RPC request sent ! with body : %s " % ( + (method_name, body))) + LOG.info(message) + rpcClient = RPCClient(conf.RPC.topic) + rpcClient.cctxt.cast(context, method_name, + body=body) + + +def get_response_from_configurator(conf): + """Common function to handle get request for configurator. + Get notification http response from configurator rest server. + Return:Http Response + response_data = [ + {'receiver': , + 'resource': , + 'method': , + 'kwargs': + }, + ] + """ + # This function reads configuration data and decides + # method (tcp_rest/ unix_rest/ rpc) for get response from configurator. + if conf.backend == TCP_REST: + try: + rc = RestApi(conf.REST.rest_server_address, + conf.REST.rest_server_port) + resp = rc.get('get_notifications') + rpc_cbs_data = jsonutils.loads(resp.content) + return rpc_cbs_data + except RestClientException as rce: + message = ("get_notification ->" + "GET request failed. Reason : %s" % (rce)) + LOG.error(message) + return "get_notification -> GET request failed. Reason : %s" % ( + rce) + except Exception as e: + message = ("get_notification ->" + "GET request failed. Reason : %s" % (e)) + LOG.error(message) + return "get_notification -> GET request failed. Reason : %s" % ( + e) + + elif conf.backend == UNIX_REST: + try: + resp, content = unix_rc.get('get_notifications') + content = jsonutils.loads(content) + if content: + message = ("get_notification ->" + "GET response: (%s)" % (content)) + LOG.info(message) + return content + except unix_rc.RestClientException as rce: + message = ("get_notification ->" + "GET request failed. Reason : %s" % ( + rce)) + LOG.error(message) + return "get_notification -> GET request failed. Reason : %s" % ( + rce) + except Exception as e: + message = ("get_notification ->" + "GET request failed. Reason : %s" % ( + e)) + LOG.error(message) + return "get_notification -> GET request failed. Reason : %s" % ( + e) + + else: + rpc_cbs_data = [] + try: + rpcClient = RPCClient(conf.RPC.topic) + context = n_context.Context( + 'config_agent_user', 'config_agent_tenant') + rpc_cbs_data = rpcClient.cctxt.call(context, + 'get_notifications') + return rpc_cbs_data + except Exception as e: + message = "Exception while processing %s" % e + LOG.error(message) + return "get_notification -> GET request failed. Reason : %s" % ( + e) def parse_service_flavor_string(service_flavor_str): - pass + """Parse service_flavour string to service details dictionary. + Return: Service Details Dictionary + """ + service_details = {} + if ',' not in service_flavor_str: + service_details['device_type'] = 'nova' + service_details['service_vendor'] = service_flavor_str + else: + service_flavor_dict = dict(item.split('=') for item + in service_flavor_str.split(',')) + service_details = {key.strip(): value.strip() for key, value + in service_flavor_dict.iteritems()} + return service_details diff --git a/gbpservice/nfp/orchestrator/modules/service_orchestrator.py b/gbpservice/nfp/orchestrator/modules/service_orchestrator.py index 13ec6c3ef..a613be4ed 100644 --- a/gbpservice/nfp/orchestrator/modules/service_orchestrator.py +++ b/gbpservice/nfp/orchestrator/modules/service_orchestrator.py @@ -63,6 +63,7 @@ def events_init(controller, config, service_orchestrator): 'DEVICE_CREATED', 'DEVICE_ACTIVE', 'DEVICE_DELETED', 'DEVICE_CREATE_FAILED', 'SEND_HEAT_CONFIG', 'CHECK_HEAT_CONFIG_RESULT', 'APPLY_USER_CONFIG', + 'APPLY_USER_CONFIG_BASEMODE', 'DELETE_USER_CONFIG', 'UPDATE_USER_CONFIG', 'POLICY_TARGET_ADD', 'POLICY_TARGET_REMOVE', 'CONSUMER_ADD', 'CONSUMER_REMOVE', @@ -415,6 +416,7 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler): "DEVICE_DELETED": self.handle_device_deleted, "DEVICE_CREATE_FAILED": self.handle_device_create_failed, "APPLY_USER_CONFIG": self.apply_user_config, + "APPLY_USER_CONFIG_BASEMODE": self.apply_user_config_basemode, "CHECK_HEAT_CONFIG_RESULT": self.check_heat_config_result, "DELETE_USER_CONFIG": self.delete_user_config, "UPDATE_USER_CONFIG": self.handle_update_user_config, @@ -753,13 +755,6 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler): network_function_status) return None - if base_mode_support: - # In base mode support, create user config directly, no need to - # create network function instance, network function device first. - self.create_network_function_user_config(network_function['id'], - service_config_str) - return network_function - nfp_context = network_function_info service_details['service_type'] = service_profile['service_type'] @@ -767,11 +762,20 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler): nfp_context['network_function'] = network_function nfp_context['service_details'] = service_details nfp_context['share_existing_device'] = False + nfp_context['base_mode'] = base_mode_support - # Create and event to perform Network service instance - self._create_event('CREATE_NETWORK_FUNCTION_INSTANCE', - event_data=nfp_context, - is_internal_event=True) + if base_mode_support: + # Store the context in current thread + nfp_core_context.store_nfp_context(nfp_context) + # In base mode support, create user config directly, no need to + # create network function instance, network function device first. + self.create_network_function_user_config(network_function['id'], + service_config_str) + else: + # Create and event to perform Network service instance + self._create_event('CREATE_NETWORK_FUNCTION_INSTANCE', + event_data=nfp_context, + is_internal_event=True) nfp_logging.clear_logging_context() return network_function @@ -955,14 +959,46 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler): def check_heat_config_result(self, event): nfp_context = event.data['nfp_context'] - event_desc = nfp_context['event_desc'] - key = nfp_context['key'] - id = nfp_context['id'] + base_mode = nfp_context['base_mode'] + if base_mode: + # Create and event to apply user config + self._create_event('APPLY_USER_CONFIG_BASEMODE', + event_data=event.data, + is_internal_event=True) + else: + event_desc = nfp_context['event_desc'] + key = nfp_context['key'] + id = nfp_context['id'] - # Complete the original event here - event = self._controller.new_event( - id=id, key=key, desc_dict=event_desc) - self._controller.event_complete(event, result='SUCCESS') + # Complete the original event here + event = self._controller.new_event(id=id, key=key, + desc_dict=event_desc) + self._controller.event_complete(event, result='SUCCESS') + + def apply_user_config_basemode(self, event): + request_data = event.data + network_function_details = self.get_network_function_details( + request_data['network_function_id']) + request_data['heat_stack_id'] = self.config_driver.apply_config( + network_function_details) # Heat driver to launch stack + network_function = network_function_details['network_function'] + request_data['network_function_id'] = network_function['id'] + if not request_data['heat_stack_id']: + self._create_event('USER_CONFIG_FAILED', + event_data=request_data, is_internal_event=True) + return + request_data['tenant_id'] = network_function['tenant_id'] + request_data['network_function_details'] = network_function_details + LOG.debug("handle_device_active heat_stack_id: %s" + % (request_data['heat_stack_id'])) + self.db_handler.update_network_function( + self.db_session, network_function['id'], + {'heat_stack_id': request_data['heat_stack_id'], + 'description': network_function['description']}) + self._create_event('APPLY_USER_CONFIG_IN_PROGRESS', + event_data=request_data, + is_poll_event=True, + original_event=event) def apply_user_config(self, event): nfp_context = event.data @@ -1739,12 +1775,14 @@ class ServiceOrchestrator(nfp_api.NfpEventHandler): nfp_context = nfp_core_context.get_nfp_context() if nfp_context: - network_function = nfp_context['network_function'] - network_function_instance = nfp_context[ - 'network_function_instance'] - network_function_device = nfp_context['network_function_device'] - service_type = nfp_context['service_details']['service_type'] - + network_function = nfp_context.get('network_function', None) + network_function_instance = nfp_context.get( + 'network_function_instance', None) + network_function_device = nfp_context.get( + 'network_function_device', None) + service_details = nfp_context.get('service_details', None) + if service_details: + service_type = service_details.get('service_type', None) if not network_function: network_function = self.db_handler.get_network_function( self.db_session, network_function_id) @@ -1827,11 +1865,13 @@ class NSOConfiguratorRpcApi(object): network_function_instance = network_function_details.get( 'network_function_instance') nfp_context = nfp_core_context.get_nfp_context() - nfp_context_rpc = None + rpc_nfp_context = None if nfp_context: - nfp_context_rpc = {'event_desc': nfp_context['event_desc'], - 'key': nfp_context.pop('key'), - 'id': nfp_context.pop('id')} + rpc_nfp_context = { + 'event_desc': nfp_context.get('event_desc', None), + 'key': nfp_context.pop('key', None), + 'id': nfp_context.pop('id', None), + 'base_mode': nfp_context.pop('base_mode', None)} request_info = { 'nf_id': network_function_details['network_function']['id'], 'nfi_id': (network_function_instance['id'] @@ -1840,7 +1880,7 @@ class NSOConfiguratorRpcApi(object): 'requester': nfp_constants.SERVICE_ORCHESTRATOR, 'operation': operation, 'logging_context': nfp_logging.get_logging_context(), - 'nfp_context': nfp_context_rpc + 'nfp_context': rpc_nfp_context } if operation in ['consumer_add', 'consumer_remove']: request_info.update({'consumer_ptg': user_config_data[ diff --git a/gbpservice/nfp/proxy_agent/__init__.py b/gbpservice/nfp/proxy_agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/proxy_agent/lib/__init__.py b/gbpservice/nfp/proxy_agent/lib/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/proxy_agent/lib/topics.py b/gbpservice/nfp/proxy_agent/lib/topics.py new file mode 100644 index 000000000..82903ae05 --- /dev/null +++ b/gbpservice/nfp/proxy_agent/lib/topics.py @@ -0,0 +1,16 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +PROXY_AGENT_TOPIC = 'nfp-proxy-agent-topic' +SERVICE_ORCH_TOPIC = 'nfp-nso-notification-topic' +DEVICE_ORCH_TOPIC = 'nfp-ndo-notification-topic' +CONFIG_ORCH_TOPIC = 'nfp-nco-notification-topic' diff --git a/gbpservice/nfp/proxy_agent/modules/__init__.py b/gbpservice/nfp/proxy_agent/modules/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/proxy_agent/modules/notification_agent.py b/gbpservice/nfp/proxy_agent/modules/notification_agent.py new file mode 100644 index 000000000..2e8225e68 --- /dev/null +++ b/gbpservice/nfp/proxy_agent/modules/notification_agent.py @@ -0,0 +1,34 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.core.event import Event +from gbpservice.nfp.proxy_agent.notifications import pull + + +def events_init(sc, conf): + """Register event with its handler.""" + evs = [ + Event(id='PULL_NOTIFICATIONS', + handler=pull.PullNotification(sc, conf))] + sc.register_events(evs) + + +def nfp_module_init(sc, conf): + """Initialize module to register rpc & event handler""" + events_init(sc, conf) + + +def nfp_module_post_init(sc, conf): + """Post a event for pull notification after each periodic_task_interval""" + ev = sc.new_event(id='PULL_NOTIFICATIONS', + key='PULL_NOTIFICATIONS') + sc.post_event(ev) diff --git a/gbpservice/nfp/proxy_agent/modules/proxy_agent.py b/gbpservice/nfp/proxy_agent/modules/proxy_agent.py new file mode 100644 index 000000000..7bc0b46b0 --- /dev/null +++ b/gbpservice/nfp/proxy_agent/modules/proxy_agent.py @@ -0,0 +1,105 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core.rpc import RpcAgent +from gbpservice.nfp.lib import transport as transport +from gbpservice.nfp.proxy_agent.lib import topics + + +from oslo_log import helpers as log_helpers +import oslo_messaging as messaging + +LOG = nfp_logging.getLogger(__name__) + + +def rpc_init(config, sc): + """Register agent with its handler.""" + rpcmgr = RpcHandler(config, sc) + agent = RpcAgent( + sc, + host=config.host, + topic=topics.PROXY_AGENT_TOPIC, + manager=rpcmgr) + sc.register_rpc_agents([agent]) + + +def nfp_module_init(sc, conf): + """Initialize module to register rpc & event handler""" + rpc_init(conf, sc) + + +class RpcHandler(object): + RPC_API_VERSION = '1.0' + target = messaging.Target(version=RPC_API_VERSION) + + def __init__(self, conf, sc): + super(RpcHandler, self).__init__() + self._conf = conf + self._sc = sc + + @log_helpers.log_method_call + def create_network_function_config(self, context, body): + """Method of rpc handler for create_network_function_config. + Return: Http Response. + """ + transport.send_request_to_configurator(self._conf, + context, body, + "CREATE") + + @log_helpers.log_method_call + def delete_network_function_config(self, context, body): + """Method of rpc handler for delete_network_function_config. + Return: Http Response. + """ + transport.send_request_to_configurator(self._conf, + context, body, + "DELETE") + + @log_helpers.log_method_call + def update_network_function_config(self, context, body): + """Method of rpc handler for delete_network_function_config. + Return: Http Response. + """ + transport.send_request_to_configurator(self._conf, + context, body, + "UPDATE") + + @log_helpers.log_method_call + def create_network_function_device_config(self, context, body): + """Method of rpc handler for create_network_function_device_config. + Return: Http Response. + """ + transport.send_request_to_configurator(self._conf, + context, body, + "CREATE", + device_config=True) + + @log_helpers.log_method_call + def delete_network_function_device_config(self, context, body): + """Method of rpc handler for delete_network_function_device_config. + Return: Http Response. + """ + transport.send_request_to_configurator(self._conf, + context, body, + "DELETE", + device_config=True) + + @log_helpers.log_method_call + def network_function_event(self, context, body): + """Method of rpc handler for create_service. + Return: Http Response. + """ + transport.send_request_to_configurator(self._conf, + context, body, + "CREATE", + network_function_event=True) diff --git a/gbpservice/nfp/proxy_agent/notifications/__init__.py b/gbpservice/nfp/proxy_agent/notifications/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/proxy_agent/notifications/pull.py b/gbpservice/nfp/proxy_agent/notifications/pull.py new file mode 100644 index 000000000..e23fc8fd9 --- /dev/null +++ b/gbpservice/nfp/proxy_agent/notifications/pull.py @@ -0,0 +1,89 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import module as nfp_api +from gbpservice.nfp.lib import transport as transport +from gbpservice.nfp.proxy_agent.lib import topics as a_topics + +from neutron import context as n_context + +import sys +import traceback + +LOG = nfp_logging.getLogger(__name__) + +ResourceMap = { + 'device_orch': a_topics.DEVICE_ORCH_TOPIC, + 'service_orch': a_topics.SERVICE_ORCH_TOPIC, + 'nas_service': a_topics.CONFIG_ORCH_TOPIC +} + + +"""Periodic Class to pull notification from configurator""" + + +class PullNotification(nfp_api.NfpEventHandler): + + def __init__(self, sc, conf): + self._sc = sc + self._conf = conf + + def handle_event(self, ev): + self._sc.poll_event(ev) + + def _method_handler(self, notification): + # Method handles notification as per resource, resource_type and method + try: + requester = notification['info']['context']['requester'] + topic = ResourceMap[requester] + context = notification['info']['context']['neutron_context'] + rpcClient = transport.RPCClient(topic) + rpc_ctx = n_context.Context.from_dict(context) + rpcClient.cctxt.cast(rpc_ctx, + 'network_function_notification', + notification_data=notification) + except Exception as e: + raise Exception(e) + + @nfp_api.poll_event_desc(event='PULL_NOTIFICATIONS', spacing=2) + def pull_notifications(self, ev): + """Pull and handle notification from configurator.""" + notifications = transport.get_response_from_configurator(self._conf) + + if not isinstance(notifications, list): + message = "Notfications not list, %s" % (notifications) + LOG.error(message) + + else: + for notification in notifications: + if not notification: + message = "Receiver Response: Empty" + LOG.info(message) + continue + try: + self._method_handler(notification) + except AttributeError: + exc_type, exc_value, exc_traceback = sys.exc_info() + message = ("AttributeError while handling" + "message %s : %s " % (notification, + traceback.format_exception( + exc_type, exc_value, exc_traceback))) + LOG.error(message) + + except Exception as e: + exc_type, exc_value, exc_traceback = sys.exc_info() + message = ("Generic exception (%s)" + "while handling message (%s) : %s" % ( + e, notification, traceback.format_exception( + exc_type, exc_value, exc_traceback))) + LOG.error(message) diff --git a/gbpservice/nfp/proxy_agent/proxy/__init__.py b/gbpservice/nfp/proxy_agent/proxy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/proxy_agent/proxy/proxy.py b/gbpservice/nfp/proxy_agent/proxy/proxy.py new file mode 100644 index 000000000..90566c7da --- /dev/null +++ b/gbpservice/nfp/proxy_agent/proxy/proxy.py @@ -0,0 +1,333 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import argparse +import ConfigParser +from gbpservice.nfp.core import log as nfp_logging +import os +from oslo_config import cfg +from oslo_log import log as oslo_logging +import socket +import sys +import time + + +oslo_logging.register_options(cfg.CONF) + +LOG = nfp_logging.getLogger(__name__) + +# Queue of proxy connections which workers will handle +ConnQ = eventlet.queue.Queue(maxsize=0) + +tcp_open_connection_count = 0 +tcp_close_connection_count = 0 + + +class ConnectionIdleTimeOut(Exception): + + ''' + Exception raised when connection is idle for configured timeout + ''' + pass + + +""" +parsing the proxy configuration file +""" + + +class Configuration(object): + + def __init__(self, filee): + config = ConfigParser.ConfigParser() + config.read(filee) + + self.thread_pool_size = config.getint('OPTIONS', 'thread_pool_size') + self.unix_bind_path = config.get('OPTIONS', 'unix_bind_path') + self.max_connections = config.getint('OPTIONS', 'max_connections') + self.worker_threads = config.getint('OPTIONS', 'worker_threads') + self.connect_max_wait_timeout = config.getfloat( + 'OPTIONS', 'connect_max_wait_timeout') + self.idle_max_wait_timeout = config.getfloat( + 'OPTIONS', 'idle_max_wait_timeout') + self.idle_min_wait_timeout = config.getfloat( + 'OPTIONS', 'idle_min_wait_timeout') + self.rest_server_address = config.get( + 'NFP_CONTROLLER', 'rest_server_address') + self.rest_server_port = config.getint( + 'NFP_CONTROLLER', 'rest_server_port') + + +""" +Class to create Unix Listener +""" + + +class UnixServer(object): + + def __init__(self, conf, proxy): + self.proxy = proxy + self.bind_path = conf.unix_bind_path + self.max_connections = conf.max_connections + # Make sure the socket does not already exist + try: + os.unlink(self.bind_path) + except OSError: + if os.path.exists(self.bind_path): + raise + + # Create a UDS socket + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + # Bind the socket to the port + message = 'starting up on %s' % self.bind_path + LOG.info(message) + self.socket.bind(self.bind_path) + self.socket.listen(self.max_connections) + + def listen(self): + client, address = self.socket.accept() + self.proxy.new_client(client, address) + + +""" +Class to create TCP client Connection if +TCP server is alive +""" + + +class TcpClient(object): + + def __init__(self, conf, proxy): + self.conf = conf + self.proxy = proxy + self.server_address = conf.rest_server_address + self.server_port = conf.rest_server_port + # Connect the socket to the port where the server is listening + self.server = (self.server_address, self.server_port) + + def connect(self): + sock = socket.socket() + message = 'connecting to %s port %s' % self.server + LOG.info(message) + sock.settimeout(self.conf.connect_max_wait_timeout) + try: + sock.connect(self.server) + except socket.error as exc: + message = "Caught exception socket.error : %s" % exc + LOG.error(message) + return sock, False + return sock, True + + +""" +ADT for proxy connection +""" + + +class Connection(object): + + def __init__(self, conf, socket, type='unix'): + self._socket = socket + self._idle_wait = conf.idle_min_wait_timeout + self._idle_timeout = conf.idle_max_wait_timeout + self._idle_count_max = (self._idle_timeout / self._idle_wait) + self._idle_count = 0 + self._start_time = time.time() + self._end_time = time.time() + self.type = type + self.socket_id = self._socket.fileno() + + def _tick(self): + self._idle_count += 1 + + def _timedout(self): + if self._idle_count > self._idle_count_max: + self._end_time = time.time() + raise ConnectionIdleTimeOut( + "Connection (%d) - stime (%s) - etime (%s) - " + "idle_count (%d) idle_count_max(%d)" % ( + self.identify(), self._start_time, + self._end_time, self._idle_count, self._idle_count_max)) + + def idle(self): + self._tick() + self._timedout() + + def idle_reset(self): + self._idle_count = 0 + self._start_time = time.time() + + def _wait(self, timeout): + if self.type == 'unix': + eventlet.sleep(timeout) + self._socket.setblocking(0) + else: + self._socket.settimeout(timeout) + + def recv(self): + self._wait(self._idle_wait) + try: + data = self._socket.recv(1024) + if data and len(data): + self.idle_reset() + return data + self.idle() + except socket.timeout: + self.idle() + except socket.error: + self.idle() + return None + + def send(self, data): + self._socket.setblocking(1) + self._socket.sendall(data) + self._socket.setblocking(0) + + def close(self): + message = "Closing Socket - %d" % (self.identify()) + LOG.debug(message) + try: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + except Exception as exc: + message = "%s - exception while closing - %s" % ( + self.identify(), str(exc)) + LOG.error(message) + + def identify(self): + return self.socket_id + + +""" +ADT for Proxy Connection Object +Each Connection Object is pair of Unix Socket and +TCP Client Socket +""" + + +class ProxyConnection(object): + + def __init__(self, conf, unix_socket, tcp_socket): + self._unix_conn = Connection(conf, unix_socket, type='unix') + self._tcp_conn = Connection(conf, tcp_socket, type='tcp') + message = "New Proxy - Unix - %d, TCP - %d" % ( + self._unix_conn.identify(), self._tcp_conn.identify()) + LOG.debug(message) + + def close(self): + self._unix_conn.close() + self._tcp_conn.close() + + def _proxy(self, rxconn, txconn): + data = rxconn.recv() + if data: + txconn.send(data) + + def run(self): + try: + self._proxy(self._unix_conn, self._tcp_conn) + self._proxy(self._tcp_conn, self._unix_conn) + return True + except Exception as exc: + message = "%s" % (exc) + LOG.debug(message) + self._unix_conn.close() + self._tcp_conn.close() + return False + + def identify(self): + return '%d:%d' % ( + self._unix_conn.identify(), + self._tcp_conn.identify()) + + +""" +ADT for proxy Worker +""" + + +class Worker(object): + + def run(self): + """ + Worker thread will pop the Proxy Connection Object + from Connection Queue and Perform send and receive + operations. If the connection is ideal upto ideal_max_timeout + it will not push the Object into connection queue so Proxy Connection + Object is automatically destroy, otherwise it will again + push the Object in connection Queue + """ + while True: + try: + pc = ConnQ.get() + call = True + while call: + call = pc.run() + except eventlet.queue.Empty: + pass + eventlet.sleep(0) + + +""" +ADT to Run the configurator proxy, + accept the Unix Client request, + Check REST Server is reachable or not, + Try to establish TCP Client Connection to REST + +""" + + +class Proxy(object): + + def __init__(self, conf): + self.conf = conf + # Be a server and wait for connections from the client + self.server = UnixServer(conf, self) + self.client = TcpClient(conf, self) + + def start(self): + """Run each worker in new thread""" + + for i in range(self.conf.worker_threads): + eventlet.spawn_n(Worker().run) + while True: + self.server.listen() + + def new_client(self, unixsocket, address): + """Establish connection with the tcp server""" + + tcpsocket, connected = self.client.connect() + if not connected: + message = "Proxy -> Could not connect with tcp server" + LOG.error(message) + unixsocket.close() + tcpsocket.close() + else: + pc = ProxyConnection(self.conf, unixsocket, tcpsocket) + ConnQ.put(pc) + + +def main(argv): + cfg.CONF(args=sys.argv[1:]) + oslo_logging.setup(cfg.CONF, 'nfp') + parser = argparse.ArgumentParser() + parser.add_argument( + '-config-file', "--config-file", action="store", dest='config_file') + parser.add_argument( + '-log-file', "--log-file", action="store", dest='log_file') + args = parser.parse_args(sys.argv[1:]) + conf = Configuration(args.config_file) + Proxy(conf).start()