diff --git a/libra/worker/worker.py b/libra/worker/worker.py index 31ad4f32..2356d36a 100644 --- a/libra/worker/worker.py +++ b/libra/worker/worker.py @@ -12,14 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. -import gearman.errors import json import socket -import time - +import gear from oslo.config import cfg - -from libra.common.json_gearman import JSONGearmanWorker from libra.worker.controller import LBaaSController from libra.openstack.common import log @@ -27,7 +23,7 @@ from libra.openstack.common import log LOG = log.getLogger(__name__) -class CustomJSONGearmanWorker(JSONGearmanWorker): +class CustomJSONGearmanWorker(gear.Worker): """ Custom class we will use to pass arguments to the Gearman task. """ driver = None @@ -43,13 +39,13 @@ def handler(worker, job): driver = worker.driver # Hide information that should not be logged - copy = job.data.copy() + copy = json.loads(job.arguments) if LBaaSController.OBJ_STORE_TOKEN_FIELD in copy: copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****" LOG.debug("Received JSON message: %s" % json.dumps(copy)) - controller = LBaaSController(driver, job.data) + controller = LBaaSController(driver, json.loads(job.arguments)) response = controller.run() # Hide information that should not be logged @@ -58,7 +54,7 @@ def handler(worker, job): copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****" LOG.debug("Return JSON message: %s" % json.dumps(copy)) - return copy + job.sendWorkComplete(json.dumps(copy)) def config_thread(driver): @@ -66,39 +62,23 @@ def config_thread(driver): # Hostname should be a unique value, like UUID hostname = socket.gethostname() LOG.info("Registering task %s" % hostname) - - server_list = [] + worker = CustomJSONGearmanWorker(hostname) for host_port in cfg.CONF['gearman']['servers']: host, port = host_port.split(':') - server_list.append({'host': host, - 'port': int(port), - 'keyfile': cfg.CONF['gearman']['ssl_key'], - 'certfile': cfg.CONF['gearman']['ssl_cert'], - 'ca_certs': cfg.CONF['gearman']['ssl_ca'], - 'keepalive': cfg.CONF['gearman']['keepalive'], - 'keepcnt': cfg.CONF['gearman']['keepcnt'], - 'keepidle': cfg.CONF['gearman']['keepidle'], - 'keepintvl': cfg.CONF['gearman']['keepintvl']}) - - worker = CustomJSONGearmanWorker(server_list) - worker.set_client_id(hostname) - worker.register_task(hostname, handler) - worker.logger = LOG + worker.addServer(host, port, cfg.CONF['gearman']['ssl_key'], + cfg.CONF['gearman']['ssl_cert'], + cfg.CONF['gearman']['ssl_ca']) + worker.registerFunction(hostname) + worker.log = LOG worker.driver = driver - retry = True - - while (retry): + while retry: try: - worker.work(cfg.CONF['gearman']['poll']) + job = worker.getJob() + handler(worker, job) except KeyboardInterrupt: retry = False - except gearman.errors.ServerUnavailable: - LOG.error("Job server(s) went away. Reconnecting.") - time.sleep(cfg.CONF['gearman']['reconnect_sleep']) - retry = True except Exception as e: LOG.critical("Exception: %s, %s" % (e.__class__, e)) retry = False - LOG.debug("Worker process terminated.") diff --git a/requirements.txt b/requirements.txt index 5895e278..419a08c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ pbr>=0.5.21,<1.0 Babel>=0.9.6 eventlet +gear gearman>=2.0.2 oslo.config>=1.2.0 python-daemon>=1.6 @@ -14,5 +15,5 @@ sqlalchemy>=0.8.0 wsme>=0.5b2 mysql-connector-python ipaddress==1.0.4 -six<1.4.0 +six kombu