From 80e7bae30c477de9ed7adf6acef84503c940bc0b Mon Sep 17 00:00:00 2001
From: German Eichberger
Date: Mon, 23 Jun 2014 11:26:01 -0700
Subject: [PATCH] Backport python-gearman client and worker wrappers

Replaces the asynchronous gear client/worker classes with the blocking
python-gearman JSONGearmanClient/JSONGearmanWorker wrappers across the
admin API device pool, the stats schedulers, the common API gearman
client, and the pool manager worker.

Change-Id: I894cfb152297dd175105d8f35023a090bc5d8bb5
---
 libra/admin_api/device_pool/manage_pool.py | 299 ++++++++++++---------
 libra/admin_api/stats/offline_sched.py     |   4 +-
 libra/admin_api/stats/ping_sched.py        |   4 +-
 libra/admin_api/stats/stats_gearman.py     | 209 +++++++-------
 libra/admin_api/stats/stats_sched.py       |   4 +-
 libra/common/api/gearman_client.py         |  90 +++----
 libra/common/json_gearman.py               |  32 ++-
 libra/mgm/controllers/build.py             |  80 ++----
 libra/mgm/gearman_worker.py                |  49 ++--
 9 files changed, 387 insertions(+), 384 deletions(-)
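Note for reviewers: the heart of this backport is swapping the asynchronous
gear client for python-gearman's blocking submit calls. A minimal sketch of
the new call shape, assuming a plain localhost job server (the real code
builds the server list from cfg.CONF['gearman'] with SSL and keepalive
options):

    # Sketch only, not part of the patch. 'localhost:4730' is an
    # assumed unencrypted job server; production passes SSL'd hosts.
    from libra.common.json_gearman import JSONGearmanClient

    client = JSONGearmanClient(['localhost:4730'])
    # One blocking round trip; the JSON data encoder serializes the dict.
    status = client.submit_job('libra_pool_mgm',
                               {'action': 'BUILD_DEVICE'},
                               background=False, wait_until_complete=True,
                               max_retries=10, poll_timeout=30.0)
    # status.state, status.timed_out and status.result carry the outcome.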
diff --git a/libra/admin_api/device_pool/manage_pool.py b/libra/admin_api/device_pool/manage_pool.py
index 783d0c1f..768636b3 100644
--- a/libra/admin_api/device_pool/manage_pool.py
+++ b/libra/admin_api/device_pool/manage_pool.py
@@ -16,14 +16,14 @@
 import ipaddress
 import threading
 from datetime import datetime
+from gearman.constants import JOB_UNKNOWN
 from oslo.config import cfg
 from sqlalchemy import func
 
 from libra.common.api.lbaas import Device, PoolBuilding, Vip, db_session
 from libra.common.api.lbaas import Counters
-from libra.common.json_gearman import JsonJob
+from libra.common.json_gearman import JSONGearmanClient
 from libra.openstack.common import log
-import gear
 
 # TODO: Lots of duplication of code here, need to cleanup
@@ -31,6 +31,7 @@ LOG = log.getLogger(__name__)
 
 
 class Pool(object):
+
     DELETE_SECONDS = cfg.CONF['admin_api'].delete_timer_seconds
     PROBE_SECONDS = cfg.CONF['admin_api'].probe_timer_seconds
     VIPS_SECONDS = cfg.CONF['admin_api'].vips_timer_seconds
@@ -48,8 +49,6 @@ class Pool(object):
         self.start_probe_sched()
         self.start_vips_sched()
 
-        self.gear = GearmanWork()  # set up the async gearman
-
     def shutdown(self):
         if self.probe_timer:
             self.probe_timer.cancel()
@@ -67,8 +66,9 @@ class Pool(object):
             return
         LOG.info('Running device delete check')
         try:
+            message = []
             with db_session() as session:
-                devices = session.query(Device). \
+                devices = session.query(Device).\
                     filter(Device.status == 'DELETED').all()
 
                 for device in devices:
@@ -76,12 +76,17 @@ class Pool(object):
                         'action': 'DELETE_DEVICE',
                         'name': device.name
                     }
-                    self.gear.send_delete_message(job_data)
+                    message.append(dict(task='libra_pool_mgm', data=job_data))
 
-                counter = session.query(Counters). \
+                counter = session.query(Counters).\
                     filter(Counters.name == 'devices_deleted').first()
                 counter.value += len(devices)
                 session.commit()
+            if not message:
+                LOG.info("No devices to delete")
+            else:
+                gear = GearmanWork()
+                gear.send_delete_message(message)
         except:
             LOG.exception("Exception when deleting devices")
@@ -97,7 +102,7 @@ class Pool(object):
         try:
             with db_session() as session:
                 NULL = None  # For pep8
-                vip_count = session.query(Vip). \
+                vip_count = session.query(Vip).\
                     filter(Vip.device == NULL).count()
                 if vip_count >= self.vip_pool_size:
                     LOG.info("Enough vips exist, no work to do")
@@ -123,11 +128,11 @@ class Pool(object):
         try:
             with db_session() as session:
                 # Double check we have no outstanding builds assigned to us
-                session.query(PoolBuilding). \
-                    filter(PoolBuilding.server_id == self.server_id). \
+                session.query(PoolBuilding).\
+                    filter(PoolBuilding.server_id == self.server_id).\
                     delete()
                 session.flush()
-                dev_count = session.query(Device). \
+                dev_count = session.query(Device).\
                     filter(Device.status == 'OFFLINE').count()
                 if dev_count >= self.node_pool_size:
                     LOG.info("Enough devices exist, no work to do")
@@ -159,8 +164,8 @@ class Pool(object):
             # for a long time locking tables
             self._build_nodes(build_count)
             with db_session() as session:
-                session.query(PoolBuilding). \
-                    filter(PoolBuilding.server_id == self.server_id). \
+                session.query(PoolBuilding).\
+                    filter(PoolBuilding.server_id == self.server_id).\
                     delete()
                 session.commit()
         except:
@@ -168,14 +173,24 @@ class Pool(object):
         self.start_probe_sched()
 
     def _build_nodes(self, count):
+        message = []
+        it = 0
         job_data = {'action': 'BUILD_DEVICE'}
-        for it in range(0, count):
-            self.gear.send_create_message(job_data)
+        while it < count:
+            message.append(dict(task='libra_pool_mgm', data=job_data))
+            it += 1
+        gear = GearmanWork()
+        gear.send_create_message(message)
 
     def _build_vips(self, count):
+        message = []
+        it = 0
         job_data = {'action': 'BUILD_IP'}
-        for it in range(0, count):
-            self.gear.send_vips_message(job_data)
+        while it < count:
+            message.append(dict(task='libra_pool_mgm', data=job_data))
+            it += 1
+        gear = GearmanWork()
+        gear.send_vips_message(message)
 
     def start_probe_sched(self):
         seconds = datetime.now().second
@@ -213,126 +228,160 @@ class Pool(object):
 
 
 class GearmanWork(object):
 
-    class VIPClient(gear.Client):
-        def handleWorkComplete(self, packet):
-            job = super(GearmanWork.VIPClient, self).handleWorkComplete(packet)
+    def __init__(self):
+        server_list = []
+        for server in cfg.CONF['gearman']['servers']:
+            host, port = server.split(':')
+            server_list.append({'host': host,
+                                'port': int(port),
+                                'keyfile': cfg.CONF['gearman']['ssl_key'],
+                                'certfile': cfg.CONF['gearman']['ssl_cert'],
+                                'ca_certs': cfg.CONF['gearman']['ssl_ca'],
+                                'keepalive': cfg.CONF['gearman']['keepalive'],
+                                'keepcnt': cfg.CONF['gearman']['keepcnt'],
+                                'keepidle': cfg.CONF['gearman']['keepidle'],
+                                'keepintvl': cfg.CONF['gearman']['keepintvl']
+                                })
+        self.gearman_client = JSONGearmanClient(server_list)
+
+    def send_delete_message(self, message):
+        LOG.info("Sending %d gearman messages", len(message))
+        job_status = self.gearman_client.submit_multiple_jobs(
+            message, background=False, wait_until_complete=True,
+            max_retries=10, poll_timeout=30.0
+        )
+        delete_count = 0
+        for status in job_status:
+            if status.state == JOB_UNKNOWN:
+                LOG.error('Gearman Job server fail')
+                continue
+            if status.timed_out:
+                LOG.error('Gearman timeout whilst deleting device')
+                continue
+            if status.result['response'] == 'FAIL':
+                LOG.error(
+                    'Pool manager failed to delete a device, removing from DB'
+                )
+
+            delete_count += 1
+            with db_session() as session:
+                session.query(Device).\
+                    filter(Device.name == status.result['name']).delete()
+                session.commit()
+
+        LOG.info('%d freed devices deleted from pool', delete_count)
+
+    def send_vips_message(self, message):
+        # TODO: make this gearman part more async, not wait for all builds
+        LOG.info("Sending %d gearman messages", len(message))
+        job_status = self.gearman_client.submit_multiple_jobs(
+            message, background=False, wait_until_complete=True,
+            max_retries=10, poll_timeout=3600.0
+        )
+        built_count = 0
+        for status in job_status:
+            if status.state == JOB_UNKNOWN:
+                LOG.error('Gearman Job server fail')
+                continue
+            if status.timed_out:
+                LOG.error('Gearman timeout whilst building vip')
+                continue
+            if status.result['response'] == 'FAIL':
+                LOG.error('Pool manager failed to build a vip')
+                continue
+
+            built_count += 1
             try:
-                if job.msg['response'] == 'FAIL':
-                    LOG.error('Pool manager failed to build a vip')
-                else:
-                    self._add_vip(job.msg)
+                self._add_vip(status.result)
             except:
                 LOG.exception(
                     'Could not add vip to DB, node data: {0}'
-                    .format(job.msg)
+                    .format(status.result)
                 )
+        LOG.info(
+            '{vips} vips built and added to pool'.format(vips=built_count)
+        )
 
-        def _add_vip(self, data):
-            LOG.info('Adding vip {0} to DB'.format(data['ip']))
-            vip = Vip()
-            vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
-            with db_session() as session:
-                session.add(vip)
-                counter = session.query(Counters). \
-                    filter(Counters.name == 'vips_built').first()
-                counter.value += 1
-                session.commit()
+    def send_create_message(self, message):
+        # TODO: make this gearman part more async, not wait for all builds
+        LOG.info("Sending {0} gearman messages".format(len(message)))
+        job_status = self.gearman_client.submit_multiple_jobs(
+            message, background=False, wait_until_complete=True,
+            max_retries=10, poll_timeout=3600.0
+        )
+        built_count = 0
+        for status in job_status:
+            if status.state == JOB_UNKNOWN:
+                LOG.error('Gearman Job server fail')
+                continue
+            if status.timed_out:
+                LOG.error('Gearman timeout whilst building device')
+                continue
+            if status.result['response'] == 'FAIL':
+                LOG.error('Pool manager failed to build a device')
+                if 'name' in status.result:
+                    self._add_bad_node(status.result)
+                continue
 
-    class DeleteClient(gear.Client):
-        def handleWorkComplete(self, packet):
-            job = super(GearmanWork.DeleteClient,
-                        self).handleWorkComplete(packet)
-
-            if job.msg['response'] == 'FAIL':
-                LOG.error(
-                    'Pool manager failed to delete a device, removing from DB')
-
-            self._delete_from_db(job.msg)
-
-        def _delete_from_db(self, msg):
-            with db_session() as session:
-                session.query(Device). \
-                    filter(Device.name == msg['name']).delete()
-                session.commit()
-            LOG.info("Delete device %s" % msg['name'])
-
-    class CreateClient(gear.Client):
-        def handleWorkComplete(self, packet):
-            job = super(GearmanWork.CreateClient,
-                        self).handleWorkComplete(packet)
+            built_count += 1
             try:
-                if job.msg['response'] == 'FAIL':
-                    LOG.error('Pool manager failed to build a device')
-                    if 'name' in job.msg:
-                        self._add_bad_node(job.msg)
-                else:
-                    self._add_node(job.msg)
+                self._add_node(status.result)
             except:
                 LOG.exception(
                     'Could not add node to DB, node data: {0}'
-                    .format(job.msg)
+                    .format(status.result)
                 )
+        LOG.info(
+            '{nodes} devices built and added to pool'.format(nodes=built_count)
        )
 
-        def _add_node(self, data):
-            LOG.info('Adding device {0} to DB'.format(data['name']))
-            device = Device()
-            device.name = data['name']
-            device.publicIpAddr = data['addr']
-            # TODO: kill this field, make things use publicIpAddr instead
-            device.floatingIpAddr = data['addr']
-            device.az = data['az']
-            device.type = data['type']
-            device.pingCount = 0
-            device.status = 'OFFLINE'
-            device.created = None
-            with db_session() as session:
-                session.add(device)
-                counter = session.query(Counters). \
-                    filter(Counters.name == 'devices_built').first()
-                counter.value += 1
-                session.commit()
+    def _add_vip(self, data):
+        LOG.info('Adding vip {0} to DB'.format(data['ip']))
+        vip = Vip()
+        vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
+        with db_session() as session:
+            session.add(vip)
+            counter = session.query(Counters).\
+                filter(Counters.name == 'vips_built').first()
+            counter.value += 1
+            session.commit()
 
-        def _add_bad_node(self, data):
-            LOG.info(
-                "Adding bad device {0} to DB to be deleted" % (data['name']))
-            device = Device()
-            device.name = data['name']
-            device.publicIpAddr = data['addr']
-            # TODO: kill this field, make things use publicIpAddr instead
-            device.floatingIpAddr = data['addr']
-            device.az = data['az']
-            device.type = data['type']
-            device.pingCount = 0
-            device.status = 'DELETED'
-            device.created = None
-            with db_session() as session:
-                session.add(device)
-                counter = session.query(Counters). \
-                    filter(Counters.name == 'devices_bad_built').first()
-                counter.value += 1
-                session.commit()
+    def _add_node(self, data):
+        LOG.info('Adding device {0} to DB'.format(data['name']))
+        device = Device()
+        device.name = data['name']
+        device.publicIpAddr = data['addr']
+        # TODO: kill this field, make things use publicIpAddr instead
+        device.floatingIpAddr = data['addr']
+        device.az = data['az']
+        device.type = data['type']
+        device.pingCount = 0
+        device.status = 'OFFLINE'
+        device.created = None
+        with db_session() as session:
+            session.add(device)
+            counter = session.query(Counters).\
+                filter(Counters.name == 'devices_built').first()
+            counter.value += 1
+            session.commit()
 
-    def __init__(self):
-        self.vip_client = GearmanWork.VIPClient("Vip Client")
-        self.delete_client = GearmanWork.DeleteClient("Delete Client")
-        self.create_client = GearmanWork.CreateClient("Create Client")
-
-        for x in [self.vip_client, self.create_client, self.delete_client]:
-            self._init_client(x)
-
-    def _init_client(self, client):
-        client.log = LOG
-        for server in cfg.CONF['gearman']['servers']:
-            host, port = server.split(':')
-            client.addServer(host, port, cfg.CONF['gearman']['ssl_key'],
-                             cfg.CONF['gearman']['ssl_cert'],
-                             cfg.CONF['gearman']['ssl_ca'])
-
-    def send_delete_message(self, message, name='libra_pool_mgm'):
-        self.delete_client.submitJob(JsonJob(name, message))
-
-    def send_vips_message(self, message, name='libra_pool_mgm'):
-        self.vip_client.submitJob(JsonJob(name, message))
-
-    def send_create_message(self, message, name='libra_pool_mgm'):
-        self.create_client.submitJob(JsonJob(name, message))
+    def _add_bad_node(self, data):
+        LOG.info(
+            'Adding bad device {0} to DB to be deleted'.format(data['name'])
+        )
+        device = Device()
+        device.name = data['name']
+        device.publicIpAddr = data['addr']
+        # TODO: kill this field, make things use publicIpAddr instead
+        device.floatingIpAddr = data['addr']
+        device.az = data['az']
+        device.type = data['type']
+        device.pingCount = 0
+        device.status = 'DELETED'
+        device.created = None
+        with db_session() as session:
+            session.add(device)
+            counter = session.query(Counters).\
+                filter(Counters.name == 'devices_bad_built').first()
+            counter.value += 1
+            session.commit()
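Note for reviewers: send_delete_message, send_vips_message and
send_create_message above all share one batch pattern; condensed here as a
sketch (assumed localhost server, no SSL) to make the status handling easier
to review:

    # Sketch of the shared submit/inspect loop, not part of the patch.
    from gearman.constants import JOB_UNKNOWN
    from libra.common.json_gearman import JSONGearmanClient

    client = JSONGearmanClient(['localhost:4730'])  # assumed server
    jobs = [dict(task='libra_pool_mgm', data={'action': 'BUILD_DEVICE'})
            for _ in range(3)]
    statuses = client.submit_multiple_jobs(
        jobs, background=False, wait_until_complete=True,
        max_retries=10, poll_timeout=3600.0)
    for status in statuses:
        if status.state == JOB_UNKNOWN:   # job server went away
            continue
        if status.timed_out:              # no worker answered in time
            continue
        if status.result['response'] == 'FAIL':
            continue                      # worker ran but reported failure
        # success: status.result is the decoded JSON reply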
diff --git a/libra/admin_api/stats/offline_sched.py b/libra/admin_api/stats/offline_sched.py
index e32defd6..b595a571 100644
--- a/libra/admin_api/stats/offline_sched.py
+++ b/libra/admin_api/stats/offline_sched.py
@@ -37,7 +37,6 @@ class OfflineStats(object):
         self.server_id = cfg.CONF['admin_api']['server_id']
         self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
-        self.gearman = GearJobs()
         self.start_offline_sched()
 
     def shutdown(self):
@@ -81,7 +80,8 @@ class OfflineStats(object):
             return (0, 0)
         for lb in devices:
             node_list.append(lb.name)
-        failed_lbs = self.gearman.offline_check(node_list)
+        gearman = GearJobs()
+        failed_lbs = gearman.offline_check(node_list)
         failed = len(failed_lbs)
         if failed > self.error_limit:
             LOG.error(
diff --git a/libra/admin_api/stats/ping_sched.py b/libra/admin_api/stats/ping_sched.py
index b45aac31..538d2eb0 100644
--- a/libra/admin_api/stats/ping_sched.py
+++ b/libra/admin_api/stats/ping_sched.py
@@ -36,7 +36,6 @@ class PingStats(object):
         self.stats_driver = cfg.CONF['admin_api']['stats_driver']
         LOG.info("Selected stats drivers: %s", self.stats_driver)
 
-        self.gearman = GearJobs()
         self.start_ping_sched()
 
     def shutdown(self):
@@ -76,7 +75,8 @@ class PingStats(object):
             return (0, 0)
         for lb in devices:
             node_list.append(lb.name)
-        failed_lbs, node_status = self.gearman.send_pings(node_list)
+        gearman = GearJobs()
+        failed_lbs, node_status = gearman.send_pings(node_list)
         failed = len(failed_lbs)
         if failed > self.error_limit:
             LOG.error(
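Note for reviewers: the schedulers now construct GearJobs inside each timer
run instead of holding one client for the process lifetime, so every run
gets fresh gearman connections. Roughly (sketch; device names invented):

    # Per-run pattern now used by PingStats/OfflineStats (sketch only).
    from libra.admin_api.stats.stats_gearman import GearJobs

    def ping_run(node_list):
        gearman = GearJobs()          # new client and connections per run
        failed_lbs, node_status = gearman.send_pings(node_list)
        return failed_lbs, node_status

    failed, status = ping_run(['lb-device-1', 'lb-device-2'])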
diff --git a/libra/admin_api/stats/stats_gearman.py b/libra/admin_api/stats/stats_gearman.py
index 9f20adc3..f180048b 100644
--- a/libra/admin_api/stats/stats_gearman.py
+++ b/libra/admin_api/stats/stats_gearman.py
@@ -12,238 +12,213 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from gearman.constants import JOB_UNKNOWN
 from oslo.config import cfg
 
+from libra.common.json_gearman import JSONGearmanClient
 from libra.openstack.common import log
-import gear
-from libra.common.json_gearman import JsonJob
-import time
 
 LOG = log.getLogger(__name__)
 
 
 class GearJobs(object):
 
-    class DisconnectClient(gear.Client):
-        def handleDisconnect(self, job):
-            job.disconnect = True
-
-    class DisconnectJob(JsonJob):
-        def __init__(self, name, msg, unique=None):
-            super(GearJobs.DisconnectJob, self).__init__(name, msg, unique)
-            self.disconnect = False
-
     def __init__(self):
         self.poll_timeout = cfg.CONF['admin_api']['stats_poll_timeout']
         self.poll_retry = cfg.CONF['admin_api']['stats_poll_timeout_retry']
 
-        self.gm_client = gear.Client("stats")
-        self.gm_client.log = LOG
+        server_list = []
         for server in cfg.CONF['gearman']['servers']:
             host, port = server.split(':')
-            self.gm_client.addServer(host, port,
-                                     cfg.CONF['gearman']['ssl_key'],
-                                     cfg.CONF['gearman']['ssl_cert'],
-                                     cfg.CONF['gearman']['ssl_ca'])
-
-    def _all_complete(self, jobs):
-        for job in jobs:
-            if not (job.complete or job.disconnect):
-                return False
-        return True
-
-    def _wait(self, pings):
-        poll_count = 0
-        while not self._all_complete(pings) and poll_count < self.poll_retry:
-            # wait for jobs
-            time.sleep(self.poll_timeout)
-            poll_count += 1
+            server_list.append({'host': host,
+                                'port': int(port),
+                                'keyfile': cfg.CONF['gearman']['ssl_key'],
+                                'certfile': cfg.CONF['gearman']['ssl_cert'],
+                                'ca_certs': cfg.CONF['gearman']['ssl_ca'],
+                                'keepalive': cfg.CONF['gearman']['keepalive'],
+                                'keepcnt': cfg.CONF['gearman']['keepcnt'],
+                                'keepidle': cfg.CONF['gearman']['keepidle'],
+                                'keepintvl': cfg.CONF['gearman']['keepintvl']
+                                })
+        self.gm_client = JSONGearmanClient(server_list)
 
     def send_pings(self, node_list):
+        # TODO: lots of duplicated code that needs cleanup
+        list_of_jobs = []
         failed_list = []
         node_status = dict()
         retry_list = []
-        submitted_pings = []
         # The message name is STATS for historical reasons.  Real
         # data statistics are gathered with METRICS messages.
         job_data = {"hpcs_action": "STATS"}
         for node in node_list:
-            job = GearJobs.DisconnectJob(str(node), job_data)
-            self.gm_client.submitJob(job)
-            submitted_pings.append(job)
-
-        self._wait(submitted_pings)
-
+            list_of_jobs.append(dict(task=str(node), data=job_data))
+        submitted_pings = self.gm_client.submit_multiple_jobs(
+            list_of_jobs, background=False, wait_until_complete=True,
+            poll_timeout=self.poll_timeout
+        )
         for ping in submitted_pings:
-            if ping.disconnect:
+            if ping.state == JOB_UNKNOWN:
                 # TODO: Gearman server failed, ignoring for now
                 LOG.error('Gearman Job server fail')
                 continue
-            if not ping.complete:
+            if ping.timed_out:
                 # Ping timeout
-                retry_list.append(ping)
+                retry_list.append(ping.job.task)
                 continue
-            if ping.msg['hpcs_response'] == 'FAIL':
+            if ping.result['hpcs_response'] == 'FAIL':
                 if (
                     'status' in ping.result and
-                    ping.msg['status'] == 'DELETED'
+                    ping.result['status'] == 'DELETED'
                 ):
                     continue
                 # Error returned by Gearman
-                failed_list.append(ping)
+                failed_list.append(ping.job.task)
                 continue
             else:
-                if 'nodes' in ping.msg:
-                    node_status[ping.name] = ping.msg['nodes']
+                if 'nodes' in ping.result:
+                    node_status[ping.job.task] = ping.result['nodes']
 
-        submitted_pings = []
+        list_of_jobs = []
         if len(retry_list) > 0:
             LOG.info(
                 "{0} pings timed out, retrying".format(len(retry_list))
             )
             for node in retry_list:
-                job = GearJobs.DisconnectJob(node.name, node.msg)
-                self.gm_client.submitJob(job)
-                submitted_pings.append(job)
-
-            self._wait(submitted_pings)
-
+                list_of_jobs.append(dict(task=str(node), data=job_data))
+            submitted_pings = self.gm_client.submit_multiple_jobs(
+                list_of_jobs, background=False, wait_until_complete=True,
+                poll_timeout=self.poll_retry
+            )
             for ping in submitted_pings:
-                if ping.disconnect:
+                if ping.state == JOB_UNKNOWN:
                     # TODO: Gearman server failed, ignoring for now
                     LOG.error('Gearman Job server fail')
                     continue
-                if not ping.complete:
+                if ping.timed_out:
                     # Ping timeout
-                    failed_list.append(ping.name)
+                    failed_list.append(ping.job.task)
                     continue
-                if ping.msg['hpcs_response'] == 'FAIL':
+                if ping.result['hpcs_response'] == 'FAIL':
                     if (
-                        'status' in ping.msg and
-                        ping.msg['status'] == 'DELETED'
+                        'status' in ping.result and
+                        ping.result['status'] == 'DELETED'
                     ):
                         continue
                     # Error returned by Gearman
-                    failed_list.append(ping.name)
+                    failed_list.append(ping.job.task)
                     continue
                 else:
                     if 'nodes' in ping.result:
-                        node_status[ping.name] = ping.msg['nodes']
+                        node_status[ping.job.task] = ping.result['nodes']
 
         return failed_list, node_status
 
     def offline_check(self, node_list):
+        list_of_jobs = []
         failed_list = []
-        submitted_pings = []
         job_data = {"hpcs_action": "DIAGNOSTICS"}
         for node in node_list:
-            job = GearJobs.DisconnectJob(str(node), job_data)
-            self.gm_client.submitJob(job)
-            submitted_pings.append(job)
-
-        self._wait(submitted_pings)
-
+            list_of_jobs.append(dict(task=str(node), data=job_data))
+        submitted_pings = self.gm_client.submit_multiple_jobs(
+            list_of_jobs, background=False, wait_until_complete=True,
+            poll_timeout=self.poll_timeout
+        )
        for ping in submitted_pings:
-            if ping.disconnect:
+            if ping.state == JOB_UNKNOWN:
                 LOG.error(
                     "Gearman Job server failed during OFFLINE check of {0}".
                    format(ping.job.task)
                 )
-            elif not ping.complete:
-                failed_list.append(ping.name)
-            elif ping.msg['network'] == 'FAIL':
-                failed_list.append(ping.name)
+            elif ping.timed_out:
+                failed_list.append(ping.job.task)
+            elif ping.result['network'] == 'FAIL':
+                failed_list.append(ping.job.task)
             else:
                 gearman_count = 0
                 gearman_fail = 0
-                for gearman_test in ping.msg['gearman']:
+                for gearman_test in ping.result['gearman']:
                     gearman_count += 1
                     if gearman_test['status'] == 'FAIL':
                         gearman_fail += 1
                 # Need 2/3rds gearman up
                 max_fail_count = gearman_count / 3
                 if gearman_fail > max_fail_count:
-                    failed_list.append(ping.name)
+                    failed_list.append(ping.job.task)
 
         return failed_list
 
     def get_discover(self, name):
         # Used in the v2 devices controller
         job_data = {"hpcs_action": "DISCOVER"}
-        job = GearJobs.DisconnectJob(str(name), job_data)
-        self.gm_client.submitJob(job, gear.PRECEDENCE_HIGH)
-
-        poll_count = 0
-        while not job.complete and not job.disconnect \
-                and poll_count < self.poll_retry:
-            # wait for jobs TODO make sure right unit/value
-            time.sleep(self.poll_timeout)
-            poll_count += 1
-
-        if not job.complete:
+        job = self.gm_client.submit_job(
+            str(name), job_data, background=False, wait_until_complete=True,
+            poll_timeout=10
+        )
+        if job.state == JOB_UNKNOWN:
+            # Gearman server failed
             return None
-
-        if job.result['hpcs_response'] == 'FAIL':
+        elif job.timed_out:
+            # Time out is a fail
+            return None
+        elif job.result['hpcs_response'] == 'FAIL':
             # Fail response is a fail
             return None
-
         return job.result
 
     def get_stats(self, node_list):
         # TODO: lots of duplicated code that needs cleanup
+        list_of_jobs = []
         failed_list = []
         retry_list = []
-        submitted_stats = []
         results = {}
         job_data = {"hpcs_action": "METRICS"}
         for node in node_list:
-            job = GearJobs.DisconnectJob(str(node), job_data)
-            self.gm_client.submitJob(job)
-            submitted_stats.append(job)
-
-        self._wait(submitted_stats)
-
+            list_of_jobs.append(dict(task=str(node), data=job_data))
+        submitted_stats = self.gm_client.submit_multiple_jobs(
+            list_of_jobs, background=False, wait_until_complete=True,
+            poll_timeout=self.poll_timeout
+        )
         for stats in submitted_stats:
-            if stats.disconnect:
+            if stats.state == JOB_UNKNOWN:
                 # TODO: Gearman server failed, ignoring for now
-                retry_list.append(stats)
-            elif not stats.complete:
+                retry_list.append(stats.job.task)
+            elif stats.timed_out:
                 # Timeout
-                retry_list.append(stats)
-            elif stats.msg['hpcs_response'] == 'FAIL':
+                retry_list.append(stats.job.task)
+            elif stats.result['hpcs_response'] == 'FAIL':
                 # Error returned by Gearman
-                failed_list.append(stats.name)
+                failed_list.append(stats.job.task)
             else:
                 # Success
-                results[stats.name] = stats.msg
+                results[stats.job.task] = stats.result
 
-        submitted_stats = []
+        list_of_jobs = []
         if len(retry_list) > 0:
             LOG.info(
                 "{0} Statistics gathering timed out, retrying".
                format(len(retry_list))
             )
             for node in retry_list:
-                job = GearJobs.DisconnectJob(node.name, node.msg)
-                self.gm_client.submitJob(job)
-                submitted_stats.append(job)
-
-            self._wait(submitted_stats)
-
+                list_of_jobs.append(dict(task=str(node), data=job_data))
+            submitted_stats = self.gm_client.submit_multiple_jobs(
+                list_of_jobs, background=False, wait_until_complete=True,
+                poll_timeout=self.poll_retry
+            )
             for stats in submitted_stats:
-                if stats.disconnect:
+                if stats.state == JOB_UNKNOWN:
                     # TODO: Gearman server failed, ignoring for now
                     LOG.error(
                         "Gearman Job server failed gathering statistics "
                         "on {0}".format(stats.job.task)
                     )
-                    failed_list.append(stats.name)
-                elif not stats.complete:
+                    failed_list.append(stats.job.task)
+                elif stats.timed_out:
                     # Timeout
-                    failed_list.append(stats.name)
-                elif stats.msg['hpcs_response'] == 'FAIL':
+                    failed_list.append(stats.job.task)
+                elif stats.result['hpcs_response'] == 'FAIL':
                     # Error returned by Gearman
-                    failed_list.append(stats.name)
+                    failed_list.append(stats.job.task)
                 else:
                     # Success
-                    results[stats.name] = stats.msg
+                    results[stats.job.task] = stats.result
 
         return failed_list, results
diff --git a/libra/admin_api/stats/stats_sched.py b/libra/admin_api/stats/stats_sched.py
index edc894c5..6d8594fe 100644
--- a/libra/admin_api/stats/stats_sched.py
+++ b/libra/admin_api/stats/stats_sched.py
@@ -37,7 +37,6 @@ class UsageStats(object):
         self.server_id = cfg.CONF['admin_api']['server_id']
         self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
         self.stats_freq = cfg.CONF['admin_api'].stats_freq
-        self.gearman = GearJobs()
 
         self.start_stats_sched()
 
@@ -102,7 +101,8 @@ class UsageStats(object):
         for device in devices:
             node_list.append(device.name)
 
-        failed_list, results = self.gearman.get_stats(node_list)
+        gearman = GearJobs()
+        failed_list, results = gearman.get_stats(node_list)
         failed = len(failed_list)
 
         if failed > 0:
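Note for reviewers: get_stats() keeps its (failed_list, results) contract,
but results is now keyed by the gearman task name and holds the decoded
METRICS reply. A consumption sketch (the metric field name is illustrative,
not part of the patch):

    # Sketch of a get_stats() caller; 'bytes_out' is an assumed field.
    from libra.admin_api.stats.stats_gearman import GearJobs

    gearman = GearJobs()
    failed_list, results = gearman.get_stats(['lb-device-1'])
    for name in failed_list:
        print 'no stats from', name
    for name, metrics in results.items():
        print name, metrics.get('bytes_out')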
diff --git a/libra/common/api/gearman_client.py b/libra/common/api/gearman_client.py
index ac50cb8f..0ea8edc5 100644
--- a/libra/common/api/gearman_client.py
+++ b/libra/common/api/gearman_client.py
@@ -13,22 +13,19 @@
 # under the License.
 
 import eventlet
-import gear
-import json
-
 eventlet.monkey_patch()
 
 import ipaddress
+from libra.common.json_gearman import JSONGearmanClient
 from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
 from libra.common.api.lbaas import HealthMonitor, Counters
 from libra.common.api.lbaas import loadbalancers_devices
 from libra.common.api.mnb import update_mnb
 from libra.openstack.common import log
 from pecan import conf
-from time import sleep
+
 
 LOG = log.getLogger(__name__)
-POLL_COUNT = 10
-POLL_SLEEP = 10
+
 
 gearman_workers = [
     'UPDATE',  # Create/Update a Load Balancer.
@@ -42,17 +39,6 @@ gearman_workers = [
 ]
 
 
-class DisconnectClient(gear.Client):
-    def handleDisconnect(self, job):
-        job.disconnect = True
-
-
-class DisconnectJob(gear.Job):
-    def __init__(self, name, arguments):
-        super(DisconnectJob, self).__init__(name, arguments)
-        self.disconnect = False
-
-
 def submit_job(job_type, host, data, lbid):
     eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
 
@@ -136,15 +122,19 @@ class GearmanClientThread(object):
         self.host = host
         self.lbid = lbid
 
-        self.gear_client = DisconnectClient()
-
+        server_list = []
         for server in conf.gearman.server:
             ghost, gport = server.split(':')
-            self.gear_client.addServer(ghost,
-                                       int(gport),
-                                       conf.gearman.ssl_key,
-                                       conf.gearman.ssl_cert,
-                                       conf.gearman.ssl_ca)
+            server_list.append({'host': ghost,
+                                'port': int(gport),
+                                'keyfile': conf.gearman.ssl_key,
+                                'certfile': conf.gearman.ssl_cert,
+                                'ca_certs': conf.gearman.ssl_ca,
+                                'keepalive': conf.gearman.keepalive,
+                                'keepcnt': conf.gearman.keepcnt,
+                                'keepidle': conf.gearman.keepidle,
+                                'keepintvl': conf.gearman.keepintvl})
+        self.gearman_client = JSONGearmanClient(server_list)
 
     def send_assign(self, data):
         NULL = None  # For pep8
@@ -532,40 +522,28 @@ class GearmanClientThread(object):
                        mnb_data["tenantid"])
 
     def _send_message(self, message, response_name):
-
-        self.gear_client.waitForServer()
-
-        job = DisconnectJob(self.host, json.dumps(message))
-
-        self.gear_client.submitJob(job)
-
-        pollcount = 0
-        # Would like to make these config file settings
-        while not job.complete and pollcount < POLL_COUNT:
-            sleep(POLL_SLEEP)
-            pollcount += 1
-
-        if job.disconnect:
-            LOG.error('Gearman Job server fail - disconnect')
-            return False, "Gearman Job server fail - "\
-                "disconnect communicating with load balancer"
-
-        # We timed out waiting for the job to finish
-        if not job.complete:
-            LOG.warning('Gearman timeout talking to {0}'.format(self.host))
+        job_status = self.gearman_client.submit_job(
+            self.host, message, background=False, wait_until_complete=True,
+            max_retries=10, poll_timeout=120.0
+        )
+        if job_status.state == 'UNKNOWN':
+            # Gearman server connection failed
+            LOG.error('Could not talk to gearman server')
+            return False, "System error communicating with load balancer"
+        if job_status.timed_out:
+            # Job timed out
+            LOG.warning(
+                'Gearman timeout talking to {0}'.format(self.host)
+            )
             return False, "Timeout error communicating with load balancer"
-
-        result = json.loads(job.data[0])
-
-        LOG.debug(result)
-
-        if 'badRequest' in result:
-            error = result['badRequest']['validationErrors']
+        LOG.debug(job_status.result)
+        if 'badRequest' in job_status.result:
+            error = job_status.result['badRequest']['validationErrors']
             return False, error['message']
-        if result[response_name] == 'FAIL':
+        if job_status.result[response_name] == 'FAIL':
             # Worker says 'no'
-            if 'hpcs_error' in result:
-                error = result['hpcs_error']
+            if 'hpcs_error' in job_status.result:
+                error = job_status.result['hpcs_error']
             else:
                 error = 'Load Balancer error'
             LOG.error(
@@ -573,4 +551,4 @@ class GearmanClientThread(object):
             )
             return False, error
         LOG.info('Gearman success from {0}'.format(self.host))
-        return True, result
\ No newline at end of file
+        return True, job_status.result
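Note for reviewers: _send_message() collapses the old poll loop into a single
submit_job call but keeps the same caller-facing contract, a (success,
payload-or-error) pair; comparing state against the literal 'UNKNOWN' works
because gearman.constants.JOB_UNKNOWN is that string, though the constant
would read better. A hedged contract sketch ('hpcs_response' mirrors the
callers above):

    # Contract sketch only: _send_message returns (True, decoded reply)
    # on success and (False, error string) on any gearman/worker failure.
    def apply_update(client_thread, data):
        ok, payload = client_thread._send_message(data, 'hpcs_response')
        if not ok:
            return 'error: %s' % payload
        return payload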
diff --git a/libra/common/json_gearman.py b/libra/common/json_gearman.py
index bc66c328..503a8835 100644
--- a/libra/common/json_gearman.py
+++ b/libra/common/json_gearman.py
@@ -13,14 +13,28 @@
 # under the License.
 
 import json
-import gear
+from gearman import GearmanClient, GearmanWorker, DataEncoder
 
 
-# Here is the good stuff
-class JsonJob(gear.Job):
-    def __init__(self, name, msg, unique=None):
-        super(JsonJob, self).__init__(name, json.dumps(msg), unique)
-
-    @property
-    def msg(self):
-        if self.data:
-            return json.loads(self.data[0])
+class JSONDataEncoder(DataEncoder):
+    """ Class to transform data that the worker either receives or sends. """
+
+    @classmethod
+    def encode(cls, encodable_object):
+        """ Encode JSON object as string """
+        return json.dumps(encodable_object)
+
+    @classmethod
+    def decode(cls, decodable_string):
+        """ Decode string to JSON object """
+        return json.loads(decodable_string)
+
+
+class JSONGearmanWorker(GearmanWorker):
+    """ Overload the Gearman worker class so we can set the data encoder. """
+    data_encoder = JSONDataEncoder
+
+
+class JSONGearmanClient(GearmanClient):
+    """ Overload the Gearman client class so we can set the data encoder. """
+    data_encoder = JSONDataEncoder
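Note for reviewers: with data_encoder set, both sides of the wire deal in
plain dicts and the JSON (de)serialization happens inside the transport. A
round-trip sketch, assuming a local unencrypted server and an invented task
name:

    # Sketch: JSON round trip through the new wrappers, not part of the patch.
    from libra.common.json_gearman import JSONGearmanClient, JSONGearmanWorker

    def echo(worker, job):
        return {'echo': job.data}      # job.data is already a decoded dict

    worker = JSONGearmanWorker(['localhost:4730'])   # assumed server
    worker.register_task('echo_task', echo)
    # worker.work() would block here; run it in a separate process.

    client = JSONGearmanClient(['localhost:4730'])
    status = client.submit_job('echo_task', {'hello': 'world'},
                               background=False, wait_until_complete=True)
    # status.result == {'echo': {'hello': 'world'}}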
diff --git a/libra/mgm/controllers/build.py b/libra/mgm/controllers/build.py
index a0e3360f..7df2c85d 100644
--- a/libra/mgm/controllers/build.py
+++ b/libra/mgm/controllers/build.py
@@ -12,32 +12,20 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import gear
-import json
-
 from time import sleep
 
 from novaclient import exceptions
 from oslo.config import cfg
+from gearman.constants import JOB_UNKNOWN
 
 from libra.openstack.common import log
+from libra.common.json_gearman import JSONGearmanClient
 from libra.mgm.nova import Node, BuildError, NotFound
 
-POLL_COUNT = 10
 LOG = log.getLogger(__name__)
 
 
-class DisconnectClient(gear.Client):
-    def handleDisconnect(self, job):
-        job.disconnect = True
-
-
-class DisconnectJob(gear.Job):
-    def __init__(self, name, arguments):
-        super(DisconnectJob, self).__init__(name, arguments)
-        self.disconnect = False
-
-
 class BuildController(object):
+
     RESPONSE_FIELD = 'response'
     RESPONSE_SUCCESS = 'PASS'
     RESPONSE_FAILURE = 'FAIL'
@@ -108,7 +96,7 @@ class BuildController(object):
             )
             self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
             return self.msg
-        if resp.status_code not in (200, 203):
+        if resp.status_code not in(200, 203):
             LOG.error(
                 'Error geting status from Nova, error {0}'
                 .format(resp.status_code)
@@ -141,56 +129,44 @@ class BuildController(object):
 
     def _test_node(self, name):
         """ Run diags on node, blow it away if bad """
-
-        client = DisconnectClient()
-
+        server_list = []
         for server in cfg.CONF['gearman']['servers']:
             host, port = server.split(':')
-            client.addServer(host,
-                             int(port),
-                             cfg.CONF['gearman']['ssl_key'],
-                             cfg.CONF['gearman']['ssl_cert'],
-                             cfg.CONF['gearman']['ssl_ca'])
-
-        client.waitForServer()
+            server_list.append({'host': host,
+                                'port': int(port),
+                                'keyfile': cfg.CONF['gearman']['ssl_key'],
+                                'certfile': cfg.CONF['gearman']['ssl_cert'],
+                                'ca_certs': cfg.CONF['gearman']['ssl_ca'],
+                                'keepalive': cfg.CONF['gearman']['keepalive'],
+                                'keepcnt': cfg.CONF['gearman']['keepcnt'],
+                                'keepidle': cfg.CONF['gearman']['keepidle'],
+                                'keepintvl': cfg.CONF['gearman']['keepintvl']})
+        gm_client = JSONGearmanClient(server_list)
 
         job_data = {'hpcs_action': 'DIAGNOSTICS'}
-
-        job = DisconnectJob(str(name), json.dumps(job_data))
-
-        client.submitJob(job)
-
-        pollcount = 0
-        pollsleepinterval = cfg.CONF['mgm']['build_diag_timeout'] / POLL_COUNT
-        while not job.complete\
-                and pollcount < POLL_COUNT\
-                and not job.disconnect:
-            sleep(pollsleepinterval)
-            pollcount += 1
-
-        if job.disconnect:
-            LOG.error('Gearman Job server fail - disconnect')
+        job_status = gm_client.submit_job(
+            str(name), job_data, background=False, wait_until_complete=True,
+            max_retries=10, poll_timeout=10
+        )
+        if job_status.state == JOB_UNKNOWN:
+            # Gearman server connect fail, count as bad node because we can't
+            # tell if it really is working
+            LOG.error('Could not talk to gearman server')
             return False
-
-        # We timed out waiting for the job to finish
-        if not job.complete:
+        if job_status.timed_out:
             LOG.warning('Timeout getting diags from {0}'.format(name))
             return False
-
-        result = json.loads(job.data[0])
-
-        LOG.debug(result)
-
+        LOG.debug(job_status.result)
         # Would only happen if DIAGNOSTICS call not supported
-        if result['hpcs_response'] == 'FAIL':
+        if job_status.result['hpcs_response'] == 'FAIL':
             return True
 
-        if result['network'] == 'FAIL':
+        if job_status.result['network'] == 'FAIL':
             return False
 
         gearman_count = 0
         gearman_fail = 0
-        for gearman_test in result['gearman']:
+        for gearman_test in job_status.result['gearman']:
             gearman_count += 1
             if gearman_test['status'] == 'FAIL':
                 LOG.info(
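Note for reviewers: _test_node keeps the old quorum rule, i.e. a node passes
only while failures stay within a third of its gearman connectivity tests,
and the division is Python 2 integer division. Isolated sketch:

    # The quorum check from _test_node (sketch). With 4 tests,
    # 4 / 3 == 1 under integer division, so one failure still passes.
    def gearman_quorum_ok(gearman_tests):
        total = len(gearman_tests)
        failed = sum(1 for t in gearman_tests if t['status'] == 'FAIL')
        return failed <= total / 3

    print gearman_quorum_ok([{'status': 'PASS'}] * 3 + [{'status': 'FAIL'}])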
diff --git a/libra/mgm/gearman_worker.py b/libra/mgm/gearman_worker.py
index 0bd1865b..d8d53e65 100644
--- a/libra/mgm/gearman_worker.py
+++ b/libra/mgm/gearman_worker.py
@@ -12,11 +12,14 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import gear
+import gearman.errors
 import json
 import socket
+import time
 
 from oslo.config import cfg
+
+from libra.common.json_gearman import JSONGearmanWorker
 from libra.mgm.controllers.root import PoolMgmController
 from libra.openstack.common import log
 
@@ -24,41 +27,49 @@ from libra.openstack.common import log
 LOG = log.getLogger(__name__)
 
 
-def handler(job):
-    LOG.debug("Received JSON message: {0}".format(json.dumps(job.arguments)))
-    controller = PoolMgmController(json.loads(job.arguments))
+def handler(worker, job):
+    LOG.debug("Received JSON message: {0}".format(json.dumps(job.data)))
+    controller = PoolMgmController(job.data)
     response = controller.run()
     LOG.debug("Return JSON message: {0}".format(json.dumps(response)))
-    job.sendWorkComplete(json.dumps(response))
+    return response
 
 
 def worker_thread():
     LOG.info("Registering task libra_pool_mgm")
     hostname = socket.gethostname()
 
-    worker = gear.Worker(hostname)
-
+    server_list = []
     for host_port in cfg.CONF['gearman']['servers']:
         host, port = host_port.split(':')
-        worker.addServer(host,
-                         int(port),
-                         cfg.CONF['gearman']['ssl_key'],
-                         cfg.CONF['gearman']['ssl_cert'],
-                         cfg.CONF['gearman']['ssl_ca'])
-    worker.registerFunction('libra_pool_mgm')
+        server_list.append({'host': host,
+                            'port': int(port),
+                            'keyfile': cfg.CONF['gearman']['ssl_key'],
+                            'certfile': cfg.CONF['gearman']['ssl_cert'],
+                            'ca_certs': cfg.CONF['gearman']['ssl_ca'],
+                            'keepalive': cfg.CONF['gearman']['keepalive'],
+                            'keepcnt': cfg.CONF['gearman']['keepcnt'],
+                            'keepidle': cfg.CONF['gearman']['keepidle'],
+                            'keepintvl': cfg.CONF['gearman']['keepintvl']})
+    worker = JSONGearmanWorker(server_list)
+
+    worker.set_client_id(hostname)
+    worker.register_task('libra_pool_mgm', handler)
     worker.logger = LOG
 
     retry = True
 
-    while retry:
+    while (retry):
         try:
-            job = worker.getJob()
-            handler(job)
+            worker.work(cfg.CONF['gearman']['poll'])
         except KeyboardInterrupt:
             retry = False
-        except Exception as e:
-            LOG.exception("Exception in pool manager worker: %s, %s"
-                          % (e.__class__, e))
+        except gearman.errors.ServerUnavailable:
+            LOG.error("Job server(s) went away. Reconnecting.")
+            time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
+            retry = True
+        except Exception:
+            LOG.exception("Exception in worker")
             retry = False
 
     LOG.debug("Pool manager process terminated.")
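Note for reviewers: handlers now take (worker, job) and simply return the
response dict; python-gearman's work() drives the dispatch loop, and
ServerUnavailable is the signal to sleep and reconnect. Trimmed sketch with
literal stand-ins for the cfg values:

    # Sketch of the new worker loop; the poll/reconnect literals stand in
    # for cfg.CONF['gearman']['poll'] and ['reconnect_sleep'].
    import time
    import gearman.errors
    from libra.common.json_gearman import JSONGearmanWorker

    def handler(worker, job):
        return {'response': 'PASS', 'seen': job.data}   # decoded dict in

    worker = JSONGearmanWorker(['localhost:4730'])      # assumed server
    worker.set_client_id('example-host')
    worker.register_task('libra_pool_mgm', handler)
    while True:
        try:
            worker.work(poll_timeout=30)
        except gearman.errors.ServerUnavailable:
            time.sleep(60)               # back off, then reconnect
        except KeyboardInterrupt:
            break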