Reinstantiating the Task Manager aka Reddwarf Manager

This commit is contained in:
Nirmal Ranganathan 2012-05-30 15:11:34 -05:00
parent 50eebfc229
commit 85b5b6da1d
9 changed files with 153 additions and 103 deletions

View File

@ -24,8 +24,10 @@ import optparse
import os
import sys
gettext.install('reddwarf', unicode=1)
# If ../reddwarf/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
@ -38,6 +40,7 @@ from reddwarf import version
from reddwarf.common import config
from reddwarf.common import service
if __name__ == '__main__':
parser = optparse.OptionParser(version="%%prog %s"
% version.version_string())

View File

@ -5,12 +5,6 @@ verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = True
# Address to bind the API server
bind_host = 0.0.0.0
# Port the bind the API server to
bind_port = 8778
# AMQP Connection info
rabbit_password=f7999d1955c5014aa32c
@ -19,7 +13,6 @@ rabbit_password=f7999d1955c5014aa32c
# See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine
sql_connection = sqlite:///reddwarf_test.sqlite
# sql_connection = mysql://root:root@localhost/reddwarf
#sql_connection = postgresql://reddwarf:reddwarf@localhost/reddwarf
# Period in seconds after which SQLAlchemy should reestablish its connection
# to the database.
@ -33,8 +26,6 @@ sql_idle_timeout = 3600
#DB Api Implementation
db_api_implementation = "reddwarf.db.sqlalchemy.api"
# Path to the extensions
api_extensions_path = reddwarf/extensions
# Configuration options for talking to nova via the novaclient.
# These options are for an admin user in your keystone config.
@ -58,38 +49,6 @@ notifier_queue_port = 5672
notifier_queue_virtual_host = /
notifier_queue_transport = memory
[composite:reddwarf-taskmanager]
use = call:reddwarf.common.wsgi:versioned_urlmap
/: versions
/v0.1: reddwarf-taskmanagerapi
[app:versions]
paste.app_factory = reddwarf.versions:app_factory
[pipeline:reddwarf-taskmanagerapi]
pipeline = taskmanagerapp
#pipeline = debug extensions reddwarfapp
[filter:extensions]
paste.filter_factory = reddwarf.common.extensions:factory
[filter:tokenauth]
paste.filter_factory = keystone.middleware.auth_token:filter_factory
service_protocol = http
service_host = 127.0.0.1
service_port = 5000
auth_host = 127.0.0.1
auth_port = 35357
auth_protocol = http
auth_uri = http://127.0.0.1:5000/
admin_token = be19c524ddc92109a224
[filter:authorization]
paste.filter_factory = reddwarf.common.auth:AuthorizationMiddleware.factory
[app:taskmanagerapp]
[app:reddwarf-taskmanager]
paste.app_factory = reddwarf.taskmanager.service:app_factory
#Add this filter to log request and response for debugging
[filter:debug]
paste.filter_factory = reddwarf.common.wsgi:Debug

View File

@ -70,7 +70,7 @@ class ComputeInstanceNotFound(NotFound):
message = _("Resource %(instance_id)s can not be retrieved.")
class DnsRecordNotFound(NotFound):
message = _("DnsRecord with name= %(name)s not found.")
@ -118,3 +118,9 @@ class VolumeAttachmentsNotFound(NotFound):
class VolumeCreationFailure(ReddwarfError):
message = _("Failed to create a volume in Nova.")
class TaskManagerError(ReddwarfError):
message = _("An error occurred communicating with the task manager: "
"%(original_message)s.")

View File

