From afcf213cb5e60e9c6a3d8ab5e0faf43fdb7c50c6 Mon Sep 17 00:00:00 2001 From: Anusha Ramineni Date: Mon, 31 Jul 2017 15:57:02 +0530 Subject: [PATCH] Add async executor This commit adds the framework to support async execution of functions. Will be used in syncing pooled resources while podmanager creation. Also, this adds periodic task support which is used to periodically sync pooled resources after particular interval. Partially-Implements blueprint add-device-orchestration Change-Id: I48f2358e4a7662898796c82d6a47aa6d947495e5 --- requirements.txt | 2 + valence/api/route.py | 4 + valence/cmd/api.py | 26 +++- valence/common/async.py | 113 ++++++++++++++++++ valence/conf/podm.py | 8 ++ valence/controller/podmanagers.py | 20 +++- valence/controller/pooled_devices.py | 6 + valence/tests/unit/common/test_async.py | 51 ++++++++ .../tests/unit/controller/test_podmanagers.py | 27 +++++ 9 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 valence/common/async.py create mode 100644 valence/tests/unit/common/test_async.py diff --git a/requirements.txt b/requirements.txt index d15c19f..310b088 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,11 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 aniso8601==1.2.0 click==6.6 +eventlet>=0.18.2,!=0.18.3,!=0.20.1,<0.21.0 # MIT Flask!=0.11,<1.0,>=0.10 # BSD Flask-Cors==3.0.2 Flask-RESTful>=0.3.5 # BSD +futurist>=1.2.0 # Apache-2.0 itsdangerous==0.24 jsonschema<3.0.0,>=2.6.0 # MIT Jinja2!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,>=2.8 # BSD License (3 clause) diff --git a/valence/api/route.py b/valence/api/route.py index 6841099..2a2f4d3 100644 --- a/valence/api/route.py +++ b/valence/api/route.py @@ -19,7 +19,11 @@ import flask_cors import flask_restful from six.moves import http_client +# Note: setup app needs to be called before the valence imports +# for config options initialization to take place. from valence.api import app as flaskapp +app = flaskapp.get_app() + import valence.api.root as api_root import valence.api.v1.devices as v1_devices import valence.api.v1.flavors as v1_flavors diff --git a/valence/cmd/api.py b/valence/cmd/api.py index ce1d7b6..51d8985 100755 --- a/valence/cmd/api.py +++ b/valence/cmd/api.py @@ -15,13 +15,16 @@ import logging +import eventlet +eventlet.monkey_patch(os=False) import gunicorn.app.base from valence.api.route import app as application +from valence.common import async import valence.conf +from valence.controller import pooled_devices CONF = valence.conf.CONF - LOG = logging.getLogger(__name__) @@ -42,6 +45,24 @@ class StandaloneApplication(gunicorn.app.base.BaseApplication): return self.application +def start_periodic_tasks(server): + """Starts asynchronous periodic sync on app startup + + If enabled in configuration this function will start periodic sync + of pooled resources in background. + + """ + if CONF.podm.enable_periodic_sync: + async.start_periodic_worker([( + pooled_devices.PooledDevices.synchronize_devices, None, None)]) + return + + +def on_server_exit(server): + """Performs cleanup tasks. """ + async.stop_periodic_tasks() + + def main(): options = { 'bind': '%s:%s' % (CONF.api.bind_host, CONF.api.bind_port), @@ -50,6 +71,9 @@ def main(): 'workers': CONF.api.workers, 'loglevel': CONF.api.log_level, 'errorlog': CONF.api.log_file, + 'worker_class': 'eventlet', + 'when_ready': start_periodic_tasks, + 'on_exit': on_server_exit } LOG.info(("Valence Server on http://%(host)s:%(port)s"), {'host': CONF.api.bind_host, 'port': CONF.api.bind_port}) diff --git a/valence/common/async.py b/valence/common/async.py new file mode 100644 index 0000000..d2fd1de --- /dev/null +++ b/valence/common/async.py @@ -0,0 +1,113 @@ +# Copyright (c) 2017 NEC, Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import futurist + +from valence.common import exception + +LOG = logging.getLogger(__name__) + +_executor = None +_periodics_worker = None + + +def executor(): + global _executor + if not _executor: + _executor = futurist.GreenThreadPoolExecutor(max_workers=10) + return _executor + + +def start_periodic_worker(callables): + """Starts periodic execution of function passed in callables + + To enable this: + 1. Pass callables in following format + [(func, (arg1, arg2), {}), + (func2, (arg1, arg2), {}),] + 2. Decorate func as follow: + @periodics.periodic(spacing=2, enabled=True) + def func(): + pass + + :param callables: pass functions in this to execute periodically + :returns: Future object + """ + global _periodics_worker + _periodics_worker = futurist.periodics.PeriodicWorker( + callables=callables, + executor_factory=futurist.periodics.ExistingExecutor(executor())) + + task_worker = executor().submit(_periodics_worker.start) + task_worker.add_done_callback(_handle_exceptions) + + +def stop_periodic_tasks(): + """Stops all periodic tasks and cleanup resources. """ + + global _periodics_worker + if _periodics_worker is not None: + try: + _periodics_worker.stop() + _periodics_worker.wait() + except Exception: + LOG.exception("Exception occurred when stopping periodic workers") + _periodics_worker = None + + if _executor and _executor.alive: + _executor.shutdown(wait=True) + + LOG.debug("periodic tasks stopped successfully") + + +def _spawn_worker(func, *args, **kwargs): + + """Creates a greenthread to run func(*args, **kwargs). + + Spawns a greenthread if there are free slots in pool, otherwise raises + exception. Execution control returns immediately to the caller. + + :returns: Future object. + :raises: NoFreeWorker if worker pool is currently full. + """ + try: + return executor().submit(func, *args, **kwargs) + except futurist.RejectedSubmission: + raise exception.ValenceException("No free worker available") + + +def async(func): + """Start a job in new background thread. + + To start a async job, decorate the function as follows: + Example: + @async.async + def test(): + pass + """ + def wrapper(*args, **kwargs): + LOG.info("starting async thread for function %s", func.__name__) + future = _spawn_worker(func, *args, **kwargs) + future.add_done_callback(_handle_exceptions) + return wrapper + + +def _handle_exceptions(fut): + try: + fut.result() + except Exception: + msg = 'Unexpected exception in background thread' + LOG.exception(msg) diff --git a/valence/conf/podm.py b/valence/conf/podm.py index 785858e..ea05014 100644 --- a/valence/conf/podm.py +++ b/valence/conf/podm.py @@ -42,6 +42,14 @@ podm_opts = [ default='/redfish/v1/', help=_('The URL extension that specifies the ' 'Redfish API version that valence will interact with')), + cfg.BoolOpt('enable_periodic_sync', + default=False, + help=_('To enable periodic task to automatically sync' + 'resources of podmanager with DB.')), + cfg.IntOpt('sync_interval', + default=30, + help=_('Time interval(in seconds) after which devices will be' + 'synced periodically.')), ] diff --git a/valence/controller/podmanagers.py b/valence/controller/podmanagers.py index 7cdedf1..27a8a71 100644 --- a/valence/controller/podmanagers.py +++ b/valence/controller/podmanagers.py @@ -14,13 +14,16 @@ import logging +from valence.common import async from valence.common import exception from valence.common import utils +import valence.conf from valence.controller import nodes from valence.controller import pooled_devices from valence.db import api as db_api from valence.podmanagers import manager +CONF = valence.conf.CONF LOG = logging.getLogger(__name__) @@ -63,8 +66,7 @@ def create_podmanager(values): values['status'] = mng.podm.get_status() podm = db_api.Connection.create_podmanager(values).as_dict() # updates all devices corresponding to this podm in DB - # TODO(Akhil): Make this as asynchronous action - pooled_devices.PooledDevices.update_device_info(podm['uuid']) + update_podm_resources_to_db(podm['uuid']) return podm @@ -83,3 +85,17 @@ def delete_podmanager(uuid): nodes.Node(node['uuid']).delete_composed_node(node['uuid']) return db_api.Connection.delete_podmanager(uuid) + + +@async.async +def update_podm_resources_to_db(podm_id): + """Starts asynchronous one_time sync + + As set in configuration this function will sync pooled resources + one time if background periodic sync is disabled. + + :param podm_id: to asynchronously sync devices of particular podm + """ + if not CONF.podm.enable_periodic_sync: + pooled_devices.PooledDevices.synchronize_devices(podm_id) + return diff --git a/valence/controller/pooled_devices.py b/valence/controller/pooled_devices.py index 0fccd8e..22a168c 100644 --- a/valence/controller/pooled_devices.py +++ b/valence/controller/pooled_devices.py @@ -14,10 +14,14 @@ import logging +from futurist import periodics + from valence.common import exception +import valence.conf from valence.db import api as db_api from valence.podmanagers import manager +CONF = valence.conf.CONF LOG = logging.getLogger(__name__) @@ -50,6 +54,8 @@ class PooledDevices(object): return db_api.Connection.get_device_by_uuid(device_id).as_dict() @classmethod + @periodics.periodic(spacing=CONF.podm.sync_interval, enabled=True, + run_immediately=True) def synchronize_devices(cls, podm_id=None): """Sync devices connected to podmanager(s) diff --git a/valence/tests/unit/common/test_async.py b/valence/tests/unit/common/test_async.py new file mode 100644 index 0000000..bd77cf2 --- /dev/null +++ b/valence/tests/unit/common/test_async.py @@ -0,0 +1,51 @@ +# Copyright (c) 2018 NEC, Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import futurist +import mock + +from valence.common import async +from valence.common import exception + + +class AsyncTestCase(unittest.TestCase): + def setUp(self): + super(AsyncTestCase, self).setUp() + self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor) + self.periodics = mock.Mock(spec=futurist.periodics.PeriodicWorker) + async._executor = self.executor + async._periodics_worker = self.periodics + + def test__spawn_worker(self): + async._spawn_worker('fake', 1, foo='bar') + self.executor.submit.assert_called_once_with('fake', 1, foo='bar') + + def test__spawn_worker_none_free(self): + self.executor.submit.side_effect = futurist.RejectedSubmission() + self.assertRaises(exception.ValenceException, + async._spawn_worker, 'fake') + + def test_start_periodic_tasks(self): + fake_callable = mock.MagicMock() + async.start_periodic_worker([(fake_callable, None, None)]) + self.executor.submit.assert_called_once_with( + async._periodics_worker.start) + + def test_stop_periodic_tasks(self): + async.stop_periodic_tasks() + self.periodics.stop.assert_called() + self.periodics.wait.assert_called() + self.executor.shutdown.assert_called() diff --git a/valence/tests/unit/controller/test_podmanagers.py b/valence/tests/unit/controller/test_podmanagers.py index 3e962c1..5c2f7aa 100644 --- a/valence/tests/unit/controller/test_podmanagers.py +++ b/valence/tests/unit/controller/test_podmanagers.py @@ -17,6 +17,7 @@ import mock from valence.common.exception import BadRequest from valence.controller import podmanagers +from valence.podmanagers import podm_base class TestPodManagers(unittest.TestCase): @@ -104,3 +105,29 @@ class TestPodManagers(unittest.TestCase): podmanagers.update_podmanager('fake-podm-id', values) mock_db_update.assert_called_once_with('fake-podm-id', result_values) + + @mock.patch('valence.redfish.sushy.sushy_instance.RedfishInstance') + @mock.patch('valence.controller.podmanagers.update_podm_resources_to_db') + @mock.patch('valence.db.api.Connection.create_podmanager') + @mock.patch('valence.podmanagers.manager.Manager') + @mock.patch('valence.controller.podmanagers._check_creation') + def test_create_podmanager(self, mock_creation, mock_mng, mock_db_create, + mock_resource_update, mock_sushy): + values = {"name": "podm_name", "url": "https://10.240.212.123", + "driver": "redfishv1", "status": None, + "authentication": [{ + "type": "basic", + "auth_items": {"username": "xxxxxxx", + "password": "xxxxxxx"}}]} + mock_creation.return_value = values + mock_mng.podm.return_value = podm_base.PodManagerBase( + 'fake', 'fake-pass', 'http://fake-url') + podmanagers.create_podmanager('fake-values') + mock_db_create.assert_called_once_with(values) + mock_resource_update.assert_called() + + @mock.patch('valence.common.async._spawn_worker') + def test_update_podm_resources_to_db(self, mock_worker): + mock_worker.return_value = mock.MagicMock() + podmanagers.update_podm_resources_to_db('fake-podm-id') + mock_worker.assert_called()