diff --git a/etc/worker.cfg b/etc/worker.cfg new file mode 100644 index 00000000..954573b5 --- /dev/null +++ b/etc/worker.cfg @@ -0,0 +1,53 @@ +######################################################################## +# A sample configuration file read by the Libra worker utility. +######################################################################## + +#----------------------------------------------------------------------- +# The [DEFAULT] section contains options common to the various Libra +# utilities (worker, mgm, etc). +#----------------------------------------------------------------------- +[DEFAULT] +# Options to enable more verbose output +#verbose = false +#debug = false + +# Daemon process options +#daemon = true +#user = libra +#group = libra + +# Other logging options +#syslog = false +#syslog_socket = /dev/log +#syslog_faciltiy = local7 +#logstash = HOST:PORT + + +#----------------------------------------------------------------------- +# Options for utilities that are Gearman workers or clients. +#----------------------------------------------------------------------- +[gearman] +#servers = localhost:4730, HOST:PORT +#keepalive = false +#keepcnt = COUNT +#keepidle = SECONDS +#keepintvl = SECONDS +#poll = 1 +#reconnect_sleep = 60 +#ssl_ca = /path/to/ssl_ca +#ssl_cert = /path/to/ssl_cert +#ssl_key = /path/to/ssl_key + + +#----------------------------------------------------------------------- +# [worker] and [worker:*] sections are specific to the Libra worker. +#----------------------------------------------------------------------- +[worker] +#driver = haproxy +#pid = /var/run/libra/libra_worker.pid +#logfile = /var/log/libra/libra_worker.log + +# HAProxy driver options for the worker +[worker:haproxy] +#service = ubuntu +#logfile = /var/log/haproxy.log diff --git a/libra/common/options.py b/libra/common/options.py index f50c809e..b362f612 100644 --- a/libra/common/options.py +++ b/libra/common/options.py @@ -20,10 +20,91 @@ import os.path import sys import ConfigParser +from oslo.config import cfg + from libra import __version__, __release__ from logging_handler import CompressedTimedRotatingFileHandler from logging_handler import NewlineFormatter + +CONF = cfg.CONF + +common_opts = [ + cfg.BoolOpt('syslog', + default=False, + help='Use syslog for logging output'), + cfg.StrOpt('syslog_socket', + default='/dev/log', + help='Socket to use for syslog connection'), + cfg.StrOpt('syslog_facility', + default='local7', + help='Syslog logging facility'), + cfg.StrOpt('logstash', + metavar="HOST:PORT", + help='Send logs to logstash at "host:port"'), + cfg.StrOpt('group', + help='Group to use for daemon mode'), + cfg.StrOpt('user', + help='User to use for daemon mode'), +] + +common_cli_opts = [ + cfg.BoolOpt('daemon', + default=True, + help='Run as a daemon'), + cfg.BoolOpt('debug', + short='d', + default=False, + help='Turn on debug output'), + cfg.BoolOpt('verbose', + short='v', + default=False, + help='Turn on verbose output'), +] + +gearman_opts = [ + cfg.BoolOpt('keepalive', + default=False, + help='Enable TCP KEEPALIVE pings'), + cfg.IntOpt('keepcnt', + metavar='COUNT', + help='Max KEEPALIVE probes to send before killing connection'), + cfg.IntOpt('keepidle', + metavar='SECONDS', + help='Seconds of idle time before sending KEEPALIVE probes'), + cfg.IntOpt('keepintvl', + metavar='SECONDS', + help='Seconds between TCP KEEPALIVE probes'), + cfg.IntOpt('poll', + default=1, + metavar='SECONDS', + help='Gearman worker polling timeout'), + cfg.IntOpt('reconnect_sleep', + default=60, + metavar='SECONDS', + help='Seconds to sleep between job server reconnects'), + cfg.ListOpt('servers', + default=['localhost:4730'], + metavar='HOST:PORT,...', + help='List of Gearman job servers'), + cfg.StrOpt('ssl_ca', + metavar='FILE', + help='Gearman SSL certificate authority'), + cfg.StrOpt('ssl_cert', + metavar='FILE', + help='Gearman SSL certificate'), + cfg.StrOpt('ssl_key', + metavar='FILE', + help='Gearman SSL key'), +] + + +def add_common_opts(): + CONF.register_opts(common_opts) + CONF.register_opts(gearman_opts, group='gearman') + CONF.register_cli_opts(common_cli_opts) + + """ Common options parser. @@ -190,6 +271,22 @@ class Options(object): return args +def libra_logging(name, section): + """ + Temporary conversion function for utilities using oslo.config. + """ + class args(object): + debug = CONF['debug'] + verbose = CONF['verbose'] + logfile = CONF[section]['logfile'] + nodaemon = not CONF['daemon'] + syslog = CONF['syslog'] + syslog_socket = CONF['syslog_socket'] + syslog_facility = CONF['syslog_facility'] + logstash = CONF['logstash'] + return setup_logging(name, args) + + def setup_logging(name, args): """ Shared routine for setting up logging. Depends on some common options diff --git a/libra/worker/__init__.py b/libra/worker/__init__.py index 582348cb..433048af 100644 --- a/libra/worker/__init__.py +++ b/libra/worker/__init__.py @@ -11,3 +11,27 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +from oslo.config import cfg +from libra.worker.drivers.base import known_drivers + + +worker_group = cfg.OptGroup('worker', 'Libra Worker options') + +cfg.CONF.register_group(worker_group) + +cfg.CONF.register_opts( + [ + cfg.StrOpt('driver', + default='haproxy', + choices=known_drivers.keys(), + help='Type of device to use'), + cfg.StrOpt('logfile', + default='/var/log/libra/libra_worker.log', + help='Log file'), + cfg.StrOpt('pid', + default='/var/run/libra/libra_worker.pid', + help='PID file'), + ], + group=worker_group +) diff --git a/libra/worker/drivers/haproxy/__init__.py b/libra/worker/drivers/haproxy/__init__.py index 582348cb..c69799f0 100644 --- a/libra/worker/drivers/haproxy/__init__.py +++ b/libra/worker/drivers/haproxy/__init__.py @@ -11,3 +11,21 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +from oslo.config import cfg +from libra.worker.drivers.haproxy.services_base import haproxy_services + +haproxy_group = cfg.OptGroup('worker:haproxy', 'Worker HAProxy options') + +cfg.CONF.register_opts( + [ + cfg.StrOpt('service', + choices=haproxy_services.keys(), + default='ubuntu', + help='OS services to use with HAProxy driver'), + cfg.StrOpt('logfile', + default='/var/log/haproxy.log', + help='Location of HAProxy logfile'), + ], + group=haproxy_group +) diff --git a/libra/worker/main.py b/libra/worker/main.py index f0f108fa..78f2ba3f 100644 --- a/libra/worker/main.py +++ b/libra/worker/main.py @@ -21,8 +21,9 @@ import pwd import time import threading +from libra import __version__ from libra.openstack.common import importutils -from libra.common.options import Options, setup_logging +from libra.common.options import libra_logging, add_common_opts, CONF from libra.worker.drivers.base import known_drivers from libra.worker.drivers.haproxy.services_base import haproxy_services from libra.worker.worker import config_thread @@ -34,7 +35,7 @@ class EventServer(object): non-daemon mode. """ - def main(self, args, tasks): + def main(self, tasks): """ Main method of the server. @@ -43,12 +44,14 @@ class EventServer(object): that function's arguments. """ thread_list = [] - logger = setup_logging('libra_worker', args) + logger = libra_logging('libra_worker', 'worker') - logger.info("Selected driver: %s" % args.driver) - if args.driver == 'haproxy': - logger.info("Selected HAProxy service: %s" % args.haproxy_service) - logger.info("Job server list: %s" % args.server) + driver = CONF['worker']['driver'] + logger.info("Selected driver: %s" % driver) + if driver == 'haproxy': + logger.info("Selected HAProxy service: %s" % + CONF['worker:haproxy']['service']) + logger.info("Job server list: %s" % CONF['gearman']['servers']) for task, task_args in tasks: task_args = (logger,) + task_args # Make the logger the first arg @@ -70,101 +73,32 @@ class EventServer(object): def main(): """ Main Python entry point for the worker utility. """ - options = Options('worker', 'Worker Daemon') - options.parser.add_argument( - '--driver', dest='driver', - choices=known_drivers.keys(), default='haproxy', - help='type of device to use' - ) - options.parser.add_argument( - '--gearman_keepalive', action="store_true", - help='use KEEPALIVE to Gearman server' - ) - options.parser.add_argument( - '--gearman_keepcnt', type=int, metavar='COUNT', - help='max keepalive probes to send before killing connection' - ) - options.parser.add_argument( - '--gearman_keepidle', type=int, metavar='SECONDS', - help='seconds of idle time before sending keepalive probes' - ) - options.parser.add_argument( - '--gearman_keepintvl', type=int, metavar='SECONDS', - help='seconds between TCP keepalive probes' - ) - options.parser.add_argument( - '--gearman_ssl_ca', dest='gearman_ssl_ca', metavar='FILE', - help='Gearman SSL certificate authority' - ) - options.parser.add_argument( - '--gearman_ssl_cert', dest='gearman_ssl_cert', metavar='FILE', - help='Gearman SSL certificate' - ) - options.parser.add_argument( - '--gearman_ssl_key', dest='gearman_ssl_key', metavar='FILE', - help='Gearman SSL key' - ) - options.parser.add_argument( - '--haproxy_service', - choices=haproxy_services.keys(), default='ubuntu', - help='os services to use with HAProxy driver (when used)' - ) - options.parser.add_argument( - '--haproxy_logfile', type=str, - default='/var/log/haproxy.log', - help="Where to store the HAProxy logfile" - ) - options.parser.add_argument( - '-s', '--reconnect_sleep', type=int, metavar='TIME', - default=60, help='seconds to sleep between job server reconnects' - ) - options.parser.add_argument( - '--server', dest='server', action='append', metavar='HOST:PORT', - default=[], - help='add a Gearman job server to the connection list' - ) - options.parser.add_argument( - '--stats_poll', type=int, metavar='TIME', - default=300, help='statistics polling interval in seconds' - ) - options.parser.add_argument( - '--gearman_poll', type=int, metavar='TIME', - default=1, help='Gearman worker polling timeout' - ) - - args = options.run() - - if not args.server: - # NOTE(shrews): Can't set a default in argparse method because the - # value is appended to the specified default. - args.server.append('localhost:4730') - elif not isinstance(args.server, list): - # NOTE(shrews): The Options object cannot intelligently handle - # creating a list from an option that may have multiple values. - # We convert it to the expected type here. - svr_list = args.server.split() - args.server = svr_list + add_common_opts() + CONF(project='libra', version=__version__) # Import the device driver we are going to use. This will be sent # along to the Gearman task that will use it to communicate with # the device. - driver_class = importutils.import_class(known_drivers[args.driver]) + selected_driver = CONF['worker']['driver'] + driver_class = importutils.import_class(known_drivers[selected_driver]) - if args.driver == 'haproxy': - if args.user: - user = args.user + if selected_driver == 'haproxy': + if CONF['user']: + user = CONF['user'] else: user = getpass.getuser() - if args.group: - group = args.group + if CONF['group']: + group = CONF['group'] else: group = None - driver = driver_class(haproxy_services[args.haproxy_service], + haproxy_service = CONF['worker:haproxy']['service'] + haproxy_logfile = CONF['worker:haproxy']['logfile'] + driver = driver_class(haproxy_services[haproxy_service], user, group, - haproxy_logfile=args.haproxy_logfile) + haproxy_logfile=haproxy_logfile) else: driver = driver_class() @@ -172,13 +106,13 @@ def main(): # Tasks to execute in parallel task_list = [ - (config_thread, (driver, args)) + (config_thread, (driver,)) ] - if args.nodaemon: - server.main(args, task_list) + if not CONF['daemon']: + server.main(task_list) else: - pidfile = daemon.pidfile.TimeoutPIDLockFile(args.pid, 10) + pidfile = daemon.pidfile.TimeoutPIDLockFile(CONF['worker']['pid'], 10) if daemon.runner.is_pidfile_stale(pidfile): pidfile.break_lock() context = daemon.DaemonContext( @@ -186,12 +120,12 @@ def main(): umask=0o022, pidfile=pidfile ) - if args.user: - context.uid = pwd.getpwnam(args.user).pw_uid - if args.group: - context.gid = grp.getgrnam(args.group).gr_gid + if CONF['user']: + context.uid = pwd.getpwnam(CONF['user']).pw_uid + if CONF['group']: + context.gid = grp.getgrnam(CONF['group']).gr_gid context.open() - server.main(args, task_list) + server.main(task_list) return 0 diff --git a/libra/worker/worker.py b/libra/worker/worker.py index 97e36488..37426756 100644 --- a/libra/worker/worker.py +++ b/libra/worker/worker.py @@ -17,6 +17,8 @@ import json import socket import time +from oslo.config import cfg + from libra.common.json_gearman import JSONGearmanWorker from libra.worker.controller import LBaaSController @@ -57,24 +59,24 @@ def handler(worker, job): return copy -def config_thread(logger, driver, args): +def config_thread(logger, driver): """ Worker thread function. """ # Hostname should be a unique value, like UUID hostname = socket.gethostname() logger.info("[worker] Registering task %s" % hostname) server_list = [] - for host_port in args.server: + for host_port in cfg.CONF['gearman']['servers']: host, port = host_port.split(':') server_list.append({'host': host, 'port': int(port), - 'keyfile': args.gearman_ssl_key, - 'certfile': args.gearman_ssl_cert, - 'ca_certs': args.gearman_ssl_ca, - 'keepalive': args.gearman_keepalive, - 'keepcnt': args.gearman_keepcnt, - 'keepidle': args.gearman_keepidle, - 'keepintvl': args.gearman_keepintvl}) + 'keyfile': cfg.CONF['gearman']['ssl_key'], + 'certfile': cfg.CONF['gearman']['ssl_cert'], + 'ca_certs': cfg.CONF['gearman']['ssl_ca'], + 'keepalive': cfg.CONF['gearman']['keepalive'], + 'keepcnt': cfg.CONF['gearman']['keepcnt'], + 'keepidle': cfg.CONF['gearman']['keepidle'], + 'keepintvl': cfg.CONF['gearman']['keepintvl']}) worker = CustomJSONGearmanWorker(server_list) worker.set_client_id(hostname) @@ -86,12 +88,12 @@ def config_thread(logger, driver, args): while (retry): try: - worker.work(args.gearman_poll) + worker.work(cfg.CONF['gearman']['poll']) except KeyboardInterrupt: retry = False except gearman.errors.ServerUnavailable: logger.error("[worker] Job server(s) went away. Reconnecting.") - time.sleep(args.reconnect_sleep) + time.sleep(cfg.CONF['gearman']['reconnect_sleep']) retry = True except Exception as e: logger.critical("[worker] Exception: %s, %s" % (e.__class__, e)) diff --git a/requirements.txt b/requirements.txt index 2f54bdba..755a6474 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ pbr>=0.5.21,<1.0 eventlet gearman>=2.0.2 +oslo.config python-daemon>=1.6 python-logstash python_novaclient>=2.14.1,<2.14.2