From 85b5b6da1d977c51cfe18b85d44a188680a2ccfb Mon Sep 17 00:00:00 2001 From: Nirmal Ranganathan Date: Wed, 30 May 2012 15:11:34 -0500 Subject: [PATCH] Reinstantiating the Task Manager aka Reddwarf Manager --- bin/reddwarf-taskmanager | 3 + etc/reddwarf/reddwarf-taskmanager.conf.sample | 43 +------------- reddwarf/common/exception.py | 8 ++- reddwarf/common/excutils.py | 48 ++++++++++++++++ reddwarf/common/service.py | 13 +++-- reddwarf/guestagent/service.py | 2 +- reddwarf/taskmanager/api.py | 57 +++++++++++++++++++ reddwarf/taskmanager/manager.py | 27 +++++++-- reddwarf/taskmanager/service.py | 55 ++---------------- 9 files changed, 153 insertions(+), 103 deletions(-) create mode 100644 reddwarf/common/excutils.py create mode 100644 reddwarf/taskmanager/api.py diff --git a/bin/reddwarf-taskmanager b/bin/reddwarf-taskmanager index 5eb5dda035..fa69d636ee 100755 --- a/bin/reddwarf-taskmanager +++ b/bin/reddwarf-taskmanager @@ -24,8 +24,10 @@ import optparse import os import sys + gettext.install('reddwarf', unicode=1) + # If ../reddwarf/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), @@ -38,6 +40,7 @@ from reddwarf import version from reddwarf.common import config from reddwarf.common import service + if __name__ == '__main__': parser = optparse.OptionParser(version="%%prog %s" % version.version_string()) diff --git a/etc/reddwarf/reddwarf-taskmanager.conf.sample b/etc/reddwarf/reddwarf-taskmanager.conf.sample index 0cc6442959..5c6228744d 100644 --- a/etc/reddwarf/reddwarf-taskmanager.conf.sample +++ b/etc/reddwarf/reddwarf-taskmanager.conf.sample @@ -5,12 +5,6 @@ verbose = True # Show debugging output in logs (sets DEBUG log level output) debug = True -# Address to bind the API server -bind_host = 0.0.0.0 - -# Port the bind the API server to -bind_port = 8778 - # AMQP Connection info rabbit_password=f7999d1955c5014aa32c @@ -19,7 +13,6 @@ rabbit_password=f7999d1955c5014aa32c # See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine sql_connection = sqlite:///reddwarf_test.sqlite # sql_connection = mysql://root:root@localhost/reddwarf -#sql_connection = postgresql://reddwarf:reddwarf@localhost/reddwarf # Period in seconds after which SQLAlchemy should reestablish its connection # to the database. @@ -33,8 +26,6 @@ sql_idle_timeout = 3600 #DB Api Implementation db_api_implementation = "reddwarf.db.sqlalchemy.api" -# Path to the extensions -api_extensions_path = reddwarf/extensions # Configuration options for talking to nova via the novaclient. # These options are for an admin user in your keystone config. @@ -58,38 +49,6 @@ notifier_queue_port = 5672 notifier_queue_virtual_host = / notifier_queue_transport = memory -[composite:reddwarf-taskmanager] -use = call:reddwarf.common.wsgi:versioned_urlmap -/: versions -/v0.1: reddwarf-taskmanagerapi -[app:versions] -paste.app_factory = reddwarf.versions:app_factory - -[pipeline:reddwarf-taskmanagerapi] -pipeline = taskmanagerapp -#pipeline = debug extensions reddwarfapp - -[filter:extensions] -paste.filter_factory = reddwarf.common.extensions:factory - -[filter:tokenauth] -paste.filter_factory = keystone.middleware.auth_token:filter_factory -service_protocol = http -service_host = 127.0.0.1 -service_port = 5000 -auth_host = 127.0.0.1 -auth_port = 35357 -auth_protocol = http -auth_uri = http://127.0.0.1:5000/ -admin_token = be19c524ddc92109a224 - -[filter:authorization] -paste.filter_factory = reddwarf.common.auth:AuthorizationMiddleware.factory - -[app:taskmanagerapp] +[app:reddwarf-taskmanager] paste.app_factory = reddwarf.taskmanager.service:app_factory - -#Add this filter to log request and response for debugging -[filter:debug] -paste.filter_factory = reddwarf.common.wsgi:Debug diff --git a/reddwarf/common/exception.py b/reddwarf/common/exception.py index edd8733744..fd94e90f56 100644 --- a/reddwarf/common/exception.py +++ b/reddwarf/common/exception.py @@ -70,7 +70,7 @@ class ComputeInstanceNotFound(NotFound): message = _("Resource %(instance_id)s can not be retrieved.") - + class DnsRecordNotFound(NotFound): message = _("DnsRecord with name= %(name)s not found.") @@ -118,3 +118,9 @@ class VolumeAttachmentsNotFound(NotFound): class VolumeCreationFailure(ReddwarfError): message = _("Failed to create a volume in Nova.") + + +class TaskManagerError(ReddwarfError): + + message = _("An error occurred communicating with the task manager: " + "%(original_message)s.") diff --git a/reddwarf/common/excutils.py b/reddwarf/common/excutils.py new file mode 100644 index 0000000000..6003ab0663 --- /dev/null +++ b/reddwarf/common/excutils.py @@ -0,0 +1,48 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Exception related utilities. +""" + +import contextlib +import logging +import sys +import traceback + + +@contextlib.contextmanager +def save_and_reraise_exception(): + """Save current exception, run some code and then re-raise. + + In some cases the exception context can be cleared, resulting in None + being attempted to be reraised after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, run handler code, and + then re-raise the original exception. If another exception occurs, the + saved exception is logged and the new exception is reraised. + """ + type_, value, tb = sys.exc_info() + try: + yield + except Exception: + logging.error('Original exception being dropped: %s' % + (traceback.format_exception(type_, value, tb))) + raise + raise type_, value, tb diff --git a/reddwarf/common/service.py b/reddwarf/common/service.py index b98e86662b..64920e8c4d 100644 --- a/reddwarf/common/service.py +++ b/reddwarf/common/service.py @@ -17,9 +17,11 @@ """STOLEN FROM NOVA.""" +import functools import inspect import os import logging +import socket import eventlet import greenlet @@ -71,6 +73,8 @@ class Service(object): def __init__(self, host, binary, topic, manager, report_interval=None, periodic_interval=None, *args, **kwargs): + if not host: + host = socket.gethostname() self.host = host self.binary = binary self.topic = topic @@ -93,7 +97,7 @@ class Service(object): def __getattr__(self, key): """This method proxy's the calls to the manager implementation""" manager = self.__dict__.get('manager', None) - return getattr(manager, key) + return functools.partial(manager._wrapper, key) def start(self): vcs_string = version.version_string_with_vcs() @@ -164,9 +168,6 @@ class Service(object): class Manager(object): def __init__(self, host=None): - if not host: - #TODO(hub-cap): We need to fix this - host = "ubuntu" self.host = host super(Manager, self).__init__() @@ -182,6 +183,10 @@ class Manager(object): """ pass + def _wrapper(self, method, context, *args, **kwargs): + """Wraps the called functions with additional information.""" + func = getattr(self, method) + return func(context, *args, **kwargs) _launcher = None diff --git a/reddwarf/guestagent/service.py b/reddwarf/guestagent/service.py index 93583fc39d..9355feb174 100644 --- a/reddwarf/guestagent/service.py +++ b/reddwarf/guestagent/service.py @@ -22,7 +22,7 @@ import webob.exc from reddwarf.common import wsgi from reddwarf import rpc -LOG = logging.getLogger('reddwarf.taskmanager.service') +LOG = logging.getLogger(__name__) class Controller(wsgi.Controller): diff --git a/reddwarf/taskmanager/api.py b/reddwarf/taskmanager/api.py new file mode 100644 index 0000000000..4704f93ffa --- /dev/null +++ b/reddwarf/taskmanager/api.py @@ -0,0 +1,57 @@ +# Copyright 2012 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +""" +Routes all the requests to the task manager. +""" + + +import logging + +from reddwarf import rpc +from reddwarf.common import config +from reddwarf.common import exception +from reddwarf.common import utils + + +LOG = logging.getLogger(__name__) + + +class API(object): + """API for interacting with the task manager.""" + + def __init__(self, context): + self.context = context + + def _call(self, method_name, **kwargs): + try: + return rpc.call(self.context, self._get_routing_key(), + {"method": method_name, "args": kwargs}) + except Exception as e: + LOG.error(e) + raise exception.TaskManagerError(original_message=str(e)) + + def _cast(self, method_name, **kwargs): + try: + rpc.cast(self.context, self._get_routing_key(), + {"method": method_name, "args": kwargs}) + except Exception as e: + LOG.error(e) + raise exception.TaskManagerError(original_message=str(e)) + + def _get_routing_key(self): + """Create the routing key for the taskmanager""" + return "taskmanager" diff --git a/reddwarf/taskmanager/manager.py b/reddwarf/taskmanager/manager.py index cb4ace2a70..7560450ea5 100644 --- a/reddwarf/taskmanager/manager.py +++ b/reddwarf/taskmanager/manager.py @@ -16,18 +16,37 @@ # under the License. import logging +import weakref + +from eventlet import greenthread + +from reddwarf.common import excutils +from reddwarf.common import service + LOG = logging.getLogger(__name__) -class TaskManager(object): +class TaskManager(service.Manager): """Task manager impl""" def __init__(self, *args, **kwargs): + self.tasks = weakref.WeakKeyDictionary() + super(TaskManager, self).__init__(*args, **kwargs) LOG.info(_("TaskManager init %s %s") % (args, kwargs)) def periodic_tasks(self, raise_on_error=False): - LOG.info(_("Launching a periodic task")) + LOG.debug("No. of running tasks: %r" % len(self.tasks)) - def test_method(self, context): - LOG.info(_("test_method called with context %s") % context) + def _wrapper(self, method, context, *args, **kwargs): + """Maps the respective manager method with a task counter.""" + # TODO(rnirmal): Just adding a basic counter. Will revist and + # re-implement when we have actual tasks. + self.tasks[greenthread.getcurrent()] = context + try: + func = getattr(self, method) + func(context, *args, **kwargs) + except Exception as e: + excutils.save_and_reraise_exception() + finally: + del self.tasks[greenthread.getcurrent()] diff --git a/reddwarf/taskmanager/service.py b/reddwarf/taskmanager/service.py index 3b021ce13a..50a8bd209a 100644 --- a/reddwarf/taskmanager/service.py +++ b/reddwarf/taskmanager/service.py @@ -16,61 +16,14 @@ # under the License. import logging -import routes -import webob.exc - -from reddwarf.common import wsgi -from reddwarf import rpc - -LOG = logging.getLogger('reddwarf.taskmanager.service') -class Controller(wsgi.Controller): - """Base controller class.""" - connected = False - - #TODO(hub-cap):Make this not so nasty, this should not be here - def _create_connection(self, topic, host): - # Create a connection for rpc usage - if (self.connected): - return - self.conn = rpc.create_connection(new=True) - LOG.debug(_("Creating Consumer connection for Service %s") % - topic) - - # Share this same connection for these Consumers - self.conn.create_consumer(topic, self, fanout=False) - - node_topic = '%s.%s' % (topic, host) - self.conn.create_consumer(node_topic, self, fanout=False) - - self.conn.create_consumer(topic, self, fanout=True) - - # Consume from all consumers in a thread - self.conn.consume_in_thread() - - def index(self, req): - """Gets a list of all tasks available""" - self._create_connection("foo", "ubuntu") - return "All Tasks -- Impl TBD" - - def show(self, req, id): - """Gets detailed information about an individual task""" - return "Single Task -- Impl TBD" +LOG = logging.getLogger(__name__) -class API(wsgi.Router): - """API""" - def __init__(self): - mapper = routes.Mapper() - super(API, self).__init__(mapper) - self._instance_router(mapper) - - def _instance_router(self, mapper): - resource = Controller().create_resource() - path = "/tasks" - mapper.resource("task", path, controller=resource) +class TaskService(object): + """Task Manager interface.""" def app_factory(global_conf, **local_conf): - return API() + return TaskService()