[WORKER] Migrate to oslo.config
Major configuration change. Custom Options class is no longer used in favor of the oslo.config style. This does not remove the currently used Options since the other utilities still need to be migrated. Change-Id: I65b50c46b9593ac14dc44c67823200f3108020d0
This commit is contained in:
53
etc/worker.cfg
Normal file
53
etc/worker.cfg
Normal file
@@ -0,0 +1,53 @@
|
||||
########################################################################
|
||||
# A sample configuration file read by the Libra worker utility.
|
||||
########################################################################
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# The [DEFAULT] section contains options common to the various Libra
|
||||
# utilities (worker, mgm, etc).
|
||||
#-----------------------------------------------------------------------
|
||||
[DEFAULT]
|
||||
# Options to enable more verbose output
|
||||
#verbose = false
|
||||
#debug = false
|
||||
|
||||
# Daemon process options
|
||||
#daemon = true
|
||||
#user = libra
|
||||
#group = libra
|
||||
|
||||
# Other logging options
|
||||
#syslog = false
|
||||
#syslog_socket = /dev/log
|
||||
#syslog_faciltiy = local7
|
||||
#logstash = HOST:PORT
|
||||
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# Options for utilities that are Gearman workers or clients.
|
||||
#-----------------------------------------------------------------------
|
||||
[gearman]
|
||||
#servers = localhost:4730, HOST:PORT
|
||||
#keepalive = false
|
||||
#keepcnt = COUNT
|
||||
#keepidle = SECONDS
|
||||
#keepintvl = SECONDS
|
||||
#poll = 1
|
||||
#reconnect_sleep = 60
|
||||
#ssl_ca = /path/to/ssl_ca
|
||||
#ssl_cert = /path/to/ssl_cert
|
||||
#ssl_key = /path/to/ssl_key
|
||||
|
||||
|
||||
#-----------------------------------------------------------------------
|
||||
# [worker] and [worker:*] sections are specific to the Libra worker.
|
||||
#-----------------------------------------------------------------------
|
||||
[worker]
|
||||
#driver = haproxy
|
||||
#pid = /var/run/libra/libra_worker.pid
|
||||
#logfile = /var/log/libra/libra_worker.log
|
||||
|
||||
# HAProxy driver options for the worker
|
||||
[worker:haproxy]
|
||||
#service = ubuntu
|
||||
#logfile = /var/log/haproxy.log
|
@@ -20,10 +20,91 @@ import os.path
|
||||
import sys
|
||||
import ConfigParser
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from libra import __version__, __release__
|
||||
from logging_handler import CompressedTimedRotatingFileHandler
|
||||
from logging_handler import NewlineFormatter
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
common_opts = [
|
||||
cfg.BoolOpt('syslog',
|
||||
default=False,
|
||||
help='Use syslog for logging output'),
|
||||
cfg.StrOpt('syslog_socket',
|
||||
default='/dev/log',
|
||||
help='Socket to use for syslog connection'),
|
||||
cfg.StrOpt('syslog_facility',
|
||||
default='local7',
|
||||
help='Syslog logging facility'),
|
||||
cfg.StrOpt('logstash',
|
||||
metavar="HOST:PORT",
|
||||
help='Send logs to logstash at "host:port"'),
|
||||
cfg.StrOpt('group',
|
||||
help='Group to use for daemon mode'),
|
||||
cfg.StrOpt('user',
|
||||
help='User to use for daemon mode'),
|
||||
]
|
||||
|
||||
common_cli_opts = [
|
||||
cfg.BoolOpt('daemon',
|
||||
default=True,
|
||||
help='Run as a daemon'),
|
||||
cfg.BoolOpt('debug',
|
||||
short='d',
|
||||
default=False,
|
||||
help='Turn on debug output'),
|
||||
cfg.BoolOpt('verbose',
|
||||
short='v',
|
||||
default=False,
|
||||
help='Turn on verbose output'),
|
||||
]
|
||||
|
||||
gearman_opts = [
|
||||
cfg.BoolOpt('keepalive',
|
||||
default=False,
|
||||
help='Enable TCP KEEPALIVE pings'),
|
||||
cfg.IntOpt('keepcnt',
|
||||
metavar='COUNT',
|
||||
help='Max KEEPALIVE probes to send before killing connection'),
|
||||
cfg.IntOpt('keepidle',
|
||||
metavar='SECONDS',
|
||||
help='Seconds of idle time before sending KEEPALIVE probes'),
|
||||
cfg.IntOpt('keepintvl',
|
||||
metavar='SECONDS',
|
||||
help='Seconds between TCP KEEPALIVE probes'),
|
||||
cfg.IntOpt('poll',
|
||||
default=1,
|
||||
metavar='SECONDS',
|
||||
help='Gearman worker polling timeout'),
|
||||
cfg.IntOpt('reconnect_sleep',
|
||||
default=60,
|
||||
metavar='SECONDS',
|
||||
help='Seconds to sleep between job server reconnects'),
|
||||
cfg.ListOpt('servers',
|
||||
default=['localhost:4730'],
|
||||
metavar='HOST:PORT,...',
|
||||
help='List of Gearman job servers'),
|
||||
cfg.StrOpt('ssl_ca',
|
||||
metavar='FILE',
|
||||
help='Gearman SSL certificate authority'),
|
||||
cfg.StrOpt('ssl_cert',
|
||||
metavar='FILE',
|
||||
help='Gearman SSL certificate'),
|
||||
cfg.StrOpt('ssl_key',
|
||||
metavar='FILE',
|
||||
help='Gearman SSL key'),
|
||||
]
|
||||
|
||||
|
||||
def add_common_opts():
|
||||
CONF.register_opts(common_opts)
|
||||
CONF.register_opts(gearman_opts, group='gearman')
|
||||
CONF.register_cli_opts(common_cli_opts)
|
||||
|
||||
|
||||
"""
|
||||
Common options parser.
|
||||
|
||||
@@ -190,6 +271,22 @@ class Options(object):
|
||||
return args
|
||||
|
||||
|
||||
def libra_logging(name, section):
|
||||
"""
|
||||
Temporary conversion function for utilities using oslo.config.
|
||||
"""
|
||||
class args(object):
|
||||
debug = CONF['debug']
|
||||
verbose = CONF['verbose']
|
||||
logfile = CONF[section]['logfile']
|
||||
nodaemon = not CONF['daemon']
|
||||
syslog = CONF['syslog']
|
||||
syslog_socket = CONF['syslog_socket']
|
||||
syslog_facility = CONF['syslog_facility']
|
||||
logstash = CONF['logstash']
|
||||
return setup_logging(name, args)
|
||||
|
||||
|
||||
def setup_logging(name, args):
|
||||
"""
|
||||
Shared routine for setting up logging. Depends on some common options
|
||||
|
@@ -11,3 +11,27 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from libra.worker.drivers.base import known_drivers
|
||||
|
||||
|
||||
worker_group = cfg.OptGroup('worker', 'Libra Worker options')
|
||||
|
||||
cfg.CONF.register_group(worker_group)
|
||||
|
||||
cfg.CONF.register_opts(
|
||||
[
|
||||
cfg.StrOpt('driver',
|
||||
default='haproxy',
|
||||
choices=known_drivers.keys(),
|
||||
help='Type of device to use'),
|
||||
cfg.StrOpt('logfile',
|
||||
default='/var/log/libra/libra_worker.log',
|
||||
help='Log file'),
|
||||
cfg.StrOpt('pid',
|
||||
default='/var/run/libra/libra_worker.pid',
|
||||
help='PID file'),
|
||||
],
|
||||
group=worker_group
|
||||
)
|
||||
|
@@ -11,3 +11,21 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from libra.worker.drivers.haproxy.services_base import haproxy_services
|
||||
|
||||
haproxy_group = cfg.OptGroup('worker:haproxy', 'Worker HAProxy options')
|
||||
|
||||
cfg.CONF.register_opts(
|
||||
[
|
||||
cfg.StrOpt('service',
|
||||
choices=haproxy_services.keys(),
|
||||
default='ubuntu',
|
||||
help='OS services to use with HAProxy driver'),
|
||||
cfg.StrOpt('logfile',
|
||||
default='/var/log/haproxy.log',
|
||||
help='Location of HAProxy logfile'),
|
||||
],
|
||||
group=haproxy_group
|
||||
)
|
||||
|
@@ -21,8 +21,9 @@ import pwd
|
||||
import time
|
||||
import threading
|
||||
|
||||
from libra import __version__
|
||||
from libra.openstack.common import importutils
|
||||
from libra.common.options import Options, setup_logging
|
||||
from libra.common.options import libra_logging, add_common_opts, CONF
|
||||
from libra.worker.drivers.base import known_drivers
|
||||
from libra.worker.drivers.haproxy.services_base import haproxy_services
|
||||
from libra.worker.worker import config_thread
|
||||
@@ -34,7 +35,7 @@ class EventServer(object):
|
||||
non-daemon mode.
|
||||
"""
|
||||
|
||||
def main(self, args, tasks):
|
||||
def main(self, tasks):
|
||||
"""
|
||||
Main method of the server.
|
||||
|
||||
@@ -43,12 +44,14 @@ class EventServer(object):
|
||||
that function's arguments.
|
||||
"""
|
||||
thread_list = []
|
||||
logger = setup_logging('libra_worker', args)
|
||||
logger = libra_logging('libra_worker', 'worker')
|
||||
|
||||
logger.info("Selected driver: %s" % args.driver)
|
||||
if args.driver == 'haproxy':
|
||||
logger.info("Selected HAProxy service: %s" % args.haproxy_service)
|
||||
logger.info("Job server list: %s" % args.server)
|
||||
driver = CONF['worker']['driver']
|
||||
logger.info("Selected driver: %s" % driver)
|
||||
if driver == 'haproxy':
|
||||
logger.info("Selected HAProxy service: %s" %
|
||||
CONF['worker:haproxy']['service'])
|
||||
logger.info("Job server list: %s" % CONF['gearman']['servers'])
|
||||
|
||||
for task, task_args in tasks:
|
||||
task_args = (logger,) + task_args # Make the logger the first arg
|
||||
@@ -70,101 +73,32 @@ class EventServer(object):
|
||||
def main():
|
||||
""" Main Python entry point for the worker utility. """
|
||||
|
||||
options = Options('worker', 'Worker Daemon')
|
||||
options.parser.add_argument(
|
||||
'--driver', dest='driver',
|
||||
choices=known_drivers.keys(), default='haproxy',
|
||||
help='type of device to use'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_keepalive', action="store_true",
|
||||
help='use KEEPALIVE to Gearman server'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_keepcnt', type=int, metavar='COUNT',
|
||||
help='max keepalive probes to send before killing connection'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_keepidle', type=int, metavar='SECONDS',
|
||||
help='seconds of idle time before sending keepalive probes'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_keepintvl', type=int, metavar='SECONDS',
|
||||
help='seconds between TCP keepalive probes'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_ssl_ca', dest='gearman_ssl_ca', metavar='FILE',
|
||||
help='Gearman SSL certificate authority'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_ssl_cert', dest='gearman_ssl_cert', metavar='FILE',
|
||||
help='Gearman SSL certificate'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_ssl_key', dest='gearman_ssl_key', metavar='FILE',
|
||||
help='Gearman SSL key'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--haproxy_service',
|
||||
choices=haproxy_services.keys(), default='ubuntu',
|
||||
help='os services to use with HAProxy driver (when used)'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--haproxy_logfile', type=str,
|
||||
default='/var/log/haproxy.log',
|
||||
help="Where to store the HAProxy logfile"
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'-s', '--reconnect_sleep', type=int, metavar='TIME',
|
||||
default=60, help='seconds to sleep between job server reconnects'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--server', dest='server', action='append', metavar='HOST:PORT',
|
||||
default=[],
|
||||
help='add a Gearman job server to the connection list'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--stats_poll', type=int, metavar='TIME',
|
||||
default=300, help='statistics polling interval in seconds'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--gearman_poll', type=int, metavar='TIME',
|
||||
default=1, help='Gearman worker polling timeout'
|
||||
)
|
||||
|
||||
args = options.run()
|
||||
|
||||
if not args.server:
|
||||
# NOTE(shrews): Can't set a default in argparse method because the
|
||||
# value is appended to the specified default.
|
||||
args.server.append('localhost:4730')
|
||||
elif not isinstance(args.server, list):
|
||||
# NOTE(shrews): The Options object cannot intelligently handle
|
||||
# creating a list from an option that may have multiple values.
|
||||
# We convert it to the expected type here.
|
||||
svr_list = args.server.split()
|
||||
args.server = svr_list
|
||||
add_common_opts()
|
||||
CONF(project='libra', version=__version__)
|
||||
|
||||
# Import the device driver we are going to use. This will be sent
|
||||
# along to the Gearman task that will use it to communicate with
|
||||
# the device.
|
||||
|
||||
driver_class = importutils.import_class(known_drivers[args.driver])
|
||||
selected_driver = CONF['worker']['driver']
|
||||
driver_class = importutils.import_class(known_drivers[selected_driver])
|
||||
|
||||
if args.driver == 'haproxy':
|
||||
if args.user:
|
||||
user = args.user
|
||||
if selected_driver == 'haproxy':
|
||||
if CONF['user']:
|
||||
user = CONF['user']
|
||||
else:
|
||||
user = getpass.getuser()
|
||||
|
||||
if args.group:
|
||||
group = args.group
|
||||
if CONF['group']:
|
||||
group = CONF['group']
|
||||
else:
|
||||
group = None
|
||||
|
||||
driver = driver_class(haproxy_services[args.haproxy_service],
|
||||
haproxy_service = CONF['worker:haproxy']['service']
|
||||
haproxy_logfile = CONF['worker:haproxy']['logfile']
|
||||
driver = driver_class(haproxy_services[haproxy_service],
|
||||
user, group,
|
||||
haproxy_logfile=args.haproxy_logfile)
|
||||
haproxy_logfile=haproxy_logfile)
|
||||
else:
|
||||
driver = driver_class()
|
||||
|
||||
@@ -172,13 +106,13 @@ def main():
|
||||
|
||||
# Tasks to execute in parallel
|
||||
task_list = [
|
||||
(config_thread, (driver, args))
|
||||
(config_thread, (driver,))
|
||||
]
|
||||
|
||||
if args.nodaemon:
|
||||
server.main(args, task_list)
|
||||
if not CONF['daemon']:
|
||||
server.main(task_list)
|
||||
else:
|
||||
pidfile = daemon.pidfile.TimeoutPIDLockFile(args.pid, 10)
|
||||
pidfile = daemon.pidfile.TimeoutPIDLockFile(CONF['worker']['pid'], 10)
|
||||
if daemon.runner.is_pidfile_stale(pidfile):
|
||||
pidfile.break_lock()
|
||||
context = daemon.DaemonContext(
|
||||
@@ -186,12 +120,12 @@ def main():
|
||||
umask=0o022,
|
||||
pidfile=pidfile
|
||||
)
|
||||
if args.user:
|
||||
context.uid = pwd.getpwnam(args.user).pw_uid
|
||||
if args.group:
|
||||
context.gid = grp.getgrnam(args.group).gr_gid
|
||||
if CONF['user']:
|
||||
context.uid = pwd.getpwnam(CONF['user']).pw_uid
|
||||
if CONF['group']:
|
||||
context.gid = grp.getgrnam(CONF['group']).gr_gid
|
||||
|
||||
context.open()
|
||||
server.main(args, task_list)
|
||||
server.main(task_list)
|
||||
|
||||
return 0
|
||||
|
@@ -17,6 +17,8 @@ import json
|
||||
import socket
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from libra.common.json_gearman import JSONGearmanWorker
|
||||
from libra.worker.controller import LBaaSController
|
||||
|
||||
@@ -57,24 +59,24 @@ def handler(worker, job):
|
||||
return copy
|
||||
|
||||
|
||||
def config_thread(logger, driver, args):
|
||||
def config_thread(logger, driver):
|
||||
""" Worker thread function. """
|
||||
# Hostname should be a unique value, like UUID
|
||||
hostname = socket.gethostname()
|
||||
logger.info("[worker] Registering task %s" % hostname)
|
||||
|
||||
server_list = []
|
||||
for host_port in args.server:
|
||||
for host_port in cfg.CONF['gearman']['servers']:
|
||||
host, port = host_port.split(':')
|
||||
server_list.append({'host': host,
|
||||
'port': int(port),
|
||||
'keyfile': args.gearman_ssl_key,
|
||||
'certfile': args.gearman_ssl_cert,
|
||||
'ca_certs': args.gearman_ssl_ca,
|
||||
'keepalive': args.gearman_keepalive,
|
||||
'keepcnt': args.gearman_keepcnt,
|
||||
'keepidle': args.gearman_keepidle,
|
||||
'keepintvl': args.gearman_keepintvl})
|
||||
'keyfile': cfg.CONF['gearman']['ssl_key'],
|
||||
'certfile': cfg.CONF['gearman']['ssl_cert'],
|
||||
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
|
||||
'keepalive': cfg.CONF['gearman']['keepalive'],
|
||||
'keepcnt': cfg.CONF['gearman']['keepcnt'],
|
||||
'keepidle': cfg.CONF['gearman']['keepidle'],
|
||||
'keepintvl': cfg.CONF['gearman']['keepintvl']})
|
||||
|
||||
worker = CustomJSONGearmanWorker(server_list)
|
||||
worker.set_client_id(hostname)
|
||||
@@ -86,12 +88,12 @@ def config_thread(logger, driver, args):
|
||||
|
||||
while (retry):
|
||||
try:
|
||||
worker.work(args.gearman_poll)
|
||||
worker.work(cfg.CONF['gearman']['poll'])
|
||||
except KeyboardInterrupt:
|
||||
retry = False
|
||||
except gearman.errors.ServerUnavailable:
|
||||
logger.error("[worker] Job server(s) went away. Reconnecting.")
|
||||
time.sleep(args.reconnect_sleep)
|
||||
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
|
||||
retry = True
|
||||
except Exception as e:
|
||||
logger.critical("[worker] Exception: %s, %s" % (e.__class__, e))
|
||||
|
@@ -2,6 +2,7 @@ pbr>=0.5.21,<1.0
|
||||
|
||||
eventlet
|
||||
gearman>=2.0.2
|
||||
oslo.config
|
||||
python-daemon>=1.6
|
||||
python-logstash
|
||||
python_novaclient>=2.14.1,<2.14.2
|
||||
|
Reference in New Issue
Block a user