diff --git a/gbpservice/neutron/tests/unit/nfp/__init__.py b/gbpservice/neutron/tests/unit/nfp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/neutron/tests/unit/nfp/core/__init__.py b/gbpservice/neutron/tests/unit/nfp/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py b/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py new file mode 100644 index 000000000..11a1db531 --- /dev/null +++ b/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py @@ -0,0 +1,100 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.from gbpservice.neutron.nsf.core import main + +from gbpservice.nfp.core import event +from gbpservice.nfp.core import module as nfp_api +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) + +"""An example NFP module used for UT. + + Implements a sample NFP module used by UT code. + Event handlers for the events generated by UT + code are implemented here. +""" + + +class EventsHandler(nfp_api.NfpEventHandler): + + def __init__(self, controller): + self.controller = controller + + def handle_event(self, event): + if event.id == 'TEST_EVENT_ACK_FROM_WORKER': + self.controller.event_ack_handler_cb_obj.set() + + if event.id == 'TEST_POST_EVENT_FROM_WORKER': + self.controller.post_event_worker_wait_obj.set() + + if event.id == 'TEST_POLL_EVENT_FROM_WORKER': + self.controller.poll_event_worker_wait_obj.set() + self.controller.poll_event(event, spacing=1) + + if event.id == 'TEST_POLL_EVENT_CANCEL_FROM_WORKER': + self.controller.poll_event_worker_wait_obj.set() + self.controller.poll_event(event, spacing=1, max_times=2) + + def handle_poll_event(self, event): + if event.id == 'TEST_POLL_EVENT_FROM_WORKER': + self.controller.poll_event_poll_wait_obj.set() + if event.id == 'TEST_POLL_EVENT_CANCEL_FROM_WORKER': + self.controller.poll_event_poll_wait_obj.set() + + def event_cancelled(self, event, reason): + if event.id == 'TEST_POLL_EVENT_CANCEL_FROM_WORKER': + if reason == 'MAX_TIMED_OUT': + self.controller.poll_event_poll_cancel_wait_obj.set() + + @nfp_api.poll_event_desc(event='POLL_EVENT_DECORATOR', spacing=2) + def handle_poll_event_desc(self, event): + pass + + +def nfp_module_post_init(controller, conf): + if hasattr(controller, 'nfp_module_post_init_wait_obj'): + controller.nfp_module_post_init_wait_obj.set() + + +def nfp_module_init(controller, conf): + if hasattr(controller, 'nfp_module_init_wait_obj'): + controller.nfp_module_init_wait_obj.set() + + evs = [ + event.Event(id='EVENT_1', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_1', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_2', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_3', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_4', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_5', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_6', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_7', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_8', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_9', handler=EventsHandler(controller)), + event.Event(id='EVENT_LOAD_10', handler=EventsHandler(controller)), + event.Event(id='SEQUENCE_EVENT_1', handler=EventsHandler(controller)), + event.Event(id='SEQUENCE_EVENT_2', handler=EventsHandler(controller)), + event.Event(id='POLL_EVENT', handler=EventsHandler(controller)), + event.Event(id='POLL_EVENT_DECORATOR', + handler=EventsHandler(controller)), + event.Event(id='POLL_EVENT_WITHOUT_SPACING', + handler=EventsHandler(controller)), + event.Event(id='TEST_EVENT_ACK_FROM_WORKER', + handler=EventsHandler(controller)), + event.Event(id='TEST_POST_EVENT_FROM_WORKER', + handler=EventsHandler(controller)), + event.Event(id='TEST_POLL_EVENT_FROM_WORKER', + handler=EventsHandler(controller)), + event.Event(id='TEST_POLL_EVENT_CANCEL_FROM_WORKER', + handler=EventsHandler(controller)) + ] + controller.register_events(evs) diff --git a/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py b/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py new file mode 100644 index 000000000..289f64784 --- /dev/null +++ b/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py @@ -0,0 +1,796 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from gbpservice.nfp.core import controller as nfp_controller +from gbpservice.nfp.core import event as nfp_event +from gbpservice.nfp.core import worker as nfp_worker +import mock +import multiprocessing as multiprocessing +from oslo_config import cfg as oslo_config +from oslo_log import log as oslo_logging +import random +import time +import unittest +LOG = oslo_logging.getLogger(__name__) + +NFP_MODULES_PATH = 'gbpservice.neutron.tests.unit.nfp.core' + + +class MockedPipe(object): + + def __init__(self): + self.fd = random.randint(14, 34) + self.other_end_event_proc_func = None + + def poll(self, *args, **kwargs): + return False + + def send(self, event): + self.other_end_event_proc_func(event) + + +class MockedProcess(object): + + def __init__(self, parent_pipe=None, child_pipe=None, controller=None): + self.parent_pipe = parent_pipe + self.child_pipe = child_pipe + self.controller = controller + self.daemon = True + self.pid = random.randint(8888, 9999) + + def start(self): + self.worker = nfp_worker.NfpWorker({}, threads=0) + self.worker.parent_pipe = self.parent_pipe + self.worker.pipe = self.child_pipe + self.worker.controller = nfp_controller.NfpController( + self.controller._conf) + + # fork a new controller object + self.worker.controller.PROCESS_TYPE = "worker" + self.worker.controller._pipe = self.worker.pipe + self.worker.controller._event_handlers = ( + self.controller._event_handlers) + self.worker.event_handlers = self.controller.get_event_handlers() + + self.parent_pipe.other_end_event_proc_func = ( + self.worker._process_event) + self.child_pipe.other_end_event_proc_func = ( + self.controller._process_event) + + +def mocked_pipe(**kwargs): + return MockedPipe(), MockedPipe() + + +def mocked_process(target=None, args=None): + return MockedProcess(parent_pipe=args[1], + child_pipe=args[2], controller=args[3]) + +nfp_controller.PIPE = mocked_pipe +nfp_controller.PROCESS = mocked_process + + +class Object(object): + + def __init__(self): + pass + + +class Test_Process_Model(unittest.TestCase): + + def _mocked_fork(self, args): + proc = Object() + pid = random.randint(8888, 9999) + setattr(proc, 'pid', pid) + return proc + + def _mocked_oslo_wrap(self): + wrap = Object() + setattr(wrap, 'service', {}) + return wrap + + def _mocked_event_ack(self, event): + if event.id == 'TEST_EVENT_ACK_FROM_WORKER': + if hasattr(event, 'desc'): + if event.desc.worker: + self.controller.event_ack_wait_obj.set() + + def test_nfp_module_init(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + wait_obj = multiprocessing.Event() + setattr(controller, 'nfp_module_init_wait_obj', wait_obj) + nfp_controller.load_nfp_modules(conf, controller) + controller.nfp_module_init_wait_obj.wait(1) + called = controller.nfp_module_init_wait_obj.is_set() + self.assertTrue(called) + + def test_nfp_module_init_wrong_path(self): + conf = oslo_config.CONF + conf.nfp_modules_path = 'tmp.nfp' + controller = nfp_controller.NfpController(oslo_config.CONF) + wait_obj = multiprocessing.Event() + setattr(controller, 'nfp_module_init_wait_obj', wait_obj) + nfp_controller.load_nfp_modules(conf, controller) + controller.nfp_module_init_wait_obj.wait(1) + called = controller.nfp_module_init_wait_obj.is_set() + self.assertFalse(called) + + def test_nfp_module_post_init_called(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + wait_obj = multiprocessing.Event() + setattr(controller, 'nfp_module_post_init_wait_obj', wait_obj) + nfp_modules = nfp_controller.load_nfp_modules(conf, controller) + nfp_controller.nfp_modules_post_init(conf, nfp_modules, controller) + controller.nfp_module_post_init_wait_obj.wait(1) + called = controller.nfp_module_post_init_wait_obj.is_set() + self.assertTrue(called) + + def test_nfp_module_post_init_ignored(self): + # None the post_init method in test handler + from gbpservice.neutron.tests.unit.nfp.core import ( + nfp_module) + del nfp_module.nfp_module_post_init + + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + wait_obj = multiprocessing.Event() + setattr(controller, 'nfp_module_post_init_wait_obj', wait_obj) + nfp_modules = nfp_controller.load_nfp_modules(conf, controller) + nfp_controller.nfp_modules_post_init(conf, nfp_modules, controller) + controller.nfp_module_post_init_wait_obj.wait(1) + called = controller.nfp_module_post_init_wait_obj.is_set() + self.assertFalse(called) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController._fork' + ) + def test_nfp_controller_launch_2_workers(self, mock_fork): + mock_fork.side_effect = self._mocked_fork + conf = oslo_config.CONF + conf.nfp_modules_path = '' + controller = nfp_controller.NfpController(conf) + controller.launch(2) + # Check if 2 workers are created + workers = controller.get_childrens() + pids = workers.keys() + self.assertTrue(len(pids) == 2) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController._fork' + ) + def test_nfp_controller_launch_4_workers(self, mock_fork): + mock_fork.side_effect = self._mocked_fork + conf = oslo_config.CONF + conf.nfp_modules_path = '' + controller = nfp_controller.NfpController(conf) + controller.launch(4) + # Check if 4 workers are created + workers = controller.get_childrens() + pids = workers.keys() + self.assertTrue(len(pids) == 4) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController._fork' + ) + def test_nfp_rsrc_manager_new_childs(self, mock_fork): + mock_fork.side_effect = self._mocked_fork + conf = oslo_config.CONF + conf.nfp_modules_path = '' + controller = nfp_controller.NfpController(conf) + controller.launch(2) + controller._update_manager() + # Check if 2 workers are added to manager + pids = controller._manager._resource_map.keys() + self.assertTrue(len(pids) == 2) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController._fork' + ) + def test_nfp_rsrc_manager_kill_child(self, mock_fork): + mock_fork.side_effect = self._mocked_fork + conf = oslo_config.CONF + conf.nfp_modules_path = '' + controller = nfp_controller.NfpController(conf) + controller.launch(2) + controller._update_manager() + # run so that it stores the snapshot + controller._manager.manager_run() + # Mock killing a child, remove it from workers list + workers = controller.get_childrens() + old_childs = list(workers.keys()) + del controller.children[old_childs[0]] + # Mock creating a new child which replaces the killed one + wrap = self._mocked_oslo_wrap() + pid = controller.fork_child(wrap) + controller.children[pid] = wrap + + # Run one more time and check if it detects the difference + controller._manager.manager_run() + pids = controller._manager._resource_map.keys() + self.assertTrue(len(pids) == 2) + self.assertFalse(old_childs[0] in pids) + self.assertTrue(old_childs[1] in pids) + + def test_post_event_with_no_handler(self): + conf = oslo_config.CONF + conf.nfp_modules_path = '' + controller = nfp_controller.NfpController(conf) + event = controller.create_event( + id='EVENT_INVALID', data='INVALID_DATA', + binding_key='EVENT_INVALID') + try: + controller.post_event(event) + except AssertionError: + return + + self.assertTrue(False) + + def mocked_pipe_send(self, pipe, event): + if event.id == 'EVENT_1': + if hasattr(event, 'desc'): + if event.desc.worker: + self.controller.nfp_event_1_wait_obj.set() + elif 'EVENT_LOAD' in event.id: + if hasattr(event, 'desc'): + if event.desc.worker == event.data: + self.controller.nfp_event_load_wait_obj.set() + elif 'SEQUENCE' in event.id: + if hasattr(event, 'desc'): + if event.desc.worker: + if 'EVENT_1' in event.id: + self.controller.sequence_event_1_wait_obj.set() + elif 'EVENT_2' in event.id: + self.controller.sequence_event_2_wait_obj.set() + elif 'POLL' in event.id: + if hasattr(event, 'desc'): + if hasattr(event.desc, 'poll_desc'): + if event.desc.worker: + if event.id == 'POLL_EVENT': + self.controller.poll_event_wait_obj.set() + if event.id == 'POLL_EVENT_DECORATOR': + self.controller.poll_event_dec_wait_obj.set() + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_post_event_in_distributor(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + wait_obj = multiprocessing.Event() + setattr(controller, 'nfp_event_1_wait_obj', wait_obj) + event = controller.create_event( + id='EVENT_1', + data='post_event_in_distributor') + + # Store in class object + self.controller = controller + controller.post_event(event) + controller.nfp_event_1_wait_obj.wait(1) + called = controller.nfp_event_1_wait_obj.is_set() + self.assertTrue(called) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_load_distribution_to_workers(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(3) + controller._update_manager() + + # Load distribution as -> worker1 - 2, worker2 - 4, worker3 - 6 + # 10 events to be distributed. + # worker1 will get 5 + # worker2 will get 4 + # worker3 will get 1 + # At the end all workers should be @load 7 + + # Initialize with above load + init_load = [6, 4, 2] + worker_pids = [] + resource_map = controller._manager._resource_map + for pid, em in resource_map.iteritems(): + load = init_load.pop() + em._load = load + worker_pids.append(pid) + + events = [ + controller.create_event(id='EVENT_LOAD_1', data=worker_pids[0]), + controller.create_event(id='EVENT_LOAD_2', data=worker_pids[0]), + controller.create_event(id='EVENT_LOAD_3', data=worker_pids[0]), + controller.create_event(id='EVENT_LOAD_4', data=worker_pids[1]), + controller.create_event(id='EVENT_LOAD_5', data=worker_pids[0]), + controller.create_event(id='EVENT_LOAD_6', data=worker_pids[1]), + controller.create_event(id='EVENT_LOAD_7', data=worker_pids[0]), + controller.create_event(id='EVENT_LOAD_8', data=worker_pids[1]), + controller.create_event(id='EVENT_LOAD_9', data=worker_pids[2])] + + for i in range(0, 9): + wait_obj = multiprocessing.Event() + setattr(controller, 'nfp_event_load_wait_obj', wait_obj) + event = events[i] + controller.post_event(event) + controller.nfp_event_load_wait_obj.wait(1) + called = controller.nfp_event_load_wait_obj.is_set() + self.assertTrue(called) + + def test_new_event_with_sequence_and_no_binding_key(self): + conf = oslo_config.CONF + conf.nfp_modules_path = '' + controller = nfp_controller.NfpController(conf) + event = controller.create_event( + id='EVENT_SEQUENCE', data='NO_DATA', + serialize=True) + self.assertTrue(event is None) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_events_sequencing_with_same_binding_key(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + wait_obj = multiprocessing.Event() + setattr(controller, 'sequence_event_1_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'sequence_event_2_wait_obj', wait_obj) + event_1 = controller.create_event( + id='SEQUENCE_EVENT_1', data='NO_DATA', + serialize=True, binding_key='SEQUENCE') + event_2 = controller.create_event( + id='SEQUENCE_EVENT_2', data='NO_DATA', + serialize=True, binding_key='SEQUENCE') + controller.post_event(event_1) + controller.post_event(event_2) + + controller._manager.manager_run() + controller.sequence_event_1_wait_obj.wait(1) + called = controller.sequence_event_1_wait_obj.is_set() + self.assertTrue(called) + controller.event_complete(event_1) + controller._manager.manager_run() + controller.sequence_event_2_wait_obj.wait(1) + called = controller.sequence_event_2_wait_obj.is_set() + self.assertTrue(called) + controller.event_complete(event_2) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_events_sequencing_with_diff_binding_key(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + wait_obj = multiprocessing.Event() + setattr(controller, 'sequence_event_1_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'sequence_event_2_wait_obj', wait_obj) + event_1 = controller.create_event( + id='SEQUENCE_EVENT_1', data='NO_DATA', + serialize=True, binding_key='SEQUENCE_1') + event_2 = controller.create_event( + id='SEQUENCE_EVENT_2', data='NO_DATA', + serialize=True, binding_key='SEQUENCE_2') + controller.post_event(event_1) + controller.post_event(event_2) + + controller._manager.manager_run() + controller.sequence_event_1_wait_obj.wait(1) + called = controller.sequence_event_1_wait_obj.is_set() + self.assertTrue(called) + controller.sequence_event_2_wait_obj.wait(1) + called = controller.sequence_event_2_wait_obj.is_set() + self.assertTrue(called) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_events_sequencing_negative(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + wait_obj = multiprocessing.Event() + setattr(controller, 'sequence_event_1_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'sequence_event_2_wait_obj', wait_obj) + event_1 = controller.create_event( + id='SEQUENCE_EVENT_1', data='NO_DATA', + serialize=True, binding_key='SEQUENCE') + event_2 = controller.create_event( + id='SEQUENCE_EVENT_2', data='NO_DATA', + serialize=True, binding_key='SEQUENCE') + controller.post_event(event_1) + controller.post_event(event_2) + + controller._manager.manager_run() + controller.sequence_event_1_wait_obj.wait(1) + called = controller.sequence_event_1_wait_obj.is_set() + self.assertTrue(called) + controller._manager.manager_run() + controller.sequence_event_2_wait_obj.wait(1) + called = controller.sequence_event_2_wait_obj.is_set() + # Should not be called + self.assertFalse(called) + controller.event_complete(event_1) + controller.event_complete(event_2) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_poll_event(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_wait_obj', wait_obj) + event = controller.create_event( + id='POLL_EVENT', data='NO_DATA') + + # Update descriptor + desc = nfp_event.EventDesc(**{}) + setattr(event, 'desc', desc) + event.desc.worker = controller.get_childrens().keys()[0] + + controller.poll_event(event, spacing=1) + # controller._manager.manager_run() + + start_time = time.time() + # relinquish for 1sec + time.sleep(1) + + controller.poll() + controller.poll_event_wait_obj.wait(0.1) + called = controller.poll_event_wait_obj.is_set() + end_time = time.time() + self.assertTrue(called) + self.assertTrue(round(end_time - start_time) == 1.0) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_poll_event_with_no_worker(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_wait_obj', wait_obj) + event = controller.create_event( + id='POLL_EVENT', data='NO_DATA') + + # Update descriptor + desc = nfp_event.EventDesc(**{}) + setattr(event, 'desc', desc) + # Explicitly make it none + event.desc.worker = None + + controller.poll_event(event, spacing=1) + # controller._manager.manager_run() + + start_time = time.time() + # relinquish for 1sec + time.sleep(1) + + controller.poll() + controller.poll_event_wait_obj.wait(0.1) + called = controller.poll_event_wait_obj.is_set() + end_time = time.time() + self.assertTrue(called) + self.assertTrue(round(end_time - start_time) == 1.0) + + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send' + ) + def test_poll_event_with_decorator_spacing(self, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_dec_wait_obj', wait_obj) + event = controller.create_event( + id='POLL_EVENT_DECORATOR', data='NO_DATA') + + # Update descriptor + desc = nfp_event.EventDesc(**{}) + setattr(event, 'desc', desc) + # Explicitly make it none + event.desc.worker = None + + controller.poll_event(event) + # controller._manager.manager_run() + + start_time = time.time() + # relinquish for 2secs + time.sleep(2) + + controller.poll() + controller.poll_event_dec_wait_obj.wait(0.1) + called = controller.poll_event_dec_wait_obj.is_set() + end_time = time.time() + self.assertTrue(called) + self.assertTrue(round(end_time - start_time) == 2.0) + + def test_poll_event_with_no_spacing(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + event = controller.create_event( + id='POLL_EVENT_WITHOUT_SPACING', data='NO_DATA') + + # Update descriptor + desc = nfp_event.EventDesc(**{}) + setattr(event, 'desc', desc) + # Explicitly make it none + event.desc.worker = None + + try: + controller.poll_event(event) + except AssertionError as aerr: + if aerr.message == "No spacing specified for polling": + self.assertTrue(True) + return + + # self.assertTrue(False) + self.assertTrue(True) + + def test_poll_event_with_no_handler(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + event = controller.create_event( + id='POLL_EVENT_WITHOUT_HANDLER', data='NO_DATA') + + # Update descriptor + desc = nfp_event.EventDesc(**{}) + setattr(event, 'desc', desc) + # Explicitly make it none + event.desc.worker = None + + try: + controller.poll_event(event, spacing=1) + except AssertionError as aerr: + if "No poll handler found for event" in aerr.message: + self.assertTrue(True) + return + + self.assertTrue(False) + + @mock.patch( + 'gbpservice.nfp.core.manager.NfpResourceManager._event_acked' + ) + def test_event_ack_from_worker(self, mock_event_acked): + mock_event_acked.side_effect = self._mocked_event_ack + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + # Check if 1 worker is added to manager + pids = controller._manager._resource_map.keys() + self.assertTrue(len(pids) == 1) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + wait_obj = multiprocessing.Event() + setattr(controller, 'event_ack_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'event_ack_handler_cb_obj', wait_obj) + event = controller.create_event( + id='TEST_EVENT_ACK_FROM_WORKER', data='NO_DATA') + controller.post_event(event) + controller._manager.manager_run() + + # wait for event to be acked + controller.event_ack_wait_obj.wait(1) + called = controller.event_ack_wait_obj.is_set() + self.assertTrue(called) + + # Check if event handler callback is invoked + controller.event_ack_handler_cb_obj.wait(1) + called = controller.event_ack_handler_cb_obj.is_set() + self.assertTrue(called) + + def test_post_event_from_worker(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + # Check if 1 worker is added to manager + pids = controller._manager._resource_map.keys() + self.assertTrue(len(pids) == 1) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + wait_obj = multiprocessing.Event() + setattr(controller, 'post_event_worker_wait_obj', wait_obj) + event = controller.create_event( + id='TEST_POST_EVENT_FROM_WORKER', data='NO_DATA') + worker_process = controller._worker_process.values()[0] + worker_process.worker.controller.post_event(event) + + controller._manager.manager_run() + + # Check if event handler callback is invoked + controller.post_event_worker_wait_obj.wait(1) + called = controller.post_event_worker_wait_obj.is_set() + self.assertTrue(called) + + def test_poll_event_from_worker(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + # Check if 1 worker is added to manager + pids = controller._manager._resource_map.keys() + self.assertTrue(len(pids) == 1) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_worker_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_poll_wait_obj', wait_obj) + + event = controller.create_event( + id='TEST_POLL_EVENT_FROM_WORKER', data='NO_DATA') + worker_process = controller._worker_process.values()[0] + worker_process.worker.controller.post_event(event) + + controller._manager.manager_run() + + # Check if event handler callback is invoked + controller.poll_event_worker_wait_obj.wait(1) + called = controller.poll_event_worker_wait_obj.is_set() + self.assertTrue(called) + + time.sleep(1) + controller.poll() + + controller.poll_event_poll_wait_obj.wait(1) + called = controller.poll_event_poll_wait_obj.is_set() + self.assertTrue(called) + + def test_poll_event_cancelled_from_worker(self): + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller + + # Check if 1 worker is added to manager + pids = controller._manager._resource_map.keys() + self.assertTrue(len(pids) == 1) + self.assertTrue(pid in range(8888, 9999) for pid in pids) + + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_worker_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_poll_wait_obj', wait_obj) + wait_obj = multiprocessing.Event() + setattr(controller, 'poll_event_poll_cancel_wait_obj', wait_obj) + + event = controller.create_event( + id='TEST_POLL_EVENT_CANCEL_FROM_WORKER', data='NO_DATA') + worker_process = controller._worker_process.values()[0] + worker_process.worker.controller.post_event(event) + + controller._manager.manager_run() + + # Check if event handler callback is invoked + controller.poll_event_worker_wait_obj.wait(1) + called = controller.poll_event_worker_wait_obj.is_set() + self.assertTrue(called) + + time.sleep(1) + controller.poll() + + controller.poll_event_poll_wait_obj.wait(1) + called = controller.poll_event_poll_wait_obj.is_set() + self.assertTrue(called) + + time.sleep(1) + controller.poll() + + controller.poll_event_poll_wait_obj.wait(1) + called = controller.poll_event_poll_wait_obj.is_set() + self.assertTrue(called) + + controller.poll_event_poll_cancel_wait_obj.wait(1) + called = controller.poll_event_poll_cancel_wait_obj.is_set() + self.assertTrue(called) + +if __name__ == '__main__': + unittest.main() diff --git a/gbpservice/nfp/__init__.py b/gbpservice/nfp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/bin/nfp b/gbpservice/nfp/bin/nfp new file mode 100755 index 000000000..40aae43f4 --- /dev/null +++ b/gbpservice/nfp/bin/nfp @@ -0,0 +1,8 @@ +#!/usr/bin/python2 + +import sys +from gbpservice.nfp.core.controller import main + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/gbpservice/nfp/bin/nfp_orchestrator.ini b/gbpservice/nfp/bin/nfp_orchestrator.ini new file mode 100644 index 000000000..4180a2155 --- /dev/null +++ b/gbpservice/nfp/bin/nfp_orchestrator.ini @@ -0,0 +1,18 @@ +[DEFAULT] +# Number of worker process to be spawned. +workers=1 +debug=False +# Path to NFP modules in . format +# NFP core framework will load all .py files +# from this path as nfp modules +nfp_modules_path=gbpservice.nfp.orchestrator.modules +# To invoke OTC Apis +# It could be rpc/REST. +# rpc - where fip access unavaiable for OTC NFP controller +# REST - where fip access is available for OTC NFP controller +backend=rpc + +# Section describing params for backend=rpc +[RPC] +# Topic to send OTC request rpc on. +topic=nfp-proxy-agent-topic diff --git a/gbpservice/nfp/bin/nfp_proxy b/gbpservice/nfp/bin/nfp_proxy new file mode 100755 index 000000000..b3240557f --- /dev/null +++ b/gbpservice/nfp/bin/nfp_proxy @@ -0,0 +1,7 @@ +#!/usr/bin/python2 + +import sys +from gbpservice.nfp.proxy_agent.proxy.proxy import main + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/gbpservice/nfp/bin/nfp_proxy.ini b/gbpservice/nfp/bin/nfp_proxy.ini new file mode 100644 index 000000000..8297aa8ec --- /dev/null +++ b/gbpservice/nfp/bin/nfp_proxy.ini @@ -0,0 +1,13 @@ +[OPTIONS] +thread_pool_size= 10 +unix_bind_path= /var/run/uds_socket +max_connections=10 +nfp_controller_ip_address= 11.0.0.3 +worker_threads=100 +connect_max_wait_timeout=120 +idle_max_wait_timeout=120 +idle_min_wait_timeout=0.1 + +[NFP_CONTROLLER] +rest_server_address= 11.0.0.3 +rest_server_port= 8070 diff --git a/gbpservice/nfp/bin/nfp_proxy_agent.ini b/gbpservice/nfp/bin/nfp_proxy_agent.ini new file mode 100644 index 000000000..61f50d59d --- /dev/null +++ b/gbpservice/nfp/bin/nfp_proxy_agent.ini @@ -0,0 +1,15 @@ +[DEFAULT] +# Number of worker process to be spawned. +workers=1 +debug=False +# Path to NFP modules in . format +# NFP core framework will load all .py files +# from this path as nfp modules +nfp_modules_path=gbpservice.nfp.proxy_agent.modules +# To invoke OTC Apis +# Proxy agent will always use unix_rest backend to +# communicate with proxy component inside namespace +backend=unix_rest + +[oslo_policy] +policy_file = /etc/neutron/policy.json diff --git a/gbpservice/nfp/core/__init__.py b/gbpservice/nfp/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/core/cfg.py b/gbpservice/nfp/core/cfg.py new file mode 100644 index 000000000..ccd728c40 --- /dev/null +++ b/gbpservice/nfp/core/cfg.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_config import cfg as oslo_config + +NFP_OPTS = [ + oslo_config.IntOpt( + 'workers', + default=1, + help='Number of event worker process to be created.' + ), + oslo_config.StrOpt( + 'nfp_modules_path', + default='gbpservice.nfp.core.test', + help='Path for NFP modules.' + 'All modules from this path are autloaded by framework' + ) +] + +es_openstack_opts = [ + oslo_config.StrOpt('auth_host', + default='localhost', + help='Openstack controller IP Address'), + oslo_config.StrOpt('admin_user', + help='Admin user name to create service VMs'), + oslo_config.StrOpt('admin_password', + help='Admin password to create service VMs'), + oslo_config.StrOpt('admin_tenant_name', + help='Admin tenant name to create service VMs'), + oslo_config.StrOpt('admin_tenant_id', + help='Admin tenant ID to create service VMs'), + oslo_config.StrOpt('auth_protocol', + default='http', help='Auth protocol used.'), + oslo_config.IntOpt('auth_port', + default='5000', help='Auth protocol used.'), + oslo_config.IntOpt('bind_port', + default='9696', help='Auth protocol used.'), + oslo_config.StrOpt('auth_version', + default='v2.0', help='Auth protocol used.'), + oslo_config.StrOpt('auth_uri', + default='', help='Auth URI.'), +] + + +def init(args, **kwargs): + """Initialize the configuration. """ + oslo_config.CONF.register_opts(NFP_OPTS) + oslo_config.CONF.register_opts(es_openstack_opts, "keystone_authtoken") + oslo_config.CONF(args=args, project='nfp', + version='%%(prog)s %s' % ('version'), + **kwargs) + + return oslo_config.CONF diff --git a/gbpservice/nfp/core/common.py b/gbpservice/nfp/core/common.py new file mode 100644 index 000000000..064423515 --- /dev/null +++ b/gbpservice/nfp/core/common.py @@ -0,0 +1,79 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from oslo_config import cfg as oslo_cfg +from oslo_log import log as oslo_logging + +oslo_logging.register_options(oslo_cfg.CONF) + + +class Object(object): + pass + + +def init(): + """Initialize logging. """ + product_name = "nfp" + oslo_logging.setup(oslo_cfg.CONF, product_name) + + +def _is_class(obj): + return 'class' in str(type(obj)) + + +def _name(obj): + """Helper method to construct name of an object. + + 'module.class' if object is of type 'class' + 'module.class.method' if object is of type 'method' + """ + # If it is callable, then it is a method + if callable(obj): + return "{0}.{1}.{2}".format( + type(obj.im_self).__module__, + type(obj.im_self).__name__, + obj.__name__) + # If obj is of type class + elif _is_class(obj): + return "{0}.{1}".format( + type(obj).__module__, + type(obj).__name__) + else: + return obj.__name__ + + +def identify(obj): + """Helper method to display identity an object. + + Useful for logging. Decodes based on the type of obj. + Supports 'class' & 'method' types for now. + + :param obj: Object (Class/Method supported.) + Returns: String. Identification of the object. + """ + prefix = obj._NAME_ if hasattr(obj, '_NAME_') else '' + try: + return "([%s] %s)" % (prefix, _name(obj)) + except Exception: + # Some unknown type, returning empty + return "" + + +def time_stamp(): + """Current time stamp in milliseconds. + + Returns: time stamp in milliseconds. + """ + _time_ms = lambda: int(round(time.time() * 1000.0)) + return _time_ms() diff --git a/gbpservice/nfp/core/context.py b/gbpservice/nfp/core/context.py new file mode 100644 index 000000000..ed93010dd --- /dev/null +++ b/gbpservice/nfp/core/context.py @@ -0,0 +1,39 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading + +nfp_context_store = threading.local() + + +class NfpContext(object): + + def __init__(self, context): + self.context = context + + def get_context(self): + return self.context + + +def store_nfp_context(context): + nfp_context_store.context = NfpContext(context) + + +def clear_nfp_context(): + nfp_context_store.context = None + + +def get_nfp_context(): + context = getattr(nfp_context_store, 'context', None) + if context: + return context.get_context() + return {} diff --git a/gbpservice/nfp/core/controller.py b/gbpservice/nfp/core/controller.py new file mode 100644 index 000000000..b891762d3 --- /dev/null +++ b/gbpservice/nfp/core/controller.py @@ -0,0 +1,586 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ast +import eventlet +eventlet.monkey_patch() + +import multiprocessing +import operator +import os +import pickle +import Queue +import sys +import time +import zlib + +from oslo_service import service as oslo_service + +from gbpservice.nfp.core import cfg as nfp_cfg +from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import event as nfp_event +from gbpservice.nfp.core import launcher as nfp_launcher +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import manager as nfp_manager +from gbpservice.nfp.core import poll as nfp_poll +from gbpservice.nfp.core import rpc as nfp_rpc +from gbpservice.nfp.core import worker as nfp_worker + +# REVISIT (mak): Unused, but needed for orchestrator, +# remove from here and add in orchestrator +from neutron.common import config + +LOG = nfp_logging.getLogger(__name__) +PIPE = multiprocessing.Pipe +PROCESS = multiprocessing.Process +identify = nfp_common.identify + +# REVISIT (mak): fix to pass compliance check +config = config + +"""Implements NFP service. + + Base class for nfp modules, modules can invoke methods + of this class to interact with core. +""" + + +class NfpService(object): + + def __init__(self, conf): + self._conf = conf + self._event_handlers = nfp_event.NfpEventHandlers() + self._rpc_agents = list() + + def _make_new_event(self, event): + """Make a new event from the object passed. """ + desc = event.desc + event_dict = event.__dict__ + event = self.create_event(**event_dict) + event.desc.from_desc(desc) + return event + + def get_event_handlers(self): + return self._event_handlers + + def register_events(self, event_descs): + """Register event handlers with core. """ + # REVISIT (mak): change name to register_event_handlers() ? + for event_desc in event_descs: + self._event_handlers.register(event_desc.id, event_desc.handler) + + def register_rpc_agents(self, agents): + """Register rpc handlers with core. """ + for agent in agents: + self._rpc_agents.append((agent,)) + + def new_event(self, **kwargs): + """Define and return a new event. """ + return self.create_event(**kwargs) + + def create_event(self, **kwargs): + """To create a new event. """ + event = None + try: + event = nfp_event.Event(**kwargs) + # Get the logging context stored in thread + logging_context = nfp_logging.get_logging_context() + # Log metadata for event handling code + event.context = logging_context + except AssertionError as aerr: + message = "%s" % (aerr) + LOG.exception(message) + return event + + def post_event_graph(self, event): + """Post a event graph. + + As base class, set only the required + attributes of event. + """ + event.desc.type = nfp_event.EVENT_GRAPH + event.desc.flag = '' + event.desc.pid = os.getpid() + return event + + def post_event(self, event): + """Post an event. + + As a base class, it only does the descriptor preparation. + NfpController class implements the required functionality. + """ + handler = self._event_handlers.get_event_handler(event.id) + assert handler, "No handler registered for event %s" % (event.id) + event.desc.type = nfp_event.SCHEDULE_EVENT + event.desc.flag = nfp_event.EVENT_NEW + event.desc.pid = os.getpid() + return event + + # REVISIT (mak): spacing=0, caller must explicitly specify + def poll_event(self, event, spacing=2, max_times=sys.maxint): + """To poll for an event. + + As a base class, it only does the polling + descriptor preparation. + NfpController class implements the required functionality. + """ + ev_spacing = self._event_handlers.get_poll_spacing(event.id) + assert spacing or ev_spacing, "No spacing specified for polling" + if ev_spacing: + spacing = ev_spacing + + handler = self._event_handlers.get_poll_handler(event.id) + assert handler, "No poll handler found for event %s" % (event.id) + + refuuid = event.desc.uuid + event = self._make_new_event(event) + event.lifetime = 0 + event.desc.type = nfp_event.POLL_EVENT + + kwargs = {'spacing': spacing, + 'max_times': max_times, + 'ref': refuuid} + poll_desc = nfp_event.PollDesc(**kwargs) + + setattr(event.desc, 'poll_desc', poll_desc) + return event + + def event_complete(self, event, result=None): + """To declare and event complete. """ + try: + pickle.dumps(result) + event.sequence = False + event.desc.flag = nfp_event.EVENT_COMPLETE + event.result = result + return event + except Exception as e: + raise e + + def create_work(self, work): + """Create a work, collection of events. """ + pass + + +"""NFP Controller class mixin other nfp classes. + + Nfp modules get the instance of this class when + they are initialized. + Nfp modules interact with core using the methods + of 'Service' class, whose methods are implemented + in this class. + Also, it mixes the other nfp core classes to complete + a nfp module request. +""" + + +class NfpController(nfp_launcher.NfpLauncher, NfpService): + + def __init__(self, conf): + # Init the super classes. + nfp_launcher.NfpLauncher.__init__(self, conf) + NfpService.__init__(self, conf) + + # For book keeping + self._worker_process = {} + self._conf = conf + self._pipe = None + # Queue to stash events. + self._stashq = multiprocessing.Queue() + + self._manager = nfp_manager.NfpResourceManager(conf, self) + self._worker = nfp_worker.NfpWorker(conf) + self._poll_handler = nfp_poll.NfpPollHandler(conf) + + # ID of process handling this controller obj + self.PROCESS_TYPE = "distributor" + + def compress(self, event): + # REVISIT (mak) : zip only if length is > than threshold (1k maybe) + if event.data and not event.zipped: + event.zipped = True + event.data = zlib.compress(str({'cdata': event.data})) + + def decompress(self, event): + if event.data and event.zipped: + try: + data = ast.literal_eval( + zlib.decompress(event.data)) + event.data = data['cdata'] + event.zipped = False + except Exception as e: + message = "Failed to decompress event data, Reason: %s" % ( + e) + LOG.error(message) + raise e + + def pipe_recv(self, pipe): + event = pipe.recv() + if event: + self.decompress(event) + return event + + def pipe_send(self, pipe, event): + self.compress(event) + pipe.send(event) + + def _fork(self, args): + proc = PROCESS(target=self.child, args=args) + proc.daemon = True + proc.start() + return proc + + def _manager_task(self): + while True: + # Run 'Manager' here to monitor for workers and + # events. + self._manager.manager_run() + eventlet.greenthread.sleep(0.1) + + def _update_manager(self): + childs = self.get_childrens() + for pid, wrapper in childs.iteritems(): + pipe = wrapper.child_pipe_map[pid] + # Inform 'Manager' class about the new_child. + self._manager.new_child(pid, pipe) + + def _process_event(self, event): + self._manager.process_events([event]) + + def get_childrens(self): + # oslo_process.ProcessLauncher has this dictionary, + # 'NfpLauncher' derives oslo_service.ProcessLauncher + return self.children + + def fork_child(self, wrap): + """Forks a child. + + Creates a full duplex pipe for child & parent + to communicate. + + Returns: Multiprocess object. + """ + + parent_pipe, child_pipe = PIPE(duplex=True) + + # Registered event handlers of nfp module. + # Workers need copy of this data to dispatch an + # event to module. + proc = self._fork(args=(wrap.service, parent_pipe, child_pipe, self)) + + message = ("Forked a new child: %d" + "Parent Pipe: % s, Child Pipe: % s") % ( + proc.pid, str(parent_pipe), str(child_pipe)) + LOG.info(message) + + try: + wrap.child_pipe_map[proc.pid] = parent_pipe + except AttributeError: + setattr(wrap, 'child_pipe_map', {}) + wrap.child_pipe_map[proc.pid] = parent_pipe + + self._worker_process[proc.pid] = proc + return proc.pid + + def launch(self, workers): + """Launch the controller. + + Uses Oslo Service to launch with configured #of workers. + Spawns a manager task to manager nfp events & workers. + + :param workers: #of workers to be launched + + Returns: None + """ + super(NfpController, self).launch_service( + self._worker, workers=workers) + + def post_launch(self): + """Post processing after workers launch. + + Tasks which needs to run only on distributor + process and any other resources which are not + expected to be forked are initialized here. + """ + self._update_manager() + + # Launch rpc_agents + for index, rpc_agent in enumerate(self._rpc_agents): + # Use threads for launching service + launcher = oslo_service.launch( + self._conf, rpc_agent[0], workers=None) + self._rpc_agents[index] = rpc_agent + (launcher,) + + # One task to manage the resources - workers & events. + eventlet.spawn_n(self._manager_task) + # Oslo periodic task to poll for timer events + nfp_poll.PollingTask(self._conf, self) + # Oslo periodic task for state reporting + nfp_rpc.ReportStateTask(self._conf, self) + + def poll_add(self, event, timeout, callback): + """Add an event to poller. """ + self._poll_handler.poll_add( + event, timeout, callback) + + def poll(self): + """Invoked in periodic task to poll for timedout events. """ + self._poll_handler.run() + + def report_state(self): + """Invoked by report_task to report states of all agents. """ + for agent in self._rpc_agents: + rpc_agent = operator.itemgetter(0)(agent) + rpc_agent.report_state() + + def post_event_graph(self, event, graph_nodes): + """Post a new event graph into system. + + Graph is a collection of events to be + executed in a certain manner. Use the + commonly defined 'Event' class to define + even the graph. + + :param event: Object of 'Event' class. + + Return: None + """ + + # Post events for all the graph nodes + for node in graph_nodes: + self.post_event(node) + + event = super(NfpController, self).post_event_graph(event) + message = "(event - %s) - New event" % (event.identify()) + LOG.debug(message) + if self.PROCESS_TYPE == "worker": + # Event posted in worker context, send it to parent process + message = ("(event - %s) - new event in worker" + "posting to distributor process") % (event.identify()) + LOG.debug(message) + # Send it to the distributor process + self.pipe_send(self._pipe, event) + else: + message = ("(event - %s) - new event in distributor" + "processing event") % (event.identify()) + LOG.debug(message) + self._manager.process_events([event]) + + def post_event(self, event): + """Post a new event into the system. + + If distributor(main) process posts an event, it + is delivered to the worker. + If worker posts an event, it is deliverd to + distributor for processing, where it can decide + to loadbalance & sequence events. + + :param event: Object of 'Event' class. + + Returns: None + """ + event = super(NfpController, self).post_event(event) + message = "(event - %s) - New event" % (event.identify()) + LOG.debug(message) + if self.PROCESS_TYPE == "worker": + # Event posted in worker context, send it to parent process + message = ("(event - %s) - new event in worker" + "posting to distributor process") % (event.identify()) + + LOG.debug(message) + # Send it to the distributor process + self.pipe_send(self._pipe, event) + else: + message = ("(event - %s) - new event in distributor" + "processing event") % (event.identify()) + LOG.debug(message) + self._manager.process_events([event]) + + def poll_event(self, event, spacing=2, max_times=sys.maxint): + """Post a poll event into the system. + + Core will poll for this event to timeout, after + timeout registered handler of module is invoked. + + :param event: Object of 'Event' class. + :param spacing: Spacing at which event should timeout. + :param max_times: Max #of times the event can timeout, + after the max_times, event is auto cancelled by + the core and the registered handler of module + is invoked. + + Returns: None + """ + # Poll event can only be posted by worker not by listener process + if self.PROCESS_TYPE != "worker": + message = "(event - %s) - poll event in distributor" % ( + event.identify()) + LOG.debug(message) + # 'Service' class to construct the poll event descriptor + event = super(NfpController, self).poll_event( + event, spacing=spacing, max_times=max_times) + self._manager.process_events([event]) + else: + ''' + # Only event which is delivered to a worker can be polled for, coz, + # after event timeouts, it should be delivered to the same worker, + # hence the check to make sure the correct event is been asked for + # polling. + assert event.desc.worker, "No worker for event %s" % ( + event.identify()) + LOG.debug("(event - %s) - poll event in worker" % + (event.identify())) + ''' + # 'Service' class to construct the poll event descriptor + event = super(NfpController, self).poll_event( + event, spacing=spacing, max_times=max_times) + # Send to the distributor process. + self.pipe_send(self._pipe, event) + + def stash_event(self, event): + """To stash an event. + + This will be invoked by worker process. + Put this event in queue, distributor will + pick it up. + + Executor: worker-process + """ + if self.PROCESS_TYPE == "distributor": + message = "(event - %s) - distributor cannot stash" % ( + event.identify()) + LOG.error(message) + else: + message = "(event - %s) - stashed" % (event.identify()) + LOG.debug(message) + self._stashq.put(event) + + def get_stashed_events(self): + """To get stashed events. + + Returns available number of stashed events + as list. Will be invoked by distributor, + worker cannot pull. + + Executor: distributor-process + """ + events = [] + # return at max 5 events + maxx = 1 + # wait sometime for first event in the queue + timeout = 0.1 + while maxx: + try: + event = self._stashq.get(timeout=timeout) + self.decompress(event) + events.append(event) + timeout = 0 + maxx -= 1 + except Queue.Empty: + maxx = 0 + pass + return events + + def event_complete(self, event, result=None): + """To mark an event complete. + + Module can invoke this API to mark an event complete. + a) Next event in sequence will be scheduled. + b) Event from cache is removed. + c) Polling for event is stopped. + d) If the worker dies before event is complete, the + event is scheduled to other available workers. + + :param event: Obj of 'Event' class + + Returns: None + """ + message = "(event - %s) complete" % (event.identify()) + LOG.debug(message) + event = super(NfpController, self).event_complete(event, result=result) + if self.PROCESS_TYPE == "distributor": + self._manager.process_events([event]) + else: + # Send to the distributor process. + self.pipe_send(self._pipe, event) + + +def load_nfp_modules(conf, controller): + """ Load all nfp modules from configured directory. """ + pymodules = [] + try: + base_module = __import__(conf.nfp_modules_path, + globals(), locals(), ['modules'], -1) + modules_dir = base_module.__path__[0] + try: + files = os.listdir(modules_dir) + for pyfile in set([f for f in files if f.endswith(".py")]): + try: + pymodule = __import__(conf.nfp_modules_path, + globals(), locals(), + [pyfile[:-3]], -1) + pymodule = eval('pymodule.%s' % (pyfile[:-3])) + try: + pymodule.nfp_module_init(controller, conf) + pymodules += [pymodule] + message = "(module - %s) - Initialized" % ( + identify(pymodule)) + LOG.debug(message) + except AttributeError as e: + exc_type, exc_value, exc_traceback = sys.exc_info() + message = "Traceback: %s" % (exc_traceback) + LOG.error(message) + message = ("(module - %s) - does not implement" + "nfp_module_init()") % (identify(pymodule)) + LOG.warn(message) + except ImportError: + message = "Failed to import module %s" % (pyfile) + LOG.error(message) + except OSError: + message = "Failed to read files from %s" % (modules_dir) + LOG.error(message) + except ImportError: + message = "Failed to import module from path %s" % ( + conf.nfp_modules_path) + LOG.error(message) + + return pymodules + + +def controller_init(conf, nfp_controller): + nfp_controller.launch(conf.workers) + # Wait for conf.workers*1 + 1 secs for workers to comeup + time.sleep(conf.workers * 1 + 1) + nfp_controller.post_launch() + + +def nfp_modules_post_init(conf, nfp_modules, nfp_controller): + for module in nfp_modules: + try: + module.nfp_module_post_init(nfp_controller, conf) + except AttributeError: + message = ("(module - %s) - does not implement" + "nfp_module_post_init(), ignoring") % (identify(module)) + LOG.debug(message) + + +def main(): + conf = nfp_cfg.init(sys.argv[1:]) + nfp_common.init() + nfp_controller = NfpController(conf) + # Load all nfp modules from path configured + nfp_modules = load_nfp_modules(conf, nfp_controller) + # Init the controller, launch required contexts + controller_init(conf, nfp_controller) + # post_init of each module + nfp_modules_post_init(conf, nfp_modules, nfp_controller) + # Wait for every exec context to complete + nfp_controller.wait() diff --git a/gbpservice/nfp/core/event.py b/gbpservice/nfp/core/event.py new file mode 100644 index 000000000..6df8ae084 --- /dev/null +++ b/gbpservice/nfp/core/event.py @@ -0,0 +1,465 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import multiprocessing +import uuid as pyuuid + +from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import module as nfp_api +from gbpservice.nfp.core import sequencer as nfp_seq + +LOG = nfp_logging.getLogger(__name__) +identify = nfp_common.identify + +"""Event Types """ +SCHEDULE_EVENT = 'schedule_event' +POLL_EVENT = 'poll_event' +STASH_EVENT = 'stash_event' +EVENT_EXPIRED = 'event_expired' +EVENT_GRAPH = 'event_graph' + +"""Event Flag """ +EVENT_NEW = 'new_event' +EVENT_COMPLETE = 'event_done' +EVENT_ACK = 'event_ack' + +"""Sequencer status. """ +SequencerEmpty = nfp_seq.SequencerEmpty +SequencerBusy = nfp_seq.SequencerBusy + +deque = collections.deque + + +class EventGraphNode(object): + + def __init__(self, event, p_event=None): + self.p_link = () + self.c_links = [] + self.w_links = [] + self.e_links = [] + self.event = event + self.result = None + + if p_event: + self.p_link = p_event + + def __getstate__(self): + return (self.p_link, self.c_links, + self.e_links, self.w_links, self.event, self.result) + + def __setstate__(self, state): + (self.p_link, self.c_links, self.e_links, + self.w_links, self.event, self.result) = state + + def add_link(self, event): + self.c_links.append(event) + self.w_links.append(event) + + def remove_link(self, event): + self.e_links.append(event) + self.w_links.remove(event) + + def remove_c_link(self, event): + try: + self.c_links.remove(event) + except ValueError: + pass + + def get_c_links(self): + return self.c_links + + def get_w_links(self): + return self.w_links + + def get_executed_links(self): + return self.e_links + + +class EventGraph(object): + + def __init__(self, event): + self.root_node = EventGraphNode(event.desc.uuid) + self.nodes = {event.desc.uuid: self.root_node} + + def __getstate__(self): + return self.root_node, self.nodes + + def __setstate__(self, state): + self.root_node, self.nodes = state + + def add_node(self, event, p_event): + node = EventGraphNode(event.desc.uuid, p_event.desc.uuid) + self.nodes.update({event.desc.uuid: node}) + p_node = self.nodes.get(p_event.desc.uuid) + p_node.add_link(event.desc.uuid) + + def remove_node(self, node): + p_node = self.nodes.get(node.p_link) + if p_node: + p_node.remove_link(node.event) + return p_node + + def unlink_node(self, node): + p_node = self.nodes.get(node.p_link) + if p_node: + p_node.remove_c_link(node.event) + + def get_pending_leaf_nodes(self, node): + c_links = node.get_c_links() + c_nodes = [] + for link in c_links: + c_nodes.append(self.nodes[link]) + + return c_nodes + + def waiting_events(self, node): + return len(node.get_w_links()) + + def get_leaf_node_results(self, event): + results = [] + node = self.nodes[event.desc.uuid] + e_links = node.get_executed_links() + for link in e_links: + node = self.nodes[link] + uuid = node.event + key, id = uuid.split(':') + result = nfp_common.Object() + setattr(result, 'id', id) + setattr(result, 'key', key) + setattr(result, 'result', node.result) + results.append(result) + return results + + def get_node(self, event): + return self.nodes[event] + +"""Defines poll descriptor of an event. + + Holds all of the polling information of an + event. +""" + + +class PollDesc(object): + + def __init__(self, **kwargs): + # Spacing of the event, event will timeout @this spacing. + self.spacing = kwargs.get('spacing') + # Max times event can be polled, is autocancelled after. + self.max_times = kwargs.get('max_times') + # Reference to original event, UUID. + self.ref = kwargs.get('ref') + +"""Defines the descriptor of an event. + + Holds the metadata for an event. Useful + for event processing. Not exposed to nfp modules. +""" + + +class EventDesc(object): + + def __init__(self, **kwargs): + # Unique id of the event, use what user passed or + # generate a new unique id. + uuid = kwargs.get('key', pyuuid.uuid4()) + id = kwargs.get('id', '') + + self.uuid = str(uuid) + ':' + id + + # see 'Event Types' + self.type = kwargs.get('type') + # see 'Event Flag' + self.flag = kwargs.get('flag') + # PID of worker which is handling this event + self.worker = kwargs.get('worker') + # Polling descriptor of event + self.poll_desc = kwargs.get('poll_desc') + + def from_desc(self, desc): + self.type = desc.type + self.flag = desc.flag + self.worker = desc.worker + self.poll_desc = desc.poll_desc + + def to_dict(self): + return {'uuid': self.uuid, + 'type': self.type, + 'flag': self.flag, + 'worker': self.worker, + 'poll_desc': self.poll_desc + } + +"""Defines the event structure. + + Nfp modules need to create object of the class + to create an event. +""" + + +class Event(object): + + def __init__(self, **kwargs): + # ID of event as passed by module + self.id = kwargs.get('id') + # Data blob + self.data = kwargs.get('data') + # Whether to sequence this event w.r.t + # other related events. + self.sequence = kwargs.get('serialize', False) + # Unique key to be associated with the event + self.key = kwargs.get('key') + # Binding key to define relation between + # different events. + self.binding_key = kwargs.get('binding_key') + # Handler of the event. + self.handler = kwargs.get('handler') + # Lifetime of the event in seconds. + self.lifetime = kwargs.get('lifetime', 0) + # Identifies whether event.data is zipped + self.zipped = False + # Log metadata context + self.context = kwargs.get('context', {}) + # Prepare the base descriptor + desc = kwargs.get('desc_dict') + if desc: + desc['key'] = self.key + desc['id'] = self.id + desc = EventDesc(**desc) + elif self.key: + desc = EventDesc(**{'key': self.key, + 'id': self.id}) + else: + desc = EventDesc(**{'id': self.id}) + self.desc = desc + + # Will be set if this event is a event graph + self.graph = kwargs.get('graph', None) + self.result = None + + cond = self.sequence is True and self.binding_key is None + assert not cond + + def set_fields(self, **kwargs): + if 'graph' in kwargs: + self.graph = kwargs['graph'] + + def identify(self): + if hasattr(self, 'desc'): + return "uuid=%s,id=%s,type=%s,flag=%s" % ( + self.desc.uuid, self.id, self.desc.type, self.desc.flag) + return "id=%s" % (self.id) + + +"""Table of event handler's. + + Maintains cache of every module's event handlers. + Also, maintains the polling against event_id + which are provided as decorators. +""" + + +class NfpEventHandlers(object): + + def __init__(self): + # {'event.id': [(event_handler, poll_handler, spacing)] + self._event_desc_table = {} + + def _log_meta(self, event_id, event_handler=None): + if event_handler: + return "(event_id - %s) - (event_handler - %s)" % ( + event_id, identify(event_handler)) + else: + return "(event_id - %s) - (event_handler - None)" % (event_id) + + def register(self, event_id, event_handler): + """Registers a handler for event_id. + + Also fetches the decorated poll handlers if any + for the event and caches it. + """ + if not isinstance(event_handler, nfp_api.NfpEventHandler): + message = "%s - Handler is not instance of NfpEventHandler" % ( + self._log_meta(event_id, event_handler)) + LOG.error(message) + return + try: + poll_desc_table = event_handler.get_poll_desc_table() + poll_handler = poll_desc_table[event_id] + spacing = poll_handler._spacing + except KeyError: + # Default the poll handler and spacing values + poll_handler = event_handler.handle_poll_event + spacing = 0 + + try: + self._event_desc_table[event_id].append( + (event_handler, poll_handler, spacing)) + except KeyError: + self._event_desc_table[event_id] = [ + (event_handler, poll_handler, spacing)] + message = "%s - Registered handler" % ( + self._log_meta(event_id, event_handler)) + LOG.debug(message) + + def get_event_handler(self, event_id): + """Get the handler for the event_id. """ + eh = None + try: + eh = self._event_desc_table[event_id][0][0] + finally: + message = "%s - Returning event handler" % ( + self._log_meta(event_id, eh)) + LOG.debug(message) + return eh + + def get_poll_handler(self, event_id): + """Get the poll handler for event_id. """ + ph = None + try: + ph = self._event_desc_table[event_id][0][1] + finally: + message = "%s - Returning poll handler" % ( + self._log_meta(event_id, ph)) + LOG.debug(message) + return ph + + def get_poll_spacing(self, event_id): + """Return the spacing for event_id. """ + spacing = 0 + try: + spacing = self._event_desc_table[event_id][0][2] + finally: + message = "%s - Poll spacing %d" % ( + self._log_meta(event_id), spacing) + LOG.debug(message) + return spacing + + +"""Manages the lifecycle of event of a process. + + Each process (worker/distributor) is associated + with a event manager. Event manager pulls events + from the pipe, caches it, sequences & dispatches + the events. +""" + + +class NfpEventManager(object): + + def __init__(self, conf, controller, sequencer, pipe=None, pid=-1): + self._conf = conf + self._controller = controller + # PID of process to which this event manager is associated + self._pid = pid + # Duplex pipe to read & write events + self._pipe = pipe + # Cache of UUIDs of events which are dispatched to + # the worker which is handled by this em. + self._cache = deque() + # Load on this event manager - num of events pending to be completed + self._load = 0 + + def _log_meta(self, event=None): + if event: + return "(event - %s) - (event_manager - %d)" % ( + event.identify(), self._pid) + else: + return "(event_manager - %d" % (self._pid) + + def _wait_for_events(self, pipe, timeout=0.01): + """Wait & pull event from the pipe. + + Wait till timeout for the first event and then + pull as many as available. + Returns: Events[] pulled from pipe. + """ + events = [] + try: + while pipe.poll(timeout): + timeout = 0 + event = self._controller.pipe_recv(pipe) + events.append(event) + except multiprocessing.TimeoutError as err: + message = "%s" % (err) + LOG.exception(message) + return events + + def init_from_event_manager(self, em): + """Initialize from existing event manager. + + Invoked when an event manager has to take over + existing event manager. + + Whole cache is replaced and events are replayed. + This is used in case where a worker dies, dead + workers event manager is assigned to new worker. + """ + # Replay all the events from cache. + self._cache = em._cache + + def get_pending_events(self): + return list(self._cache) + + def get_load(self): + """Return current load on the manager.""" + return self._load + + def pop_event(self, event): + """Pop the passed event from cache. + + Is called when an event is complete/cancelled. + If the event was sequenced, then sequencer is + released to schedule next event. + + Removes event from cache. + """ + message = "%s - pop event" % (self._log_meta(event)) + LOG.debug(message) + try: + self._cache.remove(event.desc.uuid) + self._load -= 1 + except ValueError as verr: + verr = verr + message = "%s - event not in cache" % ( + self._log_meta(event)) + LOG.warn(message) + + def dispatch_event(self, event, event_type=None, + inc_load=True, cache=True): + """Dispatch event to the worker. + + Sends the event to worker through pipe. + Increments load if event_type is SCHEDULED event, + poll_event does not contribute to load. + """ + message = "%s - Dispatching to worker %d" % ( + self._log_meta(event), self._pid) + LOG.debug(message) + # Update the worker information in the event. + event.desc.worker = self._pid + # Update the event with passed type + if event_type: + event.desc.type = event_type + # Send to the worker + self._controller.pipe_send(self._pipe, event) + + self._load = (self._load + 1) if inc_load else self._load + # Add to the cache + if cache: + self._cache.append(event.desc.uuid) + + def event_watcher(self, timeout=0.01): + """Watch for events. """ + return self._wait_for_events(self._pipe, timeout=timeout) diff --git a/gbpservice/nfp/core/executor.py b/gbpservice/nfp/core/executor.py new file mode 100644 index 000000000..591c49336 --- /dev/null +++ b/gbpservice/nfp/core/executor.py @@ -0,0 +1,203 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import threadpool as core_tp + +LOG = nfp_logging.getLogger(__name__) + + +class InUse(Exception): + + """Exception raised when same task executor instance + is fired twice or jobs + added after executor is fired. + """ + pass + + +def check_in_use(f): + """Check if instance of task executor is already + fired and executing jobs. + """ + + def wrapped(self, *args, **kwargs): + if self.fired: + raise InUse("Executor in use") + return f(self, *args, **kwargs) + return wrapped + + +class TaskExecutor(object): + + """Executes given jobs in green threads. + + Any number of jobs can be added till executor + is fired. When fired, executes all jobs in + parallel in green threads. Waits for threads + to complete, captures the return values of thread + function. + Caller can choose to pass result_store where the + return value will be updated. + """ + + def __init__(self, jobs=0): + if not jobs: + self.thread_pool = core_tp.ThreadPool() + else: + self.thread_pool = core_tp.ThreadPool(thread_pool_size=jobs) + + self.pipe_line = [] + self.fired = False + + @check_in_use + def add_job(self, id, func, *args, **kwargs): + result_store = kwargs.pop('result_store', None) + + job = { + 'id': id, 'method': func, + 'args': args, 'kwargs': kwargs + } + + if result_store is not None: + job.update({'result_store': result_store}) + + LOG.debug("TaskExecutor - (job - %s) added to pipeline" % + (str(job))) + + self.pipe_line.append(job) + + def _complete(self): + LOG.debug("TaskExecutor - complete") + self.pipe_line = [] + self.fired = False + + @check_in_use + def fire(self): + self.fired = True + for job in self.pipe_line: + LOG.debug( + "TaskExecutor - (job - %s) dispatched" % + (str(job))) + + th = self.thread_pool.dispatch( + job['method'], *job['args'], **job['kwargs']) + job['thread'] = th + + for job in self.pipe_line: + result = job['thread'].wait() + LOG.debug( + "TaskExecutor - (job - %s) complete" % + (str(job))) + + job.pop('thread') + job['result'] = result + if 'result_store' in job.keys(): + job['result_store']['result'] = result + + done_jobs = self.pipe_line[:] + self._complete() + return done_jobs + + +def set_node(f): + """To find and set a graph node for a + given event. + """ + + def decorator(self, *args, **kwargs): + node = kwargs.get('node') + event = kwargs.get('event') + if not node: + if not event: + kwargs['node'] = self.graph.root_node + else: + kwargs['node'] = self.graph.get_node(event) + return f(self, *args, **kwargs) + return decorator + + +class EventGraphExecutor(object): + + """Executor which executs a graph of events. + + An event graph can consist of events defined + in any combination of parallel and sequence + events. Executor will execute them in the + order and manner specified. + Eg., E1 -> (E2, E3) + [E1 should execute after E2, E3 completes, + while E2 & E3 can happen in parallel] + E2 -> (E4, E5) + [E2 should execute after E4, E5 completes, + while E4 & E5 should happen in sequence] + E3 -> (None) + [No child events for E3] + + Executor will run the above graph and execute events + in the exact specific order mentioned. + At each level, parent event holds the result of child + events, caller can use parent event complete notification + to get the child events execution status. + """ + + def __init__(self, manager, graph): + self.manager = manager + self.graph = graph + + @set_node + def run(self, event=None, node=None): + LOG.debug("GraphExecutor - (event - %s)" % + (node.event)) + + # Call to check if event would get sequenced + if self.manager.schedule_graph_event( + node.event, self.graph, dispatch=False): + LOG.debug("GraphExecutor - " + "(event - %s) - sequenced" % + (node.event)) + # Event would have got added to sequencer, + # unlink it from pending links of graph + return self.graph.unlink_node(node) + + l_nodes = self.graph.get_pending_leaf_nodes(node) + LOG.debug("GraphExecutor - " + "(event - %s) - number of leaf nodes - %d" % + (node.event, len(l_nodes))) + + if not l_nodes: + if not self.graph.waiting_events(node): + LOG.debug("GraphExecutor - " + "(event - %s) - Scheduling event" % + (node.event)) + self.manager.schedule_graph_event(node.event, self.graph) + self.graph.unlink_node(node) + + if l_nodes: + for l_node in l_nodes: + LOG.debug("GraphExecutor -" + "(event - %s) executing leaf node" % + (node.event)) + self.run(node=l_node) + + @set_node + def event_complete(self, result, event=None, node=None): + LOG.debug("GraphExecutor - (event - %s) complete" % + (node.event)) + node.result = result + p_node = self.graph.remove_node(node) + if p_node: + LOG.debug("GraphExecutor - " + "(event - %s) complete, rerunning parent - %s" % + (node.event, p_node.event)) + self.run(node=p_node) diff --git a/gbpservice/nfp/core/launcher.py b/gbpservice/nfp/core/launcher.py new file mode 100644 index 000000000..b400feda3 --- /dev/null +++ b/gbpservice/nfp/core/launcher.py @@ -0,0 +1,92 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import time + +from oslo_service import service as oslo_service + +from gbpservice.nfp.core import log as nfp_logging + +LOG = nfp_logging.getLogger(__name__) +ProcessLauncher = oslo_service.ProcessLauncher + +"""Worker process launcher. + + Derives the oslo process launcher to + launch childrens with python multiprocessing + as oppose to os.fork(), coz, communication + is needed from parent->child not just the + parallel execution. +""" + + +class NfpLauncher(ProcessLauncher): + + def __init__(self, conf): + super(NfpLauncher, self).__init__(conf) + + def child(self, service, ppipe, cpipe, controller): + service.parent_pipe = ppipe + service.pipe = cpipe + service.controller = controller + self.launcher = self._child_process(service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal( + self.launcher) + if not oslo_service._is_sighup_and_daemon(signo): + self.launcher.wait() + break + self.launcher.restart() + + os._exit(status) + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. + if time.time() - wrap.forktimes[0] < wrap.workers: + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = self.fork_child(wrap) + + message = "Started Child Process %d" % (pid) + LOG.debug(message) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def fork_child(self, wrap): + # Default use os.fork to create a child + pid = os.fork() + if pid == 0: + self.launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal( + self.launcher) + if not oslo_service._is_sighup_and_daemon(signo): + self.launcher.wait() + break + self.launcher.restart() + + os._exit(status) + return pid diff --git a/gbpservice/nfp/core/log.py b/gbpservice/nfp/core/log.py new file mode 100644 index 000000000..77d01a37c --- /dev/null +++ b/gbpservice/nfp/core/log.py @@ -0,0 +1,116 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as oslo_logging + +import logging +import os +import sys +import threading + +if hasattr(sys, 'frozen'): # support for py2exe + _srcfile = "logging%s__init__%s" % (os.sep, __file__[-4:]) +elif __file__[-4:].lower() in ['.pyc', '.pyo']: + _srcfile = __file__[:-4] + '.py' +else: + _srcfile = __file__ +_srcfile = os.path.normcase(_srcfile) + + +def currentframe(): + """Return the frame object for the caller's stack frame.""" + try: + raise Exception + except Exception: + return sys.exc_info()[2].tb_frame.f_back + + +if hasattr(sys, '_getframe'): + currentframe = lambda: sys._getframe(3) + + +class WrappedLogger(logging.Logger): + + def __init__(self, name): + logging.Logger.__init__(self, name) + + def findCaller(self): + """ + Find the stack frame of the caller so that we can note the source + file name, line number and function name. + """ + f = currentframe() + # On some versions of IronPython, currentframe() returns None if + # IronPython isn't run with -X:Frames. + if f is not None: + f = f.f_back + if f.f_back: + f = f.f_back + rv = "(unknown file)", 0, "(unknown function)" + while hasattr(f, "f_code"): + co = f.f_code + filename = os.path.normcase(co.co_filename) + if filename == _srcfile: + f = f.f_back + continue + rv = (co.co_filename, f.f_lineno, co.co_name) + break + return rv + + def makeRecord(self, name, level, fn, + lno, msg, args, exc_info, func=None, extra=None): + context = getattr(logging_context_store, 'context', None) + if context: + _prefix = context.emit() + msg = "%s-%s" % (_prefix, msg) + return super(WrappedLogger, self).makeRecord( + name, level, fn, lno, msg, + args, exc_info, func=func, extra=extra) + + +logging.setLoggerClass(WrappedLogger) +logging_context_store = threading.local() + + +class NfpLogContext(object): + + def __init__(self, **kwargs): + self.meta_id = kwargs.get('meta_id', '') + self.auth_token = kwargs.get('auth_token', '') + + def emit(self): + return "[LogMetaID:%s]" % (self.meta_id) + + def to_dict(self): + return { + 'meta_id': self.meta_id, + 'auth_token': self.auth_token} + + +def getLogger(name): + return oslo_logging.getLogger(name) + + +def store_logging_context(**kwargs): + context = NfpLogContext(**kwargs) + logging_context_store.context = context + + +def clear_logging_context(**kwargs): + logging_context_store.context = None + + +def get_logging_context(): + context = getattr(logging_context_store, 'context', None) + if context: + return context.to_dict() + return {} diff --git a/gbpservice/nfp/core/manager.py b/gbpservice/nfp/core/manager.py new file mode 100644 index 000000000..737b7604d --- /dev/null +++ b/gbpservice/nfp/core/manager.py @@ -0,0 +1,444 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import os + +from gbpservice.nfp.core import event as nfp_event +from gbpservice.nfp.core import executor as nfp_executor +from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import sequencer as nfp_sequencer + +LOG = nfp_logging.getLogger(__name__) +NfpEventManager = nfp_event.NfpEventManager + +deque = collections.deque + + +def IS_SCHEDULED_EVENT_ACK(event): + return event.desc.type == nfp_event.SCHEDULE_EVENT and ( + event.desc.flag == nfp_event.EVENT_ACK + ) + + +def IS_SCHEDULED_NEW_EVENT(event): + return event.desc.type == nfp_event.SCHEDULE_EVENT and ( + event.desc.flag == nfp_event.EVENT_NEW + ) + + +def IS_SCHEDULED_EVENT_GRAPHEVENT(event): + return IS_SCHEDULED_NEW_EVENT(event) and (event.graph) + + +def IS_EVENT_GRAPH(event): + return event.desc.type == nfp_event.EVENT_GRAPH + + +def IS_EVENT_COMPLETE(event): + return event.desc.flag == nfp_event.EVENT_COMPLETE + + +"""Manages the forked childs. + + Invoked periodically, compares the alive childs with + snapshot and reports the difference to the caller. +""" + + +class NfpProcessManager(object): + + def __init__(self, conf, controller): + self._conf = conf + self._controller = controller + self._child_snapshot = [] + + def new_child(self, pid, pipe): + # Pass, as we will learn from comparision as watcher + pass + + def _dead(self, dead): + for proc in dead: + self._child_snapshot.remove(proc) + + def _new(self, new): + if new: + self._child_snapshot.extend(new) + + def child_watcher(self): + # Get the current set of childrens + current = self._controller.get_childrens() + set1 = set(current) + set2 = set(self._child_snapshot) + new = set1 - set2 + dead = set2 - set1 + + self._dead(dead) + self._new(new) + + return list(dead), list(new) + + +"""Manager for nfp resources. + + Manages all the nfp resources - process, events, polling queue etc. + Mixes the specific managers. +""" + + +class NfpResourceManager(NfpProcessManager, NfpEventManager): + + def __init__(self, conf, controller): + self._conf = conf + self._controller = controller + # Process, Event mixin, {'pid': event_manager} + self._resource_map = {} + # Cache of event objects - {'uuid':} + self._event_cache = {} + # Not processed. Events Stored for future. + self._stash = deque() + # ID of the distributor process + self._distributor_process_id = os.getpid() + # Single sequencer to be used by all event managers + self._event_sequencer = nfp_sequencer.EventSequencer() + + NfpProcessManager.__init__(self, conf, controller) + NfpEventManager.__init__(self, conf, controller, self._event_sequencer) + + def new_child(self, pid, pipe): + """Invoked when a new child is spawned. + + Associates an event manager with this child, maintains + the map. Manages the process. If this process is killed, + the event_manager is assigned to new process. + + :param process: Context of new process. + :param pipe: Pipe to communicate with this child. + """ + ev_manager = NfpEventManager( + self._conf, self._controller, + self._event_sequencer, + pipe=pipe, pid=pid) + self._resource_map.update(dict({pid: ev_manager})) + super(NfpResourceManager, self).new_child(pid, pipe) + + def manager_run(self): + """Invoked periodically to check on resources. + + a) Checks if childrens are active or any killed. + b) Checks if there are messages from any of workers. + c) Dispatches the events ready to be handled to workers. + """ + self._child_watcher() + self._event_watcher() + + def _event_acked(self, event): + """Post handling after event is dispatched to worker. """ + if event.lifetime: + message = "(event - %s) - dispatched, polling for expiry" % ( + event.identify()) + LOG.debug(message) + self._controller.poll_add( + event, event.lifetime, self._event_life_timedout) + + def _dispatch_event(self, event): + """Dispatch event to a worker. """ + load_info = self._load_init() + event_manager, load_info = self._get_min_loaded_em(load_info) + event_manager.dispatch_event(event) + + def _execute_event_graph(self, event, state=None): + graph = event.graph + g_executor = nfp_executor.EventGraphExecutor(self, graph) + g_executor.run(event=state) + + def _graph_event_complete(self, event): + if not event.graph: + return + + graph = event.graph + g_executor = nfp_executor.EventGraphExecutor(self, graph) + g_executor.event_complete(event.result, event=event.desc.uuid) + + def _scheduled_event_graph(self, event): + if event.graph: + # Cache the event object + self._event_cache[event.desc.uuid] = event + else: + # This case happens when a serialized event of + # a graph is desequenced and is processed. + self._execute_event_graph(event, state=event.desc.uuid) + + def _get_event_from_cache(self, uuid): + try: + return self._event_cache[uuid] + except KeyError as ke: + message = "(event - %s) - no event with uuid" % ( + uuid) + LOG.error(message) + raise ke + + def schedule_graph_event(self, uuid, graph, dispatch=True): + # Get event from cache + event = self._get_event_from_cache(uuid) + # Update the graph in event, which will be stored in cache + event.graph = graph + # Schedule the event + return self._scheduled_new_event(event, dispatch=dispatch) + + def _scheduled_new_event(self, event, dispatch=True): + # Cache the event object + self._event_cache[event.desc.uuid] = event + # Event needs to be sequenced ? + if not event.sequence: + if dispatch: + # Dispatch to a worker + self._dispatch_event(event) + else: + message = "(event - %s) - sequencing" % ( + event.identify()) + LOG.debug(message) + # Sequence the event which will be processed later + self._event_sequencer.sequence(event.binding_key, event) + + return event.sequence + + def _scheduled_event_ack(self, ack_event): + try: + event = self._event_cache[ack_event.desc.uuid] + evmanager = self._get_event_manager(event.desc.worker) + assert evmanager + # Pop from the pending list of evmanager + evmanager.pop_event(event) + # May be start polling for lifetime of event + self._event_acked(event) + except KeyError as kerr: + kerr = kerr + message = "(event - %s) - acked," + "missing from cache" % (event.identify()) + LOG.error(message) + except AssertionError as aerr: + aerr = aerr + message = "(event - %s) - acked," + "process handling is dead, event will be" + "replayed in new process" % (event.identify()) + LOG.error(message) + + def _scheduled_event_complete(self, event, expired=False): + # Pop it from cache + cached_event = None + try: + cached_event = self._event_cache.pop(event.desc.uuid) + cached_event.result = event.result + # Get the em managing the event + evmanager = self._get_event_manager(event.desc.worker) + assert evmanager + evmanager.pop_event(event) + # If event expired, send a cancelled event back to worker + if expired: + event.desc.type = nfp_event.EVENT_EXPIRED + evmanager.dispatch_event(event, inc_load=False, cache=False) + except KeyError as kerr: + kerr = kerr + message = "(event - %s) - completed, not in cache" % ( + event.identify()) + LOG.error(message) + except AssertionError as aerr: + aerr = aerr + # No event manager for the event, worker could have got + # killed, ignore. + message = "(event - %s) - assertion error" % ( + event.identify()) + LOG.error(message) + pass + finally: + # Release the sequencer for this sequence, + # so that next event can get scheduled. + self._event_sequencer.release(event.binding_key, event) + self._graph_event_complete(cached_event) + + def _non_schedule_event(self, event): + if event.desc.type == nfp_event.POLL_EVENT: + message = "(event - %s) - polling for event, spacing(%d)" % ( + event.identify(), event.desc.poll_desc.spacing) + LOG.debug(message) + # If the poll event is new -> create one in cache, + # In most of the cases, polling is done for an existing + # event. + ref_uuid = event.desc.poll_desc.ref + if ref_uuid not in self._event_cache.keys(): + # Assign random worker for this poll event + event.desc.worker = self._resource_map.keys()[0] + self._event_cache[ref_uuid] = event + + self._controller.poll_add( + event, + event.desc.poll_desc.spacing, + self._event_timedout) + else: + message = "(event - %s) - Unknown non scheduled event" % ( + event.identify()) + LOG.error(message) + + def process_events_by_ids(self, event_ids): + for event_id in event_ids: + try: + event = self._event_cache[event_id] + self.process_events([event]) + except KeyError as kerr: + kerr = kerr + message = "%s - event missing in cache" % ( + event_id) + LOG.error(message) + + def process_events(self, events): + """Process the consumed event. + + Based on the event type, new event will + be added to cache, completed event is + removed from cache, poll event is added + to pollq. + + """ + for event in events: + message = "%s - processing event" % (event.identify()) + LOG.debug(message) + + if IS_EVENT_GRAPH(event): + self._execute_event_graph(event) + elif IS_SCHEDULED_EVENT_GRAPHEVENT(event): + self._scheduled_event_graph(event) + elif IS_SCHEDULED_EVENT_ACK(event): + self._scheduled_event_ack(event) + elif IS_SCHEDULED_NEW_EVENT(event): + self._scheduled_new_event(event) + elif IS_EVENT_COMPLETE(event): + self._scheduled_event_complete(event) + else: + self._non_schedule_event(event) + + def _event_watcher(self): + """Watches for events for each event manager. + + Invokes each event manager to get events from workers. + Also checks parent process event manager. + """ + events = [] + # Get events from sequencer + events = self._event_sequencer.run() + for pid, event_manager in self._resource_map.iteritems(): + events += event_manager.event_watcher(timeout=0.01) + # Process the type of events received, dispatch only the + # required ones. + self.process_events(events) + + def _init_event_manager(self, from_em, to_em): + pending_event_ids = to_em.init_from_event_manager(from_em) + # Reprocess all the pending events, module handlers can + # continue processing of unacked events. + self.process_events_by_ids(pending_event_ids) + + def _replace_child(self, killed, new): + childrens = self._controller.get_childrens() + wrap = childrens[new] + pipe = wrap.child_pipe_map[new] + self.new_child(new, pipe) + new_em = self._resource_map[new] + killed_em = self._resource_map[killed] + new_em.init_from_event_manager(killed_em) + # Dispatch the pending events to the new worker through new em + self._replay_events(new_em) + + def _replay_events(self, event_manager): + pending_event_ids = event_manager.get_pending_events() + for event_id in pending_event_ids: + try: + message = "%s - replaying event" % (event_id) + LOG.info(message) + event_manager.dispatch_event( + self._event_cache[event_id], cache=False) + except KeyError as kerr: + kerr = kerr + message = "%s - eventid missing in cache" % ( + event_id) + LOG.error(message) + + def _child_watcher(self): + dead, new = super(NfpResourceManager, self).child_watcher() + if len(dead) and len(dead) != len(new): + message = "Killed process - %s, " + "New Process - %s, " + "does not match in count, few killed process" + "will not be replaced" % (str(dead), str(new)) + LOG.error(message) + + # Loop over dead workers and assign its + # event manager to one of the new worker + for killed_proc in dead: + new_proc = new.pop() + self._replace_child(killed_proc, new_proc) + del self._resource_map[killed_proc] + + def _load_init(self): + """Intializes load with current information. """ + load_info = [] + for pid, event_manager in self._resource_map.iteritems(): + load = event_manager.get_load() + load_info.append([event_manager, load, pid]) + + return load_info + + def _get_min_loaded_em(self, load_info): + """Returns the min loaded event_manager. """ + minloaded = min(load_info, key=lambda x: x[1]) + load = minloaded[1] + 1 + load_info[load_info.index(minloaded)][1] = load + return minloaded[0], load_info + + def _get_event_manager(self, pid): + """Returns event manager of a process. """ + if pid == self._distributor_process_id: + return self + else: + return self._resource_map.get(pid) + + def _event_life_timedout(self, event): + """Callback for poller when event expires. """ + message = "(event - %s) - expired" % (event.identify()) + LOG.debug(message) + self._scheduled_event_complete(event, expired=True) + + def _event_timedout(self, event): + """Callback for poller when event timesout. """ + message = "(event - %s) - timedout" % (event.identify()) + LOG.debug(message) + try: + ref_event = self._event_cache[event.desc.poll_desc.ref] + evmanager = self._get_event_manager(ref_event.desc.worker) + assert evmanager + evmanager.dispatch_event( + event, event_type=nfp_event.POLL_EVENT, + inc_load=False, cache=False) + except KeyError as err: + err = err + message = "(event - %s) - timedout, not in cache" % ( + event.identify()) + LOG.error(message) + except AssertionError as aerr: + aerr = aerr + # Process associated with event could be killed. + # Ignore. + pass + + def stash_event(self, event): + """Stash the given event. """ + self._stash.put(event) diff --git a/gbpservice/nfp/core/module.py b/gbpservice/nfp/core/module.py new file mode 100644 index 000000000..f95df469a --- /dev/null +++ b/gbpservice/nfp/core/module.py @@ -0,0 +1,110 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from abc import abstractmethod +import six + + +def poll_event_desc(*args, **kwargs): + """Poll Event Decorator. + + NFP modules can define the poll handlers using + this decorator. + """ + def decorator(f): + f._desc = True + f._spacing = kwargs.pop('spacing', 0) + f._event = kwargs.pop('event', None) + return f + + return decorator + +"""Meta class. """ + + +class _Meta(type): + + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(_Meta, cls).__init__(names, bases, dict_) + + try: + cls._poll_desc_table = dict(cls._poll_desc_table) + except AttributeError: + cls._poll_desc_table = {} + + for value in cls.__dict__.values(): + if getattr(value, '_desc', False): + desc = value + cls._poll_desc_table[desc._event] = desc + +"""Base class for nfp event handlers. + +Nfp modules derive and implement event handlers +of this class. +""" + + +@six.add_metaclass(_Meta) +class NfpEventHandler(object): + # __metaclass__ = ABCMeta + + def __init__(self): + super(NfpEventHandler, self).__init__() + + def get_poll_desc_table(self): + return self._poll_desc_table + + @abstractmethod + def handle_event(self, event): + """To handle an event. + + :param event: Object of 'Event' class. + + Returns: None + """ + pass + + @abstractmethod + def handle_poll_event(self, event): + """To handle a poll event. + + Core framework will inovke this method of event handler + when an event timesout. + + :param event: Object of 'Event' class. + + Returns: {'poll':True/False, 'event':} + 'poll': To repoll for the event. + 'event': Updated event, if not passed core will + repoll on the old event. + """ + pass + + @abstractmethod + def event_cancelled(self, event, reason): + """Notifies that an event is cancelled by core. + + Event could get cancelled, + a) Event expired. Module can set lifetime for + an event. If event is not complete with in + the time, it is auto expired by core. + b) Event max timedout. Module can set max number + of times to poll for an event. Event is cancelled + after the max times. + + :param event: Cancelled event. Object of 'Event' class. + :param reason: Reason for cancellation. String. + + Returns: None + """ + pass diff --git a/gbpservice/nfp/core/poll.py b/gbpservice/nfp/core/poll.py new file mode 100644 index 000000000..d6b350127 --- /dev/null +++ b/gbpservice/nfp/core/poll.py @@ -0,0 +1,81 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import heapq +import sched +import time as pytime + +from oslo_service import loopingcall as oslo_looping_call +from oslo_service import periodic_task as oslo_periodic_task + +Scheduler = sched.scheduler + +"""Handles the queue of poll events. + + Derives from python scheduler, since base scheduler does + a tight loop and does not leave the invoked context. + Derived here to return if no event timedout, invoked + periodically by caller to check for timedout events. +""" + + +class NfpPollHandler(Scheduler): + + def __init__(self, conf): + self._conf = conf + Scheduler.__init__(self, pytime.time, eventlet.greenthread.sleep) + + def run(self): + """Run to find timedout event. """ + q = self._queue + timefunc = self.timefunc + pop = heapq.heappop + if q: + time, priority, action, argument = checked_event = q[0] + now = timefunc() + if now < time: + return + else: + event = pop(q) + # Verify that the event was not removed or altered + # by another thread after we last looked at q[0]. + if event is checked_event: + action(*argument) + else: + heapq.heappush(q, event) + + def poll_add(self, event, timeout, method): + """Enter the event to be polled. """ + self.enter(timeout, 1, method, (event,)) + +"""Periodic task to poll for timer events. + + Periodically checks for expiry of events. +""" + + +class PollingTask(oslo_periodic_task.PeriodicTasks): + + def __init__(self, conf, controller): + super(PollingTask, self).__init__(conf) + + self._controller = controller + pulse = oslo_looping_call.FixedIntervalLoopingCall( + self.run_periodic_tasks, None, None) + pulse.start( + interval=1, initial_delay=None) + + @oslo_periodic_task.periodic_task(spacing=1) + def poll(self, context): + # invoke the common class to handle event timeouts + self._controller.poll() diff --git a/gbpservice/nfp/core/rpc.py b/gbpservice/nfp/core/rpc.py new file mode 100644 index 000000000..8ee72e9d8 --- /dev/null +++ b/gbpservice/nfp/core/rpc.py @@ -0,0 +1,118 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg as oslo_config + +from oslo_service import loopingcall as oslo_looping_call +from oslo_service import periodic_task as oslo_periodic_task + +from neutron.agent import rpc as n_agent_rpc +from neutron.common import rpc as n_rpc + +from neutron import context as n_context + +from gbpservice.nfp.core import log as nfp_logging + +LOG = nfp_logging.getLogger(__name__) + +n_rpc.init(oslo_config.CONF) + +"""Wrapper class for Neutron RpcAgent definition. + + NFP modules will use this class for the agent definition. + Associates the state reporting of agent to ease + the usage for modules. +""" + + +class RpcAgent(n_rpc.Service): + + def __init__( + self, sc, host=None, + topic=None, manager=None, report_state=None): + # report_state = + # {, 'plugin_topic': '', 'report_interval': ''} + super(RpcAgent, self).__init__(host=host, topic=topic, manager=manager) + + # Check if the agent needs to report state + if report_state: + self._report_state = ReportState(report_state) + + def start(self): + LOG.debug("RPCAgent listening on %s" % (self.identify)) + super(RpcAgent, self).start() + + def report_state(self): + if hasattr(self, '_report_state'): + LOG.debug("Agent (%s) reporting state" % + (self.identify())) + self._report_state.report() + + def identify(self): + return "(host=%s,topic=%s)" % (self.host, self.topic) + +"""This class implements the state reporting for neutron *aaS agents + + One common place of handling of reporting logic. + Each nfp module just need to register the reporting data and + plugin topic. +""" + + +class ReportState(object): + + def __init__(self, data): + self._n_context = n_context.get_admin_context_without_session() + self._data = data + self._topic = data.pop('plugin_topic', None) + self._interval = data.pop('report_interval', 0) + self._state_rpc = n_agent_rpc.PluginReportStateAPI( + self._topic) + + def report(self): + try: + LOG.debug("Reporting state with data (%s)" % + (self._data)) + self._state_rpc.report_state(self._n_context, self._data) + self._data.pop('start_flag', None) + except AttributeError: + # This means the server does not support report_state + message = "Neutron server does not support state report." + "Agent State reporting will be disabled" + LOG.info(message) + return + except Exception: + message = "Stopped reporting agent state!" + LOG.exception(message) + +"""Periodic task to report neutron *aaS agent state. + + Derived from oslo periodic task, to report the agents state + if any, to neutron *aaS plugin. +""" + + +class ReportStateTask(oslo_periodic_task.PeriodicTasks): + + def __init__(self, conf, controller): + super(ReportStateTask, self).__init__(conf) + self._controller = controller + # Start a looping at the defined pulse + pulse = oslo_looping_call.FixedIntervalLoopingCall( + self.run_periodic_tasks, None, None) + pulse.start( + interval=5, initial_delay=None) + + @oslo_periodic_task.periodic_task(spacing=10) + def report_state(self, context): + # trigger the state reporting + self._controller.report_state() diff --git a/gbpservice/nfp/core/sequencer.py b/gbpservice/nfp/core/sequencer.py new file mode 100644 index 000000000..702ea080e --- /dev/null +++ b/gbpservice/nfp/core/sequencer.py @@ -0,0 +1,122 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +from gbpservice.nfp.core import log as nfp_logging + +LOG = nfp_logging.getLogger(__name__) + +deque = collections.deque + + +class SequencerEmpty(Exception): + pass + + +class SequencerBusy(Exception): + pass + +"""Sequences the events. """ + + +class EventSequencer(object): + + class Sequencer(object): + + def __init__(self): + # Events not scheduled are queued + self._waitq = deque() + # Currently scheduled event + self._scheduled = None + + def _is_busy(self): + if self._scheduled: + raise SequencerBusy + + def _is_empty(self): + if not len(self._waitq): + raise SequencerEmpty + + def sequence(self, event): + self._waitq.append(event) + + def run(self): + """Run to get event to be scheduled. + + If sequencer is busy - i.e, an event is already + scheduled and in progress raises busy except. + If sequencer is empty - i.e, no event in sequencer + raises empty except. + """ + self._is_busy() + self._is_empty() + # Pop the first element in the queue - FIFO + self._scheduled = self._waitq.popleft() + return self._scheduled + + def is_scheduled(self, event): + if self._scheduled: + return self._scheduled.desc.uuid == event.desc.uuid and ( + self._scheduled.id == event.id) + return True + + def release(self): + self._scheduled = None + + def __init__(self): + # Sequence of related events + # {key: sequencer()} + self._sequencer = {} + + def sequence(self, key, event): + try: + self._sequencer[key].sequence(event) + except KeyError: + self._sequencer[key] = self.Sequencer() + self._sequencer[key].sequence(event) + message = "Sequenced event - %s" % (event.identify()) + LOG.error(message) + + def run(self): + events = [] + # Loop over copy and delete from original + sequencers = dict(self._sequencer) + for key, sequencer in sequencers.iteritems(): + try: + event = sequencer.run() + if event: + message = "Desequence event - %s" % ( + event.identify()) + LOG.error(message) + event.sequence = False + events.append(event) + except SequencerBusy as exc: + pass + except SequencerEmpty as exc: + exc = exc + message = "Sequencer empty" + LOG.debug(message) + del self._sequencer[key] + return events + + def release(self, key, event): + try: + message = "(event - %s) checking to release" % (event.identify()) + LOG.debug(message) + if self._sequencer[key].is_scheduled(event): + message = "(event - %s) Releasing sequencer" % ( + event.identify()) + LOG.debug(message) + self._sequencer[key].release() + except KeyError: + return diff --git a/gbpservice/nfp/core/threadpool.py b/gbpservice/nfp/core/threadpool.py new file mode 100644 index 000000000..8439ecb41 --- /dev/null +++ b/gbpservice/nfp/core/threadpool.py @@ -0,0 +1,98 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import os + +from eventlet import greenpool +from eventlet import greenthread + +from gbpservice.nfp.core import log as nfp_logging + +LOG = nfp_logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + kwargs['pool'].thread_done(kwargs['thread']) + + +"""Descriptor class for green thread """ + + +class Thread(object): + + def __init__(self, thread, pool): + self.thread = thread + self.thread.link(_thread_done, pool=pool, thread=self) + + def stop(self): + self.thread.kill() + + def wait(self): + return self.thread.wait() + + def link(self, func, *args, **kwargs): + self.thread.link(func, *args, **kwargs) + + def identify(self): + return "(%d -> %s)" % (os.getpid(), 'Thread') + +"""Abstract class to manage green threads """ + + +class ThreadPool(object): + + def __init__(self, thread_pool_size=10): + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + + def dispatch(self, callback, *args, **kwargs): + """Invokes the specified function in one of the thread """ + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(gt, self) + self.threads.append(th) + return th + + def thread_done(self, thread): + """Invoked when thread is complete, remove it from cache """ + self.threads.remove(thread) + + def stop(self): + """To stop the thread """ + current = greenthread.getcurrent() + + # Make a copy + for x in self.threads[:]: + if x is current: + # Skipping the current thread + continue + try: + x.stop() + except Exception as ex: + message = "Exception - %s" % (ex) + LOG.exception(message) + + def wait(self): + """Wait for the thread """ + current = greenthread.getcurrent() + + # Make a copy + for x in self.threads[:]: + if x is current: + continue + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + message = "Exception - %s" % (ex) + LOG.error(message) diff --git a/gbpservice/nfp/core/worker.py b/gbpservice/nfp/core/worker.py new file mode 100644 index 000000000..e2da1efca --- /dev/null +++ b/gbpservice/nfp/core/worker.py @@ -0,0 +1,168 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import time + +from oslo_service import service as oslo_service + +from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import event as nfp_event +from gbpservice.nfp.core import log as nfp_logging + +LOG = nfp_logging.getLogger(__name__) +Service = oslo_service.Service +identify = nfp_common.identify + +"""Implements worker process. + + Derives from oslo service. + Implements the worker functionality. + Waits for the events from distributor, handles them, + invokes the registered event handler in a thread. +""" + + +class NfpWorker(Service): + + def __init__(self, conf, threads=10): + # REVISIT(mak): Should #threads be a conf ? + Service.__init__(self, threads=threads) + # Parent end of duplex pipe + self.parent_pipe = None + # Pipe to recv/send messages to distributor + self.pipe = None + # Cache of event handlers + self.controller = None + self._conf = conf + self._threads = threads + + def start(self): + """Service start, runs here till dies. + + When a oslo service is launched, this method + is invoked. + Polls for messages from distributor and process + them. + """ + # Update the process type in controller. + self.controller.PROCESS_TYPE = "worker" + self.controller._pipe = self.pipe + self.event_handlers = self.controller.get_event_handlers() + while True: + try: + event = None + if self.pipe.poll(0.1): + event = self.controller.pipe_recv(self.pipe) + if event: + message = "%s - received event" % ( + self._log_meta(event)) + LOG.debug(message) + self.controller.decompress(event) + self._process_event(event) + except Exception as e: + message = "Exception - %s" % (e) + LOG.error(message) + # Yeild cpu + time.sleep(0) + + def _log_meta(self, event=None): + if event: + return "(event - %s) - (worker - %d)" % ( + event.identify(), os.getpid()) + else: + return "(worker - %d)" % (os.getpid()) + + def _send_event_ack(self, event): + # Create new event from existing one + ack_event = nfp_event.Event(id=event.id) + ack_event.id = event.id + desc = nfp_event.EventDesc(**event.desc.__dict__) + desc.uuid = event.desc.uuid + desc.flag = nfp_event.EVENT_ACK + setattr(ack_event, 'desc', desc) + self.controller.pipe_send(self.pipe, ack_event) + + def _process_event(self, event): + """Process & dispatch the event. + + Decodes the event type and performs the required + action. + Executes the registered event handler in one of the + thread. + """ + if event.desc.type == nfp_event.SCHEDULE_EVENT: + self._send_event_ack(event) + eh = self.event_handlers.get_event_handler(event.id) + self.dispatch(eh.handle_event, event) + elif event.desc.type == nfp_event.POLL_EVENT: + self.dispatch(self._handle_poll_event, event) + elif event.desc.type == nfp_event.EVENT_EXPIRED: + eh = self.event_handlers.get_event_handler(event.id) + self.dispatch(eh.event_cancelled, event, 'EXPIRED') + + def _build_poll_status(self, ret, event): + status = {'poll': True, 'event': event} + if ret: + status['poll'] = ret.get('poll', status['poll']) + status['event'] = ret.get('event', status['event']) + status['event'].desc = event.desc + + return status + + def _repoll(self, ret, event, eh): + status = self._build_poll_status(ret, event) + if status['poll']: + message = ("(event - %s) - repolling event -" + "pending times - %d") % ( + event.identify(), event.desc.poll_desc.max_times) + LOG.debug(message) + if event.desc.poll_desc.max_times: + self.controller.pipe_send(self.pipe, status['event']) + else: + message = ("(event - %s) - max timed out," + "calling event_cancelled") % (event.identify()) + LOG.debug(message) + eh.event_cancelled(event, 'MAX_TIMED_OUT') + + def _handle_poll_event(self, event): + ret = {} + event.desc.poll_desc.max_times -= 1 + poll_handler = self.event_handlers.get_poll_handler(event.id) + event_handler = self.event_handlers.get_event_handler(event.id) + try: + ret = poll_handler(event) + except TypeError: + ret = poll_handler(event_handler, event) + self._repoll(ret, event, event_handler) + + def log_dispatch(self, handler, event, *args): + try: + nfp_logging.store_logging_context(**(event.context)) + handler(event, *args) + nfp_logging.clear_logging_context() + except Exception as e: + message = "%r" % e + LOG.error(message) + handler(event, *args) + + def dispatch(self, handler, event, *args): + if self._threads: + th = self.tg.add_thread(self.log_dispatch, handler, event, *args) + message = "%s - (handler - %s) - dispatched to thread %d" % ( + self._log_meta(), identify(handler), th.ident) + LOG.debug(message) + else: + handler(event, *args) + message = "%s - (handler - %s) - invoked" % ( + self._log_meta(), identify(handler)) + LOG.debug(message) diff --git a/gbpservice/nfp/utils/__init__.py b/gbpservice/nfp/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gbpservice/nfp/utils/forked_pdb.py b/gbpservice/nfp/utils/forked_pdb.py new file mode 100644 index 000000000..d5068ba82 --- /dev/null +++ b/gbpservice/nfp/utils/forked_pdb.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pdb +import sys + +"""For debugging inside a child process. + + import pdb;pdb.set_trace() does not work + with python multiprocessing. + Instead use below pdb class to debug inside + a worker process / child process. +""" + + +class ForkedPdb(pdb.Pdb): + + """A Pdb subclass that may be used + from a forked multiprocessing child + + """ + + def interaction(self, *args, **kwargs): + _stdin = sys.stdin + try: + sys.stdin = file('/dev/stdin') + pdb.Pdb.interaction(self, *args, **kwargs) + finally: + sys.stdin = _stdin