From d34e7c18958e86812d89a6f69f42f001c11c7c1b Mon Sep 17 00:00:00 2001 From: German Eichberger Date: Wed, 4 Jun 2014 14:52:37 -0700 Subject: [PATCH] api server now uses gear library; remove yelp's gearman; (force re-jenkins) Change-Id: Iaa81fc34161992a98a9449b8c58e80484d048d16 --- libra/common/api/gearman_client.py | 90 +++++++++++++++++++----------- libra/common/json_gearman.py | 26 --------- requirements.txt | 2 - 3 files changed, 56 insertions(+), 62 deletions(-) diff --git a/libra/common/api/gearman_client.py b/libra/common/api/gearman_client.py index 0ea8edc5..cad1217c 100644 --- a/libra/common/api/gearman_client.py +++ b/libra/common/api/gearman_client.py @@ -13,19 +13,22 @@ # under the License. import eventlet +import gear +import json + eventlet.monkey_patch() import ipaddress -from libra.common.json_gearman import JSONGearmanClient from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip from libra.common.api.lbaas import HealthMonitor, Counters from libra.common.api.lbaas import loadbalancers_devices from libra.common.api.mnb import update_mnb from libra.openstack.common import log from pecan import conf - +from time import sleep LOG = log.getLogger(__name__) - +POLL_COUNT = 10 +POLL_SLEEP = 10 gearman_workers = [ 'UPDATE', # Create/Update a Load Balancer. @@ -39,6 +42,17 @@ gearman_workers = [ ] +class DisconnectClient(gear.Client): + def handleDisconnect(self, job): + job.disconnect = True + + +class DisconnectJob(gear.Job): + def __init__(self, name, arguments): + super(DisconnectJob, self).__init__(name, arguments) + self.disconnect = False + + def submit_job(job_type, host, data, lbid): eventlet.spawn_n(client_job, job_type, str(host), data, lbid) @@ -122,19 +136,15 @@ class GearmanClientThread(object): self.host = host self.lbid = lbid - server_list = [] + self.gear_client = DisconnectClient() + for server in conf.gearman.server: ghost, gport = server.split(':') - server_list.append({'host': ghost, - 'port': int(gport), - 'keyfile': conf.gearman.ssl_key, - 'certfile': conf.gearman.ssl_cert, - 'ca_certs': conf.gearman.ssl_ca, - 'keepalive': conf.gearman.keepalive, - 'keepcnt': conf.gearman.keepcnt, - 'keepidle': conf.gearman.keepidle, - 'keepintvl': conf.gearman.keepintvl}) - self.gearman_client = JSONGearmanClient(server_list) + self.gear_client.addServer(ghost, + int(gport), + conf.gearman.ssl_key, + conf.gearman.ssl_cert, + conf.gearman.ssl_ca) def send_assign(self, data): NULL = None # For pep8 @@ -522,28 +532,40 @@ class GearmanClientThread(object): mnb_data["tenantid"]) def _send_message(self, message, response_name): - job_status = self.gearman_client.submit_job( - self.host, message, background=False, wait_until_complete=True, - max_retries=10, poll_timeout=120.0 - ) - if job_status.state == 'UNKNOWN': - # Gearman server connection failed - LOG.error('Could not talk to gearman server') - return False, "System error communicating with load balancer" - if job_status.timed_out: - # Job timed out - LOG.warning( - 'Gearman timeout talking to {0}'.format(self.host) - ) + + self.gear_client.waitForServer() + + job = DisconnectJob(self.host, json.dumps(message)) + + self.gear_client.submitJob(job) + + pollcount = 0 + # Would like to make these config file settings + while not job.complete and pollcount < POLL_COUNT: + sleep(POLL_SLEEP) + pollcount += 1 + + if job.disconnect: + LOG.error('Gearman Job server fail - disconnect') + return False, "Gearman Job server fail - "\ + "disconnect communicating with load balancer" + + # We timed out waiting for the job to finish + if not job.complete: + LOG.warning('Gearman timeout talking to {0}'.format(self.host)) return False, "Timeout error communicating with load balancer" - LOG.debug(job_status.result) - if 'badRequest' in job_status.result: - error = job_status.result['badRequest']['validationErrors'] + + result = json.loads(job.data[0]) + + LOG.debug(result) + + if 'badRequest' in result: + error = result['badRequest']['validationErrors'] return False, error['message'] - if job_status.result[response_name] == 'FAIL': + if result[response_name] == 'FAIL': # Worker says 'no' - if 'hpcs_error' in job_status.result: - error = job_status.result['hpcs_error'] + if 'hpcs_error' in result: + error = result['hpcs_error'] else: error = 'Load Balancer error' LOG.error( @@ -551,4 +573,4 @@ class GearmanClientThread(object): ) return False, error LOG.info('Gearman success from {0}'.format(self.host)) - return True, job_status.result + return True, result diff --git a/libra/common/json_gearman.py b/libra/common/json_gearman.py index c11278d5..e30ab703 100644 --- a/libra/common/json_gearman.py +++ b/libra/common/json_gearman.py @@ -12,36 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. -from gearman import GearmanClient, GearmanWorker, DataEncoder import json import gear -class JSONDataEncoder(DataEncoder): - """ Class to transform data that the worker either receives or sends. """ - - @classmethod - def encode(cls, encodable_object): - """ Encode JSON object as string """ - return json.dumps(encodable_object) - - @classmethod - def decode(cls, decodable_string): - """ Decode string to JSON object """ - return json.loads(decodable_string) - - -class JSONGearmanWorker(GearmanWorker): - """ Overload the Gearman worker class so we can set the data encoder. """ - data_encoder = JSONDataEncoder - - -class JSONGearmanClient(GearmanClient): - """ Overload the Gearman client class so we can set the data encoder. """ - data_encoder = JSONDataEncoder - - -# Here is the good stuff class JsonJob(gear.Job): def __init__(self, name, msg, unique=None): super(JsonJob, self).__init__(name, json.dumps(msg), unique) diff --git a/requirements.txt b/requirements.txt index 9277b16b..e38b0fa6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,8 @@ pbr>=0.5.21,<1.0 - Babel>=0.9.6 eventlet # put back once it's patched # gear -gearman>=2.0.2 oslo.config>=1.2.0 python-daemon>=1.6 python_novaclient>=2.14.1,<2.14.2