Allow using webapp from connections

Allow connections to register their own handlers for HTTP URIs inside
the zuul's webapp HTTP server. That way, connections can listen for
events comming through HTTP.

Story: 2000774

Change-Id: Ic5887d00ff302f67469df5154e9df10b99f1cfcd
This commit is contained in:
Jan Hruban 2015-08-21 14:00:54 +02:00 committed by Jesse Keating
parent 2a531699a9
commit 7083edd565
7 changed files with 95 additions and 25 deletions

View File

@ -1367,6 +1367,9 @@ class ZuulTestCase(BaseTestCase):
self.sched = zuul.scheduler.Scheduler(self.config)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
self.event_queues = [
self.sched.result_event_queue,
self.sched.trigger_event_queue,
@ -1374,7 +1377,7 @@ class ZuulTestCase(BaseTestCase):
]
self.configure_connections()
self.sched.registerConnections(self.connections)
self.sched.registerConnections(self.connections, self.webapp)
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib.request.Request):
@ -1414,8 +1417,6 @@ class ZuulTestCase(BaseTestCase):
self.sched.setNodepool(self.nodepool)
self.sched.setZooKeeper(self.zk)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.start()

View File

@ -19,6 +19,7 @@ import os
import json
from six.moves import urllib
import webob
from tests.base import ZuulTestCase, FIXTURE_DIR
@ -96,3 +97,16 @@ class TestWebapp(ZuulTestCase):
self.port)
f = urllib.request.urlopen(req)
self.assertEqual(f.read(), public_pem)
def test_webapp_custom_handler(self):
def custom_handler(path, tenant_name, request):
return webob.Response(body='ok')
self.webapp.register_path('/custom', custom_handler)
req = urllib.request.Request(
"http://localhost:%s/custom" % self.port)
f = urllib.request.urlopen(req)
self.assertEqual('ok', f.read())
self.webapp.unregister_path('/custom')
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)

View File

@ -182,7 +182,7 @@ class Scheduler(zuul.cmd.ZuulApp):
self.log.info('Starting scheduler')
try:
self.sched.start()
self.sched.registerConnections(self.connections)
self.sched.registerConnections(self.connections, webapp)
self.sched.reconfigure(self.config)
self.sched.resume()
except Exception:

View File

@ -59,3 +59,21 @@ class BaseConnection(object):
This lets the user supply a list of change objects that are
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
def registerWebapp(self, webapp):
self.webapp = webapp
def registerHttpHandler(self, path, handler):
"""Add connection handler for HTTP URI.
Connection can use builtin HTTP server for listening on incoming event
requests. The resulting path will be /connection/connection_name/path.
"""
self.webapp.register_path(self._connectionPath(path), handler)
def unregisterHttpHandler(self, path):
"""Remove the connection handler for HTTP URI."""
self.webapp.unregister_path(self._connectionPath(path))
def _connectionPath(self, path):
return '/connection/%s/%s' % (self.connection_name, path)

View File

@ -58,6 +58,13 @@ class ConnectionRegistry(object):
if load:
connection.onLoad()
def registerWebapp(self, webapp):
for driver_name, driver in self.drivers.items():
if hasattr(driver, 'registerWebapp'):
driver.registerWebapp(webapp)
for connection_name, connection in self.connections.items():
connection.registerWebapp(webapp)
def reconfigureDrivers(self, tenant):
for driver in self.drivers.values():
if hasattr(driver, 'reconfigure'):

View File

@ -232,10 +232,11 @@ class Scheduler(threading.Thread):
self.stopConnections()
self.wake_event.set()
def registerConnections(self, connections, load=True):
def registerConnections(self, connections, webapp, load=True):
# load: whether or not to trigger the onLoad for the connection. This
# is useful for not doing a full load during layout validation.
self.connections = connections
self.connections.registerWebapp(webapp)
self.connections.registerScheduler(self, load)
def stopConnections(self):

View File

@ -45,6 +45,7 @@ array of changes, they will not include the queue structure.
class WebApp(threading.Thread):
log = logging.getLogger("zuul.WebApp")
change_path_regexp = '/status/change/(\d+,\d+)$'
def __init__(self, scheduler, port=8001, cache_expiry=1,
listen_address='0.0.0.0'):
@ -56,10 +57,16 @@ class WebApp(threading.Thread):
self.cache_time = 0
self.cache = {}
self.daemon = True
self.routes = {}
self._init_default_routes()
self.server = httpserver.serve(
dec.wsgify(self.app), host=self.listen_address, port=self.port,
start_loop=False)
def _init_default_routes(self):
self.register_path('/(status\.json|status)$', self.status)
self.register_path(self.change_path_regexp, self.change)
def run(self):
self.server.serve_forever()
@ -90,14 +97,13 @@ class WebApp(threading.Thread):
return change['id'] == rev
return self._changes_by_func(func, tenant_name)
def _normalize_path(self, path):
# support legacy status.json as well as new /status
if path == '/status.json' or path == '/status':
return "status"
m = re.match('/status/change/(\d+,\d+)$', path)
if m:
return m.group(1)
return None
def register_path(self, path, handler):
path_re = re.compile(path)
self.routes[path] = (path_re, handler)
def unregister_path(self, path):
if self.routes.get(path):
del self.routes[path]
def _handle_keys(self, request, path):
m = re.match('/keys/(.*?)/(.*?).pub', path)
@ -120,14 +126,43 @@ class WebApp(threading.Thread):
return response.conditional_response_app
def app(self, request):
# Try registered paths without a tenant_name first
path = request.path
for path_re, handler in self.routes.itervalues():
if path_re.match(path):
return handler(path, '', request)
# Now try with a tenant_name stripped
tenant_name = request.path.split('/')[1]
path = request.path.replace('/' + tenant_name, '')
# Handle keys
if path.startswith('/keys'):
return self._handle_keys(request, path)
path = self._normalize_path(path)
if path is None:
for path_re, handler in self.routes.itervalues():
if path_re.match(path):
return handler(path, tenant_name, request)
else:
raise webob.exc.HTTPNotFound()
def status(self, path, tenant_name, request):
def func():
return webob.Response(body=self.cache[tenant_name],
content_type='application/json')
return self._response_with_status_cache(func, tenant_name)
def change(self, path, tenant_name, request):
def func():
m = re.match(self.change_path_regexp, path)
change_id = m.group(1)
status = self._status_for_change(change_id, tenant_name)
if status:
return webob.Response(body=status,
content_type='application/json')
else:
raise webob.exc.HTTPNotFound()
return self._response_with_status_cache(func, tenant_name)
def _refresh_status_cache(self, tenant_name):
if (tenant_name not in self.cache or
(time.time() - self.cache_time) > self.cache_expiry):
try:
@ -140,16 +175,10 @@ class WebApp(threading.Thread):
self.log.exception("Exception formatting status:")
raise
if path == 'status':
response = webob.Response(body=self.cache[tenant_name],
content_type='application/json')
else:
status = self._status_for_change(path, tenant_name)
if status:
response = webob.Response(body=status,
content_type='application/json')
else:
raise webob.exc.HTTPNotFound()
def _response_with_status_cache(self, func, tenant_name):
self._refresh_status_cache(tenant_name)
response = func()
response.headers['Access-Control-Allow-Origin'] = '*'