From 63befd034db15daed4df2141bd0a5e5791eafb53 Mon Sep 17 00:00:00 2001 From: Michael Basnight Date: Mon, 5 Mar 2012 21:19:09 -0600 Subject: [PATCH] Adding some basic service code from nova. * Adding the proper taskmanager bin script * Adding a taskmanager impl (needs to be a proper baseclass) * Adding novas LoopingCall to utils * Updating dummy rpc cast in the database service so it sends to the task manager --- bin/reddwarf-taskmanager | 37 ++-- etc/reddwarf/reddwarf-taskmanager.conf.sample | 3 + reddwarf/common/service.py | 180 ++++++++++++++++++ reddwarf/common/utils.py | 65 +++++++ reddwarf/database/service.py | 2 +- reddwarf/rpc/impl_kombu.py | 2 - reddwarf/taskmanager/manager.py | 34 ++++ 7 files changed, 294 insertions(+), 29 deletions(-) create mode 100644 reddwarf/common/service.py create mode 100644 reddwarf/taskmanager/manager.py diff --git a/bin/reddwarf-taskmanager b/bin/reddwarf-taskmanager index 2699f1465a..645ea8259d 100755 --- a/bin/reddwarf-taskmanager +++ b/bin/reddwarf-taskmanager @@ -16,15 +16,16 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet +eventlet.monkey_patch() + import gettext import optparse import os import sys - gettext.install('reddwarf', unicode=1) - # If ../reddwarf/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), @@ -35,36 +36,20 @@ if os.path.exists(os.path.join(possible_topdir, 'reddwarf', '__init__.py')): from reddwarf import version from reddwarf.common import config -from reddwarf.common import wsgi -#from reddwarf.db import db_api +from reddwarf.common import service -def create_options(parser): - """Sets up the CLI and config-file options - - :param parser: The option parser - :returns: None - - """ - parser.add_option('-p', '--port', dest="port", metavar="PORT", - type=int, default=8778, - help="Port the Reddwarf Work Manager listens on. " - "Default: %default") +if __name__ == '__main__': + parser = optparse.OptionParser(version="%%prog %s" + % version.version_string()) config.add_common_options(parser) config.add_log_options(parser) - -if __name__ == '__main__': - oparser = optparse.OptionParser(version="%%prog %s" - % version.version_string()) - create_options(oparser) - (options, args) = config.parse_options(oparser) + (options, args) = config.parse_options(parser) try: conf, app = config.Config.load_paste_app('reddwarf-taskmanager', options, args) - #db_api.configure_db(conf) - server = wsgi.Server() - server.start(app, options.get('port', conf['bind_port']), - conf['bind_host']) - server.wait() + server = service.Service.create(binary='reddwarf-taskmanager') + service.serve(server) + service.wait() except RuntimeError as error: import traceback print traceback.format_exc() diff --git a/etc/reddwarf/reddwarf-taskmanager.conf.sample b/etc/reddwarf/reddwarf-taskmanager.conf.sample index 1566521d68..0cc6442959 100644 --- a/etc/reddwarf/reddwarf-taskmanager.conf.sample +++ b/etc/reddwarf/reddwarf-taskmanager.conf.sample @@ -45,6 +45,9 @@ reddwarf_proxy_admin_pass = 3de4922d8b6ac5a1aad9 reddwarf_proxy_admin_tenant_name = admin reddwarf_auth_url = http://0.0.0.0:5000/v2.0 +# Manager impl for the taskmanager +taskmanager_manager=reddwarf.taskmanager.manager.TaskManager + # ============ notifer queue kombu connection options ======================== notifier_queue_hostname = localhost diff --git a/reddwarf/common/service.py b/reddwarf/common/service.py new file mode 100644 index 0000000000..9c4d6061d0 --- /dev/null +++ b/reddwarf/common/service.py @@ -0,0 +1,180 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""STOLEN FROM NOVA.""" + +import inspect +import os +import logging + +import eventlet +import greenlet + +from reddwarf.common import config +from reddwarf import rpc +from reddwarf.common import utils +from reddwarf import version + +LOG = logging.getLogger(__name__) + +class Launcher(object): + """Launch one or more services and wait for them to complete.""" + + def __init__(self): + """Initialize the service launcher.""" + self._services = [] + + @staticmethod + def run_server(server): + """Start and wait for a server to finish.""" + + server.start() + server.wait() + + def launch_server(self, server): + """Load and start the given server.""" + gt = eventlet.spawn(self.run_server, server) + self._services.append(gt) + + def stop(self): + """Stop all services which are currently running.""" + for service in self._services: + service.kill() + + def wait(self): + """Waits until all services have been stopped, and then returns.""" + for service in self._services: + try: + service.wait() + except greenlet.GreenletExit: + LOG.error("greenthread exited") + pass + + + +class Service(object): + """Generic code to start services and get them listening on rpc""" + + def __init__(self, host, binary, topic, manager, report_interval=None, + periodic_interval=None, *args, **kwargs): + self.host = host + self.binary = binary + self.topic = topic + self.manager_class_name = manager + manager_class = utils.import_class(self.manager_class_name) + self.manager = manager_class(host=self.host, *args, **kwargs) + self.report_interval = report_interval + self.periodic_interval = periodic_interval + super(Service, self).__init__(*args, **kwargs) + self.saved_args, self.saved_kwargs = args, kwargs + self.timers = [] + + def periodic_tasks(self, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + self.manager.periodic_tasks(raise_on_error=raise_on_error) + + def report_state(self): + pass + + def __getattr__(self, key): + """This method proxy's the calls to the manager implementation""" + manager = self.__dict__.get('manager', None) + return getattr(manager, key) + + + def start(self): + vcs_string = version.version_string_with_vcs() + LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'), + {'topic': self.topic, 'vcs_string': vcs_string}) + + self.conn = rpc.create_connection(new=True) + LOG.debug(_("Creating Consumer connection for Service %s") % + self.topic) + + # Share this same connection for these Consumers + self.conn.create_consumer(self.topic, self, fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + self.conn.create_consumer(node_topic, self, fanout=False) + + self.conn.create_consumer(self.topic, self, fanout=True) + + # Consume from all consumers in a thread + self.conn.consume_in_thread() + if self.report_interval: + pulse = utils.LoopingCall(self.report_state) + pulse.start(interval=self.report_interval, now=False) + self.timers.append(pulse) + + if self.periodic_interval: + periodic = utils.LoopingCall(self.periodic_tasks) + periodic.start(interval=self.periodic_interval, now=False) + self.timers.append(periodic) + + def wait(self): + for x in self.timers: + try: + x.wait() + except Exception: + pass + + @classmethod + def create(cls, host=None, binary=None, topic=None, manager=None, + report_interval=None, periodic_interval=None): + """Instantiates class and passes back application object. + + :param host: defaults to FLAGS.host + :param binary: defaults to basename of executable + :param topic: defaults to bin_name - 'nova-' part + :param manager: defaults to FLAGS._manager + :param report_interval: defaults to FLAGS.report_interval + :param periodic_interval: defaults to FLAGS.periodic_interval + + """ + if not host: + host = config.Config.get('host') + if not binary: + binary = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = binary.rpartition('reddwarf-')[2] + if not manager: + manager = config.Config.get('%s_manager' % topic, None) + if not report_interval: + report_interval = config.Config.get('report_interval', 10) + if not periodic_interval: + periodic_interval = config.Config.get('periodic_interval', 60) + service_obj = cls(host, binary, topic, manager, + report_interval, periodic_interval) + + return service_obj + + +_launcher = None + +def serve(*servers): + global _launcher + if not _launcher: + _launcher = Launcher() + for server in servers: + _launcher.launch_server(server) + +def wait(): + try: + _launcher.wait() + except KeyboardInterrupt: + _launcher.stop() + rpc.cleanup() diff --git a/reddwarf/common/utils.py b/reddwarf/common/utils.py index 31f84256ca..ccfc4456de 100644 --- a/reddwarf/common/utils.py +++ b/reddwarf/common/utils.py @@ -21,12 +21,18 @@ import inspect import re import uuid +from eventlet import event +from eventlet import greenthread +from eventlet import semaphore +from eventlet.green import subprocess + from reddwarf.openstack.common import utils as openstack_utils import_class = openstack_utils.import_class import_object = openstack_utils.import_object bool_from_string = openstack_utils.bool_from_string +execute = openstack_utils.execute isotime = openstack_utils.isotime def stringify_keys(dictionary): @@ -110,3 +116,62 @@ class MethodInspector(object): required = ["{0}=<{0}>".format(arg) for arg in self.required_args] args_str = ' '.join(required + optionals) return "%s %s" % (self._func.__name__, args_str) + +class LoopingCallDone(Exception): + """Exception to break out and stop a LoopingCall. + + The poll-function passed to LoopingCall can raise this exception to + break out of the loop normally. This is somewhat analogous to + StopIteration. + + An optional return-value can be included as the argument to the exception; + this return-value will be returned by LoopingCall.wait() + + """ + + def __init__(self, retvalue=True): + """:param retvalue: Value that LoopingCall.wait() should return.""" + self.retvalue = retvalue + + +class LoopingCall(object): + """Nabbed from nova.""" + def __init__(self, f=None, *args, **kw): + self.args = args + self.kw = kw + self.f = f + self._running = False + + def start(self, interval, now=True): + self._running = True + done = event.Event() + + def _inner(): + if not now: + greenthread.sleep(interval) + try: + while self._running: + self.f(*self.args, **self.kw) + if not self._running: + break + greenthread.sleep(interval) + except LoopingCallDone, e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_('in looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn(_inner) + return self.done + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() diff --git a/reddwarf/database/service.py b/reddwarf/database/service.py index 252fa0f19e..198b5c7cac 100644 --- a/reddwarf/database/service.py +++ b/reddwarf/database/service.py @@ -53,7 +53,7 @@ class InstanceController(BaseController): """Return all instances.""" servers = models.Instances(req.headers["X-Auth-Token"]).data() #TODO(hub-cap): Remove this, this is only for testing communication between services - rpc.cast(context.ReddwarfContext(), "foo.ubuntu", {"method":"ZOMG", "BARRRR":"ARGGGGG"}) + rpc.cast(context.ReddwarfContext(), "taskmanager.None", {"method":"test_method", "BARRRR":"ARGGGGG"}) return wsgi.Result(views.InstancesView(servers).data(), 201) diff --git a/reddwarf/rpc/impl_kombu.py b/reddwarf/rpc/impl_kombu.py index f11e762680..58225ff090 100644 --- a/reddwarf/rpc/impl_kombu.py +++ b/reddwarf/rpc/impl_kombu.py @@ -226,8 +226,6 @@ class Publisher(object): def send(self, msg): """Send a message""" - LOG.info("send %s" % self.producer) - LOG.info("%s %s %s" % (self.exchange_name,self.routing_key,self.kwargs)) self.producer.publish(msg) diff --git a/reddwarf/taskmanager/manager.py b/reddwarf/taskmanager/manager.py new file mode 100644 index 0000000000..8d00853a9e --- /dev/null +++ b/reddwarf/taskmanager/manager.py @@ -0,0 +1,34 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +LOG = logging.getLogger(__name__) + +class TaskManager(object): + """Task manager impl""" + + def __init__(self, *args, **kwargs): + LOG.info("TaskManager init %s %s"% (args, kwargs)) + + def periodic_tasks(self, raise_on_error=False): + LOG.info("Launching a periodic task") + + def test_method(self, context): + LOG.info("test_method called with context %s" % context) + +