Adding some basic service code from nova.

* Adding the proper taskmanager bin script
* Adding a taskmanager impl (needs to be a proper baseclass)
* Adding novas LoopingCall to utils
* Updating dummy rpc cast in the database service so it sends to the task manager
This commit is contained in:
Michael Basnight 2012-03-05 21:19:09 -06:00
parent f2d09827cd
commit 63befd034d
7 changed files with 294 additions and 29 deletions

View File

@ -16,15 +16,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import gettext
import optparse
import os
import sys
gettext.install('reddwarf', unicode=1)
# If ../reddwarf/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
@ -35,36 +36,20 @@ if os.path.exists(os.path.join(possible_topdir, 'reddwarf', '__init__.py')):
from reddwarf import version
from reddwarf.common import config
from reddwarf.common import wsgi
#from reddwarf.db import db_api
from reddwarf.common import service
def create_options(parser):
"""Sets up the CLI and config-file options
:param parser: The option parser
:returns: None
"""
parser.add_option('-p', '--port', dest="port", metavar="PORT",
type=int, default=8778,
help="Port the Reddwarf Work Manager listens on. "
"Default: %default")
if __name__ == '__main__':
parser = optparse.OptionParser(version="%%prog %s"
% version.version_string())
config.add_common_options(parser)
config.add_log_options(parser)
if __name__ == '__main__':
oparser = optparse.OptionParser(version="%%prog %s"
% version.version_string())
create_options(oparser)
(options, args) = config.parse_options(oparser)
(options, args) = config.parse_options(parser)
try:
conf, app = config.Config.load_paste_app('reddwarf-taskmanager', options, args)
#db_api.configure_db(conf)
server = wsgi.Server()
server.start(app, options.get('port', conf['bind_port']),
conf['bind_host'])
server.wait()
server = service.Service.create(binary='reddwarf-taskmanager')
service.serve(server)
service.wait()
except RuntimeError as error:
import traceback
print traceback.format_exc()

View File

@ -45,6 +45,9 @@ reddwarf_proxy_admin_pass = 3de4922d8b6ac5a1aad9
reddwarf_proxy_admin_tenant_name = admin
reddwarf_auth_url = http://0.0.0.0:5000/v2.0
# Manager impl for the taskmanager
taskmanager_manager=reddwarf.taskmanager.manager.TaskManager
# ============ notifer queue kombu connection options ========================
notifier_queue_hostname = localhost

180
reddwarf/common/service.py Normal file
View File

@ -0,0 +1,180 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""STOLEN FROM NOVA."""
import inspect
import os
import logging
import eventlet
import greenlet
from reddwarf.common import config
from reddwarf import rpc
from reddwarf.common import utils
from reddwarf import version
LOG = logging.getLogger(__name__)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
def __init__(self):
"""Initialize the service launcher."""
self._services = []
@staticmethod
def run_server(server):
"""Start and wait for a server to finish."""
server.start()
server.wait()
def launch_server(self, server):
"""Load and start the given server."""
gt = eventlet.spawn(self.run_server, server)
self._services.append(gt)
def stop(self):
"""Stop all services which are currently running."""
for service in self._services:
service.kill()
def wait(self):
"""Waits until all services have been stopped, and then returns."""
for service in self._services:
try:
service.wait()
except greenlet.GreenletExit:
LOG.error("greenthread exited")
pass
class Service(object):
"""Generic code to start services and get them listening on rpc"""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, *args, **kwargs):
self.host = host
self.binary = binary
self.topic = topic
self.manager_class_name = manager
manager_class = utils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
self.manager.periodic_tasks(raise_on_error=raise_on_error)
def report_state(self):
pass
def __getattr__(self, key):
"""This method proxy's the calls to the manager implementation"""
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
def start(self):
vcs_string = version.version_string_with_vcs()
LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'),
{'topic': self.topic, 'vcs_string': vcs_string})
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, self, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, self, fanout=False)
self.conn.create_consumer(self.topic, self, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
if self.periodic_interval:
periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval, now=False)
self.timers.append(periodic)
def wait(self):
for x in self.timers:
try:
x.wait()
except Exception:
pass
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None):
"""Instantiates class and passes back application object.
:param host: defaults to FLAGS.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'nova-' part
:param manager: defaults to FLAGS.<topic>_manager
:param report_interval: defaults to FLAGS.report_interval
:param periodic_interval: defaults to FLAGS.periodic_interval
"""
if not host:
host = config.Config.get('host')
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = binary.rpartition('reddwarf-')[2]
if not manager:
manager = config.Config.get('%s_manager' % topic, None)
if not report_interval:
report_interval = config.Config.get('report_interval', 10)
if not periodic_interval:
periodic_interval = config.Config.get('periodic_interval', 60)
service_obj = cls(host, binary, topic, manager,
report_interval, periodic_interval)
return service_obj
_launcher = None
def serve(*servers):
global _launcher
if not _launcher:
_launcher = Launcher()
for server in servers:
_launcher.launch_server(server)
def wait():
try:
_launcher.wait()
except KeyboardInterrupt:
_launcher.stop()
rpc.cleanup()

