Replaced gearman python library with gear python library in the pool manager

Change-Id: I3b4b0440bb1e0a925cee93909e6f1c44e578ae95
This commit is contained in:
Michael Johnson (johnsom)
2014-06-02 14:12:45 -07:00
parent 9444a342cd
commit fd1947100c
2 changed files with 72 additions and 58 deletions

View File

@@ -12,20 +12,33 @@
# License for the specific language governing permissions and limitations
# under the License.
import gear
import json
from time import sleep
from novaclient import exceptions
from oslo.config import cfg
from gearman.constants import JOB_UNKNOWN
from libra.openstack.common import log
from libra.common.json_gearman import JSONGearmanClient
from libra.mgm.nova import Node, BuildError, NotFound
POLL_COUNT = 10
POLL_SLEEP = 60
LOG = log.getLogger(__name__)
class BuildController(object):
class DisconnectClient(gear.Client):
def handleDisconnect(self, job):
job.disconnect = True
class DisconnectJob(gear.Job):
def __init__(self, name, arguments):
super(DisconnectJob, self).__init__(name, arguments)
self.disconnect = False
class BuildController(object):
RESPONSE_FIELD = 'response'
RESPONSE_SUCCESS = 'PASS'
RESPONSE_FAILURE = 'FAIL'
@@ -96,7 +109,7 @@ class BuildController(object):
)
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
return self.msg
if resp.status_code not in(200, 203):
if resp.status_code not in (200, 203):
LOG.error(
'Error geting status from Nova, error {0}'
.format(resp.status_code)
@@ -129,44 +142,56 @@ class BuildController(object):
def _test_node(self, name):
""" Run diags on node, blow it away if bad """
server_list = []
client = DisconnectClient()
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']})
gm_client = JSONGearmanClient(server_list)
client.addServer(host,
int(port),
cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
client.waitForServer()
job_data = {'hpcs_action': 'DIAGNOSTICS'}
job_status = gm_client.submit_job(
str(name), job_data, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=10
)
if job_status.state == JOB_UNKNOWN:
# Gearman server connect fail, count as bad node because we can't
# tell if it really is working
LOG.error('Could not talk to gearman server')
job = DisconnectJob(str(name), json.dumps(job_data))
client.submitJob(job)
pollcount = 0
# Would like to make these config file settings
while not job.complete\
and pollcount < POLL_COUNT\
and not job.disconnect:
sleep(POLL_SLEEP)
pollcount += 1
if job.disconnect:
LOG.error('Gearman Job server fail - disconnect')
return False
if job_status.timed_out:
# We timed out waiting for the job to finish
if not job.complete:
LOG.warning('Timeout getting diags from {0}'.format(name))
return False
LOG.debug(job_status.result)
result = json.loads(job.data[0])
LOG.debug(result)
# Would only happen if DIAGNOSTICS call not supported
if job_status.result['hpcs_response'] == 'FAIL':
if result['hpcs_response'] == 'FAIL':
return True
if job_status.result['network'] == 'FAIL':
if result['network'] == 'FAIL':
return False
gearman_count = 0
gearman_fail = 0
for gearman_test in job_status.result['gearman']:
for gearman_test in result['gearman']:
gearman_count += 1
if gearman_test['status'] == 'FAIL':
LOG.info(

View File

@@ -12,14 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import gearman.errors
import gear
import json
import socket
import time
from oslo.config import cfg
from libra.common.json_gearman import JSONGearmanWorker
from libra.mgm.controllers.root import PoolMgmController
from libra.openstack.common import log
@@ -27,49 +24,41 @@ from libra.openstack.common import log
LOG = log.getLogger(__name__)
def handler(worker, job):
LOG.debug("Received JSON message: {0}".format(json.dumps(job.data)))
controller = PoolMgmController(job.data)
def handler(job):
LOG.debug("Received JSON message: {0}".format(json.dumps(job.arguments)))
controller = PoolMgmController(json.loads(job.arguments))
response = controller.run()
LOG.debug("Return JSON message: {0}".format(json.dumps(response)))
return response
job.sendWorkComplete(json.dumps(response))
def worker_thread():
LOG.info("Registering task libra_pool_mgm")
hostname = socket.gethostname()
server_list = []
worker = gear.Worker(hostname)
for host_port in cfg.CONF['gearman']['servers']:
host, port = host_port.split(':')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']})
worker = JSONGearmanWorker(server_list)
worker.set_client_id(hostname)
worker.register_task('libra_pool_mgm', handler)
worker.addServer(host,
int(port),
cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
worker.registerFunction('libra_pool_mgm')
worker.logger = LOG
retry = True
while (retry):
while retry:
try:
worker.work(cfg.CONF['gearman']['poll'])
job = worker.getJob()
handler(job)
except KeyboardInterrupt:
retry = False
except gearman.errors.ServerUnavailable:
LOG.error("Job server(s) went away. Reconnecting.")
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
retry = True
except Exception:
LOG.exception("Exception in worker")
except Exception as e:
LOG.exception("Exception in pool manager worker: %s, %s"
% (e.__class__, e))
retry = False
LOG.debug("Pool manager process terminated.")