diff --git a/requirements.txt b/requirements.txt index d15c19f..310b088 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,11 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 aniso8601==1.2.0 click==6.6 +eventlet>=0.18.2,!=0.18.3,!=0.20.1,<0.21.0 # MIT Flask!=0.11,<1.0,>=0.10 # BSD Flask-Cors==3.0.2 Flask-RESTful>=0.3.5 # BSD +futurist>=1.2.0 # Apache-2.0 itsdangerous==0.24 jsonschema<3.0.0,>=2.6.0 # MIT Jinja2!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,>=2.8 # BSD License (3 clause) diff --git a/valence/api/route.py b/valence/api/route.py index 6841099..2a2f4d3 100644 --- a/valence/api/route.py +++ b/valence/api/route.py @@ -19,7 +19,11 @@ import flask_cors import flask_restful from six.moves import http_client +# Note: setup app needs to be called before the valence imports +# for config options initialization to take place. from valence.api import app as flaskapp +app = flaskapp.get_app() + import valence.api.root as api_root import valence.api.v1.devices as v1_devices import valence.api.v1.flavors as v1_flavors diff --git a/valence/cmd/api.py b/valence/cmd/api.py index ce1d7b6..51d8985 100755 --- a/valence/cmd/api.py +++ b/valence/cmd/api.py @@ -15,13 +15,16 @@ import logging +import eventlet +eventlet.monkey_patch(os=False) import gunicorn.app.base from valence.api.route import app as application +from valence.common import async import valence.conf +from valence.controller import pooled_devices CONF = valence.conf.CONF - LOG = logging.getLogger(__name__) @@ -42,6 +45,24 @@ class StandaloneApplication(gunicorn.app.base.BaseApplication): return self.application +def start_periodic_tasks(server): + """Starts asynchronous periodic sync on app startup + + If enabled in configuration this function will start periodic sync + of pooled resources in background. + + """ + if CONF.podm.enable_periodic_sync: + async.start_periodic_worker([( + pooled_devices.PooledDevices.synchronize_devices, None, None)]) + return + + +def on_server_exit(server): + """Performs cleanup tasks. """ + async.stop_periodic_tasks() + + def main(): options = { 'bind': '%s:%s' % (CONF.api.bind_host, CONF.api.bind_port), @@ -50,6 +71,9 @@ def main(): 'workers': CONF.api.workers, 'loglevel': CONF.api.log_level, 'errorlog': CONF.api.log_file, + 'worker_class': 'eventlet', + 'when_ready': start_periodic_tasks, + 'on_exit': on_server_exit } LOG.info(("Valence Server on http://%(host)s:%(port)s"), {'host': CONF.api.bind_host, 'port': CONF.api.bind_port}) diff --git a/valence/common/async.py b/valence/common/async.py new file mode 100644 index 0000000..d2fd1de --- /dev/null +++ b/valence/common/async.py @@ -0,0 +1,113 @@ +# Copyright (c) 2017 NEC, Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import futurist + +from valence.common import exception + +LOG = logging.getLogger(__name__) + +_executor = None +_periodics_worker = None + + +def executor(): + global _executor + if not _executor: + _executor = futurist.GreenThreadPoolExecutor(max_workers=10) + return _executor + + +def start_periodic_worker(callables): + """Starts periodic execution of function passed in callables + + To enable this: + 1. Pass callables in following format + [(func, (arg1, arg2), {}), + (func2, (arg1, arg2), {}),] + 2. Decorate func as follow: + @periodics.periodic(spacing=2, enabled=True) + def func(): + pass + + :param callables: pass functions in this to execute periodically + :returns: Future object + """ + global _periodics_worker + _periodics_worker = futurist.periodics.PeriodicWorker( + callables=callables, + executor_factory=futurist.periodics.ExistingExecutor(executor())) + + task_worker = executor().submit(_periodics_worker.start) + task_worker.add_done_callback(_handle_exceptions) + + +def stop_periodic_tasks(): + """Stops all periodic tasks and cleanup resources. """ + + global _periodics_worker + if _periodics_worker is not None: + try: + _periodics_worker.stop() + _periodics_worker.wait() + except Exception: + LOG.exception("Exception occurred when stopping periodic workers") + _periodics_worker = None + + if _executor and _executor.alive: + _executor.shutdown(wait=True) + + LOG.debug("periodic tasks stopped successfully") + + +def _spawn_worker(func, *args, **kwargs): + + """Creates a greenthread to run func(*args, **kwargs). + + Spawns a greenthread if there are free slots in pool, otherwise raises + exception. Execution control returns immediately to the caller. + + :returns: Future object. + :raises: NoFreeWorker if worker pool is currently full. + """ + try: + return executor().submit(func, *args, **kwargs) + except futurist.RejectedSubmission: + raise exception.ValenceException("No free worker available") + + +def async(func): + """Start a job in new background thread. + + To start a async job, decorate the function as follows: + Example: + @async.async + def test(): + pass + """ + def wrapper(*args, **kwargs): + LOG.info("starting async thread for function %s", func.__name__) + future = _spawn_worker(func, *args, **kwargs) + future.add_done_callback(_handle_exceptions) + return wrapper + + +def _handle_exceptions(fut): + try: + fut.result() + except Exception: + msg = 'Unexpected exception in background thread' + LOG.exception(msg) diff --git a/valence/conf/podm.py b/valence/conf/podm.py index 785858e..ea05014 100644 --- a/valence/conf/podm.py +++ b/valence/conf/podm.py @@ -42,6 +42,14 @@ podm_opts = [ default='/redfish/v1/', help=_('The URL extension that specifies the ' 'Redfish API version that valence will interact with')), + cfg.BoolOpt('enable_periodic_sync', + default=False, + help=_('To enable periodic task to automatically sync' + 'resources of podmanager with DB.')), + cfg.IntOpt('sync_interval', + default=30, + help=_('Time interval(in seconds) after which devices will be' + 'synced periodically.')), ] diff --git a/valence/controller/podmanagers.py b/valence/controller/podmanagers.py index 7cdedf1..27a8a71 100644 --- a/valence/controller/podmanagers.py +++ b/valence/controller/podmanagers.py @@ -14,13 +14,16 @@ import logging +from valence.common import async from valence.common import exception from valence.common import utils +import valence.conf from valence.controller import nodes from valence.controller import pooled_devices from valence.db import api as db_api from valence.podmanagers import manager +CONF = valence.conf.CONF LOG = logging.getLogger(__name__) @@ -63,8 +66,7 @@ def create_podmanager(values): values['status'] = mng.podm.get_status() podm = db_api.Connection.create_podmanager(values).as_dict() # updates all devices corresponding to this podm in DB - # TODO(Akhil): Make this as asynchronous action - pooled_devices.PooledDevices.update_device_info(podm['uuid']) + update_podm_resources_to_db(podm['uuid']) return podm @@ -83,3 +85,17 @@ def delete_podmanager(uuid): nodes.Node(node['uuid']).delete_composed_node(node['uuid']) return db_api.Connection.delete_podmanager(uuid) + + +@async.async +def update_podm_resources_to_db(podm_id): + """Starts asynchronous one_time sync + + As set in configuration this function will sync pooled resources + one time if background periodic sync is disabled. + + :param podm_id: to asynchronously sync devices of particular podm + """ + if not CONF.podm.enable_periodic_sync: + pooled_devices.PooledDevices.synchronize_devices(podm_id) + return diff --git a/valence/controller/pooled_devices.py b/valence/controller/pooled_devices.py index 0fccd8e..22a168c 100644 --- a/valence/controller/pooled_devices.py +++ b/valence/controller/pooled_devices.py @@ -14,10 +14,14 @@ import logging +from futurist import periodics + from valence.common import exception +import valence.conf from valence.db import api as db_api from valence.podmanagers import manager +CONF = valence.conf.CONF LOG = logging.getLogger(__name__) @@ -50,6 +54,8 @@ class PooledDevices(object): return db_api.Connection.get_device_by_uuid(device_id).as_dict() @classmethod + @periodics.periodic(spacing=CONF.podm.sync_interval, enabled=True, + run_immediately=True) def synchronize_devices(cls, podm_id=None): """Sync devices connected to podmanager(s) diff --git a/valence/tests/unit/common/test_async.py b/valence/tests/unit/common/test_async.py new file mode 100644 index 0000000..bd77cf2 --- /dev/null +++ b/valence/tests/unit/common/test_async.py @@ -0,0 +1,51 @@ +# Copyright (c) 2018 NEC, Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import futurist +import mock + +from valence.common import async +from valence.common import exception + + +class AsyncTestCase(unittest.TestCase): + def setUp(self): + super(AsyncTestCase, self).setUp() + self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor) + self.periodics = mock.Mock(spec=futurist.periodics.PeriodicWorker) + async._executor = self.executor + async._periodics_worker = self.periodics + + def test__spawn_worker(self): + async._spawn_worker('fake', 1, foo='bar') + self.executor.submit.assert_called_once_with('fake', 1, foo='bar') + + def test__spawn_worker_none_free(self): + self.executor.submit.side_effect = futurist.RejectedSubmission() + self.assertRaises(exception.ValenceException, + async._spawn_worker, 'fake') + + def test_start_periodic_tasks(self): + fake_callable = mock.MagicMock() + async.start_periodic_worker([(fake_callable, None, None)]) + self.executor.submit.assert_called_once_with( + async._periodics_worker.start) + + def test_stop_periodic_tasks(self): + async.stop_periodic_tasks() + self.periodics.stop.assert_called() + self.periodics.wait.assert_called() + self.executor.shutdown.assert_called() diff --git a/valence/tests/unit/controller/test_podmanagers.py b/valence/tests/unit/controller/test_podmanagers.py index 3e962c1..5c2f7aa 100644 --- a/valence/tests/unit/controller/test_podmanagers.py +++ b/valence/tests/unit/controller/test_podmanagers.py @@ -17,6 +17,7 @@ import mock from valence.common.exception import BadRequest from valence.controller import podmanagers +from valence.podmanagers import podm_base class TestPodManagers(unittest.TestCase): @@ -104,3 +105,29 @@ class TestPodManagers(unittest.TestCase): podmanagers.update_podmanager('fake-podm-id', values) mock_db_update.assert_called_once_with('fake-podm-id', result_values) + + @mock.patch('valence.redfish.sushy.sushy_instance.RedfishInstance') + @mock.patch('valence.controller.podmanagers.update_podm_resources_to_db') + @mock.patch('valence.db.api.Connection.create_podmanager') + @mock.patch('valence.podmanagers.manager.Manager') + @mock.patch('valence.controller.podmanagers._check_creation') + def test_create_podmanager(self, mock_creation, mock_mng, mock_db_create, + mock_resource_update, mock_sushy): + values = {"name": "podm_name", "url": "https://10.240.212.123", + "driver": "redfishv1", "status": None, + "authentication": [{ + "type": "basic", + "auth_items": {"username": "xxxxxxx", + "password": "xxxxxxx"}}]} + mock_creation.return_value = values + mock_mng.podm.return_value = podm_base.PodManagerBase( + 'fake', 'fake-pass', 'http://fake-url') + podmanagers.create_podmanager('fake-values') + mock_db_create.assert_called_once_with(values) + mock_resource_update.assert_called() + + @mock.patch('valence.common.async._spawn_worker') + def test_update_podm_resources_to_db(self, mock_worker): + mock_worker.return_value = mock.MagicMock() + podmanagers.update_podm_resources_to_db('fake-podm-id') + mock_worker.assert_called()