View File

@ -21,12 +21,18 @@ import inspect
import re
import uuid
from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
from eventlet.green import subprocess
from reddwarf.openstack.common import utils as openstack_utils
import_class = openstack_utils.import_class
import_object = openstack_utils.import_object
bool_from_string = openstack_utils.bool_from_string
execute = openstack_utils.execute
isotime = openstack_utils.isotime
def stringify_keys(dictionary):
@ -110,3 +116,62 @@ class MethodInspector(object):
required = ["{0}=<{0}>".format(arg) for arg in self.required_args]
args_str = ' '.join(required + optionals)
return "%s %s" % (self._func.__name__, args_str)
class LoopingCallDone(Exception):
"""Exception to break out and stop a LoopingCall.
The poll-function passed to LoopingCall can raise this exception to
break out of the loop normally. This is somewhat analogous to
StopIteration.
An optional return-value can be included as the argument to the exception;
this return-value will be returned by LoopingCall.wait()
"""
def __init__(self, retvalue=True):
""":param retvalue: Value that LoopingCall.wait() should return."""
self.retvalue = retvalue
class LoopingCall(object):
"""Nabbed from nova."""
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
def start(self, interval, now=True):
self._running = True
done = event.Event()
def _inner():
if not now:
greenthread.sleep(interval)
try:
while self._running:
self.f(*self.args, **self.kw)
if not self._running:
break
greenthread.sleep(interval)
except LoopingCallDone, e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done
def stop(self):
self._running = False
def wait(self):
return self.done.wait()

View File

@ -53,7 +53,7 @@ class InstanceController(BaseController):
"""Return all instances."""
servers = models.Instances(req.headers["X-Auth-Token"]).data()
#TODO(hub-cap): Remove this, this is only for testing communication between services
rpc.cast(context.ReddwarfContext(), "foo.ubuntu", {"method":"ZOMG", "BARRRR":"ARGGGGG"})
rpc.cast(context.ReddwarfContext(), "taskmanager.None", {"method":"test_method", "BARRRR":"ARGGGGG"})
return wsgi.Result(views.InstancesView(servers).data(), 201)

View File

@ -226,8 +226,6 @@ class Publisher(object):
def send(self, msg):
"""Send a message"""
LOG.info("send %s" % self.producer)
LOG.info("%s %s %s" % (self.exchange_name,self.routing_key,self.kwargs))
self.producer.publish(msg)

View File

@ -0,0 +1,34 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
LOG = logging.getLogger(__name__)
class TaskManager(object):
"""Task manager impl"""
def __init__(self, *args, **kwargs):
LOG.info("TaskManager init %s %s"% (args, kwargs))
def periodic_tasks(self, raise_on_error=False):
LOG.info("Launching a periodic task")
def test_method(self, context):
LOG.info("test_method called with context %s" % context)