From fbac3c07a2aaa97454319394435efeca2bc3b07d Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Sat, 28 Mar 2015 19:16:09 -0400 Subject: [PATCH] Add task management framework Some shade clients, like nodepool, have a need to manage threads that are doing API tasks and to manage rate limiting. That's not really a thing that's appropriate for shade to know about, but because shade does want to know the business logic, we need to provide a mechanism for people to do that. TaskManager is essentially nodepool.TaskManager except with threading features removed and reworked to do instantaneous blocking execution, since that's the behavior that most users will expect. A patch will follow to move API calls to be manged by Tasks and the shade.TaskManager. Once those are there, then nodepool can pass in its TaskManager and the API operations in shade will naturally be managed by the rate-limited operations in nodepool. Co-Authored-By: James E. Blair Change-Id: I60d25271de4009ee3f7f7684c72299fbd5d0f54f --- shade/__init__.py | 11 ++++++ shade/task_manager.py | 81 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 shade/task_manager.py diff --git a/shade/__init__.py b/shade/__init__.py index b80f09bab..7a9b937de 100644 --- a/shade/__init__.py +++ b/shade/__init__.py @@ -42,6 +42,7 @@ import warnings warnings.filterwarnings('ignore', 'Certificate has no `subjectAltName`') from shade import meta +from shade import task_manager __version__ = pbr.version.VersionInfo('shade').version_string() OBJECT_MD5_KEY = 'x-object-meta-x-shade-md5' @@ -192,6 +193,10 @@ class OpenStackCloud(object): (optional, defaults to dogpile.cache.null) :param dict cache_arguments: Additional arguments to pass to the cache constructor (optional, defaults to None) + :param TaskManager manager: Optional task manager to use for running + OpenStack API tasks. Unless you're doing + rate limiting client side, you almost + certainly don't need this. (optional) """ def __init__(self, cloud, auth, @@ -204,6 +209,7 @@ class OpenStackCloud(object): debug=False, cache_interval=None, cache_class='dogpile.cache.null', cache_arguments=None, + manager=None, **kwargs): self.name = cloud @@ -213,6 +219,11 @@ class OpenStackCloud(object): self.endpoint_type = endpoint_type self.private = private self.api_timeout = api_timeout + if manager is not None: + self.manager = manager + else: + self.manager = task_manager.TaskManager( + name=self.name, client=self) self.service_types = _get_service_values(kwargs, 'service_type') self.service_names = _get_service_values(kwargs, 'service_name') diff --git a/shade/task_manager.py b/shade/task_manager.py new file mode 100644 index 000000000..9e410ba3f --- /dev/null +++ b/shade/task_manager.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import sys +import logging +import time + +import six + + +@six.add_metaclass(abc.ABCMeta) +class Task(object): + def __init__(self, **kw): + self._exception = None + self._traceback = None + self._result = None + self.args = kw + + @abc.abstractmethod + def main(self, client): + """ Override this method with the actual workload to be performed """ + + def done(self, result): + self._result = result + + def exception(self, e, tb): + self._exception = e + self._traceback = tb + + def wait(self): + if self._exception: + six.reraise(self._exception, None, self._traceback) + return self._result + + def run(self, client): + try: + self.done(self.main(client)) + except Exception as e: + self.exception(e, sys.exc_info()[2]) + + +class TaskManager(object): + log = logging.getLogger("shade.TaskManager") + + def __init__(self, client, name): + self.name = name + self._client = client + + def stop(self): + """ This is a direct action passthrough TaskManager """ + pass + + def run(self): + """ This is a direct action passthrough TaskManager """ + pass + + def submitTask(self, task): + self.log.debug( + "Manager %s running task %s" % (self.name, type(task).__name__)) + start = time.time() + task.run(self._client) + end = time.time() + self.log.debug( + "Manager %s ran task %s in %ss" % (self.name, task, (end - start))) + return task.wait()