Spice up WBE banner and add simple worker __main__ entrypoint

Change-Id: Ifdf275a623352aa3e42fbf0aa9a4394b64b54337
This commit is contained in:
Joshua Harlow
2016-02-05 17:45:34 -08:00
parent 61efc31e96
commit 3e3efc562b
3 changed files with 161 additions and 71 deletions

View File

@@ -13,6 +13,11 @@ Async
.. automodule:: taskflow.utils.async_utils
Banner
~~~~~~
.. automodule:: taskflow.utils.banner
Deprecation
~~~~~~~~~~~

View File

@@ -17,7 +17,6 @@
import os
import platform
import socket
import string
import sys
import futurist
@@ -27,9 +26,9 @@ from taskflow.engines.worker_based import endpoint
from taskflow.engines.worker_based import server
from taskflow import logging
from taskflow import task as t_task
from taskflow.utils import banner
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
from taskflow import version
LOG = logging.getLogger(__name__)
@@ -58,39 +57,6 @@ class Worker(object):
(see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
"""
BANNER_TEMPLATE = string.Template("""
TaskFlow v${version} WBE worker.
Connection details:
Driver = $transport_driver
Exchange = $exchange
Topic = $topic
Transport = $transport_type
Uri = $connection_uri
Powered by:
Executor = $executor_type
Thread count = $executor_thread_count
Supported endpoints:$endpoints
System details:
Hostname = $hostname
Pid = $pid
Platform = $platform
Python = $python
Thread id = $thread_id
""".strip())
# See: http://bugs.python.org/issue13173 for why we are doing this...
BANNER_TEMPLATE.defaults = {
# These values may not be possible to fetch/known, default
# to ??? to represent that they are unknown...
'pid': '???',
'hostname': '???',
'executor_thread_count': '???',
'endpoints': ' %s' % ([]),
# These are static (avoid refetching...)
'version': version.version_string(),
'python': sys.version.split("\n", 1)[0].strip(),
}
def __init__(self, exchange, topic, tasks,
executor=None, threads_count=None, url=None,
transport=None, transport_options=None,
@@ -116,12 +82,9 @@ System details:
derived_tasks = misc.find_subclasses(tasks, t_task.BaseTask)
return [endpoint.Endpoint(task) for task in derived_tasks]
def _generate_banner(self):
"""Generates a banner that can be useful to display before running."""
try:
tpl_params = dict(self.BANNER_TEMPLATE.defaults)
except AttributeError:
tpl_params = {}
@misc.cachedproperty
def banner(self):
"""A banner that can be useful to display before running."""
connection_details = self._server.connection_details
transport = connection_details.transport
if transport.driver_version:
@@ -129,47 +92,45 @@ System details:
transport.driver_version)
else:
transport_driver = transport.driver_name
tpl_params['transport_driver'] = transport_driver
tpl_params['exchange'] = self._exchange
tpl_params['topic'] = self._topic
tpl_params['transport_type'] = transport.driver_type
tpl_params['connection_uri'] = connection_details.uri
tpl_params['executor_type'] = reflection.get_class_name(self._executor)
threads_count = getattr(self._executor, 'max_workers', None)
if threads_count is not None:
tpl_params['executor_thread_count'] = threads_count
if self._endpoints:
pretty_endpoints = []
for ep in self._endpoints:
pretty_endpoints.append(" - %s" % ep)
# This ensures there is a newline before the list...
tpl_params['endpoints'] = "\n" + "\n".join(pretty_endpoints)
try:
tpl_params['hostname'] = socket.getfqdn()
hostname = socket.getfqdn()
except socket.error:
pass
hostname = "???"
try:
tpl_params['pid'] = os.getpid()
pid = os.getpid()
except OSError:
pass
tpl_params['platform'] = platform.platform()
tpl_params['thread_id'] = tu.get_ident()
banner = self.BANNER_TEMPLATE.substitute(**tpl_params)
# NOTE(harlowja): this is needed since the template in this file
# will always have newlines that end with '\n' (even on different
# platforms due to the way this source file is encoded) so we have
# to do this little dance to make it platform neutral...
return misc.fix_newlines(banner)
pid = "???"
chapters = {
'Connection details': {
'Driver': transport_driver,
'Exchange': self._exchange,
'Topic': self._topic,
'Transport': transport.driver_type,
'Uri': connection_details.uri,
},
'Powered by': {
'Executor': reflection.get_class_name(self._executor),
'Thread count': getattr(self._executor, 'max_workers', "???"),
},
'Supported endpoints': [str(ep) for ep in self._endpoints],
'System details': {
'Hostname': hostname,
'Pid': pid,
'Platform': platform.platform(),
'Python': sys.version.split("\n", 1)[0].strip(),
'Thread id': tu.get_ident(),
},
}
return banner.make_banner('WBE worker', chapters)
def run(self, display_banner=True, banner_writer=None):
"""Runs the worker."""
if display_banner:
banner = self._generate_banner()
if banner_writer is None:
for line in banner.splitlines():
for line in self.banner.splitlines():
LOG.info(line)
else:
banner_writer(banner)
banner_writer(self.banner)
self._server.start()
def wait(self):
@@ -181,3 +142,20 @@ System details:
self._server.stop()
if self._owns_executor:
self._executor.shutdown()
if __name__ == '__main__':
import argparse
import logging as log
parser = argparse.ArgumentParser()
parser.add_argument("--exchange", required=True)
parser.add_argument("--connection-url", required=True)
parser.add_argument("--topic", required=True)
parser.add_argument("--task", action='append',
metavar="TASK", default=[])
parser.add_argument("-v", "--verbose", action='store_true')
args = parser.parse_args()
if args.verbose:
log.basicConfig(level=logging.DEBUG, format="")
w = Worker(args.exchange, args.topic, args.task, url=args.connection_url)
w.run()

107
taskflow/utils/banner.py Normal file
View File

@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2016 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import string
import six
from taskflow.utils import misc
from taskflow import version
BANNER_HEADER = string.Template("""
___ __
| |_
|ask |low v$version
""".strip())
BANNER_HEADER = BANNER_HEADER.substitute(version=version.version_string())
def make_banner(what, chapters):
"""Makes a taskflow banner string.
For example::
>>> from taskflow.utils import banner
>>> chapters = {
'Connection details': {
'Topic': 'hello',
},
'Powered by': {
'Executor': 'parallel',
},
}
>>> print(banner.make_banner('Worker', chapters))
This will output::
___ __
| |_
|ask |low v1.26.1
*Worker*
Connection details:
Topic => hello
Powered by:
Executor => parallel
"""
buf = misc.StringIO()
buf.write_nl(BANNER_HEADER)
if chapters:
buf.write_nl("*%s*" % what)
chapter_names = sorted(six.iterkeys(chapters))
else:
buf.write("*%s*" % what)
chapter_names = []
for i, chapter_name in enumerate(chapter_names):
chapter_contents = chapters[chapter_name]
if chapter_contents:
buf.write_nl("%s:" % (chapter_name))
else:
buf.write("%s:" % (chapter_name))
if isinstance(chapter_contents, dict):
section_names = sorted(six.iterkeys(chapter_contents))
for j, section_name in enumerate(section_names):
if j + 1 < len(section_names):
buf.write_nl(" %s => %s"
% (section_name,
chapter_contents[section_name]))
else:
buf.write(" %s => %s" % (section_name,
chapter_contents[section_name]))
elif isinstance(chapter_contents, (list, tuple, set)):
if isinstance(chapter_contents, set):
sections = sorted(chapter_contents)
else:
sections = chapter_contents
for j, section in enumerate(sections):
if j + 1 < len(sections):
buf.write_nl(" %s. %s" % (j + 1, section))
else:
buf.write(" %s. %s" % (j + 1, section))
else:
raise TypeError("Unsupported chapter contents"
" type: one of dict, list, tuple, set expected"
" and not %s" % type(chapter_contents).__name__)
if i + 1 < len(chapter_names):
buf.write_nl("")
# NOTE(harlowja): this is needed since the template in this file
# will always have newlines that end with '\n' (even on different
# platforms due to the way this source file is encoded) so we have
# to do this little dance to make it platform neutral...
if os.linesep != "\n":
return misc.fix_newlines(buf.getvalue())
return buf.getvalue()