Make db queries asynchronous in zuul-web
Previously we blocked on db queries because we didn't await on them. We address this by wrapping the db query in an executor which runs on a threadpool in the asyncio event loop. This allows us to await the the task and context switch to handle other connections asyncronously. Change-Id: I8db001c160e1e11cb7f5a579a2f575652fd9c454
This commit is contained in:
parent
b23da05019
commit
926843ce7e
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from aiohttp import web
|
||||
|
@ -167,6 +168,9 @@ class SqlWebHandler(BaseTenantWebHandler):
|
|||
super(SqlWebHandler, self).__init__(
|
||||
connection=connection, zuul_web=zuul_web, method=method, path=path)
|
||||
|
||||
def setEventLoop(self, event_loop):
|
||||
self.event_loop = event_loop
|
||||
|
||||
def query(self, args):
|
||||
build = self.connection.zuul_build_table
|
||||
buildset = self.connection.zuul_buildset_table
|
||||
|
@ -202,7 +206,14 @@ class SqlWebHandler(BaseTenantWebHandler):
|
|||
builds = []
|
||||
with self.connection.engine.begin() as conn:
|
||||
query = self.query(args)
|
||||
for row in conn.execute(query):
|
||||
query_task = self.event_loop.run_in_executor(
|
||||
None,
|
||||
conn.execute,
|
||||
query
|
||||
)
|
||||
rows = await asyncio.wait_for(query_task, 30)
|
||||
|
||||
for row in rows:
|
||||
build = dict(row)
|
||||
# Convert date to iso format
|
||||
if row.start_time:
|
||||
|
|
|
@ -264,10 +264,12 @@ class ZuulWeb(object):
|
|||
self.log_streaming_handler = LogStreamingHandler(self.rpc)
|
||||
self.gearman_handler = GearmanHandler(self.rpc)
|
||||
self._plugin_routes = [] # type: List[zuul.web.handler.BaseWebHandler]
|
||||
self._connection_handlers = []
|
||||
connections = connections or []
|
||||
for connection in connections:
|
||||
self._plugin_routes.extend(
|
||||
self._connection_handlers.extend(
|
||||
connection.getWebHandlers(self, self.info))
|
||||
self._plugin_routes.extend(self._connection_handlers)
|
||||
|
||||
async def _handleWebsocket(self, request):
|
||||
return await self.log_streaming_handler.processRequest(
|
||||
|
@ -364,6 +366,9 @@ class ZuulWeb(object):
|
|||
|
||||
self.event_loop = loop
|
||||
self.log_streaming_handler.setEventLoop(loop)
|
||||
for handler in self._connection_handlers:
|
||||
if hasattr(handler, 'setEventLoop'):
|
||||
handler.setEventLoop(loop)
|
||||
|
||||
app = web.Application()
|
||||
for method, path, handler in routes:
|
||||
|
|
Loading…
Reference in New Issue