From d0bce05abdfe4d68e7b4751365fcb80cef7a65f5 Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Fri, 2 Nov 2012 16:35:29 -0400 Subject: [PATCH] More code reorg: move Gearman code to worker.py Move the Gearman code used by the worker out of main.py and into worker.py where it belongs. Change-Id: Ied1e32034e7c2f497fcc6c183fc6c5f48cc08129 --- libra/worker/main.py | 39 ++++----------------------------------- libra/worker/worker.py | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 36 deletions(-) diff --git a/libra/worker/main.py b/libra/worker/main.py index c7040cdc..d4ad53ec 100644 --- a/libra/worker/main.py +++ b/libra/worker/main.py @@ -14,26 +14,16 @@ import daemon import daemon.pidfile -import gearman.errors import grp import pwd -import socket -from time import sleep from libra.openstack.common import importutils -from libra.common.json_gearman import JSONGearmanWorker from libra.common.options import Options, setup_logging from libra.worker.worker import config_manager from libra.worker.drivers.base import known_drivers from libra.worker.drivers.haproxy.services_base import haproxy_services -class CustomJSONGearmanWorker(JSONGearmanWorker): - """ Custom class we will use to pass arguments to the Gearman task. """ - logger = None - driver = None - - class Server(object): """ Encapsulates server activity so we can run it in either daemon or @@ -48,31 +38,10 @@ class Server(object): def main(self): """ Main method of the server. """ - my_ip = socket.gethostbyname(socket.gethostname()) - task_name = "lbaas-%s" % my_ip - self.logger.info("Registering task %s" % task_name) - - worker = CustomJSONGearmanWorker(self.servers) - worker.set_client_id(my_ip) - worker.register_task(task_name, config_manager) - worker.logger = self.logger - worker.driver = self.driver - - retry = True - - while (retry): - try: - worker.work() - except KeyboardInterrupt: - retry = False - except gearman.errors.ServerUnavailable: - self.logger.error("Job server(s) went away. Reconnecting.") - sleep(self.reconnect_sleep) - retry = True - except Exception as e: - self.logger.critical("Exception: %s, %s" % (e.__class__, e)) - retry = False - + config_manager(self.logger, + self.driver, + self.servers, + self.reconnect_sleep) self.logger.info("Shutting down") diff --git a/libra/worker/worker.py b/libra/worker/worker.py index 4c60c7ab..c93b80d9 100644 --- a/libra/worker/worker.py +++ b/libra/worker/worker.py @@ -12,12 +12,22 @@ # License for the specific language governing permissions and limitations # under the License. +import gearman.errors import json +import socket +import time +from libra.common.json_gearman import JSONGearmanWorker from libra.worker.controller import LBaaSController -def config_manager(worker, job): +class CustomJSONGearmanWorker(JSONGearmanWorker): + """ Custom class we will use to pass arguments to the Gearman task. """ + logger = None + driver = None + + +def handler(worker, job): """ Main Gearman worker task. @@ -36,3 +46,30 @@ def config_manager(worker, job): logger.debug("Return JSON message: %s" % json.dumps(response, indent=4)) return response + + +def config_manager(logger, driver, servers, reconnect_sleep): + my_ip = socket.gethostbyname(socket.gethostname()) + task_name = "lbaas-%s" % my_ip + logger.info("Registering task %s" % task_name) + + worker = CustomJSONGearmanWorker(servers) + worker.set_client_id(my_ip) + worker.register_task(task_name, handler) + worker.logger = logger + worker.driver = driver + + retry = True + + while (retry): + try: + worker.work() + except KeyboardInterrupt: + retry = False + except gearman.errors.ServerUnavailable: + logger.error("Job server(s) went away. Reconnecting.") + time.sleep(reconnect_sleep) + retry = True + except Exception as e: + logger.critical("Exception: %s, %s" % (e.__class__, e)) + retry = False