@ -0,0 +1,48 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception related utilities.
"""
import contextlib
import logging
import sys
import traceback
@contextlib.contextmanager
def save_and_reraise_exception():
"""Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None
being attempted to be reraised after an exception handler is run. This
can happen when eventlet switches greenthreads or when running an
exception handler, code raises and catches an exception. In both
cases the exception context will be cleared.
To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is reraised.
"""
type_, value, tb = sys.exc_info()
try:
yield
except Exception:
logging.error('Original exception being dropped: %s' %
(traceback.format_exception(type_, value, tb)))
raise
raise type_, value, tb

View File

@ -17,9 +17,11 @@
"""STOLEN FROM NOVA."""
import functools
import inspect
import os
import logging
import socket
import eventlet
import greenlet
@ -71,6 +73,8 @@ class Service(object):
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, *args, **kwargs):
if not host:
host = socket.gethostname()
self.host = host
self.binary = binary
self.topic = topic
@ -93,7 +97,7 @@ class Service(object):
def __getattr__(self, key):
"""This method proxy's the calls to the manager implementation"""
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
return functools.partial(manager._wrapper, key)
def start(self):
vcs_string = version.version_string_with_vcs()
@ -164,9 +168,6 @@ class Service(object):
class Manager(object):
def __init__(self, host=None):
if not host:
#TODO(hub-cap): We need to fix this
host = "ubuntu"
self.host = host
super(Manager, self).__init__()
@ -182,6 +183,10 @@ class Manager(object):
"""
pass
def _wrapper(self, method, context, *args, **kwargs):
"""Wraps the called functions with additional information."""
func = getattr(self, method)
return func(context, *args, **kwargs)
_launcher = None

View File

@ -22,7 +22,7 @@ import webob.exc
from reddwarf.common import wsgi
from reddwarf import rpc
LOG = logging.getLogger('reddwarf.taskmanager.service')
LOG = logging.getLogger(__name__)
class Controller(wsgi.Controller):

View File

@ -0,0 +1,57 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Routes all the requests to the task manager.
"""
import logging
from reddwarf import rpc
from reddwarf.common import config
from reddwarf.common import exception
from reddwarf.common import utils
LOG = logging.getLogger(__name__)
class API(object):
"""API for interacting with the task manager."""
def __init__(self, context):
self.context = context
def _call(self, method_name, **kwargs):
try:
return rpc.call(self.context, self._get_routing_key(),
{"method": method_name, "args": kwargs})
except Exception as e:
LOG.error(e)
raise exception.TaskManagerError(original_message=str(e))
def _cast(self, method_name, **kwargs):
try:
rpc.cast(self.context, self._get_routing_key(),
{"method": method_name, "args": kwargs})
except Exception as e:
LOG.error(e)
raise exception.TaskManagerError(original_message=str(e))
def _get_routing_key(self):
"""Create the routing key for the taskmanager"""
return "taskmanager"

View File

@ -16,18 +16,37 @@
# under the License.
import logging
import weakref
from eventlet import greenthread
from reddwarf.common import excutils
from reddwarf.common import service
LOG = logging.getLogger(__name__)
class TaskManager(object):
class TaskManager(service.Manager):
"""Task manager impl"""
def __init__(self, *args, **kwargs):
self.tasks = weakref.WeakKeyDictionary()
super(TaskManager, self).__init__(*args, **kwargs)
LOG.info(_("TaskManager init %s %s") % (args, kwargs))
def periodic_tasks(self, raise_on_error=False):
LOG.info(_("Launching a periodic task"))
LOG.debug("No. of running tasks: %r" % len(self.tasks))
def test_method(self, context):
LOG.info(_("test_method called with context %s") % context)
def _wrapper(self, method, context, *args, **kwargs):
"""Maps the respective manager method with a task counter."""
# TODO(rnirmal): Just adding a basic counter. Will revist and
# re-implement when we have actual tasks.
self.tasks[greenthread.getcurrent()] = context
try:
func = getattr(self, method)
func(context, *args, **kwargs)
except Exception as e:
excutils.save_and_reraise_exception()
finally:
del self.tasks[greenthread.getcurrent()]

View File

@ -16,61 +16,14 @@
# under the License.
import logging
import routes
import webob.exc
from reddwarf.common import wsgi
from reddwarf import rpc
LOG = logging.getLogger('reddwarf.taskmanager.service')
class Controller(wsgi.Controller):
"""Base controller class."""
connected = False
#TODO(hub-cap):Make this not so nasty, this should not be here
def _create_connection(self, topic, host):
# Create a connection for rpc usage
if (self.connected):
return
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
topic)
# Share this same connection for these Consumers
self.conn.create_consumer(topic, self, fanout=False)
node_topic = '%s.%s' % (topic, host)
self.conn.create_consumer(node_topic, self, fanout=False)
self.conn.create_consumer(topic, self, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def index(self, req):
"""Gets a list of all tasks available"""
self._create_connection("foo", "ubuntu")
return "All Tasks -- Impl TBD"
def show(self, req, id):
"""Gets detailed information about an individual task"""
return "Single Task -- Impl TBD"
LOG = logging.getLogger(__name__)
class API(wsgi.Router):
"""API"""
def __init__(self):
mapper = routes.Mapper()
super(API, self).__init__(mapper)
self._instance_router(mapper)
def _instance_router(self, mapper):
resource = Controller().create_resource()
path = "/tasks"
mapper.resource("task", path, controller=resource)
class TaskService(object):
"""Task Manager interface."""
def app_factory(global_conf, **local_conf):
return API()
return TaskService()