Make gearman calls async in ZuulWeb

These need to be scheduled on the event loop, or else they'll block.

We choose a 300s gearman timeout to match the change in:
I12741bb259c1a78fa2446d764318f84df34bac67

Change-Id: I2785d945c8032f73bfdc240cf09954b5ed9a3978
This commit is contained in:
David Shrewsbury
2018-04-10 09:33:55 -04:00
parent d4e2d97986
commit 9d84647dee

View File

@@ -170,15 +170,35 @@ class GearmanHandler(object):
'key_get': self.key_get,
}
def setEventLoop(self, event_loop):
self.event_loop = event_loop
# TODO: At some point, we should make this use a gear.Client, rather than
# the RPC client, so we can use that to make async Gearman calls. This
# implementation will create additional threads by putting the call onto
# the asycio ThreadPool, which is not ideal.
async def asyncSubmitJob(self, name, data):
'''
Submit a job to Gearman asynchronously.
This will raise a asyncio.TimeoutError if we hit the timeout. It is
up to the caller to handle the exception.
'''
gear_task = self.event_loop.run_in_executor(
None, self.rpc.submitJob, name, data)
job = await asyncio.wait_for(gear_task, 300)
return job
async def tenant_list(self, request, result_filter=None):
job = self.rpc.submitJob('zuul:tenant_list', {})
job = await self.asyncSubmitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))
async def status_get(self, request, result_filter=None):
tenant = request.match_info["tenant"]
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
job = self.rpc.submitJob('zuul:status_get', {'tenant': tenant})
job = await self.asyncSubmitJob('zuul:status_get',
{'tenant': tenant})
self.cache[tenant] = json.loads(job.data[0])
self.cache_time[tenant] = time.time()
payload = self.cache[tenant]
@@ -194,14 +214,14 @@ class GearmanHandler(object):
async def job_list(self, request, result_filter=None):
tenant = request.match_info["tenant"]
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
job = await self.asyncSubmitJob('zuul:job_list', {'tenant': tenant})
return web.json_response(json.loads(job.data[0]))
async def key_get(self, request, result_filter=None):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
job = await self.asyncSubmitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])
async def processRequest(self, request, action, result_filter=None):
@@ -364,6 +384,8 @@ class ZuulWeb(object):
self.event_loop = loop
self.log_streaming_handler.setEventLoop(loop)
self.gearman_handler.setEventLoop(loop)
for handler in self._connection_handlers:
if hasattr(handler, 'setEventLoop'):
handler.setEventLoop(loop)