api server now uses gear library; remove yelp's gearman; (force re-jenkins)
Change-Id: Iaa81fc34161992a98a9449b8c58e80484d048d16
This commit is contained in:
@@ -13,19 +13,22 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
import gear
|
||||||
|
import json
|
||||||
|
|
||||||
eventlet.monkey_patch()
|
eventlet.monkey_patch()
|
||||||
import ipaddress
|
import ipaddress
|
||||||
from libra.common.json_gearman import JSONGearmanClient
|
|
||||||
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
|
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
|
||||||
from libra.common.api.lbaas import HealthMonitor, Counters
|
from libra.common.api.lbaas import HealthMonitor, Counters
|
||||||
from libra.common.api.lbaas import loadbalancers_devices
|
from libra.common.api.lbaas import loadbalancers_devices
|
||||||
from libra.common.api.mnb import update_mnb
|
from libra.common.api.mnb import update_mnb
|
||||||
from libra.openstack.common import log
|
from libra.openstack.common import log
|
||||||
from pecan import conf
|
from pecan import conf
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
POLL_COUNT = 10
|
||||||
|
POLL_SLEEP = 10
|
||||||
|
|
||||||
gearman_workers = [
|
gearman_workers = [
|
||||||
'UPDATE', # Create/Update a Load Balancer.
|
'UPDATE', # Create/Update a Load Balancer.
|
||||||
@@ -39,6 +42,17 @@ gearman_workers = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class DisconnectClient(gear.Client):
|
||||||
|
def handleDisconnect(self, job):
|
||||||
|
job.disconnect = True
|
||||||
|
|
||||||
|
|
||||||
|
class DisconnectJob(gear.Job):
|
||||||
|
def __init__(self, name, arguments):
|
||||||
|
super(DisconnectJob, self).__init__(name, arguments)
|
||||||
|
self.disconnect = False
|
||||||
|
|
||||||
|
|
||||||
def submit_job(job_type, host, data, lbid):
|
def submit_job(job_type, host, data, lbid):
|
||||||
eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
|
eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
|
||||||
|
|
||||||
@@ -122,19 +136,15 @@ class GearmanClientThread(object):
|
|||||||
self.host = host
|
self.host = host
|
||||||
self.lbid = lbid
|
self.lbid = lbid
|
||||||
|
|
||||||
server_list = []
|
self.gear_client = DisconnectClient()
|
||||||
|
|
||||||
for server in conf.gearman.server:
|
for server in conf.gearman.server:
|
||||||
ghost, gport = server.split(':')
|
ghost, gport = server.split(':')
|
||||||
server_list.append({'host': ghost,
|
self.gear_client.addServer(ghost,
|
||||||
'port': int(gport),
|
int(gport),
|
||||||
'keyfile': conf.gearman.ssl_key,
|
conf.gearman.ssl_key,
|
||||||
'certfile': conf.gearman.ssl_cert,
|
conf.gearman.ssl_cert,
|
||||||
'ca_certs': conf.gearman.ssl_ca,
|
conf.gearman.ssl_ca)
|
||||||
'keepalive': conf.gearman.keepalive,
|
|
||||||
'keepcnt': conf.gearman.keepcnt,
|
|
||||||
'keepidle': conf.gearman.keepidle,
|
|
||||||
'keepintvl': conf.gearman.keepintvl})
|
|
||||||
self.gearman_client = JSONGearmanClient(server_list)
|
|
||||||
|
|
||||||
def send_assign(self, data):
|
def send_assign(self, data):
|
||||||
NULL = None # For pep8
|
NULL = None # For pep8
|
||||||
@@ -522,28 +532,40 @@ class GearmanClientThread(object):
|
|||||||
mnb_data["tenantid"])
|
mnb_data["tenantid"])
|
||||||
|
|
||||||
def _send_message(self, message, response_name):
|
def _send_message(self, message, response_name):
|
||||||
job_status = self.gearman_client.submit_job(
|
|
||||||
self.host, message, background=False, wait_until_complete=True,
|
self.gear_client.waitForServer()
|
||||||
max_retries=10, poll_timeout=120.0
|
|
||||||
)
|
job = DisconnectJob(self.host, json.dumps(message))
|
||||||
if job_status.state == 'UNKNOWN':
|
|
||||||
# Gearman server connection failed
|
self.gear_client.submitJob(job)
|
||||||
LOG.error('Could not talk to gearman server')
|
|
||||||
return False, "System error communicating with load balancer"
|
pollcount = 0
|
||||||
if job_status.timed_out:
|
# Would like to make these config file settings
|
||||||
# Job timed out
|
while not job.complete and pollcount < POLL_COUNT:
|
||||||
LOG.warning(
|
sleep(POLL_SLEEP)
|
||||||
'Gearman timeout talking to {0}'.format(self.host)
|
pollcount += 1
|
||||||
)
|
|
||||||
|
if job.disconnect:
|
||||||
|
LOG.error('Gearman Job server fail - disconnect')
|
||||||
|
return False, "Gearman Job server fail - "\
|
||||||
|
"disconnect communicating with load balancer"
|
||||||
|
|
||||||
|
# We timed out waiting for the job to finish
|
||||||
|
if not job.complete:
|
||||||
|
LOG.warning('Gearman timeout talking to {0}'.format(self.host))
|
||||||
return False, "Timeout error communicating with load balancer"
|
return False, "Timeout error communicating with load balancer"
|
||||||
LOG.debug(job_status.result)
|
|
||||||
if 'badRequest' in job_status.result:
|
result = json.loads(job.data[0])
|
||||||
error = job_status.result['badRequest']['validationErrors']
|
|
||||||
|
LOG.debug(result)
|
||||||
|
|
||||||
|
if 'badRequest' in result:
|
||||||
|
error = result['badRequest']['validationErrors']
|
||||||
return False, error['message']
|
return False, error['message']
|
||||||
if job_status.result[response_name] == 'FAIL':
|
if result[response_name] == 'FAIL':
|
||||||
# Worker says 'no'
|
# Worker says 'no'
|
||||||
if 'hpcs_error' in job_status.result:
|
if 'hpcs_error' in result:
|
||||||
error = job_status.result['hpcs_error']
|
error = result['hpcs_error']
|
||||||
else:
|
else:
|
||||||
error = 'Load Balancer error'
|
error = 'Load Balancer error'
|
||||||
LOG.error(
|
LOG.error(
|
||||||
@@ -551,4 +573,4 @@ class GearmanClientThread(object):
|
|||||||
)
|
)
|
||||||
return False, error
|
return False, error
|
||||||
LOG.info('Gearman success from {0}'.format(self.host))
|
LOG.info('Gearman success from {0}'.format(self.host))
|
||||||
return True, job_status.result
|
return True, result
|
||||||
|
@@ -12,36 +12,10 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from gearman import GearmanClient, GearmanWorker, DataEncoder
|
|
||||||
import json
|
import json
|
||||||
import gear
|
import gear
|
||||||
|
|
||||||
|
|
||||||
class JSONDataEncoder(DataEncoder):
|
|
||||||
""" Class to transform data that the worker either receives or sends. """
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def encode(cls, encodable_object):
|
|
||||||
""" Encode JSON object as string """
|
|
||||||
return json.dumps(encodable_object)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def decode(cls, decodable_string):
|
|
||||||
""" Decode string to JSON object """
|
|
||||||
return json.loads(decodable_string)
|
|
||||||
|
|
||||||
|
|
||||||
class JSONGearmanWorker(GearmanWorker):
|
|
||||||
""" Overload the Gearman worker class so we can set the data encoder. """
|
|
||||||
data_encoder = JSONDataEncoder
|
|
||||||
|
|
||||||
|
|
||||||
class JSONGearmanClient(GearmanClient):
|
|
||||||
""" Overload the Gearman client class so we can set the data encoder. """
|
|
||||||
data_encoder = JSONDataEncoder
|
|
||||||
|
|
||||||
|
|
||||||
# Here is the good stuff
|
|
||||||
class JsonJob(gear.Job):
|
class JsonJob(gear.Job):
|
||||||
def __init__(self, name, msg, unique=None):
|
def __init__(self, name, msg, unique=None):
|
||||||
super(JsonJob, self).__init__(name, json.dumps(msg), unique)
|
super(JsonJob, self).__init__(name, json.dumps(msg), unique)
|
||||||
|
@@ -1,10 +1,8 @@
|
|||||||
pbr>=0.5.21,<1.0
|
pbr>=0.5.21,<1.0
|
||||||
|
|
||||||
Babel>=0.9.6
|
Babel>=0.9.6
|
||||||
eventlet
|
eventlet
|
||||||
# put back once it's patched
|
# put back once it's patched
|
||||||
# gear
|
# gear
|
||||||
gearman>=2.0.2
|
|
||||||
oslo.config>=1.2.0
|
oslo.config>=1.2.0
|
||||||
python-daemon>=1.6
|
python-daemon>=1.6
|
||||||
python_novaclient>=2.14.1,<2.14.2
|
python_novaclient>=2.14.1,<2.14.2
|
||||||
|
Reference in New Issue
Block a user