Merge tenant reconfiguration events
Replace the queue used for management events with one that can combine similar events. In this case, make it able to merge tenant reconfiguration events, so that if multiple patches which change the config merge between iterations of the event processor, we only reconfigure the tenant once. Tenant reconfiguration events are also associated with projects, so make sure that when we merge them, we combine the list of projects as well, so they have their cached configuration cleared. Finally, don't store a reference to the tenant, but rather just the tenant name, so that if reconfiguration events are queued, we don't keep extra copies of layouts in ram. Change-Id: If1669a0119b52ad0e3b9a4b92ee10d318df2eb18
This commit is contained in:
parent
3692b613c5
commit
419a8679bb
@ -2506,6 +2506,28 @@ class TestScheduler(ZuulTestCase):
|
||||
self.assertIn('project-merge', status_jobs[1]['dependencies'])
|
||||
self.assertIn('project-merge', status_jobs[2]['dependencies'])
|
||||
|
||||
def test_reconfigure_merge(self):
|
||||
"""Test that two reconfigure events are merged"""
|
||||
|
||||
tenant = self.sched.abide.tenants['tenant-one']
|
||||
(trusted, project) = tenant.getProject('org/project')
|
||||
|
||||
self.sched.run_handler_lock.acquire()
|
||||
self.assertEqual(self.sched.management_event_queue.qsize(), 0)
|
||||
|
||||
self.sched.reconfigureTenant(tenant, project)
|
||||
self.assertEqual(self.sched.management_event_queue.qsize(), 1)
|
||||
|
||||
self.sched.reconfigureTenant(tenant, project)
|
||||
# The second event should have been combined with the first
|
||||
# so we should still only have one entry.
|
||||
self.assertEqual(self.sched.management_event_queue.qsize(), 1)
|
||||
|
||||
self.sched.run_handler_lock.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(self.sched.management_event_queue.qsize(), 0)
|
||||
|
||||
def test_live_reconfiguration(self):
|
||||
"Test that live reconfiguration works"
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
78
zuul/lib/queue.py
Normal file
78
zuul/lib/queue.py
Normal file
@ -0,0 +1,78 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import threading
|
||||
|
||||
|
||||
class MergedQueue(object):
|
||||
def __init__(self):
|
||||
self.queue = collections.deque()
|
||||
self.lock = threading.RLock()
|
||||
self.condition = threading.Condition(self.lock)
|
||||
self.join_condition = threading.Condition(self.lock)
|
||||
self.tasks = 0
|
||||
|
||||
def qsize(self):
|
||||
return len(self.queue)
|
||||
|
||||
def empty(self):
|
||||
return self.qsize() == 0
|
||||
|
||||
def put(self, item):
|
||||
# Returns the original item if added, or an updated equivalent
|
||||
# item if already enqueued.
|
||||
self.condition.acquire()
|
||||
ret = None
|
||||
try:
|
||||
for x in self.queue:
|
||||
if item == x:
|
||||
ret = x
|
||||
if hasattr(ret, 'merge'):
|
||||
ret.merge(item)
|
||||
if ret is None:
|
||||
ret = item
|
||||
self.queue.append(item)
|
||||
self.condition.notify()
|
||||
finally:
|
||||
self.condition.release()
|
||||
return ret
|
||||
|
||||
def get(self):
|
||||
self.condition.acquire()
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
ret = self.queue.popleft()
|
||||
self.join_condition.acquire()
|
||||
self.tasks += 1
|
||||
self.join_condition.release()
|
||||
return ret
|
||||
except IndexError:
|
||||
self.condition.wait()
|
||||
finally:
|
||||
self.condition.release()
|
||||
|
||||
def task_done(self):
|
||||
self.join_condition.acquire()
|
||||
self.tasks -= 1
|
||||
self.join_condition.notify()
|
||||
self.join_condition.release()
|
||||
|
||||
def join(self):
|
||||
self.join_condition.acquire()
|
||||
while self.tasks:
|
||||
self.join_condition.wait()
|
||||
self.join_condition.release()
|
@ -31,6 +31,7 @@ from zuul import exceptions
|
||||
from zuul import version as zuul_version
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.statsd import get_statsd
|
||||
import zuul.lib.queue
|
||||
|
||||
|
||||
class ManagementEvent(object):
|
||||
@ -76,8 +77,23 @@ class TenantReconfigureEvent(ManagementEvent):
|
||||
"""
|
||||
def __init__(self, tenant, project):
|
||||
super(TenantReconfigureEvent, self).__init__()
|
||||
self.tenant = tenant
|
||||
self.project = project
|
||||
self.tenant_name = tenant.name
|
||||
self.projects = set([project])
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, TenantReconfigureEvent):
|
||||
return False
|
||||
# We don't check projects because they will get combined when
|
||||
# merged.
|
||||
return (self.tenant_name == other.tenant_name)
|
||||
|
||||
def merge(self, other):
|
||||
if self.tenant_name != other.tenant_name:
|
||||
raise Exception("Can not merge events from different tenants")
|
||||
self.projects |= other.projects
|
||||
|
||||
|
||||
class PromoteEvent(ManagementEvent):
|
||||
@ -223,7 +239,7 @@ class Scheduler(threading.Thread):
|
||||
|
||||
self.trigger_event_queue = queue.Queue()
|
||||
self.result_event_queue = queue.Queue()
|
||||
self.management_event_queue = queue.Queue()
|
||||
self.management_event_queue = zuul.lib.queue.MergedQueue()
|
||||
self.abide = model.Abide()
|
||||
|
||||
if not testonly:
|
||||
@ -475,15 +491,16 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Tenant reconfiguration beginning")
|
||||
# If a change landed to a project, clear out the cached
|
||||
# config before reconfiguring.
|
||||
if event.project:
|
||||
event.project.unparsed_config = None
|
||||
for project in event.projects:
|
||||
project.unparsed_config = None
|
||||
old_tenant = self.abide.tenants[event.tenant_name]
|
||||
loader = configloader.ConfigLoader()
|
||||
abide = loader.reloadTenant(
|
||||
self.config.get('scheduler', 'tenant_config'),
|
||||
self._get_project_key_dir(),
|
||||
self, self.merger, self.connections,
|
||||
self.abide, event.tenant)
|
||||
tenant = abide.tenants[event.tenant.name]
|
||||
self.abide, old_tenant)
|
||||
tenant = abide.tenants[event.tenant_name]
|
||||
self._reconfigureTenant(tenant)
|
||||
self.abide = abide
|
||||
finally:
|
||||
|
Loading…
x
Reference in New Issue
Block a user