min switch gearman to gear worker

Change-Id: I0dad4364f15a81ee17d4a1e1aa6585364b14a439
This commit is contained in:
Min Wang
2014-05-28 16:01:49 -07:00
committed by German Eichberger
parent d6c156a433
commit 9444a342cd
2 changed files with 16 additions and 35 deletions

View File

@@ -12,14 +12,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import gearman.errors
import json import json
import socket import socket
import time import gear
from oslo.config import cfg from oslo.config import cfg
from libra.common.json_gearman import JSONGearmanWorker
from libra.worker.controller import LBaaSController from libra.worker.controller import LBaaSController
from libra.openstack.common import log from libra.openstack.common import log
@@ -27,7 +23,7 @@ from libra.openstack.common import log
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class CustomJSONGearmanWorker(JSONGearmanWorker): class CustomJSONGearmanWorker(gear.Worker):
""" Custom class we will use to pass arguments to the Gearman task. """ """ Custom class we will use to pass arguments to the Gearman task. """
driver = None driver = None
@@ -43,13 +39,13 @@ def handler(worker, job):
driver = worker.driver driver = worker.driver
# Hide information that should not be logged # Hide information that should not be logged
copy = job.data.copy() copy = json.loads(job.arguments)
if LBaaSController.OBJ_STORE_TOKEN_FIELD in copy: if LBaaSController.OBJ_STORE_TOKEN_FIELD in copy:
copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****" copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****"
LOG.debug("Received JSON message: %s" % json.dumps(copy)) LOG.debug("Received JSON message: %s" % json.dumps(copy))
controller = LBaaSController(driver, job.data) controller = LBaaSController(driver, json.loads(job.arguments))
response = controller.run() response = controller.run()
# Hide information that should not be logged # Hide information that should not be logged
@@ -58,7 +54,7 @@ def handler(worker, job):
copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****" copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****"
LOG.debug("Return JSON message: %s" % json.dumps(copy)) LOG.debug("Return JSON message: %s" % json.dumps(copy))
return copy job.sendWorkComplete(json.dumps(copy))
def config_thread(driver): def config_thread(driver):
@@ -66,39 +62,23 @@ def config_thread(driver):
# Hostname should be a unique value, like UUID # Hostname should be a unique value, like UUID
hostname = socket.gethostname() hostname = socket.gethostname()
LOG.info("Registering task %s" % hostname) LOG.info("Registering task %s" % hostname)
worker = CustomJSONGearmanWorker(hostname)
server_list = []
for host_port in cfg.CONF['gearman']['servers']: for host_port in cfg.CONF['gearman']['servers']:
host, port = host_port.split(':') host, port = host_port.split(':')
server_list.append({'host': host, worker.addServer(host, port, cfg.CONF['gearman']['ssl_key'],
'port': int(port), cfg.CONF['gearman']['ssl_cert'],
'keyfile': cfg.CONF['gearman']['ssl_key'], cfg.CONF['gearman']['ssl_ca'])
'certfile': cfg.CONF['gearman']['ssl_cert'], worker.registerFunction(hostname)
'ca_certs': cfg.CONF['gearman']['ssl_ca'], worker.log = LOG
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']})
worker = CustomJSONGearmanWorker(server_list)
worker.set_client_id(hostname)
worker.register_task(hostname, handler)
worker.logger = LOG
worker.driver = driver worker.driver = driver
retry = True retry = True
while retry:
while (retry):
try: try:
worker.work(cfg.CONF['gearman']['poll']) job = worker.getJob()
handler(worker, job)
except KeyboardInterrupt: except KeyboardInterrupt:
retry = False retry = False
except gearman.errors.ServerUnavailable:
LOG.error("Job server(s) went away. Reconnecting.")
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
retry = True
except Exception as e: except Exception as e:
LOG.critical("Exception: %s, %s" % (e.__class__, e)) LOG.critical("Exception: %s, %s" % (e.__class__, e))
retry = False retry = False
LOG.debug("Worker process terminated.") LOG.debug("Worker process terminated.")

View File

@@ -2,6 +2,7 @@ pbr>=0.5.21,<1.0
Babel>=0.9.6 Babel>=0.9.6
eventlet eventlet
gear
gearman>=2.0.2 gearman>=2.0.2
oslo.config>=1.2.0 oslo.config>=1.2.0
python-daemon>=1.6 python-daemon>=1.6
@@ -14,5 +15,5 @@ sqlalchemy>=0.8.0
wsme>=0.5b2 wsme>=0.5b2
mysql-connector-python mysql-connector-python
ipaddress==1.0.4 ipaddress==1.0.4
six<1.4.0 six
kombu kombu